fix(query): fix memory leak.
This commit is contained in:
parent
bbcba1855d
commit
8da7aa19b5
|
@ -917,9 +917,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
|
||||||
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
|
SExecTaskInfo* pTaskInfo);
|
||||||
SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
|
||||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
|
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
|
||||||
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
|
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
|
@ -4171,16 +4171,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
|
pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
|
||||||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||||
|
pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
|
||||||
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
|
|
||||||
.calTrigger = pSessionNode->window.triggerType};
|
|
||||||
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
|
||||||
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
|
||||||
|
|
||||||
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
|
|
||||||
pPhyNode->pConditions, pTaskInfo);
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
|
||||||
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
|
||||||
|
|
|
@ -2439,12 +2439,14 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
|
SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
|
|
||||||
|
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||||
|
|
||||||
|
cleanupAggSup(&pInfo->aggSup);
|
||||||
|
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
|
||||||
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
|
|
||||||
STimeWindowAggSupp* pTwAggSupp, SNode* pCondition,
|
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo) {
|
||||||
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
|
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
@ -2455,6 +2457,10 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
|
|
||||||
|
int32_t numOfCols = 0;
|
||||||
|
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -2462,16 +2468,19 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
||||||
|
|
||||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||||
|
|
||||||
pInfo->twAggSup = *pTwAggSupp;
|
pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
|
||||||
|
pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
|
||||||
|
pInfo->gap = pSessionNode->gap;
|
||||||
|
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||||
|
|
||||||
pInfo->tsSlotId = tsSlotId;
|
pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
||||||
pInfo->gap = gap;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
pInfo->binfo.pRes = pResBlock;
|
|
||||||
pInfo->winSup.prevTs = INT64_MIN;
|
pInfo->winSup.prevTs = INT64_MIN;
|
||||||
pInfo->reptScan = false;
|
pInfo->reptScan = false;
|
||||||
pInfo->pCondition = pCondition;
|
pInfo->pCondition = pSessionNode->window.node.pConditions;
|
||||||
|
|
||||||
pOperator->name = "SessionWindowAggOperator";
|
pOperator->name = "SessionWindowAggOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
|
|
Loading…
Reference in New Issue