From 19b0de6582343bb1ef218eb1db48decf5950f991 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 13:49:40 +0800 Subject: [PATCH] fix(stream): add task node into update list if it is timeout for more than 100sec. --- include/libs/stream/tstream.h | 3 +- source/libs/stream/src/streamCheckStatus.c | 153 +++++++++++++-------- 2 files changed, 95 insertions(+), 61 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 260c1c54d2..48a6d046ea 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -435,6 +435,7 @@ typedef struct SUpstreamInfo { typedef struct SDownstreamStatusInfo { int64_t reqId; int32_t taskId; + int32_t vgId; int64_t rspTs; int32_t status; } SDownstreamStatusInfo; @@ -847,7 +848,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); -int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id); +int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id); int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, int32_t* pNotReady, const char* id); void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 42bf334fa3..00a78b7f81 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -27,15 +27,17 @@ static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInf static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id); static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p); +static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId); + static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); - static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); // check status void streamTaskCheckDownstream(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; STimeWindow* pWindow = &pRange->window; + const char* idstr = pTask->id.idStr; SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -51,15 +53,14 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { streamTaskStartMonitorCheckRsp(pTask); - req.reqId = tGenIdPI64(); - req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId; - req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; + STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; - streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); + setCheckDownstreamReqInfo(&req, tGenIdPI64(), pDispatch->taskId, pDispatch->nodeId); + streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, + idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); @@ -71,24 +72,22 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { int32_t numOfVgs = taosArrayGetSize(vgInfo); stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, - pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); + idstr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.reqId = tGenIdPI64(); - req.downstreamNodeId = pVgInfo->vgId; - req.downstreamTaskId = pVgInfo->taskId; - streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); + setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId); + streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); + idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { // for sink task, set it ready directly. - stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); + stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId); + streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr); processDownstreamReadyRsp(pTask); } } @@ -158,8 +157,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } -int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) { - SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0}; +int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) { + SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0}; taosThreadMutexLock(&pInfo->checkInfoLock); @@ -447,6 +446,78 @@ void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, i } } +void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId) { + pReq->reqId = reqId; + pReq->downstreamTaskId = dstTaskId; + pReq->downstreamNodeId = dstNodeId; +} + +void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + int32_t numOfTimeout = taosArrayGetSize(pTimeoutList); + + ASSERT(pTask->status.downstreamReady == 0); + + for (int32_t i = 0; i < numOfTimeout; ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); + + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + ASSERT(p->status == -1 && p->rspTs == 0); + doSendCheckMsg(pTask, p); + } + } + + pInfo->timeoutRetryCount += 1; + + // timeout more than 100 sec, add into node update list + if (pInfo->timeoutRetryCount > 10) { + pInfo->timeoutRetryCount = 0; + + for(int32_t i = 0; i < numOfTimeout; ++i) { + int32_t taskId = *(int32_t*) taosArrayGet(pTimeoutList, i); + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + addIntoNodeUpdateList(pTask, p->vgId); + stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list", + id, vgId, p->taskId, p->vgId); + } + } + + stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout); + } else { + stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, + vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); + } +} + +void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + int32_t numOfNotReady = taosArrayGetSize(pNotReadyList); + + ASSERT(pTask->status.downstreamReady == 0); + + // reset the info, and send the check msg to failure downstream again + for (int32_t i = 0; i < numOfNotReady; ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i); + + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + p->rspTs = 0; + p->status = -1; + doSendCheckMsg(pTask, p); + } + } + + pInfo->notReadyRetryCount += 1; + stDebug("s-task:%s vgId:%d %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id, + vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); +} + void rspMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; SStreamTaskState* pStat = streamTaskGetStatus(pTask); @@ -461,6 +532,7 @@ void rspMonitorFn(void* param, void* tmrId) { int32_t numOfNotRsp = 0; int32_t numOfNotReady = 0; int32_t numOfTimeout = 0; + int32_t total = taosArrayGetSize(pInfo->pList); stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id); @@ -510,7 +582,7 @@ void rspMonitorFn(void* param, void* tmrId) { numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); // fault tasks detected, not try anymore - ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList)); + ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == total); if (numOfFault > 0) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( @@ -550,57 +622,18 @@ void rspMonitorFn(void* param, void* tmrId) { } if (numOfNotReady > 0) { // check to make sure not in recheck timer - ASSERT(pTask->status.downstreamReady == 0); - - // reset the info, and send the check msg to failure downstream again - for (int32_t i = 0; i < numOfNotReady; ++i) { - int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i); - - SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); - if (p != NULL) { - p->rspTs = 0; - p->status = -1; - doSendCheckMsg(pTask, p); - } - } - - pInfo->notReadyRetryCount += 1; - stDebug("s-task:%s %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id, - numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); + handleNotReadyDownstreamTask(pTask, pNotReadyList); } - // todo add into node update list and send to mnode if (numOfTimeout > 0) { - ASSERT(pTask->status.downstreamReady == 0); - - for (int32_t i = 0; i < numOfTimeout; ++i) { - int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); - - SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); - if (p != NULL) { - ASSERT(p->status == -1 && p->rspTs == 0); - doSendCheckMsg(pTask, p); - } - } - - pInfo->timeoutRetryCount += 1; - - // timeout more than 100 sec, add into node update list - if (pInfo->timeoutRetryCount > 10) { - pInfo->timeoutRetryCount = 0; - stDebug("s-task:%s vgId:%d %d downstream task(s) timeout more than 100sec, add into nodeUpate list", id, vgId, - numOfTimeout); - } else { - stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, - vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); - } + handleTimeoutDownstreamTasks(pTask, pTimeoutList); } taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id, - numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); + stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", + id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList);