From 65eaa9b15fff87de710d3db88300a9d07dd6c6bb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 30 Aug 2023 19:06:12 +0800 Subject: [PATCH] fix(stream): add task release. --- source/dnode/vnode/src/tq/tqRestore.c | 1 + source/libs/stream/src/streamRecover.c | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index d97124877f..d2419009bf 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -84,6 +84,7 @@ int32_t tqSetStreamTasksReady(STQ* pTq) { tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", pTask->id.idStr); streamLaunchFillHistoryTask(pTask); + streamMetaReleaseTask(pMeta, pTask); continue; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 98aa19e7ce..25ec20d06b 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -877,10 +877,10 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) { pTask->status.taskStatus = pTask->status.keepTaskStatus; pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); - qInfo("vgId:%d s-task:%s resume from pause, status%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); + qInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); - qInfo("vgId:%d s-task:%s sink task.resume from pause, status%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); + qInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); } else { qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); }