From ece139d921d037d97091972690169497cb009cda Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 09:22:09 +0800 Subject: [PATCH] fix(stream): initialize the sink stream task. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 50a52e58c5..def556534c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -997,8 +997,10 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t ETaskStatus status = streamTaskGetStatus(pTask)->state; int32_t level = pTask->info.taskLevel; - if (level == TASK_LEVEL__SINK) { + if (level == TASK_LEVEL__SINK && pTask->info.fillHistory == 0) { if (status == TASK_STATUS__UNINIT) { + tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr); + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); } streamMetaReleaseTask(pMeta, pTask); return 0; @@ -1025,9 +1027,9 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } else { streamTrySchedExec(pTask); } - } else if (status == TASK_STATUS__UNINIT) { - // todo: fill-history task init ? + } else if (status == TASK_STATUS__UNINIT) { // todo: fill-history task init ? if (pTask->info.fillHistory == 0) { + tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr); streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); } }