From 2b6dc42d0ededa028b40d2824cf90e7dc6edd8d3 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 3 Sep 2024 17:54:07 +0800 Subject: [PATCH] fix fill next's issue --- .../executor/src/streamtimesliceoperator.c | 31 +- .../script/tsim/stream/streamInterpNext1.sim | 73 +++ .../script/tsim/stream/streamInterpOther1.sim | 20 +- .../script/tsim/stream/streamInterpValue1.sim | 477 ++++++++++++++++++ 4 files changed, 585 insertions(+), 16 deletions(-) create mode 100644 tests/script/tsim/stream/streamInterpValue1.sim diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 617ee7c526..87a0afa917 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -507,6 +507,14 @@ static TSKEY adustEndTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) { return pointTs; } +static void adjustFillResRow(SResultRowData** ppResRow, SStreamFillSupporter* pFillSup) { + if (pFillSup->type == TSDB_FILL_PREV) { + (*ppResRow) = &pFillSup->cur; + } else if (pFillSup->type == TSDB_FILL_NEXT){ + (*ppResRow) = &pFillSup->next; + } +} + static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -538,7 +546,7 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p pFillInfo->hasNext = false; TSKEY startTs = adustPrevTsKey(pFillInfo->current, pFillSup->cur.key, &pFillSup->interval); setFillKeyInfo(startTs, pFillSup->next.key, &pFillSup->interval, pFillInfo); - pFillInfo->pResRow = &pFillSup->cur; + adjustFillResRow(&pFillInfo->pResRow, pFillSup); fillNormalRange(pFillSup, pFillInfo, pRes); } @@ -1004,7 +1012,12 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo case TSDB_FILL_NULL_F: case TSDB_FILL_SET_VALUE: case TSDB_FILL_SET_VALUE_F: { - if (hasPrevWindow(pFillSup)) { + if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup) && pFillInfo->preRowKey == pFillInfo->prePointKey && + pFillInfo->nextRowKey != pFillInfo->nextPointKey) { + setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); + pFillInfo->pos = FILL_POS_MID; + pFillInfo->hasNext = true; + } else if (hasPrevWindow(pFillSup)) { setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; } else { @@ -1033,19 +1046,25 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillInfo->pResRow = &pFillSup->prev; } break; case TSDB_FILL_NEXT: { - if (hasPrevWindow(pFillSup)) { + if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup) && pFillInfo->preRowKey == pFillInfo->prePointKey && + pFillInfo->nextRowKey != pFillInfo->nextPointKey) { + setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); + pFillInfo->pos = FILL_POS_MID; + pFillInfo->hasNext = true; + pFillInfo->pResRow = &pFillSup->cur; + } else if (hasPrevWindow(pFillSup)) { setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; resetFillWindow(&pFillSup->next); pFillSup->next.key = ts; pFillSup->next.pRowVal = pFillSup->cur.pRowVal; + pFillInfo->pResRow = &pFillSup->next; } else { - ASSERT(hasNextWindow(pFillSup)); setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; resetFillWindow(&pFillSup->prev); + pFillInfo->pResRow = &pFillSup->next; } - pFillInfo->pResRow = &pFillSup->next; } break; case TSDB_FILL_LINEAR: { if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) { @@ -1351,7 +1370,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup } QUERY_CHECK_CODE(code, lino, _end); - if (pFillSup->type == TSDB_FILL_PREV) { + if (pFillSup->type != TSDB_FILL_LINEAR) { getPrevResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index, &pFillInfo->preRowKey); if (hasPrevWindow(pFillSup)) { pFillInfo->prePointKey = prevPoint.key.ts; diff --git a/tests/script/tsim/stream/streamInterpNext1.sim b/tests/script/tsim/stream/streamInterpNext1.sim index 9631ce8ece..f74863d7a3 100644 --- a/tests/script/tsim/stream/streamInterpNext1.sim +++ b/tests/script/tsim/stream/streamInterpNext1.sim @@ -399,6 +399,79 @@ if $data[15][1] != 3 then goto loop6 endi + +print step4 + +sql create database test4 vgroups 1; +sql use test4; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _irowts, interp(a) as b, _isfilled as a from st partition by tbname, b as cc every(1s) fill(next); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791217000,20000,2,3); + +sleep 2000 + +sql insert into t1 values(1648791212000,10000,2,3) (1648791215001,20,2,3); + +$loop_count = 0 +loop7: + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sleep 300 + +print sql select a,b from streamt4; +sql select a,b from streamt4; + +if $rows != 6 then + print ======rows=$rows + goto loop7 +endi + +if $data00 != 0 then + print ======data00=$data00 + goto loop7 +endi + +if $data01 != 10000 then + print ======data01=$data01 + goto loop7 +endi + +if $data10 != 1 then + print ======data10=$data10 + goto loop7 +endi + +if $data20 != 1 then + print ======data20=$data20 + goto loop7 +endi + +if $data41 != 20000 then + print ======data41=$data41 + goto loop7 +endi + +if $data50 != 0 then + print ======data50=$data50 + goto loop7 +endi + +if $data51 != 20000 then + print ======data51=$data51 + goto loop7 +endi + print end system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpOther1.sim b/tests/script/tsim/stream/streamInterpOther1.sim index 4a2ea56a8f..941b3e18f0 100644 --- a/tests/script/tsim/stream/streamInterpOther1.sim +++ b/tests/script/tsim/stream/streamInterpOther1.sim @@ -21,11 +21,11 @@ run tsim/stream/checkTaskStatus.sim sql insert into t1 values(1648791217000,20000,2,3); -sleep 500 +sleep 2000 sql insert into t1 values(1648791212000,10000,2,3); -sleep 500 +sleep 2000 sql insert into t1 values(1648791215001,20,2,3); @@ -94,11 +94,11 @@ run tsim/stream/checkTaskStatus.sim sql insert into t1 values(1648791217000,20000,2,3); -sleep 500 +sleep 2000 sql insert into t1 values(1648791212000,10000,2,3); -sleep 500 +sleep 2000 sql insert into t1 values(1648791215001,20,2,3); @@ -167,11 +167,11 @@ run tsim/stream/checkTaskStatus.sim sql insert into t1 values(1648791217000,20000,2,3); -sleep 500 +sleep 2000 sql insert into t1 values(1648791212000,10000,2,3); -sleep 500 +sleep 2000 sql insert into t1 values(1648791215001,20,2,3); @@ -240,11 +240,11 @@ run tsim/stream/checkTaskStatus.sim sql insert into t1 values(1648791217000,20000,2,3); -sleep 500 +sleep 2000 sql insert into t1 values(1648791212000,10000,2,3); -sleep 500 +sleep 2000 sql insert into t1 values(1648791215001,20,2,3); @@ -313,11 +313,11 @@ run tsim/stream/checkTaskStatus.sim sql insert into t1 values(1648791217000,20000,2,3); -sleep 500 +sleep 2000 sql insert into t1 values(1648791212000,10000,2,3); -sleep 500 +sleep 2000 sql insert into t1 values(1648791215001,20,2,3); diff --git a/tests/script/tsim/stream/streamInterpValue1.sim b/tests/script/tsim/stream/streamInterpValue1.sim new file mode 100644 index 0000000000..84a0e28300 --- /dev/null +++ b/tests/script/tsim/stream/streamInterpValue1.sim @@ -0,0 +1,477 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(NULL); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,1,1,1,1.0); + +$loop_count = 0 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop0 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop0 +endi + + +sql insert into t1 values(1648791213009,3,3,3,1.0) (1648791217001,4,4,4,4.1); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(NULL); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(NULL); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop2 +endi + +if $data11 != NULL then + print ======data11=$data11 + goto loop2 +endi + +if $data21 != NULL then + print ======data21=$data21 + goto loop2 +endi + +if $data31 != NULL then + print ======data31=$data31 + goto loop2 +endi + +if $data41 != NULL then + print ======data41=$data41 + goto loop2 +endi + + +print step2 + +sql create database test2 vgroups 1; +sql use test2; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(NULL); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791213000,1,1,1,1.0); + + +$loop_count = 0 + +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop3 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop3 +endi + + +sql insert into t1 values(1648791213009,3,3,3,1.0) (1648791217001,4,4,4,4.1) (1648791219000,5,5,5,5.1); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791219000) every(1s) fill(NULL); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791219000) every(1s) fill(NULL); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 +print $data60 $data61 $data62 $data63 $data64 +print $data70 $data71 $data72 $data73 $data74 + + +$loop_count = 0 +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 +print $data60 $data61 $data62 $data63 $data64 +print $data70 $data71 $data72 $data73 $data74 + +# row 0 +if $rows != 7 then + print ======rows=$rows + goto loop4 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop4 +endi + +if $data11 != NULL then + print ======data11=$data11 + goto loop4 +endi + +if $data21 != NULL then + print ======data21=$data21 + goto loop4 +endi + +if $data31 != NULL then + print ======data31=$data31 + goto loop4 +endi + +if $data41 != NULL then + print ======data41=$data41 + goto loop4 +endi + +if $data51 != NULL then + print ======data51=$data51 + goto loop4 +endi + +if $data61 != 5 then + print ======data61=$data61 + goto loop4 +endi + +print step3 + +sql create database test3 vgroups 1; +sql use test3; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(NULL); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791213001,1,1,1,1.0) (1648791219001,2,2,2,2.1) (1648791229001,3,3,3,3.1); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791229001) every(1s) fill(NULL); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791229001) every(1s) fill(NULL); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 +print $data60 $data61 $data62 $data63 $data64 +print $data70 $data71 $data72 $data73 $data74 + + +$loop_count = 0 +loop5: + +sleep 300 + +print sql select * from streamt order by 1; +sql select * from streamt order by 1; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 16 then + print =====rows=$rows + goto loop5 +endi + +sql insert into t1 values(1648791215001,4,4,4,4.0) (1648791217001,5,5,5,5.1) (1648791222000,6,6,6,6.1) (1648791226000,7,7,7,7.1); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791229001) every(1s) fill(NULL); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791229001) every(1s) fill(NULL); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 +print $data60 $data61 $data62 $data63 $data64 +print $data70 $data71 $data72 $data73 $data74 + + +$loop_count = 0 +loop6: + +sleep 300 + +print sql select * from streamt order by 1; +sql select * from streamt order by 1; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 16 then + goto loop6 +endi + +if $data01 != NULL then + print =====data01=$data01 + goto loop6 +endi + +if $data11 != NULL then + print =====data11=$data11 + goto loop6 +endi + +if $data21 != NULL then + print =====data21=$data21 + goto loop6 +endi + +if $data31 != NULL then + print =====data31=$data31 + goto loop6 +endi + +if $data41 != NULL then + print =====data41=$data41 + goto loop6 +endi + +if $data51 != NULL then + print =====data51=$data51 + goto loop6 +endi + +if $data61 != NULL then + print =====data61=$data61 + goto loop6 +endi + +if $data71 != NULL then + print =====data71=$data71 + goto loop6 +endi + +if $data81 != 6 then + print =====data81=$data81 + goto loop6 +endi + +if $data91 != NULL then + print =====data91=$data91 + goto loop6 +endi + +if $data[10][1] != NULL then + print =====data[10][1]=$data[10][1] + goto loop6 +endi + +if $data[11][1] != NULL then + print =====data[11][1]=$data[11][1] + goto loop6 +endi + +if $data[12][1] != 7 then + print =====data[12][1]=$data[12][1] + goto loop6 +endi + +if $data[13][1] != NULL then + print =====data[13][1]=$data[13][1] + goto loop6 +endi + +if $data[14][1] != NULL then + print =====data[14][1]=$data[14][1] + goto loop6 +endi + +if $data[15][1] != NULL then + print =====data[15][1]=$data[15][1] + goto loop6 +endi + + +print step4 + +sql create database test4 vgroups 1; +sql use test4; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _irowts, interp(a) as b, _isfilled as a from st partition by tbname, b as cc every(1s) fill(NULL); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791217000,20000,2,3); + +sleep 2000 + +sql insert into t1 values(1648791212000,10000,2,3) (1648791215001,20,2,3); + +$loop_count = 0 +loop7: + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sleep 300 + +print sql select a,b from streamt4; +sql select a,b from streamt4; + +if $rows != 6 then + print ======rows=$rows + goto loop7 +endi + +if $data00 != 0 then + print ======data00=$data00 + goto loop7 +endi + +if $data01 != 10000 then + print ======data01=$data01 + goto loop7 +endi + +if $data10 != 1 then + print ======data10=$data10 + goto loop7 +endi + +if $data20 != 1 then + print ======data20=$data20 + goto loop7 +endi + +if $data41 != NULL then + print ======data41=$data41 + goto loop7 +endi + +if $data50 != 0 then + print ======data50=$data50 + goto loop7 +endi + +if $data51 != 20000 then + print ======data51=$data51 + goto loop7 +endi + +print end + +system sh/exec.sh -n dnode1 -s stop -x SIGINT