enh: update pMnode->applied only when pMsg->code as zero
This commit is contained in:
parent
8139b725d2
commit
d679eaf648
|
@ -129,16 +129,17 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
|
||||||
|
|
||||||
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SMnode *pMnode = pFsm->data;
|
|
||||||
pMsg->info.conn.applyIndex = pMeta->index;
|
pMsg->info.conn.applyIndex = pMeta->index;
|
||||||
pMsg->info.conn.applyTerm = pMeta->term;
|
pMsg->info.conn.applyTerm = pMeta->term;
|
||||||
|
|
||||||
atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
|
if (pMsg->code == 0) {
|
||||||
|
SMnode *pMnode = pFsm->data;
|
||||||
|
atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
|
||||||
|
}
|
||||||
|
|
||||||
if (!syncUtilUserCommit(pMsg->msgType)) {
|
if (!syncUtilUserCommit(pMsg->msgType)) {
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
|
code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
|
||||||
|
|
||||||
_out:
|
_out:
|
||||||
|
|
|
@ -429,24 +429,29 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm
|
||||||
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
|
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
|
||||||
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);
|
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);
|
||||||
|
|
||||||
|
return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
if (pMsg->code == 0) {
|
if (pMsg->code == 0) {
|
||||||
return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
|
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
SVnode *pVnode = pFsm->data;
|
||||||
vnodePostBlockMsg(pVnode, pMsg);
|
vnodePostBlockMsg(pVnode, pMsg);
|
||||||
|
|
||||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||||
if (rsp.info.handle != NULL) {
|
if (rsp.info.handle != NULL) {
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code, pMeta->index);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
pMsg->pCont = NULL;
|
pMsg->pCont = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
|
||||||
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
if (pMeta->isWeak == 1) {
|
if (pMeta->isWeak == 1) {
|
||||||
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
|
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
|
||||||
|
@ -539,7 +544,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
}
|
}
|
||||||
} while (true);
|
} while (true);
|
||||||
|
|
||||||
ASSERT(appliedIdx == commitIdx);
|
ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm));
|
||||||
walApplyVer(pVnode->pWal, commitIdx);
|
walApplyVer(pVnode->pWal, commitIdx);
|
||||||
|
|
||||||
pVnode->restored = true;
|
pVnode->restored = true;
|
||||||
|
|
Loading…
Reference in New Issue