enh: msg type and task status validate

This commit is contained in:
dapan1121 2022-07-06 10:13:17 +08:00
parent 641531bc6d
commit 3a17619046
1 changed files with 9 additions and 26 deletions

View File

@ -22,43 +22,28 @@
#include "trpc.h"
int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
int32_t lastMsgType = pTask->lastMsgType;
int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
int32_t reqMsgType = msgType - 1;
int32_t reqMsgType = (msgType & 1U) ? msgType : (msgType - 1);
switch (msgType) {
case TDMT_SCH_LINK_BROKEN:
case TDMT_SCH_EXPLAIN_RSP:
return TSDB_CODE_SUCCESS;
case TDMT_SCH_MERGE_QUERY_RSP:
case TDMT_SCH_QUERY_RSP: // query_rsp may be processed later than ready_rsp
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType));
}
if (taskStatus != JOB_TASK_STATUS_EXEC && taskStatus != JOB_TASK_STATUS_PART_SUCC) {
SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
}
//SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS;
case TDMT_SCH_FETCH_RSP:
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
if (lastMsgType != reqMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (taskStatus != JOB_TASK_STATUS_EXEC && taskStatus != JOB_TASK_STATUS_PART_SUCC) {
}
if (taskStatus != JOB_TASK_STATUS_PART_SUCC) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
//SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS;
case TDMT_SCH_MERGE_QUERY_RSP:
case TDMT_SCH_QUERY_RSP:
case TDMT_VND_CREATE_TABLE_RSP:
case TDMT_VND_DROP_TABLE_RSP:
case TDMT_VND_ALTER_TABLE_RSP:
@ -76,14 +61,12 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (taskStatus != JOB_TASK_STATUS_EXEC && taskStatus != JOB_TASK_STATUS_PART_SUCC) {
if (taskStatus != JOB_TASK_STATUS_EXEC) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
//SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS;
}
@ -97,7 +80,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId));
SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType));
SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType));
int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
if (SCH_NEED_REDIRECT(reqType, rspCode, pMsg->len)) {