From cd6261c8d4c4d8d3881555f150e323c3ba6cdaf9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 09:49:11 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckStatus.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index ea9b2ef89f..8b9de2df4c 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -35,6 +35,7 @@ static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId); static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); +static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId); static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); // check status @@ -524,7 +525,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { // the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread. // The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution // of restart in timer thread will result in a dead lock. -static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { +int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -614,8 +615,8 @@ void rspMonitorFn(void* param, void* tmrId) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " - "detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); @@ -630,9 +631,9 @@ void rspMonitorFn(void* param, void* tmrId) { if (pInfo->stopCheckProcess == 1) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( - "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, " - "fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, " + "notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock);