diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 39cc8b10d5..76d89be802 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -159,7 +159,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); int32_t tqStartStreamTask(STQ* pTq); -int32_t tqStartStreamTasks(STQ* pTq); +int32_t tqResetStreamTaskStatus(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); // tq util diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 47102531e2..db9ef9fa77 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1990,7 +1990,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); - tqStartStreamTasks(pTq); + tqResetStreamTaskStatus(pTq); tqCheckAndRunStreamTaskAsync(pTq); } else { vInfo("vgId:%d, follower node not start stream tasks", vgId); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b8aaf6bf60..efdd865d6c 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -227,7 +227,7 @@ int32_t tqStopStreamTasks(STQ* pTq) { return 0; } -int32_t tqStartStreamTasks(STQ* pTq) { +int32_t tqResetStreamTaskStatus(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = TD_VID(pTq->pVnode); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 6c03ed68e9..218624acaf 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -564,7 +564,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); } else { vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); - tqStartStreamTasks(pVnode->pTq); + tqResetStreamTaskStatus(pVnode->pTq); tqCheckAndRunStreamTaskAsync(pVnode->pTq); } } else { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 751b809fb7..e47f7fa6e7 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -271,13 +271,6 @@ int32_t onNormalTaskReady(SStreamTask* pTask) { ETaskStatus status = streamTaskGetStatus(pTask, &p); ASSERT(status == TASK_STATUS__READY); - // todo refactor: remove this later -// if (pTask->info.fillHistory == 1) { -// stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); -// pTask->status.taskStatus = TASK_STATUS__DROPPING; -// ASSERT(pTask->hTaskInfo.id.taskId == 0); -// } - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader));