diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 2eb6fb2d64..fd8415c148 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -373,8 +373,11 @@ void clearGroupResInfo(SGroupResInfo* pGroupResInfo) { if (pGroupResInfo->freeItem) { int32_t size = taosArrayGetSize(pGroupResInfo->pRows); for (int32_t i = pGroupResInfo->index; i < size; i++) { - void* pVal = taosArrayGetP(pGroupResInfo->pRows, i); - taosMemoryFree(pVal); + SRowBuffPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); + if (!pPos->needFree && !pPos->pRowBuff) { + taosMemoryFreeClear(pPos->pKey); + taosMemoryFree(pPos); + } } pGroupResInfo->freeItem = false; } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index ac404893f0..b10db2a1af 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -297,6 +297,10 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin while ((pNode = tdListNext(&iter)) != NULL && i < max) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; if (pPos->beUsed == used) { + if (used && !pPos->pRowBuff) { + ASSERT(pPos->needFree == true); + continue; + } tdListAppend(pFlushList, &pPos); pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); pFileState->stateBuffRemoveByPosFn(pFileState, pPos);