diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h index 8adb0b4736..3a448290c8 100644 --- a/source/libs/sync/inc/sync_raft_progress_tracker.h +++ b/source/libs/sync/inc/sync_raft_progress_tracker.h @@ -21,6 +21,7 @@ #include "sync_raft_quorum_joint.h" #include "sync_raft_progress.h" #include "sync_raft_proto.h" +#include "thash.h" struct SSyncRaftProgressTrackerConfig { SSyncRaftQuorumJointConfig voters; @@ -83,7 +84,9 @@ struct SSyncRaftProgressTracker { SSyncRaftProgressMap progressMap; - ESyncRaftVoteType votes[TSDB_MAX_REPLICA]; + // nodeid -> ESyncRaftVoteType map + SHashObj* votesMap; + int maxInflightMsgs; }; @@ -98,7 +101,7 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, voi * syncRaftRecordVote records that the node with the given id voted for this Raft * instance if v == true (and declined it otherwise). **/ -void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant); +void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant); void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressTrackerConfig* result); diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index 0637a9be9a..4a5b749a0e 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -20,6 +20,7 @@ #include "sync.h" #include "sync_type.h" #include "sync_raft_node_map.h" +#include "thash.h" /** * SSyncRaftQuorumJointConfig is a configuration of two groups of (possibly overlapping) @@ -35,7 +36,7 @@ typedef struct SSyncRaftQuorumJointConfig { * a result indicating whether the vote is pending, lost, or won. A joint quorum * requires both majority quorums to vote in favor. **/ -ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes); +ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashObj* votesMap); static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { return syncRaftIsInNodeMap(&config->outgoing, id); diff --git a/source/libs/sync/inc/sync_raft_quorum_majority.h b/source/libs/sync/inc/sync_raft_quorum_majority.h index 0512a4dc87..38df40147a 100644 --- a/source/libs/sync/inc/sync_raft_quorum_majority.h +++ b/source/libs/sync/inc/sync_raft_quorum_majority.h @@ -19,6 +19,7 @@ #include "sync.h" #include "sync_type.h" #include "sync_raft_quorum.h" +#include "thash.h" /** * syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns @@ -26,6 +27,6 @@ * yes/no has been reached), won (a quorum of yes has been reached), or lost (a * quorum of no has been reached). **/ -ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes); +ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap); #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */ diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index 6d29e019cc..fcb0940609 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -77,9 +77,6 @@ typedef enum { } ESyncRaftElectionType; typedef enum { - // the init vote resp status - SYNC_RAFT_VOTE_RESP_UNKNOWN = 0, - // grant the vote request SYNC_RAFT_VOTE_RESP_GRANT = 1, diff --git a/source/libs/sync/src/sync.c b/source/libs/sync/src/sync.c index 2f25828d5d..06af8ff6c2 100644 --- a/source/libs/sync/src/sync.c +++ b/source/libs/sync/src/sync.c @@ -99,7 +99,7 @@ void syncCleanUp() { SSyncNode* syncStart(const SSyncInfo* pInfo) { pthread_mutex_lock(&gSyncManager->mutex); - SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId)); + SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId*)); if (ppNode != NULL) { syncInfo("vgroup %d already exist", pInfo->vgId); pthread_mutex_unlock(&gSyncManager->mutex); @@ -140,7 +140,7 @@ SSyncNode* syncStart(const SSyncInfo* pInfo) { void syncStop(const SSyncNode* pNode) { pthread_mutex_lock(&gSyncManager->mutex); - SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId)); + SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId*)); if (ppNode == NULL) { syncInfo("vgroup %d not exist", pNode->vgId); pthread_mutex_unlock(&gSyncManager->mutex); @@ -288,7 +288,7 @@ static void *syncWorkerMain(void *argv) { static void syncNodeTick(void *param, void *tmrId) { SyncGroupId vgId = (SyncGroupId)param; - SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &vgId, sizeof(SyncGroupId)); + SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &vgId, sizeof(SyncGroupId*)); if (ppNode == NULL) { return; } diff --git a/source/libs/sync/src/sync_raft_election.c b/source/libs/sync/src/sync_raft_election.c index b5649d5c5e..4423e1ac3c 100644 --- a/source/libs/sync/src/sync_raft_election.c +++ b/source/libs/sync/src/sync_raft_election.c @@ -95,6 +95,11 @@ static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) { continue; } + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, nodeId); + if (pNode == NULL) { + continue; + } + SSyncMessage* pMsg = syncNewVoteMsg(pRaft->selfGroupId, pRaft->selfId, term, cType, lastIndex, lastTerm); if (pMsg == NULL) { @@ -105,6 +110,6 @@ static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) { pRaft->selfGroupId, pRaft->selfId, lastTerm, lastIndex, nodeId, pRaft->term); - //pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i])); + pRaft->io.send(pMsg, pNode); } } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index 6ec0c6c089..73a02c4b80 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -200,7 +200,7 @@ void syncRaftBroadcastAppend(SSyncRaft* pRaft) { } SNodeInfo* syncRaftGetNodeById(SSyncRaft *pRaft, SyncNodeId id) { - SNodeInfo **ppNode = taosHashGet(pRaft->nodeInfoMap, &id, sizeof(SNodeInfo)); + SNodeInfo **ppNode = taosHashGet(pRaft->nodeInfoMap, &id, sizeof(SyncNodeId*)); if (ppNode != NULL) { return *ppNode; } diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c index f43414127d..1407df059a 100644 --- a/source/libs/sync/src/sync_raft_progress_tracker.c +++ b/source/libs/sync/src/sync_raft_progress_tracker.c @@ -26,7 +26,7 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker() { } void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) { - memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(ESyncRaftVoteType) * TSDB_MAX_REPLICA); + taosHashClear(tracker->votesMap); } void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) { @@ -37,12 +37,14 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp vi } } -void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant) { - if (tracker->votes[i] != SYNC_RAFT_VOTE_RESP_UNKNOWN) { +void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant) { + ESyncRaftVoteType* pType = taosHashGet(tracker->votesMap, &id, sizeof(SyncNodeId*)); + if (pType != NULL) { return; } - tracker->votes[i] = grant ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT; + ESyncRaftVoteType type = grant ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT; + taosHashPut(tracker->votesMap, &id, sizeof(SyncNodeId), &type, sizeof(ESyncRaftVoteType*)); } void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) { @@ -68,11 +70,12 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r continue; } - if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) { + ESyncRaftVoteType* pType = taosHashGet(tracker->votesMap, &progress->id, sizeof(SyncNodeId*)); + if (pType == NULL) { continue; } - if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) { + if (*pType == SYNC_RAFT_VOTE_RESP_GRANT) { g++; } else { r++; @@ -81,7 +84,7 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r if (rejected) *rejected = r; if (granted) *granted = g; - return syncRaftVoteResult(&(tracker->config.voters), tracker->votes); + return syncRaftVoteResult(&(tracker->config.voters), tracker->votesMap); } void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) { diff --git a/source/libs/sync/src/sync_raft_quorum_joint.c b/source/libs/sync/src/sync_raft_quorum_joint.c index 8a99574d68..2383d7ee63 100644 --- a/source/libs/sync/src/sync_raft_quorum_joint.c +++ b/source/libs/sync/src/sync_raft_quorum_joint.c @@ -23,9 +23,9 @@ * a result indicating whether the vote is pending, lost, or won. A joint quorum * requires both majority quorums to vote in favor. **/ -ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes) { - ESyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->incoming), votes); - ESyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->outgoing), votes); +ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashObj* votesMap) { + ESyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->incoming), votesMap); + ESyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->outgoing), votesMap); if (r1 == r2) { // If they agree, return the agreed state. diff --git a/source/libs/sync/src/sync_raft_quorum_majority.c b/source/libs/sync/src/sync_raft_quorum_majority.c index 8ff5752b97..014a8c7303 100644 --- a/source/libs/sync/src/sync_raft_quorum_majority.c +++ b/source/libs/sync/src/sync_raft_quorum_majority.c @@ -23,7 +23,7 @@ * yes/no has been reached), won (a quorum of yes has been reached), or lost (a * quorum of no has been reached). **/ -ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes) { +ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap) { if (config->replica == 0) { return SYNC_RAFT_VOTE_WON; } @@ -34,9 +34,13 @@ ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const E continue; } - if (votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) { + const ESyncRaftVoteType* pType = taosHashGet(votesMap, &config->nodeId[i], sizeof(SyncNodeId*)); + if (pType == NULL) { missing += 1; - } else if (votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) { + continue; + } + + if (*pType == SYNC_RAFT_VOTE_RESP_GRANT) { g +=1; } else { r += 1;