fix(stream): initialize the sink stream task.

This commit is contained in:
Haojun Liao 2024-06-21 09:22:09 +08:00
parent 8fd9baf6f5
commit ece139d921
1 changed files with 5 additions and 3 deletions

View File

@ -997,8 +997,10 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
ETaskStatus status = streamTaskGetStatus(pTask)->state;
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SINK) {
if (level == TASK_LEVEL__SINK && pTask->info.fillHistory == 0) {
if (status == TASK_STATUS__UNINIT) {
tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr);
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
}
streamMetaReleaseTask(pMeta, pTask);
return 0;
@ -1025,9 +1027,9 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} else {
streamTrySchedExec(pTask);
}
} else if (status == TASK_STATUS__UNINIT) {
// todo: fill-history task init ?
} else if (status == TASK_STATUS__UNINIT) { // todo: fill-history task init ?
if (pTask->info.fillHistory == 0) {
tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr);
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
}
}