fix: memory leak issues

This commit is contained in:
dapan1121 2024-03-19 17:05:44 +08:00
parent dcc650521a
commit 2ce880f48c
5 changed files with 22 additions and 5 deletions

View File

@ -431,7 +431,7 @@ void mJoinDestroyMergeCtx(SMJoinOperatorInfo* pJoin);
void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin); void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin);
int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode); int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode);
int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode); int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode);
void mWinJoinResetWindowCache(SMJoinWinCache* pCache); void mWinJoinResetWindowCache(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache);
SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator);
SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator);
SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator);

View File

@ -2613,7 +2613,7 @@ void mAsofJoinGroupReset(SMJoinOperatorInfo* pJoin) {
pWin->eqPostDone = false; pWin->eqPostDone = false;
pWin->lastTs = INT64_MIN; pWin->lastTs = INT64_MIN;
mWinJoinResetWindowCache(pCache); mWinJoinResetWindowCache(pWin, pCache);
mJoinResetGroupTableCtx(pJoin->probe); mJoinResetGroupTableCtx(pJoin->probe);
mJoinResetGroupTableCtx(pJoin->build); mJoinResetGroupTableCtx(pJoin->build);
@ -2623,6 +2623,8 @@ static FORCE_INLINE void mWinJoinPopFrontGroup(SMJoinWindowCtx* pCtx, SMJoinGrpR
pCtx->cache.rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx); pCtx->cache.rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx);
if (pGrp->blk == pCtx->cache.outBlk) { if (pGrp->blk == pCtx->cache.outBlk) {
blockDataCleanup(pGrp->blk); blockDataCleanup(pGrp->blk);
} else if (pGrp->clonedBlk) {
blockDataDestroy(pGrp->blk);
} }
taosArrayPopFrontBatch(pCtx->cache.grps, 1); taosArrayPopFrontBatch(pCtx->cache.grps, 1);
@ -3259,7 +3261,7 @@ void mWinJoinGroupReset(SMJoinOperatorInfo* pJoin) {
pWin->eqPostDone = false; pWin->eqPostDone = false;
pWin->lastTs = INT64_MIN; pWin->lastTs = INT64_MIN;
mWinJoinResetWindowCache(pCache); mWinJoinResetWindowCache(pWin, pCache);
mJoinResetGroupTableCtx(pJoin->probe); mJoinResetGroupTableCtx(pJoin->probe);
mJoinResetGroupTableCtx(pJoin->build); mJoinResetGroupTableCtx(pJoin->build);
@ -3281,6 +3283,8 @@ int32_t mJoinInitWindowCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin,
void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin) { void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin) {
SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
mWinJoinResetWindowCache(pCtx, &pCtx->cache);
pCtx->finBlk = blockDataDestroy(pCtx->finBlk); pCtx->finBlk = blockDataDestroy(pCtx->finBlk);
pCtx->cache.outBlk = blockDataDestroy(pCtx->cache.outBlk); pCtx->cache.outBlk = blockDataDestroy(pCtx->cache.outBlk);

View File

@ -1425,7 +1425,7 @@ void mJoinResetMergeCtx(SMJoinMergeCtx* pCtx) {
pCtx->hashJoin = false; pCtx->hashJoin = false;
} }
void mWinJoinResetWindowCache(SMJoinWinCache* pCache) { void mWinJoinResetWindowCache(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache) {
pCache->outRowIdx = 0; pCache->outRowIdx = 0;
pCache->rowNum = 0; pCache->rowNum = 0;
pCache->grpIdx = 0; pCache->grpIdx = 0;
@ -1433,6 +1433,15 @@ void mWinJoinResetWindowCache(SMJoinWinCache* pCache) {
if (pCache->grpsQueue) { if (pCache->grpsQueue) {
TSWAP(pCache->grps, pCache->grpsQueue); TSWAP(pCache->grps, pCache->grpsQueue);
} }
int32_t grpNum = taosArrayGetSize(pCache->grps);
for (int32_t i = 0; i < grpNum; ++i) {
SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
if (pGrp->blk != pCtx->cache.outBlk && pGrp->clonedBlk) {
blockDataDestroy(pGrp->blk);
}
}
taosArrayClear(pCache->grps); taosArrayClear(pCache->grps);
@ -1448,7 +1457,7 @@ void mJoinResetWindowCtx(SMJoinWindowCtx* pCtx) {
pCtx->eqPostDone = false; pCtx->eqPostDone = false;
pCtx->lastTs = INT64_MIN; pCtx->lastTs = INT64_MIN;
mWinJoinResetWindowCache(&pCtx->cache); mWinJoinResetWindowCache(pCtx, &pCtx->cache);
} }
void mJoinResetCtx(SMJoinOperatorInfo* pJoin) { void mJoinResetCtx(SMJoinOperatorInfo* pJoin) {

View File

@ -1439,6 +1439,8 @@ void nodesDestroyNode(SNode* pNode) {
destroyPhysiNode((SPhysiNode*)pPhyNode); destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pWindowOffset); nodesDestroyNode(pPhyNode->pWindowOffset);
nodesDestroyNode(pPhyNode->pJLimit); nodesDestroyNode(pPhyNode->pJLimit);
nodesDestroyNode(pPhyNode->leftPrimExpr);
nodesDestroyNode(pPhyNode->rightPrimExpr);
nodesDestroyList(pPhyNode->pEqLeft); nodesDestroyList(pPhyNode->pEqLeft);
nodesDestroyList(pPhyNode->pEqRight); nodesDestroyList(pPhyNode->pEqRight);
nodesDestroyNode(pPhyNode->pPrimKeyCond); nodesDestroyNode(pPhyNode->pPrimKeyCond);

View File

@ -1085,6 +1085,8 @@ static int32_t pdcJoinAddParentOnColsToTarget(SOptimizeContext* pCxt, SJoinLogic
} }
pTmp = (SJoinLogicNode*)pTmp->node.pParent; pTmp = (SJoinLogicNode*)pTmp->node.pParent;
} while (true); } while (true);
tSimpleHashCleanup(pTables);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pCondCols, &pTargets); code = createColumnByRewriteExprs(pCondCols, &pTargets);