fix(stream): add task node into update list if it is timeout for more than 100sec.

This commit is contained in:
Haojun Liao 2024-04-26 13:49:40 +08:00
parent e5069669cb
commit 19b0de6582
2 changed files with 95 additions and 61 deletions

View File

@ -435,6 +435,7 @@ typedef struct SUpstreamInfo {
typedef struct SDownstreamStatusInfo { typedef struct SDownstreamStatusInfo {
int64_t reqId; int64_t reqId;
int32_t taskId; int32_t taskId;
int32_t vgId;
int64_t rspTs; int64_t rspTs;
int32_t status; int32_t status;
} SDownstreamStatusInfo; } SDownstreamStatusInfo;
@ -847,7 +848,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
bool streamTaskIsSinkTask(const SStreamTask* pTask); bool streamTaskIsSinkTask(const SStreamTask* pTask);
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id); int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id);
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
int32_t* pNotReady, const char* id); int32_t* pNotReady, const char* id);
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);

View File

@ -27,15 +27,17 @@ static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInf
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id); static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p); static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId);
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);
// check status // check status
void streamTaskCheckDownstream(SStreamTask* pTask) { void streamTaskCheckDownstream(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange; SDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window; STimeWindow* pWindow = &pRange->window;
const char* idstr = pTask->id.idStr;
SStreamTaskCheckReq req = { SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId, .streamId = pTask->id.streamId,
@ -51,15 +53,14 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
streamTaskStartMonitorCheckRsp(pTask); streamTaskStartMonitorCheckRsp(pTask);
req.reqId = tGenIdPI64(); STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); setCheckDownstreamReqInfo(&req, tGenIdPI64(), pDispatch->taskId, pDispatch->nodeId);
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
" window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64, " window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
@ -71,24 +72,22 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
int32_t numOfVgs = taosArrayGetSize(vgInfo); int32_t numOfVgs = taosArrayGetSize(vgInfo);
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); idstr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
for (int32_t i = 0; i < numOfVgs; i++) { for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.reqId = tGenIdPI64();
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId);
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 stDebug("s-task:%s (vgId:%d) stage:%" PRId64
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64, " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
} else { // for sink task, set it ready directly. } else { // for sink task, set it ready directly.
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
processDownstreamReadyRsp(pTask); processDownstreamReadyRsp(pTask);
} }
} }
@ -158,8 +157,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return 0; return 0;
} }
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) { int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) {
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0}; SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0};
taosThreadMutexLock(&pInfo->checkInfoLock); taosThreadMutexLock(&pInfo->checkInfoLock);
@ -447,6 +446,78 @@ void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, i
} }
} }
void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId) {
pReq->reqId = reqId;
pReq->downstreamTaskId = dstTaskId;
pReq->downstreamNodeId = dstNodeId;
}
void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
int32_t numOfTimeout = taosArrayGetSize(pTimeoutList);
ASSERT(pTask->status.downstreamReady == 0);
for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
ASSERT(p->status == -1 && p->rspTs == 0);
doSendCheckMsg(pTask, p);
}
}
pInfo->timeoutRetryCount += 1;
// timeout more than 100 sec, add into node update list
if (pInfo->timeoutRetryCount > 10) {
pInfo->timeoutRetryCount = 0;
for(int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*) taosArrayGet(pTimeoutList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
addIntoNodeUpdateList(pTask, p->vgId);
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list",
id, vgId, p->taskId, p->vgId);
}
}
stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout);
} else {
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs);
}
}
void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
int32_t numOfNotReady = taosArrayGetSize(pNotReadyList);
ASSERT(pTask->status.downstreamReady == 0);
// reset the info, and send the check msg to failure downstream again
for (int32_t i = 0; i < numOfNotReady; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
p->rspTs = 0;
p->status = -1;
doSendCheckMsg(pTask, p);
}
}
pInfo->notReadyRetryCount += 1;
stDebug("s-task:%s vgId:%d %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
}
void rspMonitorFn(void* param, void* tmrId) { void rspMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param; SStreamTask* pTask = param;
SStreamTaskState* pStat = streamTaskGetStatus(pTask); SStreamTaskState* pStat = streamTaskGetStatus(pTask);
@ -461,6 +532,7 @@ void rspMonitorFn(void* param, void* tmrId) {
int32_t numOfNotRsp = 0; int32_t numOfNotRsp = 0;
int32_t numOfNotReady = 0; int32_t numOfNotReady = 0;
int32_t numOfTimeout = 0; int32_t numOfTimeout = 0;
int32_t total = taosArrayGetSize(pInfo->pList);
stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id); stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
@ -510,7 +582,7 @@ void rspMonitorFn(void* param, void* tmrId) {
numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
// fault tasks detected, not try anymore // fault tasks detected, not try anymore
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList)); ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == total);
if (numOfFault > 0) { if (numOfFault > 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug( stDebug(
@ -550,57 +622,18 @@ void rspMonitorFn(void* param, void* tmrId) {
} }
if (numOfNotReady > 0) { // check to make sure not in recheck timer if (numOfNotReady > 0) { // check to make sure not in recheck timer
ASSERT(pTask->status.downstreamReady == 0); handleNotReadyDownstreamTask(pTask, pNotReadyList);
// reset the info, and send the check msg to failure downstream again
for (int32_t i = 0; i < numOfNotReady; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
p->rspTs = 0;
p->status = -1;
doSendCheckMsg(pTask, p);
}
}
pInfo->notReadyRetryCount += 1;
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
} }
// todo add into node update list and send to mnode
if (numOfTimeout > 0) { if (numOfTimeout > 0) {
ASSERT(pTask->status.downstreamReady == 0); handleTimeoutDownstreamTasks(pTask, pTimeoutList);
for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
ASSERT(p->status == -1 && p->rspTs == 0);
doSendCheckMsg(pTask, p);
}
}
pInfo->timeoutRetryCount += 1;
// timeout more than 100 sec, add into node update list
if (pInfo->timeoutRetryCount > 10) {
pInfo->timeoutRetryCount = 0;
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout more than 100sec, add into nodeUpate list", id, vgId,
numOfTimeout);
} else {
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs);
}
} }
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id, stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
taosArrayDestroy(pNotReadyList); taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList); taosArrayDestroy(pTimeoutList);