diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 971604ab1c..5e145d8fbb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -432,7 +432,7 @@ struct SStreamTask { typedef struct STaskStartInfo { int64_t startTs; int64_t readyTs; - int32_t startedAfterNodeUpdate; + int32_t startAllTasksFlag; SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing int32_t elapsedTime; } STaskStartInfo; @@ -735,7 +735,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask); int32_t onNormalTaskReady(SStreamTask* pTask); int32_t onScanhistoryTaskReady(SStreamTask* pTask); -//int32_t streamTaskStartScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); @@ -798,7 +797,6 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); -int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaReopen(SStreamMeta* pMeta); @@ -808,12 +806,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); +int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); void streamTaskClearCheckInfo(SStreamTask* pTask); - int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 12e273c32d..89d70bfabb 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -231,7 +231,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq); +int32_t tqLaunchStreamTaskAsync(STQ* pTq); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index db9ef9fa77..e1b27049af 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1267,12 +1267,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); -#if 0 - // the fill-history task starts to process data in wal, let's set it status to be normal now - if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) { - streamSetStatusNormal(pTask); - } -#endif // now the fill-history task starts to scan data from wal files. streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); @@ -1530,14 +1524,17 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, } streamTaskResume(pTask); + ETaskStatus status = streamTaskGetStatus(pTask, NULL); int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__SINK) { + if (status == TASK_STATUS__UNINIT) { + + } streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } - ETaskStatus status = streamTaskGetStatus(pTask, NULL); if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) { // no lock needs to secure the access of the version if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { @@ -1558,6 +1555,11 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, } else { streamSchedExec(pTask); } + } else if (status == TASK_STATUS__UNINIT) { + if (pTask->info.fillHistory == 0) { + EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; + streamTaskHandleEvent(pTask->status.pSM, event); + } } streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -1951,7 +1953,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); - pMeta->startInfo.startedAfterNodeUpdate = 1; + pMeta->startInfo.startAllTasksFlag = 1; if (updateTasks < numOfTasks) { tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, @@ -1960,7 +1962,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } else { if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); - pMeta->startInfo.startedAfterNodeUpdate = 0; + pMeta->startInfo.startAllTasksFlag = 0; taosWUnLockLatch(&pMeta->lock); } else { tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId); @@ -1991,7 +1993,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); tqResetStreamTaskStatus(pTq); - tqCheckAndRunStreamTaskAsync(pTq); + tqLaunchStreamTaskAsync(pTq); } else { vInfo("vgId:%d, follower node not start stream tasks", vgId); } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index efdd865d6c..1f1dd61c3c 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -98,6 +98,7 @@ int32_t tqStartStreamTask(STQ* pTq) { streamLaunchFillHistoryTask(pTask); } + streamMetaUpdateTaskReadyInfo(pTask); streamMetaReleaseTask(pMeta, pTask); continue; } @@ -111,7 +112,7 @@ int32_t tqStartStreamTask(STQ* pTq) { return 0; } -int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { +int32_t tqLaunchStreamTaskAsync(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = pMeta->vgId; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 218624acaf..f64c490ab3 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -552,7 +552,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) pVnode->restored = true; taosWLockLatch(&pVnode->pTq->pStreamMeta->lock); - if (pVnode->pTq->pStreamMeta->startInfo.startedAfterNodeUpdate) { + if (pVnode->pTq->pStreamMeta->startInfo.startAllTasksFlag) { vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock); return; @@ -565,7 +565,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) } else { vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); tqResetStreamTaskStatus(pVnode->pTq); - tqCheckAndRunStreamTaskAsync(pVnode->pTq); + tqLaunchStreamTaskAsync(pVnode->pTq); } } else { vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index eb4fe3a498..f788e244cd 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -229,10 +229,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF int32_t streamMetaReopen(SStreamMeta* pMeta) { // backup the restart flag - int32_t restartFlag = pMeta->startInfo.startedAfterNodeUpdate; + int32_t restartFlag = pMeta->startInfo.startAllTasksFlag; streamMetaClear(pMeta); - pMeta->startInfo.startedAfterNodeUpdate = restartFlag; + pMeta->startInfo.startAllTasksFlag = restartFlag; // NOTE: role should not be changed during reopen meta pMeta->streamBackendRid = -1; @@ -446,24 +446,6 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { return (int32_t)size; } -int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) { - int32_t num = 0; - size_t size = taosArrayGetSize(pMeta->pTaskList); - for (int32_t i = 0; i < size; ++i) { - STaskId* pId = taosArrayGet(pMeta->pTaskList, i); - SStreamTask** p = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); - if (p == NULL) { - continue; - } - - if ((*p)->info.fillHistory == 0) { - num += 1; - } - } - - return num; -} - SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { taosRLockLatch(&pMeta->lock); @@ -1106,6 +1088,6 @@ void streamMetaInitForSnode(SStreamMeta* pMeta) { void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { taosHashClear(pStartInfo->pReadyTaskSet); - pStartInfo->startedAfterNodeUpdate = 0; + pStartInfo->startAllTasksFlag = 0; pStartInfo->readyTs = 0; } \ No newline at end of file diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index e47f7fa6e7..2d951147d0 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -35,7 +35,7 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); static void tryLaunchHistoryTask(void* param, void* tmrId); -static int32_t updateTaskReadyInMeta(SStreamTask* pTask); +static void doProcessDownstreamReadyRsp(SStreamTask* pTask); int32_t streamTaskSetReady(SStreamTask* pTask) { char* p = NULL; @@ -57,7 +57,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p); - updateTaskReadyInMeta(pTask); + streamMetaUpdateTaskReadyInfo(pTask); return TSDB_CODE_SUCCESS; } @@ -114,7 +114,7 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) { } // check status -static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { +void streamTaskCheckDownstream(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; STimeWindow* pWindow = &pRange->window; @@ -163,10 +163,8 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { } } else { stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT); + doProcessDownstreamReadyRsp(pTask); } - - return 0; } static STaskRecheckInfo* createRecheckInfo(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { @@ -272,8 +270,13 @@ int32_t onNormalTaskReady(SStreamTask* pTask) { ASSERT(status == TASK_STATUS__READY); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int64_t startVer = walReaderGetCurrentVer(pTask->exec.pWalReader); + if (startVer == -1) { + startVer = pTask->chkInfo.nextProcessVer; + } + stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, - id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader)); + id, p, pTask->status.schedStatus, startVer); } else { stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p, pTask->status.schedStatus); } @@ -304,36 +307,15 @@ int32_t onScanhistoryTaskReady(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -// todo: refactor this function. -static void doProcessDownstreamReadyRsp(SStreamTask* pTask) { - streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT); - -#if 0 - const char* id = pTask->id.idStr; - - int8_t status = pTask->status.taskStatus; - const char* str = streamGetTaskStatusStr(status); - - ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__READY); - streamTaskSetRangeStreamCalc(pTask); - - if (status == TASK_STATUS__SCAN_HISTORY) { - stDebug("s-task:%s enter into scan-history data stage, status:%s", id, str); - streamTaskStartScanHistory(pTask); - // start the related fill-history task, when current task is ready - streamLaunchFillHistoryTask(pTask); +void doProcessDownstreamReadyRsp(SStreamTask* pTask) { + EStreamTaskEvent event; + if (pTask->info.fillHistory == 0) { + event = HAS_RELATED_FILLHISTORY_TASK(pTask)? TASK_EVENT_INIT_STREAM_SCANHIST:TASK_EVENT_INIT; } else { - // fill-history tasks are not allowed to reach here. - if (pTask->info.fillHistory == 1) { - stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); - pTask->status.taskStatus = TASK_STATUS__DROPPING; - ASSERT(pTask->hTaskInfo.id.taskId == 0); - } else { - stDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); - streamTaskEnablePause(pTask); - } + event = TASK_EVENT_INIT_SCANHIST; } -#endif + + streamTaskOnHandleEventSuccess(pTask->status.pSM, event); } int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { @@ -951,17 +933,6 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { } } -// only the downstream tasks are ready, set the task to be ready to work. -void streamTaskCheckDownstream(SStreamTask* pTask) { -// if (pTask->info.fillHistory) { -// ASSERT(0); -// stDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); -// return; -// } - - doCheckDownstreamStatus(pTask); -} - void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { #if 0 int8_t status = pTask->status.taskStatus; @@ -1076,7 +1047,7 @@ void streamTaskEnablePause(SStreamTask* pTask) { pTask->status.pauseAllowed = 1; } -int32_t updateTaskReadyInMeta(SStreamTask* pTask) { +int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; taosWLockLatch(&pMeta->lock); @@ -1088,13 +1059,9 @@ int32_t updateTaskReadyInMeta(SStreamTask* pTask) { if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) { STaskStartInfo* pStartInfo = &pMeta->startInfo; - pStartInfo->readyTs = pTask->execInfo.start; - if (pStartInfo->startTs != 0) { - pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs; - } else { - pStartInfo->elapsedTime = 0; - } + pStartInfo->readyTs = pTask->execInfo.start; + pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; streamMetaResetStartInfo(pStartInfo); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index c3294432c2..2e41490be6 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -140,9 +140,9 @@ void streamTaskRestoreStatus(SStreamTask* pTask) { pSM->prev.evt = 0; pSM->startTs = taosGetTimestampMs(); + stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name); taosThreadMutexUnlock(&pTask->lock); - stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name); } SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { @@ -186,6 +186,7 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM) { static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans) { SStreamTask* pTask = pSM->pTask; + const char* id = pTask->id.idStr; if (pTrans->attachEvent.event != 0) { attachEvent(pTask, &pTrans->attachEvent); @@ -198,16 +199,15 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt taosThreadMutexUnlock(&pTask->lock); if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) { - stDebug("s-task:%s attached event:%s handled", pTask->id.idStr, StreamTaskEventList[pTrans->event].name); + stDebug("s-task:%s attached event:%s handled", id, StreamTaskEventList[pTrans->event].name); return TSDB_CODE_SUCCESS; } else { // this event has been handled already - stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pTask->id.idStr, - StreamTaskEventList[event].name); + stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, StreamTaskEventList[event].name); taosMsleep(100); } } - } else { // override current active trans + } else { // override current active trans pSM->pActiveTrans = pTrans; pSM->startTs = taosGetTimestampMs(); taosThreadMutexUnlock(&pTask->lock); @@ -244,7 +244,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { if (pSM->pActiveTrans != NULL) { // currently in some state transfer procedure, not auto invoke transfer, abort it - stDebug("s-task:%s handle event procedure %s quit, status %s -> %s failed, handle event %s now", + stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now", pTask->id.idStr, StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name, pSM->pActiveTrans->next.name, StreamTaskEventList[event].name); } @@ -275,16 +275,16 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even ETaskStatus s = pSM->current.state; ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP); // the pSM->prev.evt may be 0, so print string is not appropriate. - stDebug("status not handled success, current status:%s, trigger event:%d, %s", pSM->current.name, pSM->prev.evt, - pTask->id.idStr); + stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr, + StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pSM->prev.evt].name); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_INVALID_PARA; } if (pTrans->event != event) { - stWarn("s-task:%s handle event:%s failed, current status:%s", pTask->id.idStr, StreamTaskEventList[event].name, - pSM->current.name); + stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr, + StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pTrans->event].name); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_INVALID_PARA; } @@ -318,7 +318,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even int32_t code = pNextTrans->pAction(pSM->pTask); if (pNextTrans->autoInvokeEndFn) { - return streamTaskOnHandleEventSuccess(pSM, event); + return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event); } else { return code; } diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index 128aafb0c9..673bc77c0f 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -18,6 +18,7 @@ sql create table ts4 using st tags(4,2,2); sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s); sleep 1000 +sleep 1000 sql pause stream streams1; sql insert into ts1 values(1648791213001,1,12,3,1.0); @@ -247,6 +248,7 @@ sql create table ts4 using st tags(4,2,2); sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt3 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s); sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt4 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s); sql create stream streams5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt5 as select _wstart, count(*) c1, sum(a) c3 from ts1 interval(10s); +sleep 1000 sql pause stream streams3; diff --git a/tests/system-test/8-stream/max_delay_session.py b/tests/system-test/8-stream/max_delay_session.py index 1a734e0e61..46c4c5801d 100644 --- a/tests/system-test/8-stream/max_delay_session.py +++ b/tests/system-test/8-stream/max_delay_session.py @@ -35,6 +35,8 @@ class TDTestCase: self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value) self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value) init_num = 0 + time.sleep(1) + for i in range(self.tdCom.range_count): if i == 0: window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session'])