From 1019d070fdb510506265de1fe6c9069442738c33 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 24 Aug 2023 15:00:09 +0800 Subject: [PATCH 1/3] mem leak --- source/common/src/tdatablock.c | 3 ++- source/libs/executor/src/filloperator.c | 5 ++--- source/libs/executor/src/groupoperator.c | 3 ++- source/libs/executor/src/scanoperator.c | 3 ++- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index e21e3a06b2..e14e4335d5 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1407,8 +1407,9 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); - void* pData = colDataGetData(pSrc, rowIdx); bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL); + void* pData = NULL; + if (!isNull) pData = colDataGetData(pSrc, rowIdx); colDataSetVal(pDst, 0, pData, isNull); } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 53fef3b7e3..0c8ae41e6e 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -825,6 +825,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS (pFillSup->next.key == pFillInfo->nextRowKey && !hasPrevWindow(pFillSup)))) { setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; + resetFillWindow(&pFillSup->prev); pFillSup->prev.key = pFillSup->cur.key; pFillSup->prev.pRowVal = pFillSup->cur.pRowVal; } else if (hasPrevWindow(pFillSup)) { @@ -1217,8 +1218,6 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) { SWinKey nextKey = {.groupId = groupId, .ts = ts}; while (pInfo->srcDelRowIndex < pBlock->info.rows) { - void* nextVal = NULL; - int32_t nextLen = 0; TSKEY delTs = tsStarts[pInfo->srcDelRowIndex]; uint64_t delGroupId = groupIds[pInfo->srcDelRowIndex]; int32_t code = TSDB_CODE_SUCCESS; @@ -1233,7 +1232,7 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) { if (delTs == nextKey.ts) { code = pAPI->stateStore.streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur); if (code == TSDB_CODE_SUCCESS) { - code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextLen); + code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, NULL, NULL); } // ts will be deleted later if (delTs != ts) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index bc078621ce..fb2204eae8 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -973,7 +973,8 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId); SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j); bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL); - char* pSrcData = colDataGetData(pSrcCol, rowIndex); + char* pSrcData = NULL; + if (!isNull) pSrcData = colDataGetData(pSrcCol, rowIndex); colDataSetVal(pDestCol, pDest->info.rows, pSrcData, isNull); } pDest->info.rows++; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 37f1ea597d..d0b892e0f1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1354,7 +1354,8 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j); SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j); bool isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL); - char* pSrcData = colDataGetData(pSrcCol, i); + char* pSrcData = NULL; + if (!isNull) pSrcData = colDataGetData(pSrcCol, i); colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull); } pResult->info.rows++; From 7d44b7e4cbfe4e96317491d41229f79bbae9f226 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 24 Aug 2023 16:32:34 +0800 Subject: [PATCH 2/3] mem leak --- source/dnode/mnode/impl/src/mndStream.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 1cb4332737..e3dd4c2107 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1792,6 +1792,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } if (pStream->status != STREAM_STATUS__PAUSE) { + sdbRelease(pMnode->pSdb, pStream); return 0; } From 5f055b6cff740cbee6bc19686c01f783883d3c25 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 25 Aug 2023 14:55:46 +0800 Subject: [PATCH 3/3] mem leak --- source/libs/executor/src/streamtimewindowoperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 4ead0df159..0596a14cbd 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1551,6 +1551,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { blockDataDestroy(pInfo->pWinBlock); tSimpleHashCleanup(pInfo->pStUpdated); tSimpleHashCleanup(pInfo->pStDeleted); + pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); taosArrayDestroy(pInfo->historyWins); blockDataDestroy(pInfo->pCheckpointRes);