fix: memory leak issues

This commit is contained in:
dapan1121 2023-12-25 09:19:55 +08:00
parent 8bdf3df6a9
commit 40209db9c7
3 changed files with 65 additions and 53 deletions

View File

@ -19,9 +19,9 @@
extern "C" {
#endif
#define MJOIN_DEFAULT_BLK_ROWS_NUM 4
#define MJOIN_DEFAULT_BLK_ROWS_NUM 4096
#define MJOIN_HJOIN_CART_THRESHOLD 16
#define MJOIN_BLK_SIZE_LIMIT 20
#define MJOIN_BLK_SIZE_LIMIT 10485760
struct SMJoinOperatorInfo;
@ -77,14 +77,6 @@ typedef struct SMJoinTableCtx {
SMJoinColInfo* keyCols;
char* keyBuf;
char* keyData;
int32_t valNum;
SMJoinColInfo* valCols;
char* valData;
int32_t valBitMapSize;
int32_t valBufSize;
SArray* valVarCols;
bool valColExist;
bool newBlk;
SSDataBlock* blk;

View File

@ -519,22 +519,47 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
return (pBlock && pBlock->info.rows > 0) ? pBlock : NULL;
}
void destroyGrpArray(void* ppArray) {
SArray* pArray = *(SArray**)ppArray;
taosArrayDestroy(pArray);
}
void destroyMergeJoinTableCtx(SMJoinTableCtx* pTable) {
mJoinDestroyCreatedBlks(pTable->createdBlks);
taosArrayDestroy(pTable->createdBlks);
tSimpleHashCleanup(pTable->pGrpHash);
taosMemoryFree(pTable->primCol);
taosMemoryFree(pTable->finCols);
taosMemoryFree(pTable->keyCols);
taosMemoryFree(pTable->keyBuf);
taosArrayDestroy(pTable->eqGrps);
taosArrayDestroyEx(pTable->pGrpArrays, destroyGrpArray);
}
void destroyMergeJoinOperator(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinOperatorInfo* pJoin = (SMJoinOperatorInfo*)param;
pJoin->ctx.mergeCtx.finBlk = blockDataDestroy(pJoin->ctx.mergeCtx.finBlk);
pJoin->ctx.mergeCtx.midBlk = blockDataDestroy(pJoin->ctx.mergeCtx.midBlk);
mJoinDestroyCreatedBlks(pJoin->probe->createdBlks);
taosArrayDestroy(pJoin->probe->createdBlks);
tSimpleHashCleanup(pJoin->probe->pGrpHash);
if (pJoin->pFPreFilter != NULL) {
filterFreeInfo(pJoin->pFPreFilter);
pJoin->pFPreFilter = NULL;
}
if (pJoin->pPreFilter != NULL) {
filterFreeInfo(pJoin->pPreFilter);
pJoin->pPreFilter = NULL;
}
if (pJoin->pFinFilter != NULL) {
filterFreeInfo(pJoin->pFinFilter);
pJoin->pFinFilter = NULL;
}
mJoinDestroyCreatedBlks(pJoin->build->createdBlks);
taosArrayDestroy(pJoin->build->createdBlks);
tSimpleHashCleanup(pJoin->build->pGrpHash);
destroyMergeJoinTableCtx(pJoin->probe);
destroyMergeJoinTableCtx(pJoin->build);
taosMemoryFreeClear(param);
taosMemoryFreeClear(pJoin);
}

View File

@ -66,7 +66,7 @@ enum {
};
#define COL_DISPLAY_WIDTH 18
#define JT_MAX_LOOP 3000
#define JT_MAX_LOOP 100000
#define LEFT_BLK_ID 0
#define RIGHT_BLK_ID 1
@ -169,13 +169,12 @@ typedef struct {
int32_t cond;
bool filter;
bool asc;
SOperatorInfo* pDownstreams;
SExecTaskInfo* pTask;
} SJoinTestParam;
SJoinTestCtx jtCtx = {0};
SJoinTestCtrl jtCtrl = {1, 1, 1, 0};
SJoinTestCtrl jtCtrl = {0, 0, 0, 0};
SJoinTestStat jtStat = {0};
SJoinTestResInfo jtRes = {0};
@ -277,12 +276,12 @@ static int32_t jtMergeEqCond(SNode** ppDst, SNode** ppSrc) {
}
SOperatorInfo* createDummyDownstreamOperators(int32_t num) {
SOperatorInfo* p = (SOperatorInfo*)taosMemoryCalloc(num, sizeof(SOperatorInfo));
void createDummyDownstreamOperators(int32_t num, SOperatorInfo** ppRes) {
for (int32_t i = 0; i < num; ++i) {
(p + i)->resultDataBlockId = i;
SOperatorInfo* p = (SOperatorInfo*)taosMemoryCalloc(1, sizeof(SOperatorInfo));
p->resultDataBlockId = i;
ppRes[i] = p;
}
return p;
}
void createTargetSlotList(SSortMergeJoinPhysiNode* p) {
@ -736,8 +735,7 @@ void createBlockDescNode(SDataBlockDescNode** ppNode) {
}
SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(EJoinType type, EJoinSubType sub, int32_t cond, bool filter, bool asc) {
char* t = (char*)taosMemoryCalloc(1, 1 + sizeof(SSortMergeJoinPhysiNode));
SSortMergeJoinPhysiNode* p = (SSortMergeJoinPhysiNode*)(t + 1);
SSortMergeJoinPhysiNode* p = (SSortMergeJoinPhysiNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
p->joinType = type;
p->subType = sub;
p->leftPrimSlotId = 0;
@ -1545,20 +1543,25 @@ void checkJoinRes(SSDataBlock* pBlock) {
}
}
void resetForJoinRerun(SOperatorInfo* pDownstreams, int32_t dsNum, SSortMergeJoinPhysiNode* pNode, SExecTaskInfo* pTask) {
void resetForJoinRerun(int32_t dsNum, SSortMergeJoinPhysiNode* pNode, SExecTaskInfo* pTask) {
jtCtx.leftBlkReadIdx = 0;
jtCtx.rightBlkReadIdx = 0;
jtCtx.curKeyOffset = 0;
memset(&jtRes, 0, sizeof(jtRes));
jtRes.succeed = true;
SOperatorInfo* ppDownstreams[] = {pDownstreams, pDownstreams + 1};
SOperatorInfo* pDownstreams[2];
createDummyDownstreamOperators(2, pDownstreams);
SOperatorInfo* ppDownstreams[] = {pDownstreams[0], pDownstreams[1]};
jtCtx.pJoinOp = createMergeJoinOperatorInfo(ppDownstreams, 2, pNode, pTask);
ASSERT_TRUE(NULL != jtCtx.pJoinOp);
}
void handleJoinDone(bool* contLoop) {
destroyOperator(jtCtx.pJoinOp);
jtCtx.pJoinOp = NULL;
if (jtRes.succeed) {
*contLoop = false;
return;
@ -1569,10 +1572,7 @@ void handleJoinDone(bool* contLoop) {
return;
}
jtInRerun = true;
jtCtx.pJoinOp->fpSet.closeFn(jtCtx.pJoinOp);
jtCtx.pJoinOp = NULL;
jtInRerun = true;
}
@ -1642,7 +1642,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) {
while (contLoop) {
rerunBlockedHere();
resetForJoinRerun(param->pDownstreams, 2, pNode, param->pTask);
resetForJoinRerun(2, pNode, param->pTask);
printBasicInfo(caseName);
printOutputInfo();
@ -1667,12 +1667,10 @@ void runSingleTest(char* caseName, SJoinTestParam* param) {
#if 1
TEST(leftOuterJoin, noCondTest) {
char* caseName = "leftOuterJoin:noCondTest";
SOperatorInfo* pDownstreams = createDummyDownstreamOperators(2);
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
SJoinTestParam param;
char* caseName = "leftOuterJoin:noCondTest";
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
param.pDownstreams = pDownstreams;
param.pTask = pTask;
param.joinType = JOIN_TYPE_LEFT;
param.subType = JOIN_STYPE_OUTER;
@ -1688,17 +1686,16 @@ TEST(leftOuterJoin, noCondTest) {
}
printStatInfo(caseName);
taosMemoryFree(pTask);
}
#endif
#if 1
TEST(leftOuterJoin, eqCondTest) {
char* caseName = "leftOuterJoin:eqCondTest";
SOperatorInfo* pDownstreams = createDummyDownstreamOperators(2);
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
SJoinTestParam param;
char* caseName = "leftOuterJoin:eqCondTest";
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
param.pDownstreams = pDownstreams;
param.pTask = pTask;
param.joinType = JOIN_TYPE_LEFT;
param.subType = JOIN_STYPE_OUTER;
@ -1714,17 +1711,16 @@ TEST(leftOuterJoin, eqCondTest) {
}
printStatInfo(caseName);
taosMemoryFree(pTask);
}
#endif
#if 1
TEST(leftOuterJoin, onCondTest) {
char* caseName = "leftOuterJoin:onCondTest";
SOperatorInfo* pDownstreams = createDummyDownstreamOperators(2);
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
SJoinTestParam param;
char* caseName = "leftOuterJoin:onCondTest";
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
param.pDownstreams = pDownstreams;
param.pTask = pTask;
param.joinType = JOIN_TYPE_LEFT;
param.subType = JOIN_STYPE_OUTER;
@ -1740,17 +1736,16 @@ TEST(leftOuterJoin, onCondTest) {
}
printStatInfo(caseName);
taosMemoryFree(pTask);
}
#endif
#if 1
TEST(leftOuterJoin, fullCondTest) {
char* caseName = "leftOuterJoin:fullCondTest";
SOperatorInfo* pDownstreams = createDummyDownstreamOperators(2);
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
SJoinTestParam param;
char* caseName = "leftOuterJoin:fullCondTest";
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
param.pDownstreams = pDownstreams;
param.pTask = pTask;
param.joinType = JOIN_TYPE_LEFT;
param.subType = JOIN_STYPE_OUTER;
@ -1766,7 +1761,7 @@ TEST(leftOuterJoin, fullCondTest) {
}
printStatInfo(caseName);
taosMemoryFree(pTask);
}
#endif