From 9d69b284db869b6f196a359d578ffdca39cfe428 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 27 Jun 2023 12:03:29 +0000 Subject: [PATCH] add checkpoint --- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 262 +++++++++---------- 1 file changed, 131 insertions(+), 131 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index d150ef1b65..97dabe8f26 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -26,102 +26,101 @@ struct SStreamTaskReader { }; int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader) { - // int32_t code = 0; - // SStreamTaskReader* pReader = NULL; + int32_t code = 0; + SStreamTaskReader* pReader = NULL; - // // alloc - // pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader)); - // if (pReader == NULL) { - // code = TSDB_CODE_OUT_OF_MEMORY; - // goto _err; - // } - // pReader->pTq = pTq; - // pReader->sver = sver; - // pReader->ever = ever; + // alloc + pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader)); + if (pReader == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pReader->pTq = pTq; + pReader->sver = sver; + pReader->ever = ever; - // // impl - // code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL); - // if (code) { - // taosMemoryFree(pReader); - // goto _err; - // } + // impl + code = tdbTbcOpen(pTq->pStreamMeta->pTaskDb, &pReader->pCur, NULL); + if (code) { + taosMemoryFree(pReader); + goto _err; + } - // code = tdbTbcMoveToFirst(pReader->pCur); - // if (code) { - // taosMemoryFree(pReader); - // goto _err; - // } + code = tdbTbcMoveToFirst(pReader->pCur); + if (code) { + taosMemoryFree(pReader); + goto _err; + } - // tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode)); + tqInfo("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode)); - // *ppReader = pReader; + *ppReader = pReader; - // _err: - // tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - // *ppReader = NULL; - // return code; +_err: + tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + *ppReader = NULL; + return code; return 0; } int32_t streamTaskSnapReaderClose(SStreamTaskReader** ppReader) { - // int32_t code = 0; + int32_t code = 0; - // tdbTbcClose((*ppReader)->pCur); - // taosMemoryFree(*ppReader); - // *ppReader = NULL; + tdbTbcClose((*ppReader)->pCur); + taosMemoryFree(*ppReader); + *ppReader = NULL; - // return code; + return code; return 0; } int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { - // int32_t code = 0; - // const void* pKey = NULL; - // const void* pVal = NULL; - // int32_t kLen = 0; - // int32_t vLen = 0; - // SDecoder decoder; - // STqHandle handle; + int32_t code = 0; + const void* pKey = NULL; + const void* pVal = NULL; + int32_t kLen = 0; + int32_t vLen = 0; + SDecoder decoder; + STqHandle handle; - // *ppData = NULL; - // for (;;) { - // if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { - // goto _exit; - // } + *ppData = NULL; + for (;;) { + if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { + goto _exit; + } - // tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - // tDecodeSTqHandle(&decoder, &handle); - // tDecoderClear(&decoder); + // tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + // tDecodeSTqHandle(&decoder, &handle); + // tDecoderClear(&decoder); - // if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) { - // tdbTbcMoveToNext(pReader->pCur); - // break; - // } else { - // tdbTbcMoveToNext(pReader->pCur); - // } - // } + if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) { + tdbTbcMoveToNext(pReader->pCur); + break; + } else { + tdbTbcMoveToNext(pReader->pCur); + } + } - // *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); - // if (*ppData == NULL) { - // code = TSDB_CODE_OUT_OF_MEMORY; - // goto _err; - // } + *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); + if (*ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } - // SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); - // pHdr->type = SNAP_DATA_TQ_HANDLE; - // pHdr->size = vLen; - // memcpy(pHdr->data, pVal, vLen); + SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); + pHdr->type = SNAP_DATA_STREAM_TASK; + pHdr->size = vLen; + memcpy(pHdr->data, pVal, vLen); - // tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", - // TD_VID(pReader->pTq->pVnode), - // handle.snapshotVer, handle.subKey, vLen); + tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode), + handle.snapshotVer, handle.subKey, vLen); - // _exit: - // return code; +_exit: + return code; - // _err: - // tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); - // return code; +_err: + tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); + return code; return 0; } @@ -134,84 +133,85 @@ struct SStreamTaskWriter { }; int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter) { - // int32_t code = 0; - // SStreamTaskWriter* pWriter; + int32_t code = 0; + SStreamTaskWriter* pWriter; - // // alloc - // pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); - // if (pWriter == NULL) { - // code = TSDB_CODE_OUT_OF_MEMORY; - // goto _err; - // } - // pWriter->pTq = pTq; - // pWriter->sver = sver; - // pWriter->ever = ever; + // alloc + pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); + if (pWriter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pWriter->pTq = pTq; + pWriter->sver = sver; + pWriter->ever = ever; - // if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { - // code = -1; - // taosMemoryFree(pWriter); - // goto _err; - // } + if (tdbBegin(pTq->pStreamMeta->db, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + code = -1; + taosMemoryFree(pWriter); + goto _err; + } - // *ppWriter = pWriter; - // return code; + *ppWriter = pWriter; + return code; - // _err: - // tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - // *ppWriter = NULL; - // return code; +_err: + tqError("vgId:%d, stream-task snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + *ppWriter = NULL; + return code; return 0; } int32_t streamTaskSnapWriterClose(SStreamTaskWriter** ppWriter, int8_t rollback) { - // int32_t code = 0; - // SStreamTaskWriter* pWriter = *ppWriter; - // STQ* pTq = pWriter->pTq; + int32_t code = 0; + SStreamTaskWriter* pWriter = *ppWriter; + STQ* pTq = pWriter->pTq; - // if (rollback) { - // tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); - // } else { - // code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); - // if (code) goto _err; - // code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn); - // if (code) goto _err; - // } + if (rollback) { + tdbAbort(pWriter->pTq->pStreamMeta->db, pWriter->txn); + } else { + code = tdbCommit(pWriter->pTq->pStreamMeta->db, pWriter->txn); + if (code) goto _err; + code = tdbPostCommit(pWriter->pTq->pStreamMeta->db, pWriter->txn); + if (code) goto _err; + } - // taosMemoryFree(pWriter); - // *ppWriter = NULL; + taosMemoryFree(pWriter); + *ppWriter = NULL; - // // restore from metastore - // if (tqMetaRestoreHandle(pTq) < 0) { - // goto _err; - // } + // restore from metastore + // if (tqMetaRestoreHandle(pTq) < 0) { + // goto _err; + // } - // return code; + return code; - // _err: - // tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); - // return code; +_err: + tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); + return code; return 0; } int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) { - // int32_t code = 0; - // STQ* pTq = pWriter->pTq; - // SDecoder decoder = {0}; - // SDecoder* pDecoder = &decoder; - // STqHandle handle; + int32_t code = 0; + STQ* pTq = pWriter->pTq; + SDecoder decoder = {0}; + SDecoder* pDecoder = &decoder; + STqHandle handle; - // tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - // code = tDecodeSTqHandle(pDecoder, &handle); - // if (code) goto _err; - // code = tqMetaSaveHandle(pTq, handle.subKey, &handle); - // if (code < 0) goto _err; - // tDecoderClear(pDecoder); + // tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); + // code = tDecodeSTqHandle(pDecoder, &handle); + // if (code) goto _err; + // code = tqMetaSaveHandle(pTq, handle.subKey, &handle); + // if (code < 0) goto _err; + // tDecoderClear(pDecoder); + // insert into pStreamMeta tdb table - // return code; + return code; - // _err: - // tDecoderClear(pDecoder); - // tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - // return code; +_err: + tDecoderClear(pDecoder); + tqError("vgId:%d, stream-task snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + return code; return 0; }