From 39564a41ae1f5ddf731686200848a8a654367432 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 25 Nov 2024 13:54:31 +0800 Subject: [PATCH 1/2] fix(stream):build create table request for force_window_close --- source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/src/scanoperator.c | 15 ++ .../executor/src/streamtimesliceoperator.c | 12 ++ tests/parallel_test/cases.task | 1 + .../script/tsim/stream/streamTwaInterpFwc.sim | 195 ++++++++++++++++++ 5 files changed, 224 insertions(+) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 039c0fa68b..271e9c91a1 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -538,6 +538,7 @@ typedef struct SStreamScanInfo { int32_t pkColLen; bool useGetResultRange; STimeWindow lastScanRange; + SSDataBlock* pRangeScanRes; // update SSDataBlock } SStreamScanInfo; typedef struct { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 095d7e1a2b..07b7d052a2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3839,6 +3839,11 @@ FETCH_NEXT_BLOCK: } break; case STREAM_SCAN_FROM_DATAREADER_RANGE: case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: { + if (pInfo->pRangeScanRes != NULL) { + (*ppRes) = pInfo->pRangeScanRes; + pInfo->pRangeScanRes = NULL; + return code; + } SSDataBlock* pSDB = NULL; code = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex, &pSDB); QUERY_CHECK_CODE(code, lino, _end); @@ -3852,6 +3857,15 @@ FETCH_NEXT_BLOCK: printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo)); code = calBlockTbName(pInfo, pSDB, 0); QUERY_CHECK_CODE(code, lino, _end); + + if (pInfo->pCreateTbRes->info.rows > 0) { + printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "update", + GET_TASKID(pTaskInfo)); + (*ppRes) = pInfo->pCreateTbRes; + pInfo->pRangeScanRes = pSDB; + return code; + } + (*ppRes) = pSDB; return code; } @@ -4629,6 +4643,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; pInfo->pFillSup = NULL; pInfo->useGetResultRange = false; + pInfo->pRangeScanRes = NULL; code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index b120bb6374..efc5dd6d6a 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -1481,6 +1481,18 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup pBlock->info.id.groupId = pKey->groupId; } } + void* tbname = NULL; + int32_t winCode = TSDB_CODE_SUCCESS; + code = + pAggSup->stateStore.streamStateGetParName(pAggSup->pState, pBlock->info.id.groupId, &tbname, false, &winCode); + QUERY_CHECK_CODE(code, lino, _end); + if (winCode != TSDB_CODE_SUCCESS) { + pBlock->info.parTbName[0] = 0; + } else { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + } + pAggSup->stateStore.streamStateFreeVal(tbname); + SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId}; SSlicePoint prevPoint = {0}; SSlicePoint nextPoint = {0}; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 492dd11177..0378a932b6 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1417,6 +1417,7 @@ ,,y,script,./test.sh -f tsim/stream/streamTwaFwcFill.sim ,,y,script,./test.sh -f tsim/stream/streamTwaFwcFillPrimaryKey.sim ,,y,script,./test.sh -f tsim/stream/streamTwaFwcIntervalPrimaryKey.sim +,,y,script,./test.sh -f tsim/stream/streamTwaInterpFwc.sim ,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim ,,y,script,./test.sh -f tsim/stream/triggerSession0.sim ,,y,script,./test.sh -f tsim/stream/udTableAndCol0.sim diff --git a/tests/script/tsim/stream/streamTwaInterpFwc.sim b/tests/script/tsim/stream/streamTwaInterpFwc.sim index 2073378e92..ce76387c91 100644 --- a/tests/script/tsim/stream/streamTwaInterpFwc.sim +++ b/tests/script/tsim/stream/streamTwaInterpFwc.sim @@ -109,6 +109,201 @@ if $data01 != $query1_data01 then return -1 endi +print step2 +print =============== create database +sql create database test4 vgroups 4; +sql use test4; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + + +sql create stream streams6 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt6 TAGS(cc int) SUBTABLE(concat(concat("tbn-", tbname), "_1")) as select _irowts, interp(a), _isfilled as a1 from st partition by tbname, b as cc every(2s) fill(prev); +sql create stream streams7 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt7 TAGS(cc int) SUBTABLE(concat(concat("tbn-", tbname), "_2")) as select _wstart, twa(a) from st partition by tbname, b as cc interval(2s) fill(NULL); +sql create stream streams8 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt8 TAGS(cc int) SUBTABLE(concat(concat("tbn-", tbname), "_3")) as select _wstart, count(a) from st partition by tbname, b as cc interval(2s); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now + 3s,1,1,1); + +$loop_count = 0 +loop6: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select cc,* from streamt6; +sql select cc,* from streamt6; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop6 +endi + +if $data00 != 1 then + return -1 +endi + +print 3 sql select * from information_schema.ins_tables where stable_name = "streamt6"; +sql select * from information_schema.ins_tables where stable_name = "streamt6"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +print 4 sql select * from information_schema.ins_tables where stable_name = "streamt6" and table_name like "tbn-t1_1%"; +sql select * from information_schema.ins_tables where stable_name = "streamt6" and table_name like "tbn-t1_1%"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +$loop_count = 0 +loop7: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select cc,* from streamt7; +sql select cc,* from streamt7; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop7 +endi + +if $data00 != 1 then + return -1 +endi + +print 3 sql select * from information_schema.ins_tables where stable_name = "streamt7"; +sql select * from information_schema.ins_tables where stable_name = "streamt7"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +print 4 sql select * from information_schema.ins_tables where stable_name = "streamt7" and table_name like "tbn-t1_2%"; +sql select * from information_schema.ins_tables where stable_name = "streamt7" and table_name like "tbn-t1_2%"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +$loop_count = 0 +loop8: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select cc,* from streamt8; +sql select cc,* from streamt8; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 1 then + print ======rows=$rows + goto loop8 +endi + +if $data00 != 1 then + return -1 +endi + +print 3 sql select * from information_schema.ins_tables where stable_name = "streamt8"; +sql select * from information_schema.ins_tables where stable_name = "streamt8"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +print 4 sql select * from information_schema.ins_tables where stable_name = "streamt8" and table_name like "tbn-t1_3%"; +sql select * from information_schema.ins_tables where stable_name = "streamt8" and table_name like "tbn-t1_3%"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + print end system sh/exec.sh -n dnode1 -s stop -x SIGINT From 721170dcb30b63d698b2b90ca4d1ac3e8fdabdb2 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 26 Nov 2024 08:45:44 +0800 Subject: [PATCH 2/2] opt stream time slice --- source/libs/executor/src/streamtimesliceoperator.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index efc5dd6d6a..d307c651fc 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -748,7 +748,7 @@ _end: static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint, - SSlicePoint* pNextPoint) { + SSlicePoint* pNextPoint, bool isFwc) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int32_t tmpRes = TSDB_CODE_SUCCESS; @@ -769,6 +769,10 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS setPointBuff(pCurPoint, pFillSup); pFillSup->cur.key = pCurPoint->pRightRow->key; pFillSup->cur.pRowVal = (SResultCellData*)pCurPoint->pRightRow->pRowVal; + if (isFwc) { + qDebug("===stream=== only get current point state"); + goto _end; + } } else { pFillSup->cur.key = pCurPoint->key.ts + 1; } @@ -1466,6 +1470,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup return; } + bool isFwc = (pTwSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE); // clear the existed group id pBlock->info.id.groupId = 0; int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); @@ -1497,7 +1502,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup SSlicePoint prevPoint = {0}; SSlicePoint nextPoint = {0}; if (pFillSup->type != TSDB_FILL_LINEAR) { - code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint); + code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, isFwc); } else { code = getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint); @@ -1516,7 +1521,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup } } - if (pTwSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + if (isFwc) { setForceWindowCloseFillRule(pFillSup, pFillInfo, pKey->ts); } else { setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts); @@ -1558,7 +1563,7 @@ static void doBuildTimeSliceDeleteResult(SStreamAggSupporter* pAggSup, SStreamFi SSlicePoint nextPoint = {0}; STimeWindow tw = {0}; if (pFillSup->type != TSDB_FILL_LINEAR) { - code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint); + code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, false); } else { code = getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);