Merge pull request #20464 from taosdata/fix/TD-23156
fix:source task needs to wait && stream state window does not handle null
This commit is contained in:
commit
51e503dd2c
|
@ -3911,7 +3911,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
|
||||
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
|
||||
for (int32_t i = 0; i < rows; i += winRows) {
|
||||
if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) {
|
||||
if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup) || colDataIsNull_s(pKeyColInfo, i)) {
|
||||
i++;
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
|
||||
int32_t code;
|
||||
void* exec = pTask->exec.executor;
|
||||
while(atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
|
||||
while(pTask->taskLevel == TASK_LEVEL__SOURCE && atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
|
||||
qError("stream task wait for the end of fill history");
|
||||
taosMsleep(2);
|
||||
continue;
|
||||
|
|
|
@ -51,13 +51,8 @@ if $loop_count == 10 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
if $rows != 0 then
|
||||
print =====rows=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
|
|
|
@ -27,13 +27,8 @@ if $loop_count == 10 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
if $rows != 0 then
|
||||
print =====rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print =============== create database
|
||||
sql create database test vgroups 4;
|
||||
sql select * from information_schema.ins_databases;
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print $data00 $data01 $data02
|
||||
|
||||
sql use test;
|
||||
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
|
||||
|
||||
print create stream streams1 trigger at_once IGNORE EXPIRED 0 into streamt1 as select _wstart, count(*) c1 from t1 state_window(a);
|
||||
|
||||
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 into streamt1 as select _wstart, count(*) c1 from t1 state_window(a);
|
||||
|
||||
sql insert into t1(ts) values(1648791213000);
|
||||
|
||||
$loop_count = 0
|
||||
loop0:
|
||||
|
||||
sleep 300
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt1;
|
||||
print data00 data01
|
||||
print data10 data11
|
||||
|
||||
if $rows != 0 then
|
||||
print =====rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791214000,1,2,3,1.0,3);
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
|
||||
sleep 300
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt1;
|
||||
print data00 data01
|
||||
print data10 data11
|
||||
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791215000,2,2,3,1.0,4);
|
||||
|
||||
$loop_count = 0
|
||||
loop2:
|
||||
|
||||
sleep 300
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt1;
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
sql insert into t1(ts) values(1648791216000);
|
||||
|
||||
$loop_count = 0
|
||||
loop3:
|
||||
|
||||
sleep 300
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt1;
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
|
||||
print state1 end
|
||||
|
||||
system sh/stop_dnodes.sh
|
Loading…
Reference in New Issue