From 933801269a3d993696b1be082a90e1aa4c5f1096 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 16 Jun 2023 09:55:11 +0800 Subject: [PATCH] fix(stream): fix memory leak and failed to close vnode. --- source/dnode/mnode/impl/src/mndDef.c | 32 +++++++++++-------- source/dnode/vnode/src/tq/tq.c | 3 +- source/libs/executor/src/timewindowoperator.c | 1 + 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index dfd54ab254..81f434a184 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -154,18 +154,10 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) { return 0; } -void tFreeStreamObj(SStreamObj *pStream) { - taosMemoryFree(pStream->sql); - taosMemoryFree(pStream->ast); - taosMemoryFree(pStream->physicalPlan); - - if (pStream->outputSchema.nCols) { - taosMemoryFree(pStream->outputSchema.pSchema); - } - - int32_t sz = taosArrayGetSize(pStream->tasks); - for (int32_t i = 0; i < sz; i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); +static void* freeStreamTasks(SArray* pTaskLevel) { + int32_t numOfLevel = taosArrayGetSize(pTaskLevel); + for (int32_t i = 0; i < numOfLevel; i++) { + SArray *pLevel = taosArrayGetP(pTaskLevel, i); int32_t taskSz = taosArrayGetSize(pLevel); for (int32_t j = 0; j < taskSz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); @@ -175,8 +167,20 @@ void tFreeStreamObj(SStreamObj *pStream) { taosArrayDestroy(pLevel); } - taosArrayDestroy(pStream->tasks); - taosArrayDestroy(pStream->pHTasksList); + return taosArrayDestroy(pTaskLevel); +} + +void tFreeStreamObj(SStreamObj *pStream) { + taosMemoryFree(pStream->sql); + taosMemoryFree(pStream->ast); + taosMemoryFree(pStream->physicalPlan); + + if (pStream->outputSchema.nCols) { + taosMemoryFree(pStream->outputSchema.pSchema); + } + + pStream->tasks = freeStreamTasks(pStream->tasks); + pStream->pHTasksList = freeStreamTasks(pStream->pHTasksList); // tagSchema.pSchema if (pStream->tagSchema.nCols > 0) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 417a582d78..b6a3c95897 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1077,7 +1077,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); // wait for the stream task get ready for scan history data - while (pStreamTask->status.checkDownstream == 0 || pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { + while (((pStreamTask->status.checkDownstream == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || + pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr, pStreamTask->info.taskLevel); taosMsleep(100); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 23932dcbc7..aa364375ae 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3963,6 +3963,7 @@ void destroyStreamStateOperatorInfo(void* param) { } colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); + taosArrayDestroy(pInfo->historyWins); tSimpleHashCleanup(pInfo->pSeDeleted); taosMemoryFreeClear(param); }