From cf941f74884288b783139e5dc14e8ef2f4a5d391 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 12 Jan 2024 11:12:23 +0800 Subject: [PATCH] fix(stream): check status when resume to run. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index da1c2ecf16..7883c858f0 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -820,15 +820,23 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead return 0; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); - if (pTask != NULL) { - ASSERT(streamTaskReadyToRun(pTask, NULL)); - int64_t execTs = pTask->status.lastExecTs; - int32_t idle = taosGetTimestampMs() - execTs; - tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs); - streamResumeTask(pTask); + if (pTask != NULL) { + char* pStatus = NULL; + if (streamTaskReadyToRun(pTask, &pStatus)) { + int64_t execTs = pTask->status.lastExecTs; + int32_t idle = taosGetTimestampMs() - execTs; + tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs); + + streamResumeTask(pTask); + } else { + int8_t status = streamTaskSetSchedStatusInactive(pTask); + tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, + pTask->id.idStr, pStatus, status); + } streamMetaReleaseTask(pMeta, pTask); } + return 0; }