fix fill next's issue

This commit is contained in:
54liuyao 2024-09-03 17:54:07 +08:00
parent 26c74603f3
commit 2b6dc42d0e
4 changed files with 585 additions and 16 deletions

View File

@ -507,6 +507,14 @@ static TSKEY adustEndTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) {
return pointTs;
}
static void adjustFillResRow(SResultRowData** ppResRow, SStreamFillSupporter* pFillSup) {
if (pFillSup->type == TSDB_FILL_PREV) {
(*ppResRow) = &pFillSup->cur;
} else if (pFillSup->type == TSDB_FILL_NEXT){
(*ppResRow) = &pFillSup->next;
}
}
static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -538,7 +546,7 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
pFillInfo->hasNext = false;
TSKEY startTs = adustPrevTsKey(pFillInfo->current, pFillSup->cur.key, &pFillSup->interval);
setFillKeyInfo(startTs, pFillSup->next.key, &pFillSup->interval, pFillInfo);
pFillInfo->pResRow = &pFillSup->cur;
adjustFillResRow(&pFillInfo->pResRow, pFillSup);
fillNormalRange(pFillSup, pFillInfo, pRes);
}
@ -1004,7 +1012,12 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
case TSDB_FILL_NULL_F:
case TSDB_FILL_SET_VALUE:
case TSDB_FILL_SET_VALUE_F: {
if (hasPrevWindow(pFillSup)) {
if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup) && pFillInfo->preRowKey == pFillInfo->prePointKey &&
pFillInfo->nextRowKey != pFillInfo->nextPointKey) {
setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_MID;
pFillInfo->hasNext = true;
} else if (hasPrevWindow(pFillSup)) {
setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END;
} else {
@ -1033,19 +1046,25 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
pFillInfo->pResRow = &pFillSup->prev;
} break;
case TSDB_FILL_NEXT: {
if (hasPrevWindow(pFillSup)) {
if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup) && pFillInfo->preRowKey == pFillInfo->prePointKey &&
pFillInfo->nextRowKey != pFillInfo->nextPointKey) {
setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_MID;
pFillInfo->hasNext = true;
pFillInfo->pResRow = &pFillSup->cur;
} else if (hasPrevWindow(pFillSup)) {
setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END;
resetFillWindow(&pFillSup->next);
pFillSup->next.key = ts;
pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
pFillInfo->pResRow = &pFillSup->next;
} else {
ASSERT(hasNextWindow(pFillSup));
setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START;
resetFillWindow(&pFillSup->prev);
}
pFillInfo->pResRow = &pFillSup->next;
}
} break;
case TSDB_FILL_LINEAR: {
if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) {
@ -1351,7 +1370,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup
}
QUERY_CHECK_CODE(code, lino, _end);
if (pFillSup->type == TSDB_FILL_PREV) {
if (pFillSup->type != TSDB_FILL_LINEAR) {
getPrevResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index, &pFillInfo->preRowKey);
if (hasPrevWindow(pFillSup)) {
pFillInfo->prePointKey = prevPoint.key.ts;

View File

@ -399,6 +399,79 @@ if $data[15][1] != 3 then
goto loop6
endi
print step4
sql create database test4 vgroups 1;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _irowts, interp(a) as b, _isfilled as a from st partition by tbname, b as cc every(1s) fill(next);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791217000,20000,2,3);
sleep 2000
sql insert into t1 values(1648791212000,10000,2,3) (1648791215001,20,2,3);
$loop_count = 0
loop7:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 300
print sql select a,b from streamt4;
sql select a,b from streamt4;
if $rows != 6 then
print ======rows=$rows
goto loop7
endi
if $data00 != 0 then
print ======data00=$data00
goto loop7
endi
if $data01 != 10000 then
print ======data01=$data01
goto loop7
endi
if $data10 != 1 then
print ======data10=$data10
goto loop7
endi
if $data20 != 1 then
print ======data20=$data20
goto loop7
endi
if $data41 != 20000 then
print ======data41=$data41
goto loop7
endi
if $data50 != 0 then
print ======data50=$data50
goto loop7
endi
if $data51 != 20000 then
print ======data51=$data51
goto loop7
endi
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -21,11 +21,11 @@ run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791217000,20000,2,3);
sleep 500
sleep 2000
sql insert into t1 values(1648791212000,10000,2,3);
sleep 500
sleep 2000
sql insert into t1 values(1648791215001,20,2,3);
@ -94,11 +94,11 @@ run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791217000,20000,2,3);
sleep 500
sleep 2000
sql insert into t1 values(1648791212000,10000,2,3);
sleep 500
sleep 2000
sql insert into t1 values(1648791215001,20,2,3);
@ -167,11 +167,11 @@ run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791217000,20000,2,3);
sleep 500
sleep 2000
sql insert into t1 values(1648791212000,10000,2,3);
sleep 500
sleep 2000
sql insert into t1 values(1648791215001,20,2,3);
@ -240,11 +240,11 @@ run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791217000,20000,2,3);
sleep 500
sleep 2000
sql insert into t1 values(1648791212000,10000,2,3);
sleep 500
sleep 2000
sql insert into t1 values(1648791215001,20,2,3);
@ -313,11 +313,11 @@ run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791217000,20000,2,3);
sleep 500
sleep 2000
sql insert into t1 values(1648791212000,10000,2,3);
sleep 500
sleep 2000
sql insert into t1 values(1648791215001,20,2,3);

View File

@ -0,0 +1,477 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(NULL);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791213000,1,1,1,1.0);
$loop_count = 0
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 1 then
print ======rows=$rows
goto loop0
endi
# row 0
if $data01 != 1 then
print ======data01=$data01
goto loop0
endi
sql insert into t1 values(1648791213009,3,3,3,1.0) (1648791217001,4,4,4,4.1);
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(NULL);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(NULL);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 2 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 5 then
print ======rows=$rows
goto loop2
endi
# row 0
if $data01 != 1 then
print ======data01=$data01
goto loop2
endi
if $data11 != NULL then
print ======data11=$data11
goto loop2
endi
if $data21 != NULL then
print ======data21=$data21
goto loop2
endi
if $data31 != NULL then
print ======data31=$data31
goto loop2
endi
if $data41 != NULL then
print ======data41=$data41
goto loop2
endi
print step2
sql create database test2 vgroups 1;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(NULL);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791213000,1,1,1,1.0);
$loop_count = 0
loop3:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 1 then
print ======rows=$rows
goto loop3
endi
# row 0
if $data01 != 1 then
print ======data01=$data01
goto loop3
endi
sql insert into t1 values(1648791213009,3,3,3,1.0) (1648791217001,4,4,4,4.1) (1648791219000,5,5,5,5.1);
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791219000) every(1s) fill(NULL);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791219000) every(1s) fill(NULL);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
print $data60 $data61 $data62 $data63 $data64
print $data70 $data71 $data72 $data73 $data74
$loop_count = 0
loop4:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 2 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
print $data60 $data61 $data62 $data63 $data64
print $data70 $data71 $data72 $data73 $data74
# row 0
if $rows != 7 then
print ======rows=$rows
goto loop4
endi
# row 0
if $data01 != 1 then
print ======data01=$data01
goto loop4
endi
if $data11 != NULL then
print ======data11=$data11
goto loop4
endi
if $data21 != NULL then
print ======data21=$data21
goto loop4
endi
if $data31 != NULL then
print ======data31=$data31
goto loop4
endi
if $data41 != NULL then
print ======data41=$data41
goto loop4
endi
if $data51 != NULL then
print ======data51=$data51
goto loop4
endi
if $data61 != 5 then
print ======data61=$data61
goto loop4
endi
print step3
sql create database test3 vgroups 1;
sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(NULL);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791213001,1,1,1,1.0) (1648791219001,2,2,2,2.1) (1648791229001,3,3,3,3.1);
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791229001) every(1s) fill(NULL);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791229001) every(1s) fill(NULL);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
print $data60 $data61 $data62 $data63 $data64
print $data70 $data71 $data72 $data73 $data74
$loop_count = 0
loop5:
sleep 300
print sql select * from streamt order by 1;
sql select * from streamt order by 1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 16 then
print =====rows=$rows
goto loop5
endi
sql insert into t1 values(1648791215001,4,4,4,4.0) (1648791217001,5,5,5,5.1) (1648791222000,6,6,6,6.1) (1648791226000,7,7,7,7.1);
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791229001) every(1s) fill(NULL);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791229001) every(1s) fill(NULL);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
print $data60 $data61 $data62 $data63 $data64
print $data70 $data71 $data72 $data73 $data74
$loop_count = 0
loop6:
sleep 300
print sql select * from streamt order by 1;
sql select * from streamt order by 1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 16 then
goto loop6
endi
if $data01 != NULL then
print =====data01=$data01
goto loop6
endi
if $data11 != NULL then
print =====data11=$data11
goto loop6
endi
if $data21 != NULL then
print =====data21=$data21
goto loop6
endi
if $data31 != NULL then
print =====data31=$data31
goto loop6
endi
if $data41 != NULL then
print =====data41=$data41
goto loop6
endi
if $data51 != NULL then
print =====data51=$data51
goto loop6
endi
if $data61 != NULL then
print =====data61=$data61
goto loop6
endi
if $data71 != NULL then
print =====data71=$data71
goto loop6
endi
if $data81 != 6 then
print =====data81=$data81
goto loop6
endi
if $data91 != NULL then
print =====data91=$data91
goto loop6
endi
if $data[10][1] != NULL then
print =====data[10][1]=$data[10][1]
goto loop6
endi
if $data[11][1] != NULL then
print =====data[11][1]=$data[11][1]
goto loop6
endi
if $data[12][1] != 7 then
print =====data[12][1]=$data[12][1]
goto loop6
endi
if $data[13][1] != NULL then
print =====data[13][1]=$data[13][1]
goto loop6
endi
if $data[14][1] != NULL then
print =====data[14][1]=$data[14][1]
goto loop6
endi
if $data[15][1] != NULL then
print =====data[15][1]=$data[15][1]
goto loop6
endi
print step4
sql create database test4 vgroups 1;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _irowts, interp(a) as b, _isfilled as a from st partition by tbname, b as cc every(1s) fill(NULL);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791217000,20000,2,3);
sleep 2000
sql insert into t1 values(1648791212000,10000,2,3) (1648791215001,20,2,3);
$loop_count = 0
loop7:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 300
print sql select a,b from streamt4;
sql select a,b from streamt4;
if $rows != 6 then
print ======rows=$rows
goto loop7
endi
if $data00 != 0 then
print ======data00=$data00
goto loop7
endi
if $data01 != 10000 then
print ======data01=$data01
goto loop7
endi
if $data10 != 1 then
print ======data10=$data10
goto loop7
endi
if $data20 != 1 then
print ======data20=$data20
goto loop7
endi
if $data41 != NULL then
print ======data41=$data41
goto loop7
endi
if $data50 != 0 then
print ======data50=$data50
goto loop7
endi
if $data51 != 20000 then
print ======data51=$data51
goto loop7
endi
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT