From 88af03096652e571c5a292351d3bbf731d6c58ac Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 13:53:41 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckStatus.c | 41 ++++++++++------------ 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 00a78b7f81..1135878929 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -18,7 +18,7 @@ #include "streamBackendRocksdb.h" #include "streamInt.h" -#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec +#define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec static void processDownstreamReadyRsp(SStreamTask* pTask); static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId); @@ -29,15 +29,15 @@ static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, cons static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p); static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId); -static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, - int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); +static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, + int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); // check status void streamTaskCheckDownstream(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; STimeWindow* pWindow = &pRange->window; - const char* idstr = pTask->id.idStr; + const char* idstr = pTask->id.idStr; SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -60,8 +60,8 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64, - idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, - pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); + idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, + pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); @@ -71,8 +71,8 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); - stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, - idstr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); + stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, idstr, + numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); @@ -181,7 +181,6 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { taosThreadMutexLock(&pInfo->checkInfoLock); int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { - taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_FAILED; } @@ -306,10 +305,8 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); if (p != NULL) { - if (reqId != p->reqId) { - stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 - " expired check-rsp recv from downstream task:0x%x, discarded", + stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded", id, reqId, p->reqId, taskId); taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_FAILED; @@ -340,7 +337,8 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { pInfo->inCheckProcess = 1; } else { ASSERT(pInfo->startTs > 0); - stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs); + stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id, + pInfo->startTs); return TSDB_CODE_FAILED; } @@ -388,9 +386,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { STaskOutputInfo* pOutputInfo = &pTask->outputInfo; if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { - req.reqId = p->reqId; - req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId; - req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId; + STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher; + setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId); + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); @@ -403,12 +401,10 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); if (p->taskId == pVgInfo->taskId) { - req.reqId = p->reqId; - req.downstreamNodeId = pVgInfo->vgId; - req.downstreamTaskId = pVgInfo->taskId; + setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 - " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64, + " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); @@ -422,7 +418,6 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); if (p->status == TASK_DOWNSTREAM_READY) { @@ -476,8 +471,8 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { if (pInfo->timeoutRetryCount > 10) { pInfo->timeoutRetryCount = 0; - for(int32_t i = 0; i < numOfTimeout; ++i) { - int32_t taskId = *(int32_t*) taosArrayGet(pTimeoutList, i); + for (int32_t i = 0; i < numOfTimeout; ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); if (p != NULL) { addIntoNodeUpdateList(pTask, p->vgId);