[TD-10645][raft]<feature>add node map
This commit is contained in:
parent
0c65b84886
commit
1d874657f7
|
@ -21,6 +21,7 @@
|
|||
#include "sync_raft_quorum_joint.h"
|
||||
#include "sync_raft_progress.h"
|
||||
#include "sync_raft_proto.h"
|
||||
#include "thash.h"
|
||||
|
||||
struct SSyncRaftProgressTrackerConfig {
|
||||
SSyncRaftQuorumJointConfig voters;
|
||||
|
@ -83,7 +84,9 @@ struct SSyncRaftProgressTracker {
|
|||
|
||||
SSyncRaftProgressMap progressMap;
|
||||
|
||||
ESyncRaftVoteType votes[TSDB_MAX_REPLICA];
|
||||
// nodeid -> ESyncRaftVoteType map
|
||||
SHashObj* votesMap;
|
||||
|
||||
int maxInflightMsgs;
|
||||
};
|
||||
|
||||
|
@ -98,7 +101,7 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, voi
|
|||
* syncRaftRecordVote records that the node with the given id voted for this Raft
|
||||
* instance if v == true (and declined it otherwise).
|
||||
**/
|
||||
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant);
|
||||
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant);
|
||||
|
||||
void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressTrackerConfig* result);
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "sync.h"
|
||||
#include "sync_type.h"
|
||||
#include "sync_raft_node_map.h"
|
||||
#include "thash.h"
|
||||
|
||||
/**
|
||||
* SSyncRaftQuorumJointConfig is a configuration of two groups of (possibly overlapping)
|
||||
|
@ -35,7 +36,7 @@ typedef struct SSyncRaftQuorumJointConfig {
|
|||
* a result indicating whether the vote is pending, lost, or won. A joint quorum
|
||||
* requires both majority quorums to vote in favor.
|
||||
**/
|
||||
ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes);
|
||||
ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashObj* votesMap);
|
||||
|
||||
static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
|
||||
return syncRaftIsInNodeMap(&config->outgoing, id);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "sync.h"
|
||||
#include "sync_type.h"
|
||||
#include "sync_raft_quorum.h"
|
||||
#include "thash.h"
|
||||
|
||||
/**
|
||||
* syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
|
||||
|
@ -26,6 +27,6 @@
|
|||
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
|
||||
* quorum of no has been reached).
|
||||
**/
|
||||
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes);
|
||||
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap);
|
||||
|
||||
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */
|
||||
|
|
|
@ -77,9 +77,6 @@ typedef enum {
|
|||
} ESyncRaftElectionType;
|
||||
|
||||
typedef enum {
|
||||
// the init vote resp status
|
||||
SYNC_RAFT_VOTE_RESP_UNKNOWN = 0,
|
||||
|
||||
// grant the vote request
|
||||
SYNC_RAFT_VOTE_RESP_GRANT = 1,
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ void syncCleanUp() {
|
|||
SSyncNode* syncStart(const SSyncInfo* pInfo) {
|
||||
pthread_mutex_lock(&gSyncManager->mutex);
|
||||
|
||||
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId));
|
||||
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId*));
|
||||
if (ppNode != NULL) {
|
||||
syncInfo("vgroup %d already exist", pInfo->vgId);
|
||||
pthread_mutex_unlock(&gSyncManager->mutex);
|
||||
|
@ -140,7 +140,7 @@ SSyncNode* syncStart(const SSyncInfo* pInfo) {
|
|||
void syncStop(const SSyncNode* pNode) {
|
||||
pthread_mutex_lock(&gSyncManager->mutex);
|
||||
|
||||
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId));
|
||||
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId*));
|
||||
if (ppNode == NULL) {
|
||||
syncInfo("vgroup %d not exist", pNode->vgId);
|
||||
pthread_mutex_unlock(&gSyncManager->mutex);
|
||||
|
@ -288,7 +288,7 @@ static void *syncWorkerMain(void *argv) {
|
|||
|
||||
static void syncNodeTick(void *param, void *tmrId) {
|
||||
SyncGroupId vgId = (SyncGroupId)param;
|
||||
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &vgId, sizeof(SyncGroupId));
|
||||
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &vgId, sizeof(SyncGroupId*));
|
||||
if (ppNode == NULL) {
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -95,6 +95,11 @@ static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
|
|||
continue;
|
||||
}
|
||||
|
||||
SNodeInfo* pNode = syncRaftGetNodeById(pRaft, nodeId);
|
||||
if (pNode == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SSyncMessage* pMsg = syncNewVoteMsg(pRaft->selfGroupId, pRaft->selfId,
|
||||
term, cType, lastIndex, lastTerm);
|
||||
if (pMsg == NULL) {
|
||||
|
@ -105,6 +110,6 @@ static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
|
|||
pRaft->selfGroupId, pRaft->selfId, lastTerm,
|
||||
lastIndex, nodeId, pRaft->term);
|
||||
|
||||
//pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i]));
|
||||
pRaft->io.send(pMsg, pNode);
|
||||
}
|
||||
}
|
|
@ -200,7 +200,7 @@ void syncRaftBroadcastAppend(SSyncRaft* pRaft) {
|
|||
}
|
||||
|
||||
SNodeInfo* syncRaftGetNodeById(SSyncRaft *pRaft, SyncNodeId id) {
|
||||
SNodeInfo **ppNode = taosHashGet(pRaft->nodeInfoMap, &id, sizeof(SNodeInfo));
|
||||
SNodeInfo **ppNode = taosHashGet(pRaft->nodeInfoMap, &id, sizeof(SyncNodeId*));
|
||||
if (ppNode != NULL) {
|
||||
return *ppNode;
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker() {
|
|||
}
|
||||
|
||||
void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) {
|
||||
memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(ESyncRaftVoteType) * TSDB_MAX_REPLICA);
|
||||
taosHashClear(tracker->votesMap);
|
||||
}
|
||||
|
||||
void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) {
|
||||
|
@ -37,12 +37,14 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp vi
|
|||
}
|
||||
}
|
||||
|
||||
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant) {
|
||||
if (tracker->votes[i] != SYNC_RAFT_VOTE_RESP_UNKNOWN) {
|
||||
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant) {
|
||||
ESyncRaftVoteType* pType = taosHashGet(tracker->votesMap, &id, sizeof(SyncNodeId*));
|
||||
if (pType != NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
tracker->votes[i] = grant ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT;
|
||||
ESyncRaftVoteType type = grant ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT;
|
||||
taosHashPut(tracker->votesMap, &id, sizeof(SyncNodeId), &type, sizeof(ESyncRaftVoteType*));
|
||||
}
|
||||
|
||||
void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) {
|
||||
|
@ -68,11 +70,12 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r
|
|||
continue;
|
||||
}
|
||||
|
||||
if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) {
|
||||
ESyncRaftVoteType* pType = taosHashGet(tracker->votesMap, &progress->id, sizeof(SyncNodeId*));
|
||||
if (pType == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) {
|
||||
if (*pType == SYNC_RAFT_VOTE_RESP_GRANT) {
|
||||
g++;
|
||||
} else {
|
||||
r++;
|
||||
|
@ -81,7 +84,7 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r
|
|||
|
||||
if (rejected) *rejected = r;
|
||||
if (granted) *granted = g;
|
||||
return syncRaftVoteResult(&(tracker->config.voters), tracker->votes);
|
||||
return syncRaftVoteResult(&(tracker->config.voters), tracker->votesMap);
|
||||
}
|
||||
|
||||
void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) {
|
||||
|
|
|
@ -23,9 +23,9 @@
|
|||
* a result indicating whether the vote is pending, lost, or won. A joint quorum
|
||||
* requires both majority quorums to vote in favor.
|
||||
**/
|
||||
ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes) {
|
||||
ESyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->incoming), votes);
|
||||
ESyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->outgoing), votes);
|
||||
ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashObj* votesMap) {
|
||||
ESyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->incoming), votesMap);
|
||||
ESyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->outgoing), votesMap);
|
||||
|
||||
if (r1 == r2) {
|
||||
// If they agree, return the agreed state.
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
|
||||
* quorum of no has been reached).
|
||||
**/
|
||||
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes) {
|
||||
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap) {
|
||||
if (config->replica == 0) {
|
||||
return SYNC_RAFT_VOTE_WON;
|
||||
}
|
||||
|
@ -34,9 +34,13 @@ ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const E
|
|||
continue;
|
||||
}
|
||||
|
||||
if (votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) {
|
||||
const ESyncRaftVoteType* pType = taosHashGet(votesMap, &config->nodeId[i], sizeof(SyncNodeId*));
|
||||
if (pType == NULL) {
|
||||
missing += 1;
|
||||
} else if (votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (*pType == SYNC_RAFT_VOTE_RESP_GRANT) {
|
||||
g +=1;
|
||||
} else {
|
||||
r += 1;
|
||||
|
|
Loading…
Reference in New Issue