fix(query): check the rows before apply the agg in session window.

This commit is contained in:
Haojun Liao 2024-04-11 16:21:41 +08:00
parent 917487b110
commit a951af2492
1 changed files with 15 additions and 13 deletions

View File

@ -1332,23 +1332,25 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
// The gap is less than the threshold, so it belongs to current session window that has been opened already.
doKeepTuple(pRowSup, tsList[j], gid);
} else { // start a new session window
SResultRow* pResult = NULL;
// start a new session window
if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window
SResultRow* pResult = NULL;
// keep the time window for the closed time window.
STimeWindow window = pRowSup->win;
// keep the time window for the closed time window.
STimeWindow window = pRowSup->win;
int32_t ret =
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
}
pRowSup->win.ekey = pRowSup->win.skey;
int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
// pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
}
// pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
// here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
doKeepTuple(pRowSup, tsList[j], gid);