diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 6508eca3f5..8f70f03406 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -24,6 +24,8 @@ struct SStreamStateReader { int64_t sver; int64_t ever; TBC* pCur; + + SStreamSnapReader* pReaderImpl; }; int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) { @@ -40,86 +42,51 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pReader->sver = sver; pReader->ever = ever; - // impl - code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL); - if (code) { - taosMemoryFree(pReader); - goto _err; - } + SStreamSnapReader* pSnapReader = NULL; + streamSnapReaderOpen(pTq, sver, ever, &pSnapReader); - code = tdbTbcMoveToFirst(pReader->pCur); - if (code) { - taosMemoryFree(pReader); - goto _err; - } + pReader->pReaderImpl = pSnapReader; - tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode)); + tqInfo("vgId:%d, vnode stream-state snapshot reader opened", TD_VID(pTq->pVnode)); *ppReader = pReader; return code; _err: - tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode stream-state snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); *ppReader = NULL; return code; } -int32_t streamStatSnapReaderClose(SStreamStateReader** ppReader) { +int32_t streamStatSnapReaderClose(SStreamStateReader* pReader) { int32_t code = 0; - - tdbTbcClose((*ppReader)->pCur); - taosMemoryFree(*ppReader); - *ppReader = NULL; - + streamSnapReaderClose(pReader->pReaderImpl); + taosMemoryFree(pReader); return code; } int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { - int32_t code = 0; - const void* pKey = NULL; - const void* pVal = NULL; - int32_t kLen = 0; - int32_t vLen = 0; - SDecoder decoder; - STqHandle handle; + int32_t code = 0; - *ppData = NULL; - for (;;) { - if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { - goto _exit; - } - - tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - tDecodeSTqHandle(&decoder, &handle); - tDecoderClear(&decoder); - - if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) { - tdbTbcMoveToNext(pReader->pCur); - break; - } else { - tdbTbcMoveToNext(pReader->pCur); - } - } - - *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); + uint8_t* rowData = NULL; + int64_t len; + code = streamSnapRead(pReader->pReaderImpl, &rowData, &len); + *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + len); if (*ppData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - + // refactor later, avoid mem/free freq SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); - pHdr->type = SNAP_DATA_TQ_HANDLE; - pHdr->size = vLen; - memcpy(pHdr->data, pVal, vLen); - - tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode), - handle.snapshotVer, handle.subKey, vLen); - -_exit: + pHdr->type = SNAP_DATA_STREAM_STATE_BACKEND; + pHdr->size = len; + memcpy(pHdr->data, rowData, len); + tqInfo("vgId:%d, vnode stream-state snapshot read data", TD_VID(pReader->pTq->pVnode)); return code; _err: - tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode stream-state snapshot read data failed since %s", TD_VID(pReader->pTq->pVnode), + tstrerror(code)); return code; } @@ -129,6 +96,8 @@ struct SStreamStateWriter { int64_t sver; int64_t ever; TXN* txn; + + SStreamSnapWriter* pWriterImpl; }; int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter) { @@ -145,14 +114,10 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pWriter->sver = sver; pWriter->ever = ever; - if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { - code = -1; - taosMemoryFree(pWriter); - goto _err; - } + SStreamSnapWriter* pSnapWriter = NULL; + streamSnapWriterOpen(pTq, sver, ever, &pSnapWriter); - *ppWriter = pWriter; - return code; + pWriter->pWriterImpl = pSnapWriter; _err: tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); @@ -160,53 +125,15 @@ _err: return code; } -int32_t streamStateSnapWriterClose(SStreamStateWriter** ppWriter, int8_t rollback) { - int32_t code = 0; - SStreamStateWriter* pWriter = *ppWriter; - STQ* pTq = pWriter->pTq; - - if (rollback) { - tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); - } else { - code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); - if (code) goto _err; - code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn); - if (code) goto _err; - } - +int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) { + int32_t code = 0; + code = streamSnapWriterClose(pWriter->pWriterImpl, rollback); taosMemoryFree(pWriter); - *ppWriter = NULL; - - // restore from metastore - if (tqMetaRestoreHandle(pTq) < 0) { - goto _err; - } - - return code; - -_err: - tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); return code; } int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - STQ* pTq = pWriter->pTq; - SDecoder decoder = {0}; - SDecoder* pDecoder = &decoder; - STqHandle handle; - - tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - code = tDecodeSTqHandle(pDecoder, &handle); - if (code) goto _err; - code = tqMetaSaveHandle(pTq, handle.subKey, &handle); - if (code < 0) goto _err; - tDecoderClear(pDecoder); - - return code; - -_err: - tDecoderClear(pDecoder); - tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + int32_t code = 0; + code = streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); return code; } diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 8b2d59a70d..d150ef1b65 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -26,100 +26,103 @@ struct SStreamTaskReader { }; int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader) { - int32_t code = 0; - SStreamTaskReader* pReader = NULL; + // int32_t code = 0; + // SStreamTaskReader* pReader = NULL; - // alloc - pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader)); - if (pReader == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pReader->pTq = pTq; - pReader->sver = sver; - pReader->ever = ever; + // // alloc + // pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader)); + // if (pReader == NULL) { + // code = TSDB_CODE_OUT_OF_MEMORY; + // goto _err; + // } + // pReader->pTq = pTq; + // pReader->sver = sver; + // pReader->ever = ever; - // impl - code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL); - if (code) { - taosMemoryFree(pReader); - goto _err; - } + // // impl + // code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL); + // if (code) { + // taosMemoryFree(pReader); + // goto _err; + // } - code = tdbTbcMoveToFirst(pReader->pCur); - if (code) { - taosMemoryFree(pReader); - goto _err; - } + // code = tdbTbcMoveToFirst(pReader->pCur); + // if (code) { + // taosMemoryFree(pReader); + // goto _err; + // } - tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode)); + // tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode)); - *ppReader = pReader; - return code; + // *ppReader = pReader; -_err: - tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - *ppReader = NULL; - return code; + // _err: + // tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + // *ppReader = NULL; + // return code; + return 0; } int32_t streamTaskSnapReaderClose(SStreamTaskReader** ppReader) { - int32_t code = 0; + // int32_t code = 0; - tdbTbcClose((*ppReader)->pCur); - taosMemoryFree(*ppReader); - *ppReader = NULL; + // tdbTbcClose((*ppReader)->pCur); + // taosMemoryFree(*ppReader); + // *ppReader = NULL; - return code; + // return code; + return 0; } int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { - int32_t code = 0; - const void* pKey = NULL; - const void* pVal = NULL; - int32_t kLen = 0; - int32_t vLen = 0; - SDecoder decoder; - STqHandle handle; + // int32_t code = 0; + // const void* pKey = NULL; + // const void* pVal = NULL; + // int32_t kLen = 0; + // int32_t vLen = 0; + // SDecoder decoder; + // STqHandle handle; - *ppData = NULL; - for (;;) { - if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { - goto _exit; - } + // *ppData = NULL; + // for (;;) { + // if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { + // goto _exit; + // } - tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - tDecodeSTqHandle(&decoder, &handle); - tDecoderClear(&decoder); + // tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + // tDecodeSTqHandle(&decoder, &handle); + // tDecoderClear(&decoder); - if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) { - tdbTbcMoveToNext(pReader->pCur); - break; - } else { - tdbTbcMoveToNext(pReader->pCur); - } - } + // if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) { + // tdbTbcMoveToNext(pReader->pCur); + // break; + // } else { + // tdbTbcMoveToNext(pReader->pCur); + // } + // } - *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); - if (*ppData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + // *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); + // if (*ppData == NULL) { + // code = TSDB_CODE_OUT_OF_MEMORY; + // goto _err; + // } - SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); - pHdr->type = SNAP_DATA_TQ_HANDLE; - pHdr->size = vLen; - memcpy(pHdr->data, pVal, vLen); + // SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); + // pHdr->type = SNAP_DATA_TQ_HANDLE; + // pHdr->size = vLen; + // memcpy(pHdr->data, pVal, vLen); - tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode), - handle.snapshotVer, handle.subKey, vLen); + // tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", + // TD_VID(pReader->pTq->pVnode), + // handle.snapshotVer, handle.subKey, vLen); -_exit: - return code; + // _exit: + // return code; -_err: - tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); - return code; + // _err: + // tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); + // return code; + return 0; } // STqSnapWriter ======================================== @@ -131,81 +134,84 @@ struct SStreamTaskWriter { }; int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter) { - int32_t code = 0; - SStreamTaskWriter* pWriter; + // int32_t code = 0; + // SStreamTaskWriter* pWriter; - // alloc - pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); - if (pWriter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pWriter->pTq = pTq; - pWriter->sver = sver; - pWriter->ever = ever; + // // alloc + // pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); + // if (pWriter == NULL) { + // code = TSDB_CODE_OUT_OF_MEMORY; + // goto _err; + // } + // pWriter->pTq = pTq; + // pWriter->sver = sver; + // pWriter->ever = ever; - if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { - code = -1; - taosMemoryFree(pWriter); - goto _err; - } + // if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + // code = -1; + // taosMemoryFree(pWriter); + // goto _err; + // } - *ppWriter = pWriter; - return code; + // *ppWriter = pWriter; + // return code; -_err: - tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - *ppWriter = NULL; - return code; + // _err: + // tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + // *ppWriter = NULL; + // return code; + return 0; } int32_t streamTaskSnapWriterClose(SStreamTaskWriter** ppWriter, int8_t rollback) { - int32_t code = 0; - SStreamTaskWriter* pWriter = *ppWriter; - STQ* pTq = pWriter->pTq; + // int32_t code = 0; + // SStreamTaskWriter* pWriter = *ppWriter; + // STQ* pTq = pWriter->pTq; - if (rollback) { - tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); - } else { - code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); - if (code) goto _err; - code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn); - if (code) goto _err; - } + // if (rollback) { + // tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); + // } else { + // code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); + // if (code) goto _err; + // code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn); + // if (code) goto _err; + // } - taosMemoryFree(pWriter); - *ppWriter = NULL; + // taosMemoryFree(pWriter); + // *ppWriter = NULL; - // restore from metastore - if (tqMetaRestoreHandle(pTq) < 0) { - goto _err; - } + // // restore from metastore + // if (tqMetaRestoreHandle(pTq) < 0) { + // goto _err; + // } - return code; + // return code; -_err: - tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); - return code; + // _err: + // tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); + // return code; + return 0; } int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - STQ* pTq = pWriter->pTq; - SDecoder decoder = {0}; - SDecoder* pDecoder = &decoder; - STqHandle handle; + // int32_t code = 0; + // STQ* pTq = pWriter->pTq; + // SDecoder decoder = {0}; + // SDecoder* pDecoder = &decoder; + // STqHandle handle; - tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - code = tDecodeSTqHandle(pDecoder, &handle); - if (code) goto _err; - code = tqMetaSaveHandle(pTq, handle.subKey, &handle); - if (code < 0) goto _err; - tDecoderClear(pDecoder); + // tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); + // code = tDecodeSTqHandle(pDecoder, &handle); + // if (code) goto _err; + // code = tqMetaSaveHandle(pTq, handle.subKey, &handle); + // if (code < 0) goto _err; + // tDecoderClear(pDecoder); - return code; + // return code; -_err: - tDecoderClear(pDecoder); - tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - return code; + // _err: + // tDecoderClear(pDecoder); + // tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + // return code; + return 0; }