From 8acdbe597d4305037361bcd35992a53bec577554 Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Mon, 30 Sep 2024 11:16:42 +0800 Subject: [PATCH] fix:[TD-32372] Clean up in a right way. --- source/libs/executor/inc/executorInt.h | 3 ++ source/libs/executor/src/aggregateoperator.c | 40 ++++++++++++++++++- source/libs/executor/src/timewindowoperator.c | 21 +++++----- 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index e391d274e3..572ff88be9 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -833,6 +833,9 @@ void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExpr SGroupResInfo* pGroupResInfo); void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap); +void cleanupResultInfoWithoutHash(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf, + SGroupResInfo* pGroupResInfo); + int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey, void* pState, SFunctionStateStore* pStore); void cleanupAggSup(SAggSupporter* pAggSup); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 9e5ad132f7..91b435fbec 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -159,8 +159,8 @@ void destroyAggOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); if (pInfo->pOperator) { - cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, - &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + cleanupResultInfoWithoutHash(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo); pInfo->pOperator = NULL; } cleanupAggSup(&pInfo->aggSup); @@ -627,6 +627,42 @@ void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp } } +void cleanupResultInfoWithoutHash(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf, + SGroupResInfo* pGroupResInfo) { + int32_t numOfExprs = pSup->numOfExprs; + int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; + SqlFunctionCtx* pCtx = pSup->pCtx; + int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); + bool needCleanup = false; + + for (int32_t j = 0; j < numOfExprs; ++j) { + needCleanup |= pCtx[j].needCleanup; + } + if (!needCleanup) { + return; + } + + for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { + SResultRow* pRow = NULL; + SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); + SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); + if (page == NULL) { + qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); + continue; + } + pRow = (SResultRow*)((char*)page + pPos->pos.offset); + + + for (int32_t j = 0; j < numOfExprs; ++j) { + pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); + if (pCtx[j].fpSet.cleanup) { + pCtx[j].fpSet.cleanup(&pCtx[j]); + } + } + releaseBufPage(pBuf, page); + } +} + void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) { int32_t numOfExprs = pSup->numOfExprs; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8164281871..3817ef5b69 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1229,10 +1229,9 @@ static void destroyStateWindowOperatorInfo(void* param) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); taosMemoryFreeClear(pInfo->stateKey.pData); - - if (pInfo->pOperator != NULL) { - cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, - &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + if (pInfo->pOperator) { + cleanupResultInfoWithoutHash(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo); pInfo->pOperator = NULL; } @@ -1257,10 +1256,9 @@ void destroyIntervalOperatorInfo(void* param) { SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); - - if (pInfo->pOperator != NULL) { - cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, - &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + if (pInfo->pOperator) { + cleanupResultInfoWithoutHash(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo); pInfo->pOperator = NULL; } @@ -1764,10 +1762,9 @@ void destroySWindowOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); colDataDestroy(&pInfo->twAggSup.timeWindowData); - - if (pInfo->pOperator != NULL) { - cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, - &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + if (pInfo->pOperator) { + cleanupResultInfoWithoutHash(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo); pInfo->pOperator = NULL; }