From 74764108fd5cd4d613f98b5a02e35e817dbcf9ad Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 27 Jun 2023 12:46:08 +0000 Subject: [PATCH] add checkpoint --- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 60 ++++++++++++-------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 8801b450f9..f7f6193e9d 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -88,20 +88,22 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { goto _exit; } - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return -1; - } + // SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + // if (pTask == NULL) { + // return -1; + // } - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - code = tDecodeStreamTask(&decoder, pTask); - if (code < 0) { - tDecoderClear(&decoder); - taosMemoryFree(pTask); - goto _err; - } - tDecoderClear(&decoder); + // SDecoder decoder; + // tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + // code = tDecodeStreamTask(&decoder, pTask); + // if (code < 0) { + // //tDecoderClear(&decoder); + // //taosMemoryFree(pTask); + // goto _err; + // } + // tDecoderClear(&decoder); + tdbTbcMoveToNext(pReader->pCur); + break; } *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); @@ -197,23 +199,33 @@ _err: int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; STQ* pTq = pWriter->pTq; - SDecoder decoder = {0}; - SDecoder* pDecoder = &decoder; STqHandle handle; - // tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - // code = tDecodeSTqHandle(pDecoder, &handle); - // if (code) goto _err; - // code = tqMetaSaveHandle(pTq, handle.subKey, &handle); - // if (code < 0) goto _err; - // tDecoderClear(pDecoder); - // insert into pStreamMeta tdb table + SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + return -1; + } + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); + code = tDecodeStreamTask(&decoder, pTask); + if (code < 0) { + tDecoderClear(&decoder); + taosMemoryFree(pTask); + goto _err; + } + tDecoderClear(&decoder); + // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) + + if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), (uint8_t*)pData + sizeof(SSnapDataHdr), + nData - sizeof(SSnapDataHdr), NULL) < 0) { + taosMemoryFree(pTask); + return -1; + } return code; _err: - tDecoderClear(pDecoder); tqError("vgId:%d, stream-task snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); return code; - return 0; }