diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 2d8fe4a0e1..226a06be7e 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -703,31 +703,31 @@ void rspMonitorFn(void* param, void* tmrId) { if (pStat->state == TASK_STATUS__UNINIT) { getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); + + numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); + numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); + + // fault tasks detected, not try anymore + ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == total); + if (numOfFault > 0) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug( + "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " + "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + + streamTaskCompleteCheckRsp(pInfo, false, id); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + streamMetaReleaseTask(pMeta, pTask); + + taosArrayDestroy(pNotReadyList); + taosArrayDestroy(pTimeoutList); + return; + } } else { // unexpected status stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); } - numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); - numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); - - // fault tasks detected, not try anymore - ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == total); - if (numOfFault > 0) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug( - "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " - "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); - - streamTaskCompleteCheckRsp(pInfo, false, id); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - streamMetaReleaseTask(pMeta, pTask); - - taosArrayDestroy(pNotReadyList); - taosArrayDestroy(pTimeoutList); - return; - } - // checking of downstream tasks has been stopped by other threads if (pInfo->stopCheckProcess == 1) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);