diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ec9a796cfd..bbdd98e356 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1137,14 +1137,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // wait for the stream task get ready for scan history data while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it and recheck in 100ms", pId, - pTask->info.taskLevel, pId); + tqDebug( + "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", + pId, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); taosMsleep(100); } // now we can stop the stream task execution pStreamTask->status.taskStatus = TASK_STATUS__HALT; - tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pId, + tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, pId); // if it's an source task, extract the last version in wal. diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 0f2281ea73..f51efb23d1 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -201,6 +201,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs if (left == 0) { taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; + pTask->status.downstreamReady = 1; if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,