fix(sync): if msg commit, put it into apply-queue, do not care return code
This commit is contained in:
parent
01e8b03bf0
commit
e43794c366
|
@ -298,35 +298,24 @@ static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot)
|
||||||
static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
|
|
||||||
if (pMeta->code == 0) {
|
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||||
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
rpcMsg.info = pMsg->info;
|
||||||
rpcMsg.info = pMsg->info;
|
rpcMsg.info.conn.applyIndex = pMeta->index;
|
||||||
rpcMsg.info.conn.applyIndex = pMeta->index;
|
rpcMsg.info.conn.applyTerm = pMeta->term;
|
||||||
rpcMsg.info.conn.applyTerm = pMeta->term;
|
|
||||||
|
|
||||||
const STraceId *trace = &pMsg->info.traceId;
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
|
vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
|
||||||
", weak:%d, code:%d, state:%d %s, type:%s",
|
", weak:%d, code:%d, state:%d %s, type:%s",
|
||||||
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak,
|
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, pMeta->code,
|
||||||
pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
|
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
||||||
} else {
|
|
||||||
SRpcMsg rsp = {.code = pMeta->code, .info = pMsg->info};
|
|
||||||
vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", pVnode->config.vgId,
|
|
||||||
TMSG_INFO(pMsg->msgType), pMeta->index, pMeta->code, tstrerror(pMeta->code));
|
|
||||||
if (rsp.info.handle != NULL) {
|
|
||||||
tmsgSendRsp(&rsp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
if (pMeta->isWeak == 0) {
|
vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
|
||||||
vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
|
|
Loading…
Reference in New Issue