diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9537a76bd6..d7fce3a0b8 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1111,7 +1111,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { return pInfo->pDelRes; } break; default: - ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE, "invalid SSDataBlock type"); + ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_RETRIEVE, "invalid SSDataBlock type"); return pBlock; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b8d293bdf6..ccdf0b6fa1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1897,6 +1897,7 @@ FETCH_NEXT_BLOCK: pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE; copyDataBlock(pInfo->pUpdateRes, pBlock); + pInfo->updateResIndex = 0; prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); } break; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f4205455b4..7a23203e72 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1364,8 +1364,13 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa continue; } uint64_t winGpId = pGpDatas[i]; - bool res = doDeleteWindow(pOperator, win.skey, winGpId); SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; + void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); + if (chIds) { + getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); + continue; + } + bool res = doDeleteWindow(pOperator, win.skey, winGpId); if (pUpWins && res) { taosArrayPush(pUpWins, &winRes); } @@ -2615,6 +2620,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { streamStateCommit(pInfo->pState); pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; } + qDebug("===stream===interval final close"); } return NULL; } else { diff --git a/tests/script/tsim/stream/distributeInterval0.sim b/tests/script/tsim/stream/distributeInterval0.sim index bf05d59a81..1cec299ccd 100644 --- a/tests/script/tsim/stream/distributeInterval0.sim +++ b/tests/script/tsim/stream/distributeInterval0.sim @@ -6,6 +6,32 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql create dnode $hostname2 port 7200 + +system sh/exec.sh -n dnode2 -s start + +print ===== step1 +$x = 0 +step1: + $x = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not ready! + return -1 + endi +sql select * from information_schema.ins_dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +if $rows != 2 then + return -1 +endi +if $data(1)[4] != ready then + goto step1 +endi +if $data(2)[4] != ready then + goto step1 +endi + print ===== step2 sql drop stream if exists stream_t1; sql drop database if exists test; @@ -458,4 +484,122 @@ if $data12 != 2 then goto loop3 endi +print ===== step3 + +sql drop database if exists test4; +sql create database test4 vgroups 10; +sql use test4; +sql create stable st(ts timestamp,a int,b int,c varchar(250) ) tags(ta int,tb int,tc int); +sql create table aaa using st tags(1,1,1); +sql create table bbb using st tags(2,2,2); +sql create table ccc using st tags(3,2,2); +sql create table ddd using st tags(4,2,2); + + +sql create stream streams1 ignore expired 0 fill_history 0 watermark 3s into streamst subtable(c) as select _wstart, c , count(*) c1, last_row(b) c2 from st partition by c interval(1s) ; + +sql insert into aaa values(1648791221001,2,2,"/a1/aa/aa"); +sql insert into bbb values(1648791221001,2,2,"/a1/aa/aa"); +sql insert into ccc values(1648791221001,2,2,"/a1/aa/aa"); +sql insert into ddd values(1648791221001,2,2,"/a1/aa/aa"); + +sql insert into aaa values(1648791222002,2,2,"/a2/aa/aa"); +sql insert into bbb values(1648791222002,2,2,"/a2/aa/aa"); +sql insert into ccc values(1648791222002,2,2,"/a2/aa/aa"); +sql insert into ddd values(1648791222002,2,2,"/a2/aa/aa"); + +sql insert into aaa values(1648791223003,2,2,"/a3/aa/aa"); +sql insert into bbb values(1648791223003,2,2,"/a3/aa/aa"); +sql insert into ccc values(1648791223003,2,2,"/a3/aa/aa"); +sql insert into ddd values(1648791223003,2,2,"/a3/aa/aa"); + +sql insert into aaa values(1648791224003,2,2,"/a4/aa/aa"); +sql insert into bbb values(1648791224003,2,2,"/a4/aa/aa"); +sql insert into ccc values(1648791224003,2,2,"/a4/aa/aa"); +sql insert into ddd values(1648791224003,2,2,"/a4/aa/aa"); + + +sql insert into aaa values(1648791225003,2,2,"/a5/aa/aa"); +sql insert into bbb values(1648791225003,2,2,"/a5/aa/aa"); +sql insert into ccc values(1648791225003,2,2,"/a5/aa/aa"); +sql insert into ddd values(1648791225003,2,2,"/a5/aa/aa"); + +sql insert into aaa values(1648791226003,2,2,"/a6/aa/aa"); +sql insert into bbb values(1648791226003,2,2,"/a6/aa/aa"); +sql insert into ccc values(1648791226003,2,2,"/a6/aa/aa"); +sql insert into ddd values(1648791226003,2,2,"/a6/aa/aa"); + +$loop_count = 0 + +loop4: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamst; + +if $rows == 0 then + goto loop4 +endi + +sql delete from aaa where ts = 1648791223003 ; + +$loop_count = 0 + +loop5: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamst; + +if $rows == 0 then + goto loop5 +endi + + +sql delete from ccc; + +$loop_count = 0 + +loop6: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamst; + +if $rows == 0 then + goto loop6 +endi + +sql delete from ddd; + +$loop_count = 0 + +loop7: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamst; + +if $rows == 0 then + goto loop7 +endi + +print ===== over + system sh/stop_dnodes.sh