From ad164871802ec4344a878cb9feed185f180717c1 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 6 Apr 2023 18:26:47 +0800 Subject: [PATCH] feat:add checkpoint --- source/common/src/tglobal.c | 2 +- source/libs/stream/src/tstreamFileState.c | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 761d712a3a..3a488d3399 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -49,7 +49,7 @@ int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfVnodeQueryThreads = 4; -float tsRatioOfVnodeStreamThreads = 1.0; +float tsRatioOfVnodeStreamThreads = 2.0; int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfQnodeQueryThreads = 4; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index a3b7cd9b51..381e0c55f7 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -279,9 +279,9 @@ void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t le void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) { *pLen = sizeof(TSKEY); - *pVal = taosMemoryCalloc(1, *pLen); - void** buff = pVal; - taosEncodeFixedI64(buff, pFileState->flushMark); + (*pVal) = taosMemoryCalloc(1, *pLen); + void* buff = *pVal; + taosEncodeFixedI64(&buff, pFileState->flushMark); } int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) { @@ -300,6 +300,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, streamFileStateEncode(pFileState, &buff, &len); SWinKey key = {.ts = -1, .groupId = 0}; // dengyihao streamStatePut_rocksdb(pFileState->pFileStore, &key, buff, len); + taosMemoryFree(buff); } return code; }