diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 95637fad69..758bf55666 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1126,12 +1126,45 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // now we can stop the stream task execution int64_t latestVer = 0; - taosThreadMutexLock(&pStreamTask->lock); - streamTaskHalt(pStreamTask); - tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, - pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); - latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); - taosThreadMutexUnlock(&pStreamTask->lock); + while (1) { + taosThreadMutexLock(&pStreamTask->lock); + int8_t status = pTask->status.taskStatus; + if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { + // return; + // do nothing + } + + if (status == TASK_STATUS__HALT) { + // return; + // do nothing + } + + if (pTask->status.taskStatus == TASK_STATUS__CK) { + qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", + pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__CK)); + taosThreadMutexUnlock(&pStreamTask->lock); + taosMsleep(1000); + continue; + } + + // upgrade to halt status + if (status == TASK_STATUS__PAUSE) { + qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT), + streamGetTaskStatusStr(TASK_STATUS__PAUSE)); + } else { + qDebug("s-task:%s halt task", pTask->id.idStr); + } + + pTask->status.keepTaskStatus = status; + pTask->status.taskStatus = TASK_STATUS__HALT; + + tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, + pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); + latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + + taosThreadMutexUnlock(&pStreamTask->lock); + break; + } // if it's an source task, extract the last version in wal. pRange = &pTask->dataRange.range;