Merge pull request #17178 from taosdata/feature/TD-18820

fix(stream): sliding error
This commit is contained in:
Shengliang Guan 2022-09-30 16:23:47 +08:00 committed by GitHub
commit 94e07d238c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 7 deletions

View File

@ -1402,7 +1402,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int3
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
while (win.skey <= endTsCols[i]) {
do {
uint64_t winGpId = pGpDatas[i];
bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput);
SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
@ -1413,7 +1413,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int3
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
}
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
}
} while (win.ekey <= endTsCols[i]);
}
}
@ -3067,8 +3067,12 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
int32_t startPos = 0;
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin =
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
STimeWindow nextWin = {0};
if (IS_FINAL_OP(pInfo)) {
nextWin = getFinalTimeWindow(ts, &pInfo->interval);
} else {
nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
}
while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
@ -3122,8 +3126,12 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
if (IS_FINAL_OP(pInfo)) {
forwardRows = 1;
} else {
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
NULL, TSDB_ORDER_ASC);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap);
}

View File

@ -369,9 +369,10 @@ endi
#$loop_all = 0
#=looptest:
sql drop database IF EXISTS test2;
sql drop stream IF EXISTS streams21;
sql drop stream IF EXISTS streams22;
sql drop stream IF EXISTS streams23;
sql drop database IF EXISTS test2;
sql create database test2 vgroups 6;
sql use test2;
@ -381,6 +382,7 @@ sql create table t2 using st tags(2,2,2);
sql create stream streams21 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s);
sql create stream streams22 trigger at_once into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s);
sql create stream streams23 trigger at_once into streamt3 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s);
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,1.1);
@ -508,6 +510,50 @@ if $data32 != 8 then
goto loop3
endi
$loop_count = 0
loop4:
sleep 100
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt3;
# row 0
if $rows != 5 then
print =====rows=$rows
goto loop4
endi
if $data01 != 4 then
print =====data01=$data01
goto loop4
endi
if $data11 != 6 then
print =====data11=$data11
goto loop4
endi
if $data21 != 4 then
print =====data21=$data21
goto loop4
endi
if $data31 != 4 then
print =====data31=$data31
goto loop4
endi
if $data41 != 2 then
print =====data41=$data41
goto loop4
endi
$loop_all = $loop_all + 1
print ============loop_all=$loop_all