From b4be5fe895545bdbbd660caa3d2878822d9e3a8d Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 20 Sep 2023 17:35:11 +0800 Subject: [PATCH] fix mem leak --- .../libs/executor/src/streamtimewindowoperator.c | 1 + source/libs/stream/src/streamSessionState.c | 14 ++++++++++---- source/libs/stream/src/streamState.c | 1 + 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 12e22f2597..1f98eb1210 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1520,6 +1520,7 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) { tSimpleHashCleanup(pSup->pResultRows); destroyDiskbasedBuf(pSup->pResultBuf); blockDataDestroy(pSup->pScanBlock); + pSup->stateStore.streamFileStateDestroy(pSup->pState->pFileState); taosMemoryFreeClear(pSup->pState); taosMemoryFreeClear(pSup->pDummyCtx); } diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 90a20fc0df..344c3845af 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -203,7 +203,7 @@ void sessionWinStateCleanup(void* pBuff) { size_t keyLen = 0; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) { - SArray* pWinStates = (SArray*) pIte; + SArray* pWinStates = (SArray*) (*(void**)pIte); taosArrayDestroy(pWinStates); } tSimpleHashCleanup(pBuff); @@ -360,19 +360,23 @@ int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessio int32_t code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL); bool hasCurrentPrev = true; if (code == TSDB_CODE_FAILED) { + streamStateFreeCur(pCur); pCur = sessionWinStateSeekKeyNext(pFileState, key); code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL); hasCurrentPrev = false; } + if (code == TSDB_CODE_FAILED) { - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } if (sessionRangeKeyCmpr(key, &tmpKey) == 0) { *curKey = tmpKey; - return code; + goto _end; } else if (!hasCurrentPrev) { - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } sessionWinStateMoveToNext(pCur); @@ -383,6 +387,8 @@ int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessio code = TSDB_CODE_FAILED; } +_end: + streamStateFreeCur(pCur); return code; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 3d924ca73c..93442790a4 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -683,6 +683,7 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { } void streamStateFreeCur(SStreamStateCur* pCur) { if (!pCur || pCur->buffIndex >= 0) { + taosMemoryFree(pCur); return; } qDebug("streamStateFreeCur");