fix(stream): add task release.

This commit is contained in:
Haojun Liao 2023-08-30 19:06:12 +08:00
parent d99d8875e1
commit 65eaa9b15f
2 changed files with 3 additions and 2 deletions

View File

@ -84,6 +84,7 @@ int32_t tqSetStreamTasksReady(STQ* pTq) {
tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr);
streamLaunchFillHistoryTask(pTask);
streamMetaReleaseTask(pMeta, pTask);
continue;
}

View File

@ -877,10 +877,10 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
pTask->status.taskStatus = pTask->status.keepTaskStatus;
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s resume from pause, status%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
qInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s sink task.resume from pause, status%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
qInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
} else {
qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
}