From b59028365a19f28ff14f4eec9dbc7d033acd9312 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 7 Mar 2022 14:18:46 +0800 Subject: [PATCH 1/4] sync refactor --- source/libs/sync/inc/syncEnv.h | 7 +- source/libs/sync/inc/syncInt.h | 14 ++-- source/libs/sync/inc/syncMessage.h | 3 +- source/libs/sync/inc/syncUtil.h | 23 +++--- source/libs/sync/src/syncEnv.c | 1 + source/libs/sync/src/syncIO.c | 1 + source/libs/sync/src/syncMain.c | 78 ++++++++++++++++----- source/libs/sync/src/syncMessage.c | 4 +- source/libs/sync/src/syncUtil.c | 7 ++ source/libs/sync/test/syncIndexTest.cpp | 16 +++-- source/libs/sync/test/syncRaftStoreTest.cpp | 22 +++--- source/libs/sync/test/syncTest.cpp | 51 +++----------- 12 files changed, 122 insertions(+), 105 deletions(-) diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 44d0efb033..2356651a4c 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -30,12 +30,17 @@ extern "C" { #define TIMER_MAX_MS 0x7FFFFFFF #define PING_TIMER_MS 1000 +#define ELECT_TIMER_MS_MIN 150 +#define ELECT_TIMER_MS_MAX 300 +#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) +#define HEARTBEAT_TIMER_MS 30 + +#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) typedef struct SSyncEnv { tmr_h pEnvTickTimer; tmr_h pTimerManager; char name[128]; - } SSyncEnv; extern SSyncEnv* gSyncEnv; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 0ee33f0912..d4703b3e74 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -154,9 +154,8 @@ typedef struct SSyncNode { SyncIndex commitIndex; // timer - tmr_h pPingTimer; - int32_t pingTimerMS; - // uint8_t pingTimerEnable; + tmr_h pPingTimer; + int32_t pingTimerMS; uint64_t pingTimerLogicClock; uint64_t pingTimerLogicClockUser; TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp @@ -164,13 +163,15 @@ typedef struct SSyncNode { tmr_h pElectTimer; int32_t electTimerMS; - uint8_t electTimerEnable; + uint64_t electTimerLogicClock; + uint64_t electTimerLogicClockUser; TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp uint64_t electTimerCounter; tmr_h pHeartbeatTimer; int32_t heartbeatTimerMS; - uint8_t heartbeatTimerEnable; + uint64_t heartbeatTimerLogicClock; + uint64_t heartbeatTimerLogicClockUser; TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp uint64_t heartbeatTimerCounter; @@ -194,8 +195,9 @@ void syncNodePingSelf(SSyncNode* pSyncNode); int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); -int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode); +int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); +int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 95135d161b..3405f0f6cc 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -59,6 +59,7 @@ typedef struct SyncTimeout { uint32_t msgType; ESyncTimeoutType timeoutType; uint64_t logicClock; + int32_t timerMS; void* data; } SyncTimeout; @@ -69,7 +70,7 @@ void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); cJSON* syncTimeout2Json(const SyncTimeout* pMsg); -SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, void* data); +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, void* data); // --------------------------------------------- typedef struct SyncPing { diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index e1078d5738..9b481e82d9 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -28,28 +28,23 @@ extern "C" { #include "taosdef.h" // ---- encode / decode - uint64_t syncUtilAddr2U64(const char* host, uint16_t port); - -void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port); - -void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); - -void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); - -void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); - -bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); +void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port); +void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); +void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); +void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); +bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); // ---- SSyncBuffer ---- void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len); - void syncUtilbufDestroy(SSyncBuffer* syncBuf); - void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest); - void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); +// ---- misc ---- +int32_t syncUtilRand(int32_t max); +int32_t syncUtilElectRandomMS(); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index a9cf035650..6917df1597 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -28,6 +28,7 @@ static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer); int32_t syncEnvStart() { int32_t ret; + srand(time(NULL)); gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv)); assert(gSyncEnv != NULL); ret = doSyncEnvStart(gSyncEnv); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 3250d4c303..d37c821a24 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -44,6 +44,7 @@ int32_t syncIOStart(char *host, uint16_t port) { gSyncIO = syncIOCreate(host, port); assert(gSyncIO != NULL); + srand(time(NULL)); int32_t ret = syncIOStartInternal(gSyncIO); assert(ret == 0); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ed7ab23b0c..ad3639b32d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -98,6 +98,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId); + // init ping timer pSyncNode->pPingTimer = NULL; pSyncNode->pingTimerMS = PING_TIMER_MS; atomic_store_64(&pSyncNode->pingTimerLogicClock, 0); @@ -105,6 +106,22 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->FpPingTimer = syncNodeEqPingTimer; pSyncNode->pingTimerCounter = 0; + // init elect timer + pSyncNode->pElectTimer = NULL; + pSyncNode->electTimerMS = syncUtilElectRandomMS(); + atomic_store_64(&pSyncNode->electTimerLogicClock, 0); + atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0); + pSyncNode->FpElectTimer = syncNodeEqElectTimer; + pSyncNode->electTimerCounter = 0; + + // init heartbeat timer + pSyncNode->pHeartbeatTimer = NULL; + pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS; + atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0); + atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0); + pSyncNode->FpHeartbeatTimer = syncNodeEqHeartbeatTimer; + pSyncNode->heartbeatTimerCounter = 0; + pSyncNode->FpOnPing = syncNodeOnPingCb; pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; @@ -157,7 +174,6 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); pSyncNode->pingTimerMS = PING_TIMER_MS; - if (pSyncNode->pPingTimer == NULL) { pSyncNode->pPingTimer = taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager); @@ -165,7 +181,6 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } - return 0; } @@ -175,7 +190,9 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { return 0; } -int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) { +int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { + pSyncNode->electTimerMS = ms; + atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); if (pSyncNode->pElectTimer == NULL) { pSyncNode->pElectTimer = taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager); @@ -183,18 +200,23 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) { taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pElectTimer); } - - atomic_store_8(&pSyncNode->electTimerEnable, 1); return 0; } int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { - atomic_store_8(&pSyncNode->electTimerEnable, 0); + atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1); pSyncNode->electTimerMS = TIMER_MAX_MS; return 0; } +int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) { + syncNodeStopElectTimer(pSyncNode); + syncNodeStartElectTimer(pSyncNode, ms); + return 0; +} + int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { + atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); if (pSyncNode->pHeartbeatTimer == NULL) { pSyncNode->pHeartbeatTimer = taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager); @@ -202,13 +224,11 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer); } - - atomic_store_8(&pSyncNode->heartbeatTimerEnable, 1); return 0; } int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { - atomic_store_8(&pSyncNode->heartbeatTimerEnable, 0); + atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1); pSyncNode->heartbeatTimerMS = TIMER_MAX_MS; return 0; } @@ -320,16 +340,16 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { static void syncNodeEqPingTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) { - // pSyncNode->pingTimerMS += 100; - - SyncTimeout* pSyncMsg = - syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), pSyncNode); - - SRpcMsg rpcMsg; + SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), + pSyncNode->pingTimerMS, pSyncNode); + SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); syncTimeoutDestroy(pSyncMsg); + // reset timer ms + // pSyncNode->pingTimerMS += 100; + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { @@ -338,18 +358,38 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { } } -static void syncNodeEqElectTimer(void* param, void* tmrId) {} +static void syncNodeEqElectTimer(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) { + SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock), + pSyncNode->electTimerMS, pSyncNode); + + SRpcMsg rpcMsg; + syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + syncTimeoutDestroy(pSyncMsg); + + // reset timer ms + pSyncNode->electTimerMS = syncUtilElectRandomMS(); + + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + } else { + sTrace("syncNodeEqElectTimer: electTimerLogicClock:%lu, electTimerLogicClockUser:%lu", + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); + } +} static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - pSyncNode->leaderCache.addr = 0; - pSyncNode->leaderCache.vgId = 0; + pSyncNode->leaderCache = EMPTY_RAFT_ID; } syncNodeStopHeartbeatTimer(pSyncNode); - syncNodeStartElectTimer(pSyncNode); + int32_t electMS = syncUtilElectRandomMS(); + syncNodeStartElectTimer(pSyncNode, electMS); } static void syncNodeBecomeLeader(SSyncNode* pSyncNode) { diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index d1b4a6a2c6..33e311b0fa 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -125,6 +125,7 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->logicClock); cJSON_AddStringToObject(pRoot, "logicClock", u64buf); + cJSON_AddNumberToObject(pRoot, "timerMS", pMsg->timerMS); snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); cJSON_AddStringToObject(pRoot, "data", u64buf); @@ -133,10 +134,11 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { return pJson; } -SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, void* data) { +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, void* data) { SyncTimeout* pMsg = syncTimeoutBuild(); pMsg->timeoutType = timeoutType; pMsg->logicClock = logicClock; + pMsg->timerMS = timerMS; pMsg->data = data; return pMsg; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 7b4d6ee366..c70e490025 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -17,6 +17,7 @@ #include #include #include +#include "syncEnv.h" // ---- encode / decode uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { @@ -91,3 +92,9 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { dest->data = malloc(dest->len); memcpy(dest->data, src->data, dest->len); } + +// ---- misc ---- + +int32_t syncUtilRand(int32_t max) { return rand() % max; } + +int32_t syncUtilElectRandomMS() { ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); } \ No newline at end of file diff --git a/source/libs/sync/test/syncIndexTest.cpp b/source/libs/sync/test/syncIndexTest.cpp index ece58fb9b0..1cf2847b5c 100644 --- a/source/libs/sync/test/syncIndexTest.cpp +++ b/source/libs/sync/test/syncIndexTest.cpp @@ -13,17 +13,21 @@ void print(SHashObj *pNextIndex) { } } +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + int main() { // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; - sTrace("sync log test: trace"); - sDebug("sync log test: debug"); - sInfo("sync log test: info"); - sWarn("sync log test: warn"); - sError("sync log test: error"); - sFatal("sync log test: fatal"); + logTest(); SRaftId me; SRaftId peer1; diff --git a/source/libs/sync/test/syncRaftStoreTest.cpp b/source/libs/sync/test/syncRaftStoreTest.cpp index 447dab0cbc..71c0138c8d 100644 --- a/source/libs/sync/test/syncRaftStoreTest.cpp +++ b/source/libs/sync/test/syncRaftStoreTest.cpp @@ -4,14 +4,13 @@ #include "syncIO.h" #include "syncInt.h" -void *pingFunc(void *param) { - SSyncIO *io = (SSyncIO *)param; - while (1) { - sDebug("io->ping"); - // io->ping(io); - sleep(1); - } - return NULL; +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); } int main() { @@ -19,12 +18,7 @@ int main() { tsAsyncLog = 0; sDebugFlag = 143 + 64; - sTrace("sync log test: trace"); - sDebug("sync log test: debug"); - sInfo("sync log test: info"); - sWarn("sync log test: warn"); - sError("sync log test: error"); - sFatal("sync log test: fatal"); + logTest(); SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); assert(pRaftStore != NULL); diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index 1f9f4846cc..ec402ff57a 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -4,55 +4,20 @@ #include "syncInt.h" #include "syncRaftStore.h" -void *pingFunc(void *param) { - SSyncIO *io = (SSyncIO *)param; - while (1) { - sDebug("io->ping"); - // io->ping(io); - sleep(1); - } - return NULL; +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); } int main() { // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; + logTest(); - sTrace("sync log test: trace"); - sDebug("sync log test: debug"); - sInfo("sync log test: info"); - sWarn("sync log test: warn"); - sError("sync log test: error"); - sFatal("sync log test: fatal"); - - SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); - // assert(pRaftStore != NULL); - - // raftStorePrint(pRaftStore); - - // pRaftStore->currentTerm = 100; - // pRaftStore->voteFor.addr = 200; - // pRaftStore->voteFor.vgId = 300; - - // raftStorePrint(pRaftStore); - - // raftStorePersist(pRaftStore); - - // sDebug("sync test"); - - // SSyncIO *syncIO = syncIOCreate(); - // assert(syncIO != NULL); - - // syncIO->start(syncIO); - - // sleep(2); - - // pthread_t tid; - // pthread_create(&tid, NULL, pingFunc, syncIO); - - // while (1) { - // sleep(1); - // } return 0; } From a0319ae49bdf86275adf1e150c71ea5dc1ba8958 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 7 Mar 2022 14:42:04 +0800 Subject: [PATCH 2/4] sync refactor --- source/libs/sync/inc/syncAppendEntries.h | 4 -- source/libs/sync/inc/syncAppendEntriesReply.h | 2 - source/libs/sync/inc/syncElection.h | 1 - source/libs/sync/inc/syncInt.h | 2 + source/libs/sync/inc/syncOnMessage.h | 2 - source/libs/sync/inc/syncTimeout.h | 2 +- source/libs/sync/src/syncElection.c | 7 ++-- source/libs/sync/src/syncMain.c | 39 +++---------------- source/libs/sync/src/syncMessage.c | 2 - source/libs/sync/src/syncTimeout.c | 37 +++++++++++++++++- 10 files changed, 48 insertions(+), 50 deletions(-) diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index b7c1c051cc..29358fcf90 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -28,10 +28,6 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg); - -void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 22f8eb464f..af6453d839 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -28,8 +28,6 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index ed5b86fa98..7299a3fe2e 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -27,7 +27,6 @@ extern "C" { #include "taosdef.h" void syncNodeElect(SSyncNode* pSyncNode); -void syncNodeRequestVotePeers(SSyncNode* pSyncNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index d4703b3e74..e015bee530 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -191,6 +191,8 @@ void syncNodeClose(SSyncNode* pSyncNode); void syncNodePingAll(SSyncNode* pSyncNode); void syncNodePingPeers(SSyncNode* pSyncNode); void syncNodePingSelf(SSyncNode* pSyncNode); +void syncNodeRequestVotePeers(SSyncNode* pSyncNode); +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); diff --git a/source/libs/sync/inc/syncOnMessage.h b/source/libs/sync/inc/syncOnMessage.h index 07c44b6199..8eae4fed4d 100644 --- a/source/libs/sync/inc/syncOnMessage.h +++ b/source/libs/sync/inc/syncOnMessage.h @@ -26,8 +26,6 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void onMessage(SRaft *pRaft, void *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index d9d6a17939..3dda1f212c 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -28,7 +28,7 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void onTimeout(SRaft *pRaft, void *pMsg); +int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 433201b849..6d12af02c0 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -15,6 +15,7 @@ #include "syncElection.h" -void syncNodeElect(SSyncNode* pSyncNode) {} - -void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} \ No newline at end of file +void syncNodeElect(SSyncNode* pSyncNode) { + // start election + syncNodeRequestVotePeers(pSyncNode); +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ad3639b32d..c2ceb515b0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -18,6 +18,7 @@ #include "syncEnv.h" #include "syncInt.h" #include "syncRaft.h" +#include "syncTimeout.h" #include "syncUtil.h" static int32_t tsNodeRefId = -1; @@ -33,7 +34,6 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); -static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); static void syncNodeBecomeFollower(SSyncNode* pSyncNode); static void syncNodeBecomeLeader(SSyncNode* pSyncNode); @@ -41,9 +41,6 @@ static void syncNodeFollower2Candidate(SSyncNode* pSyncNode); static void syncNodeCandidate2Leader(SSyncNode* pSyncNode); static void syncNodeLeader2Follower(SSyncNode* pSyncNode); static void syncNodeCandidate2Follower(SSyncNode* pSyncNode); - -void syncNodeRequestVotePeers(SSyncNode* pSyncNode); -void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); // --------------------------------- int32_t syncInit() { @@ -171,6 +168,10 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { syncPingDestroy(pMsg); } +void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} + +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} + int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); pSyncNode->pingTimerMS = PING_TIMER_MS; @@ -311,32 +312,6 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { return ret; } -static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { - int32_t ret = 0; - sTrace("<-- syncNodeOnTimeoutCb -->"); - - { - cJSON* pJson = syncTimeout2Json(pMsg); - char* serialized = cJSON_Print(pJson); - sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { - if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) { - ++(ths->pingTimerCounter); - syncNodePingAll(ths); - } - - } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { - } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { - } else { - } - - return ret; -} - static void syncNodeEqPingTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) { @@ -415,7 +390,3 @@ static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {} static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {} static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {} - -void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} - -void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 33e311b0fa..14f139a803 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -18,8 +18,6 @@ #include "syncUtil.h" #include "tcoding.h" -void onMessage(SRaft* pRaft, void* pMsg) {} - // --------------------------------------------- cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { cJSON* pRoot; diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index e27df55d07..589921dddc 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -14,5 +14,40 @@ */ #include "syncTimeout.h" +#include "syncElection.h" -void onTimeout(SRaft *pRaft, void *pMsg) {} \ No newline at end of file +int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { + int32_t ret = 0; + sTrace("<-- syncNodeOnTimeoutCb -->"); + + { + cJSON* pJson = syncTimeout2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { + if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) { + ++(ths->pingTimerCounter); + syncNodePingAll(ths); + } + + } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { + if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) { + ++(ths->electTimerCounter); + syncNodeElect(ths); + } + + } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { + if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) { + ++(ths->heartbeatTimerCounter); + syncNodeAppendEntriesPeers(ths); + } + } else { + sTrace("unknown timeoutType:%d", pMsg->timeoutType); + } + + return ret; +} \ No newline at end of file From 25af19bd2428c4c55022b98668d4ef5a092c3463 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 7 Mar 2022 16:06:07 +0800 Subject: [PATCH 3/4] sync refactor --- source/libs/sync/inc/syncAppendEntries.h | 2 + source/libs/sync/inc/syncAppendEntriesReply.h | 2 + source/libs/sync/inc/syncElection.h | 6 +- source/libs/sync/inc/syncInt.h | 21 ++-- source/libs/sync/inc/syncReplication.h | 4 +- source/libs/sync/inc/syncRequestVote.h | 2 + source/libs/sync/inc/syncRequestVoteReply.h | 2 + source/libs/sync/src/syncAppendEntries.c | 27 ------ source/libs/sync/src/syncElection.c | 26 ++++- source/libs/sync/src/syncMain.c | 96 +++++++++---------- source/libs/sync/src/syncReplication.c | 37 ++++++- source/libs/sync/src/syncRequestVote.c | 14 --- source/libs/sync/src/syncTimeout.c | 1 + 13 files changed, 131 insertions(+), 109 deletions(-) diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 29358fcf90..0156e695a3 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -28,6 +28,8 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index af6453d839..7b80172e8d 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -28,6 +28,8 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 7299a3fe2e..abacfb8093 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -26,7 +26,11 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" -void syncNodeElect(SSyncNode* pSyncNode); +int32_t syncNodeElect(SSyncNode* pSyncNode); + +int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode); + +int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e015bee530..bce03059a0 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -188,29 +188,22 @@ typedef struct SSyncNode { SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); void syncNodeClose(SSyncNode* pSyncNode); -void syncNodePingAll(SSyncNode* pSyncNode); -void syncNodePingPeers(SSyncNode* pSyncNode); -void syncNodePingSelf(SSyncNode* pSyncNode); -void syncNodeRequestVotePeers(SSyncNode* pSyncNode); -void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); + +int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); +int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); +int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); +void syncNodePingAll(SSyncNode* pSyncNode); +void syncNodePingPeers(SSyncNode* pSyncNode); +void syncNodePingSelf(SSyncNode* pSyncNode); int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); - int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); - int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); -int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); -int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); -int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); -int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); -int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); -int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index a9875d5cae..72ce986a7e 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -26,7 +26,9 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" -void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); +int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); + +int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index 4fb2193010..da821c3ebd 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -28,6 +28,8 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index 21fb61f85f..82f132f80b 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -28,6 +28,8 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index f3045c3180..243a566ff0 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -15,33 +15,6 @@ #include "syncAppendEntries.h" -int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) { - // TLA+ Spec - // AppendEntries(i, j) == - // /\ i /= j - // /\ state[i] = Leader - // /\ LET prevLogIndex == nextIndex[i][j] - 1 - // prevLogTerm == IF prevLogIndex > 0 THEN - // log[i][prevLogIndex].term - // ELSE - // 0 - // \* Send up to 1 entry, constrained by the end of the log. - // lastEntry == Min({Len(log[i]), nextIndex[i][j]}) - // entries == SubSeq(log[i], nextIndex[i][j], lastEntry) - // IN Send([mtype |-> AppendEntriesRequest, - // mterm |-> currentTerm[i], - // mprevLogIndex |-> prevLogIndex, - // mprevLogTerm |-> prevLogTerm, - // mentries |-> entries, - // \* mlog is used as a history variable for the proof. - // \* It would not exist in a real implementation. - // mlog |-> log[i], - // mcommitIndex |-> Min({commitIndex[i], lastEntry}), - // msource |-> i, - // mdest |-> j]) - // /\ UNCHANGED <> -} - int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 6d12af02c0..fe86d220cc 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -14,8 +14,32 @@ */ #include "syncElection.h" +#include "syncMessage.h" -void syncNodeElect(SSyncNode* pSyncNode) { +int32_t syncNodeElect(SSyncNode* pSyncNode) { // start election syncNodeRequestVotePeers(pSyncNode); } + +// TLA+ Spec +// RequestVote(i, j) == +// /\ state[i] = Candidate +// /\ j \notin votesResponded[i] +// /\ Send([mtype |-> RequestVoteRequest, +// mterm |-> currentTerm[i], +// mlastLogTerm |-> LastTerm(log[i]), +// mlastLogIndex |-> Len(log[i]), +// msource |-> i, +// mdest |-> j]) +// /\ UNCHANGED <> +int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} + +int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { + sTrace("syncNodeRequestVote pSyncNode:%p ", pSyncNode); + int32_t ret = 0; + + SRpcMsg rpcMsg; + syncRequestVote2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); + return ret; +} \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c2ceb515b0..cac2b6953d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -15,23 +15,23 @@ #include #include "sync.h" +#include "syncAppendEntries.h" +#include "syncAppendEntriesReply.h" #include "syncEnv.h" #include "syncInt.h" #include "syncRaft.h" +#include "syncRequestVote.h" +#include "syncRequestVoteReply.h" #include "syncTimeout.h" #include "syncUtil.h" static int32_t tsNodeRefId = -1; // ------ local funciton --------- -static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); -static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); - static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); -static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); @@ -135,6 +135,48 @@ void syncNodeClose(SSyncNode* pSyncNode) { free(pSyncNode); } +int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { + SEpSet epSet; + syncUtilraftId2EpSet(destRaftId, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + +int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { + SEpSet epSet; + syncUtilnodeInfo2EpSet(nodeInfo, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + +int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { + sTrace("syncNodePing pSyncNode:%p ", pSyncNode); + int32_t ret = 0; + + SRpcMsg rpcMsg; + syncPing2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); + + { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodePing pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + { + SyncPing* pMsg2 = rpcMsg.pCont; + cJSON* pJson = syncPing2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodePing rpcMsg.pCont:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + return ret; +} + void syncNodePingAll(SSyncNode* pSyncNode) { sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode); int32_t ret = 0; @@ -168,10 +210,6 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { syncPingDestroy(pMsg); } -void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} - -void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} - int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); pSyncNode->pingTimerMS = PING_TIMER_MS; @@ -235,48 +273,6 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { } // ------ local funciton --------- -static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { - sTrace("syncNodePing pSyncNode:%p ", pSyncNode); - int32_t ret = 0; - - SRpcMsg rpcMsg; - syncPing2RpcMsg(pMsg, &rpcMsg); - syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); - - { - cJSON* pJson = syncPing2Json(pMsg); - char* serialized = cJSON_Print(pJson); - sTrace("syncNodePing pMsg:%s ", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - { - SyncPing* pMsg2 = rpcMsg.pCont; - cJSON* pJson = syncPing2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - sTrace("syncNodePing rpcMsg.pCont:%s ", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - return ret; -} - -static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { - SEpSet epSet; - syncUtilraftId2EpSet(destRaftId, &epSet); - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); - return 0; -} - -static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { - SEpSet epSet; - syncUtilnodeInfo2EpSet(nodeInfo, &epSet); - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); - return 0; -} - static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { int32_t ret = 0; sTrace("<-- syncNodeOnPingCb -->"); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index e3e551fd2b..878a870677 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -14,5 +14,40 @@ */ #include "syncReplication.h" +#include "syncMessage.h" -void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} \ No newline at end of file +// TLA+ Spec +// AppendEntries(i, j) == +// /\ i /= j +// /\ state[i] = Leader +// /\ LET prevLogIndex == nextIndex[i][j] - 1 +// prevLogTerm == IF prevLogIndex > 0 THEN +// log[i][prevLogIndex].term +// ELSE +// 0 +// \* Send up to 1 entry, constrained by the end of the log. +// lastEntry == Min({Len(log[i]), nextIndex[i][j]}) +// entries == SubSeq(log[i], nextIndex[i][j], lastEntry) +// IN Send([mtype |-> AppendEntriesRequest, +// mterm |-> currentTerm[i], +// mprevLogIndex |-> prevLogIndex, +// mprevLogTerm |-> prevLogTerm, +// mentries |-> entries, +// \* mlog is used as a history variable for the proof. +// \* It would not exist in a real implementation. +// mlog |-> log[i], +// mcommitIndex |-> Min({commitIndex[i], lastEntry}), +// msource |-> i, +// mdest |-> j]) +// /\ UNCHANGED <> +int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} + +int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { + sTrace("syncNodeAppendEntries pSyncNode:%p ", pSyncNode); + int32_t ret = 0; + + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); + return ret; +} \ No newline at end of file diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 38eaea26ac..0edd6d2ce4 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -15,20 +15,6 @@ #include "syncRequestVote.h" -int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) { - // TLA+ Spec - // RequestVote(i, j) == - // /\ state[i] = Candidate - // /\ j \notin votesResponded[i] - // /\ Send([mtype |-> RequestVoteRequest, - // mterm |-> currentTerm[i], - // mlastLogTerm |-> LastTerm(log[i]), - // mlastLogIndex |-> Len(log[i]), - // msource |-> i, - // mdest |-> j]) - // /\ UNCHANGED <> -} - int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { // TLA+ Spec // HandleRequestVoteRequest(i, j, m) == diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 589921dddc..df9b9d27b4 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -15,6 +15,7 @@ #include "syncTimeout.h" #include "syncElection.h" +#include "syncReplication.h" int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { int32_t ret = 0; From fa8284af733301e3edd8a2856c464feabd5e46b7 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 7 Mar 2022 16:17:41 +0800 Subject: [PATCH 4/4] sync refactor --- source/libs/sync/inc/syncEnv.h | 7 +- source/libs/sync/inc/syncRaftStore.h | 24 +--- source/libs/sync/src/syncAppendEntries.c | 133 +++++++++--------- source/libs/sync/src/syncAppendEntriesReply.c | 27 ++-- source/libs/sync/src/syncRaftStore.c | 123 +--------------- source/libs/sync/src/syncRequestVote.c | 45 +++--- source/libs/sync/src/syncRequestVoteReply.c | 35 +++-- 7 files changed, 131 insertions(+), 263 deletions(-) diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 2356651a4c..9fbea03265 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -46,12 +46,9 @@ typedef struct SSyncEnv { extern SSyncEnv* gSyncEnv; int32_t syncEnvStart(); - int32_t syncEnvStop(); - -tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void* param); - -void syncEnvStopTimer(tmr_h* pTimer); +tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void* param); +void syncEnvStopTimer(tmr_h* pTimer); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index c480486ff0..591a5b9963 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -32,28 +32,18 @@ extern "C" { #define RAFT_STORE_PATH_LEN 128 typedef struct SRaftStore { - SyncTerm currentTerm; - SRaftId voteFor; - // FileFd fd; + SyncTerm currentTerm; + SRaftId voteFor; TdFilePtr pFile; char path[RAFT_STORE_PATH_LEN]; } SRaftStore; SRaftStore *raftStoreOpen(const char *path); - -static int32_t raftStoreInit(SRaftStore *pRaftStore); - -int32_t raftStoreClose(SRaftStore *pRaftStore); - -int32_t raftStorePersist(SRaftStore *pRaftStore); - -static bool raftStoreFileExist(char *path); - -int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); - -int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); - -void raftStorePrint(SRaftStore *pRaftStore); +int32_t raftStoreClose(SRaftStore *pRaftStore); +int32_t raftStorePersist(SRaftStore *pRaftStore); +int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); +int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); +void raftStorePrint(SRaftStore *pRaftStore); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 243a566ff0..ba10234a1d 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -15,70 +15,69 @@ #include "syncAppendEntries.h" -int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { - // TLA+ Spec - // HandleAppendEntriesRequest(i, j, m) == - // LET logOk == \/ m.mprevLogIndex = 0 - // \/ /\ m.mprevLogIndex > 0 - // /\ m.mprevLogIndex <= Len(log[i]) - // /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term - // IN /\ m.mterm <= currentTerm[i] - // /\ \/ /\ \* reject request - // \/ m.mterm < currentTerm[i] - // \/ /\ m.mterm = currentTerm[i] - // /\ state[i] = Follower - // /\ \lnot logOk - // /\ Reply([mtype |-> AppendEntriesResponse, - // mterm |-> currentTerm[i], - // msuccess |-> FALSE, - // mmatchIndex |-> 0, - // msource |-> i, - // mdest |-> j], - // m) - // /\ UNCHANGED <> - // \/ \* return to follower state - // /\ m.mterm = currentTerm[i] - // /\ state[i] = Candidate - // /\ state' = [state EXCEPT ![i] = Follower] - // /\ UNCHANGED <> - // \/ \* accept request - // /\ m.mterm = currentTerm[i] - // /\ state[i] = Follower - // /\ logOk - // /\ LET index == m.mprevLogIndex + 1 - // IN \/ \* already done with request - // /\ \/ m.mentries = << >> - // \/ /\ m.mentries /= << >> - // /\ Len(log[i]) >= index - // /\ log[i][index].term = m.mentries[1].term - // \* This could make our commitIndex decrease (for - // \* example if we process an old, duplicated request), - // \* but that doesn't really affect anything. - // /\ commitIndex' = [commitIndex EXCEPT ![i] = - // m.mcommitIndex] - // /\ Reply([mtype |-> AppendEntriesResponse, - // mterm |-> currentTerm[i], - // msuccess |-> TRUE, - // mmatchIndex |-> m.mprevLogIndex + - // Len(m.mentries), - // msource |-> i, - // mdest |-> j], - // m) - // /\ UNCHANGED <> - // \/ \* conflict: remove 1 entry - // /\ m.mentries /= << >> - // /\ Len(log[i]) >= index - // /\ log[i][index].term /= m.mentries[1].term - // /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> - // log[i][index2]] - // IN log' = [log EXCEPT ![i] = new] - // /\ UNCHANGED <> - // \/ \* no conflict: append entry - // /\ m.mentries /= << >> - // /\ Len(log[i]) = m.mprevLogIndex - // /\ log' = [log EXCEPT ![i] = - // Append(log[i], m.mentries[1])] - // /\ UNCHANGED <> - // /\ UNCHANGED <> - // -} +// TLA+ Spec +// HandleAppendEntriesRequest(i, j, m) == +// LET logOk == \/ m.mprevLogIndex = 0 +// \/ /\ m.mprevLogIndex > 0 +// /\ m.mprevLogIndex <= Len(log[i]) +// /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term +// IN /\ m.mterm <= currentTerm[i] +// /\ \/ /\ \* reject request +// \/ m.mterm < currentTerm[i] +// \/ /\ m.mterm = currentTerm[i] +// /\ state[i] = Follower +// /\ \lnot logOk +// /\ Reply([mtype |-> AppendEntriesResponse, +// mterm |-> currentTerm[i], +// msuccess |-> FALSE, +// mmatchIndex |-> 0, +// msource |-> i, +// mdest |-> j], +// m) +// /\ UNCHANGED <> +// \/ \* return to follower state +// /\ m.mterm = currentTerm[i] +// /\ state[i] = Candidate +// /\ state' = [state EXCEPT ![i] = Follower] +// /\ UNCHANGED <> +// \/ \* accept request +// /\ m.mterm = currentTerm[i] +// /\ state[i] = Follower +// /\ logOk +// /\ LET index == m.mprevLogIndex + 1 +// IN \/ \* already done with request +// /\ \/ m.mentries = << >> +// \/ /\ m.mentries /= << >> +// /\ Len(log[i]) >= index +// /\ log[i][index].term = m.mentries[1].term +// \* This could make our commitIndex decrease (for +// \* example if we process an old, duplicated request), +// \* but that doesn't really affect anything. +// /\ commitIndex' = [commitIndex EXCEPT ![i] = +// m.mcommitIndex] +// /\ Reply([mtype |-> AppendEntriesResponse, +// mterm |-> currentTerm[i], +// msuccess |-> TRUE, +// mmatchIndex |-> m.mprevLogIndex + +// Len(m.mentries), +// msource |-> i, +// mdest |-> j], +// m) +// /\ UNCHANGED <> +// \/ \* conflict: remove 1 entry +// /\ m.mentries /= << >> +// /\ Len(log[i]) >= index +// /\ log[i][index].term /= m.mentries[1].term +// /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> +// log[i][index2]] +// IN log' = [log EXCEPT ![i] = new] +// /\ UNCHANGED <> +// \/ \* no conflict: append entry +// /\ m.mentries /= << >> +// /\ Len(log[i]) = m.mprevLogIndex +// /\ log' = [log EXCEPT ![i] = +// Append(log[i], m.mentries[1])] +// /\ UNCHANGED <> +// /\ UNCHANGED <> +// +int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {} diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 81c9ea233b..23df8a539c 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -15,17 +15,16 @@ #include "syncAppendEntriesReply.h" -int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - // TLA+ Spec - // HandleAppendEntriesResponse(i, j, m) == - // /\ m.mterm = currentTerm[i] - // /\ \/ /\ m.msuccess \* successful - // /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] - // /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] - // \/ /\ \lnot m.msuccess \* not successful - // /\ nextIndex' = [nextIndex EXCEPT ![i][j] = - // Max({nextIndex[i][j] - 1, 1})] - // /\ UNCHANGED <> - // /\ Discard(m) - // /\ UNCHANGED <> -} +// TLA+ Spec +// HandleAppendEntriesResponse(i, j, m) == +// /\ m.mterm = currentTerm[i] +// /\ \/ /\ m.msuccess \* successful +// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] +// /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] +// \/ /\ \lnot m.msuccess \* not successful +// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = +// Max({nextIndex[i][j] - 1, 1})] +// /\ UNCHANGED <> +// /\ Discard(m) +// /\ UNCHANGED <> +int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {} diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 59c85c38de..7154a21bd1 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -16,8 +16,11 @@ #include "syncRaftStore.h" #include "cJSON.h" -// to complie success: FileIO interface is modified +// private function +static int32_t raftStoreInit(SRaftStore *pRaftStore); +static bool raftStoreFileExist(char *path); +// public function SRaftStore *raftStoreOpen(const char *path) { int32_t ret; @@ -137,121 +140,3 @@ void raftStorePrint(SRaftStore *pRaftStore) { raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); printf("%s\n", storeBuf); } - -#if 0 - -SRaftStore *raftStoreOpen(const char *path) { - int32_t ret; - - SRaftStore *pRaftStore = malloc(sizeof(SRaftStore)); - if (pRaftStore == NULL) { - sError("raftStoreOpen malloc error"); - return NULL; - } - memset(pRaftStore, 0, sizeof(*pRaftStore)); - snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path); - - char storeBuf[RAFT_STORE_BLOCK_SIZE]; - memset(storeBuf, 0, sizeof(storeBuf)); - - if (!raftStoreFileExist(pRaftStore->path)) { - ret = raftStoreInit(pRaftStore); - assert(ret == 0); - } - - pRaftStore->fd = taosOpenFileReadWrite(pRaftStore->path); - if (pRaftStore->fd < 0) { - return NULL; - } - - int len = taosReadFile(pRaftStore->fd, storeBuf, sizeof(storeBuf)); - assert(len == RAFT_STORE_BLOCK_SIZE); - - ret = raftStoreDeserialize(pRaftStore, storeBuf, len); - assert(ret == 0); - - return pRaftStore; -} - -static int32_t raftStoreInit(SRaftStore *pRaftStore) { - pRaftStore->fd = taosOpenFileCreateWrite(pRaftStore->path); - if (pRaftStore->fd < 0) { - return -1; - } - - pRaftStore->currentTerm = 0; - pRaftStore->voteFor.addr = 0; - pRaftStore->voteFor.vgId = 0; - - int32_t ret = raftStorePersist(pRaftStore); - assert(ret == 0); - - taosCloseFile(pRaftStore->fd); - return 0; -} - -int32_t raftStoreClose(SRaftStore *pRaftStore) { - taosCloseFile(pRaftStore->fd); - free(pRaftStore); - return 0; -} - -int32_t raftStorePersist(SRaftStore *pRaftStore) { - int32_t ret; - char storeBuf[RAFT_STORE_BLOCK_SIZE]; - - ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); - assert(ret == 0); - - taosLSeekFile(pRaftStore->fd, 0, SEEK_SET); - - ret = taosWriteFile(pRaftStore->fd, storeBuf, sizeof(storeBuf)); - assert(ret == RAFT_STORE_BLOCK_SIZE); - - fsync(pRaftStore->fd); - return 0; -} - -static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; } - -int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { - cJSON *pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm); - cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr); - cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId); - - char *serialized = cJSON_Print(pRoot); - int len2 = strlen(serialized); - assert(len2 < len); - memset(buf, 0, len); - snprintf(buf, len, "%s", serialized); - free(serialized); - - cJSON_Delete(pRoot); - return 0; -} - -int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { - assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE); - cJSON *pRoot = cJSON_Parse(buf); - - cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); - pRaftStore->currentTerm = pCurrentTerm->valueint; - - cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr"); - pRaftStore->voteFor.addr = pVoteForAddr->valueint; - - cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid"); - pRaftStore->voteFor.vgId = pVoteForVgid->valueint; - - cJSON_Delete(pRoot); - return 0; -} - -void raftStorePrint(SRaftStore *pRaftStore) { - char storeBuf[RAFT_STORE_BLOCK_SIZE]; - raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); - printf("%s\n", storeBuf); -} - -#endif diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 0edd6d2ce4..533043c512 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -15,26 +15,25 @@ #include "syncRequestVote.h" -int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { - // TLA+ Spec - // HandleRequestVoteRequest(i, j, m) == - // LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) - // \/ /\ m.mlastLogTerm = LastTerm(log[i]) - // /\ m.mlastLogIndex >= Len(log[i]) - // grant == /\ m.mterm = currentTerm[i] - // /\ logOk - // /\ votedFor[i] \in {Nil, j} - // IN /\ m.mterm <= currentTerm[i] - // /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j] - // \/ ~grant /\ UNCHANGED votedFor - // /\ Reply([mtype |-> RequestVoteResponse, - // mterm |-> currentTerm[i], - // mvoteGranted |-> grant, - // \* mlog is used just for the `elections' history variable for - // \* the proof. It would not exist in a real implementation. - // mlog |-> log[i], - // msource |-> i, - // mdest |-> j], - // m) - // /\ UNCHANGED <> -} +// TLA+ Spec +// HandleRequestVoteRequest(i, j, m) == +// LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) +// \/ /\ m.mlastLogTerm = LastTerm(log[i]) +// /\ m.mlastLogIndex >= Len(log[i]) +// grant == /\ m.mterm = currentTerm[i] +// /\ logOk +// /\ votedFor[i] \in {Nil, j} +// IN /\ m.mterm <= currentTerm[i] +// /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j] +// \/ ~grant /\ UNCHANGED votedFor +// /\ Reply([mtype |-> RequestVoteResponse, +// mterm |-> currentTerm[i], +// mvoteGranted |-> grant, +// \* mlog is used just for the `elections' history variable for +// \* the proof. It would not exist in a real implementation. +// mlog |-> log[i], +// msource |-> i, +// mdest |-> j], +// m) +// /\ UNCHANGED <> +int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {} diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 63bba7c480..a5b434dbc5 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -15,21 +15,20 @@ #include "syncRequestVoteReply.h" -int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { - // TLA+ Spec - // HandleRequestVoteResponse(i, j, m) == - // \* This tallies votes even when the current state is not Candidate, but - // \* they won't be looked at, so it doesn't matter. - // /\ m.mterm = currentTerm[i] - // /\ votesResponded' = [votesResponded EXCEPT ![i] = - // votesResponded[i] \cup {j}] - // /\ \/ /\ m.mvoteGranted - // /\ votesGranted' = [votesGranted EXCEPT ![i] = - // votesGranted[i] \cup {j}] - // /\ voterLog' = [voterLog EXCEPT ![i] = - // voterLog[i] @@ (j :> m.mlog)] - // \/ /\ ~m.mvoteGranted - // /\ UNCHANGED <> - // /\ Discard(m) - // /\ UNCHANGED <> -} +// TLA+ Spec +// HandleRequestVoteResponse(i, j, m) == +// \* This tallies votes even when the current state is not Candidate, but +// \* they won't be looked at, so it doesn't matter. +// /\ m.mterm = currentTerm[i] +// /\ votesResponded' = [votesResponded EXCEPT ![i] = +// votesResponded[i] \cup {j}] +// /\ \/ /\ m.mvoteGranted +// /\ votesGranted' = [votesGranted EXCEPT ![i] = +// votesGranted[i] \cup {j}] +// /\ voterLog' = [voterLog EXCEPT ![i] = +// voterLog[i] @@ (j :> m.mlog)] +// \/ /\ ~m.mvoteGranted +// /\ UNCHANGED <> +// /\ Discard(m) +// /\ UNCHANGED <> +int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {}