diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9b4b7f1ee8..dceb696d54 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1402,7 +1402,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int3 SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC); - while (win.skey <= endTsCols[i]) { + do { uint64_t winGpId = pGpDatas[i]; bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput); SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; @@ -1413,7 +1413,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int3 taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey)); } getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); - } + } while (win.ekey <= endTsCols[i]); } } @@ -3067,8 +3067,12 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p int32_t startPos = 0; TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); - STimeWindow nextWin = - getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); + STimeWindow nextWin = {0}; + if (IS_FINAL_OP(pInfo)) { + nextWin = getFinalTimeWindow(ts, &pInfo->interval); + } else { + nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); + } while (1) { bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { @@ -3122,8 +3126,12 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, - TSDB_ORDER_ASC); + if (IS_FINAL_OP(pInfo)) { + forwardRows = 1; + } else { + forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, + NULL, TSDB_ORDER_ASC); + } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap); } diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index bd8d3b0579..b7477fe36c 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -369,9 +369,10 @@ endi #$loop_all = 0 #=looptest: -sql drop database IF EXISTS test2; sql drop stream IF EXISTS streams21; sql drop stream IF EXISTS streams22; +sql drop stream IF EXISTS streams23; +sql drop database IF EXISTS test2; sql create database test2 vgroups 6; sql use test2; @@ -381,6 +382,7 @@ sql create table t2 using st tags(2,2,2); sql create stream streams21 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s); sql create stream streams22 trigger at_once into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s); +sql create stream streams23 trigger at_once into streamt3 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s); sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791223001,2,2,2,1.1); @@ -508,6 +510,50 @@ if $data32 != 8 then goto loop3 endi + +$loop_count = 0 + +loop4: +sleep 100 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt3; + +# row 0 +if $rows != 5 then + print =====rows=$rows + goto loop4 +endi + +if $data01 != 4 then + print =====data01=$data01 + goto loop4 +endi + +if $data11 != 6 then + print =====data11=$data11 + goto loop4 +endi + +if $data21 != 4 then + print =====data21=$data21 + goto loop4 +endi + +if $data31 != 4 then + print =====data31=$data31 + goto loop4 +endi + +if $data41 != 2 then + print =====data41=$data41 + goto loop4 +endi + $loop_all = $loop_all + 1 print ============loop_all=$loop_all