diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 7669611d3a..cda7d6fecc 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -23,6 +23,8 @@ struct SStreamTaskReader { int64_t sver; int64_t ever; TBC* pCur; + SArray* tdbTbList; + int8_t pos; }; int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader) { @@ -38,9 +40,14 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa pReader->pTq = pTq; pReader->sver = sver; pReader->ever = ever; + pReader->tdbTbList = taosArrayInit(4, sizeof(void*)); + taosArrayPush(pReader->tdbTbList, &pTq->pStreamMeta->pTaskDb); + taosArrayPush(pReader->tdbTbList, &pTq->pStreamMeta->pCheckpointDb); + pReader->pos = 0; // impl - code = tdbTbcOpen(pTq->pStreamMeta->pTaskDb, &pReader->pCur, NULL); + + code = tdbTbcOpen(taosArrayGetP(pReader->tdbTbList, pReader->pos), &pReader->pCur, NULL); if (code) { taosMemoryFree(pReader); goto _err; @@ -63,15 +70,13 @@ _err: return 0; } -int32_t streamTaskSnapReaderClose(SStreamTaskReader** ppReader) { +int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) { int32_t code = 0; - tdbTbcClose((*ppReader)->pCur); - taosMemoryFree(*ppReader); - *ppReader = NULL; + tdbTbcClose(pReader->pCur); + taosMemoryFree(pReader); return code; - return 0; } int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { @@ -84,27 +89,27 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { STqHandle handle; *ppData = NULL; + int8_t except = 0; +NextTbl: for (;;) { if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { - goto _exit; + except = 1; + break; } - // SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - // if (pTask == NULL) { - // return -1; - // } - - // SDecoder decoder; - // tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - // code = tDecodeStreamTask(&decoder, pTask); - // if (code < 0) { - // //tDecoderClear(&decoder); - // //taosMemoryFree(pTask); - // goto _err; - // } - // tDecoderClear(&decoder); tdbTbcMoveToNext(pReader->pCur); break; } + if (except == 1) { + if (pReader->pos + 1 >= taosArrayGetSize(pReader->tdbTbList)) { + } else { + tdbTbcClose(pReader->pCur); + + pReader->pos += 1; + code = tdbTbcOpen(taosArrayGetP(pReader->tdbTbList, pReader->pos), &pReader->pCur, NULL); + tdbTbcMoveToNext(pReader->pCur); + goto NextTbl; + } + } *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); if (*ppData == NULL) { @@ -218,7 +223,7 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), (uint8_t*)pData + sizeof(SSnapDataHdr), - nData - sizeof(SSnapDataHdr), ) < 0) { + nData - sizeof(SSnapDataHdr), NULL) < 0) { taosMemoryFree(pTask); return -1; }