From 122b707ac81e10a2fe7fbf09200feaadabe0e24c Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Fri, 20 Sep 2024 11:25:39 +0800 Subject: [PATCH] enh:[TD-32158] Free memory allocated in function setup. --- source/libs/executor/inc/executorInt.h | 10 +++ source/libs/executor/src/aggregateoperator.c | 65 +++++++++++++++++++ .../libs/executor/src/eventwindowoperator.c | 17 +++++ source/libs/executor/src/executorInt.c | 4 -- source/libs/executor/src/groupoperator.c | 7 +- source/libs/executor/src/operator.c | 4 +- .../executor/src/streamcountwindowoperator.c | 4 ++ .../executor/src/streameventwindowoperator.c | 4 ++ .../executor/src/streamtimewindowoperator.c | 14 ++++ source/libs/executor/src/timewindowoperator.c | 18 ++++- source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/inc/tpercentile.h | 2 +- source/libs/function/src/builtins.c | 1 + source/libs/function/src/builtinsimpl.c | 22 +++++-- source/libs/function/src/tpercentile.c | 28 ++++---- 15 files changed, 170 insertions(+), 31 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 4a2d6bfcd3..e391d274e3 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -553,6 +553,7 @@ typedef struct SIntervalAggOperatorInfo { EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] STimeWindowAggSupp twAggSup; SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. + struct SOperatorInfo* pOperator; // for limit optimization bool limited; int64_t limit; @@ -621,6 +622,7 @@ typedef struct SStreamIntervalOperatorInfo { int32_t midDelIndex; SSHashObj* pDeletedMap; bool destHasPrimaryKey; + struct SOperatorInfo* pOperator; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { @@ -676,6 +678,7 @@ typedef struct SStreamSessionAggOperatorInfo { bool recvGetAll; bool destHasPrimaryKey; SSHashObj* pPkDeleted; + struct SOperatorInfo* pOperator; } SStreamSessionAggOperatorInfo; typedef struct SStreamStateAggOperatorInfo { @@ -703,6 +706,7 @@ typedef struct SStreamStateAggOperatorInfo { bool recvGetAll; SSHashObj* pPkDeleted; bool destHasPrimaryKey; + struct SOperatorInfo* pOperator; } SStreamStateAggOperatorInfo; typedef struct SStreamEventAggOperatorInfo { @@ -732,6 +736,7 @@ typedef struct SStreamEventAggOperatorInfo { SFilterInfo* pEndCondInfo; SSHashObj* pPkDeleted; bool destHasPrimaryKey; + struct SOperatorInfo* pOperator; } SStreamEventAggOperatorInfo; typedef struct SStreamCountAggOperatorInfo { @@ -756,6 +761,7 @@ typedef struct SStreamCountAggOperatorInfo { SSDataBlock* pCheckpointRes; SSHashObj* pPkDeleted; bool destHasPrimaryKey; + struct SOperatorInfo* pOperator; } SStreamCountAggOperatorInfo; typedef struct SStreamPartitionOperatorInfo { @@ -823,6 +829,10 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo); int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore); void cleanupExprSupp(SExprSupp* pSup); +void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup, + SGroupResInfo* pGroupResInfo); +void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf, + SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap); int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey, void* pState, SFunctionStateStore* pStore); void cleanupAggSup(SAggSupporter* pAggSup); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index f1c9f1d32b..c0606a6f29 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -48,6 +48,7 @@ typedef struct SAggOperatorInfo { bool hasValidBlock; SSDataBlock* pNewGroupBlock; bool hasCountFunc; + SOperatorInfo* pOperator; } SAggOperatorInfo; static void destroyAggOperatorInfo(void* param); @@ -119,6 +120,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder; pInfo->hasCountFunc = pAggNode->hasCountLikeFunc; + pInfo->pOperator = pOperator; setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -153,6 +155,9 @@ void destroyAggOperatorInfo(void* param) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); + cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + pInfo->pOperator = NULL; cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarExprSup); cleanupGroupResInfo(&pInfo->groupResInfo); @@ -581,6 +586,66 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n return code; } +void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) { + int32_t code = TSDB_CODE_SUCCESS; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + int32_t numOfExprs = pSup->numOfExprs; + int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; + SqlFunctionCtx* pCtx = pSup->pCtx; + + int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); + + for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { + SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i); + SRowBuffPos* pPos = pWinInfo->pStatePos; + SResultRow* pRow = NULL; + + code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow); + if (TSDB_CODE_SUCCESS != code) { + qError("failed to get state by pos, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo)); + continue; + } + + for (int32_t j = 0; j < numOfExprs; ++j) { + pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); + if (pCtx[j].fpSet.cleanup) { + pCtx[j].fpSet.cleanup(&pCtx[j]); + } + } + } +} + +void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf, + SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) { + int32_t numOfExprs = pSup->numOfExprs; + int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; + SqlFunctionCtx* pCtx = pSup->pCtx; + + // begin from last iter + void* pData = pGroupResInfo->dataPos; + int32_t iter = pGroupResInfo->iter; + while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) { + SResultRowPosition* pos = pData; + + SFilePage* page = getBufPage(pBuf, pos->pageId); + if (page == NULL) { + qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); + continue; + } + + SResultRow* pRow = (SResultRow*)((char*)page + pos->offset); + + for (int32_t j = 0; j < numOfExprs; ++j) { + pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); + if (pCtx[j].fpSet.cleanup) { + pCtx[j].fpSet.cleanup(&pCtx[j]); + } + } + + releaseBufPage(pBuf, page); + } +} + void cleanupAggSup(SAggSupporter* pAggSup) { taosMemoryFreeClear(pAggSup->keyBuf); tSimpleHashCleanup(pAggSup->pResultRowHashTable); diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index e164e07252..79e7494518 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -37,6 +37,7 @@ typedef struct SEventWindowOperatorInfo { bool inWindow; SResultRow* pRow; SSDataBlock* pPreDataBlock; + SOperatorInfo* pOperator; } SEventWindowOperatorInfo; static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** pRes); @@ -128,6 +129,7 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy pInfo->tsSlotId = tsSlotId; pInfo->pPreDataBlock = NULL; + pInfo->pOperator = pOperator; setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -152,6 +154,19 @@ _error: return code; } +void cleanupResultInfoInEventWindow(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo) { + if (pInfo == NULL || pInfo->pRow == NULL) { + return; + } + SExprSupp* pSup = &pOperator->exprSupp; + for (int32_t j = 0; j < pSup->numOfExprs; ++j) { + pSup->pCtx[j].resultInfo = getResultEntryInfo(pInfo->pRow, j, pSup->rowEntryInfoOffset); + if (pSup->pCtx[j].fpSet.cleanup) { + pSup->pCtx[j].fpSet.cleanup(&pSup->pCtx[j]); + } + } +} + void destroyEWindowOperatorInfo(void* param) { SEventWindowOperatorInfo* pInfo = (SEventWindowOperatorInfo*)param; if (pInfo == NULL) { @@ -175,6 +190,8 @@ void destroyEWindowOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); colDataDestroy(&pInfo->twAggSup.timeWindowData); + cleanupResultInfoInEventWindow(pInfo->pOperator, pInfo); + pInfo->pOperator = NULL; cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarSup); taosMemoryFreeClear(param); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 07224961f9..5fc483087a 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1020,10 +1020,6 @@ static void destroySqlFunctionCtx(SqlFunctionCtx* pCtx, SExprInfo* pExpr, int32_ } for (int32_t i = 0; i < numOfOutput; ++i) { - if (pCtx[i].fpSet.cleanup != NULL) { - pCtx[i].fpSet.cleanup(&pCtx[i]); - } - if (pExpr != NULL) { SExprInfo* pExprInfo = &pExpr[i]; for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index c862a44461..3ce20dbbd9 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -39,6 +39,7 @@ typedef struct SGroupbyOperatorInfo { int32_t groupKeyLen; // total group by column width SGroupResInfo groupResInfo; SExprSupp scalarSup; + SOperatorInfo *pOperator; } SGroupbyOperatorInfo; // The sort in partition may be needed later. @@ -85,9 +86,11 @@ static void destroyGroupOperatorInfo(void* param) { taosArrayDestroy(pInfo->pGroupCols); taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey); cleanupExprSupp(&pInfo->scalarSup); - + cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); cleanupGroupResInfo(&pInfo->groupResInfo); cleanupAggSup(&pInfo->aggSup); + pInfo->pOperator = NULL; taosMemoryFreeClear(param); } @@ -569,6 +572,8 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder; + pInfo->pOperator = pOperator; + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregateNext, NULL, destroyGroupOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index fe2f3f8dfe..bbd6ce39d1 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -667,13 +667,11 @@ void destroyOperator(SOperatorInfo* pOperator) { pOperator->numOfDownstream = 0; } - cleanupExprSupp(&pOperator->exprSupp); - - // close operator after cleanup exprSupp, since we need to call cleanup of sqlFunctionCtx first to avoid mem leak. if (pOperator->fpSet.closeFn != NULL && pOperator->info != NULL) { pOperator->fpSet.closeFn(pOperator->info); } + cleanupExprSupp(&pOperator->exprSupp); taosMemoryFreeClear(pOperator); } diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index adf764a8c5..577af29bf7 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -48,6 +48,9 @@ typedef struct SBuffInfo { void destroyStreamCountAggOperatorInfo(void* param) { SStreamCountAggOperatorInfo* pInfo = (SStreamCountAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); + cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp, + &pInfo->groupResInfo); + pInfo->pOperator = NULL; destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupExprSupp(&pInfo->scalarSupp); clearGroupResInfo(&pInfo->groupResInfo); @@ -906,6 +909,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); taosMemoryFree(buff); } + pInfo->pOperator = pOperator; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAggNext, NULL, destroyStreamCountAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index a325616bd3..a46bbdace3 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -48,6 +48,9 @@ void destroyStreamEventOperatorInfo(void* param) { } SStreamEventAggOperatorInfo* pInfo = (SStreamEventAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); + cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp, + &pInfo->groupResInfo); + pInfo->pOperator = NULL; destroyStreamAggSupporter(&pInfo->streamAggSup); clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); @@ -951,6 +954,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimayKey; + pInfo->pOperator = pOperator; setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED, pInfo, pTaskInfo); // for stream diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index fb8d8d654e..22e462abab 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -473,6 +473,9 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { } SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); + cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + pInfo->pOperator = NULL; cleanupAggSup(&pInfo->aggSup); clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); @@ -2024,6 +2027,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey; + pInfo->pOperator = pOperator; pOperator->operatorType = pPhyNode->type; if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { @@ -2088,6 +2092,9 @@ void destroyStreamSessionAggOperatorInfo(void* param) { } SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); + cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp, + &pInfo->groupResInfo); + pInfo->pOperator = NULL; destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupExprSupp(&pInfo->scalarSupp); clearGroupResInfo(&pInfo->groupResInfo); @@ -3855,6 +3862,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); + pInfo->pOperator = pOperator; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, @@ -4103,6 +4111,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; pOperator->operatorType = pPhyNode->type; + pInfo->pOperator = pOperator; if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOperator->fpSet = @@ -4173,6 +4182,9 @@ void destroyStreamStateOperatorInfo(void* param) { } SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); + cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp, + &pInfo->groupResInfo); + pInfo->pOperator = NULL; destroyStreamAggSupporter(&pInfo->streamAggSup); clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); @@ -5028,6 +5040,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey; + pInfo->pOperator = pOperator; setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -5361,6 +5374,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH, &pInfo->pState->pFileState); QUERY_CHECK_CODE(code, lino, _error); + pInfo->pOperator = pOperator; setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fa914d3ee8..6ac24ad313 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -37,6 +37,7 @@ typedef struct SSessionAggOperatorInfo { int64_t gap; // session window gap int32_t tsSlotId; // primary timestamp slot id STimeWindowAggSupp twAggSup; + SOperatorInfo* pOperator; } SSessionAggOperatorInfo; typedef struct SStateWindowOperatorInfo { @@ -50,6 +51,7 @@ typedef struct SStateWindowOperatorInfo { SStateKeys stateKey; int32_t tsSlotId; // primary timestamp column slot id STimeWindowAggSupp twAggSup; + SOperatorInfo* pOperator; } SStateWindowOperatorInfo; typedef enum SResultTsInterpType { @@ -1224,6 +1226,9 @@ static void destroyStateWindowOperatorInfo(void* param) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); taosMemoryFreeClear(pInfo->stateKey.pData); + cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + pInfo->pOperator = NULL; cleanupExprSupp(&pInfo->scalarSup); colDataDestroy(&pInfo->twAggSup.timeWindowData); cleanupAggSup(&pInfo->aggSup); @@ -1243,6 +1248,9 @@ void destroyIntervalOperatorInfo(void* param) { } SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); + cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + pInfo->pOperator = NULL; cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarSupp); @@ -1430,6 +1438,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode } } + pInfo->pOperator = pOperator; initResultRowInfo(&pInfo->binfo.resultRowInfo); setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -1706,7 +1715,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy QUERY_CHECK_CODE(code, lino, _error); pInfo->tsSlotId = tsSlotId; - + pInfo->pOperator = pOperator; setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo, @@ -1738,7 +1747,9 @@ void destroySWindowOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); colDataDestroy(&pInfo->twAggSup.timeWindowData); - + cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + pInfo->pOperator = NULL; cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarSupp); @@ -1805,6 +1816,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); + pInfo->pOperator = pOperator; setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo, @@ -2121,6 +2133,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge initResultRowInfo(&iaInfo->binfo.resultRowInfo); code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); + iaInfo->pOperator = pOperator; setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, false, OP_NOT_OPENED, miaInfo, pTaskInfo); @@ -2460,6 +2473,7 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva } } + pIntervalInfo->pOperator = pOperator; initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo); setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false, OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo); diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 41e2cadace..0b2fb70eba 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -122,6 +122,7 @@ bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t percentileFunction(SqlFunctionCtx* pCtx); int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +void percentileFunctionCleanupExt(SqlFunctionCtx* pCtx); bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/inc/tpercentile.h b/source/libs/function/inc/tpercentile.h index 1b80c2b1da..09df42d3a3 100644 --- a/source/libs/function/inc/tpercentile.h +++ b/source/libs/function/inc/tpercentile.h @@ -69,7 +69,7 @@ typedef struct tMemBucket { int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup, tMemBucket **pBucket); -void tMemBucketDestroy(tMemBucket *pBucket); +void tMemBucketDestroy(tMemBucket **pBucket); int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 976d15e7d8..a43ae835ff 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -3115,6 +3115,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = percentileFunction, .sprocessFunc = percentileScalarFunction, .finalizeFunc = percentileFinalize, + .cleanupFunc = percentileFunctionCleanupExt, #ifdef BUILD_NO_CALL .invertFunc = NULL, #endif diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 7761ec8aa5..85be33aba8 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2009,6 +2009,17 @@ int32_t percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResu return TSDB_CODE_SUCCESS; } +void percentileFunctionCleanupExt(SqlFunctionCtx* pCtx) { + if (pCtx == NULL || GET_RES_INFO(pCtx) == NULL || GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)) == NULL) { + return; + } + SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + if (pInfo->pMemBucket != NULL) { + tMemBucketDestroy(&(pInfo->pMemBucket)); + pInfo->pMemBucket = NULL; + } +} + int32_t percentileFunction(SqlFunctionCtx* pCtx) { int32_t code = TSDB_CODE_SUCCESS; int32_t numOfElems = 0; @@ -2095,7 +2106,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { numOfElems += 1; code = tMemBucketPut(pInfo->pMemBucket, data, 1); if (code != TSDB_CODE_SUCCESS) { - tMemBucketDestroy(pInfo->pMemBucket); + tMemBucketDestroy(&(pInfo->pMemBucket)); return code; } } @@ -2113,8 +2124,8 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t code = 0; double v = 0; - tMemBucket* pMemBucket = ppInfo->pMemBucket; - if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null + tMemBucket** pMemBucket = &ppInfo->pMemBucket; + if ((*pMemBucket) != NULL && (*pMemBucket)->total > 0) { // check for null if (pCtx->numOfParams > 2) { char buf[3200] = {0}; // max length of double num is 317, e.g. use %.6lf to print -1.0e+308, consider the comma and bracket, 3200 is enough. @@ -2126,7 +2137,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { GET_TYPED_DATA(v, double, pVal->nType, &pVal->i); - code = getPercentile(pMemBucket, v, &ppInfo->result); + code = getPercentile((*pMemBucket), v, &ppInfo->result); if (code != TSDB_CODE_SUCCESS) { goto _fin_error; } @@ -2158,7 +2169,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { GET_TYPED_DATA(v, double, pVal->nType, &pVal->i); - code = getPercentile(pMemBucket, v, &ppInfo->result); + code = getPercentile((*pMemBucket), v, &ppInfo->result); if (code != TSDB_CODE_SUCCESS) { goto _fin_error; } @@ -6067,7 +6078,6 @@ int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { pInfo->pHash = NULL; return terrno; } - return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 92a7c0d669..29c48460c0 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -291,12 +291,12 @@ int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, dou (*pBucket)->maxCapacity = 200000; (*pBucket)->groupPagesMap = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if ((*pBucket)->groupPagesMap == NULL) { - tMemBucketDestroy(*pBucket); + tMemBucketDestroy(pBucket); return terrno; } if (setBoundingBox(&(*pBucket)->range, (*pBucket)->type, minval, maxval) != 0) { // qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval); - tMemBucketDestroy(*pBucket); + tMemBucketDestroy(pBucket); return TSDB_CODE_FUNC_INVALID_VALUE_RANGE; } @@ -306,13 +306,13 @@ int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, dou (*pBucket)->hashFunc = getHashFunc((*pBucket)->type); if ((*pBucket)->hashFunc == NULL) { // qError("MemBucket:%p, not support data type %d, failed", pBucket, pBucket->type); - tMemBucketDestroy(*pBucket); + tMemBucketDestroy(pBucket); return TSDB_CODE_FUNC_FUNTION_PARA_TYPE; } (*pBucket)->pSlots = (tMemBucketSlot *)taosMemoryCalloc((*pBucket)->numOfSlots, sizeof(tMemBucketSlot)); if ((*pBucket)->pSlots == NULL) { - tMemBucketDestroy(*pBucket); + tMemBucketDestroy(pBucket); return terrno; } @@ -320,13 +320,13 @@ int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, dou if (!osTempSpaceAvailable()) { // qError("MemBucket create disk based Buf failed since %s", terrstr(terrno)); - tMemBucketDestroy(*pBucket); + tMemBucketDestroy(pBucket); return TSDB_CODE_NO_DISKSPACE; } int32_t ret = createDiskbasedBuf(&(*pBucket)->pBuffer, (*pBucket)->bufPageSize, (*pBucket)->bufPageSize * DEFAULT_NUM_OF_SLOT * 4, "1", tsTempDir); if (ret != 0) { - tMemBucketDestroy(*pBucket); + tMemBucketDestroy(pBucket); return ret; } @@ -334,22 +334,22 @@ int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, dou return TSDB_CODE_SUCCESS; } -void tMemBucketDestroy(tMemBucket *pBucket) { - if (pBucket == NULL) { +void tMemBucketDestroy(tMemBucket **pBucket) { + if (*pBucket == NULL) { return; } - void *p = taosHashIterate(pBucket->groupPagesMap, NULL); + void *p = taosHashIterate((*pBucket)->groupPagesMap, NULL); while (p) { SArray **p1 = p; - p = taosHashIterate(pBucket->groupPagesMap, p); + p = taosHashIterate((*pBucket)->groupPagesMap, p); taosArrayDestroy(*p1); } - destroyDiskbasedBuf(pBucket->pBuffer); - taosMemoryFreeClear(pBucket->pSlots); - taosHashCleanup(pBucket->groupPagesMap); - taosMemoryFreeClear(pBucket); + destroyDiskbasedBuf((*pBucket)->pBuffer); + taosMemoryFreeClear((*pBucket)->pSlots); + taosHashCleanup((*pBucket)->groupPagesMap); + taosMemoryFreeClear(*pBucket); } int32_t tMemBucketUpdateBoundingBox(MinMaxEntry *r, const char *data, int32_t dataType) {