From 051c34e2f1f478e68feb21ff33bf9b6957113f5c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 10 Mar 2025 16:28:28 +0800 Subject: [PATCH] fix(stream): fix stream mem leak --- .../executor/src/streamcountwindowoperator.c | 3 +- .../executor/src/streameventwindowoperator.c | 3 +- .../executor/src/streamtimesliceoperator.c | 3 +- .../executor/src/streamtimewindowoperator.c | 16 ++++++---- source/libs/stream/src/tstreamFileState.c | 31 ++++++++++++------- 5 files changed, 35 insertions(+), 21 deletions(-) diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 63ff2fa92b..37466aac8d 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -57,11 +57,12 @@ void destroyStreamCountAggOperatorInfo(void* param) { } destroyStreamBasicInfo(&pInfo->basic); - destroyStreamAggSupporter(&pInfo->streamAggSup); + cleanupExprSupp(&pInfo->scalarSupp); clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); pInfo->pUpdated = NULL; + destroyStreamAggSupporter(&pInfo->streamAggSup); colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index eacb2fcfc8..f25f711783 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -56,10 +56,11 @@ void destroyStreamEventOperatorInfo(void* param) { } destroyStreamBasicInfo(&pInfo->basic); - destroyStreamAggSupporter(&pInfo->streamAggSup); + clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); pInfo->pUpdated = NULL; + destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 681e07f452..f511c5ab4f 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -151,7 +151,7 @@ void destroyStreamTimeSliceOperatorInfo(void* param) { pInfo->pOperator = NULL; } colDataDestroy(&pInfo->twAggSup.timeWindowData); - destroyStreamAggSupporter(&pInfo->streamAggSup); + resetPrevAndNextWindow(pInfo->pFillSup); destroyStreamFillSupporter(pInfo->pFillSup); destroyStreamFillInfo(pInfo->pFillInfo); @@ -179,6 +179,7 @@ void destroyStreamTimeSliceOperatorInfo(void* param) { taosArrayDestroy(pInfo->historyWins); taosArrayDestroy(pInfo->pCloseTs); + destroyStreamAggSupporter(&pInfo->streamAggSup); taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 33efd0cfb1..c15a9f9224 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -462,7 +462,7 @@ _end: void destroyFlusedPos(void* pRes) { SRowBuffPos* pPos = (SRowBuffPos*)pRes; - if (!pPos->needFree && !pPos->pRowBuff) { + if (pPos->needFree && !pPos->pRowBuff) { taosMemoryFreeClear(pPos->pKey); taosMemoryFree(pPos); } @@ -475,9 +475,11 @@ void destroyFlusedppPos(void* ppRes) { void clearGroupResInfo(SGroupResInfo* pGroupResInfo) { int32_t size = taosArrayGetSize(pGroupResInfo->pRows); - for (int32_t i = pGroupResInfo->index; i < size; i++) { - void* pPos = taosArrayGetP(pGroupResInfo->pRows, i); - destroyFlusedPos(pPos); + if (pGroupResInfo->index >= 0 && pGroupResInfo->index < size) { + for (int32_t i = pGroupResInfo->index; i < size; i++) { + void* pPos = taosArrayGetP(pGroupResInfo->pRows, i); + destroyFlusedPos(pPos); + } } pGroupResInfo->freeItem = false; taosArrayDestroy(pGroupResInfo->pRows); @@ -2204,11 +2206,12 @@ void destroyStreamSessionAggOperatorInfo(void* param) { } destroyStreamBasicInfo(&pInfo->basic); - destroyStreamAggSupporter(&pInfo->streamAggSup); + cleanupExprSupp(&pInfo->scalarSupp); clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); pInfo->pUpdated = NULL; + destroyStreamAggSupporter(&pInfo->streamAggSup); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -4442,10 +4445,11 @@ void destroyStreamStateOperatorInfo(void* param) { } destroyStreamBasicInfo(&pInfo->basic); - destroyStreamAggSupporter(&pInfo->streamAggSup); + clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); pInfo->pUpdated = NULL; + destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index d6dfde1ee6..0255ac01b5 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -394,14 +394,17 @@ int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlus pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); pFileState->stateBuffRemoveByPosFn(pFileState, pPos); - SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode); - taosMemoryFreeClear(tmp); + if (pPos->beUsed == false) { + SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode); + taosMemoryFreeClear(tmp); + } if (pPos->pRowBuff) { i++; } } } } + qDebug("clear flushed row buff. %d rows to disk. is all:%d", listNEles(pFlushList), all); _end: if (code != TSDB_CODE_SUCCESS) { @@ -433,7 +436,6 @@ int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; if (pPos->beUsed == used) { if (used && !pPos->pRowBuff) { - QUERY_CHECK_CONDITION((pPos->needFree == true), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); continue; } code = tdListAppend(pFlushList, &pPos); @@ -441,8 +443,10 @@ int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); pFileState->stateBuffRemoveByPosFn(pFileState, pPos); - SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode); - taosMemoryFreeClear(tmp); + if (pPos->beUsed == false) { + SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode); + taosMemoryFreeClear(tmp); + } if (pPos->pRowBuff) { i++; } @@ -511,9 +515,12 @@ int32_t clearRowBuff(SStreamFileState* pFileState) { if (pFileState->deleteMark != INT64_MAX) { clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false); } - if (isListEmpty(pFileState->freeBuffs)) { - return flushRowBuff(pFileState); - } + do { + int32_t code = flushRowBuff(pFileState); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } while (isListEmpty(pFileState->freeBuffs) && pFileState->curRowCount == pFileState->maxRowCount); return TSDB_CODE_SUCCESS; } @@ -756,10 +763,10 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** QUERY_CHECK_CODE(code, lino, _end); (*pVal) = pPos->pRowBuff; - if (!pPos->needFree) { - code = tdListPrepend(pFileState->usedBuffs, &pPos); - QUERY_CHECK_CODE(code, lino, _end); - } + // if (!pPos->needFree) { + // code = tdListPrepend(pFileState->usedBuffs, &pPos); + // QUERY_CHECK_CODE(code, lino, _end); + // } _end: if (code != TSDB_CODE_SUCCESS) {