fix:state window return wrong block type
This commit is contained in:
parent
dab01b3159
commit
ae98ad43a5
|
@ -1829,6 +1829,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pUpdateRes, "recover update");
|
printDataBlock(pInfo->pUpdateRes, "recover update");
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
} break;
|
} break;
|
||||||
|
case STREAM_SCAN_FROM_DELETE_DATA: {
|
||||||
|
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
||||||
|
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||||
|
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||||
|
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
|
||||||
|
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
|
||||||
|
printDataBlock(pInfo->pDeleteDataRes, "recover delete");
|
||||||
|
return pInfo->pDeleteDataRes;
|
||||||
|
} break;
|
||||||
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
|
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
|
||||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||||
if (pSDB) {
|
if (pSDB) {
|
||||||
|
@ -2021,6 +2030,7 @@ FETCH_NEXT_BLOCK:
|
||||||
copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
|
copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
|
||||||
blockDataCleanup(pSup->pScanBlock);
|
blockDataCleanup(pSup->pScanBlock);
|
||||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||||
|
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ system sh/exec.sh -n dnode1 -s start
|
||||||
sleep 50
|
sleep 50
|
||||||
sql connect
|
sql connect
|
||||||
|
|
||||||
|
print step 1
|
||||||
print =============== create database
|
print =============== create database
|
||||||
sql create database test vgroups 4;
|
sql create database test vgroups 4;
|
||||||
sql select * from information_schema.ins_databases;
|
sql select * from information_schema.ins_databases;
|
||||||
|
@ -33,8 +34,8 @@ if $loop_count == 10 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql select * from streamt1;
|
sql select * from streamt1;
|
||||||
print data00 data01
|
print $data00 $data01
|
||||||
print data10 data11
|
print $data10 $data11
|
||||||
|
|
||||||
if $rows != 0 then
|
if $rows != 0 then
|
||||||
print =====rows=$rows
|
print =====rows=$rows
|
||||||
|
@ -52,8 +53,8 @@ if $loop_count == 10 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql select * from streamt1;
|
sql select * from streamt1;
|
||||||
print data00 data01
|
print $data00 $data01
|
||||||
print data10 data11
|
print $data10 $data11
|
||||||
|
|
||||||
if $rows != 1 then
|
if $rows != 1 then
|
||||||
print =====rows=$rows
|
print =====rows=$rows
|
||||||
|
@ -92,9 +93,64 @@ endi
|
||||||
sql select * from streamt1;
|
sql select * from streamt1;
|
||||||
if $rows != 2 then
|
if $rows != 2 then
|
||||||
print =====rows=$rows
|
print =====rows=$rows
|
||||||
goto loop2
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step 1 over
|
||||||
|
print step 2
|
||||||
|
|
||||||
|
sql create database test2 vgroups 1;
|
||||||
|
sql use test2;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
print create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a)
|
||||||
|
sql create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213010,1,2,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop4:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt2;
|
||||||
|
print $data00 $data01
|
||||||
|
print $data10 $data11
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
print insert into t1 values(1648791213005,2,2,3,1.1)
|
||||||
|
sql insert into t1 values(1648791213005,2,2,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop5:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print select * from streamt2
|
||||||
|
sql select * from streamt2;
|
||||||
|
print $data00 $data01
|
||||||
|
print $data10 $data11
|
||||||
|
print $data20 $data21
|
||||||
|
print $data30 $data31
|
||||||
|
|
||||||
|
if $rows != 3 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
print step 2 over
|
||||||
|
|
||||||
print state1 end
|
print state1 end
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue