From 7b50663240af51545d782f2e90f02b21848c79ee Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 16 May 2024 16:24:58 +0800 Subject: [PATCH] optimize stream operator checkpoint --- source/libs/executor/inc/executorInt.h | 30 +++++----- source/libs/executor/inc/streamexecutorInt.h | 32 ++++++++++ source/libs/executor/src/scanoperator.c | 18 ++++-- .../executor/src/streamcountwindowoperator.c | 19 +++--- .../executor/src/streameventwindowoperator.c | 19 +++--- source/libs/executor/src/streamexecutorInt.c | 30 ++++++++++ .../executor/src/streamtimewindowoperator.c | 58 ++++++++++++------- source/libs/stream/src/tstreamFileState.c | 7 ++- 8 files changed, 158 insertions(+), 55 deletions(-) create mode 100644 source/libs/executor/inc/streamexecutorInt.h create mode 100644 source/libs/executor/src/streamexecutorInt.c diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index fac216e6e6..592231f043 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -446,17 +446,23 @@ typedef struct STimeWindowAggSupp { SColumnInfoData timeWindowData; // query time window info for scalar function execution. } STimeWindowAggSupp; +typedef struct SSteamOpBasicInfo { + int32_t primaryPkIndex; + bool updateOperatorInfo; +} SSteamOpBasicInfo; + typedef struct SStreamScanInfo { - SExprInfo* pPseudoExpr; - int32_t numOfPseudoExpr; - SExprSupp tbnameCalSup; - SExprSupp* pPartTbnameSup; - SExprSupp tagCalSup; - int32_t primaryTsIndex; // primary time stamp slot id - int32_t primaryKeyIndex; - SReadHandle readHandle; - SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. - SColMatchInfo matchInfo; + SSteamOpBasicInfo basic; + SExprInfo* pPseudoExpr; + int32_t numOfPseudoExpr; + SExprSupp tbnameCalSup; + SExprSupp* pPartTbnameSup; + SExprSupp tagCalSup; + int32_t primaryTsIndex; // primary time stamp slot id + int32_t primaryKeyIndex; + SReadHandle readHandle; + SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. + SColMatchInfo matchInfo; SArray* pBlockLists; // multiple SSDatablock. SSDataBlock* pRes; // result SSDataBlock @@ -568,10 +574,6 @@ typedef struct SOpCheckPointInfo { SHashObj* children; // key:child id } SOpCheckPointInfo; -typedef struct SSteamOpBasicInfo { - int32_t primaryPkIndex; -} SSteamOpBasicInfo; - typedef struct SStreamIntervalOperatorInfo { SOptrBasicInfo binfo; // basic info SSteamOpBasicInfo basic; diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h new file mode 100644 index 0000000000..8b61b93601 --- /dev/null +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef STREAM_EXECUTORINT_H +#define STREAM_EXECUTORINT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "executorInt.h" + +void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type); +bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo); +void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo); + +#ifdef __cplusplus +} +#endif + +#endif // STREAM_EXECUTORINT_H diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 38d3fa8e96..110aabf9b1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -20,6 +20,7 @@ #include "os.h" #include "querynodes.h" #include "systable.h" +#include "streamexecutorInt.h" #include "tname.h" #include "tdatablock.h" @@ -2426,10 +2427,13 @@ void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) { if (!pInfo->pState) { return; } - void* pBuf = NULL; - int32_t len = streamScanOperatorEncode(pInfo, &pBuf); - pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len); - taosMemoryFree(pBuf); + if (needSaveStreamOperatorInfo(&pInfo->basic)) { + void* pBuf = NULL; + int32_t len = streamScanOperatorEncode(pInfo, &pBuf); + pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len); + taosMemoryFree(pBuf); + saveStreamOperatorStateComplete(&pInfo->basic); + } } // other properties are recovered from the execution plan @@ -2582,6 +2586,7 @@ FETCH_NEXT_BLOCK: case STREAM_NORMAL: case STREAM_GET_ALL: printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pBlock->info.type); return pBlock; case STREAM_RETRIEVE: { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; @@ -2622,6 +2627,7 @@ FETCH_NEXT_BLOCK: if (pInfo->pDeleteDataRes->info.rows > 0) { printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type); return pInfo->pDeleteDataRes; } else { goto FETCH_NEXT_BLOCK; @@ -2639,6 +2645,7 @@ FETCH_NEXT_BLOCK: if (pInfo->pDeleteDataRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type); return pInfo->pDeleteDataRes; } else { goto FETCH_NEXT_BLOCK; @@ -2652,6 +2659,7 @@ FETCH_NEXT_BLOCK: break; } printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pBlock->info.type); return pBlock; } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { qDebug("stream scan mode:%d, %s", pInfo->scanMode, id); @@ -2659,6 +2667,7 @@ FETCH_NEXT_BLOCK: case STREAM_SCAN_FROM_RES: { pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes); + setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); pInfo->pRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); @@ -2762,6 +2771,7 @@ FETCH_NEXT_BLOCK: } doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); + setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 491a1da6aa..050e67e15d 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -17,6 +17,7 @@ #include "functionMgt.h" #include "operator.h" #include "querytask.h" +#include "streamexecutorInt.h" #include "tchecksum.h" #include "tcommon.h" #include "tdatablock.h" @@ -415,13 +416,16 @@ void* doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) { SStreamCountAggOperatorInfo* pInfo = pOperator->info; - int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true); - void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; - len = doStreamCountEncodeOpState(&pBuf, len, pOperator, true); - pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME, - strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), buf, len); - taosMemoryFree(buf); + if (needSaveStreamOperatorInfo(&pInfo->basic)) { + int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true); + void* buf = taosMemoryCalloc(1, len); + void* pBuf = buf; + len = doStreamCountEncodeOpState(&pBuf, len, pOperator, true); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME, + strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), buf, len); + taosMemoryFree(buf); + saveStreamOperatorStateComplete(&pInfo->basic); + } } void doResetCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock) { @@ -550,6 +554,7 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) { break; } printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pBlock->info.type); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { bool add = pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 1116851323..64d6244fe7 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -18,6 +18,7 @@ #include "functionMgt.h" #include "operator.h" #include "querytask.h" +#include "streamexecutorInt.h" #include "tchecksum.h" #include "tcommon.h" #include "tcompare.h" @@ -458,13 +459,16 @@ void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) { SStreamEventAggOperatorInfo* pInfo = pOperator->info; - int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator); - void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; - len = doStreamEventEncodeOpState(&pBuf, len, pOperator); - pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME, - strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len); - taosMemoryFree(buf); + if (needSaveStreamOperatorInfo(&pInfo->basic)) { + int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator); + void* buf = taosMemoryCalloc(1, len); + void* pBuf = buf; + len = doStreamEventEncodeOpState(&pBuf, len, pOperator); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME, + strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len); + taosMemoryFree(buf); + saveStreamOperatorStateComplete(&pInfo->basic); + } } static SSDataBlock* buildEventResult(SOperatorInfo* pOperator) { @@ -531,6 +535,7 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) { break; } printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pBlock->info.type); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { diff --git a/source/libs/executor/src/streamexecutorInt.c b/source/libs/executor/src/streamexecutorInt.c new file mode 100644 index 0000000000..875ae00350 --- /dev/null +++ b/source/libs/executor/src/streamexecutorInt.c @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executorInt.h" + +void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type) { + if (type != STREAM_GET_ALL && type != STREAM_CHECKPOINT) { + pBasicInfo->updateOperatorInfo = true; + } +} + +bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo) { + return pBasicInfo->updateOperatorInfo; +} + +void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) { + pBasicInfo->updateOperatorInfo = false; +} diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 08ce0e25f1..2da9ed0353 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -18,6 +18,7 @@ #include "functionMgt.h" #include "operator.h" #include "querytask.h" +#include "streamexecutorInt.h" #include "tchecksum.h" #include "tcommon.h" #include "tcompare.h" @@ -1211,13 +1212,16 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator); - void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; - len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator); - pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, - strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len); - taosMemoryFree(buf); + if (needSaveStreamOperatorInfo(&pInfo->basic)) { + int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator); + void* buf = taosMemoryCalloc(1, len); + void* pBuf = buf; + len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator); + pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, + strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len); + taosMemoryFree(buf); + saveStreamOperatorStateComplete(&pInfo->basic); + } } static void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) { @@ -1347,6 +1351,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } pInfo->numOfDatapack++; printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pBlock->info.type); if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; @@ -2690,13 +2695,16 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true); - void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; - len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true); - pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, - strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len); - taosMemoryFree(buf); + if (needSaveStreamOperatorInfo(&pInfo->basic)) { + int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true); + void* buf = taosMemoryCalloc(1, len); + void* pBuf = buf; + len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, + strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len); + taosMemoryFree(buf); + saveStreamOperatorStateComplete(&pInfo->basic); + } } void resetUnCloseSessionWinInfo(SSHashObj* winMap) { @@ -2766,6 +2774,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { break; } printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pBlock->info.type); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { @@ -3176,6 +3185,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { break; } printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pBlock->info.type); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { @@ -3673,13 +3683,16 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; - int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true); - void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; - len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true); - pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, - strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len); - taosMemoryFree(buf); + if (needSaveStreamOperatorInfo(&pInfo->basic)) { + int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true); + void* buf = taosMemoryCalloc(1, len); + void* pBuf = buf; + len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, + strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len); + taosMemoryFree(buf); + saveStreamOperatorStateComplete(&pInfo->basic); + } } static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) { @@ -3746,6 +3759,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { break; } printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pBlock->info.type); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { @@ -4069,6 +4083,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { pInfo->numOfDatapack++; printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pBlock->info.type); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { @@ -4465,6 +4480,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { } pInfo->numOfDatapack++; printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pBlock->info.type); if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 82c36e6609..83de642e51 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -557,7 +557,6 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, const int32_t BATCH_LIMIT = 256; int64_t st = taosGetTimestampMs(); - int32_t numOfElems = listNEles(pSnapshot); SListNode* pNode = NULL; int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName); @@ -589,8 +588,11 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } taosMemoryFree(buf); - if (streamStateGetBatchSize(batch) > 0) { + int32_t numOfElems = streamStateGetBatchSize(batch); + if (numOfElems > 0) { streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); + } else { + goto _end; } streamStateClearBatch(batch); @@ -609,6 +611,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); } +_end: streamStateDestroyBatch(batch); return code; }