fix(query):free data block

This commit is contained in:
54liuyao 2024-09-04 19:01:14 +08:00
parent d3e4203dcb
commit 27446f8df1
1 changed files with 34 additions and 18 deletions

View File

@ -51,7 +51,7 @@ typedef struct SAggOperatorInfo {
} SAggOperatorInfo;
static void destroyAggOperatorInfo(void* param);
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
@ -63,7 +63,7 @@ static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, in
static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
@ -184,7 +184,8 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
if (pBlock) {
pAggInfo->pNewGroupBlock = NULL;
tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
QUERY_CHECK_CODE(code, lino, _end);
code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
QUERY_CHECK_CODE(code, lino, _end);
@ -225,12 +226,19 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
break;
}
// the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
QUERY_CHECK_CODE(code, lino, _end);
if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
code = doAggregateImpl(pOperator, pSup->pCtx);
if (code != 0) {
if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
@ -427,20 +435,24 @@ void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
*ppBlock = NULL;
}
void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
int32_t code = TSDB_CODE_SUCCESS;
SAggOperatorInfo* pAggInfo = pOperator->info;
if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) {
return;
return code;
}
doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
// record the current active group id
pAggInfo->groupId = groupId;
return code;
}
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
// for simple group by query without interval, all the tables belong to one group result.
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info;
@ -452,23 +464,27 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true,
groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
if (pResultRow == NULL || pTaskInfo->code != 0) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
code = pTaskInfo->code;
lino = __LINE__;
goto _end;
}
/*
* not assign result buffer yet, add new result buffer
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pResultRow->pageId == -1) {
int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
QUERY_CHECK_CODE(code, lino, _end);
}
int32_t ret = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
QUERY_CHECK_CODE(code, lino, _end);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
// a new buffer page for each table. Needs to opt this design