From 9d668150d51cd58f41eb0f8bd661812e2cb65377 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 27 Nov 2024 16:24:46 +0800 Subject: [PATCH] fix stream issue --- source/libs/executor/src/streamfilloperator.c | 6 +- .../src/streamintervalsliceoperator.c | 15 +- source/libs/parser/src/parTranslater.c | 8 +- source/libs/stream/src/streamBackendRocksdb.c | 2 +- source/libs/stream/src/streamSliceState.c | 75 +++--- .../stream/streamFwcIntervalCheckpoint.sim | 67 ----- .../script/tsim/stream/streamTwaInterpFwc.sim | 247 ++++++++++++++++++ .../stream/streamTwaInterpFwcCheckpoint.sim | 180 +++++++++++++ 8 files changed, 487 insertions(+), 113 deletions(-) delete mode 100644 tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim create mode 100644 tests/script/tsim/stream/streamTwaInterpFwcCheckpoint.sim diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 7b364de09d..c992bd15b7 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1432,7 +1432,11 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); pInfo->srcRowIndex = -1; } break; - case STREAM_CHECKPOINT: + case STREAM_CHECKPOINT: { + pInfo->stateStore.streamStateCommit(pInfo->pState); + (*ppRes) = pBlock; + goto _end; + } break; case STREAM_CREATE_CHILD_TABLE: { (*ppRes) = pBlock; goto _end; diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index fcf7d6ef10..e7a9c58710 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -360,6 +360,13 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** return code; } + if (pInfo->recvCkBlock) { + pInfo->recvCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + (*ppRes) = pInfo->pCheckpointRes; + return code; + } + pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; @@ -391,8 +398,6 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** case STREAM_CHECKPOINT: { pInfo->recvCkBlock = true; pAggSup->stateStore.streamStateCommit(pAggSup->pState); - // doStreamIntervalSliceSaveCheckpoint(pOperator); - pInfo->recvCkBlock = true; code = copyDataBlock(pInfo->pCheckpointRes, pBlock); QUERY_CHECK_CODE(code, lino, _end); continue; @@ -450,6 +455,12 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** QUERY_CHECK_CODE(code, lino, _end); if ((*ppRes) == NULL) { + if (pInfo->recvCkBlock) { + pInfo->recvCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + (*ppRes) = pInfo->pCheckpointRes; + return code; + } pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); setStreamOperatorCompleted(pOperator); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 0ee8f9be33..6facaed78d 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10931,10 +10931,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm if (pSelect->hasInterpFunc) { // Temporary code - if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "Stream interp function only support force window close"); - } + // if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + // return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + // "Stream interp function only support force window close"); + // } if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { if (pStmt->pOptions->fillHistory) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 09f4e95376..68c99aa1b3 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -4130,7 +4130,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState) { SWinKey key = {.groupId = UINT64_MAX, .ts = INT64_MAX}; - return streamStateFillSeekKeyNext_rocksdb(pState, &key); + return streamStateFillSeekKeyPrev_rocksdb(pState, &key); } #ifdef BUILD_NO_CALL diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index 238bff8afc..c23537d018 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -23,6 +23,33 @@ #define NUM_OF_CACHE_WIN 64 #define MAX_NUM_OF_CACHE_WIN 128 +int32_t recoverSearchBuff(SStreamFileState* pFileState, SArray* pWinStates, uint64_t groupId) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + SWinKey start = {.groupId = groupId, .ts = INT64_MAX}; + void* pState = getStateFileStore(pFileState); + SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start); + for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) { + SWinKey tmpKey = {.groupId = groupId}; + int32_t tmpRes = streamStateFillGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0); + if (tmpRes != TSDB_CODE_SUCCESS) { + break; + } + void* tmp = taosArrayPush(pWinStates, &tmpKey); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + streamStateCurPrev_rocksdb(pCur); + } + taosArraySort(pWinStates, winKeyCmprImpl); + streamStateFreeCur(pCur); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; @@ -38,22 +65,7 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo // recover if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) { - TSKEY ts = getFlushMark(pFileState); - SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX}; - void* pState = getStateFileStore(pFileState); - SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start); - for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) { - SWinKey tmpKey = {.groupId = pKey->groupId}; - int32_t tmpRes = streamStateFillGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0); - if (tmpRes != TSDB_CODE_SUCCESS) { - break; - } - void* tmp = taosArrayPush(pWinStates, &tmpKey); - QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); - streamStateCurPrev_rocksdb(pCur); - } - taosArraySort(pWinStates, winKeyCmprImpl); - streamStateFreeCur(pCur); + recoverSearchBuff(pFileState, pWinStates, pKey->groupId); } code = addSearchItem(pFileState, pWinStates, pKey); @@ -203,29 +215,16 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW SArray* pWinStates = NULL; SSHashObj* pSearchBuff = getSearchBuff(pFileState); void* pState = getStateFileStore(pFileState); - void** ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); - if (ppBuff) { - pWinStates = (SArray*)(*ppBuff); - } else { - qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId); - SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); - void* tmpVal = NULL; - int32_t len = 0; - (*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); - if ((*pWinCode) == TSDB_CODE_SUCCESS) { - SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); - if (!pNewPos || !pNewPos->pRowBuff) { - code = TSDB_CODE_OUT_OF_MEMORY; - QUERY_CHECK_CODE(code, lino, _end); - } - memcpy(pNewPos->pRowBuff, tmpVal, len); - taosMemoryFreeClear(tmpVal); - *pVLen = getRowStateRowSize(pFileState); - (*ppVal) = pNewPos; - } - streamStateFreeCur(pCur); - return code; + // void** ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); + + code = addArrayBuffIfNotExist(pSearchBuff, pKey->groupId, &pWinStates); + QUERY_CHECK_CODE(code, lino, _end); + + // recover + if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) { + recoverSearchBuff(pFileState, pWinStates, pKey->groupId); } + int32_t size = taosArrayGetSize(pWinStates); int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); if (index >= 0) { diff --git a/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim b/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim deleted file mode 100644 index ed72d87e9a..0000000000 --- a/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim +++ /dev/null @@ -1,67 +0,0 @@ -system sh/stop_dnodes.sh -system sh/deploy.sh -n dnode1 -i 1 - -system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60 - -system sh/exec.sh -n dnode1 -s start -sleep 50 -sql connect - -print step1 -print =============== create database -sql create database test vgroups 4; -sql use test; - -sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int); -sql create table t1 using st tags(1,1,1); -sql create table t2 using st tags(2,2,2); - -sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a) from st partition by tbname interval(2s); -sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a) from st interval(2s); - -run tsim/stream/checkTaskStatus.sim - -sleep 70000 - - -print restart taosd 01 ...... - -system sh/stop_dnodes.sh - -system sh/exec.sh -n dnode1 -s start - -run tsim/stream/checkTaskStatus.sim - -sql insert into t1 values(now + 3000a,1,1,1); - -$loop_count = 0 -loop0: - -sleep 2000 - -$loop_count = $loop_count + 1 -if $loop_count == 20 then - return -1 -endi - -print select * from streamt1; -sql select * from streamt1; - -print $data00 $data01 $data02 - -if $rows == 0 then - goto loop0 -endi - -print select * from streamt2; -sql select * from streamt2; - -print $data00 $data01 $data02 - -if $rows == 0 then - goto loop0 -endi - -print end - -system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamTwaInterpFwc.sim b/tests/script/tsim/stream/streamTwaInterpFwc.sim index ce76387c91..050e0282e2 100644 --- a/tests/script/tsim/stream/streamTwaInterpFwc.sim +++ b/tests/script/tsim/stream/streamTwaInterpFwc.sim @@ -304,6 +304,253 @@ if $rows != 1 then return -1 endi +print step3 +print =============== create database +sql create database test4 vgroups 4; +sql use test4; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1234567890t1 using st tags(1,1,1); +sql create table t1234567890t2 using st tags(2,2,2); + +sql create stable streamt9(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta varchar(3),cc int,tc int); +sql create stable streamt10(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta varchar(3),cc int,tc int); +sql create stable streamt11(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta varchar(3),cc int,tc int); + +sql create stream streams9 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt9 TAGS(cc,ta) SUBTABLE(concat(concat("tbn-", tbname), "_1")) as select _irowts, interp(a), _isfilled as a1, interp(b) from st partition by tbname as ta, b as cc every(2s) fill(value, 100000,200000); +sql create stream streams10 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt10 TAGS(cc,ta) SUBTABLE(concat(concat("tbn-", tbname), "_2")) as select _wstart, twa(a), sum(b),max(c) from st partition by tbname as ta, b as cc interval(2s) fill(NULL); +sql create stream streams11 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt11 TAGS(cc,ta) SUBTABLE(concat(concat("tbn-", tbname), "_3")) as select _wstart, count(a),avg(c),min(b) from st partition by tbname as ta, b as cc interval(2s); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1234567890t1 values(now + 3s,100000,1,1); + +$loop_count = 0 +loop9: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select cc,ta, * from streamt9; +sql select cc,ta, * from streamt9; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop9 +endi + +if $data00 != 1 then + return -1 +endi + +if $data01 != @t12@ then + return -1 +endi + +if $data03 != @100000@ then + return -1 +endi + +if $data04 != 1 then + return -1 +endi + +if $data05 != 64 then + return -1 +endi + +print 3 sql select * from information_schema.ins_tables where stable_name = "streamt9"; +sql select * from information_schema.ins_tables where stable_name = "streamt9"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +print 4 sql select * from information_schema.ins_tables where stable_name = "streamt9" and table_name like "tbn-t1234567890t1_1%"; +sql select * from information_schema.ins_tables where stable_name = "streamt9" and table_name like "tbn-t1234567890t1_1%"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +$loop_count = 0 +loop10: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select cc,ta, * from streamt10; +sql select cc,ta, * from streamt10; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop10 +endi + +if $data00 != 1 then + return -1 +endi + +if $data01 != @t12@ then + return -1 +endi + +if $data03 != @100000.000@ then + return -1 +endi + +if $data04 != 1 then + return -1 +endi + +if $data05 != 1 then + return -1 +endi + +print 3 sql select * from information_schema.ins_tables where stable_name = "streamt10"; +sql select * from information_schema.ins_tables where stable_name = "streamt10"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +print 4 sql select * from information_schema.ins_tables where stable_name = "streamt10" and table_name like "tbn-t1234567890t1_2%"; +sql select * from information_schema.ins_tables where stable_name = "streamt10" and table_name like "tbn-t1234567890t1_2%"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +$loop_count = 0 +loop11: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select cc,ta,* from streamt11; +sql select cc,ta,* from streamt11; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 1 then + print ======rows=$rows + goto loop11 +endi + +if $data00 != 1 then + return -1 +endi + +if $data01 != @t12@ then + return -1 +endi + +if $data03 != @1@ then + return -1 +endi + +if $data04 != 1 then + return -1 +endi + +if $data05 != 1 then + return -1 +endi + +print 3 sql select * from information_schema.ins_tables where stable_name = "streamt11"; +sql select * from information_schema.ins_tables where stable_name = "streamt11"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +print 4 sql select * from information_schema.ins_tables where stable_name = "streamt11" and table_name like "tbn-t1234567890t1_3%"; +sql select * from information_schema.ins_tables where stable_name = "streamt11" and table_name like "tbn-t1234567890t1_3%"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + + print end system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamTwaInterpFwcCheckpoint.sim b/tests/script/tsim/stream/streamTwaInterpFwcCheckpoint.sim new file mode 100644 index 0000000000..f983dd3ab5 --- /dev/null +++ b/tests/script/tsim/stream/streamTwaInterpFwcCheckpoint.sim @@ -0,0 +1,180 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60 +system sh/cfg.sh -n dnode1 -c ratioOfVnodeStreamThreads -v 4 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a),max(b) from st partition by tbname interval(5s); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a), max(b) from st interval(5s); +sql create stream streams3 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt3 as select _wstart, count(a), twa(b) from st partition by tbname interval(5s) fill(prev); +sql create stream streams4 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt4 as select _irowts, interp(a), interp(b) from st partition by tbname every(5s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now + 3000a,1,1,1); + +$loop_count = 0 +loop0: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print select * from streamt3; +sql select * from streamt3; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows == 0 then + goto loop0 +endi + + +print select * from streamt4; +sql select * from streamt4; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows == 0 then + goto loop0 +endi + + +sleep 70000 + +$loop_count = 0 +loop0_1: + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print sql select * from information_schema.ins_stream_tasks where checkpoint_time is null; +sql select * from information_schema.ins_stream_tasks where checkpoint_time is null; + + +sleep 10000 + +if $rows > 0 then + print wait checkpoint.rows = $rows + goto loop0_1 +endi + +print restart taosd 01 ...... + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +run tsim/stream/checkTaskStatus.sim + +print select * from streamt3; +sql select * from streamt3; + +$streamt3_rows = $rows +print =====streamt3_rows=$streamt3_rows + +print select * from streamt4; +sql select * from streamt4; + +$streamt4_rows = $rows +print =====streamt4_rows=$streamt4_rows + +$loop_count = 0 +loop1: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print select * from streamt3; +sql select * from streamt3; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows <= $streamt3_rows then + print =====rows=$rows + print =====streamt3_rows=$streamt3_rows + goto loop1 +endi + +print select * from streamt4; +sql select * from streamt4; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows <= $streamt4_rows then + print =====rows=$rows + print =====streamt4_rows=$streamt4_rows + goto loop1 +endi + +sql insert into t1 values(now + 3000a,10,10,10); + +$loop_count = 0 +loop2: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print select * from streamt1; +sql select * from streamt1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $data02 != 10 then + goto loop2 +endi + +print end + +system sh/exec.sh -n dnode1 -s stop -x SIGINT