From a8fac441be5698baf22ece6c0c99817bf242b0ef Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 22 Apr 2024 14:02:31 +0800 Subject: [PATCH 1/4] fix(stream): update the check-rsp procedure, to avoid repeatedly starting the check-rsp procedure. --- include/libs/stream/tstream.h | 5 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 5 +- source/libs/stream/src/streamStart.c | 11 +-- source/libs/stream/src/streamTask.c | 81 ++++++++++++++-------- 4 files changed, 61 insertions(+), 41 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0f399da8fd..de7c743b7d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -444,6 +444,7 @@ typedef struct STaskCheckInfo { int64_t startTs; int32_t notReadyTasks; int32_t inCheckProcess; + int32_t stopCheckProcess; tmr_h checkRspTmr; TdThreadMutex checkInfoLock; } STaskCheckInfo; @@ -844,14 +845,12 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); -int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id); int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, int32_t* pNotReady, const char* id); void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo); -int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); -int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id); int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); +int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); diff --git 
a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 4ce8579ea0..4667cd73b1 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -216,7 +216,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskResetStatus(pTask); - streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr); + + streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); SStreamTask** ppHTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -231,7 +232,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); streamTaskResetStatus(*ppHTask); - streamTaskCompleteCheck(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr); + streamTaskStopMonitorCheckRsp(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr); } } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index b9b7c8ddfa..0c4f00de6a 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -184,13 +184,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { ASSERT(pTask->status.downstreamReady == 0); - int32_t code = streamTaskStartCheckDownstream(&pTask->taskCheckInfo, pTask->id.idStr); - if (code != TSDB_CODE_SUCCESS) { - return; - } - - streamTaskInitTaskCheckInfo(&pTask->taskCheckInfo, &pTask->outputInfo, taosGetTimestampMs()); - // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.reqId = tGenIdPI64(); @@ -230,7 +223,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamTaskStartMonitorCheckRsp(pTask); } else { // for sink task, set it ready directly. 
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr); + streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); doProcessDownstreamReadyRsp(pTask); } } @@ -405,7 +398,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs if (left == 0) { doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag - streamTaskCompleteCheck(pInfo, id); + streamTaskStopMonitorCheckRsp(pInfo, id); } else { stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7dc93ceccf..5d725b012c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -942,15 +942,13 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { return 0; } -int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { +static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { if (pInfo->pList == NULL) { pInfo->pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo)); } else { taosArrayClear(pInfo->pList); } - taosThreadMutexLock(&pInfo->checkInfoLock); - if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { pInfo->notReadyTasks = 1; } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -959,8 +957,6 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut } pInfo->startTs = startTs; - - taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_SUCCESS; } @@ -1014,39 +1010,33 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t return 
TSDB_CODE_FAILED; } -int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { - taosThreadMutexLock(&pInfo->checkInfoLock); +static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { if (pInfo->inCheckProcess == 0) { pInfo->inCheckProcess = 1; } else { ASSERT(pInfo->startTs > 0); - stError("s-task:%s already in check procedure, checkTs:%"PRId64, id, pInfo->startTs); - - taosThreadMutexUnlock(&pInfo->checkInfoLock); + stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs); return TSDB_CODE_FAILED; } - taosThreadMutexUnlock(&pInfo->checkInfoLock); stDebug("s-task:%s set the in-check-procedure flag", id); return 0; } -int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id) { - taosThreadMutexLock(&pInfo->checkInfoLock); +static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id) { if (!pInfo->inCheckProcess) { - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; + stWarn("s-task:%s already not in-check-procedure", id); } int64_t el = taosGetTimestampMs() - pInfo->startTs; - stDebug("s-task:%s clear the in-check-procedure flag, elapsed time:%" PRId64 " ms", id, el); + stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el); pInfo->startTs = 0; - pInfo->inCheckProcess = 0; pInfo->notReadyTasks = 0; + pInfo->inCheckProcess = 0; + pInfo->stopCheckProcess = 1; taosArrayClear(pInfo->pList); - taosThreadMutexUnlock(&pInfo->checkInfoLock); return 0; } @@ -1108,7 +1098,10 @@ static void rspMonitorFn(void* param, void* tmrId) { if (state == TASK_STATUS__STOP) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); - streamTaskCompleteCheck(pInfo, id); + + taosThreadMutexLock(&pInfo->checkInfoLock); + 
streamTaskCompleteCheckRsp(pInfo, id); + taosThreadMutexUnlock(&pInfo->checkInfoLock); streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); return; @@ -1117,7 +1110,11 @@ static void rspMonitorFn(void* param, void* tmrId) { if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); - streamTaskCompleteCheck(pInfo, id); + + taosThreadMutexLock(&pInfo->checkInfoLock); + streamTaskCompleteCheckRsp(pInfo, id); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return; } @@ -1127,8 +1124,8 @@ static void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name, vgId, ref); + streamTaskCompleteCheckRsp(pInfo, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); - streamTaskCompleteCheck(pInfo, id); return; } @@ -1176,17 +1173,19 @@ static void rspMonitorFn(void* param, void* tmrId) { taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); - streamTaskCompleteCheck(pInfo, id); + streamTaskCompleteCheckRsp(pInfo, id); return; } // checking of downstream tasks has been stopped by other threads - if (pInfo->inCheckProcess == 0) { + if (pInfo->stopCheckProcess == 1) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, " "timeout:%d, ready:%d ref:%d", id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + + streamTaskCompleteCheckRsp(pInfo, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. 
@@ -1238,10 +1237,10 @@ static void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now); } + taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); - taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); - stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady, + stDebug("s-task:%s continue checking rsp in 300ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady, numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); @@ -1249,14 +1248,42 @@ static void rspMonitorFn(void* param, void* tmrId) { } int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { - ASSERT(pTask->taskCheckInfo.checkRspTmr == NULL); + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + + taosThreadMutexLock(&pInfo->checkInfoLock); + int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); + if (code != TSDB_CODE_SUCCESS) { + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_FAILED; + } + + streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); - pTask->taskCheckInfo.checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer); + + if (pInfo->checkRspTmr == NULL) { + pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer); + } else { + taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, pInfo->checkRspTmr); + } + + taosThreadMutexUnlock(&pInfo->checkInfoLock); return 0; } +int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { + taosThreadMutexLock(&pInfo->checkInfoLock); + streamTaskCompleteCheckRsp(pInfo, id); + 
+ pInfo->stopCheckProcess = 1; + taosThreadMutexUnlock(&pInfo->checkInfoLock); + + stDebug("s-task:%s set stop check rsp mon", id); + return TSDB_CODE_SUCCESS; +} + void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) { ASSERT(pInfo->inCheckProcess == 0); From 6c76790e6c2da2a7e45c5fed94a209eb4f012798 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 22 Apr 2024 14:03:13 +0800 Subject: [PATCH 2/4] fix(stream): fix a typo --- source/libs/stream/src/streamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5d725b012c..c7fb86d556 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1034,7 +1034,7 @@ static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id) pInfo->startTs = 0; pInfo->notReadyTasks = 0; pInfo->inCheckProcess = 0; - pInfo->stopCheckProcess = 1; + pInfo->stopCheckProcess = 0; taosArrayClear(pInfo->pList); return 0; From 97c1f88b2279f1a33c80b53457aeb8b3f5e9356f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 22 Apr 2024 14:06:11 +0800 Subject: [PATCH 3/4] refactor: do some internal refactor. 
--- source/libs/stream/src/streamTask.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c7fb86d556..0dc3682da7 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1138,7 +1138,8 @@ static void rspMonitorFn(void* param, void* tmrId) { if (p->status == TASK_DOWNSTREAM_READY) { numOfReady += 1; } else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) { - stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", id, p->taskId); + stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id, + p->taskId); numOfFault += 1; } else { // TASK_DOWNSTREAM_NOT_READY if (p->rspTs == 0) { // not response yet From cf7ffb82e011d6797c17e914b083f565cfc570a0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 22 Apr 2024 17:18:01 +0800 Subject: [PATCH 4/4] fix(stream): init before send msg, to avoid race condition. 
--- source/libs/stream/src/streamStart.c | 7 +++-- source/libs/stream/src/streamTask.c | 46 ++++++++++++++++------------ 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 0c4f00de6a..cc1987492c 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -186,6 +186,8 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + streamTaskStartMonitorCheckRsp(pTask); + req.reqId = tGenIdPI64(); req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId; req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; @@ -199,8 +201,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); - streamTaskStartMonitorCheckRsp(pTask); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + streamTaskStartMonitorCheckRsp(pTask); + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); @@ -219,8 +222,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } - - streamTaskStartMonitorCheckRsp(pTask); } else { // for sink task, set it ready directly. 
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 0dc3682da7..72611f4c14 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -534,7 +534,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t)); TdThreadMutexAttr attr = {0}; - int code = taosThreadMutexAttrInit(&attr); + + int code = taosThreadMutexAttrInit(&attr); if (code != 0) { stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code)); return code; @@ -563,6 +564,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr); pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); if (pOutputInfo->pDownstreamUpdateList == NULL) { + stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo)); + if (pTask->taskCheckInfo.pList == NULL) { + stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return TSDB_CODE_OUT_OF_MEMORY; } @@ -943,11 +952,7 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { } static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { - if (pInfo->pList == NULL) { - pInfo->pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo)); - } else { - taosArrayClear(pInfo->pList); - } + taosArrayClear(pInfo->pList); if (pOutputInfo->type == 
TASK_OUTPUT__FIXED_DISPATCH) { pInfo->notReadyTasks = 1; @@ -1089,9 +1094,12 @@ static void rspMonitorFn(void* param, void* tmrId) { int64_t now = taosGetTimestampMs(); int64_t el = now - pInfo->startTs; ETaskStatus state = pStat->state; + const char* id = pTask->id.idStr; int32_t numOfReady = 0; int32_t numOfFault = 0; - const char* id = pTask->id.idStr; + int32_t numOfNotRsp = 0; + int32_t numOfNotReady = 0; + int32_t numOfTimeout = 0; stDebug("s-task:%s start to do check downstream rsp check", id); @@ -1147,7 +1155,7 @@ static void rspMonitorFn(void* param, void* tmrId) { if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. taosArrayPush(pTimeoutList, &p->taskId); } else { // el < CHECK_NOT_RSP_DURATION - // do nothing and continue waiting for their rsps + numOfNotRsp += 1; // do nothing and continue waiting for their rsp } } else { taosArrayPush(pNotReadyList, &p->taskId); @@ -1158,17 +1166,17 @@ static void rspMonitorFn(void* param, void* tmrId) { stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); } - int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); - int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); + numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); + numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); // fault tasks detected, not try anymore - if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && - (numOfFault > 0)) { + ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList)); + if ((numOfNotRsp == 0) && (numOfFault > 0)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d all rsp. 
quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " - "detected, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); taosThreadMutexUnlock(&pInfo->checkInfoLock); taosArrayDestroy(pNotReadyList); @@ -1182,9 +1190,9 @@ static void rspMonitorFn(void* param, void* tmrId) { if (pInfo->stopCheckProcess == 1) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( - "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, " - "timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, " + "fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); streamTaskCompleteCheckRsp(pInfo, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); @@ -1241,8 +1249,8 @@ static void rspMonitorFn(void* param, void* tmrId) { taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s continue checking rsp in 300ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady, - numOfFault, numOfTimeout, numOfReady); + stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id, + numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList);