From 24b963d6211fb45c6b00bb038c8b934249aec05a Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 17 Oct 2024 16:43:47 +0800 Subject: [PATCH] add ci --- .../src/streamintervalsliceoperator.c | 22 ++- .../tsim/stream/streamTwaFwcInterval.sim | 129 ++++++++++++++++++ 2 files changed, 147 insertions(+), 4 deletions(-) create mode 100644 tests/script/tsim/stream/streamTwaFwcInterval.sim diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index c122ceb43d..2b53159cb9 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -26,6 +26,7 @@ typedef struct SInervalSlicePoint { SSessionKey winKey; + bool *pFinished; SSliceRowData* pLastRow; SRowBuffPos* pResPos; } SInervalSlicePoint; @@ -113,7 +114,8 @@ _end: void initIntervalSlicePoint(SStreamAggSupporter* pAggSup, STimeWindow* pTWin, int64_t groupId, SInervalSlicePoint* pPoint) { pPoint->winKey.groupId = groupId; pPoint->winKey.win = *pTWin; - pPoint->pLastRow = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pAggSup->resultRowSize - pAggSup->stateKeySize); + pPoint->pFinished = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pAggSup->resultRowSize - pAggSup->stateKeySize); + pPoint->pLastRow = POINTER_SHIFT(pPoint->pFinished, sizeof(bool)); } static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, bool needPrev, STimeWindow* pTWin, int64_t groupId, @@ -217,6 +219,14 @@ _end: return code; } +static void setInterpoWindowFinished(SInervalSlicePoint* pPoint) { + (*pPoint->pFinished) = true; +} + +static bool isInterpoWindowFinished(SInervalSlicePoint* pPoint) { + return *pPoint->pFinished; +} + static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock, SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) { int32_t code = TSDB_CODE_SUCCESS; @@ -249,7 +259,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId, &curPoint, &prevPoint, &winCode); QUERY_CHECK_CODE(code, lino, _end); - if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && prevPoint.pLastRow->key != prevPoint.winKey.win.ekey) { + if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && isInterpoWindowFinished(&prevPoint) == false) { code = setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); QUERY_CHECK_CODE(code, lino, _end); @@ -262,7 +272,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc SWinKey prevKey = {.ts = prevPoint.winKey.win.skey, .groupId = prevPoint.winKey.groupId}; code = saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap); QUERY_CHECK_CODE(code, lino, _end); - prevPoint.pLastRow->key = prevPoint.winKey.win.ekey; + setInterpoWindowFinished(&prevPoint); } code = setIntervalSliceOutputBuf(&curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); @@ -294,6 +304,10 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc forwardRows, pBlock->info.rows, numOfOutput); QUERY_CHECK_CODE(code, lino, _end); + if (curPoint.pLastRow->key == curPoint.winKey.win.ekey) { + setInterpoWindowFinished(&curPoint); + } + startPos = getNextQualifiedWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC); if (startPos < 0) { break; @@ -569,7 +583,7 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN QUERY_CHECK_CODE(code, lino, _error); int32_t keyBytes = sizeof(TSKEY); - keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock); + keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock) + sizeof(bool); if (pPkCol) { keyBytes += pPkCol->bytes; } diff --git a/tests/script/tsim/stream/streamTwaFwcInterval.sim b/tests/script/tsim/stream/streamTwaFwcInterval.sim new file mode 100644 index 0000000000..85e4bb552b --- /dev/null +++ b/tests/script/tsim/stream/streamTwaFwcInterval.sim @@ -0,0 +1,129 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(a), ta from st partition by tbname,ta interval(2s); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now + 3000a,1,1,1) (now + 3100a,5,10,10) (now + 3200a,5,10,10) (now + 5100a,20,1,1) (now + 5200a,30,10,10) (now + 5300a,40,10,10); +sql insert into t2 values(now + 3000a,1,1,1) (now + 3100a,2,10,10) (now + 3200a,30,10,10) (now + 5100a,10,1,1) (now + 5200a,40,10,10) (now + 5300a,7,10,10); + + +print sql select _wstart, twa(a) from t1 interval(2s); +sql select _wstart, twa(a) from t1 interval(2s); + +$query1_data01 = $data01 +$query1_data11 = $data11 + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +print sql select _wstart, twa(a) from t2 interval(2s); +sql select _wstart, twa(a) from t2 interval(2s); + +$query2_data01 = $data01 +$query2_data11 = $data11 + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 +loop0: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where ta == 1; +sql select * from streamt where ta == 1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop0 +endi + +if $data01 != $query1_data01 then + print ======data01========$data01 + print ======query1_data01=$query1_data01 + return -1 +endi + +if $data11 != $query1_data11 then + print ======data11========$data11 + print ======query1_data11=$query1_data11 + return -1 +endi + +$loop_count = 0 +loop1: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where ta == 2; +sql select * from streamt where ta == 2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop1 +endi + +if $data01 != $query2_data01 then + print ======data01======$data01 + print ====query2_data01=$query2_data01 + return -1 +endi + +if $data11 != $query2_data11 then + print ======data11======$data11 + print ====query2_data11=$query2_data11 + return -1 +endi + +print end + +system sh/exec.sh -n dnode1 -s stop -x SIGINT