From efeef24f1c01e600c87f94941f0b6d1bfa104405 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 5 Jul 2022 21:42:07 +0800 Subject: [PATCH] refactor: adjust vnode propose msg --- source/dnode/vnode/src/vnd/vnodeSync.c | 314 ++++++++++++------------- 1 file changed, 145 insertions(+), 169 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 7587b7a09f..add8c6069a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -217,171 +217,149 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { } int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - int32_t ret = 0; + int32_t code = 0; + const STraceId *trace = &pMsg->info.traceId; - if (syncEnvIsStart()) { - SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); - assert(pSyncNode != NULL); - - SMsgHead *pHead = pMsg->pCont; - STraceId *trace = &pMsg->info.traceId; - - do { - char *syncNodeStr = sync2SimpleStr(pVnode->sync); - static int64_t vndTick = 0; - if (++vndTick % 10 == 1) { - vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); - } - if (gRaftDetailLog) { - char logBuf[512] = {0}; - snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, - syncNodeStr); - syncRpcMsgLog2(logBuf, pMsg); - } - taosMemoryFree(syncNodeStr); - } while (0); - - SRpcMsg *pRpcMsg = pMsg; - - // ToDo: ugly! use function pointer - // use different strategy - if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) { - if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { - SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); - syncTimeoutDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_PING) { - SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { - SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); - syncPingReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { - SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); - syncClientRequestDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { - SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); - syncRequestVoteDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); - syncRequestVoteReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { - SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); - syncAppendEntriesDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); - syncAppendEntriesReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { - ret = vnodeSetStandBy(pVnode); - if (ret != 0 && terrno != 0) ret = terrno; - SRpcMsg rsp = {.code = ret, .info = pMsg->info}; - tmsgSendRsp(&rsp); - } else { - vError("==vnodeProcessSyncMsg== error msg type:%d", pRpcMsg->msgType); - ret = -1; - } - - } else { - // use wal first strategy - - if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { - SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); - syncTimeoutDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_PING) { - SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { - SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); - syncPingReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { - SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); - syncClientRequestDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) { - SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg); - syncClientRequestBatchDestroyDeep(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { - SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); - syncRequestVoteDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); - syncRequestVoteReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) { - SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg); - syncAppendEntriesBatchDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); - ASSERT(pSyncMsg != NULL); - ret = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg); - syncAppendEntriesReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { - ret = vnodeSetStandBy(pVnode); - if (ret != 0 && terrno != 0) ret = terrno; - SRpcMsg rsp = {.code = ret, .info = pMsg->info}; - tmsgSendRsp(&rsp); - } else { - vError("==vnodeProcessSyncMsg== error msg type:%d", pRpcMsg->msgType); - ret = -1; - } - } - - syncNodeRelease(pSyncNode); - } else { - vError("==vnodeProcessSyncMsg== error syncEnv stop"); - ret = -1; + if (!syncEnvIsStart()) { + vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId); + terrno = TSDB_CODE_APP_ERROR; + return -1; } - if (ret != 0 && terrno == 0) { + SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); + if (pSyncNode == NULL) { + vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + +#if 1 + char *syncNodeStr = sync2SimpleStr(pVnode->sync); + static int64_t vndTick = 0; + if (++vndTick % 10 == 1) { + vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); + } + if (gRaftDetailLog) { + char logBuf[512] = {0}; + snprintf(logBuf, sizeof(logBuf), "vnode process syncmsg, msgType:%d, syncNode:%s", pMsg->msgType, syncNodeStr); + syncRpcMsgLog2(logBuf, pMsg); + } + taosMemoryFree(syncNodeStr); +#endif + + if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) { + if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { + SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_PING) { + SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnPingCb(pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { + SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { + SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); + syncClientRequestDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { + SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { + SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); + syncAppendEntriesDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { + code = vnodeSetStandBy(pVnode); + if (code != 0 && terrno != 0) code = terrno; + SRpcMsg rsp = {.code = code, .info = pMsg->info}; + tmsgSendRsp(&rsp); + } else { + vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType); + code = -1; + } + } else { + // use wal first strategy + if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { + SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_PING) { + SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnPingCb(pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { + SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { + SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); + syncClientRequestDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) { + SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg); + syncClientRequestBatchDestroyDeep(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { + SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) { + SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg); + syncAppendEntriesBatchDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { + code = vnodeSetStandBy(pVnode); + if (code != 0 && terrno != 0) code = terrno; + SRpcMsg rsp = {.code = code, .info = pMsg->info}; + tmsgSendRsp(&rsp); + } else { + vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType); + code = -1; + } + } + + syncNodeRelease(pSyncNode); + if (code != 0 && terrno == 0) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } - return ret; + return code; } static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { @@ -414,7 +392,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon syncGetAndDelRespRpc(pVnode->sync, cbMeta.newCfgSeqNum, &rpcMsg.info); rpcMsg.info.conn.applyIndex = cbMeta.index; - STraceId *trace = (STraceId *)&pMsg->info.traceId; + const STraceId *trace = (STraceId *)&pMsg->info.traceId; vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle); if (rpcMsg.info.handle != NULL) { @@ -431,9 +409,8 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), - "==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), - beginIndex); + "commitCb execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; @@ -446,16 +423,15 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, - cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), "preCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), "rollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); }