From c95dc167b7a6956f3596946dcd038c2c4f81660e Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 26 Sep 2023 17:19:21 +0800 Subject: [PATCH] add ci --- include/libs/stream/tstreamFileState.h | 5 +- .../executor/src/streamtimewindowoperator.c | 10 +- source/libs/stream/src/streamSessionState.c | 58 +++-- source/libs/stream/src/streamState.c | 1 + source/libs/stream/src/tstreamFileState.c | 26 ++- tests/script/tsim/stream/basic4.sim | 8 +- tests/script/tsim/stream/basic5.sim | 217 ++++++++++++++++++ 7 files changed, 287 insertions(+), 38 deletions(-) create mode 100644 tests/script/tsim/stream/basic5.sim diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 345a758795..cc3d574a7f 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -32,7 +32,7 @@ typedef SList SStreamSnapshot; typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen); typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen); -typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen); +typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen, bool invalid); typedef void (*_state_buff_cleanup_fn)(void* pRowBuff); typedef void* (*_state_buff_create_statekeyfn)(SRowBuffPos* pPos, int64_t num); @@ -53,6 +53,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal); bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); +void putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); @@ -74,7 +75,7 @@ int32_t getRowStateRowSize(SStreamFileState* pFileState); int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen); int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen); -int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen); +int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen, bool invalid); void sessionWinStateClear(SStreamFileState* pFileState); void sessionWinStateCleanup(void* pBuff); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 5d37d001f6..88716587e1 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -3106,6 +3106,9 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, } } + qDebug("===stream===set state cur win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey, + pCurWin->winInfo.sessionWin.win.ekey); + pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin; SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin); @@ -3124,6 +3127,8 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, pNextWin->winInfo.isOutput = true; } pAggSup->stateStore.streamStateFreeCur(pCur); + qDebug("===stream===set state next win buff. skey:%" PRId64 ", endkey:%" PRId64, pNextWin->winInfo.sessionWin.win.skey, + pNextWin->winInfo.sessionWin.win.ekey); } int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin, TSKEY* pTs, uint64_t groupId, @@ -3202,9 +3207,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl SStateWindowInfo curWin = {0}; SStateWindowInfo nextWin = {0}; setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin); - if (IS_VALID_SESSION_WIN(nextWin.winInfo)) { - releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore); - } + releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore); + setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); winRows = updateStateWindowInfo(pAggSup, &curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual, pAggSup->pResultRows, pSeUpdated, pStDeleted); diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index a29ae0e990..a4e06488d5 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -89,6 +89,7 @@ static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* } int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen) { + int32_t code = TSDB_CODE_SUCCESS; SSHashObj* pSessionBuff = getRowStateBuff(pFileState); SArray* pWinStates = NULL; void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); @@ -105,6 +106,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, int32_t size = taosArrayGetSize(pWinStates); if (size == 0) { (*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey); + code = TSDB_CODE_FAILED; goto _end; } @@ -138,29 +140,35 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) { void* p = NULL; void* pFileStore = getStateFileStore(pFileState); - int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); - memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); - pNewPos->needFree = true; + int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen); + if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { + memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); + pNewPos->needFree = true; - qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code); - if (code == TSDB_CODE_SUCCESS) { + qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file); memcpy(pNewPos->pRowBuff, p, *pVLen); + taosMemoryFree(p); + (*pVal) = pNewPos; + code = code_file; + goto _end; + } else { + taosMemoryFree(p); + streamFileStateReleaseBuff(pFileState, pNewPos, false); } - taosMemoryFree(p); - (*pVal) = pNewPos; - goto _end; } } if (index == size - 1) { (*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey); + code = TSDB_CODE_FAILED; goto _end; } (*pVal) = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1); + code = TSDB_CODE_FAILED; _end: - return (*pVal) != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; + return code; } int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { @@ -209,7 +217,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v return TSDB_CODE_SUCCESS; } -int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen) { +int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen, bool invalid) { SSHashObj* pSessionBuff = (SSHashObj*) pBuff; SSessionKey* pWinKey = (SSessionKey*) key; void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); @@ -221,8 +229,11 @@ int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen) { TSKEY gap = 0; int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare); if (index >= 0) { - SRowBuffPos** ppos = taosArrayGet(pWinStates, index); - if (inSessionWindow((*ppos)->pKey, pWinKey->win.skey, gap)) { + SRowBuffPos* pPos = taosArrayGetP(pWinStates, index); + if (inSessionWindow(pPos->pKey, pWinKey->win.skey, gap)) { + if (invalid) { + pPos->beFlushed = true; + } taosArrayRemove(pWinStates, index); } } @@ -484,6 +495,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch if (inSessionWindow(pPos->pKey, startTs, gap) || fn(pKeyData, stateKey) == true) { (*pVal) = pPos; SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; + pPos->beUsed = true; *key = *pDestWinKey; goto _end; } @@ -495,27 +507,31 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) || fn(pKeyData, stateKey) == true) { (*pVal) = pPos; SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; + pPos->beUsed = true; *key = *pDestWinKey; goto _end; } } if (index + 1 == 0) { - if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) { + if (!isDeteled(pFileState, endTs)) { void* p = NULL; void* pFileStore = getStateFileStore(pFileState); - int32_t code = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); - memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey)); pNewPos->needFree = true; - - qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code); - if (code == TSDB_CODE_SUCCESS) { + int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen); + if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { + memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey)); + qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); memcpy(pNewPos->pRowBuff, p, *pVLen); + (*pVal) = pNewPos; + taosMemoryFree(p); + code = code_file; + goto _end; + } else { + taosMemoryFree(p); + streamFileStateReleaseBuff(pFileState, pNewPos, false); } - taosMemoryFree(p); - (*pVal) = pNewPos; - goto _end; } } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 7030307edd..7c5fcba10c 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -723,6 +723,7 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void } code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen); streamStateReleaseBuf(pState, pos, true); + putFreeBuff(pState->pFileState, pos); qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey, key->win.ekey, key->groupId, code); } else { diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 688088dc22..daad14dcfc 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -57,7 +57,13 @@ struct SStreamFileState { typedef SRowBuffPos SRowBuffInfo; -int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) { +int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen, bool invalid) { + if (invalid) { + SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen); + if (pos) { + (*pos)->beFlushed = true; + } + } return tSimpleHashRemove(pBuff, pKey, keyLen); } @@ -230,7 +236,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { putFreeBuff(pFileState, pPos); if (!all) { - pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen); + pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen, false); } destroyRowBuffPos(pPos); tdListPopNode(pFileState->usedBuffs, pNode); @@ -247,10 +253,10 @@ void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushLi SListNode* pNode = NULL; while ((pNode = tdListNext(&iter)) != NULL && i < max) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; - if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) { + if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0) && !pPos->beUsed) { tdListAppend(pFlushList, &pPos); pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); - pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen); + pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen, false); tdListPopNode(pFileState->usedBuffs, pNode); taosMemoryFreeClear(pNode); if (pPos->pRowBuff) { @@ -284,7 +290,7 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin if (pPos->beUsed == used) { tdListAppend(pFlushList, &pPos); pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); - pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen); + pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen, false); tdListPopNode(pFileState->usedBuffs, pNode); taosMemoryFreeClear(pNode); if (pPos->pRowBuff) { @@ -427,7 +433,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi } int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) { - int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen); + int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen, true); int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) { return TSDB_CODE_SUCCESS; @@ -462,7 +468,9 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** recoverSessionRowBuff(pFileState, pPos); (*pVal) = pPos->pRowBuff; - tdListPrepend(pFileState->usedBuffs, &pPos); + if (!pPos->needFree) { + tdListPrepend(pFileState->usedBuffs, &pPos); + } return TSDB_CODE_SUCCESS; } @@ -509,12 +517,12 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, void* batch = streamStateCreateBatch(); while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; - ASSERT(pPos->pRowBuff && pFileState->rowSize > 0); - if (pPos->beFlushed) { + if (pPos->beFlushed || !pPos->pRowBuff) { continue; } pPos->beFlushed = true; + qDebug("===stream===flushed start:%" PRId64 ", end:%" PRId64 , ((SSessionKey*)pPos->pKey)->win.skey, ((SSessionKey*)pPos->pKey)->win.ekey); if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) { streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); streamStateClearBatch(batch); diff --git a/tests/script/tsim/stream/basic4.sim b/tests/script/tsim/stream/basic4.sim index e7a27976f7..cfd03ba5e5 100644 --- a/tests/script/tsim/stream/basic4.sim +++ b/tests/script/tsim/stream/basic4.sim @@ -8,6 +8,8 @@ sleep 500 sql connect +print step1============= + sql create database test vgroups 1; sql use test; sql create table t1(ts timestamp, a int, b int , c int, d double); @@ -71,7 +73,7 @@ if $rows != 29 then goto loop1 endi - +print step2============= sql create database test2 vgroups 10; sql use test2; @@ -137,12 +139,12 @@ if $rows != 29 then goto loop3 endi -print step2============= +print step3============= sql create database test1 vgroups 1; sql use test1; sql create table t1(ts timestamp, a int, b int , c int, d double); -sql create stream streams1 trigger at_once ignore expired 0 ignore update 1 into streamt1 as select _wstart, count(*) c1 from t1 session(ts, 1s); +sql create stream streams1 trigger at_once ignore expired 0 ignore update 0 into streamt1 as select _wstart, count(*) c1 from t1 session(ts, 1s); sql insert into t1 values(1648791211000,1,2,3,1.0); sql insert into t1 values(1648791213000,1,2,3,1.1); diff --git a/tests/script/tsim/stream/basic5.sim b/tests/script/tsim/stream/basic5.sim new file mode 100644 index 0000000000..583c803e4e --- /dev/null +++ b/tests/script/tsim/stream/basic5.sim @@ -0,0 +1,217 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c debugflag -v 135 +system sh/cfg.sh -n dnode1 -c streamBufferSize -v 10 +system sh/exec.sh -n dnode1 -s start + +sleep 500 + +sql connect + +print step1============= + +sql create database test3 vgroups 1; +sql use test3; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams3 trigger at_once ignore expired 0 ignore update 0 into streamt3 as select _wstart, count(*) c1 from t1 state_window(a); + +sql insert into t1 values(1648791211000,1,2,3,1.0); +sql insert into t1 values(1648791213000,2,2,3,1.1); +sql insert into t1 values(1648791215000,3,2,3,1.1); +sql insert into t1 values(1648791217000,4,2,3,1.1); +sql insert into t1 values(1648791219000,5,2,3,1.1); +sql insert into t1 values(1648791221000,6,2,3,1.0); +sql insert into t1 values(1648791223000,7,2,3,1.0); +sql insert into t1 values(1648791225000,8,2,3,1.0); +sql insert into t1 values(1648791227000,9,2,3,1.0); +sql insert into t1 values(1648791229000,10,2,3,1.0); + +sql insert into t1 values(1648791231000,11,2,3,1.0); +sql insert into t1 values(1648791233000,12,2,3,1.1); +sql insert into t1 values(1648791235000,13,2,3,1.1); +sql insert into t1 values(1648791237000,14,2,3,1.1); +sql insert into t1 values(1648791239000,15,2,3,1.1); +sql insert into t1 values(1648791241000,16,2,3,1.0); +sql insert into t1 values(1648791243000,17,2,3,1.0); +sql insert into t1 values(1648791245000,18,2,3,1.0); +sql insert into t1 values(1648791247000,19,2,3,1.0); +sql insert into t1 values(1648791249000,20,2,3,1.0); + +sql insert into t1 values(1648791251000,21,2,3,1.0); +sql insert into t1 values(1648791253000,22,2,3,1.1); +sql insert into t1 values(1648791255000,23,2,3,1.1); +sql insert into t1 values(1648791257000,24,2,3,1.1); +sql insert into t1 values(1648791259000,25,2,3,1.1); +sql insert into t1 values(1648791261000,26,2,3,1.0); +sql insert into t1 values(1648791263000,27,2,3,1.0); +sql insert into t1 values(1648791265000,28,2,3,1.0); +sql insert into t1 values(1648791267000,29,2,3,1.0); +sql insert into t1 values(1648791269000,30,2,3,1.0); + +$loop_count = 0 + +loop8: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt3; +sql select * from streamt3; + +if $rows != 30 then + print =====rows=$rows + goto loop8 +endi + +sql insert into t1 values(1648791211001,1,2,3,1.0); +sql insert into t1 values(1648791213001,2,2,3,1.1); +sql insert into t1 values(1648791215001,3,2,3,1.1); +sql insert into t1 values(1648791217001,4,2,3,1.1); +sql insert into t1 values(1648791219001,5,2,3,1.1); +sql insert into t1 values(1648791221001,6,2,3,1.0); +sql insert into t1 values(1648791223001,7,2,3,1.0); +sql insert into t1 values(1648791225001,8,2,3,1.0); +sql insert into t1 values(1648791227001,9,2,3,1.0); +sql insert into t1 values(1648791229001,10,2,3,1.0); + +$loop_count = 0 + +loop9: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt3; +sql select * from streamt3; + +if $rows != 30 then + print =====rows=$rows + goto loop9 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop9 +endi + +if $data91 != 2 then + print =====data91=$data91 + goto loop9 +endi + +sql insert into t1 values(1648791231001,11,2,3,1.0); +sql insert into t1 values(1648791233001,12,2,3,1.1); +sql insert into t1 values(1648791235001,13,2,3,1.1); +sql insert into t1 values(1648791237001,14,2,3,1.1); +sql insert into t1 values(1648791239001,15,2,3,1.1); +sql insert into t1 values(1648791241001,16,2,3,1.0); +sql insert into t1 values(1648791243001,17,2,3,1.0); +sql insert into t1 values(1648791245001,18,2,3,1.0); +sql insert into t1 values(1648791247001,19,2,3,1.0); +sql insert into t1 values(1648791249001,20,2,3,1.0); + +$loop_count = 0 + +loop10: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt3; +sql select * from streamt3; + +if $rows != 30 then + print =====rows=$rows + goto loop10 +endi + +if $data[10][1] != 2 then + print =====data[10][1]=$data[10][1] + goto loop10 +endi + +if $data[19][1] != 2 then + print =====data[19][1]=$data[19][1] + goto loop10 +endi + +sql insert into t1 values(1648791251001,21,2,3,1.0); +sql insert into t1 values(1648791253001,22,2,3,1.1); +sql insert into t1 values(1648791255001,23,2,3,1.1); +sql insert into t1 values(1648791257001,24,2,3,1.1); + +#/////////////////////// +$loop_count = 0 + +loop11: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt3; +sql select * from streamt3; + +if $rows != 30 then + print =====rows=$rows + goto loop11 +endi + +if $data[20][1] != 2 then + print =====[20][1]=$[20][1] + goto loop11 +endi +#/////////////////////// + +sql insert into t1 values(1648791259001,25,2,3,1.1); +sql insert into t1 values(1648791261001,26,2,3,1.0); +sql insert into t1 values(1648791263001,27,2,3,1.0); +sql insert into t1 values(1648791265001,28,2,3,1.0); +sql insert into t1 values(1648791267001,29,2,3,1.0); +sql insert into t1 values(1648791269001,30,2,3,1.0); + +$loop_count = 0 + +loop11: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt3; +sql select * from streamt3; + +if $rows != 30 then + print =====rows=$rows + goto loop11 +endi + +if $data[20][1] != 2 then + print =====[20][1]=$[20][1] + goto loop11 +endi + +if $data[29][1] != 2 then + print =====[29][1]=$[29][1] + goto loop11 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file