From ec73953a0f518e9259bcd55e539bbb03f96efdb9 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Apr 2022 15:02:00 +0800 Subject: [PATCH] add SyncApplyMsg into vmProcessApplyQueue --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 17 +++++++++++++++-- source/dnode/vnode/src/vnd/vnodeSync.c | 19 +++++++++++++------ 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index a53127afce..9e42f8dfb6 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -14,6 +14,8 @@ */ #define _DEFAULT_SOURCE +#include "sync.h" +#include "syncTools.h" #include "vmInt.h" static inline void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { @@ -180,7 +182,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO SNodeMsg *pMsg = NULL; SRpcMsg rsp; - static int64_t version = 0; + // static int64_t version = 0; for (int32_t i = 0; i < numOfMsgs; ++i) { #if 1 @@ -191,11 +193,22 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rsp.code = 0; rsp.pCont = NULL; rsp.contLen = 0; - if (vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, version++, &rsp) < 0) { + + // get original rpc msg + assert(pMsg->rpcMsg.msgType == TDMT_VND_SYNC_APPLY_MSG); + SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(&pMsg->rpcMsg); + syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg); + SRpcMsg originalRpcMsg; + syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg); + + if (vnodeProcessWriteReq(pVnode->pImpl, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) { rsp.code = terrno; dTrace("vnodeProcessWriteReq error, code:%d", terrno); } + syncApplyMsgDestroy(pSyncApplyMsg); + rpcFreeCont(originalRpcMsg.pCont); + if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) { rsp.ahandle = pMsg->rpcMsg.ahandle; rsp.handle = pMsg->rpcMsg.handle; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 4db483851e..93c40aa019 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -107,14 +107,21 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); - SRpcMsg applyMsg; - applyMsg = *pMsg; - applyMsg.pCont = rpcMallocCont(applyMsg.contLen); - assert(applyMsg.contLen == pMsg->contLen); - memcpy(applyMsg.pCont, pMsg->pCont, applyMsg.contLen); + SVnode *pVnode = (SVnode *)(pFsm->data); + SyncApplyMsg *pSyncApplyMsg = syncApplyMsgBuild2(pMsg, pVnode->config.vgId, &cbMeta); + SRpcMsg applyMsg; + syncApplyMsg2RpcMsg(pSyncApplyMsg, &applyMsg); + syncApplyMsgDestroy(pSyncApplyMsg); + + /* + SRpcMsg applyMsg; + applyMsg = *pMsg; + applyMsg.pCont = rpcMallocCont(applyMsg.contLen); + assert(applyMsg.contLen == pMsg->contLen); + memcpy(applyMsg.pCont, pMsg->pCont, applyMsg.contLen); + */ // recover handle for response - SVnode *pVnode = (SVnode *)(pFsm->data); SRpcMsg saveRpcMsg; int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg); if (ret == 1 && cbMeta.state == TAOS_SYNC_STATE_LEADER) {