diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index a0ad574d54..26d59a90ba 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -879,6 +879,7 @@ typedef struct SStreamIntervalSliceOperatorInfo { SGroupResInfo groupResInfo; struct SOperatorInfo* pOperator; bool hasFill; + bool hasInterpoFunc; } SStreamIntervalSliceOperatorInfo; #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index fe8288955f..48d6e71fd9 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2821,6 +2821,8 @@ char* getStreamOpName(uint16_t opType) { return "stream count"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC: return "stream interp"; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL: + return "interval continue"; } return ""; } diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 58050561dd..a36c2a65e0 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1258,11 +1258,9 @@ static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) { SStreamAggSupporter* pAggSup = pInfo->pStreamAggSup; SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol); TSKEY* tsCol = (TSKEY*)pTsCol->pData; - if (pFillInfo->type == TSDB_FILL_PREV) { - for (int32_t i = 0; i < pBlock->info.rows; i++){ - code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize); - QUERY_CHECK_CODE(code, lino, _end); - } + for (int32_t i = 0; i < pBlock->info.rows; i++){ + code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize); + QUERY_CHECK_CODE(code, lino, _end); } code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, groupId, NULL, 0); QUERY_CHECK_CODE(code, lino, _end); @@ -1323,10 +1321,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR (*ppRes) = resBlock; goto _end; } - - if (pInfo->pFillInfo->type == TSDB_FILL_PREV) { - pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); - } + pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; goto _end; @@ -1393,9 +1388,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR QUERY_CHECK_CODE(code, lino, _end); if ((*ppRes) == NULL) { - if (pInfo->pFillInfo->type == TSDB_FILL_PREV) { - pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); - } + pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); setStreamOperatorCompleted(pOperator); } @@ -1655,24 +1648,6 @@ _end: return code; } -int32_t initForceFillDownStream(SOperatorInfo* downstream) { - SExecTaskInfo* pTaskInfo = downstream->pTaskInfo; - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; - - if (downstream == NULL) { - return TSDB_CODE_STREAM_INTERNAL_ERROR; - } - - if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - code = initForceFillDownStream(downstream->pDownstream[0]); - return code; - } - SStreamScanInfo* pInfo = (SStreamScanInfo*) downstream->info; - pInfo->useGetResultRange = true; - return code; -} - int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); @@ -1755,9 +1730,6 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi pTaskInfo); if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { - code = initForceFillDownStream(downstream); - QUERY_CHECK_CODE(code, lino, _error); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } else { diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index e873b31062..995ca02d4f 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -116,7 +116,7 @@ void initIntervalSlicePoint(SStreamAggSupporter* pAggSup, STimeWindow* pTWin, in pPoint->pLastRow = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pAggSup->resultRowSize - pAggSup->stateKeySize); } -static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, STimeWindow* pTWin, int64_t groupId, +static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, bool needPrev, STimeWindow* pTWin, int64_t groupId, SInervalSlicePoint* pCurPoint, SInervalSlicePoint* pPrevPoint, int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -126,26 +126,28 @@ static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterv &curVLen, pWinCode); QUERY_CHECK_CODE(code, lino, _end); - qDebug("===stream=== set stream twa next point buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d", + qDebug("===stream=== set stream twa next point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d", curKey.ts, curKey.groupId, *pWinCode); initIntervalSlicePoint(pAggSup, pTWin, groupId, pCurPoint); - SWinKey prevKey = {.groupId = groupId}; - SET_WIN_KEY_INVALID(prevKey.ts); - int32_t prevVLen = 0; - int32_t prevWinCode = TSDB_CODE_SUCCESS; - code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, &curKey, &prevKey, - (void**)&pPrevPoint->pResPos, &prevVLen, &prevWinCode); - QUERY_CHECK_CODE(code, lino, _end); + if (needPrev) { + SWinKey prevKey = {.groupId = groupId}; + SET_WIN_KEY_INVALID(prevKey.ts); + int32_t prevVLen = 0; + int32_t prevWinCode = TSDB_CODE_SUCCESS; + code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, &curKey, &prevKey, (void**)&pPrevPoint->pResPos, + &prevVLen, &prevWinCode); + QUERY_CHECK_CODE(code, lino, _end); - if (prevWinCode == TSDB_CODE_SUCCESS) { - STimeWindow prevSTW = {.skey = prevKey.ts}; - prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval); - initIntervalSlicePoint(pAggSup, &prevSTW, groupId, pPrevPoint); - } else { - SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey); - SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey); + if (prevWinCode == TSDB_CODE_SUCCESS) { + STimeWindow prevSTW = {.skey = prevKey.ts}; + prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval); + initIntervalSlicePoint(pAggSup, &prevSTW, groupId, pPrevPoint); + } else { + SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey); + SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey); + } } _end: @@ -244,10 +246,10 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc } int32_t winCode = TSDB_CODE_SUCCESS; - code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, &curWin, groupId, &curPoint, &prevPoint, &winCode); + code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId, &curPoint, &prevPoint, &winCode); QUERY_CHECK_CODE(code, lino, _end); - if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && prevPoint.pLastRow->key != prevPoint.winKey.win.ekey) { + if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && prevPoint.pLastRow->key != prevPoint.winKey.win.ekey) { code = setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); QUERY_CHECK_CODE(code, lino, _end); @@ -266,13 +268,13 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc QUERY_CHECK_CODE(code, lino, _end); resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput); - if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) { + if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) { doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START); } forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); int32_t prevEndPos = (forwardRows - 1) + startPos; - if (winCode != TSDB_CODE_SUCCESS) { + if (pInfo->hasInterpoFunc && winCode != TSDB_CODE_SUCCESS) { int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false); TSKEY endRowTs = tsCols[endRowId]; transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL); @@ -387,7 +389,7 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** code = setInputDataBlock(&pOperator->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); QUERY_CHECK_CODE(code, lino, _end); - code = doStreamIntervalSliceAggImpl(pOperator, pBlock, pInfo->pUpdatedMap, pInfo->pDeletedMap); + code = doStreamIntervalSliceAggImpl(pOperator, pBlock, pInfo->pUpdatedMap, NULL); QUERY_CHECK_CODE(code, lino, _end); } @@ -435,7 +437,7 @@ _end: int32_t initIntervalSliceDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic, - SInterval* pInterval) { + SInterval* pInterval, bool hasInterpoFunc) { SExecTaskInfo* pTaskInfo = downstream->pTaskInfo; int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -447,10 +449,11 @@ int32_t initIntervalSliceDownStream(SOperatorInfo* downstream, SStreamAggSupport if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { code = - initIntervalSliceDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic, pInterval); + initIntervalSliceDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic, pInterval, hasInterpoFunc); return code; } SStreamScanInfo* pScanInfo = downstream->info; + pScanInfo->useGetResultRange = hasInterpoFunc; pScanInfo->igCheckUpdate = true; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; pScanInfo->pState = pAggSup->pState; @@ -474,6 +477,18 @@ _end: return code; } +static bool windowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols) { + bool needed = false; + for (int32_t i = 0; i < numOfCols; ++i) { + SExprInfo* pExpr = pCtx[i].pExpr; + if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) { + needed = true; + break; + } + } + return needed; +} + int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** ppOptInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -564,6 +579,7 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimaryKey; pInfo->pOperator = pOperator; pInfo->hasFill = false; + pInfo->hasInterpoFunc = windowinterpNeeded(pExpSup->pCtx, numOfExprs); setOperatorInfo(pOperator, "StreamIntervalSliceOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -574,7 +590,7 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN initStreamBasicInfo(&pInfo->basic); if (downstream) { code = initIntervalSliceDownStream(downstream, &pInfo->streamAggSup, pPhyNode->type, pInfo->primaryTsIndex, - &pInfo->twAggSup, &pInfo->basic, &pInfo->interval); + &pInfo->twAggSup, &pInfo->basic, &pInfo->interval, pInfo->hasInterpoFunc); QUERY_CHECK_CODE(code, lino, _error); code = appendDownstream(pOperator, &downstream, 1); diff --git a/tests/script/tsim/stream/streamTwaError.sim b/tests/script/tsim/stream/streamTwaError.sim new file mode 100644 index 0000000000..2162c924e0 --- /dev/null +++ b/tests/script/tsim/stream/streamTwaError.sim @@ -0,0 +1,29 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(a) from st partition by tbname,ta interval(2s) fill(prev); + +sql_error create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, twa(a) from st partition by tbname,ta interval(2s) fill(prev); +sql_error create stream streams3 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, twa(a) from st partition by tbname,ta interval(2s) fill(prev); +sql_error create stream streams4 trigger max_delay 5s IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart, twa(a) from st partition by tbname,ta interval(2s) fill(prev); + +sql_error create stream streams5 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt5 as select _wstart, twa(a) from st interval(2s) fill(prev); +sql_error create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt6 as select last(ts), twa(a) from st partition by tbname,ta; +sql_error create stream streams7 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt7 as select _wstart, twa(a) from st partition by tbname,ta session(ts, 2s); +sql_error create stream streams8 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt8 as select _wstart, twa(a) from st partition by tbname,ta state_window(a); + +print end + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamTwaFwcFill.sim b/tests/script/tsim/stream/streamTwaFwcFill.sim new file mode 100644 index 0000000000..009798bd67 --- /dev/null +++ b/tests/script/tsim/stream/streamTwaFwcFill.sim @@ -0,0 +1,278 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(a), twa(b), elapsed(ts), now ,timezone(), ta from st partition by tbname,ta interval(2s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now + 3s,1,1,1) (now + 4s,10,1,1) (now + 7s,20,2,2) (now + 8s,30,3,3); +sql insert into t2 values(now + 4s,1,1,1) (now + 5s,10,1,1) (now + 8s,20,2,2) (now + 9s,30,3,3); + + +print sql select * from t1; +sql select * from t1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +print sql select * from t2; +sql select * from t2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 +loop0: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where ta == 1; +sql select * from streamt where ta == 1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 5 then + print ======rows=$rows + goto loop0 +endi + +$loop_count = 0 +loop1: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where ta == 2; +sql select * from streamt where ta == 2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 5 then + print ======rows=$rows + goto loop1 +endi + + +print step2 +print =============== create database +sql create database test2 vgroups 1; +sql use test2; + +sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams2 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(a), twa(b), elapsed(ts), now ,timezone(), ta from st partition by tbname interval(2s) fill(NULL); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now + 3s,1,1,1) (now + 4s,10,1,1) (now + 7s,20,2,2) (now + 8s,30,3,3); +sql insert into t2 values(now + 4s,1,1,1) (now + 5s,10,1,1) (now + 8s,20,2,2) (now + 9s,30,3,3); + + +print sql select * from t1; +sql select * from t1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +print sql select * from t2; +sql select * from t2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 +loop2: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where ta == 1; +sql select * from streamt where ta == 1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 5 then + print ======rows=$rows + goto loop2 +endi + +$loop_count = 0 +loop3: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where ta == 2; +sql select * from streamt where ta == 2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 5 then + print ======rows=$rows + goto loop3 +endi + +print step3 +print =============== create database +sql create database test3 vgroups 1; +sql use test3; + +sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams3 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(a), twa(b), elapsed(ts), now ,timezone(), ta from st partition by tbname interval(2s) fill(value,100,200,300); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now + 3s,1,1,1) (now + 4s,10,1,1) (now + 7s,20,2,2) (now + 8s,30,3,3); +sql insert into t2 values(now + 4s,1,1,1) (now + 5s,10,1,1) (now + 8s,20,2,2) (now + 9s,30,3,3); + + +print sql select * from t1; +sql select * from t1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +print sql select * from t2; +sql select * from t2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 +loop4: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where ta == 1; +sql select * from streamt where ta == 1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 5 then + print ======rows=$rows + goto loop4 +endi + +$loop_count = 0 +loop5: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where ta == 2; +sql select * from streamt where ta == 2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 5 then + print ======rows=$rows + goto loop5 +endi + +print end + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamTwaFwcFillPrimaryKey.sim b/tests/script/tsim/stream/streamTwaFwcFillPrimaryKey.sim new file mode 100644 index 0000000000..e3c967aaef --- /dev/null +++ b/tests/script/tsim/stream/streamTwaFwcFillPrimaryKey.sim @@ -0,0 +1,222 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create stable st(ts timestamp, a int primary key, b int , c int)tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(b), count(*),ta from st partition by tbname, ta interval(2s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + +sql select now; + +sql insert into t1 values(now + 3s,1,1,1) (now + 3s,2,10,10) (now + 3s,3,30,30); +sql insert into t2 values(now + 4s,1,1,1) (now + 4s,2,10,10) (now + 4s,3,30,30); + + +print sql select _wstart, twa(b), count(*),ta from t1 partition by tbname, ta interval(2s); +sql select _wstart, twa(b), count(*),ta from t1 partition by tbname, ta interval(2s); + +$query1_data = $data01 + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +print sql select _wstart, twa(b), count(*),ta from t2 partition by tbname, ta interval(2s); +sql select _wstart, twa(b), count(*),ta from t2 partition by tbname, ta interval(2s); + +$query2_data = $data01 + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 +loop0: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where ta == 1; +sql select * from streamt where ta == 1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 6 then + print ======rows=$rows + goto loop0 +endi + +if $data01 != $query1_data then + print ======data01=$data01 + return -1 +endi + + +$loop_count = 0 +loop1: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where ta == 2; +sql select * from streamt where ta == 2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 6 then + print ======rows=$rows + goto loop1 +endi + + +if $data01 != $query2_data then + print ======data01=$data01 + return -1 +endi + + +print step2 +print =============== create database +sql create database test2 vgroups 1; +sql use test2; + +sql create stable st(ts timestamp, a int primary key, b int , c int)tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams2 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(b), ta from st partition by tbname, ta interval(2s) fill(NULL); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now + 3s,1,1,1) (now + 3s,2,10,10) (now + 3s,3,30,30); +sql insert into t2 values(now + 4s,1,1,1) (now + 4s,2,10,10) (now + 4s,3,30,30); + + +print sql select _wstart, twa(b), count(*),ta from t1 partition by tbname, ta interval(2s); +sql select _wstart, twa(b), count(*),ta from t1 partition by tbname, ta interval(2s); + +$query1_data = $data01 + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +print sql select _wstart, twa(b), count(*),ta from t2 partition by tbname, ta interval(2s); +sql select _wstart, twa(b), count(*),ta from t2 partition by tbname, ta interval(2s); + +$query2_data = $data01 + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 +loop2: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where ta == 1; +sql select * from streamt where ta == 1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 6 then + print ======rows=$rows + goto loop2 +endi + +if $data01 != $query1_data then + print ======data01=$data01 + return -1 +endi + +$loop_count = 0 +loop3: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where ta == 2; +sql select * from streamt where ta == 2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 6 then + print ======rows=$rows + goto loop3 +endi + + +if $data01 != $query2_data then + print ======data01=$data01 + return -1 +endi + +print end + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamTwaFwcIntervalPrimaryKeyl.sim b/tests/script/tsim/stream/streamTwaFwcIntervalPrimaryKeyl.sim new file mode 100644 index 0000000000..a195285967 --- /dev/null +++ b/tests/script/tsim/stream/streamTwaFwcIntervalPrimaryKeyl.sim @@ -0,0 +1,109 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create stable st(ts timestamp, a int primary key, b int , c int)tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*), ta from st partition by tbname,ta interval(2s); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now + 3s,1,1,1) (now + 3s,2,10,10) (now + 3s,3,30,30) (now + 11s,1,1,1) (now + 11s,2,10,10); +sql insert into t2 values(now + 4s,1,1,1) (now + 4s,2,10,10) (now + 4s,3,30,30) (now + 12s,1,1,1) (now + 12s,2,10,10); + + +print sql select _wstart, count(*) from st partition by tbname,ta interval(2s); +sql select _wstart, count(*) from st partition by tbname,ta interval(2s); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 +loop0: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt order by +sql select * from streamt where ta == 1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop0 +endi + +if $data01 != 3 then + print ======data01=$data01 + return -1 +endi + +if $data11 != 2 then + print ======data11=$data11 + return -1 +endi + +$loop_count = 0 +loop1: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where ta == 2; +sql select * from streamt where ta == 2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop1 +endi + +if $data01 != 3 then + print ======data01=$data01 + return -1 +endi + +if $data11 != 2 then + print ======data11=$data11 + return -1 +endi + +print end + +system sh/exec.sh -n dnode1 -s stop -x SIGINT