fix(vnd): check return value.

This commit is contained in:
Haojun Liao 2024-10-10 10:52:48 +08:00
parent a68da3c91c
commit f70321ee53
1 changed files with 13 additions and 14 deletions

View File

@ -640,40 +640,39 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
} }
} break; } break;
case TDMT_STREAM_TASK_DROP: { case TDMT_STREAM_TASK_DROP: {
if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { if ((code = tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) {
goto _err; goto _err;
} }
} break; } break;
case TDMT_STREAM_TASK_UPDATE_CHKPT: { case TDMT_STREAM_TASK_UPDATE_CHKPT: {
if (tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { if ((code = tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) {
goto _err; goto _err;
} }
} break; } break;
case TDMT_STREAM_CONSEN_CHKPT: { case TDMT_STREAM_CONSEN_CHKPT: {
if (pVnode->restored) { if (pVnode->restored && (code = tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg)) < 0) {
if (tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg) < 0) { goto _err;
goto _err;
}
} }
} break; } break;
case TDMT_STREAM_TASK_PAUSE: { case TDMT_STREAM_TASK_PAUSE: {
if (pVnode->restored && vnodeIsLeader(pVnode) && if (pVnode->restored && vnodeIsLeader(pVnode) &&
tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { (code = tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) {
goto _err; goto _err;
} }
} break; } break;
case TDMT_STREAM_TASK_RESUME: { case TDMT_STREAM_TASK_RESUME: {
if (pVnode->restored && vnodeIsLeader(pVnode) && if (pVnode->restored && vnodeIsLeader(pVnode) &&
tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { (code = tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) {
goto _err; goto _err;
} }
} break; } break;
case TDMT_VND_STREAM_TASK_RESET: { case TDMT_VND_STREAM_TASK_RESET: {
if (pVnode->restored && vnodeIsLeader(pVnode)) { if (pVnode->restored && vnodeIsLeader(pVnode) &&
if (tqProcessTaskResetReq(pVnode->pTq, pMsg) < 0) { (code = tqProcessTaskResetReq(pVnode->pTq, pMsg)) < 0) {
goto _err; goto _err;
} }
}
} break; } break;
case TDMT_VND_ALTER_CONFIRM: case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange; needCommit = pVnode->config.hashChange;
@ -693,10 +692,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
case TDMT_VND_DROP_INDEX: case TDMT_VND_DROP_INDEX:
vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp); vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
break; break;
case TDMT_VND_STREAM_CHECK_POINT_SOURCE: case TDMT_VND_STREAM_CHECK_POINT_SOURCE: // always return true
tqProcessTaskCheckPointSourceReq(pVnode->pTq, pMsg, pRsp); tqProcessTaskCheckPointSourceReq(pVnode->pTq, pMsg, pRsp);
break; break;
case TDMT_VND_STREAM_TASK_UPDATE: case TDMT_VND_STREAM_TASK_UPDATE: // always return true
tqProcessTaskUpdateReq(pVnode->pTq, pMsg); tqProcessTaskUpdateReq(pVnode->pTq, pMsg);
break; break;
case TDMT_VND_COMPACT: case TDMT_VND_COMPACT:
@ -752,7 +751,7 @@ _exit:
_err: _err:
vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
tstrerror(terrno), ver); tstrerror(code), ver);
return code; return code;
} }