fix(stream): record the failure of dispatch msg, and set the update node id.

This commit is contained in:
Haojun Liao 2024-12-10 17:58:53 +08:00
parent b715df1ac4
commit aa12038fc4
4 changed files with 14 additions and 11 deletions

View File

@ -207,7 +207,6 @@ typedef struct {
// Pairs a node id with an endpoint set.
// NOTE(review): going by the type name, this presumably describes the node that hosts a
// downstream task and the endpoints used to reach it -- confirm against the users of this type.
typedef struct {
// id of the node this entry refers to
int32_t nodeId;
// endpoint set stored alongside nodeId
SEpSet epset;
} SDownstreamTaskEpset;
typedef enum {

View File

@ -201,6 +201,7 @@ void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask);
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo);
int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);
void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo);
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,

View File

@ -21,7 +21,6 @@
#define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec
static void processDownstreamReadyRsp(SStreamTask* pTask);
static int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);
static void rspMonitorFn(void* param, void* tmrId);
static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
@ -226,13 +225,13 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
code = addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
} else {
stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
code = addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
}
streamMetaAddFailedTaskSelf(pTask, now);
@ -373,11 +372,10 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
}
}
int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t vgId = pTask->pMeta->vgId;
int32_t code = 0;
;
bool existed = false;
bool existed = false;
streamMutexLock(&pTask->lock);
@ -675,8 +673,8 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
SDownstreamStatusInfo* p = NULL;
findCheckRspStatus(pInfo, *pTaskId, &p);
if (p != NULL) {
code = addIntoNodeUpdateList(pTask, p->vgId);
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list",
code = streamTaskAddIntoNodeUpdateList(pTask, p->vgId);
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpdate list",
id, vgId, p->taskId, p->vgId);
}
}
@ -717,7 +715,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread.
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
// of restart in timer thread will result in a dead lock.
// of restart in timer thread will result in a deadlock.
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
  // Delegate to streamTaskSchedTask with the STREAM_EXEC_T_ADD_FAILED_TASK exec type; the caller
  // receives whatever code that call returns, unmodified.
  int32_t code = streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK);
  return code;
}

View File

@ -1552,7 +1552,6 @@ static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t
int32_t* pFailed, const char* id) {
int32_t numOfRsp = 0;
int32_t numOfFailed = 0;
bool allRsp = false;
int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo);
@ -1639,6 +1638,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t notRsp = 0;
int32_t numOfFailed = 0;
bool triggerDispatchRsp = false;
bool addFailure = false;
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int64_t tmpCheckpointId = -1;
int32_t tmpTranId = -1;
@ -1698,6 +1698,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
} else {
if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
// todo handle the role-changed during checkpoint generation, add test case
addFailure = true;
stError(
"s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, downstream may become follower or "
"restart already, treat it as success",
@ -1745,6 +1746,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
}
if (addFailure) { // add failure downstream node id, and start the nodeEp update procedure
streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
}
// all msg rsp already, continue
// we need to re-try send dispatch msg to downstream tasks
if (allRsp && (numOfFailed == 0)) {