fix(stream):build create table request for force_window_close

This commit is contained in:
54liuyao 2024-11-25 13:54:31 +08:00
parent 062a8a2831
commit 39564a41ae
5 changed files with 224 additions and 0 deletions

View File

@ -538,6 +538,7 @@ typedef struct SStreamScanInfo {
int32_t pkColLen;
bool useGetResultRange;
STimeWindow lastScanRange;
SSDataBlock* pRangeScanRes; // update SSDataBlock
} SStreamScanInfo;
typedef struct {

View File

@ -3839,6 +3839,11 @@ FETCH_NEXT_BLOCK:
} break;
case STREAM_SCAN_FROM_DATAREADER_RANGE:
case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
if (pInfo->pRangeScanRes != NULL) {
(*ppRes) = pInfo->pRangeScanRes;
pInfo->pRangeScanRes = NULL;
return code;
}
SSDataBlock* pSDB = NULL;
code = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex, &pSDB);
QUERY_CHECK_CODE(code, lino, _end);
@ -3852,6 +3857,15 @@ FETCH_NEXT_BLOCK:
printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo));
code = calBlockTbName(pInfo, pSDB, 0);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pCreateTbRes->info.rows > 0) {
printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "update",
GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pCreateTbRes;
pInfo->pRangeScanRes = pSDB;
return code;
}
(*ppRes) = pSDB;
return code;
}
@ -4629,6 +4643,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
pInfo->pFillSup = NULL;
pInfo->useGetResultRange = false;
pInfo->pRangeScanRes = NULL;
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -1481,6 +1481,18 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup
pBlock->info.id.groupId = pKey->groupId;
}
}
void* tbname = NULL;
int32_t winCode = TSDB_CODE_SUCCESS;
code =
pAggSup->stateStore.streamStateGetParName(pAggSup->pState, pBlock->info.id.groupId, &tbname, false, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (winCode != TSDB_CODE_SUCCESS) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
pAggSup->stateStore.streamStateFreeVal(tbname);
SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId};
SSlicePoint prevPoint = {0};
SSlicePoint nextPoint = {0};

View File

@ -1417,6 +1417,7 @@
,,y,script,./test.sh -f tsim/stream/streamTwaFwcFill.sim
,,y,script,./test.sh -f tsim/stream/streamTwaFwcFillPrimaryKey.sim
,,y,script,./test.sh -f tsim/stream/streamTwaFwcIntervalPrimaryKey.sim
,,y,script,./test.sh -f tsim/stream/streamTwaInterpFwc.sim
,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim
,,y,script,./test.sh -f tsim/stream/triggerSession0.sim
,,y,script,./test.sh -f tsim/stream/udTableAndCol0.sim

View File

@ -109,6 +109,201 @@ if $data01 != $query1_data01 then
return -1
endi
print step2
print =============== create database
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams6 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt6 TAGS(cc int) SUBTABLE(concat(concat("tbn-", tbname), "_1")) as select _irowts, interp(a), _isfilled as a1 from st partition by tbname, b as cc every(2s) fill(prev);
sql create stream streams7 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt7 TAGS(cc int) SUBTABLE(concat(concat("tbn-", tbname), "_2")) as select _wstart, twa(a) from st partition by tbname, b as cc interval(2s) fill(NULL);
sql create stream streams8 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt8 TAGS(cc int) SUBTABLE(concat(concat("tbn-", tbname), "_3")) as select _wstart, count(a) from st partition by tbname, b as cc interval(2s);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(now + 3s,1,1,1);
$loop_count = 0
loop6:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select cc,* from streamt6;
sql select cc,* from streamt6;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 2 then
print ======rows=$rows
goto loop6
endi
if $data00 != 1 then
return -1
endi
print 3 sql select * from information_schema.ins_tables where stable_name = "streamt6";
sql select * from information_schema.ins_tables where stable_name = "streamt6";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print 4 sql select * from information_schema.ins_tables where stable_name = "streamt6" and table_name like "tbn-t1_1%";
sql select * from information_schema.ins_tables where stable_name = "streamt6" and table_name like "tbn-t1_1%";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
$loop_count = 0
loop7:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select cc,* from streamt7;
sql select cc,* from streamt7;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 2 then
print ======rows=$rows
goto loop7
endi
if $data00 != 1 then
return -1
endi
print 3 sql select * from information_schema.ins_tables where stable_name = "streamt7";
sql select * from information_schema.ins_tables where stable_name = "streamt7";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print 4 sql select * from information_schema.ins_tables where stable_name = "streamt7" and table_name like "tbn-t1_2%";
sql select * from information_schema.ins_tables where stable_name = "streamt7" and table_name like "tbn-t1_2%";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
$loop_count = 0
loop8:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select cc,* from streamt8;
sql select cc,* from streamt8;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 1 then
print ======rows=$rows
goto loop8
endi
if $data00 != 1 then
return -1
endi
print 3 sql select * from information_schema.ins_tables where stable_name = "streamt8";
sql select * from information_schema.ins_tables where stable_name = "streamt8";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print 4 sql select * from information_schema.ins_tables where stable_name = "streamt8" and table_name like "tbn-t1_3%";
sql select * from information_schema.ins_tables where stable_name = "streamt8" and table_name like "tbn-t1_3%";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT