fix issue

This commit is contained in:
54liuyao 2024-08-16 17:46:14 +08:00
parent 2a66a89adc
commit 4fe68034cf
5 changed files with 166 additions and 10 deletions

View File

@ -63,7 +63,6 @@ void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup);
bool hasCurWindow(SStreamFillSupporter* pFillSup); bool hasCurWindow(SStreamFillSupporter* pFillSup);
bool hasPrevWindow(SStreamFillSupporter* pFillSup); bool hasPrevWindow(SStreamFillSupporter* pFillSup);
bool hasNextWindow(SStreamFillSupporter* pFillSup); bool hasNextWindow(SStreamFillSupporter* pFillSup);
bool hasNextNextWindow(SStreamFillSupporter* pFillSup);
void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo); void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo);
int32_t setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell); int32_t setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell);
bool hasRemainCalc(SStreamFillInfo* pFillInfo); bool hasRemainCalc(SStreamFillInfo* pFillInfo);

View File

@ -109,8 +109,6 @@ typedef struct SStreamFillInfo {
TSKEY prePointKey; TSKEY prePointKey;
TSKEY nextRowKey; TSKEY nextRowKey;
TSKEY nextPointKey; TSKEY nextPointKey;
TSKEY nextNextRowKey;
TSKEY nextNextPointKey;
SResultRowData* pResRow; SResultRowData* pResRow;
SStreamFillLinearInfo* pLinearInfo; SStreamFillLinearInfo* pLinearInfo;
bool needFill; bool needFill;

View File

@ -235,7 +235,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
bool hasCurWindow(SStreamFillSupporter* pFillSup) { return pFillSup->cur.key != INT64_MIN; } bool hasCurWindow(SStreamFillSupporter* pFillSup) { return pFillSup->cur.key != INT64_MIN; }
bool hasPrevWindow(SStreamFillSupporter* pFillSup) { return pFillSup->prev.key != INT64_MIN; } bool hasPrevWindow(SStreamFillSupporter* pFillSup) { return pFillSup->prev.key != INT64_MIN; }
bool hasNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->next.key != INT64_MIN; } bool hasNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->next.key != INT64_MIN; }
bool hasNextNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->nextNext.key != INT64_MIN; } static bool hasNextNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->nextNext.key != INT64_MIN; }
static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal) { static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

View File

@ -1303,11 +1303,6 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor
if (hasNextWindow(pFillSup)) { if (hasNextWindow(pFillSup)) {
pFillInfo->nextPointKey = nextPoint.key.ts; pFillInfo->nextPointKey = nextPoint.key.ts;
} }
getNextResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index + 1, &pFillInfo->nextNextRowKey);
if (hasNextNextWindow(pFillSup)) {
pFillInfo->nextNextPointKey = pFillSup->nextNext.key;
}
} }
setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts); setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts);

View File

@ -33,7 +33,7 @@ $loop_count = 0
loop4_1: loop4_1:
$loop_count = $loop_count + 1 $loop_count = $loop_count + 1
if $loop_count == 10 then if $loop_count == 20 then
return -1 return -1
endi endi
@ -105,6 +105,13 @@ sql insert into t1 values(1648791215001,20,2,3);
$loop_count = 0 $loop_count = 0
loop4_2: loop4_2:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 300
print sql select a,b from streamt4_2; print sql select a,b from streamt4_2;
sql select a,b from streamt4_2; sql select a,b from streamt4_2;
@ -171,6 +178,13 @@ sql insert into t1 values(1648791215001,20,2,3);
$loop_count = 0 $loop_count = 0
loop4_3: loop4_3:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 300
print sql select a,b from streamt4_3; print sql select a,b from streamt4_3;
sql select a,b from streamt4_3; sql select a,b from streamt4_3;
@ -237,6 +251,13 @@ sql insert into t1 values(1648791215001,20,2,3);
$loop_count = 0 $loop_count = 0
loop4_4: loop4_4:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 300
print sql select a,b from streamt4_4; print sql select a,b from streamt4_4;
sql select a,b from streamt4_4; sql select a,b from streamt4_4;
@ -303,6 +324,13 @@ sql insert into t1 values(1648791215001,20,2,3);
$loop_count = 0 $loop_count = 0
loop4_5: loop4_5:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 300
print sql select a,b from streamt4_5; print sql select a,b from streamt4_5;
sql select a,b from streamt4_5; sql select a,b from streamt4_5;
@ -341,6 +369,142 @@ if $data51 != 32 then
goto loop4_5 goto loop4_5
endi endi
print step5
print =============== create database
sql drop database if exists test5;
sql create database test5 vgroups 4 precision 'us';
sql use test5;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams5 trigger at_once watermark 10s IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt as select interp(a), _isfilled as a1 from t1 every(1s) fill(prev);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791213000001,1,1,1,1.0) (1648791215000001,20,1,1,1.0) (1648791216000000,3,1,1,1.0);
$loop_count = 0
loop5:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 300
print sql select cast(`_irowts` as bigint) from streamt order by 1;
sql select cast(`_irowts` as bigint) from streamt order by 1;
if $rows != 3 then
print ======rows=$rows
goto loop5
endi
if $data00 != 1648791214000000 then
print ======data00=$data00
goto loop5
endi
if $data10 != 1648791215000000 then
print ======data01=$data01
goto loop5
endi
if $data20 != 1648791216000000 then
print ======data01=$data01
goto loop5
endi
print step6
print =============== create database
sql drop database if exists test6;
sql create database test6 vgroups 4 precision 'us';
sql use test6;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams6 trigger at_once watermark 10s IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt as select interp(a), _isfilled as a1 from t1 every(1s) fill(next);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791213000001,1,1,1,1.0) (1648791215000001,20,1,1,1.0) (1648791216000000,3,1,1,1.0);
$loop_count = 0
loop6:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 300
print sql select cast(`_irowts` as bigint) from streamt order by 1;
sql select cast(`_irowts` as bigint) from streamt order by 1;
if $rows != 3 then
print ======rows=$rows
goto loop6
endi
if $data00 != 1648791214000000 then
print ======data00=$data00
goto loop6
endi
if $data10 != 1648791215000000 then
print ======data01=$data01
goto loop6
endi
if $data20 != 1648791216000000 then
print ======data01=$data01
goto loop6
endi
print step7
print =============== create database
sql drop database if exists test7;
sql create database test7 vgroups 4 precision 'us';
sql use test7;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams7 trigger at_once watermark 10s IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt as select interp(a), _isfilled as a1 from t1 every(1s) fill(linear);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791213000001,1,1,1,1.0) (1648791215000001,20,1,1,1.0) (1648791216000000,3,1,1,1.0);
$loop_count = 0
loop7:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 300
print sql select cast(`_irowts` as bigint) from streamt order by 1;
sql select cast(`_irowts` as bigint) from streamt order by 1;
if $rows != 3 then
print ======rows=$rows
goto loop7
endi
if $data00 != 1648791214000000 then
print ======data00=$data00
goto loop7
endi
if $data10 != 1648791215000000 then
print ======data01=$data01
goto loop7
endi
if $data20 != 1648791216000000 then
print ======data01=$data01
goto loop7
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT