From 08c838f0a696879733eb711e3de111c34da7bfbc Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 7 Aug 2024 14:02:55 +0800 Subject: [PATCH] fix issue --- source/libs/executor/src/aggregateoperator.c | 5 ++++- source/libs/executor/src/timewindowoperator.c | 18 +++++++++++++++--- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index f0aadd12de..af10bf8e49 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -449,11 +449,14 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup, true); + if (pResultRow == NULL || pTaskInfo->code != 0) { + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } /* * not assign result buffer yet, add new result buffer * all group belong to one result set, and each group result has different group id so set the id to be one */ - if (pResultRow == NULL || pResultRow->pageId == -1) { + if (pResultRow->pageId == -1) { int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); if (ret != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, terrno); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a367afeed4..99fa941071 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -666,7 +666,9 @@ static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* * every tuple in every block. * And the boundedQueue keeps refreshing all records with smaller ts key. */ -static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId) { +static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId, SExecTaskInfo* pTaskInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (!pOperatorInfo->limited // if no limit info, no filter will be applied || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder // if input/output ts order mismatch, no filter @@ -678,6 +680,7 @@ static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STime if (pOperatorInfo->pBQ == NULL) { pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosMemoryFree, pOperatorInfo); + QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno); } bool shouldFilter = false; @@ -694,12 +697,21 @@ static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STime // cur win not been filtered out and not been pushed into BQ yet, push it into BQ PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))}; + QUERY_CHECK_NULL(node.data, code, lino, _end, terrno); + *((TSKEY*)node.data) = win->skey; if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) { taosMemoryFree(node.data); return true; } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } return false; } @@ -731,7 +743,7 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder); - if (filterWindowWithLimit(pInfo, &win, tableGroupId)) return false; + if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false; int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); @@ -770,7 +782,7 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul int32_t prevEndPos = forwardRows - 1 + startPos; startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->binfo.inputTsOrder); - if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId)) { + if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) { break; } // null data, failed to allocate more memory buffer