Merge pull request #29107 from taosdata/fix/liaohj

fix(stream): record the failure of dispatch msg, and set the update nodeId
This commit is contained in:
Shengliang Guan 2024-12-17 17:41:45 +08:00 committed by GitHub
commit cdfd618157
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 21 additions and 12 deletions

View File

@ -207,7 +207,6 @@ typedef struct {
typedef struct { typedef struct {
int32_t nodeId; int32_t nodeId;
SEpSet epset;
} SDownstreamTaskEpset; } SDownstreamTaskEpset;
typedef enum { typedef enum {

View File

@ -1430,7 +1430,7 @@ int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, cons
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)), pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo));
pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) { if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {

View File

@ -200,6 +200,7 @@ void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask); int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask);
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo); void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo);
int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);
void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo); void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo);
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,

View File

@ -21,7 +21,6 @@
#define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec #define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec
static void processDownstreamReadyRsp(SStreamTask* pTask); static void processDownstreamReadyRsp(SStreamTask* pTask);
static int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);
static void rspMonitorFn(void* param, void* tmrId); static void rspMonitorFn(void* param, void* tmrId);
static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
@ -226,13 +225,13 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart", ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
code = addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
} else { } else {
stError( stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed", "downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
code = addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
} }
streamMetaAddFailedTaskSelf(pTask, now); streamMetaAddFailedTaskSelf(pTask, now);
@ -373,11 +372,10 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
} }
} }
int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
int32_t code = 0; int32_t code = 0;
; bool existed = false;
bool existed = false;
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
@ -675,8 +673,8 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
SDownstreamStatusInfo* p = NULL; SDownstreamStatusInfo* p = NULL;
findCheckRspStatus(pInfo, *pTaskId, &p); findCheckRspStatus(pInfo, *pTaskId, &p);
if (p != NULL) { if (p != NULL) {
code = addIntoNodeUpdateList(pTask, p->vgId); code = streamTaskAddIntoNodeUpdateList(pTask, p->vgId);
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list", stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpdate list",
id, vgId, p->taskId, p->vgId); id, vgId, p->taskId, p->vgId);
} }
} }
@ -717,7 +715,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread. // the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread.
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution // The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
// of restart in timer thread will result in a dead lock. // of restart in timer thread will result in a deadlock.
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK); return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK);
} }

View File

@ -1552,7 +1552,6 @@ static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t
int32_t* pFailed, const char* id) { int32_t* pFailed, const char* id) {
int32_t numOfRsp = 0; int32_t numOfRsp = 0;
int32_t numOfFailed = 0; int32_t numOfFailed = 0;
bool allRsp = false; bool allRsp = false;
int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo); int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo);
@ -1639,6 +1638,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t notRsp = 0; int32_t notRsp = 0;
int32_t numOfFailed = 0; int32_t numOfFailed = 0;
bool triggerDispatchRsp = false; bool triggerDispatchRsp = false;
bool addFailure = false;
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int64_t tmpCheckpointId = -1; int64_t tmpCheckpointId = -1;
int32_t tmpTranId = -1; int32_t tmpTranId = -1;
@ -1698,6 +1698,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
} else { } else {
if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
// todo handle the role-changed during checkpoint generation, add test case // todo handle the role-changed during checkpoint generation, add test case
addFailure = true;
stError( stError(
"s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, downstream may become follower or " "s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, downstream may become follower or "
"restart already, treat it as success", "restart already, treat it as success",
@ -1745,6 +1746,11 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code)); msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
} }
if (addFailure) { // add failure downstream node id, and start the nodeEp update procedure
// ignore the return error and continue
int32_t unused = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
}
// all msg rsp already, continue // all msg rsp already, continue
// we need to re-try send dispatch msg to downstream tasks // we need to re-try send dispatch msg to downstream tasks
if (allRsp && (numOfFailed == 0)) { if (allRsp && (numOfFailed == 0)) {
@ -1866,6 +1872,11 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
} }
} }
#if 0
// inject errors, and always refuse the upstream dispatch msg and trigger the task nodeEpset update trans.
status = TASK_INPUT_STATUS__REFUSED;
#endif
{ {
// do send response with the input status // do send response with the input status
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont); int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);