diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 039c0fa68b..271e9c91a1 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -538,6 +538,7 @@ typedef struct SStreamScanInfo { int32_t pkColLen; bool useGetResultRange; STimeWindow lastScanRange; + SSDataBlock* pRangeScanRes; // update SSDataBlock } SStreamScanInfo; typedef struct { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 095d7e1a2b..07b7d052a2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3839,6 +3839,11 @@ FETCH_NEXT_BLOCK: } break; case STREAM_SCAN_FROM_DATAREADER_RANGE: case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: { + if (pInfo->pRangeScanRes != NULL) { + (*ppRes) = pInfo->pRangeScanRes; + pInfo->pRangeScanRes = NULL; + return code; + } SSDataBlock* pSDB = NULL; code = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex, &pSDB); QUERY_CHECK_CODE(code, lino, _end); @@ -3852,6 +3857,15 @@ FETCH_NEXT_BLOCK: printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo)); code = calBlockTbName(pInfo, pSDB, 0); QUERY_CHECK_CODE(code, lino, _end); + + if (pInfo->pCreateTbRes->info.rows > 0) { + printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "update", + GET_TASKID(pTaskInfo)); + (*ppRes) = pInfo->pCreateTbRes; + pInfo->pRangeScanRes = pSDB; + return code; + } + (*ppRes) = pSDB; return code; } @@ -4629,6 +4643,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; pInfo->pFillSup = NULL; pInfo->useGetResultRange = false; + pInfo->pRangeScanRes = NULL; code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index b120bb6374..efc5dd6d6a 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -1481,6 +1481,18 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup pBlock->info.id.groupId = pKey->groupId; } } + void* tbname = NULL; + int32_t winCode = TSDB_CODE_SUCCESS; + code = + pAggSup->stateStore.streamStateGetParName(pAggSup->pState, pBlock->info.id.groupId, &tbname, false, &winCode); + QUERY_CHECK_CODE(code, lino, _end); + if (winCode != TSDB_CODE_SUCCESS) { + pBlock->info.parTbName[0] = 0; + } else { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + } + pAggSup->stateStore.streamStateFreeVal(tbname); + SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId}; SSlicePoint prevPoint = {0}; SSlicePoint nextPoint = {0}; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 492dd11177..0378a932b6 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1417,6 +1417,7 @@ ,,y,script,./test.sh -f tsim/stream/streamTwaFwcFill.sim ,,y,script,./test.sh -f tsim/stream/streamTwaFwcFillPrimaryKey.sim ,,y,script,./test.sh -f tsim/stream/streamTwaFwcIntervalPrimaryKey.sim +,,y,script,./test.sh -f tsim/stream/streamTwaInterpFwc.sim ,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim ,,y,script,./test.sh -f tsim/stream/triggerSession0.sim ,,y,script,./test.sh -f tsim/stream/udTableAndCol0.sim diff --git a/tests/script/tsim/stream/streamTwaInterpFwc.sim b/tests/script/tsim/stream/streamTwaInterpFwc.sim index 2073378e92..ce76387c91 100644 --- a/tests/script/tsim/stream/streamTwaInterpFwc.sim +++ b/tests/script/tsim/stream/streamTwaInterpFwc.sim @@ -109,6 +109,201 @@ if $data01 != $query1_data01 then return -1 endi +print step2 +print =============== create database +sql create database test4 vgroups 4; +sql use test4; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + + +sql create stream streams6 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt6 TAGS(cc int) SUBTABLE(concat(concat("tbn-", tbname), "_1")) as select _irowts, interp(a), _isfilled as a1 from st partition by tbname, b as cc every(2s) fill(prev); +sql create stream streams7 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt7 TAGS(cc int) SUBTABLE(concat(concat("tbn-", tbname), "_2")) as select _wstart, twa(a) from st partition by tbname, b as cc interval(2s) fill(NULL); +sql create stream streams8 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt8 TAGS(cc int) SUBTABLE(concat(concat("tbn-", tbname), "_3")) as select _wstart, count(a) from st partition by tbname, b as cc interval(2s); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now + 3s,1,1,1); + +$loop_count = 0 +loop6: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select cc,* from streamt6; +sql select cc,* from streamt6; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop6 +endi + +if $data00 != 1 then + return -1 +endi + +print 3 sql select * from information_schema.ins_tables where stable_name = "streamt6"; +sql select * from information_schema.ins_tables where stable_name = "streamt6"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +print 4 sql select * from information_schema.ins_tables where stable_name = "streamt6" and table_name like "tbn-t1_1%"; +sql select * from information_schema.ins_tables where stable_name = "streamt6" and table_name like "tbn-t1_1%"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +$loop_count = 0 +loop7: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select cc,* from streamt7; +sql select cc,* from streamt7; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop7 +endi + +if $data00 != 1 then + return -1 +endi + +print 3 sql select * from information_schema.ins_tables where stable_name = "streamt7"; +sql select * from information_schema.ins_tables where stable_name = "streamt7"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +print 4 sql select * from information_schema.ins_tables where stable_name = "streamt7" and table_name like "tbn-t1_2%"; +sql select * from information_schema.ins_tables where stable_name = "streamt7" and table_name like "tbn-t1_2%"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +$loop_count = 0 +loop8: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select cc,* from streamt8; +sql select cc,* from streamt8; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 1 then + print ======rows=$rows + goto loop8 +endi + +if $data00 != 1 then + return -1 +endi + +print 3 sql select * from information_schema.ins_tables where stable_name = "streamt8"; +sql select * from information_schema.ins_tables where stable_name = "streamt8"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +print 4 sql select * from information_schema.ins_tables where stable_name = "streamt8" and table_name like "tbn-t1_3%"; +sql select * from information_schema.ins_tables where stable_name = "streamt8" and table_name like "tbn-t1_3%"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + print end system sh/exec.sh -n dnode1 -s stop -x SIGINT