Merge pull request #15436 from taosdata/feature/3.0_mhli
refactor(sync): pre-commit integration
This commit is contained in:
commit
6207eef28c
|
@ -501,12 +501,13 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
|
if (cbMeta.isWeak == 0) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
|
vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
|
||||||
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
||||||
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
|
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
if (cbMeta.code == 0 && cbMeta.isWeak == 0) {
|
if (cbMeta.code == 0) {
|
||||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||||
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||||
|
@ -516,21 +517,24 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
||||||
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
||||||
} else {
|
} else {
|
||||||
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
|
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
|
||||||
vError("vgId:%d, sync commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync), pMsg->msgType,
|
vError("vgId:%d, sync commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync),
|
||||||
TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
|
pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
|
||||||
if (rsp.info.handle != NULL) {
|
if (rsp.info.handle != NULL) {
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
|
if (cbMeta.isWeak == 1) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
|
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64
|
||||||
|
", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
|
||||||
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
||||||
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
|
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
if (cbMeta.code == 0 && cbMeta.isWeak == 1) {
|
if (cbMeta.code == 0) {
|
||||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||||
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||||
|
@ -547,6 +551,7 @@ static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMet
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
|
|
Loading…
Reference in New Issue