From bd8c7d8c9399e05e9a85d5d04319615083d41715 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 24 Jun 2024 13:25:48 +0800 Subject: [PATCH] fix(stream): pause not stop the start procedure of stream tasks. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 28 ++------------------- source/libs/stream/src/streamStartHistory.c | 2 +- 2 files changed, 3 insertions(+), 27 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 0f52483149..3ea7ad0718 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -998,18 +998,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__SINK) { - ASSERT (status != TASK_STATUS__UNINIT); /*{ -// tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr); -// -// if (pTask->pBackend == NULL) { // TODO: add test cases for this -// int32_t code = pMeta->expandTaskFn(pTask); -// if (code != TSDB_CODE_SUCCESS) { -// tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId); -// streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); -// } -// } -// int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); - }*/ + ASSERT (status != TASK_STATUS__UNINIT); streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1036,20 +1025,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t streamTrySchedExec(pTask); } } else { - ASSERT (status != TASK_STATUS__UNINIT);// { // todo: fill-history task init ? -// if (pTask->info.fillHistory == 0) { - // tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr); - // - // if (pTask->pBackend == NULL) { // TODO: add test cases for this - // int32_t code = pMeta->expandTaskFn(pTask); - // if (code != TSDB_CODE_SUCCESS) { - // tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId); - // streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); - // } - // } - // int32_t ret = */streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); -// } -// } + ASSERT (status != TASK_STATUS__UNINIT); } streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index adf4c3bef9..7a723647ab 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -204,7 +204,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { // check stream task status in the first place. SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) { + if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT && pStatus->state != TASK_STATUS__PAUSE) { stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, pStatus->name);