diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index f3ae3b773d..5e28568763 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -854,7 +854,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { } else if (pState->state == TASK_STATUS__UNINIT) { tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); ASSERT(pTask->status.downstreamReady == 0); -// /*int32_t ret = */ streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); } else { tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); @@ -1001,7 +1000,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t if (level == TASK_LEVEL__SINK && pTask->info.fillHistory == 0) { if (status == TASK_STATUS__UNINIT) { tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr); - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); } streamMetaReleaseTask(pMeta, pTask); return 0; @@ -1031,7 +1030,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } else if (status == TASK_STATUS__UNINIT) { // todo: fill-history task init ? if (pTask->info.fillHistory == 0) { tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr); - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); } } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 75d2fdb2c7..d986a36343 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -683,7 +683,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code)); } } else { - stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code)); + stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code)); } // TODO: monitoring the checkpoint-report msg