diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 49435a6317..003e9b900a 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -171,6 +171,7 @@ typedef struct { int32_t taskId; int64_t streamId; int64_t streamBackendRid; + int8_t dump; } SStreamState; typedef struct SFunctionStateStore { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 50e77926f5..c7165e4fd6 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -409,8 +409,10 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { blockDataDestroy(pInfo->pDelRes); pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); - taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner); - taosMemoryFreeClear(pInfo->pState->pTdbState); + if (pInfo->pState->dump == 1) { + taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner); + taosMemoryFreeClear(pInfo->pState->pTdbState); + } taosMemoryFreeClear(pInfo->pState); nodesDestroyNode((SNode*)pInfo->pPhyNode); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index a5e8c46f18..6d810f9c11 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1117,10 +1117,11 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) { void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); } void streamStateCopyBackend(SStreamState* src, SStreamState* dst) { - if (dst->pFileState == NULL) { + if (dst->pTdbState == NULL) { dst->pTdbState = taosMemoryCalloc(1, sizeof(STdbState)); dst->pTdbState->pOwner = taosMemoryCalloc(1, sizeof(SStreamTask)); } + dst->dump = 1; dst->pTdbState->pOwner->pBackend = src->pTdbState->pOwner->pBackend; return; }