From a2a237a4b0ae6c6d13f0609e55535cf8408d7fed Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 14:31:48 +0800 Subject: [PATCH] fix(query): avoid process data that belongs to the next session window. --- source/libs/executor/src/timewindowoperator.c | 27 ++++++++++--------- source/libs/function/src/builtinsimpl.c | 6 ++--- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8fb8aaa69d..e62763ebc5 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1339,23 +1339,24 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // The gap is less than the threshold, so it belongs to current session window that has been opened already. doKeepTuple(pRowSup, tsList[j], gid); } else { // start a new session window - SResultRow* pResult = NULL; + if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window + SResultRow* pResult = NULL; - // keep the time window for the closed time window. - STimeWindow window = pRowSup->win; + // keep the time window for the closed time window. + STimeWindow window = pRowSup->win; + int32_t ret = + setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + } - pRowSup->win.ekey = pRowSup->win.skey; - int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + // pInfo->numOfRows data belong to the current session window + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } - // pInfo->numOfRows data belong to the current session window - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); - // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepTuple(pRowSup, tsList[j], gid); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 6be95b786b..6f3f5f7e5c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2657,8 +2657,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; - // please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first - // function. + // please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first function. #if 0 if (blockDataOrder == TSDB_ORDER_ASC) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { @@ -2709,6 +2708,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } #else + +// todo refactor if (!pInputCol->hasNull && !pCtx->hasPrimaryKey) { numOfElems = 1; @@ -2790,7 +2791,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } - // SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; }