From cdaa5f9801a18a243c897c42140d93d23ed3dbc1 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 23 Apr 2024 15:40:50 +0800 Subject: [PATCH] fix(stream):mem leak --- source/libs/executor/inc/executorInt.h | 1 + .../executor/src/streamcountwindowoperator.c | 3 ++- .../executor/src/streameventwindowoperator.c | 4 ++- .../executor/src/streamtimewindowoperator.c | 25 +++++++++++++------ 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 723474adae..edfad8c872 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -944,6 +944,7 @@ int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval); void resetUnCloseSessionWinInfo(SSHashObj* winMap); void setStreamOperatorCompleted(struct SOperatorInfo* pOperator); void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup); +void destroyFlusedPos(void* pRes); int32_t encodeSSessionKey(void** buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index f2d3bbb29a..9cef8f584e 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -50,12 +50,13 @@ void destroyStreamCountAggOperatorInfo(void* param) { destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupExprSupp(&pInfo->scalarSupp); clearGroupResInfo(&pInfo->groupResInfo); + taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); + pInfo->pUpdated = NULL; colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); tSimpleHashCleanup(pInfo->pStUpdated); tSimpleHashCleanup(pInfo->pStDeleted); - pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); cleanupGroupResInfo(&pInfo->groupResInfo); taosArrayDestroy(pInfo->historyWins); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 99b377e1d3..22c7f20658 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -46,6 +46,9 @@ void destroyStreamEventOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); clearGroupResInfo(&pInfo->groupResInfo); + taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); + pInfo->pUpdated = NULL; + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -60,7 +63,6 @@ void destroyStreamEventOperatorInfo(void* param) { tSimpleHashCleanup(pInfo->pSeUpdated); tSimpleHashCleanup(pInfo->pAllUpdated); tSimpleHashCleanup(pInfo->pSeDeleted); - pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); cleanupGroupResInfo(&pInfo->groupResInfo); taosArrayDestroy(pInfo->historyWins); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 0e3e74f16f..0cab4b7951 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -388,15 +388,20 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin } } +void destroyFlusedPos(void* pRes) { + SRowBuffPos* pPos = (SRowBuffPos*) pRes; + if (!pPos->needFree && !pPos->pRowBuff) { + taosMemoryFreeClear(pPos->pKey); + taosMemoryFree(pPos); + } +} + void clearGroupResInfo(SGroupResInfo* pGroupResInfo) { if (pGroupResInfo->freeItem) { int32_t size = taosArrayGetSize(pGroupResInfo->pRows); for (int32_t i = pGroupResInfo->index; i < size; i++) { - SRowBuffPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); - if (!pPos->needFree && !pPos->pRowBuff) { - taosMemoryFreeClear(pPos->pKey); - taosMemoryFree(pPos); - } + void* pPos = taosArrayGetP(pGroupResInfo->pRows, i); + destroyFlusedPos(pPos); } pGroupResInfo->freeItem = false; } @@ -409,6 +414,8 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); cleanupAggSup(&pInfo->aggSup); clearGroupResInfo(&pInfo->groupResInfo); + taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); + pInfo->pUpdated = NULL; // it should be empty. void* pIte = NULL; @@ -437,7 +444,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { cleanupExprSupp(&pInfo->scalarSupp); tSimpleHashCleanup(pInfo->pUpdatedMap); pInfo->pUpdatedMap = NULL; - pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); tSimpleHashCleanup(pInfo->pDeletedMap); blockDataDestroy(pInfo->pCheckpointRes); @@ -1664,6 +1670,8 @@ void destroyStreamSessionAggOperatorInfo(void* param) { destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupExprSupp(&pInfo->scalarSupp); clearGroupResInfo(&pInfo->groupResInfo); + taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); + pInfo->pUpdated = NULL; if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -1679,7 +1687,6 @@ void destroyStreamSessionAggOperatorInfo(void* param) { blockDataDestroy(pInfo->pWinBlock); tSimpleHashCleanup(pInfo->pStUpdated); tSimpleHashCleanup(pInfo->pStDeleted); - pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); cleanupGroupResInfo(&pInfo->groupResInfo); taosArrayDestroy(pInfo->historyWins); @@ -3250,6 +3257,9 @@ void destroyStreamStateOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); clearGroupResInfo(&pInfo->groupResInfo); + taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); + pInfo->pUpdated = NULL; + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -3263,7 +3273,6 @@ void destroyStreamStateOperatorInfo(void* param) { blockDataDestroy(pInfo->pDelRes); tSimpleHashCleanup(pInfo->pSeUpdated); tSimpleHashCleanup(pInfo->pSeDeleted); - pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); cleanupGroupResInfo(&pInfo->groupResInfo); taosArrayDestroy(pInfo->historyWins);