fix: rsp type mismatch after link broken is processed
This commit is contained in:
parent
e010d35839
commit
957ed637b0
|
@ -230,6 +230,7 @@ typedef struct SSchTask {
|
||||||
SSchRedirectCtx redirectCtx; // task redirect context
|
SSchRedirectCtx redirectCtx; // task redirect context
|
||||||
bool waitRetry; // wait for retry
|
bool waitRetry; // wait for retry
|
||||||
int32_t execId; // task current execute index
|
int32_t execId; // task current execute index
|
||||||
|
int32_t failedExecId; // last failed task execute index
|
||||||
SSchLevel *level; // level
|
SSchLevel *level; // level
|
||||||
SRWLatch planLock; // task update plan lock
|
SRWLatch planLock; // task update plan lock
|
||||||
SSubplan *plan; // subplan
|
SSubplan *plan; // subplan
|
||||||
|
|
|
@ -34,12 +34,12 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
|
||||||
if (lastMsgType != reqMsgType) {
|
if (lastMsgType != reqMsgType) {
|
||||||
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
|
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
|
||||||
TMSG_INFO(msgType));
|
TMSG_INFO(msgType));
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
|
SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
|
||||||
}
|
}
|
||||||
if (taskStatus != JOB_TASK_STATUS_PART_SUCC) {
|
if (taskStatus != JOB_TASK_STATUS_PART_SUCC) {
|
||||||
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
|
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
|
||||||
TMSG_INFO(msgType));
|
TMSG_INFO(msgType));
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
|
SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -60,13 +60,13 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
|
||||||
if (lastMsgType != reqMsgType) {
|
if (lastMsgType != reqMsgType) {
|
||||||
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
|
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
|
||||||
TMSG_INFO(msgType));
|
TMSG_INFO(msgType));
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
|
SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taskStatus != JOB_TASK_STATUS_EXEC) {
|
if (taskStatus != JOB_TASK_STATUS_EXEC) {
|
||||||
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
|
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
|
||||||
TMSG_INFO(msgType));
|
TMSG_INFO(msgType));
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
|
SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -64,6 +64,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
|
||||||
pTask->plan = pPlan;
|
pTask->plan = pPlan;
|
||||||
pTask->level = pLevel;
|
pTask->level = pLevel;
|
||||||
pTask->execId = -1;
|
pTask->execId = -1;
|
||||||
|
pTask->failedExecId = -2;
|
||||||
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
|
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
|
||||||
pTask->taskId = schGenTaskId();
|
pTask->taskId = schGenTaskId();
|
||||||
|
|
||||||
|
@ -166,7 +167,7 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
|
||||||
|
|
||||||
schUpdateTaskExecNode(pJob, pTask, handle, execId);
|
schUpdateTaskExecNode(pJob, pTask, handle, execId);
|
||||||
|
|
||||||
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it
|
if ((execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) { // ignore it
|
||||||
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
|
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
|
||||||
pTask->execId, pTask->waitRetry);
|
pTask->execId, pTask->waitRetry);
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
|
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
|
||||||
|
@ -182,6 +183,8 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
|
||||||
return TSDB_CODE_SCH_IGNORE_ERROR;
|
return TSDB_CODE_SCH_IGNORE_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTask->failedExecId = pTask->execId;
|
||||||
|
|
||||||
int8_t jobStatus = 0;
|
int8_t jobStatus = 0;
|
||||||
if (schJobNeedToStop(pJob, &jobStatus)) {
|
if (schJobNeedToStop(pJob, &jobStatus)) {
|
||||||
SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
|
SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
|
||||||
|
|
Loading…
Reference in New Issue