diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5cae6793be..69d07a84c6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1114,6 +1114,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { + streamTaskEnablePause(pTask); + SVersionRange* pRange = NULL; SStreamTask* pStreamTask = NULL; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 958a94d741..54688ed0cc 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -204,11 +204,6 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); } - // enable pause when init completed. - if (pTask->historyTaskId.taskId == 0) { - streamTaskEnablePause(pTask); - } - // when current stream task is ready, check the related fill history task. launchFillHistoryTask(pTask); } @@ -477,6 +472,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory } streamNotifyUpstreamContinue(pTask); + + // sink node does not receive the pause msg from mnode + if (pTask->info.taskLevel == TASK_LEVEL__AGG) { + streamTaskEnablePause(pTask); + } } else { qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left); @@ -497,6 +497,8 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { streamMetaSaveTask(pMeta, pTask); taosWUnLockLatch(&pMeta->lock); + streamTaskEnablePause(pTask); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { streamSchedExec(pTask); }