refactor: adjust vnode propose msg

This commit is contained in:
Shengliang Guan 2022-07-05 21:42:07 +08:00
parent 3087208dc9
commit efeef24f1c
1 changed files with 145 additions and 169 deletions

View File

@ -217,16 +217,23 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
} }
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t ret = 0; int32_t code = 0;
const STraceId *trace = &pMsg->info.traceId;
if (!syncEnvIsStart()) {
vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId);
terrno = TSDB_CODE_APP_ERROR;
return -1;
}
if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
assert(pSyncNode != NULL); if (pSyncNode == NULL) {
vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
SMsgHead *pHead = pMsg->pCont; #if 1
STraceId *trace = &pMsg->info.traceId;
do {
char *syncNodeStr = sync2SimpleStr(pVnode->sync); char *syncNodeStr = sync2SimpleStr(pVnode->sync);
static int64_t vndTick = 0; static int64_t vndTick = 0;
if (++vndTick % 10 == 1) { if (++vndTick % 10 == 1) {
@ -234,154 +241,125 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
if (gRaftDetailLog) { if (gRaftDetailLog) {
char logBuf[512] = {0}; char logBuf[512] = {0};
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, snprintf(logBuf, sizeof(logBuf), "vnode process syncmsg, msgType:%d, syncNode:%s", pMsg->msgType, syncNodeStr);
syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg); syncRpcMsgLog2(logBuf, pMsg);
} }
taosMemoryFree(syncNodeStr); taosMemoryFree(syncNodeStr);
} while (0); #endif
SRpcMsg *pRpcMsg = pMsg;
// ToDo: ugly! use function pointer
// use different strategy
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) { if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg); syncPingDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg); syncRequestVoteDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg); syncAppendEntriesDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { code = vnodeSetStandBy(pVnode);
ret = vnodeSetStandBy(pVnode); if (code != 0 && terrno != 0) code = terrno;
if (ret != 0 && terrno != 0) ret = terrno; SRpcMsg rsp = {.code = code, .info = pMsg->info};
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} else { } else {
vError("==vnodeProcessSyncMsg== error msg type:%d", pRpcMsg->msgType); vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType);
ret = -1; code = -1;
} }
} else { } else {
// use wal first strategy // use wal first strategy
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg); syncPingDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) { SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg); code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
syncClientRequestBatchDestroyDeep(pSyncMsg); syncClientRequestBatchDestroyDeep(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg); syncRequestVoteDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) { SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pMsg);
SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg);
syncAppendEntriesBatchDestroy(pSyncMsg); syncAppendEntriesBatchDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { code = vnodeSetStandBy(pVnode);
ret = vnodeSetStandBy(pVnode); if (code != 0 && terrno != 0) code = terrno;
if (ret != 0 && terrno != 0) ret = terrno; SRpcMsg rsp = {.code = code, .info = pMsg->info};
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} else { } else {
vError("==vnodeProcessSyncMsg== error msg type:%d", pRpcMsg->msgType); vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType);
ret = -1; code = -1;
} }
} }
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
} else { if (code != 0 && terrno == 0) {
vError("==vnodeProcessSyncMsg== error syncEnv stop");
ret = -1;
}
if (ret != 0 && terrno == 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
} }
return ret; return code;
} }
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
@ -414,7 +392,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
syncGetAndDelRespRpc(pVnode->sync, cbMeta.newCfgSeqNum, &rpcMsg.info); syncGetAndDelRespRpc(pVnode->sync, cbMeta.newCfgSeqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index; rpcMsg.info.conn.applyIndex = cbMeta.index;
STraceId *trace = (STraceId *)&pMsg->info.traceId; const STraceId *trace = (STraceId *)&pMsg->info.traceId;
vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode), vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle); TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle);
if (rpcMsg.info.handle != NULL) { if (rpcMsg.info.handle != NULL) {
@ -431,9 +409,8 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", "commitCb execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", pFsm,
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
beginIndex);
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
@ -446,16 +423,15 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf), "preCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm,
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", snprintf(logBuf, sizeof(logBuf), "rollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm,
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }