From 2ce880f48cb1906b1e7d6ccb45610ae8e0e9375a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 19 Mar 2024 17:05:44 +0800 Subject: [PATCH] fix: memory leak issues --- source/libs/executor/inc/mergejoin.h | 2 +- source/libs/executor/src/mergejoin.c | 8 ++++++-- source/libs/executor/src/mergejoinoperator.c | 13 +++++++++++-- source/libs/nodes/src/nodesUtilFuncs.c | 2 ++ source/libs/planner/src/planOptimizer.c | 2 ++ 5 files changed, 22 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 6a8df55dc9..f2c992670f 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -431,7 +431,7 @@ void mJoinDestroyMergeCtx(SMJoinOperatorInfo* pJoin); void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin); int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode); int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode); -void mWinJoinResetWindowCache(SMJoinWinCache* pCache); +void mWinJoinResetWindowCache(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache); SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator); diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index d35b194f7e..40baa1b4b6 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -2613,7 +2613,7 @@ void mAsofJoinGroupReset(SMJoinOperatorInfo* pJoin) { pWin->eqPostDone = false; pWin->lastTs = INT64_MIN; - mWinJoinResetWindowCache(pCache); + mWinJoinResetWindowCache(pWin, pCache); mJoinResetGroupTableCtx(pJoin->probe); mJoinResetGroupTableCtx(pJoin->build); @@ -2623,6 +2623,8 @@ static FORCE_INLINE void mWinJoinPopFrontGroup(SMJoinWindowCtx* pCtx, SMJoinGrpR pCtx->cache.rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx); if (pGrp->blk == pCtx->cache.outBlk) { blockDataCleanup(pGrp->blk); + } else if (pGrp->clonedBlk) { + blockDataDestroy(pGrp->blk); } taosArrayPopFrontBatch(pCtx->cache.grps, 1); @@ -3259,7 +3261,7 @@ void mWinJoinGroupReset(SMJoinOperatorInfo* pJoin) { pWin->eqPostDone = false; pWin->lastTs = INT64_MIN; - mWinJoinResetWindowCache(pCache); + mWinJoinResetWindowCache(pWin, pCache); mJoinResetGroupTableCtx(pJoin->probe); mJoinResetGroupTableCtx(pJoin->build); @@ -3281,6 +3283,8 @@ int32_t mJoinInitWindowCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin) { SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; + mWinJoinResetWindowCache(pCtx, &pCtx->cache); + pCtx->finBlk = blockDataDestroy(pCtx->finBlk); pCtx->cache.outBlk = blockDataDestroy(pCtx->cache.outBlk); diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index b8df782cd9..731f196a3d 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1425,7 +1425,7 @@ void mJoinResetMergeCtx(SMJoinMergeCtx* pCtx) { pCtx->hashJoin = false; } -void mWinJoinResetWindowCache(SMJoinWinCache* pCache) { +void mWinJoinResetWindowCache(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache) { pCache->outRowIdx = 0; pCache->rowNum = 0; pCache->grpIdx = 0; @@ -1433,6 +1433,15 @@ void mWinJoinResetWindowCache(SMJoinWinCache* pCache) { if (pCache->grpsQueue) { TSWAP(pCache->grps, pCache->grpsQueue); } + + int32_t grpNum = taosArrayGetSize(pCache->grps); + + for (int32_t i = 0; i < grpNum; ++i) { + SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i); + if (pGrp->blk != pCtx->cache.outBlk && pGrp->clonedBlk) { + blockDataDestroy(pGrp->blk); + } + } taosArrayClear(pCache->grps); @@ -1448,7 +1457,7 @@ void mJoinResetWindowCtx(SMJoinWindowCtx* pCtx) { pCtx->eqPostDone = false; pCtx->lastTs = INT64_MIN; - mWinJoinResetWindowCache(&pCtx->cache); + mWinJoinResetWindowCache(pCtx, &pCtx->cache); } void mJoinResetCtx(SMJoinOperatorInfo* pJoin) { diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 9c0c22aac3..efeb0ae30a 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1439,6 +1439,8 @@ void nodesDestroyNode(SNode* pNode) { destroyPhysiNode((SPhysiNode*)pPhyNode); nodesDestroyNode(pPhyNode->pWindowOffset); nodesDestroyNode(pPhyNode->pJLimit); + nodesDestroyNode(pPhyNode->leftPrimExpr); + nodesDestroyNode(pPhyNode->rightPrimExpr); nodesDestroyList(pPhyNode->pEqLeft); nodesDestroyList(pPhyNode->pEqRight); nodesDestroyNode(pPhyNode->pPrimKeyCond); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 498a09c9b5..1145eaf0bf 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1085,6 +1085,8 @@ static int32_t pdcJoinAddParentOnColsToTarget(SOptimizeContext* pCxt, SJoinLogic } pTmp = (SJoinLogicNode*)pTmp->node.pParent; } while (true); + + tSimpleHashCleanup(pTables); if (TSDB_CODE_SUCCESS == code) { code = createColumnByRewriteExprs(pCondCols, &pTargets);