From f6d5d0c8ca9af994d9e00d45688e5fc2b138bb16 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 28 Jun 2023 02:39:01 +0000 Subject: [PATCH] add checkpoint --- source/dnode/vnode/src/inc/vnodeInt.h | 5 +- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 82 ++++++++++++-------- 2 files changed, 52 insertions(+), 35 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 22a7fdbef5..4f89889b0b 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -470,8 +470,9 @@ enum { SNAP_DATA_TQ_HANDLE = 7, SNAP_DATA_TQ_OFFSET = 8, SNAP_DATA_STREAM_TASK = 9, - SNAP_DATA_STREAM_STATE = 10, - SNAP_DATA_STREAM_STATE_BACKEND = 11, + SNAP_DATA_STREAM_TASK_CHECKPOINT = 10, + SNAP_DATA_STREAM_STATE = 11, + SNAP_DATA_STREAM_STATE_BACKEND = 12, }; struct SSnapDataHdr { diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index cda7d6fecc..591b21128b 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -18,6 +18,11 @@ #include "tq.h" // STqSnapReader ======================================== + +typedef struct { + int8_t type; + TTB* tbl; +} STablePair; struct SStreamTaskReader { STQ* pTq; int64_t sver; @@ -41,13 +46,17 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa pReader->sver = sver; pReader->ever = ever; pReader->tdbTbList = taosArrayInit(4, sizeof(void*)); - taosArrayPush(pReader->tdbTbList, &pTq->pStreamMeta->pTaskDb); - taosArrayPush(pReader->tdbTbList, &pTq->pStreamMeta->pCheckpointDb); + + STablePair pair1 = {.tbl = pTq->pStreamMeta->pTaskDb, .type = SNAP_DATA_STREAM_TASK}; + taosArrayPush(pReader->tdbTbList, &pair1); + + STablePair pair2 = {.tbl = pTq->pStreamMeta->pCheckpointDb, .type = SNAP_DATA_STREAM_TASK_CHECKPOINT}; + taosArrayPush(pReader->tdbTbList, &pair2); + pReader->pos = 0; - // impl - - code = tdbTbcOpen(taosArrayGetP(pReader->tdbTbList, pReader->pos), &pReader->pCur, NULL); + STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos); + code = tdbTbcOpen(pPair->tbl, &pReader->pCur, NULL); if (code) { taosMemoryFree(pReader); goto _err; @@ -72,7 +81,7 @@ _err: int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) { int32_t code = 0; - + taosArrayDestroy(pReader->tdbTbList); tdbTbcClose(pReader->pCur); taosMemoryFree(pReader); @@ -90,6 +99,8 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { *ppData = NULL; int8_t except = 0; + + STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos); NextTbl: for (;;) { if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { @@ -106,7 +117,9 @@ NextTbl: pReader->pos += 1; code = tdbTbcOpen(taosArrayGetP(pReader->tdbTbList, pReader->pos), &pReader->pCur, NULL); - tdbTbcMoveToNext(pReader->pCur); + tdbTbcMoveToFirst(pReader->pCur); + + pPair = taosArrayGet(pReader->tdbTbList, pReader->pos); goto NextTbl; } } @@ -118,18 +131,18 @@ NextTbl: } SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); - pHdr->type = SNAP_DATA_STREAM_TASK; + pHdr->type = pPair->type; pHdr->size = vLen; memcpy(pHdr->data, pVal, vLen); - tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode), - handle.snapshotVer, handle.subKey, vLen); + tqInfo("vgId:%d, vnode stream-task snapshot read data vLen:%d", TD_VID(pReader->pTq->pVnode), vLen); return code; _exit: return code; _err: - tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode stream-task snapshot read data failed since %s", TD_VID(pReader->pTq->pVnode), + tstrerror(code)); return code; } @@ -202,32 +215,35 @@ _err: } int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - STQ* pTq = pWriter->pTq; - STqHandle handle; + int32_t code = 0; + STQ* pTq = pWriter->pTq; + STqHandle handle; + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; + if (pHdr->type == SNAP_DATA_STREAM_TASK) { + SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + return -1; + } - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return -1; - } - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - code = tDecodeStreamTask(&decoder, pTask); - if (code < 0) { + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); + code = tDecodeStreamTask(&decoder, pTask); + if (code < 0) { + tDecoderClear(&decoder); + taosMemoryFree(pTask); + goto _err; + } tDecoderClear(&decoder); + // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) + if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), + (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) { + taosMemoryFree(pTask); + return -1; + } taosMemoryFree(pTask); - goto _err; + } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { + // do nothing } - tDecoderClear(&decoder); - // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) - - if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), (uint8_t*)pData + sizeof(SSnapDataHdr), - nData - sizeof(SSnapDataHdr), NULL) < 0) { - taosMemoryFree(pTask); - return -1; - } - taosMemoryFree(pTask); return code;