From 7ebce2814f365e721b9228d33dc974aeb06f5b5a Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 24 Aug 2023 11:18:10 +0800 Subject: [PATCH 1/2] mem leak --- source/libs/executor/src/filloperator.c | 1 + source/libs/executor/src/timewindowoperator.c | 3 +++ source/libs/stream/src/streamBackendRocksdb.c | 19 +++++++++++-------- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index be4cb8d2dc..bf7da7505a 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -851,6 +851,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS if (hasPrevWindow(pFillSup)) { setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; + resetFillWindow(&pFillSup->next); pFillSup->next.key = pFillSup->cur.key; pFillSup->next.pRowVal = pFillSup->cur.pRowVal; pFillInfo->preRowKey = INT64_MIN; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 37f737c2ce..16eaf0649d 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2905,6 +2905,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -4096,6 +4097,7 @@ void destroyStreamStateOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupGroupResInfo(&pInfo->groupResInfo); + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { @@ -4109,6 +4111,7 @@ void destroyStreamStateOperatorInfo(void* param) { taosArrayDestroy(pInfo->historyWins); tSimpleHashCleanup(pInfo->pSeUpdated); tSimpleHashCleanup(pInfo->pSeDeleted); + pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); taosMemoryFreeClear(param); } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4a0ce81e68..1981cd76b3 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1620,19 +1620,22 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* if (len < 0) { return -1; } + + if (pVLen != NULL) *pVLen = len; + + if (pKTmp->opNum != pCur->number) { + taosMemoryFree(val); + return -1; + } + if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) { + taosMemoryFree(val); + return -1; + } if (pVal != NULL) { *pVal = (char*)val; } else { taosMemoryFree(val); } - if (pVLen != NULL) *pVLen = len; - - if (pKTmp->opNum != pCur->number) { - return -1; - } - if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) { - return -1; - } *pKey = pKTmp->key; return 0; } From 1e108c4178bd6cc4051e2715c9d063b90b8da4ca Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 24 Aug 2023 11:34:43 +0800 Subject: [PATCH 2/2] mem leak --- source/libs/stream/src/streamBackendRocksdb.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 1981cd76b3..571aca9935 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1613,6 +1613,9 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen); stateSessionKeyDecode((void*)&ktmp, (char*)curKey); + if (pVal != NULL) *pVal = NULL; + if (pVLen != NULL) *pVLen = 0; + SStateSessionKey* pKTmp = &ktmp; const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); char* val = NULL; @@ -1621,8 +1624,6 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* return -1; } - if (pVLen != NULL) *pVLen = len; - if (pKTmp->opNum != pCur->number) { taosMemoryFree(val); return -1; @@ -1631,11 +1632,14 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* taosMemoryFree(val); return -1; } + if (pVal != NULL) { *pVal = (char*)val; } else { taosMemoryFree(val); } + + if (pVLen != NULL) *pVLen = len; *pKey = pKTmp->key; return 0; }