diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1c0f73249c..119edb47bc 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -954,6 +954,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int32_t code = TSDB_CODE_SUCCESS; SStreamTask* pTask = NULL; SStreamTask* pStreamTask = NULL; + char* pStatus = NULL; code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); if (pTask == NULL) { @@ -964,7 +965,29 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // do recovery step1 const char* id = pTask->id.idStr; - char* pStatus = streamTaskGetStatus(pTask).name; + streamMutexLock(&pTask->lock); + + SStreamTaskState s = streamTaskGetStatus(pTask); + pStatus = s.name; + + if ((s.state != TASK_STATUS__SCAN_HISTORY) || (pTask->status.downstreamReady == 0)) { + tqError("s-task:%s vgId:%d status:%s downstreamReady:%d not allowed/ready for scan-history data, quit", id, + pMeta->vgId, s.name, pTask->status.downstreamReady); + + streamMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } + + if (pTask->exec.pExecutor == NULL) { + tqError("s-task:%s vgId:%d executor is null, not executor scan history", id, pMeta->vgId); + + streamMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } + + streamMutexUnlock(&pTask->lock); // avoid multi-thread exec while (1) {