add SyncApplyMsg into vmProcessApplyQueue

This commit is contained in:
Minghao Li 2022-04-20 15:02:00 +08:00
parent 9cb2192a87
commit ec73953a0f
2 changed files with 28 additions and 8 deletions

View File

@ -14,6 +14,8 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "sync.h"
#include "syncTools.h"
#include "vmInt.h" #include "vmInt.h"
static inline void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { static inline void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
@ -180,7 +182,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
SNodeMsg *pMsg = NULL; SNodeMsg *pMsg = NULL;
SRpcMsg rsp; SRpcMsg rsp;
static int64_t version = 0; // static int64_t version = 0;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
#if 1 #if 1
@ -191,11 +193,22 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rsp.code = 0; rsp.code = 0;
rsp.pCont = NULL; rsp.pCont = NULL;
rsp.contLen = 0; rsp.contLen = 0;
if (vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, version++, &rsp) < 0) {
// get original rpc msg
assert(pMsg->rpcMsg.msgType == TDMT_VND_SYNC_APPLY_MSG);
SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(&pMsg->rpcMsg);
syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg);
SRpcMsg originalRpcMsg;
syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg);
if (vnodeProcessWriteReq(pVnode->pImpl, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) {
rsp.code = terrno; rsp.code = terrno;
dTrace("vnodeProcessWriteReq error, code:%d", terrno); dTrace("vnodeProcessWriteReq error, code:%d", terrno);
} }
syncApplyMsgDestroy(pSyncApplyMsg);
rpcFreeCont(originalRpcMsg.pCont);
if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) { if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) {
rsp.ahandle = pMsg->rpcMsg.ahandle; rsp.ahandle = pMsg->rpcMsg.ahandle;
rsp.handle = pMsg->rpcMsg.handle; rsp.handle = pMsg->rpcMsg.handle;

View File

@ -107,14 +107,21 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
SVnode *pVnode = (SVnode *)(pFsm->data);
SyncApplyMsg *pSyncApplyMsg = syncApplyMsgBuild2(pMsg, pVnode->config.vgId, &cbMeta);
SRpcMsg applyMsg;
syncApplyMsg2RpcMsg(pSyncApplyMsg, &applyMsg);
syncApplyMsgDestroy(pSyncApplyMsg);
/*
SRpcMsg applyMsg; SRpcMsg applyMsg;
applyMsg = *pMsg; applyMsg = *pMsg;
applyMsg.pCont = rpcMallocCont(applyMsg.contLen); applyMsg.pCont = rpcMallocCont(applyMsg.contLen);
assert(applyMsg.contLen == pMsg->contLen); assert(applyMsg.contLen == pMsg->contLen);
memcpy(applyMsg.pCont, pMsg->pCont, applyMsg.contLen); memcpy(applyMsg.pCont, pMsg->pCont, applyMsg.contLen);
*/
// recover handle for response // recover handle for response
SVnode *pVnode = (SVnode *)(pFsm->data);
SRpcMsg saveRpcMsg; SRpcMsg saveRpcMsg;
int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg); int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg);
if (ret == 1 && cbMeta.state == TAOS_SYNC_STATE_LEADER) { if (ret == 1 && cbMeta.state == TAOS_SYNC_STATE_LEADER) {