[TD-10645][raft]<feature>refactor node and progress map
This commit is contained in:
parent
1d874657f7
commit
df2530f969
|
@ -16,15 +16,18 @@
|
||||||
#ifndef _TD_LIBS_SYNC_RAFT_NODE_MAP_H
|
#ifndef _TD_LIBS_SYNC_RAFT_NODE_MAP_H
|
||||||
#define _TD_LIBS_SYNC_RAFT_NODE_MAP_H
|
#define _TD_LIBS_SYNC_RAFT_NODE_MAP_H
|
||||||
|
|
||||||
|
#include "thash.h"
|
||||||
#include "sync.h"
|
#include "sync.h"
|
||||||
#include "sync_type.h"
|
#include "sync_type.h"
|
||||||
|
|
||||||
// TODO: is TSDB_MAX_REPLICA enough?
|
|
||||||
struct SSyncRaftNodeMap {
|
struct SSyncRaftNodeMap {
|
||||||
int32_t replica;
|
SHashObj* nodeIdMap;
|
||||||
SyncNodeId nodeId[TSDB_MAX_REPLICA];
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void syncRaftInitNodeMap(SSyncRaftNodeMap* nodeMap);
|
||||||
|
|
||||||
|
void syncRaftClearNodeMap(SSyncRaftNodeMap* nodeMap);
|
||||||
|
|
||||||
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
|
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
|
||||||
|
|
||||||
void syncRaftCopyNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to);
|
void syncRaftCopyNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to);
|
||||||
|
@ -33,4 +36,11 @@ void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to)
|
||||||
|
|
||||||
void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
|
void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
|
||||||
|
|
||||||
|
void syncRaftRemoveFromNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
|
||||||
|
|
||||||
|
int32_t syncRaftNodeMapSize(const SSyncRaftNodeMap* nodeMap);
|
||||||
|
|
||||||
|
// return true if reach the end
|
||||||
|
bool syncRaftIterateNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId *pId);
|
||||||
|
|
||||||
#endif /* _TD_LIBS_SYNC_RAFT_NODE_MAP_H */
|
#endif /* _TD_LIBS_SYNC_RAFT_NODE_MAP_H */
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
#include "sync_type.h"
|
#include "sync_type.h"
|
||||||
#include "sync_raft_inflights.h"
|
#include "sync_raft_inflights.h"
|
||||||
|
#include "thash.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* State defines how the leader should interact with the follower.
|
* State defines how the leader should interact with the follower.
|
||||||
|
@ -69,8 +70,7 @@ static const char* kProgressStateString[] = {
|
||||||
* progresses of all followers, and sends entries to the follower based on its progress.
|
* progresses of all followers, and sends entries to the follower based on its progress.
|
||||||
**/
|
**/
|
||||||
struct SSyncRaftProgress {
|
struct SSyncRaftProgress {
|
||||||
// index in raft cluster config
|
SyncGroupId groupId;
|
||||||
int selfIndex;
|
|
||||||
|
|
||||||
SyncNodeId id;
|
SyncNodeId id;
|
||||||
|
|
||||||
|
@ -139,10 +139,10 @@ struct SSyncRaftProgress {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SSyncRaftProgressMap {
|
struct SSyncRaftProgressMap {
|
||||||
SSyncRaftProgress progress[TSDB_MAX_REPLICA];
|
// map nodeId -> SSyncRaftProgress*
|
||||||
|
SHashObj* progressMap;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
static FORCE_INLINE const char* syncRaftProgressStateString(const SSyncRaftProgress* progress) {
|
static FORCE_INLINE const char* syncRaftProgressStateString(const SSyncRaftProgress* progress) {
|
||||||
return kProgressStateString[progress->state];
|
return kProgressStateString[progress->state];
|
||||||
}
|
}
|
||||||
|
@ -221,9 +221,9 @@ static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progres
|
||||||
return progress->recentActive;
|
return progress->recentActive;
|
||||||
}
|
}
|
||||||
|
|
||||||
int syncRaftFindProgressIndexByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id);
|
SSyncRaftProgress* syncRaftFindProgressByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id);
|
||||||
|
|
||||||
int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id);
|
int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SSyncRaftProgress* progress);
|
||||||
|
|
||||||
void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id);
|
void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id);
|
||||||
|
|
||||||
|
@ -236,7 +236,8 @@ void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snaps
|
||||||
|
|
||||||
void syncRaftCopyProgress(const SSyncRaftProgress* from, SSyncRaftProgress* to);
|
void syncRaftCopyProgress(const SSyncRaftProgress* from, SSyncRaftProgress* to);
|
||||||
|
|
||||||
void syncRaftProgressMapCopy(const SSyncRaftProgressMap* from, SSyncRaftProgressMap* to);
|
// return true if reach the end
|
||||||
|
bool syncRaftIterateProgressMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftProgress *pProgress);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
|
||||||
|
|
|
@ -175,7 +175,7 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi
|
||||||
SSyncRaftProgress* progress = NULL;
|
SSyncRaftProgress* progress = NULL;
|
||||||
|
|
||||||
syncRaftConfigState(pRaft->tracker, cs);
|
syncRaftConfigState(pRaft->tracker, cs);
|
||||||
i = syncRaftFindProgressIndexByNodeId(&pRaft->tracker->progressMap, selfId);
|
i = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, selfId);
|
||||||
exist = (i != -1);
|
exist = (i != -1);
|
||||||
|
|
||||||
// Update whether the node itself is a learner, resetting to false when the
|
// Update whether the node itself is a learner, resetting to false when the
|
||||||
|
@ -202,7 +202,7 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi
|
||||||
|
|
||||||
// The remaining steps only make sense if this node is the leader and there
|
// The remaining steps only make sense if this node is the leader and there
|
||||||
// are other nodes.
|
// are other nodes.
|
||||||
if (pRaft->state != TAOS_SYNC_STATE_LEADER || cs->voters.replica == 0) {
|
if (pRaft->state != TAOS_SYNC_STATE_LEADER || syncRaftNodeMapSize(&cs->voters) == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -101,7 +101,7 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const S
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(config->voters.incoming.replica == 0) {
|
if(syncRaftNodeMapSize(&config->voters.incoming) == 0) {
|
||||||
// We allow adding nodes to an empty config for convenience (testing and
|
// We allow adding nodes to an empty config for convenience (testing and
|
||||||
// bootstrap), but you can't enter a joint state.
|
// bootstrap), but you can't enter a joint state.
|
||||||
syncError("can't make a zero-voter config joint");
|
syncError("can't make a zero-voter config joint");
|
||||||
|
@ -112,7 +112,7 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const S
|
||||||
syncRaftJointConfigClearOutgoing(&config->voters);
|
syncRaftJointConfigClearOutgoing(&config->voters);
|
||||||
|
|
||||||
// Copy incoming to outgoing.
|
// Copy incoming to outgoing.
|
||||||
memcpy(&config->voters.outgoing, &config->voters.incoming, sizeof(SSyncCluster));
|
syncRaftCopyNodeMap(&config->voters.incoming, &config->voters.outgoing);
|
||||||
|
|
||||||
ret = applyConfig(changer, config, progressMap, css);
|
ret = applyConfig(changer, config, progressMap, css);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
|
@ -129,13 +129,12 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const S
|
||||||
static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
|
static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
|
||||||
syncRaftCloneTrackerConfig(&changer->tracker->config, config);
|
syncRaftCloneTrackerConfig(&changer->tracker->config, config);
|
||||||
int i;
|
int i;
|
||||||
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
|
||||||
SSyncRaftProgress* progress = &(changer->tracker->progressMap.progress[i]);
|
SSyncRaftProgress* pProgress = NULL;
|
||||||
if (progress->id == SYNC_NON_NODE_ID) {
|
while (!syncRaftIterateProgressMap(&changer->tracker->progressMap, pProgress)) {
|
||||||
continue;
|
syncRaftAddToProgressMap(progressMap, pProgress);
|
||||||
}
|
|
||||||
syncRaftCopyProgress(progress, &(progressMap->progress[i]));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return checkAndReturn(config, progressMap);
|
return checkAndReturn(config, progressMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,33 +157,44 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int i;
|
|
||||||
// Any staged learner was staged because it could not be directly added due
|
// Any staged learner was staged because it could not be directly added due
|
||||||
// to a conflicting voter in the outgoing config.
|
// to a conflicting voter in the outgoing config.
|
||||||
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
SyncNodeId* pNodeId = NULL;
|
||||||
if (!syncRaftJointConfigInOutgoing(&config->voters, config->learnersNext.nodeId[i])) {
|
while (!syncRaftIterateNodeMap(&config->learnersNext, pNodeId)) {
|
||||||
|
SyncNodeId nodeId = *pNodeId;
|
||||||
|
if (!syncRaftJointConfigInOutgoing(&config->voters, nodeId)) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (progressMap->progress[i].id != SYNC_NON_NODE_ID && progressMap->progress[i].isLearner) {
|
SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, nodeId);
|
||||||
syncError("%d is in LearnersNext, but is already marked as learner", progressMap->progress[i].id);
|
assert(progress);
|
||||||
|
assert(progress->id == nodeId);
|
||||||
|
if (progress->isLearner) {
|
||||||
|
syncError("[%d:%d] is in LearnersNext, but is already marked as learner", progress->groupId, nodeId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Conversely Learners and Voters doesn't intersect at all.
|
// Conversely Learners and Voters doesn't intersect at all.
|
||||||
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
SyncNodeId* pNodeId = NULL;
|
||||||
if (syncRaftJointConfigInIncoming(&config->voters, config->learners.nodeId[i])) {
|
while (!syncRaftIterateNodeMap(&config->learners, pNodeId)) {
|
||||||
syncError("%d is in Learners and voter.incoming", progressMap->progress[i].id);
|
SyncNodeId nodeId = *pNodeId;
|
||||||
|
if (syncRaftJointConfigInIncoming(&config->voters, nodeId)) {
|
||||||
|
syncError("%d is in Learners and voter.incoming", nodeId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (progressMap->progress[i].id != SYNC_NON_NODE_ID && !progressMap->progress[i].isLearner) {
|
SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, nodeId);
|
||||||
syncError("%d is in Learners, but is not marked as learner", progressMap->progress[i].id);
|
assert(progress);
|
||||||
|
assert(progress->id == nodeId);
|
||||||
|
|
||||||
|
if (!progress->isLearner) {
|
||||||
|
syncError("[%d:%d] is in Learners, but is not marked as learner", progress->groupId, nodeId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!hasJointConfig(config)) {
|
if (!hasJointConfig(config)) {
|
||||||
// We enforce that empty maps are nil instead of zero.
|
// We enforce that empty maps are nil instead of zero.
|
||||||
if (config->learnersNext.replica > 0) {
|
if (syncRaftNodeMapSize(&config->learnersNext)) {
|
||||||
syncError("cfg.LearnersNext must be nil when not joint");
|
syncError("cfg.LearnersNext must be nil when not joint");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -198,7 +208,7 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config) {
|
static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config) {
|
||||||
return config->voters.outgoing.replica > 0;
|
return syncRaftNodeMapSize(&config->voters.outgoing) > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
|
@ -227,7 +237,7 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config->voters.incoming.replica == 0) {
|
if (syncRaftNodeMapSize(&config->voters.incoming) == 0) {
|
||||||
syncError("removed all voters");
|
syncError("removed all voters");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -251,15 +261,10 @@ static int symDiff(const SSyncRaftNodeMap* l, const SSyncRaftNodeMap* r) {
|
||||||
|
|
||||||
const SSyncRaftNodeMap* p0 = pp[0];
|
const SSyncRaftNodeMap* p0 = pp[0];
|
||||||
const SSyncRaftNodeMap* p1 = pp[1];
|
const SSyncRaftNodeMap* p1 = pp[1];
|
||||||
for (j0 = 0; j0 < TSDB_MAX_REPLICA; ++j0) {
|
SyncNodeId* pNodeId;
|
||||||
SyncNodeId id = p0->nodeId[j0];
|
while (!syncRaftIterateNodeMap(p0, pNodeId)) {
|
||||||
if (id == SYNC_NON_NODE_ID) {
|
if (!syncRaftIsInNodeMap(p1, *pNodeId)) {
|
||||||
continue;
|
n+=1;
|
||||||
}
|
|
||||||
for (j1 = 0; j1 < p1->replica; ++j1) {
|
|
||||||
if (p1->nodeId[j1] != SYNC_NON_NODE_ID && p1->nodeId[j1] != id) {
|
|
||||||
n+=1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -274,47 +279,23 @@ static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConf
|
||||||
|
|
||||||
// nilAwareDelete deletes from a map, nil'ing the map itself if it is empty after.
|
// nilAwareDelete deletes from a map, nil'ing the map itself if it is empty after.
|
||||||
static void nilAwareDelete(SSyncRaftNodeMap* nodeMap, SyncNodeId id) {
|
static void nilAwareDelete(SSyncRaftNodeMap* nodeMap, SyncNodeId id) {
|
||||||
int i;
|
syncRaftRemoveFromNodeMap(nodeMap, id);
|
||||||
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
|
||||||
if (nodeMap->nodeId[i] == id) {
|
|
||||||
nodeMap->replica -= 1;
|
|
||||||
nodeMap->nodeId[i] = SYNC_NON_NODE_ID;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(nodeMap->replica >= 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// nilAwareAdd populates a map entry, creating the map if necessary.
|
// nilAwareAdd populates a map entry, creating the map if necessary.
|
||||||
static void nilAwareAdd(SSyncRaftNodeMap* nodeMap, SyncNodeId id) {
|
static void nilAwareAdd(SSyncRaftNodeMap* nodeMap, SyncNodeId id) {
|
||||||
int i, j;
|
syncRaftAddToNodeMap(nodeMap, id);
|
||||||
for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) {
|
|
||||||
if (nodeMap->nodeId[i] == id) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (j == -1 && nodeMap->nodeId[i] == SYNC_NON_NODE_ID) {
|
|
||||||
j = i;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(j != -1);
|
|
||||||
nodeMap->nodeId[j] = id;
|
|
||||||
nodeMap->replica += 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeVoter adds or promotes the given ID to be a voter in the incoming
|
// makeVoter adds or promotes the given ID to be a voter in the incoming
|
||||||
// majority config.
|
// majority config.
|
||||||
static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
||||||
int i = syncRaftFindProgressIndexByNodeId(progressMap, id);
|
SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, id);
|
||||||
if (i == -1) {
|
if (progress == -1) {
|
||||||
initProgress(changer, config, progressMap, id, false);
|
initProgress(changer, config, progressMap, id, false);
|
||||||
i = syncRaftFindProgressIndexByNodeId(progressMap, id);
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(i != -1);
|
|
||||||
SSyncRaftProgress* progress = &(progressMap->progress[i]);
|
|
||||||
|
|
||||||
progress->isLearner = false;
|
progress->isLearner = false;
|
||||||
nilAwareDelete(&config->learners, id);
|
nilAwareDelete(&config->learners, id);
|
||||||
|
@ -337,14 +318,12 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig*
|
||||||
// LeaveJoint().
|
// LeaveJoint().
|
||||||
static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
||||||
int i = syncRaftFindProgressIndexByNodeId(progressMap, id);
|
SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, id);
|
||||||
if (i == -1) {
|
if (progress == NULL) {
|
||||||
initProgress(changer, config, progressMap, id, false);
|
initProgress(changer, config, progressMap, id, false);
|
||||||
i = syncRaftFindProgressIndexByNodeId(progressMap, id);
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(i != -1);
|
|
||||||
SSyncRaftProgress* progress = &(progressMap->progress[i]);
|
|
||||||
if (progress->isLearner) {
|
if (progress->isLearner) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -352,7 +331,7 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi
|
||||||
removeNodeId(changer, config, progressMap, id);
|
removeNodeId(changer, config, progressMap, id);
|
||||||
|
|
||||||
// ... but save the Progress.
|
// ... but save the Progress.
|
||||||
syncRaftAddToProgressMap(progressMap, id);
|
syncRaftAddToProgressMap(progressMap, progress);
|
||||||
|
|
||||||
// Use LearnersNext if we can't add the learner to Learners directly, i.e.
|
// Use LearnersNext if we can't add the learner to Learners directly, i.e.
|
||||||
// if the peer is still tracked as a voter in the outgoing config. It will
|
// if the peer is still tracked as a voter in the outgoing config. It will
|
||||||
|
@ -371,8 +350,8 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi
|
||||||
// removeNodeId this peer as a voter or learner from the incoming config.
|
// removeNodeId this peer as a voter or learner from the incoming config.
|
||||||
static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
||||||
int i = syncRaftFindProgressIndexByNodeId(progressMap, id);
|
SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, id);
|
||||||
if (i == -1) {
|
if (progress == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -85,8 +85,14 @@ static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
|
||||||
SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log);
|
SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log);
|
||||||
SSyncRaftNodeMap nodeMap;
|
SSyncRaftNodeMap nodeMap;
|
||||||
syncRaftJointConfigIDS(&pRaft->tracker->config.voters, &nodeMap);
|
syncRaftJointConfigIDS(&pRaft->tracker->config.voters, &nodeMap);
|
||||||
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
SyncNodeId *pNodeId = NULL;
|
||||||
SyncNodeId nodeId = nodeMap.nodeId[i];
|
while (true) {
|
||||||
|
syncRaftIterateNodeMap(&nodeMap, &pNodeId);
|
||||||
|
if (pNodeId == NULL || *pNodeId == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncNodeId nodeId = *pNodeId;
|
||||||
if (nodeId == SYNC_NON_NODE_ID) {
|
if (nodeId == SYNC_NON_NODE_ID) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,51 +16,52 @@
|
||||||
#include "sync_raft_node_map.h"
|
#include "sync_raft_node_map.h"
|
||||||
#include "sync_type.h"
|
#include "sync_type.h"
|
||||||
|
|
||||||
|
void syncRaftInitNodeMap(SSyncRaftNodeMap* nodeMap) {
|
||||||
|
nodeMap->nodeIdMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncRaftClearNodeMap(SSyncRaftNodeMap* nodeMap) {
|
||||||
|
taosHashClear(nodeMap->nodeIdMap);
|
||||||
|
}
|
||||||
|
|
||||||
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) {
|
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) {
|
||||||
int i;
|
SyncNodeId** ppId = (SyncNodeId**)taosHashGet(nodeMap->nodeIdMap, &nodeId, sizeof(SyncNodeId*));
|
||||||
|
if (ppId == NULL) {
|
||||||
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
return false;
|
||||||
if (nodeId == nodeMap->nodeId[i]) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRaftCopyNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) {
|
void syncRaftCopyNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) {
|
||||||
memcpy(to, nodeMap, sizeof(SSyncRaftNodeMap));
|
SyncNodeId** ppId = (SyncNodeId**)taosHashIterate(nodeMap->nodeIdMap, NULL);
|
||||||
|
while (ppId) {
|
||||||
|
taosHashPut(to->nodeIdMap, ppId, sizeof(SyncNodeId*), ppId, sizeof(SyncNodeId*));
|
||||||
|
ppId = taosHashIterate(nodeMap->nodeIdMap, ppId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool syncRaftIterateNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId *pId) {
|
||||||
|
SyncNodeId **ppId = taosHashIterate(nodeMap->nodeIdMap, pId);
|
||||||
|
if (ppId == NULL) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pId = *(*ppId);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) {
|
void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) {
|
||||||
int i, j, m;
|
syncRaftCopyNodeMap(nodeMap, to);
|
||||||
|
|
||||||
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
|
||||||
SyncNodeId id = nodeMap->nodeId[i];
|
|
||||||
if (id == SYNC_NON_NODE_ID) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
syncRaftAddToNodeMap(to, id);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) {
|
void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) {
|
||||||
assert(nodeMap->replica < TSDB_MAX_REPLICA);
|
taosHashPut(nodeMap->nodeIdMap, &nodeId, sizeof(SyncNodeId*), &nodeId, sizeof(SyncNodeId*));
|
||||||
|
}
|
||||||
|
|
||||||
int i, j;
|
void syncRaftRemoveFromNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) {
|
||||||
for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) {
|
taosHashRemove(nodeMap->nodeIdMap, &nodeId, sizeof(SyncNodeId*));
|
||||||
SyncNodeId id = nodeMap->nodeId[i];
|
}
|
||||||
if (id == SYNC_NON_NODE_ID) {
|
|
||||||
if (j == -1) j = i;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (id == nodeId) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(j != -1);
|
int32_t syncRaftNodeMapSize(const SSyncRaftNodeMap* nodeMap) {
|
||||||
nodeMap->nodeId[j] = nodeId;
|
return taosHashGetSize(nodeMap);
|
||||||
nodeMap->replica += 1;
|
|
||||||
}
|
}
|
|
@ -112,42 +112,21 @@ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int syncRaftFindProgressIndexByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
SSyncRaftProgress* syncRaftFindProgressByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
||||||
int i;
|
SSyncRaftProgress** ppProgress = (SSyncRaftProgress**)taosHashGet(progressMap, &id, sizeof(SyncNodeId*));
|
||||||
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
if (ppProgress == NULL) {
|
||||||
if (progressMap->progress[i].id == id) {
|
return NULL;
|
||||||
return i;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return -1;
|
|
||||||
|
return *ppProgress;
|
||||||
}
|
}
|
||||||
|
|
||||||
int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SSyncRaftProgress* progress) {
|
||||||
int i, j;
|
taosHashPut(progressMap->progressMap, &progress->id, sizeof(SyncNodeId*), &progress, sizeof(SSyncRaftProgress*));
|
||||||
|
|
||||||
for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) {
|
|
||||||
if (progressMap->progress[i].id == id) {
|
|
||||||
return i;
|
|
||||||
}
|
|
||||||
if (j == -1 && progressMap->progress[i].id == SYNC_NON_NODE_ID) {
|
|
||||||
j = i;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(j != -1);
|
|
||||||
|
|
||||||
progressMap->progress[i].id = id;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
||||||
int i;
|
taosHashRemove(progressMap->progressMap, &id, sizeof(SyncNodeId*));
|
||||||
|
|
||||||
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
|
||||||
if (progressMap->progress[i].id == id) {
|
|
||||||
progressMap->progress[i].id = SYNC_NON_NODE_ID;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
|
bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
|
||||||
|
@ -188,7 +167,17 @@ void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snaps
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRaftCopyProgress(const SSyncRaftProgress* progress, SSyncRaftProgress* out) {
|
void syncRaftCopyProgress(const SSyncRaftProgress* progress, SSyncRaftProgress* out) {
|
||||||
|
memcpy(out, progress, sizeof(SSyncRaftProgress));
|
||||||
|
}
|
||||||
|
|
||||||
|
bool syncRaftIterateProgressMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftProgress *pProgress) {
|
||||||
|
SSyncRaftProgress **ppProgress = taosHashIterate(nodeMap->nodeIdMap, pProgress);
|
||||||
|
if (ppProgress == NULL) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pProgress = *(*ppProgress);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -22,6 +22,9 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker() {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
syncRaftInitNodeMap(&tracker->config.learners);
|
||||||
|
syncRaftInitNodeMap(&tracker->config.learnersNext);
|
||||||
|
|
||||||
return tracker;
|
return tracker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,8 +91,8 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) {
|
void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) {
|
||||||
memcpy(&cs->voters, &tracker->config.voters.incoming, sizeof(SSyncRaftNodeMap));
|
syncRaftCopyNodeMap(&cs->voters, &tracker->config.voters.incoming);
|
||||||
memcpy(&cs->votersOutgoing, &tracker->config.voters.outgoing, sizeof(SSyncRaftNodeMap));
|
syncRaftCopyNodeMap(&cs->votersOutgoing, &tracker->config.voters.outgoing);
|
||||||
memcpy(&cs->learners, &tracker->config.learners, sizeof(SSyncRaftNodeMap));
|
syncRaftCopyNodeMap(&cs->learners, &tracker->config.learners);
|
||||||
memcpy(&cs->learnersNext, &tracker->config.learnersNext, sizeof(SSyncRaftNodeMap));
|
syncRaftCopyNodeMap(&cs->learnersNext, &tracker->config.learnersNext);
|
||||||
}
|
}
|
|
@ -42,39 +42,14 @@ ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashOb
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
|
void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
|
||||||
int i, min;
|
syncRaftAddToNodeMap(&config->incoming, id);
|
||||||
|
|
||||||
for (i = 0, min = -1; i < TSDB_MAX_REPLICA; ++i) {
|
|
||||||
if (config->incoming.nodeId[i] == id) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (min == -1 && config->incoming.nodeId[i] == SYNC_NON_NODE_ID) {
|
|
||||||
min = i;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(min != -1);
|
|
||||||
config->incoming.nodeId[min] = id;
|
|
||||||
config->incoming.replica += 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
|
void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
|
||||||
int i;
|
syncRaftRemoveFromNodeMap(&config->incoming, id);
|
||||||
|
|
||||||
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
|
||||||
if (config->incoming.nodeId[i] == id) {
|
|
||||||
config->incoming.replica -= 1;
|
|
||||||
config->incoming.nodeId[i] = SYNC_NON_NODE_ID;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(config->incoming.replica >= 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRaftJointConfigIDS(const SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap) {
|
void syncRaftJointConfigIDS(const SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap) {
|
||||||
int i, j, m;
|
|
||||||
|
|
||||||
syncRaftCopyNodeMap(&config->incoming, nodeMap);
|
syncRaftCopyNodeMap(&config->incoming, nodeMap);
|
||||||
|
|
||||||
syncRaftUnionNodeMap(&config->outgoing, nodeMap);
|
syncRaftUnionNodeMap(&config->outgoing, nodeMap);
|
||||||
|
|
Loading…
Reference in New Issue