From e810f2ad648994f5bb7a512ffe56974b50343ef4 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 18 Oct 2022 13:53:03 +0800 Subject: [PATCH] refactor(sync): delete old code, ok --- include/libs/sync/sync.h | 27 +- include/libs/sync/syncTools.h | 24 +- source/dnode/mnode/impl/src/mndMain.c | 95 --- source/dnode/vnode/src/vnd/vnodeSync.c | 124 --- source/libs/sync/inc/syncAppendEntries.h | 3 - source/libs/sync/inc/syncAppendEntriesReply.h | 4 - source/libs/sync/inc/syncIO.h | 5 +- source/libs/sync/inc/syncInt.h | 8 +- source/libs/sync/inc/syncReplication.h | 11 - source/libs/sync/inc/syncRequestVote.h | 3 - source/libs/sync/inc/syncRequestVoteReply.h | 3 - source/libs/sync/inc/syncSnapshot.h | 21 +- source/libs/sync/src/syncAppendEntries.c | 774 +----------------- source/libs/sync/src/syncAppendEntriesReply.c | 332 -------- source/libs/sync/src/syncCommit.c | 13 +- source/libs/sync/src/syncElection.c | 4 +- source/libs/sync/src/syncIO.c | 14 +- source/libs/sync/src/syncMain.c | 241 +----- source/libs/sync/src/syncRaftLog.c | 186 +---- source/libs/sync/src/syncReplication.c | 439 ---------- source/libs/sync/src/syncRequestVote.c | 117 --- source/libs/sync/src/syncRequestVoteReply.c | 120 --- source/libs/sync/src/syncSnapshot.c | 24 +- .../test/syncConfigChangeSnapshotTest.cpp | 4 +- source/libs/sync/test/syncEncodeTest.cpp | 7 +- source/libs/sync/test/syncLogStoreTest.cpp | 12 +- source/libs/sync/test/syncTestTool.cpp | 6 +- 27 files changed, 77 insertions(+), 2544 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 73abe3f469..8c9e86f525 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -159,32 +159,15 @@ typedef struct SSyncLogStore { SLRUCache* pCache; void* data; - // append one log entry - int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); - - // get one log entry, user need to free pEntry->pCont - SSyncRaftEntry* (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index); - - // truncate log with index, entries after the given index (>=index) will be deleted - int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); - - // return index of last entry - SyncIndex (*getLastIndex)(struct SSyncLogStore* pLogStore); - - // return term of last entry - SyncTerm (*getLastTerm)(struct SSyncLogStore* pLogStore); - - // update log store commit index with "index" - int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); - - // return commit index of log - SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore); + int32_t (*syncLogUpdateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); + SyncIndex (*syncLogCommitIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogBeginIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore); - bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore); + int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore); int32_t (*syncLogRestoreFromSnapshot)(struct SSyncLogStore* pLogStore, SyncIndex index); + bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore); bool (*syncLogExist)(struct SSyncLogStore* pLogStore, SyncIndex index); SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore); @@ -230,7 +213,7 @@ SyncGroupId syncGetVgId(int64_t rid); void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); -int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize); +// int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize); bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 7aafaf028e..92e1e47d0a 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -678,29 +678,9 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); -int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex); -int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* pMsg); -int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); -int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); -int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); -int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); - -int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg); -int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); -int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg); -int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); - -int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg); -int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); - -int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg); -int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg); - -int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex); int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); - int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex); int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); @@ -718,8 +698,8 @@ typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg); typedef int32_t (*FpOnAppendEntriesReplyCb)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); typedef int32_t (*FpOnTimeoutCb)(SSyncNode* pSyncNode, SyncTimeout* pMsg); -typedef int32_t (*FpOnSnapshotSendCb)(SSyncNode* ths, SyncSnapshotSend* pMsg); -typedef int32_t (*FpOnSnapshotRspCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg); +typedef int32_t (*FpOnSnapshotCb)(SSyncNode* ths, SyncSnapshotSend* pMsg); +typedef int32_t (*FpOnSnapshotReplyCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg); // option ---------------------------------- bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 5e1cec55a9..25c194ce75 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -561,101 +561,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { code = -1; } - /* - // ToDo: ugly! use function pointer - if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_STANDARD_SNAPSHOT) { - if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { - SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); - code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); - syncTimeoutDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_PING) { - SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); - code = syncNodeOnPingCb(pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { - SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); - code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); - syncPingReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { - SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); - code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); - syncClientRequestDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { - SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); - code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg); - syncRequestVoteDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); - code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg); - syncRequestVoteReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { - SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); - code = syncNodeOnAppendEntriesSnapshotCb(pSyncNode, pSyncMsg); - syncAppendEntriesDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); - code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg); - syncAppendEntriesReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { - SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); - code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg); - syncSnapshotSendDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { - SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); - code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg); - syncSnapshotRspDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) { - code = syncSetStandby(pMgmt->sync); - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - tmsgSendRsp(&rsp); - } else { - mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - code = -1; - } - } else { - if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { - SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); - code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); - syncTimeoutDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_PING) { - SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); - code = syncNodeOnPingCb(pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { - SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); - code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); - syncPingReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { - SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); - code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); - syncClientRequestDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { - SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); - code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); - syncRequestVoteDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); - code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); - syncRequestVoteReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { - SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); - code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); - syncAppendEntriesDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); - code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); - syncAppendEntriesReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) { - code = syncSetStandby(pMgmt->sync); - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - tmsgSendRsp(&rsp); - } else { - mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - code = -1; - } - } - */ - syncNodeRelease(pSyncNode); if (code != 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 559cd050cc..5501f261df 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -454,130 +454,6 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { code = -1; } -#if 0 - if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) { - if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { - SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); - syncTimeoutDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_PING) { - SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnPingCb(pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { - SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); - syncPingReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { - SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); - syncClientRequestDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) { - SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg); - syncClientRequestBatchDestroyDeep(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { - SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); - syncRequestVoteDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); - syncRequestVoteReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { - SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); - syncAppendEntriesDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); - syncAppendEntriesReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { - code = vnodeSetStandBy(pVnode); - if (code != 0 && terrno != 0) code = terrno; - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - tmsgSendRsp(&rsp); - } else { - vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType); - code = -1; - } - - } else if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_WAL_FIRST) { - // use wal first strategy - if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { - SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); - syncTimeoutDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_PING) { - SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnPingCb(pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { - SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); - syncPingReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { - SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); - syncClientRequestDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) { - SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg); - syncClientRequestBatchDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { - SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg); - syncRequestVoteDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg); - syncRequestVoteReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) { - SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg); - syncAppendEntriesBatchDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg); - syncAppendEntriesReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { - SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); - code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg); - syncSnapshotSendDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { - SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); - code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg); - syncSnapshotRspDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { - code = vnodeSetStandBy(pVnode); - if (code != 0 && terrno != 0) code = terrno; - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - tmsgSendRsp(&rsp); - } else { - vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType); - code = -1; - } - } -#endif - vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType), code); syncNodeRelease(pSyncNode); diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 87bb1c7b05..dc40e2fc72 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -92,9 +92,6 @@ extern "C" { // /\ UNCHANGED <> // /\ UNCHANGED <> // -int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); -int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg); -int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index c3316aec49..0227d832fc 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -40,10 +40,6 @@ extern "C" { // /\ Discard(m) // /\ UNCHANGED <> // -int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); -int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); -int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); - int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index f3064e668d..69cd92bf85 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -56,9 +56,8 @@ typedef struct SSyncIO { int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg); int32_t (*FpOnSyncTimeout)(SSyncNode *pSyncNode, SyncTimeout *pMsg); - - int32_t (*FpOnSyncSnapshotSend)(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg); - int32_t (*FpOnSyncSnapshotRsp)(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg); + int32_t (*FpOnSyncSnapshot)(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg); + int32_t (*FpOnSyncSnapshotReply)(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg); int8_t isStart; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index a231a8be58..6ef0988aab 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -179,8 +179,8 @@ typedef struct SSyncNode { FpOnRequestVoteReplyCb FpOnRequestVoteReply; FpOnAppendEntriesCb FpOnAppendEntries; FpOnAppendEntriesReplyCb FpOnAppendEntriesReply; - FpOnSnapshotSendCb FpOnSnapshotSend; - FpOnSnapshotRspCb FpOnSnapshotRsp; + FpOnSnapshotCb FpOnSnapshot; + FpOnSnapshotReplyCb FpOnSnapshotReply; // tools SSyncRespMgr* pSyncRespMgr; @@ -210,7 +210,6 @@ void syncNodeStart(SSyncNode* pSyncNode); void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); -int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize); // option bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); @@ -276,15 +275,14 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode); SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm); - SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode); - SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index); SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index); int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm); bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg); int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); +int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex); int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code); int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg); diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index d0f273dc9c..90c11b2300 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -51,17 +51,6 @@ extern "C" { // mdest |-> j]) // /\ UNCHANGED <> // -int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); -int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode); -int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode); - -int32_t syncNodeAppendEntriesOnePeer(SSyncNode* pSyncNode, SRaftId* pDestId, SyncIndex nextIndex); - -int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer); -int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); -int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg); - -//--------------------------------------------- int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg); int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode); diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index fc32e72419..73b4a0efae 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -49,9 +49,6 @@ extern "C" { // m) // /\ UNCHANGED <> // -int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); -int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg); - int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index 6ec7524149..6bef18405c 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -44,9 +44,6 @@ extern "C" { // /\ Discard(m) // /\ UNCHANGED <> // -int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); -int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); - int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 57e259e4bc..b8b7af2dda 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void * pReader; - void * pCurrentBlock; + void *pReader; + void *pCurrentBlock; int32_t blockLen; SSnapshotParam snapshotParam; SSnapshot snapshot; SSyncCfg lastConfig; int64_t sendingMS; - SSyncNode * pSyncNode; + SSyncNode *pSyncNode; int32_t replicaIndex; SyncTerm term; SyncTerm privateTerm; @@ -65,8 +65,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char * snapshotSender2Str(SSyncSnapshotSender *pSender); -char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); +char *snapshotSender2Str(SSyncSnapshotSender *pSender); +char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId); @@ -74,13 +74,13 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId); typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void * pWriter; + void *pWriter; SyncTerm term; SyncTerm privateTerm; SSnapshotParam snapshotParam; SSnapshot snapshot; SRaftId fromId; - SSyncNode * pSyncNode; + SSyncNode *pSyncNode; } SSyncSnapshotReceiver; @@ -92,14 +92,11 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); //--------------------------------------------------- // on message -int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); -int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); - int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 75a9b5a719..5ea07249c6 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -89,240 +89,6 @@ // /\ UNCHANGED <> // -int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { - int32_t ret = 0; - - // if already drop replica, do not process - if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncLogRecvAppendEntries(ths, pMsg, "maybe replica already dropped"); - return -1; - } - - // maybe update term - if (pMsg->term > ths->pRaftStore->currentTerm) { - syncNodeUpdateTerm(ths, pMsg->term); - } - ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); - - // reset elect timer - if (pMsg->term == ths->pRaftStore->currentTerm) { - ths->leaderCache = pMsg->srcId; - syncNodeResetElectTimer(ths); - } - ASSERT(pMsg->dataLen >= 0); - - // return to follower state - if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { - syncLogRecvAppendEntries(ths, pMsg, "candidate to follower"); - syncNodeBecomeFollower(ths, "from candidate by append entries"); - return -1; // ret or reply? - } - - SyncTerm localPreLogTerm = 0; - if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { - SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex); - if (pEntry == NULL) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "getEntry error, index:%" PRId64 ", since %s", pMsg->prevLogIndex, terrstr()); - syncNodeErrorLog(ths, logBuf); - return -1; - } - - localPreLogTerm = pEntry->term; - syncEntryDestory(pEntry); - } - - bool logOK = - (pMsg->prevLogIndex == SYNC_INDEX_INVALID) || - ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) && - (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm)); - - // reject request - if ((pMsg->term < ths->pRaftStore->currentTerm) || - ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { - syncLogRecvAppendEntries(ths, pMsg, "reject"); - - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->success = false; - pReply->matchIndex = SYNC_INDEX_INVALID; - pReply->startTime = ths->startTime; - - // msg event log - syncLogSendAppendEntriesReply(ths, pReply, ""); - - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); - - return ret; - } - - // accept request - if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { - // preIndex = -1, or has preIndex entry in local log - ASSERT(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); - - // has extra entries (> preIndex) in local log - bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); - - // has entries in SyncAppendEntries msg - bool hasAppendEntries = pMsg->dataLen > 0; - - syncLogRecvAppendEntries(ths, pMsg, "accept"); - - if (hasExtraEntries && hasAppendEntries) { - // not conflict by default - bool conflict = false; - - SyncIndex extraIndex = pMsg->prevLogIndex + 1; - SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex); - if (pExtraEntry == NULL) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "getEntry error2, index:%" PRId64 ", since %s", extraIndex, terrstr()); - syncNodeErrorLog(ths, logBuf); - return -1; - } - - SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - if (pAppendEntry == NULL) { - syncNodeErrorLog(ths, "syncEntryDeserialize pAppendEntry error"); - return -1; - } - - // log not match, conflict - ASSERT(extraIndex == pAppendEntry->index); - if (pExtraEntry->term != pAppendEntry->term) { - conflict = true; - } - - if (conflict) { - // roll back - SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); - SyncIndex delEnd = extraIndex; - - sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%" PRId64 ", delEnd:%" PRId64, conflict, delBegin, - delEnd); - - // notice! reverse roll back! - for (SyncIndex index = delEnd; index >= delBegin; --index) { - if (ths->pFsm->FpRollBackCb != NULL) { - SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index); - if (pRollBackEntry == NULL) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "getEntry error3, index:%" PRId64 ", since %s", index, terrstr()); - syncNodeErrorLog(ths, logBuf); - return -1; - } - - // if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) { - if (syncUtilUserRollback(pRollBackEntry->msgType)) { - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); - - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pRollBackEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pRollBackEntry->isWeak; - cbMeta.code = 0; - cbMeta.state = ths->state; - cbMeta.seqNum = pRollBackEntry->seqNum; - ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta); - rpcFreeCont(rpcMsg.pCont); - } - - syncEntryDestory(pRollBackEntry); - } - } - - // delete confict entries - ths->pLogStore->truncate(ths->pLogStore, extraIndex); - - // append new entries - ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); - - // pre commit - syncNodePreCommit(ths, pAppendEntry, 0); - } - - // free memory - syncEntryDestory(pExtraEntry); - syncEntryDestory(pAppendEntry); - - } else if (hasExtraEntries && !hasAppendEntries) { - // do nothing - - } else if (!hasExtraEntries && hasAppendEntries) { - SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - if (pAppendEntry == NULL) { - syncNodeErrorLog(ths, "syncEntryDeserialize pAppendEntry2 error"); - return -1; - } - - // append new entries - ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); - - // pre commit - syncNodePreCommit(ths, pAppendEntry, 0); - - // free memory - syncEntryDestory(pAppendEntry); - - } else if (!hasExtraEntries && !hasAppendEntries) { - // do nothing - - } else { - syncNodeLog3("", ths); - ASSERT(0); - } - - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->success = true; - - if (hasAppendEntries) { - pReply->matchIndex = pMsg->prevLogIndex + 1; - } else { - pReply->matchIndex = pMsg->prevLogIndex; - } - - pReply->startTime = ths->startTime; - - // msg event log - syncLogSendAppendEntriesReply(ths, pReply, ""); - - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); - - // maybe update commit index from leader - if (pMsg->commitIndex > ths->commitIndex) { - // has commit entry in local - if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { - SyncIndex beginIndex = ths->commitIndex + 1; - SyncIndex endIndex = pMsg->commitIndex; - - // update commit index - ths->commitIndex = pMsg->commitIndex; - - // call back Wal - ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); - - int32_t code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); - ASSERT(code == 0); - } - } - } - - return ret; -} - static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t code; @@ -504,544 +270,6 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries return false; } -int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg) { - int32_t ret = 0; - int32_t code = 0; - - // if already drop replica, do not process - if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncLogRecvAppendEntriesBatch(ths, pMsg, "maybe replica already dropped"); - return -1; - } - - // maybe update term - if (pMsg->term > ths->pRaftStore->currentTerm) { - syncNodeUpdateTerm(ths, pMsg->term); - } - ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); - - // reset elect timer - if (pMsg->term == ths->pRaftStore->currentTerm) { - ths->leaderCache = pMsg->srcId; - syncNodeResetElectTimer(ths); - } - ASSERT(pMsg->dataLen >= 0); - - // candidate to follower - // - // operation: - // to follower - do { - bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE; - if (condition) { - syncLogRecvAppendEntriesBatch(ths, pMsg, "candidate to follower"); - syncNodeBecomeFollower(ths, "from candidate by append entries"); - return 0; // do not reply? - } - } while (0); - - // fake match - // - // condition1: - // preIndex <= my commit index - // - // operation: - // if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry - // match my-commit-index or my-commit-index + batchSize - do { - bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && - (pMsg->prevLogIndex <= ths->commitIndex); - if (condition) { - syncLogRecvAppendEntriesBatch(ths, pMsg, "fake match"); - - SyncIndex matchIndex = ths->commitIndex; - bool hasAppendEntries = pMsg->dataLen > 0; - SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg); - - if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) { - int32_t pass = 0; - SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); - bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex; - - // make log same - if (hasExtraEntries) { - // make log same, rollback deleted entries - pass = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1); - ASSERT(pass >= 0); - } - - // append entry batch - if (pass == 0) { - // assert! no batch - ASSERT(pMsg->dataCount <= 1); - - for (int32_t i = 0; i < pMsg->dataCount; ++i) { - SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); - if (code != 0) { - return -1; - } - - code = syncNodePreCommit(ths, pAppendEntry, 0); - ASSERT(code == 0); - - // syncEntryDestory(pAppendEntry); - } - } - - // fsync once - SSyncLogStoreData* pData = ths->pLogStore->data; - SWal* pWal = pData->pWal; - walFsync(pWal, false); - - // update match index - matchIndex = pMsg->prevLogIndex + pMsg->dataCount; - } - - // prepare response msg - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; - pReply->success = true; - pReply->matchIndex = matchIndex; - pReply->startTime = ths->startTime; - - // msg event log - syncLogSendAppendEntriesReply(ths, pReply, ""); - - // send response - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); - - return 0; - } - } while (0); - - // calculate logOK here, before will coredump, due to fake match - bool logOK = syncNodeOnAppendEntriesBatchLogOK(ths, pMsg); - - // not match - // - // condition1: - // term < myTerm - // - // condition2: - // !logOK - // - // operation: - // not match - // no operation on log - do { - bool condition1 = pMsg->term < ths->pRaftStore->currentTerm; - bool condition2 = - (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK; - bool condition = condition1 || condition2; - - if (condition) { - syncLogRecvAppendEntriesBatch(ths, pMsg, "not match"); - - // maybe update commit index by snapshot - syncNodeMaybeUpdateCommitBySnapshot(ths); - - // prepare response msg - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; - pReply->success = false; - pReply->matchIndex = ths->commitIndex; - pReply->startTime = ths->startTime; - - // msg event log - syncLogSendAppendEntriesReply(ths, pReply, ""); - - // send response - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); - - return 0; - } - } while (0); - - // really match - // - // condition: - // logOK - // - // operation: - // match - // make log same - do { - bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && logOK; - if (condition) { - // has extra entries (> preIndex) in local log - SyncIndex myLastIndex = syncNodeGetLastIndex(ths); - bool hasExtraEntries = myLastIndex > pMsg->prevLogIndex; - - // has entries in SyncAppendEntries msg - bool hasAppendEntries = pMsg->dataLen > 0; - SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg); - - syncLogRecvAppendEntriesBatch(ths, pMsg, "really match"); - - int32_t pass = 0; - - if (hasExtraEntries) { - // make log same, rollback deleted entries - pass = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1); - ASSERT(pass >= 0); - } - - if (hasAppendEntries) { - // append entry batch - if (pass == 0) { - // assert! no batch - ASSERT(pMsg->dataCount <= 1); - - // append entry batch - for (int32_t i = 0; i < pMsg->dataCount; ++i) { - SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); - if (code != 0) { - return -1; - } - - code = syncNodePreCommit(ths, pAppendEntry, 0); - ASSERT(code == 0); - - // syncEntryDestory(pAppendEntry); - } - } - - // fsync once - SSyncLogStoreData* pData = ths->pLogStore->data; - SWal* pWal = pData->pWal; - walFsync(pWal, false); - } - - // prepare response msg - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; - pReply->success = true; - pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex; - pReply->startTime = ths->startTime; - - // msg event log - syncLogSendAppendEntriesReply(ths, pReply, ""); - - // send response - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); - - // maybe update commit index, leader notice me - if (pMsg->commitIndex > ths->commitIndex) { - SyncIndex lastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); - - SyncIndex beginIndex = 0; - SyncIndex endIndex = -1; - - // has commit entry in local - if (pMsg->commitIndex <= lastIndex) { - beginIndex = ths->commitIndex + 1; - endIndex = pMsg->commitIndex; - - // update commit index - ths->commitIndex = pMsg->commitIndex; - - // call back Wal - code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); - ASSERT(code == 0); - - } else if (pMsg->commitIndex > lastIndex && ths->commitIndex < lastIndex) { - beginIndex = ths->commitIndex + 1; - endIndex = lastIndex; - - // update commit index, speed up - ths->commitIndex = lastIndex; - - // call back Wal - code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); - ASSERT(code == 0); - } - - code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); - ASSERT(code == 0); - } - - return 0; - } - } while (0); - - return 0; -} - -int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) { - int32_t ret = 0; - int32_t code = 0; - - // if already drop replica, do not process - if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncLogRecvAppendEntries(ths, pMsg, "maybe replica already dropped"); - return -1; - } - - // maybe update term - if (pMsg->term > ths->pRaftStore->currentTerm) { - syncNodeUpdateTerm(ths, pMsg->term); - } - ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); - - // reset elect timer - if (pMsg->term == ths->pRaftStore->currentTerm) { - ths->leaderCache = pMsg->srcId; - syncNodeResetElectTimer(ths); - } - ASSERT(pMsg->dataLen >= 0); - - // candidate to follower - // - // operation: - // to follower - do { - bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE; - if (condition) { - syncLogRecvAppendEntries(ths, pMsg, "candidate to follower"); - syncNodeBecomeFollower(ths, "from candidate by append entries"); - return 0; // do not reply? - } - } while (0); - - // fake match - // - // condition1: - // preIndex <= my commit index - // - // operation: - // if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry - // match my-commit-index or my-commit-index + 1 - // no operation on log - do { - bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && - (pMsg->prevLogIndex <= ths->commitIndex); - if (condition) { - syncLogRecvAppendEntries(ths, pMsg, "fake match"); - - SyncIndex matchIndex = ths->commitIndex; - bool hasAppendEntries = pMsg->dataLen > 0; - if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) { - // append entry - SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - ASSERT(pAppendEntry != NULL); - - { - // has extra entries (> preIndex) in local log - SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); - bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex; - - if (hasExtraEntries) { - // make log same, rollback deleted entries - code = syncNodeMakeLogSame(ths, pMsg); - ASSERT(code == 0); - } - } - - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); - if (code != 0) { - return -1; - } - - // pre commit - code = syncNodePreCommit(ths, pAppendEntry, 0); - ASSERT(code == 0); - - // update match index - matchIndex = pMsg->prevLogIndex + 1; - - syncEntryDestory(pAppendEntry); - } - - // prepare response msg - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; - pReply->success = true; - pReply->matchIndex = matchIndex; - pReply->startTime = ths->startTime; - - // msg event log - syncLogSendAppendEntriesReply(ths, pReply, ""); - - // send response - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); - - return ret; - } - } while (0); - - // calculate logOK here, before will coredump, due to fake match - bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg); - - // not match - // - // condition1: - // term < myTerm - // - // condition2: - // !logOK - // - // operation: - // not match - // no operation on log - do { - bool condition1 = pMsg->term < ths->pRaftStore->currentTerm; - bool condition2 = - (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK; - bool condition = condition1 || condition2; - - if (condition) { - syncLogRecvAppendEntries(ths, pMsg, "not match"); - - // prepare response msg - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; - pReply->success = false; - pReply->matchIndex = SYNC_INDEX_INVALID; - pReply->startTime = ths->startTime; - - // msg event log - syncLogSendAppendEntriesReply(ths, pReply, ""); - - // send response - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); - - return ret; - } - } while (0); - - // really match - // - // condition: - // logOK - // - // operation: - // match - // make log same - do { - bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && logOK; - if (condition) { - // has extra entries (> preIndex) in local log - SyncIndex myLastIndex = syncNodeGetLastIndex(ths); - bool hasExtraEntries = myLastIndex > pMsg->prevLogIndex; - - // has entries in SyncAppendEntries msg - bool hasAppendEntries = pMsg->dataLen > 0; - - syncLogRecvAppendEntries(ths, pMsg, "really match"); - - if (hasExtraEntries) { - // make log same, rollback deleted entries - code = syncNodeMakeLogSame(ths, pMsg); - ASSERT(code == 0); - } - - if (hasAppendEntries) { - // append entry - SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - ASSERT(pAppendEntry != NULL); - - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); - if (code != 0) { - return -1; - } - - // pre commit - code = syncNodePreCommit(ths, pAppendEntry, 0); - ASSERT(code == 0); - - syncEntryDestory(pAppendEntry); - } - - // prepare response msg - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; - pReply->success = true; - pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex; - pReply->startTime = ths->startTime; - - // msg event log - syncLogSendAppendEntriesReply(ths, pReply, ""); - - // send response - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); - - // maybe update commit index, leader notice me - if (pMsg->commitIndex > ths->commitIndex) { - // has commit entry in local - if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { - // advance commit index to sanpshot first - SSnapshot snapshot; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); - if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) { - SyncIndex commitBegin = ths->commitIndex; - SyncIndex commitEnd = snapshot.lastApplyIndex; - ths->commitIndex = snapshot.lastApplyIndex; - - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, - commitBegin, commitEnd); - syncNodeEventLog(ths, eventLog); - } - - SyncIndex beginIndex = ths->commitIndex + 1; - SyncIndex endIndex = pMsg->commitIndex; - - // update commit index - ths->commitIndex = pMsg->commitIndex; - - // call back Wal - code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); - ASSERT(code == 0); - - code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); - ASSERT(code == 0); - } - } - return ret; - } - } while (0); - - return ret; -} - int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { // maybe update commit index, leader notice me if (newCommitIndex > ths->commitIndex) { @@ -1068,7 +296,7 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { ths->commitIndex = newCommitIndex; // call back Wal - int32_t code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); + int32_t code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex); ASSERT(code == 0); code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 2f56c142ab..3c0b2bebfc 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -38,74 +38,6 @@ // /\ Discard(m) // /\ UNCHANGED <> // -int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - int32_t ret = 0; - - // if already drop replica, do not process - if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped"); - return -1; - } - - // drop stale response - if (pMsg->term < ths->pRaftStore->currentTerm) { - syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); - return 0; - } - - // no need this code, because if I receive reply.term, then I must have sent for that term. - // if (pMsg->term > ths->pRaftStore->currentTerm) { - // syncNodeUpdateTerm(ths, pMsg->term); - // } - - if (pMsg->term > ths->pRaftStore->currentTerm) { - syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); - return -1; - } - - ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - - // update time - syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime); - syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs()); - - SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); - - if (pMsg->success) { - // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); - - // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] - syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); - - // maybe commit - syncMaybeAdvanceCommitIndex(ths); - - } else { - SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - - // notice! int64, uint64 - if (nextIndex > SYNC_INDEX_BEGIN) { - --nextIndex; - } else { - nextIndex = SYNC_INDEX_BEGIN; - } - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); - } - - SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); - do { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "before next:%" PRId64 ", match:%" PRId64 ", after next:%" PRId64 ", match:%" PRId64, beforeNextIndex, - beforeMatchIndex, afterNextIndex, afterMatchIndex); - syncLogRecvAppendEntriesReply(ths, pMsg, logBuf); - } while (0); - - return 0; -} // only start once static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm, @@ -152,270 +84,6 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync } } -int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - int32_t ret = 0; - - // if already drop replica, do not process - if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped"); - return -1; - } - - // drop stale response - if (pMsg->term < ths->pRaftStore->currentTerm) { - syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); - return 0; - } - - // error term - if (pMsg->term > ths->pRaftStore->currentTerm) { - syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); - return -1; - } - - ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - - // update time - syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime); - syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs()); - - SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); - - if (pMsg->success) { - SyncIndex newNextIndex = pMsg->matchIndex + 1; - SyncIndex newMatchIndex = pMsg->matchIndex; - - bool needStartSnapshot = false; - if (newMatchIndex >= SYNC_INDEX_BEGIN && !ths->pLogStore->syncLogExist(ths->pLogStore, newMatchIndex)) { - needStartSnapshot = true; - } - - if (!needStartSnapshot) { - // update next-index, match-index - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), newNextIndex); - syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); - - // maybe commit - if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncMaybeAdvanceCommitIndex(ths); - } - - } else { - // start snapshot - SSnapshot oldSnapshot; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot); - if (oldSnapshot.lastApplyIndex > newMatchIndex) { - syncNodeStartSnapshotOnce(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm, - pMsg); // term maybe not ok? - } - - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), oldSnapshot.lastApplyIndex + 1); - syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); - } - - // event log, update next-index - do { - char host[64]; - int16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "reset next-index:%" PRId64 ", match-index:%" PRId64 " for %s:%d", newNextIndex, - newMatchIndex, host, port); - syncNodeEventLog(ths, logBuf); - - } while (0); - - } else { - SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - - if (nextIndex > SYNC_INDEX_BEGIN) { - --nextIndex; - - // speed up - if (nextIndex > pMsg->matchIndex + 1) { - nextIndex = pMsg->matchIndex + 1; - } - - bool needStartSnapshot = false; - if (nextIndex >= SYNC_INDEX_BEGIN && !ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex)) { - needStartSnapshot = true; - } - if (nextIndex - 1 >= SYNC_INDEX_BEGIN && !ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex - 1)) { - needStartSnapshot = true; - } - - if (!needStartSnapshot) { - // do nothing - - } else { - SSnapshot oldSnapshot; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot); - SyncTerm newSnapshotTerm = oldSnapshot.lastApplyTerm; - - SyncIndex endIndex; - if (ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex + 1)) { - endIndex = nextIndex; - } else { - endIndex = oldSnapshot.lastApplyIndex; - } - syncNodeStartSnapshotOnce(ths, pMsg->matchIndex + 1, endIndex, newSnapshotTerm, pMsg); - - // get sender - SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); - ASSERT(pSender != NULL); - SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1; - - // update nextIndex to sentryIndex - if (nextIndex <= sentryIndex) { - nextIndex = sentryIndex; - } - } - - } else { - nextIndex = SYNC_INDEX_BEGIN; - } - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); - - SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); - if (pMsg->matchIndex > oldMatchIndex) { - syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); - } - - // event log, update next-index - do { - char host[64]; - int16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - SyncIndex newNextIndex = nextIndex; - SyncIndex newMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "reset2 next-index:%" PRId64 ", match-index:%" PRId64 " for %s:%d", newNextIndex, - newMatchIndex, host, port); - syncNodeEventLog(ths, logBuf); - - } while (0); - } - - SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); - do { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "before next:%" PRId64 ", match:%" PRId64 ", after next:%" PRId64 ", match:%" PRId64, beforeNextIndex, - beforeMatchIndex, afterNextIndex, afterMatchIndex); - syncLogRecvAppendEntriesReply(ths, pMsg, logBuf); - } while (0); - - return 0; -} - -int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - int32_t ret = 0; - - // if already drop replica, do not process - if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped"); - return -1; - } - - // drop stale response - if (pMsg->term < ths->pRaftStore->currentTerm) { - syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); - return 0; - } - - // no need this code, because if I receive reply.term, then I must have sent for that term. - // if (pMsg->term > ths->pRaftStore->currentTerm) { - // syncNodeUpdateTerm(ths, pMsg->term); - // } - - if (pMsg->term > ths->pRaftStore->currentTerm) { - syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); - return -1; - } - - ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - - // update time - syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime); - syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs()); - - SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); - - if (pMsg->success) { - // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); - - // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] - syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); - - // maybe commit - if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncMaybeAdvanceCommitIndex(ths); - } - - } else { - SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - - // notice! int64, uint64 - if (nextIndex > SYNC_INDEX_BEGIN) { - --nextIndex; - - // get sender - SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); - ASSERT(pSender != NULL); - - SSnapshot snapshot = {.data = NULL, - .lastApplyIndex = SYNC_INDEX_INVALID, - .lastApplyTerm = 0, - .lastConfigIndex = SYNC_INDEX_INVALID}; - void* pReader = NULL; - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot, NULL, &pReader); - if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 && - !snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) { - // has snapshot - ASSERT(pReader != NULL); - SSnapshotParam readerParam = {.start = 0, .end = snapshot.lastApplyIndex}; - snapshotSenderStart(pSender, readerParam, snapshot, pReader); - - } else { - // no snapshot - if (pReader != NULL) { - ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader); - } - } - - SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1; - - // update nextIndex to sentryIndex - if (nextIndex <= sentryIndex) { - nextIndex = sentryIndex; - } - - } else { - nextIndex = SYNC_INDEX_BEGIN; - } - - syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); - } - - SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); - do { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "before next:%" PRId64 ", match:%" PRId64 ", after next:%" PRId64 ", match:%" PRId64, beforeNextIndex, - beforeMatchIndex, afterNextIndex, afterMatchIndex); - syncLogRecvAppendEntriesReply(ths, pMsg, logBuf); - } while (0); - - return 0; -} - int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index b604d25816..f83b640876 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -75,8 +75,11 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { if (h) { pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); } else { - pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); - ASSERT(pEntry != NULL); + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry); + ASSERT(code == 0); + + // pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); + // ASSERT(pEntry != NULL); } // cannot commit, even if quorum agree. need check term! if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) { @@ -122,7 +125,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { pSyncNode->commitIndex = newCommitIndex; // call back Wal - pSyncNode->pLogStore->updateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex); + pSyncNode->pLogStore->syncLogUpdateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex); // execute fsm if (pSyncNode->pFsm != NULL) { @@ -215,6 +218,7 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { return quorum; } +/* bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { int agreeCount = 0; for (int i = 0; i < pSyncNode->replicaNum; ++i) { @@ -227,8 +231,8 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { } return false; } +*/ -/* bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { int agreeCount = 0; for (int i = 0; i < pSyncNode->replicaNum; ++i) { @@ -241,4 +245,3 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { } return false; } -*/ diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index c8517fe1ae..af7ea9201b 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -40,8 +40,8 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { pMsg->srcId = pSyncNode->myRaftId; pMsg->destId = pSyncNode->peersId[i]; pMsg->term = pSyncNode->pRaftStore->currentTerm; - pMsg->lastLogIndex = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); - pMsg->lastLogTerm = pSyncNode->pLogStore->getLastTerm(pSyncNode->pLogStore); + pMsg->lastLogIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); + pMsg->lastLogTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore); ret = syncNodeRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg); ASSERT(ret == 0); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index b00be6edb6..afa2d43e13 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -30,7 +30,7 @@ static int32_t syncIODestroy(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io); -static void * syncIOConsumerFunc(void *param); +static void *syncIOConsumerFunc(void *param); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); @@ -242,9 +242,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) { } static void *syncIOConsumerFunc(void *param) { - SSyncIO * io = param; + SSyncIO *io = param; STaosQall *qall = taosAllocateQall(); - SRpcMsg * pRpcMsg, rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; SQueueInfo qinfo = {0}; while (1) { @@ -326,18 +326,18 @@ static void *syncIOConsumerFunc(void *param) { } } else if (pRpcMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { - if (io->FpOnSyncSnapshotSend != NULL) { + if (io->FpOnSyncSnapshot != NULL) { SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pRpcMsg); ASSERT(pSyncMsg != NULL); - io->FpOnSyncSnapshotSend(io->pSyncNode, pSyncMsg); + io->FpOnSyncSnapshot(io->pSyncNode, pSyncMsg); syncSnapshotSendDestroy(pSyncMsg); } } else if (pRpcMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { - if (io->FpOnSyncSnapshotRsp != NULL) { + if (io->FpOnSyncSnapshotReply != NULL) { SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pRpcMsg); ASSERT(pSyncMsg != NULL); - io->FpOnSyncSnapshotRsp(io->pSyncNode, pSyncMsg); + io->FpOnSyncSnapshotReply(io->pSyncNode, pSyncMsg); syncSnapshotRspDestroy(pSyncMsg); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b3dc444174..f52a719f7f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -783,24 +783,6 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { return ret; } -int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize) { - if (arrSize < 0) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - ASSERT(rid == pSyncNode->rid); - - int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgPArr, pIsWeakArr, arrSize); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return ret; -} - static bool syncNodeBatchOK(SRpcMsg** pMsgPArr, int32_t arrSize) { for (int32_t i = 0; i < arrSize; ++i) { if (pMsgPArr[i]->msgType == TDMT_SYNC_CONFIG_CHANGE) { @@ -815,91 +797,6 @@ static bool syncNodeBatchOK(SRpcMsg** pMsgPArr, int32_t arrSize) { return true; } -int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize) { - if (!syncNodeBatchOK(pMsgPArr, arrSize)) { - syncNodeErrorLog(pSyncNode, "sync propose batch error"); - terrno = TSDB_CODE_SYN_BATCH_ERROR; - return -1; - } - - if (arrSize > SYNC_MAX_BATCH_SIZE) { - syncNodeErrorLog(pSyncNode, "sync propose batch error"); - terrno = TSDB_CODE_SYN_BATCH_ERROR; - return -1; - } - - if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { - syncNodeErrorLog(pSyncNode, "sync propose not leader"); - terrno = TSDB_CODE_SYN_NOT_LEADER; - return -1; - } - - if (pSyncNode->changing) { - syncNodeErrorLog(pSyncNode, "sync propose not ready"); - terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; - return -1; - } - - SRaftMeta raftArr[SYNC_MAX_BATCH_SIZE]; - for (int i = 0; i < arrSize; ++i) { - do { - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "propose message, type:%s batch:%d", TMSG_INFO(pMsgPArr[i]->msgType), - arrSize); - syncNodeEventLog(pSyncNode, eventLog); - } while (0); - - SRespStub stub; - stub.createTime = taosGetTimestampMs(); - stub.rpcMsg = *(pMsgPArr[i]); - uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); - - raftArr[i].isWeak = pIsWeakArr[i]; - raftArr[i].seqNum = seqNum; - } - - SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgPArr, raftArr, arrSize, pSyncNode->vgId); - ASSERT(pSyncMsg != NULL); - - SRpcMsg rpcMsg; - syncClientRequestBatch2RpcMsg(pSyncMsg, &rpcMsg); - taosMemoryFree(pSyncMsg); // only free msg body, do not free rpc msg content - - if (pSyncNode->replicaNum == 1 && pSyncNode->vgId != 1) { - int32_t code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg); - if (code == 0) { - // update rpc msg applyIndex - SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pSyncMsg); - ASSERT(arrSize == pSyncMsg->dataCount); - for (int i = 0; i < arrSize; ++i) { - pMsgPArr[i]->info.conn.applyIndex = msgArr[i].info.conn.applyIndex; - syncRespMgrDel(pSyncNode->pSyncRespMgr, raftArr[i].seqNum); - } - - rpcFreeCont(rpcMsg.pCont); - terrno = 0; - return 1; - - } else { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - - } else { - if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { - // enqueue msg ok - return 0; - - } else { - sError("vgId:%d, enqueue msg error, FpEqMsg is NULL", pSyncNode->vgId); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - } - - return 0; -} - int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { int32_t ret = 0; @@ -951,7 +848,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { // optimized one replica if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) { SyncIndex retIndex; - int32_t code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, &retIndex); + int32_t code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, &retIndex); if (code == 0) { pMsg->info.conn.applyIndex = retIndex; pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm; @@ -1248,26 +1145,14 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // init callback pSyncNode->FpOnPing = syncNodeOnPingCb; pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; - pSyncNode->FpOnClientRequest = syncNodeOnClientRequestCb; + pSyncNode->FpOnClientRequest = syncNodeOnClientRequest; pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb; - - pSyncNode->FpOnSnapshotSend = syncNodeOnSnapshotSendCb; - pSyncNode->FpOnSnapshotRsp = syncNodeOnSnapshotRspCb; - - if (pSyncNode->pRaftCfg->snapshotStrategy) { - sInfo("vgId:%d, sync node use snapshot", pSyncNode->vgId); - pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteSnapshotCb; - pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplySnapshotCb; - pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesSnapshotCb; - pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplySnapshotCb; - - } else { - sInfo("vgId:%d, sync node do not use snapshot", pSyncNode->vgId); - pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; - pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; - pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; - pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb; - } + pSyncNode->FpOnSnapshot = syncNodeOnSnapshot; + pSyncNode->FpOnSnapshotReply = syncNodeOnSnapshotReply; + pSyncNode->FpOnRequestVote = syncNodeOnRequestVote; + pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReply; + pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntries; + pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReply; // tools pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS); @@ -2886,7 +2771,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { if (ths->state == TAOS_SYNC_STATE_LEADER) { int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); ASSERT(code == 0); - syncNodeReplicate(ths, false); } if (h) { @@ -2990,58 +2874,6 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) { // /\ UNCHANGED <> // -int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) { - int32_t ret = 0; - int32_t code = 0; - syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); - - SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); - SyncTerm term = ths->pRaftStore->currentTerm; - SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index); - ASSERT(pEntry != NULL); - - LRUHandle* h = NULL; - syncCacheEntry(ths->pLogStore, pEntry, &h); - - if (ths->state == TAOS_SYNC_STATE_LEADER) { - // append entry - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); - if (code != 0) { - // del resp mgr, call FpCommitCb - ASSERT(0); - return -1; - } - - // if mulit replica, start replicate right now - if (ths->replicaNum > 1) { - syncNodeReplicate(ths, false); - - // pre commit - syncNodePreCommit(ths, pEntry, 0); - } - - // if only myself, maybe commit right now - if (ths->replicaNum == 1) { - syncMaybeAdvanceCommitIndex(ths); - } - } - - if (pRetIndex != NULL) { - if (ret == 0 && pEntry != NULL) { - *pRetIndex = pEntry->index; - } else { - *pRetIndex = SYNC_INDEX_INVALID; - } - } - - if (h) { - taosLRUCacheRelease(ths->pLogStore->pCache, h, false); - } else { - syncEntryDestory(pEntry); - } - - return ret; -} int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) { syncNodeEventLog(ths, "on client request"); @@ -3094,63 +2926,6 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd return ret; } -int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* pMsg) { - syncNodeEventLog(ths, "on client request batch"); - - int32_t code = 0; - - if (ths->state != TAOS_SYNC_STATE_LEADER) { - // call FpCommitCb, delete resp mgr - return -1; - } - - SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); - SyncTerm term = ths->pRaftStore->currentTerm; - - int32_t raftMetaArrayLen = sizeof(SRaftMeta) * pMsg->dataCount; - int32_t rpcArrayLen = sizeof(SRpcMsg) * pMsg->dataCount; - SRaftMeta* raftMetaArr = (SRaftMeta*)(pMsg->data); - SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen); - for (int32_t i = 0; i < pMsg->dataCount; ++i) { - SSyncRaftEntry* pEntry = syncEntryBuild(msgArr[i].contLen); - ASSERT(pEntry != NULL); - - pEntry->originalRpcType = msgArr[i].msgType; - pEntry->seqNum = raftMetaArr[i].seqNum; - pEntry->isWeak = raftMetaArr[i].isWeak; - pEntry->term = term; - pEntry->index = index; - memcpy(pEntry->data, msgArr[i].pCont, msgArr[i].contLen); - ASSERT(msgArr[i].contLen == pEntry->dataLen); - - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); - if (code != 0) { - // del resp mgr, call FpCommitCb - ASSERT(0); - return -1; - } - - // update rpc msg conn apply.index - msgArr[i].info.conn.applyIndex = pEntry->index; - } - - // fsync once - SSyncLogStoreData* pData = ths->pLogStore->data; - SWal* pWal = pData->pWal; - walFsync(pWal, false); - - if (ths->replicaNum > 1) { - // if multi replica, start replicate right now - syncNodeReplicate(ths, false); - - } else if (ths->replicaNum == 1) { - // one replica - syncMaybeAdvanceCommitIndex(ths); - } - - return 0; -} - const char* syncStr(ESyncState state) { switch (state) { case TAOS_SYNC_STATE_FOLLOWER: diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 496c8419de..3f180aef4f 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -33,21 +33,11 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEn static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry); static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index); +static int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); +static SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore); -// private function static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry); -//------------------------------- -// log[0 .. n] -static SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); -static SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); -static SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); -static SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index); -static int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); -static int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); -static int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); -static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); - //------------------------------- SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore)); @@ -74,14 +64,8 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { pData->pWalHandle = walOpenReader(pData->pWal, NULL); ASSERT(pData->pWalHandle != NULL); - pLogStore->appendEntry = logStoreAppendEntry; - pLogStore->getEntry = logStoreGetEntry; - pLogStore->truncate = logStoreTruncate; - pLogStore->getLastIndex = logStoreLastIndex; - pLogStore->getLastTerm = logStoreLastTerm; - pLogStore->updateCommitIndex = logStoreUpdateCommitIndex; - pLogStore->getCommitIndex = logStoreGetCommitIndex; - + pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex; + pLogStore->syncLogCommitIndex = raftlogCommitIndex; pLogStore->syncLogRestoreFromSnapshot = raftLogRestoreFromSnapshot; pLogStore->syncLogBeginIndex = raftLogBeginIndex; pLogStore->syncLogEndIndex = raftLogEndIndex; @@ -374,149 +358,7 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp return -1; } -//------------------------------- -// log[0 .. n] - -int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { - SSyncLogStoreData* pData = pLogStore->data; - SWal* pWal = pData->pWal; - - SyncIndex index = 0; - SWalSyncInfo syncMeta = {0}; - syncMeta.isWeek = pEntry->isWeak; - syncMeta.seqNum = pEntry->seqNum; - syncMeta.term = pEntry->term; - - index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); - if (index < 0) { - int32_t err = terrno; - const char* errStr = tstrerror(err); - int32_t sysErr = errno; - const char* sysErrStr = strerror(errno); - - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", - pEntry->index, err, err, errStr, sysErr, sysErrStr); - syncNodeErrorLog(pData->pSyncNode, logBuf); - - ASSERT(0); - return -1; - } - pEntry->index = index; - - do { - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "write2 index:%" PRId64 ", type:%s, origin type:%s", pEntry->index, - TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); - syncNodeEventLog(pData->pSyncNode, eventLog); - } while (0); - - return 0; -} - -SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { - SSyncLogStoreData* pData = pLogStore->data; - SWal* pWal = pData->pWal; - - if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { - taosThreadMutexLock(&(pData->mutex)); - - // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); - SWalReader* pWalHandle = pData->pWalHandle; - ASSERT(pWalHandle != NULL); - - int32_t code = walReadVer(pWalHandle, index); - // int32_t code = walReadVerCached(pWalHandle, index); - if (code != 0) { - int32_t err = terrno; - const char* errStr = tstrerror(err); - int32_t sysErr = errno; - const char* sysErrStr = strerror(errno); - - do { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", - index, err, err, errStr, sysErr, sysErrStr); - if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { - // syncNodeEventLog(pData->pSyncNode, logBuf); - } else { - syncNodeErrorLog(pData->pSyncNode, logBuf); - } - } while (0); - - ASSERT(0); - } - - SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); - ASSERT(pEntry != NULL); - - pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST; - pEntry->originalRpcType = pWalHandle->pHead->head.msgType; - pEntry->seqNum = pWalHandle->pHead->head.syncMeta.seqNum; - pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek; - pEntry->term = pWalHandle->pHead->head.syncMeta.term; - pEntry->index = index; - ASSERT(pEntry->dataLen == pWalHandle->pHead->head.bodyLen); - memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); - - /* - int32_t saveErr = terrno; - walCloseReadHandle(pWalHandle); - terrno = saveErr; - */ - - taosThreadMutexUnlock(&(pData->mutex)); - return pEntry; - - } else { - return NULL; - } -} - -int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { - SSyncLogStoreData* pData = pLogStore->data; - SWal* pWal = pData->pWal; - // ASSERT(walRollback(pWal, fromIndex) == 0); - int32_t code = walRollback(pWal, fromIndex); - if (code != 0) { - int32_t err = terrno; - const char* errStr = tstrerror(err); - int32_t sysErr = errno; - const char* sysErrStr = strerror(errno); - sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", - pData->pSyncNode->vgId, fromIndex, err, err, errStr, sysErr, sysErrStr); - - ASSERT(0); - } - - // event log - do { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "wal truncate, from-index:%" PRId64, fromIndex); - syncNodeEventLog(pData->pSyncNode, logBuf); - } while (0); - - return 0; -} - -SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { - SSyncLogStoreData* pData = pLogStore->data; - SWal* pWal = pData->pWal; - SyncIndex lastIndex = walGetLastVer(pWal); - return lastIndex; -} - -SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { - SyncTerm lastTerm = 0; - SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); - if (pLastEntry != NULL) { - lastTerm = pLastEntry->term; - taosMemoryFree(pLastEntry); - } - return lastTerm; -} - -int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { +int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; // ASSERT(walCommit(pWal, index) == 0); @@ -534,23 +376,11 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { return 0; } -SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) { +SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; return pData->pSyncNode->commitIndex; } -SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) { - SSyncLogStoreData* pData = pLogStore->data; - SWal* pWal = pData->pWal; - SyncIndex lastIndex = walGetLastVer(pWal); - - SSyncRaftEntry* pEntry = NULL; - if (lastIndex > 0) { - pEntry = logStoreGetEntry(pLogStore, lastIndex); - } - return pEntry; -} - cJSON* logStore2Json(SSyncLogStore* pLogStore) { char u64buf[128] = {0}; SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data; @@ -589,7 +419,9 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) { if (!raftLogIsEmpty(pLogStore)) { for (SyncIndex i = beginIndex; i <= endIndex; ++i) { - SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i); + SSyncRaftEntry* pEntry = NULL; + raftLogGetEntry(pLogStore, i, &pEntry); + cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); syncEntryDestory(pEntry); } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 2100c795e6..69c708c745 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -48,435 +48,6 @@ // mdest |-> j]) // /\ UNCHANGED <> // -int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); - - syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pNextIndex", pSyncNode->pNextIndex); - syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pMatchIndex", pSyncNode->pMatchIndex); - logStoreSimpleLog2("==syncNodeAppendEntriesPeers==", pSyncNode->pLogStore); - - int32_t ret = 0; - for (int i = 0; i < pSyncNode->peersNum; ++i) { - SRaftId* pDestId = &(pSyncNode->peersId[i]); - - // set prevLogIndex - SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); - - SyncIndex preLogIndex = nextIndex - 1; - - // set preLogTerm - SyncTerm preLogTerm = 0; - if (preLogIndex >= SYNC_INDEX_BEGIN) { - SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex); - ASSERT(pPreEntry != NULL); - - preLogTerm = pPreEntry->term; - syncEntryDestory(pPreEntry); - } - - // batch optimized - // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); - - SyncAppendEntries* pMsg = NULL; - SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex); - if (pEntry != NULL) { - pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); - ASSERT(pMsg != NULL); - - // add pEntry into msg - uint32_t len; - char* serialized = syncEntrySerialize(pEntry, &len); - ASSERT(len == pEntry->bytes); - memcpy(pMsg->data, serialized, len); - - taosMemoryFree(serialized); - syncEntryDestory(pEntry); - - } else { - // maybe overflow, send empty record - pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); - ASSERT(pMsg != NULL); - } - - ASSERT(pMsg != NULL); - pMsg->srcId = pSyncNode->myRaftId; - pMsg->destId = *pDestId; - pMsg->term = pSyncNode->pRaftStore->currentTerm; - pMsg->prevLogIndex = preLogIndex; - pMsg->prevLogTerm = preLogTerm; - pMsg->commitIndex = pSyncNode->commitIndex; - - syncAppendEntriesLog2("==syncNodeAppendEntriesPeers==", pMsg); - - // send AppendEntries - syncNodeAppendEntries(pSyncNode, pDestId, pMsg); - syncAppendEntriesDestroy(pMsg); - } - - return ret; -} - -int32_t syncNodeAppendEntriesOnePeer(SSyncNode* pSyncNode, SRaftId* pDestId, SyncIndex nextIndex) { - int32_t ret = 0; - - // pre index, pre term - SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); - SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); - if (preLogTerm == SYNC_TERM_INVALID) { - SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; - // SyncIndex newNextIndex = nextIndex + 1; - - syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); - syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); - sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 - ", match-index:%d, raftid:%" PRId64, - pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr); - return -1; - } - - // entry pointer array - SSyncRaftEntry* entryPArr[SYNC_MAX_BATCH_SIZE]; - memset(entryPArr, 0, sizeof(entryPArr)); - - // get entry batch - int32_t getCount = 0; - SyncIndex getEntryIndex = nextIndex; - for (int32_t i = 0; i < pSyncNode->pRaftCfg->batchSize; ++i) { - SSyncRaftEntry* pEntry = NULL; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, getEntryIndex, &pEntry); - if (code == 0) { - ASSERT(pEntry != NULL); - entryPArr[i] = pEntry; - getCount++; - getEntryIndex++; - - } else { - break; - } - } - - // event log - do { - char logBuf[128]; - char host[64]; - uint16_t port; - syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); - snprintf(logBuf, sizeof(logBuf), "build batch:%d for %s:%d", getCount, host, port); - syncNodeEventLog(pSyncNode, logBuf); - } while (0); - - // build msg - SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(entryPArr, getCount, pSyncNode->vgId); - ASSERT(pMsg != NULL); - - // free entries - for (int32_t i = 0; i < pSyncNode->pRaftCfg->batchSize; ++i) { - SSyncRaftEntry* pEntry = entryPArr[i]; - if (pEntry != NULL) { - syncEntryDestory(pEntry); - entryPArr[i] = NULL; - } - } - - // prepare msg - pMsg->srcId = pSyncNode->myRaftId; - pMsg->destId = *pDestId; - pMsg->term = pSyncNode->pRaftStore->currentTerm; - pMsg->prevLogIndex = preLogIndex; - pMsg->prevLogTerm = preLogTerm; - pMsg->commitIndex = pSyncNode->commitIndex; - pMsg->privateTerm = 0; - pMsg->dataCount = getCount; - - // send msg - syncNodeAppendEntriesBatch(pSyncNode, pDestId, pMsg); - - // speed up - if (pMsg->dataCount > 0 && pSyncNode->commitIndex - pMsg->prevLogIndex > SYNC_SLOW_DOWN_RANGE) { - ret = 1; - -#if 0 - do { - char logBuf[128]; - char host[64]; - uint16_t port; - syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); - snprintf(logBuf, sizeof(logBuf), "maybe speed up for %s:%d, pre-index:%ld", host, port, pMsg->prevLogIndex); - syncNodeEventLog(pSyncNode, logBuf); - } while (0); -#endif - } - - syncAppendEntriesBatchDestroy(pMsg); - - return ret; -} - -int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { - if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { - return -1; - } - - int32_t ret = 0; - for (int i = 0; i < pSyncNode->peersNum; ++i) { - SRaftId* pDestId = &(pSyncNode->peersId[i]); - - // next index - SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); - ret = syncNodeAppendEntriesOnePeer(pSyncNode, pDestId, nextIndex); - } - - return ret; -} - -#if 0 -int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { - if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { - return -1; - } - - int32_t ret = 0; - for (int i = 0; i < pSyncNode->peersNum; ++i) { - SRaftId* pDestId = &(pSyncNode->peersId[i]); - - // next index - SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); - - // pre index, pre term - SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); - SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); - if (preLogTerm == SYNC_TERM_INVALID) { - SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; - // SyncIndex newNextIndex = nextIndex + 1; - - syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); - syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); - sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 - ", match-index:%d, raftid:%" PRId64, - pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr); - return -1; - } - - // entry pointer array - SSyncRaftEntry* entryPArr[SYNC_MAX_BATCH_SIZE]; - memset(entryPArr, 0, sizeof(entryPArr)); - - // get entry batch - int32_t getCount = 0; - SyncIndex getEntryIndex = nextIndex; - for (int32_t i = 0; i < pSyncNode->pRaftCfg->batchSize; ++i) { - SSyncRaftEntry* pEntry = NULL; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, getEntryIndex, &pEntry); - if (code == 0) { - ASSERT(pEntry != NULL); - entryPArr[i] = pEntry; - getCount++; - getEntryIndex++; - - } else { - break; - } - } - - // event log - do { - char logBuf[128]; - char host[64]; - uint16_t port; - syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); - snprintf(logBuf, sizeof(logBuf), "build batch:%d for %s:%d", getCount, host, port); - syncNodeEventLog(pSyncNode, logBuf); - } while (0); - - // build msg - SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(entryPArr, getCount, pSyncNode->vgId); - ASSERT(pMsg != NULL); - - // free entries - for (int32_t i = 0; i < pSyncNode->pRaftCfg->batchSize; ++i) { - SSyncRaftEntry* pEntry = entryPArr[i]; - if (pEntry != NULL) { - syncEntryDestory(pEntry); - entryPArr[i] = NULL; - } - } - - // prepare msg - pMsg->srcId = pSyncNode->myRaftId; - pMsg->destId = *pDestId; - pMsg->term = pSyncNode->pRaftStore->currentTerm; - pMsg->prevLogIndex = preLogIndex; - pMsg->prevLogTerm = preLogTerm; - pMsg->commitIndex = pSyncNode->commitIndex; - pMsg->privateTerm = 0; - pMsg->dataCount = getCount; - - // send msg - syncNodeAppendEntriesBatch(pSyncNode, pDestId, pMsg); - - // speed up - if (pMsg->dataCount > 0 && pSyncNode->commitIndex - pMsg->prevLogIndex > SYNC_SLOW_DOWN_RANGE) { - ret = 1; - -#if 0 - do { - char logBuf[128]; - char host[64]; - uint16_t port; - syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); - snprintf(logBuf, sizeof(logBuf), "maybe speed up for %s:%d, pre-index:%ld", host, port, pMsg->prevLogIndex); - syncNodeEventLog(pSyncNode, logBuf); - } while (0); -#endif - } - - syncAppendEntriesBatchDestroy(pMsg); - } - - return ret; -} -#endif - -int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); - - syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex); - syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex); - logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore); - - int32_t ret = 0; - for (int i = 0; i < pSyncNode->peersNum; ++i) { - SRaftId* pDestId = &(pSyncNode->peersId[i]); - - // next index - SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); - - // pre index, pre term - SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); - SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); - if (preLogTerm == SYNC_TERM_INVALID) { - SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; - // SyncIndex newNextIndex = nextIndex + 1; - - syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); - syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); - sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 - ", match-index:%d, raftid:%" PRId64, - pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr); - - return -1; - } - - // prepare entry - SyncAppendEntries* pMsg = NULL; - - SSyncRaftEntry* pEntry; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); - - if (code == 0) { - ASSERT(pEntry != NULL); - - pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); - ASSERT(pMsg != NULL); - - // add pEntry into msg - uint32_t len; - char* serialized = syncEntrySerialize(pEntry, &len); - ASSERT(len == pEntry->bytes); - memcpy(pMsg->data, serialized, len); - - taosMemoryFree(serialized); - syncEntryDestory(pEntry); - - } else { - if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { - // no entry in log - pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); - ASSERT(pMsg != NULL); - - } else { - syncNodeLog3("", pSyncNode); - ASSERT(0); - } - } - - // prepare msg - ASSERT(pMsg != NULL); - pMsg->srcId = pSyncNode->myRaftId; - pMsg->destId = *pDestId; - pMsg->term = pSyncNode->pRaftStore->currentTerm; - pMsg->prevLogIndex = preLogIndex; - pMsg->prevLogTerm = preLogTerm; - pMsg->commitIndex = pSyncNode->commitIndex; - pMsg->privateTerm = 0; - // pMsg->privateTerm = syncIndexMgrGetTerm(pSyncNode->pNextIndex, pDestId); - - // send msg - syncNodeAppendEntries(pSyncNode, pDestId, pMsg); - syncAppendEntriesDestroy(pMsg); - } - - return ret; -} - -int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer) { - // start replicate - int32_t ret = 0; - - switch (pSyncNode->pRaftCfg->snapshotStrategy) { - case SYNC_STRATEGY_NO_SNAPSHOT: - ret = syncNodeAppendEntriesPeers(pSyncNode); - break; - - case SYNC_STRATEGY_STANDARD_SNAPSHOT: - ret = syncNodeAppendEntriesPeersSnapshot(pSyncNode); - break; - - case SYNC_STRATEGY_WAL_FIRST: - ret = syncNodeAppendEntriesPeersSnapshot2(pSyncNode); - break; - - default: - ret = syncNodeAppendEntriesPeers(pSyncNode); - break; - } - - // start delay - int64_t timeNow = taosGetTimestampMs(); - int64_t startDelay = timeNow - pSyncNode->startTime; - - // replicate delay - int64_t replicateDelay = timeNow - pSyncNode->lastReplicateTime; - pSyncNode->lastReplicateTime = timeNow; - - if (ret > 0 && isTimer && startDelay > SYNC_SPEED_UP_AFTER_MS) { - // speed up replicate - int32_t ms = - pSyncNode->heartbeatTimerMS < SYNC_SPEED_UP_HB_TIMER ? pSyncNode->heartbeatTimerMS : SYNC_SPEED_UP_HB_TIMER; - syncNodeRestartNowHeartbeatTimerMS(pSyncNode, ms); - -#if 0 - do { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "replicate speed up"); - syncNodeEventLog(pSyncNode, logBuf); - } while (0); -#endif - - } else { - syncNodeRestartHeartbeatTimer(pSyncNode); - -#if 0 - do { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "replicate slow down"); - syncNodeEventLog(pSyncNode, logBuf); - } while (0); -#endif - } - - return ret; -} int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId) { // next index @@ -604,16 +175,6 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c return ret; } -int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, - const SyncAppendEntriesBatch* pMsg) { - syncLogSendAppendEntriesBatch(pSyncNode, pMsg, ""); - - SRpcMsg rpcMsg; - syncAppendEntriesBatch2RpcMsg(pMsg, &rpcMsg); - syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); - return 0; -} - int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg) { int32_t ret = 0; syncLogSendHeartbeat(pSyncNode, pMsg, ""); diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 7df025b5f8..3122747c9b 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -42,65 +42,6 @@ // m) // /\ UNCHANGED <> // -int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { - int32_t ret = 0; - - // if already drop replica, do not process - if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped"); - return -1; - } - - bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) || - ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) && - (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore))); - - // maybe update term - if (pMsg->term > ths->pRaftStore->currentTerm) { - syncNodeUpdateTerm(ths, pMsg->term); -#if 0 - if (logOK) { - syncNodeUpdateTerm(ths, pMsg->term); - } else { - syncNodeUpdateTermWithoutStepDown(ths, pMsg->term); - } -#endif - } - ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); - - bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && - ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); - if (grant) { - // maybe has already voted for pMsg->srcId - // vote again, no harm - raftStoreVote(ths->pRaftStore, &(pMsg->srcId)); - - // forbid elect for this round - syncNodeResetElectTimer(ths); - } - - // send msg - SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->voteGranted = grant; - - // trace log - do { - char logBuf[32]; - snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted); - syncLogRecvRequestVote(ths, pMsg, logBuf); - syncLogSendRequestVoteReply(ths, pReply, ""); - } while (0); - - SRpcMsg rpcMsg; - syncRequestVoteReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncRequestVoteReplyDestroy(pReply); - - return ret; -} static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) { SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode); @@ -157,64 +98,6 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM return false; } -int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { - int32_t ret = 0; - - // if already drop replica, do not process - if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped"); - return -1; - } - - bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg); - - // maybe update term - if (pMsg->term > ths->pRaftStore->currentTerm) { - syncNodeUpdateTerm(ths, pMsg->term); -#if 0 - if (logOK) { - syncNodeUpdateTerm(ths, pMsg->term); - } else { - syncNodeUpdateTermWithoutStepDown(ths, pMsg->term); - } -#endif - } - ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); - - bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && - ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); - if (grant) { - // maybe has already voted for pMsg->srcId - // vote again, no harm - raftStoreVote(ths->pRaftStore, &(pMsg->srcId)); - - // forbid elect for this round - syncNodeResetElectTimer(ths); - } - - // send msg - SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->voteGranted = grant; - - // trace log - do { - char logBuf[32]; - snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted); - syncLogRecvRequestVote(ths, pMsg, logBuf); - syncLogSendRequestVoteReply(ths, pReply, ""); - } while (0); - - SRpcMsg rpcMsg; - syncRequestVoteReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncRequestVoteReplyDestroy(pReply); - - return 0; -} - int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) { int32_t ret = 0; diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 74db7a3e51..3166e17cde 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -37,126 +37,6 @@ // /\ Discard(m) // /\ UNCHANGED <> // -int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { - int32_t ret = 0; - - // if already drop replica, do not process - if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncLogRecvRequestVoteReply(ths, pMsg, "maybe replica already dropped"); - return -1; - } - - // drop stale response - if (pMsg->term < ths->pRaftStore->currentTerm) { - syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response"); - return -1; - } - - // ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm)); - // no need this code, because if I receive reply.term, then I must have sent for that term. - // if (pMsg->term > ths->pRaftStore->currentTerm) { - // syncNodeUpdateTerm(ths, pMsg->term); - // } - - if (pMsg->term > ths->pRaftStore->currentTerm) { - syncLogRecvRequestVoteReply(ths, pMsg, "error term"); - return -1; - } - - syncLogRecvRequestVoteReply(ths, pMsg, ""); - ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - - // This tallies votes even when the current state is not Candidate, - // but they won't be looked at, so it doesn't matter. - if (ths->state == TAOS_SYNC_STATE_CANDIDATE) { - votesRespondAdd(ths->pVotesRespond, pMsg); - if (pMsg->voteGranted) { - // add vote - voteGrantedVote(ths->pVotesGranted, pMsg); - - // maybe to leader - if (voteGrantedMajority(ths->pVotesGranted)) { - if (!ths->pVotesGranted->toLeader) { - syncNodeCandidate2Leader(ths); - - // prevent to leader again! - ths->pVotesGranted->toLeader = true; - } - } - } else { - ; - // do nothing - // UNCHANGED <> - } - } - - return 0; -} - -int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { - int32_t ret = 0; - - // if already drop replica, do not process - if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - syncLogRecvRequestVoteReply(ths, pMsg, "maybe replica already dropped"); - return -1; - } - - // drop stale response - if (pMsg->term < ths->pRaftStore->currentTerm) { - syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response"); - return -1; - } - - // ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm)); - // no need this code, because if I receive reply.term, then I must have sent for that term. - // if (pMsg->term > ths->pRaftStore->currentTerm) { - // syncNodeUpdateTerm(ths, pMsg->term); - // } - - if (pMsg->term > ths->pRaftStore->currentTerm) { - syncLogRecvRequestVoteReply(ths, pMsg, "error term"); - return -1; - } - - syncLogRecvRequestVoteReply(ths, pMsg, ""); - ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - - // This tallies votes even when the current state is not Candidate, - // but they won't be looked at, so it doesn't matter. - if (ths->state == TAOS_SYNC_STATE_CANDIDATE) { - if (ths->pVotesRespond->term != pMsg->term) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "vote respond error vote-respond-mgr term:%lu, msg term:lu", - ths->pVotesRespond->term, pMsg->term); - syncNodeErrorLog(ths, logBuf); - return -1; - } - - votesRespondAdd(ths->pVotesRespond, pMsg); - if (pMsg->voteGranted) { - // add vote - voteGrantedVote(ths->pVotesGranted, pMsg); - - // maybe to leader - if (voteGrantedMajority(ths->pVotesGranted)) { - if (!ths->pVotesGranted->toLeader) { - syncNodeCandidate2Leader(ths); - - // prevent to leader again! - ths->pVotesGranted->toLeader = true; - } - } - } else { - ; - // do nothing - // UNCHANGED <> - } - } - - return 0; -} - int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t ret = 0; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index c83e0082c3..6c4a03e10e 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -379,14 +379,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t len = 256; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; char host[64]; @@ -674,7 +674,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON * pTmp = pFromId; + cJSON *pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -707,14 +707,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { int32_t len = 256; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); SRaftId fromId = pReceiver->fromId; char host[128]; @@ -740,7 +740,7 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) // condition 3, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close // condition 4, got data, update ack // -int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { +int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // get receiver SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; bool needRsp = false; @@ -853,7 +853,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // condition 2 sender receives ack, set seq = ack + 1, send msg from seq // condition 3 sender receives error msg, just print error log // -int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { +int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { // if already drop replica, do not process if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) { sError("vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped", pSyncNode->vgId); @@ -920,13 +920,3 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { return 0; } - -int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg) { - int32_t code = syncNodeOnSnapshotSendCb(ths, pMsg); - return code; -} - -int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg) { - int32_t code = syncNodeOnSnapshotRspCb(ths, pMsg); - return code; -} \ No newline at end of file diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index c523fbc1c3..95677e592b 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -237,8 +237,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncSnapshotSend = pSyncNode->FpOnSnapshotSend; - gSyncIO->FpOnSyncSnapshotRsp = pSyncNode->FpOnSnapshotRsp; + gSyncIO->FpOnSyncSnapshot = pSyncNode->FpOnSnapshot; + gSyncIO->FpOnSyncSnapshotReply = pSyncNode->FpOnSnapshotReply; gSyncIO->pSyncNode = pSyncNode; syncNodeRelease(pSyncNode); diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index 454c823c6a..14bda4d5a6 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -181,8 +181,11 @@ int main(int argc, char **argv) { SSyncNode *pSyncNode = syncNodeInit(); assert(pSyncNode != NULL); SSyncRaftEntry *pEntry = pMsg4; - pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry); - SSyncRaftEntry *pEntry2 = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, pEntry->index); + pSyncNode->pLogStore->syncLogAppendEntry(pSyncNode->pLogStore, pEntry); + + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, pEntry->index, &pEntry); + ASSERT(code == 0); + syncEntryLog2((char *)"==pEntry2==", pEntry2); // step5 diff --git a/source/libs/sync/test/syncLogStoreTest.cpp b/source/libs/sync/test/syncLogStoreTest.cpp index 9cb8194aa7..9ff0ed2089 100644 --- a/source/libs/sync/test/syncLogStoreTest.cpp +++ b/source/libs/sync/test/syncLogStoreTest.cpp @@ -52,7 +52,7 @@ void cleanup() { void logStoreTest() { pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); - assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_INVALID); + assert(pLogStore->syncLogLastIndex(pLogStore) == SYNC_INDEX_INVALID); logStoreLog2((char*)"logStoreTest", pLogStore); @@ -65,22 +65,20 @@ void logStoreTest() { pEntry->seqNum = 3; pEntry->isWeak = true; pEntry->term = 100 + i; - pEntry->index = pLogStore->getLastIndex(pLogStore) + 1; + pEntry->index = pLogStore->syncLogLastIndex(pLogStore) + 1; snprintf(pEntry->data, dataLen, "value%d", i); syncEntryLog2((char*)"==write entry== :", pEntry); - pLogStore->appendEntry(pLogStore, pEntry); + pLogStore->syncLogAppendEntry(pLogStore, pEntry); syncEntryDestory(pEntry); if (i == 0) { - assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_BEGIN); + assert(pLogStore->syncLogLastIndex(pLogStore) == SYNC_INDEX_BEGIN); } } logStoreLog2((char*)"after appendEntry", pLogStore); - - pLogStore->truncate(pLogStore, 3); + pLogStore->syncLogTruncate(pLogStore, 3); logStoreLog2((char*)"after truncate 3", pLogStore); - logStoreDestory(pLogStore); } diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index e718d37376..1cdecfe5b3 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -266,14 +266,12 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - - gSyncIO->FpOnSyncSnapshotSend = pSyncNode->FpOnSnapshotSend; - gSyncIO->FpOnSyncSnapshotRsp = pSyncNode->FpOnSnapshotRsp; + gSyncIO->FpOnSyncSnapshot = pSyncNode->FpOnSnapshot; + gSyncIO->FpOnSyncSnapshotReply = pSyncNode->FpOnSnapshotReply; gSyncIO->pSyncNode = pSyncNode; syncNodeRelease(pSyncNode);