diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8ce9ff6def..adf7d85aeb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1009,7 +1009,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre /*A(pTask->info.nodeId > 0);*/ SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); if (pVgObj == NULL) { - taosRUnLockLatch(&pStream->lock); + taosWUnLockLatch(&pStream->lock); mndTransDrop(pTrans); return -1; } @@ -1019,7 +1019,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, pTask->id.taskId) < 0) { mndReleaseVgroup(pMnode, pVgObj); - taosRUnLockLatch(&pStream->lock); + taosWUnLockLatch(&pStream->lock); mndTransDrop(pTrans); return -1; } @@ -1034,7 +1034,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); - taosRUnLockLatch(&pStream->lock); + taosWUnLockLatch(&pStream->lock); mndReleaseStream(pMnode, pStream); mndTransDrop(pTrans); return -1; @@ -1079,6 +1079,78 @@ _ERR: mndTransDrop(pTrans); return -1; } + +static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, + int64_t checkpointId) { + taosWLockLatch(&pStream->lock); + + int32_t totLevel = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < totLevel; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + SStreamTask *pTask = taosArrayGetP(pLevel, 0); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int32_t sz = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < sz; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + /*A(pTask->info.nodeId > 0);*/ + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + if (pVgObj == NULL) { + taosWUnLockLatch(&pStream->lock); + return -1; + } + + void *buf; + int32_t tlen; + if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, + pTask->id.taskId) < 0) { + mndReleaseVgroup(pMnode, pVgObj); + taosWUnLockLatch(&pStream->lock); + return -1; + } + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj); + action.pCont = buf; + action.contLen = tlen; + action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE; + + mndReleaseVgroup(pMnode, pVgObj); + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(buf); + taosWUnLockLatch(&pStream->lock); + return -1; + } + } + } + } + + pStream->checkpointFreq = checkpointId; + pStream->checkpointId = checkpointId; + pStream->checkpointFreq = taosGetTimestampMs(); + atomic_store_64(&pStream->currentTick, 0); + // 3. commit log: stream checkpoint info + pStream->version = pStream->version + 1; + + taosWUnLockLatch(&pStream->lock); + + SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); + if (pCommitRaw == NULL) { + mError("failed to prepare trans rebalance since %s", terrstr()); + return -1; + } + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + sdbFreeRaw(pCommitRaw); + mError("failed to prepare trans rebalance since %s", terrstr()); + return -1; + } + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) { + sdbFreeRaw(pCommitRaw); + mError("failed to prepare trans rebalance since %s", terrstr()); + return -1; + } + return 0; +} static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1089,16 +1161,38 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; int64_t checkpointId = pMsg->checkpointId; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); + if (pTrans == NULL) { + mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return -1; + } + mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId); + + mndTransSetDbName(pTrans, "checkpoint", "checkpoint"); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { + mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, + tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); + mndTransDrop(pTrans); + return -1; + } + while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; - code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId); - if (code == -1) { - mInfo("stream:%s failed to do checkpoint, reason: last checkpoint not finished", pStream->name); - } + + code = mndAddStreamCheckpointToTrans(pTrans, pStream, pMnode, checkpointId); sdbRelease(pSdb, pStream); + if (code == -1) { + break; + } } - return 0; + if (code == 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("failed to prepre trans rebalance since %s", terrstr()); + } + } + mndTransDrop(pTrans); + return code; } static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 7ebaf6dda5..f622d49a4f 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -28,9 +28,9 @@ #define TRANS_ARRAY_SIZE 8 #define TRANS_RESERVE_SIZE 48 -static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); -static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld); -static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc); +static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); +static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld); +static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc); static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); @@ -100,10 +100,9 @@ static int32_t mndTransGetActionsSize(SArray *pArray) { return rawDataLen; } - static int32_t mndTransEncodeAction(SSdbRaw *pRaw, int32_t *offset, SArray *pActions, int32_t actionsNum) { int32_t dataPos = *offset; - int8_t unused = 0; + int8_t unused = 0; int32_t ret = -1; for (int32_t i = 0; i < actionsNum; ++i) { @@ -266,16 +265,16 @@ _OVER: SSdbRow *mndTransDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_INVALID_MSG; - SSdbRow *pRow = NULL; - STrans *pTrans = NULL; - char *pData = NULL; - int32_t dataLen = 0; - int8_t sver = 0; - int32_t prepareActionNum = 0; - int32_t redoActionNum = 0; - int32_t undoActionNum = 0; - int32_t commitActionNum = 0; - int32_t dataPos = 0; + SSdbRow *pRow = NULL; + STrans *pTrans = NULL; + char *pData = NULL; + int32_t dataLen = 0; + int8_t sver = 0; + int32_t prepareActionNum = 0; + int32_t redoActionNum = 0; + int32_t undoActionNum = 0; + int32_t commitActionNum = 0; + int32_t dataPos = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -577,7 +576,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); - pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : 0; + pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : tGenIdPI64(); taosInitRWLatch(&pTrans->lockRpcArray); taosThreadMutexInit(&pTrans->mutex, NULL); @@ -1327,7 +1326,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) } bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { - bool continueExec = true; + bool continueExec = true; int32_t code = 0; int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index cb4b3231f6..1907b569d5 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -452,7 +452,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { } } - if (diffIdx == -1 && diffIdx == 0) { + if (diffIdx == -1 || diffIdx == 0) { goto _err; } @@ -939,7 +939,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) { return 0; } - metaInfo("ttl find expired table count: %zu" , TARRAY_SIZE(tbUids)); + metaInfo("ttl find expired table count: %zu", TARRAY_SIZE(tbUids)); metaDropTables(pMeta, tbUids); return 0; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d7373b2aac..927c85b262 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1788,10 +1788,13 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); int32_t tlen = len - (pBuff - buf); - void* pUpInfo = pInfo->stateStore.updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0, pInfo->igCheckUpdate); + void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); int32_t code = pInfo->stateStore.updateInfoDeserialize(buf, tlen, pUpInfo); if (code == TSDB_CODE_SUCCESS) { + pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); pInfo->pUpdateInfo = pUpInfo; + } else { + taosMemoryFree(pUpInfo); } } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 0b7d6d7693..cfccf1d286 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -40,16 +40,8 @@ typedef struct { rocksdb_comparator_t** pCompares; } RocksdbCfInst; -uint32_t nextPow2(uint32_t x) { - if (x <= 1) return 2; - x = x - 1; - x = x | (x >> 1); - x = x | (x >> 2); - x = x | (x >> 4); - x = x | (x >> 8); - x = x | (x >> 16); - return x + 1; -} +uint32_t nextPow2(uint32_t x); + int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); void destroyRocksdbCfInst(RocksdbCfInst* inst); @@ -262,8 +254,8 @@ void streamBackendCleanup(void* arg) { taosThreadMutexDestroy(&pHandle->cfMutex); - taosMemoryFree(pHandle); qDebug("destroy stream backend backend:%p", pHandle); + taosMemoryFree(pHandle); return; } void streamBackendHandleCleanup(void* arg) { @@ -986,8 +978,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t char suffix[64] = {0}; rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); - RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*)); - rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t**)); + RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam)); + rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*)); rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); for (int i = 0; i < nCf; i++) { @@ -1153,7 +1145,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { param[i].tableOpt = tableOpt; }; - rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**)); + rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); for (int i = 0; i < cfLen; i++) { SCfInit* cf = &ginitDict[i]; @@ -1294,8 +1286,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChk int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ if (err != NULL) { \ - taosMemoryFree(err); \ qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ + taosMemoryFree(err); \ code = -1; \ } else { \ qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \ @@ -2361,3 +2353,13 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { } return 0; } +uint32_t nextPow2(uint32_t x) { + if (x <= 1) return 2; + x = x - 1; + x = x | (x >> 1); + x = x | (x >> 2); + x = x | (x >> 4); + x = x | (x >> 8); + x = x | (x >> 16); + return x + 1; +} diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 6d61f65394..bca9dcabda 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -507,10 +507,12 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { destroyRowBuffPos(pNewPos); SListNode* pNode = tdListPopTail(pFileState->usedBuffs); taosMemoryFreeClear(pNode); + taosMemoryFreeClear(pVal); break; } ASSERT(pVLen == pFileState->rowSize); memcpy(pNewPos->pRowBuff, pVal, pVLen); + taosMemoryFreeClear(pVal); code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); if (code != TSDB_CODE_SUCCESS) { destroyRowBuffPos(pNewPos); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 21fed2e1f5..d3bd114cd8 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -997,6 +997,7 @@ ,,y,script,./test.sh -f tsim/stream/basic2.sim ,,y,script,./test.sh -f tsim/stream/basic3.sim ,,y,script,./test.sh -f tsim/stream/basic4.sim +,,y,script,./test.sh -f tsim/stream/checkpointInterval0.sim ,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim ,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim ,,y,script,./test.sh -f tsim/stream/deleteInterval.sim diff --git a/tests/script/tsim/stream/checkpoint0.sim b/tests/script/tsim/stream/checkpointInterval0.sim similarity index 64% rename from tests/script/tsim/stream/checkpoint0.sim rename to tests/script/tsim/stream/checkpointInterval0.sim index 325667ee2b..5bc8222a54 100644 --- a/tests/script/tsim/stream/checkpoint0.sim +++ b/tests/script/tsim/stream/checkpointInterval0.sim @@ -4,20 +4,16 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +print step 1 + print =============== create database sql create database test vgroups 1; -sql select * from information_schema.ins_databases -if $rows != 3 then - return -1 -endi - -print $data00 $data01 $data02 sql use test; sql create table t1(ts timestamp, a int, b int , c int, d double); -sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s); +sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s); sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t1 values(1648791213001,2,2,3,1.1); @@ -29,7 +25,7 @@ sleep 1000 sql select * from streamt; $loop_count = $loop_count + 1 -if $loop_count == 20 then +if $loop_count == 10 then return -1 endi @@ -49,13 +45,13 @@ if $data02 != 3 then goto loop0 endi -print waiting for checkpoint generation ...... +print waiting for checkpoint generation 1 ...... sleep 25000 -print restart taosd +print restart taosd 01 ...... -system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/stop_dnodes.sh system sh/exec.sh -n dnode1 -s start @@ -69,7 +65,7 @@ sleep 1000 sql select * from streamt; $loop_count = $loop_count + 1 -if $loop_count == 20 then +if $loop_count == 10 then return -1 endi @@ -89,7 +85,7 @@ if $data02 != 6 then goto loop1 endi -sql insert into t1 values(1648791223002,4,2,3,1.1); +sql insert into t1 values(1648791223003,4,2,3,1.1); $loop_count = 0 @@ -99,7 +95,7 @@ sleep 1000 sql select * from streamt; $loop_count = $loop_count + 1 -if $loop_count == 20 then +if $loop_count == 10 then return -1 endi @@ -121,13 +117,62 @@ endi # row 1 if $data11 != 1 then - print =====data01=$data01 + print =====data11=$data11 goto loop2 endi if $data12 != 4 then - print =====data02=$data02 + print =====data12=$data12 goto loop2 endi +print step 2 + +print restart taosd 02 ...... + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791223004,5,2,3,1.1); + +loop20: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop20 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop20 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop20 +endi + +# row 1 +if $data11 != 2 then + print =====data11=$data11 + goto loop20 +endi + +if $data12 != 9 then + print =====data12=$data12 + goto loop20 +endi + +print end--------------------------------- + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/stream/checkpointInterval1.sim b/tests/script/tsim/stream/checkpointInterval1.sim new file mode 100644 index 0000000000..21825e7f48 --- /dev/null +++ b/tests/script/tsim/stream/checkpointInterval1.sim @@ -0,0 +1,104 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step 1 + +sql create database test vgroups 4; + +sql use test; + +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from st interval(10s); + +sql insert into t1 values(1648791213000,1,2,3,1.0); + +sql insert into t2 values(1648791213001,2,2,3,1.1); + +$loop_count = 0 + +loop0: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 1 + goto loop0 +endi + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 3 then + print =====data02=$data02 + goto loop0 +endi + +print waiting for checkpoint generation 1 ...... + +sleep 25000 + +print restart taosd + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791213002,3,2,3,1.1); +sql insert into t2 values(1648791223003,4,2,3,1.1); + +$loop_count = 0 + +loop1: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop1 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop1 +endi + +# row 1 +if $data11 != 1 then + print =====data11=$data11 + goto loop1 +endi + +if $data12 != 4 then + print =====data12=$data12 + goto loop1 +endi + +print end--------------------------------- + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/stream/checkpointSession0.sim b/tests/script/tsim/stream/checkpointSession0.sim new file mode 100644 index 0000000000..1d503806c5 --- /dev/null +++ b/tests/script/tsim/stream/checkpointSession0.sim @@ -0,0 +1,178 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step 1 + +print =============== create database +sql create database test vgroups 1; + +sql use test; + + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 session(ts, 10s); +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791213001,2,2,3,1.1); + +$loop_count = 0 + +loop0: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 1 + goto loop0 +endi + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 3 then + print =====data02=$data02 + goto loop0 +endi + +print waiting for checkpoint generation 1 ...... + +sleep 25000 + +print restart taosd 01 ...... + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791213002,3,2,3,1.1); + +$loop_count = 0 + +loop1: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 1 + goto loop1 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop1 +endi + +sql insert into t1 values(1648791233003,4,2,3,1.1); + +$loop_count = 0 + +loop2: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop2 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop2 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop2 +endi + +# row 1 +if $data11 != 1 then + print =====data11=$data11 + goto loop2 +endi + +if $data12 != 4 then + print =====data12=$data12 + goto loop2 +endi + +print step 2 + +print restart taosd 02 ...... + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791233004,5,2,3,1.1); + +loop20: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop20 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop20 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop20 +endi + +# row 1 +if $data11 != 2 then + print =====data11=$data11 + goto loop20 +endi + +if $data12 != 9 then + print =====data12=$data12 + goto loop20 +endi + +print end--------------------------------- + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/stream/checkpointSession1.sim b/tests/script/tsim/stream/checkpointSession1.sim new file mode 100644 index 0000000000..5c9625aabb --- /dev/null +++ b/tests/script/tsim/stream/checkpointSession1.sim @@ -0,0 +1,104 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step 1 + +sql create database test vgroups 4; + +sql use test; + +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from st session(ts, 10s); + +sql insert into t1 values(1648791213000,1,2,3,1.0); + +sql insert into t2 values(1648791213001,2,2,3,1.1); + +$loop_count = 0 + +loop0: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 1 + goto loop0 +endi + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 3 then + print =====data02=$data02 + goto loop0 +endi + +print waiting for checkpoint generation 1 ...... + +sleep 25000 + +print restart taosd + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791213002,3,2,3,1.1); +sql insert into t2 values(1648791233003,4,2,3,1.1); + +$loop_count = 0 + +loop1: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop1 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop1 +endi + +# row 1 +if $data11 != 1 then + print =====data11=$data11 + goto loop1 +endi + +if $data12 != 4 then + print =====data12=$data12 + goto loop1 +endi + +print end--------------------------------- + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/win-test-file b/tests/script/win-test-file index d394ce6876..eb94f32599 100644 --- a/tests/script/win-test-file +++ b/tests/script/win-test-file @@ -240,6 +240,7 @@ ./test.sh -f tsim/stream/basic2.sim ./test.sh -f tsim/stream/basic3.sim ./test.sh -f tsim/stream/basic4.sim +./test.sh -f tsim/stream/checkpointInterval0.sim ./test.sh -f tsim/stream/checkStreamSTable1.sim ./test.sh -f tsim/stream/checkStreamSTable.sim ./test.sh -f tsim/stream/deleteInterval.sim