fix issue

This commit is contained in:
54liuyao 2024-08-07 14:02:55 +08:00
parent f70394f778
commit 08c838f0a6
2 changed files with 19 additions and 4 deletions

View File

@ -449,11 +449,14 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
SResultRow* pResultRow =
doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true,
groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
if (pResultRow == NULL || pTaskInfo->code != 0) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
/*
* not assign result buffer yet, add new result buffer
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pResultRow == NULL || pResultRow->pageId == -1) {
if (pResultRow->pageId == -1) {
int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);

View File

@ -666,7 +666,9 @@ static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow*
* every tuple in every block.
* And the boundedQueue keeps refreshing all records with smaller ts key.
*/
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId) {
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (!pOperatorInfo->limited // if no limit info, no filter will be applied
|| pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
// if input/output ts order mismatch, no filter
@ -678,6 +680,7 @@ static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STime
if (pOperatorInfo->pBQ == NULL) {
pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosMemoryFree, pOperatorInfo);
QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
}
bool shouldFilter = false;
@ -694,12 +697,21 @@ static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STime
// cur win not been filtered out and not been pushed into BQ yet, push it into BQ
PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
*((TSKEY*)node.data) = win->skey;
if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
taosMemoryFree(node.data);
return true;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return false;
}
@ -731,7 +743,7 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow win =
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
if (filterWindowWithLimit(pInfo, &win, tableGroupId)) return false;
if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;
int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
@ -770,7 +782,7 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
int32_t prevEndPos = forwardRows - 1 + startPos;
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
pInfo->binfo.inputTsOrder);
if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId)) {
if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
break;
}
// null data, failed to allocate more memory buffer