From ee196a88e68490d150db3e960dc42324d6030fe5 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 10 Jul 2023 13:45:18 +0800 Subject: [PATCH] fix ci bug --- source/libs/executor/src/scanoperator.c | 1 + source/libs/executor/src/timewindowoperator.c | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 73aea04f92..ae05f92ab5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1771,6 +1771,7 @@ void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) { void* pBuf = NULL; int32_t len = streamScanOperatorEncode(pInfo, &pBuf); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len); + taosMemoryFree(pBuf); } // other properties are recovered from the execution plan diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 55f0920d2b..a0bf9d052a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2673,6 +2673,7 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) { len = doStreamIntervalEncodeOpState(&pBuf, pOperator); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len); + taosMemoryFree(buf); } static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { @@ -4675,6 +4676,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; + } else if (pBlock->info.type == STREAM_CHECKPOINT) { + doStreamSessionSaveCheckpoint(pOperator); + pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); + setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId); + copyDataBlock(pInfo->pCheckpointRes, pBlock); + continue; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); }