fix(stream): check status when resume to run.
This commit is contained in:
parent
b36cc97236
commit
cf941f7488
|
@ -820,15 +820,23 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
return 0;
|
return 0;
|
||||||
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
if (pTask != NULL) {
|
|
||||||
ASSERT(streamTaskReadyToRun(pTask, NULL));
|
|
||||||
int64_t execTs = pTask->status.lastExecTs;
|
|
||||||
int32_t idle = taosGetTimestampMs() - execTs;
|
|
||||||
tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs);
|
|
||||||
|
|
||||||
streamResumeTask(pTask);
|
if (pTask != NULL) {
|
||||||
|
char* pStatus = NULL;
|
||||||
|
if (streamTaskReadyToRun(pTask, &pStatus)) {
|
||||||
|
int64_t execTs = pTask->status.lastExecTs;
|
||||||
|
int32_t idle = taosGetTimestampMs() - execTs;
|
||||||
|
tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs);
|
||||||
|
|
||||||
|
streamResumeTask(pTask);
|
||||||
|
} else {
|
||||||
|
int8_t status = streamTaskSetSchedStatusInactive(pTask);
|
||||||
|
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
|
||||||
|
pTask->id.idStr, pStatus, status);
|
||||||
|
}
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue