Merge branch 'fix/checkfh' into fix/checkpoint

This commit is contained in:
Haojun Liao 2025-02-20 18:28:46 +08:00
commit 9d5735703b
1 changed files with 24 additions and 1 deletions

View File

@ -954,6 +954,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamTask* pTask = NULL;
SStreamTask* pStreamTask = NULL;
char* pStatus = NULL;
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
if (pTask == NULL) {
@ -964,7 +965,29 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// do recovery step1
const char* id = pTask->id.idStr;
char* pStatus = streamTaskGetStatus(pTask).name;
streamMutexLock(&pTask->lock);
SStreamTaskState s = streamTaskGetStatus(pTask);
pStatus = s.name;
if ((s.state != TASK_STATUS__SCAN_HISTORY) || (pTask->status.downstreamReady == 0)) {
tqError("s-task:%s vgId:%d status:%s downstreamReady:%d not allowed/ready for scan-history data, quit", id,
pMeta->vgId, s.name, pTask->status.downstreamReady);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
if (pTask->exec.pExecutor == NULL) {
tqError("s-task:%s vgId:%d executor is null, not executor scan history", id, pMeta->vgId);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
streamMutexUnlock(&pTask->lock);
// avoid multi-thread exec
while (1) {