diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index dce7a9aadf..18548db56f 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -129,16 +129,17 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { int32_t code = 0; - SMnode *pMnode = pFsm->data; pMsg->info.conn.applyIndex = pMeta->index; pMsg->info.conn.applyTerm = pMeta->term; - atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex); + if (pMsg->code == 0) { + SMnode *pMnode = pFsm->data; + atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex); + } if (!syncUtilUserCommit(pMsg->msgType)) { goto _out; } - code = mndProcessWriteMsg(pFsm, pMsg, pMeta); _out: diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index f942b3fa49..b49ca70bfa 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -429,24 +429,29 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code); + return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg); +} + +static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { if (pMsg->code == 0) { - return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg); + return vnodeSyncApplyMsg(pFsm, pMsg, pMeta); } + const STraceId *trace = &pMsg->info.traceId; + SVnode *pVnode = pFsm->data; vnodePostBlockMsg(pVnode, pMsg); + SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; if (rsp.info.handle != NULL) { tmsgSendRsp(&rsp); } + + vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code, pMeta->index); rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; return 0; } -static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { - return vnodeSyncApplyMsg(pFsm, pMsg, pMeta); -} - static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { if (pMeta->isWeak == 1) { return vnodeSyncApplyMsg(pFsm, pMsg, pMeta); @@ -539,7 +544,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) } } while (true); - ASSERT(appliedIdx == commitIdx); + ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm)); walApplyVer(pVnode->pWal, commitIdx); pVnode->restored = true;