diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 7e60822a28..44d0efb033 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -29,6 +29,7 @@ extern "C" { #include "ttimer.h" #define TIMER_MAX_MS 0x7FFFFFFF +#define PING_TIMER_MS 1000 typedef struct SSyncEnv { tmr_h pEnvTickTimer; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 15ed5503eb..0ee33f0912 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -131,6 +131,7 @@ typedef struct SSyncNode { // raft algorithm SSyncFSM* pFsm; int32_t quorum; + SRaftId leaderCache; // life cycle int32_t refCount; @@ -153,9 +154,11 @@ typedef struct SSyncNode { SyncIndex commitIndex; // timer - tmr_h pPingTimer; - int32_t pingTimerMS; - uint8_t pingTimerEnable; + tmr_h pPingTimer; + int32_t pingTimerMS; + // uint8_t pingTimerEnable; + uint64_t pingTimerLogicClock; + uint64_t pingTimerLogicClockUser; TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp uint64_t pingTimerCounter; @@ -193,18 +196,16 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); -int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); -int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms); -void syncNodeBecomeFollower(SSyncNode* pSyncNode); -void syncNodeBecomeLeader(SSyncNode* pSyncNode); -void syncNodeFollower2Candidate(SSyncNode* pSyncNode); -void syncNodeCandidate2Leader(SSyncNode* pSyncNode); -void syncNodeLeader2Follower(SSyncNode* pSyncNode); -void syncNodeCandidate2Follower(SSyncNode* pSyncNode); +int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); +int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); +int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); +int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); +int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); +int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index b022044528..95135d161b 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -30,7 +30,7 @@ extern "C" { // encode as uint32 typedef enum ESyncMessageType { - SYNC_UNKNOWN = 77, + SYNC_UNKNOWN = 9999, SYNC_TIMEOUT = 99, SYNC_PING = 101, SYNC_PING_REPLY = 103, @@ -52,13 +52,13 @@ typedef enum ESyncTimeoutType { SYNC_TIMEOUT_PING = 100, SYNC_TIMEOUT_ELECTION, SYNC_TIMEOUT_HEARTBEAT, - } ESyncTimeoutType; typedef struct SyncTimeout { uint32_t bytes; uint32_t msgType; ESyncTimeoutType timeoutType; + uint64_t logicClock; void* data; } SyncTimeout; @@ -69,7 +69,7 @@ void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); cJSON* syncTimeout2Json(const SyncTimeout* pMsg); -SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data); +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, void* data); // --------------------------------------------- typedef struct SyncPing { diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index c2eca55151..4fb2193010 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -28,10 +28,6 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg); - -void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index 38068dd0e2..21fb61f85f 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -28,8 +28,6 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 2b9c59ec92..f3045c3180 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -15,7 +15,7 @@ #include "syncAppendEntries.h" -void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { +int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) { // TLA+ Spec // AppendEntries(i, j) == // /\ i /= j @@ -42,7 +42,7 @@ void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { // /\ UNCHANGED <> } -void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { +int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == // LET logOk == \/ m.mprevLogIndex = 0 diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 05734237b9..81c9ea233b 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -15,7 +15,7 @@ #include "syncAppendEntriesReply.h" -void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) { +int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { // TLA+ Spec // HandleAppendEntriesResponse(i, j, m) == // /\ m.mterm = currentTerm[i] diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 25775bec76..3250d4c303 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -80,7 +80,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { pMsg->msgType, pMsg->contLen); { cJSON *pJson = syncRpcMsg2Json(pMsg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); sTrace("process syncMessage send: pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); @@ -211,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) { SSyncIO *io = param; STaosQall *qall; - SRpcMsg *pRpcMsg, rpcMsg; + SRpcMsg * pRpcMsg, rpcMsg; int type; qall = taosAllocateQall(); @@ -245,6 +245,42 @@ static void *syncIOConsumerFunc(void *param) { syncPingReplyDestroy(pSyncMsg); } + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { + if (io->FpOnSyncRequestVote) { + SyncRequestVote *pSyncMsg; + pSyncMsg = syncRequestVoteBuild(pRpcMsg->contLen); + syncRequestVoteFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); + } + + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { + if (io->FpOnSyncRequestVoteReply) { + SyncRequestVoteReply *pSyncMsg; + pSyncMsg = SyncRequestVoteReplyBuild(); + syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); + } + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { + if (io->FpOnSyncAppendEntries) { + SyncAppendEntries *pSyncMsg; + pSyncMsg = syncAppendEntriesBuild(pRpcMsg->contLen); + syncAppendEntriesFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg); + syncAppendEntriesDestroy(pSyncMsg); + } + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { + if (io->FpOnSyncAppendEntriesReply) { + SyncAppendEntriesReply *pSyncMsg; + pSyncMsg = syncAppendEntriesReplyBuild(); + syncAppendEntriesReplyFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); + } + } else if (pRpcMsg->msgType == SYNC_TIMEOUT) { if (io->FpOnSyncTimeout != NULL) { SyncTimeout *pSyncMsg; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 6a0663dd57..ed7ab23b0c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -31,16 +31,19 @@ static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); -static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); -static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); - static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); -static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); -static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); -static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); -static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); + +static void syncNodeBecomeFollower(SSyncNode* pSyncNode); +static void syncNodeBecomeLeader(SSyncNode* pSyncNode); +static void syncNodeFollower2Candidate(SSyncNode* pSyncNode); +static void syncNodeCandidate2Leader(SSyncNode* pSyncNode); +static void syncNodeLeader2Follower(SSyncNode* pSyncNode); +static void syncNodeCandidate2Follower(SSyncNode* pSyncNode); + +void syncNodeRequestVotePeers(SSyncNode* pSyncNode); +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); // --------------------------------- int32_t syncInit() { @@ -96,8 +99,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId); pSyncNode->pPingTimer = NULL; - pSyncNode->pingTimerMS = 1000; - atomic_store_8(&pSyncNode->pingTimerEnable, 0); + pSyncNode->pingTimerMS = PING_TIMER_MS; + atomic_store_64(&pSyncNode->pingTimerLogicClock, 0); + atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0); pSyncNode->FpPingTimer = syncNodeEqPingTimer; pSyncNode->pingTimerCounter = 0; @@ -151,6 +155,9 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { } int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { + atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); + pSyncNode->pingTimerMS = PING_TIMER_MS; + if (pSyncNode->pPingTimer == NULL) { pSyncNode->pPingTimer = taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager); @@ -159,12 +166,11 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { &pSyncNode->pPingTimer); } - atomic_store_8(&pSyncNode->pingTimerEnable, 1); return 0; } int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { - atomic_store_8(&pSyncNode->pingTimerEnable, 0); + atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1); pSyncNode->pingTimerMS = TIMER_MAX_MS; return 0; } @@ -188,8 +194,6 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { return 0; } -int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } - int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { if (pSyncNode->pHeartbeatTimer == NULL) { pSyncNode->pHeartbeatTimer = @@ -209,20 +213,6 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { return 0; } -int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } - -void syncNodeBecomeFollower(SSyncNode* pSyncNode) {} - -void syncNodeBecomeLeader(SSyncNode* pSyncNode) {} - -void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {} - -void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {} - -void syncNodeLeader2Follower(SSyncNode* pSyncNode) {} - -void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {} - // ------ local funciton --------- static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { sTrace("syncNodePing pSyncNode:%p ", pSyncNode); @@ -252,16 +242,6 @@ static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, Syn return ret; } -static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) { - int32_t ret = 0; - return ret; -} - -static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) { - int32_t ret = 0; - return ret; -} - static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); @@ -311,26 +291,6 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { return ret; } -static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { - int32_t ret = 0; - return ret; -} - -static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { - int32_t ret = 0; - return ret; -} - -static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { - int32_t ret = 0; - return ret; -} - -static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - int32_t ret = 0; - return ret; -} - static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { int32_t ret = 0; sTrace("<-- syncNodeOnTimeoutCb -->"); @@ -344,7 +304,7 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { } if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { - if (atomic_load_8(&ths->pingTimerEnable)) { + if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) { ++(ths->pingTimerCounter); syncNodePingAll(ths); } @@ -359,11 +319,13 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { static void syncNodeEqPingTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; - if (atomic_load_8(&pSyncNode->pingTimerEnable)) { + if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) { // pSyncNode->pingTimerMS += 100; - SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, pSyncNode); - SRpcMsg rpcMsg; + SyncTimeout* pSyncMsg = + syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), pSyncNode); + + SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); syncTimeoutDestroy(pSyncMsg); @@ -371,10 +333,49 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { - sTrace("syncNodeEqPingTimer: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); + sTrace("syncNodeEqPingTimer: pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", pSyncNode->pingTimerLogicClock, + pSyncNode->pingTimerLogicClockUser); } } static void syncNodeEqElectTimer(void* param, void* tmrId) {} -static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} \ No newline at end of file +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} + +static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + pSyncNode->leaderCache.addr = 0; + pSyncNode->leaderCache.vgId = 0; + } + + syncNodeStopHeartbeatTimer(pSyncNode); + syncNodeStartElectTimer(pSyncNode); +} + +static void syncNodeBecomeLeader(SSyncNode* pSyncNode) { + pSyncNode->state = TAOS_SYNC_STATE_LEADER; + pSyncNode->leaderCache = pSyncNode->raftId; + + // next Index +=1 + // match Index = 0; + + syncNodeStopElectTimer(pSyncNode); + syncNodeStartHeartbeatTimer(pSyncNode); + + // appendEntries; +} + +static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { + assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); + pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; +} + +static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {} + +static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {} + +static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {} + +void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} + +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index f1434947a1..d1b4a6a2c6 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -123,6 +123,8 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->logicClock); + cJSON_AddStringToObject(pRoot, "logicClock", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); cJSON_AddStringToObject(pRoot, "data", u64buf); @@ -131,9 +133,10 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { return pJson; } -SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data) { +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, void* data) { SyncTimeout* pMsg = syncTimeoutBuild(); pMsg->timeoutType = timeoutType; + pMsg->logicClock = logicClock; pMsg->data = data; return pMsg; } diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 7aee47b8e4..38eaea26ac 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -15,7 +15,7 @@ #include "syncRequestVote.h" -void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { +int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) { // TLA+ Spec // RequestVote(i, j) == // /\ state[i] = Candidate @@ -29,7 +29,7 @@ void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { // /\ UNCHANGED <> } -void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { +int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { // TLA+ Spec // HandleRequestVoteRequest(i, j, m) == // LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index a9c88a7975..63bba7c480 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -15,7 +15,7 @@ #include "syncRequestVoteReply.h" -void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) { +int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { // TLA+ Spec // HandleRequestVoteResponse(i, j, m) == // \* This tallies votes even when the current state is not Candidate, but diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 4c5f7ffa56..770d1d1bd8 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -9,6 +9,7 @@ add_executable(syncIOSendMsgClientTest "") add_executable(syncIOSendMsgServerTest "") add_executable(syncRaftStoreTest "") add_executable(syncEnqTest "") +add_executable(syncIndexTest "") target_sources(syncTest @@ -55,6 +56,10 @@ target_sources(syncEnqTest PRIVATE "syncEnqTest.cpp" ) +target_sources(syncIndexTest + PRIVATE + "syncIndexTest.cpp" +) target_include_directories(syncTest @@ -112,6 +117,11 @@ target_include_directories(syncEnqTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncIndexTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -158,6 +168,10 @@ target_link_libraries(syncEnqTest sync gtest_main ) +target_link_libraries(syncIndexTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncIOSendMsgClientTest.cpp b/source/libs/sync/test/syncIOSendMsgClientTest.cpp index 0d06c4f811..83ac724789 100644 --- a/source/libs/sync/test/syncIOSendMsgClientTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgClientTest.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" diff --git a/source/libs/sync/test/syncIOSendMsgServerTest.cpp b/source/libs/sync/test/syncIOSendMsgServerTest.cpp index 1582e097d3..b0f177962f 100644 --- a/source/libs/sync/test/syncIOSendMsgServerTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgServerTest.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp index 9b2cfcf1d8..c25ad3b1dd 100644 --- a/source/libs/sync/test/syncIOSendMsgTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" diff --git a/source/libs/sync/test/syncIOTickPingTest.cpp b/source/libs/sync/test/syncIOTickPingTest.cpp index 42b9a73432..777fc035e2 100644 --- a/source/libs/sync/test/syncIOTickPingTest.cpp +++ b/source/libs/sync/test/syncIOTickPingTest.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" diff --git a/source/libs/sync/test/syncIOTickQTest.cpp b/source/libs/sync/test/syncIOTickQTest.cpp index 3d31c596bf..5615058cc3 100644 --- a/source/libs/sync/test/syncIOTickQTest.cpp +++ b/source/libs/sync/test/syncIOTickQTest.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" diff --git a/source/libs/sync/test/syncIndexTest.cpp b/source/libs/sync/test/syncIndexTest.cpp new file mode 100644 index 0000000000..ece58fb9b0 --- /dev/null +++ b/source/libs/sync/test/syncIndexTest.cpp @@ -0,0 +1,63 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void print(SHashObj *pNextIndex) { + printf("----------------\n"); + uint64_t *p = (uint64_t *)taosHashIterate(pNextIndex, NULL); + while (p) { + printf("%lu \n", *p); + p = (uint64_t *)taosHashIterate(pNextIndex, p); + } +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + sTrace("sync log test: trace"); + sDebug("sync log test: debug"); + sInfo("sync log test: info"); + sWarn("sync log test: warn"); + sError("sync log test: error"); + sFatal("sync log test: fatal"); + + SRaftId me; + SRaftId peer1; + SRaftId peer2; + + me.addr = 0; + me.vgId = 99; + peer1.addr = 1; + peer1.vgId = 99; + peer2.addr = 2; + peer2.vgId = 99; + + uint64_t index; + SHashObj *pNextIndex = + taosHashInit(sizeof(SRaftId), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + + index = 1000; + taosHashPut(pNextIndex, &me, sizeof(me), &index, sizeof(index)); + index = 1001; + taosHashPut(pNextIndex, &peer1, sizeof(peer1), &index, sizeof(index)); + index = 1002; + taosHashPut(pNextIndex, &peer2, sizeof(peer2), &index, sizeof(index)); + + print(pNextIndex); + + SRaftId find; + find = peer1; + uint64_t *p; + p = (uint64_t *)taosHashGet(pNextIndex, &find, sizeof(find)); + (*p) += 900; + + print(pNextIndex); + + taosHashCleanup(pNextIndex); + + return 0; +} diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 77413a713b..4e5f7b78b5 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -84,6 +84,17 @@ int main(int argc, char** argv) { assert(ret == 0); taosMsleep(10000); + + ret = syncNodeStopPingTimer(pSyncNode); + assert(ret == 0); + + taosMsleep(10000); + + ret = syncNodeStartPingTimer(pSyncNode); + assert(ret == 0); + + taosMsleep(10000); + ret = syncNodeStopPingTimer(pSyncNode); assert(ret == 0); diff --git a/source/libs/sync/test/syncRaftStoreTest.cpp b/source/libs/sync/test/syncRaftStoreTest.cpp index 9cfbe5a4ea..447dab0cbc 100644 --- a/source/libs/sync/test/syncRaftStoreTest.cpp +++ b/source/libs/sync/test/syncRaftStoreTest.cpp @@ -1,6 +1,6 @@ -#include -#include #include "syncRaftStore.h" +//#include +#include #include "syncIO.h" #include "syncInt.h" diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index c1c5658aba..1f9f4846cc 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h"