From 27446f8df1aca0cbacfd8868fa851b8b446b8328 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 4 Sep 2024 19:01:14 +0800 Subject: [PATCH] fix(query):free data block --- source/libs/executor/src/aggregateoperator.c | 52 +++++++++++++------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 61f1339c82..d9af279813 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -51,7 +51,7 @@ typedef struct SAggOperatorInfo { } SAggOperatorInfo; static void destroyAggOperatorInfo(void* param); -static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); +static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock); static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock); @@ -63,7 +63,7 @@ static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, in static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size); -static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); +static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); @@ -184,7 +184,8 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { if (pBlock) { pAggInfo->pNewGroupBlock = NULL; tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable); - setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + QUERY_CHECK_CODE(code, lino, _end); code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); QUERY_CHECK_CODE(code, lino, _end); @@ -225,12 +226,19 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { break; } // the pDataBlock are always the same one, no need to call this again - setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + if (code != TSDB_CODE_SUCCESS) { + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); + T_LONG_JMP(pTaskInfo->env, code); + } code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); - QUERY_CHECK_CODE(code, lino, _end); + if (code != TSDB_CODE_SUCCESS) { + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); + T_LONG_JMP(pTaskInfo->env, code); + } code = doAggregateImpl(pOperator, pSup->pCtx); - if (code != 0) { + if (code != TSDB_CODE_SUCCESS) { destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); } @@ -427,20 +435,24 @@ void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) { *ppBlock = NULL; } -void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { +int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { + int32_t code = TSDB_CODE_SUCCESS; SAggOperatorInfo* pAggInfo = pOperator->info; if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) { - return; + return code; } - doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); + code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); // record the current active group id pAggInfo->groupId = groupId; + return code; } -void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { +int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { // for simple group by query without interval, all the tables belong to one group result. + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; @@ -452,23 +464,27 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup, true); if (pResultRow == NULL || pTaskInfo->code != 0) { - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + code = pTaskInfo->code; + lino = __LINE__; + goto _end; } /* * not assign result buffer yet, add new result buffer * all group belong to one result set, and each group result has different group id so set the id to be one */ if (pResultRow->pageId == -1) { - int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); - if (ret != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, terrno); - } + code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); + QUERY_CHECK_CODE(code, lino, _end); } - int32_t ret = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); - if (ret != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, ret); + code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); + QUERY_CHECK_CODE(code, lino, _end); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } + return code; } // a new buffer page for each table. Needs to opt this design