From 40209db9c763904abd292d24a5b3313574445eff Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 25 Dec 2023 09:19:55 +0800 Subject: [PATCH] fix: memory leak issues --- source/libs/executor/inc/mergejoin.h | 12 +--- source/libs/executor/src/mergejoinoperator.c | 43 ++++++++++--- source/libs/executor/test/joinTests.cpp | 63 +++++++++----------- 3 files changed, 65 insertions(+), 53 deletions(-) diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 34a173a74b..4814986980 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -19,9 +19,9 @@ extern "C" { #endif -#define MJOIN_DEFAULT_BLK_ROWS_NUM 4 +#define MJOIN_DEFAULT_BLK_ROWS_NUM 4096 #define MJOIN_HJOIN_CART_THRESHOLD 16 -#define MJOIN_BLK_SIZE_LIMIT 20 +#define MJOIN_BLK_SIZE_LIMIT 10485760 struct SMJoinOperatorInfo; @@ -77,14 +77,6 @@ typedef struct SMJoinTableCtx { SMJoinColInfo* keyCols; char* keyBuf; char* keyData; - - int32_t valNum; - SMJoinColInfo* valCols; - char* valData; - int32_t valBitMapSize; - int32_t valBufSize; - SArray* valVarCols; - bool valColExist; bool newBlk; SSDataBlock* blk; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index d3162c91bc..0c6341e222 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -519,22 +519,47 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { return (pBlock && pBlock->info.rows > 0) ? pBlock : NULL; } +void destroyGrpArray(void* ppArray) { + SArray* pArray = *(SArray**)ppArray; + taosArrayDestroy(pArray); +} + +void destroyMergeJoinTableCtx(SMJoinTableCtx* pTable) { + mJoinDestroyCreatedBlks(pTable->createdBlks); + taosArrayDestroy(pTable->createdBlks); + tSimpleHashCleanup(pTable->pGrpHash); + + taosMemoryFree(pTable->primCol); + taosMemoryFree(pTable->finCols); + taosMemoryFree(pTable->keyCols); + taosMemoryFree(pTable->keyBuf); + + taosArrayDestroy(pTable->eqGrps); + taosArrayDestroyEx(pTable->pGrpArrays, destroyGrpArray); +} void destroyMergeJoinOperator(void* param) { - SOperatorInfo* pOperator = (SOperatorInfo*)param; - SMJoinOperatorInfo* pJoin = pOperator->info; + SMJoinOperatorInfo* pJoin = (SMJoinOperatorInfo*)param; pJoin->ctx.mergeCtx.finBlk = blockDataDestroy(pJoin->ctx.mergeCtx.finBlk); pJoin->ctx.mergeCtx.midBlk = blockDataDestroy(pJoin->ctx.mergeCtx.midBlk); - mJoinDestroyCreatedBlks(pJoin->probe->createdBlks); - taosArrayDestroy(pJoin->probe->createdBlks); - tSimpleHashCleanup(pJoin->probe->pGrpHash); + if (pJoin->pFPreFilter != NULL) { + filterFreeInfo(pJoin->pFPreFilter); + pJoin->pFPreFilter = NULL; + } + if (pJoin->pPreFilter != NULL) { + filterFreeInfo(pJoin->pPreFilter); + pJoin->pPreFilter = NULL; + } + if (pJoin->pFinFilter != NULL) { + filterFreeInfo(pJoin->pFinFilter); + pJoin->pFinFilter = NULL; + } - mJoinDestroyCreatedBlks(pJoin->build->createdBlks); - taosArrayDestroy(pJoin->build->createdBlks); - tSimpleHashCleanup(pJoin->build->pGrpHash); + destroyMergeJoinTableCtx(pJoin->probe); + destroyMergeJoinTableCtx(pJoin->build); - taosMemoryFreeClear(param); + taosMemoryFreeClear(pJoin); } diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index 417f691d14..710df47752 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -66,7 +66,7 @@ enum { }; #define COL_DISPLAY_WIDTH 18 -#define JT_MAX_LOOP 3000 +#define JT_MAX_LOOP 100000 #define LEFT_BLK_ID 0 #define RIGHT_BLK_ID 1 @@ -169,13 +169,12 @@ typedef struct { int32_t cond; bool filter; bool asc; - SOperatorInfo* pDownstreams; SExecTaskInfo* pTask; } SJoinTestParam; SJoinTestCtx jtCtx = {0}; -SJoinTestCtrl jtCtrl = {1, 1, 1, 0}; +SJoinTestCtrl jtCtrl = {0, 0, 0, 0}; SJoinTestStat jtStat = {0}; SJoinTestResInfo jtRes = {0}; @@ -277,12 +276,12 @@ static int32_t jtMergeEqCond(SNode** ppDst, SNode** ppSrc) { } -SOperatorInfo* createDummyDownstreamOperators(int32_t num) { - SOperatorInfo* p = (SOperatorInfo*)taosMemoryCalloc(num, sizeof(SOperatorInfo)); +void createDummyDownstreamOperators(int32_t num, SOperatorInfo** ppRes) { for (int32_t i = 0; i < num; ++i) { - (p + i)->resultDataBlockId = i; + SOperatorInfo* p = (SOperatorInfo*)taosMemoryCalloc(1, sizeof(SOperatorInfo)); + p->resultDataBlockId = i; + ppRes[i] = p; } - return p; } void createTargetSlotList(SSortMergeJoinPhysiNode* p) { @@ -736,8 +735,7 @@ void createBlockDescNode(SDataBlockDescNode** ppNode) { } SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(EJoinType type, EJoinSubType sub, int32_t cond, bool filter, bool asc) { - char* t = (char*)taosMemoryCalloc(1, 1 + sizeof(SSortMergeJoinPhysiNode)); - SSortMergeJoinPhysiNode* p = (SSortMergeJoinPhysiNode*)(t + 1); + SSortMergeJoinPhysiNode* p = (SSortMergeJoinPhysiNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN); p->joinType = type; p->subType = sub; p->leftPrimSlotId = 0; @@ -1545,20 +1543,25 @@ void checkJoinRes(SSDataBlock* pBlock) { } } -void resetForJoinRerun(SOperatorInfo* pDownstreams, int32_t dsNum, SSortMergeJoinPhysiNode* pNode, SExecTaskInfo* pTask) { +void resetForJoinRerun(int32_t dsNum, SSortMergeJoinPhysiNode* pNode, SExecTaskInfo* pTask) { jtCtx.leftBlkReadIdx = 0; jtCtx.rightBlkReadIdx = 0; jtCtx.curKeyOffset = 0; memset(&jtRes, 0, sizeof(jtRes)); jtRes.succeed = true; - - SOperatorInfo* ppDownstreams[] = {pDownstreams, pDownstreams + 1}; + + SOperatorInfo* pDownstreams[2]; + createDummyDownstreamOperators(2, pDownstreams); + SOperatorInfo* ppDownstreams[] = {pDownstreams[0], pDownstreams[1]}; jtCtx.pJoinOp = createMergeJoinOperatorInfo(ppDownstreams, 2, pNode, pTask); ASSERT_TRUE(NULL != jtCtx.pJoinOp); } void handleJoinDone(bool* contLoop) { + destroyOperator(jtCtx.pJoinOp); + jtCtx.pJoinOp = NULL; + if (jtRes.succeed) { *contLoop = false; return; @@ -1569,10 +1572,7 @@ void handleJoinDone(bool* contLoop) { return; } - jtInRerun = true; - - jtCtx.pJoinOp->fpSet.closeFn(jtCtx.pJoinOp); - jtCtx.pJoinOp = NULL; + jtInRerun = true; } @@ -1642,7 +1642,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) { while (contLoop) { rerunBlockedHere(); - resetForJoinRerun(param->pDownstreams, 2, pNode, param->pTask); + resetForJoinRerun(2, pNode, param->pTask); printBasicInfo(caseName); printOutputInfo(); @@ -1667,12 +1667,10 @@ void runSingleTest(char* caseName, SJoinTestParam* param) { #if 1 TEST(leftOuterJoin, noCondTest) { - char* caseName = "leftOuterJoin:noCondTest"; - SOperatorInfo* pDownstreams = createDummyDownstreamOperators(2); - SExecTaskInfo* pTask = createDummyTaskInfo(caseName); SJoinTestParam param; + char* caseName = "leftOuterJoin:noCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); - param.pDownstreams = pDownstreams; param.pTask = pTask; param.joinType = JOIN_TYPE_LEFT; param.subType = JOIN_STYPE_OUTER; @@ -1688,17 +1686,16 @@ TEST(leftOuterJoin, noCondTest) { } printStatInfo(caseName); + taosMemoryFree(pTask); } #endif #if 1 TEST(leftOuterJoin, eqCondTest) { - char* caseName = "leftOuterJoin:eqCondTest"; - SOperatorInfo* pDownstreams = createDummyDownstreamOperators(2); - SExecTaskInfo* pTask = createDummyTaskInfo(caseName); SJoinTestParam param; + char* caseName = "leftOuterJoin:eqCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); - param.pDownstreams = pDownstreams; param.pTask = pTask; param.joinType = JOIN_TYPE_LEFT; param.subType = JOIN_STYPE_OUTER; @@ -1714,17 +1711,16 @@ TEST(leftOuterJoin, eqCondTest) { } printStatInfo(caseName); + taosMemoryFree(pTask); } #endif #if 1 TEST(leftOuterJoin, onCondTest) { - char* caseName = "leftOuterJoin:onCondTest"; - SOperatorInfo* pDownstreams = createDummyDownstreamOperators(2); - SExecTaskInfo* pTask = createDummyTaskInfo(caseName); SJoinTestParam param; + char* caseName = "leftOuterJoin:onCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); - param.pDownstreams = pDownstreams; param.pTask = pTask; param.joinType = JOIN_TYPE_LEFT; param.subType = JOIN_STYPE_OUTER; @@ -1740,17 +1736,16 @@ TEST(leftOuterJoin, onCondTest) { } printStatInfo(caseName); + taosMemoryFree(pTask); } #endif #if 1 TEST(leftOuterJoin, fullCondTest) { - char* caseName = "leftOuterJoin:fullCondTest"; - SOperatorInfo* pDownstreams = createDummyDownstreamOperators(2); - SExecTaskInfo* pTask = createDummyTaskInfo(caseName); SJoinTestParam param; + char* caseName = "leftOuterJoin:fullCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); - param.pDownstreams = pDownstreams; param.pTask = pTask; param.joinType = JOIN_TYPE_LEFT; param.subType = JOIN_STYPE_OUTER; @@ -1766,7 +1761,7 @@ TEST(leftOuterJoin, fullCondTest) { } printStatInfo(caseName); - + taosMemoryFree(pTask); } #endif