diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1c0f73249c..7a07904ab6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -964,7 +964,27 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // do recovery step1 const char* id = pTask->id.idStr; - char* pStatus = streamTaskGetStatus(pTask).name; + streamMutexLock(&pTask->lock); + + SStreamTaskState s = streamTaskGetStatus(pTask); + if ((s.state != TASK_STATUS__SCAN_HISTORY) || (pTask->status.downstreamReady == 1)) { + tqError("s-task:%s vgId:%d status:%s downstreamReady:%d not ready for scan-history data, quit", id, pMeta->vgId, + s.name, pTask->status.downstreamReady); + + streamMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } + + if (pTask->exec.pExecutor == NULL) { + tqError("s-task:%s vgId:%d executor is null, not executor scan history", id, pMeta->vgId); + + streamMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } + + streamMutexUnlock(&pTask->lock); // avoid multi-thread exec while (1) {