fix(stream): check status before scan history.

This commit is contained in:
Haojun Liao 2025-02-20 18:22:45 +08:00
parent 42bafc0d47
commit 66697ee188
1 changed files with 21 additions and 1 deletions

View File

@ -964,7 +964,27 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// do recovery step1
const char* id = pTask->id.idStr;
char* pStatus = streamTaskGetStatus(pTask).name;
streamMutexLock(&pTask->lock);
SStreamTaskState s = streamTaskGetStatus(pTask);
if ((s.state != TASK_STATUS__SCAN_HISTORY) || (pTask->status.downstreamReady == 1)) {
tqError("s-task:%s vgId:%d status:%s downstreamReady:%d not ready for scan-history data, quit", id, pMeta->vgId,
s.name, pTask->status.downstreamReady);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
if (pTask->exec.pExecutor == NULL) {
tqError("s-task:%s vgId:%d executor is null, not executor scan history", id, pMeta->vgId);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
streamMutexUnlock(&pTask->lock);
// avoid multi-thread exec
while (1) {