[TD-10645][raft]<feature>refactor node and progress map

This commit is contained in:
lichuang 2021-11-18 17:19:33 +08:00
parent 98a6b1918c
commit 7e2590f108
14 changed files with 159 additions and 30 deletions

View File

@ -33,6 +33,11 @@ struct SSyncRaftChanger {
typedef int (*configChangeFp)(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css,
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
// Simple carries out a series of configuration changes that (in aggregate)
// mutates the incoming majority config Voters[0] by at most one. This method
// will return an error if that is not the case, if the resulting quorum is
// zero, or if the configuration is in a joint state (i.e. if there is an
// outgoing configuration).
int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css,
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);

View File

@ -25,6 +25,7 @@ struct SSyncRaftNodeMap {
};
void syncRaftInitNodeMap(SSyncRaftNodeMap* nodeMap);
void syncRaftFreeNodeMap(SSyncRaftNodeMap* nodeMap);
void syncRaftClearNodeMap(SSyncRaftNodeMap* nodeMap);
@ -43,6 +44,6 @@ int32_t syncRaftNodeMapSize(const SSyncRaftNodeMap* nodeMap);
// return true if reach the end
bool syncRaftIterateNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId *pId);
bool syncRaftIsAllInProgressMap(const SSyncRaftNodeMap* nodeMap, const SSyncRaftProgressMap* progressMap);
bool syncRaftIsAllNodeInProgressMap(SSyncRaftNodeMap* nodeMap, SSyncRaftProgressMap* progressMap);
#endif /* _TD_LIBS_SYNC_RAFT_NODE_MAP_H */

View File

@ -74,6 +74,8 @@ struct SSyncRaftProgress {
SyncNodeId id;
int16_t refCount;
SyncIndex nextIndex;
SyncIndex matchIndex;
@ -221,6 +223,12 @@ static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progres
return progress->recentActive;
}
void syncRaftInitProgressMap(SSyncRaftProgressMap* progressMap);
void syncRaftFreeProgressMap(SSyncRaftProgressMap* progressMap);
void syncRaftClearProgressMap(SSyncRaftProgressMap* progressMap);
void syncRaftCopyProgressMap(SSyncRaftProgressMap* from, SSyncRaftProgressMap* to);
SSyncRaftProgress* syncRaftFindProgressByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id);
int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SSyncRaftProgress* progress);

View File

@ -94,6 +94,11 @@ struct SSyncRaftProgressTracker {
SSyncRaftProgressTracker* syncRaftOpenProgressTracker(SSyncRaft* pRaft);
void syncRaftInitTrackConfig(SSyncRaftProgressTrackerConfig* config);
void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config);
void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config);
void syncRaftResetVotes(SSyncRaftProgressTracker*);
void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg);
@ -104,9 +109,9 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, voi
**/
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant);
void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressTrackerConfig* result);
void syncRaftCopyTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to);
int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
int syncRaftCheckTrackerConfigInProgress(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
/**
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the

View File

@ -59,4 +59,19 @@ typedef struct SSyncConfigState {
bool autoLeave;
} SSyncConfigState;
static FORCE_INLINE bool syncRaftConfArrayIsEmpty(const SSyncConfChangeSingleArray* ary) {
return ary->n == 0;
}
static FORCE_INLINE void syncRaftInitConfArray(SSyncConfChangeSingleArray* ary) {
*ary = (SSyncConfChangeSingleArray) {
.changes = NULL,
.n = 0,
};
}
static FORCE_INLINE void syncRaftFreeConfArray(SSyncConfChangeSingleArray* ary) {
if (ary->changes != NULL) free(ary->changes);
}
#endif /* TD_SYNC_RAFT_PROTO_H */

View File

@ -38,6 +38,8 @@ typedef struct SSyncRaftQuorumJointConfig {
**/
ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashObj* votesMap);
void syncRaftInitQuorumJointConfig(SSyncRaftQuorumJointConfig* config);
static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
return syncRaftIsInNodeMap(&config->outgoing, id);
}

View File

@ -27,6 +27,7 @@
// the Changer only needs a ProgressMap (not a whole Tracker) at which point
// this can just take LastIndex and MaxInflight directly instead and cook up
// the results from that alone.
int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs);
int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs,
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
#endif /* TD_SYNC_RAFT_RESTORE_H */

View File

@ -101,11 +101,22 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
.tracker = pRaft->tracker,
.lastIndex = syncRaftLogLastIndex(pRaft->log),
};
if (syncRaftRestoreConfig(&changer, &confState) < 0) {
SSyncRaftProgressTrackerConfig config;
SSyncRaftProgressMap progressMap;
if (syncRaftRestoreConfig(&changer, &confState, &config, &progressMap) < 0) {
syncError("syncRaftRestoreConfig for vgid %d fail", pInfo->vgId);
return -1;
}
// save restored config and progress map to tracker
syncRaftCopyProgressMap(&progressMap, &pRaft->tracker->progressMap);
syncRaftCopyTrackerConfig(&config, &pRaft->tracker->config);
// free progress map and config
syncRaftFreeProgressMap(&progressMap);
syncRaftFreeTrackConfig(&config);
if (!syncRaftIsEmptyServerState(&serverState)) {
syncRaftLoadState(pRaft, &serverState);
}

View File

@ -92,7 +92,7 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const S
return checkAndReturn(config, progressMap);
}
// syncRaftChangerSimpleConfig carries out a series of configuration changes that (in aggregate)
// Simple carries out a series of configuration changes that (in aggregate)
// mutates the incoming majority config Voters[0] by at most one. This method
// will return an error if that is not the case, if the resulting quorum is
// zero, or if the configuration is in a joint state (i.e. if there is an
@ -276,6 +276,7 @@ static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConf
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has had a chance to communicate with us.
.recentActive = true,
.refCount = 0,
};
syncRaftAddToProgressMap(progressMap, pProgress);
@ -285,7 +286,7 @@ static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConf
// each other. This is used to check both what the Changer is initialized with,
// as well as what it returns.
static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
int ret = syncRaftCheckProgress(config, progressMap);
int ret = syncRaftCheckTrackerConfigInProgress(config, progressMap);
if (ret != 0) {
return ret;
}
@ -296,6 +297,7 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg
while (!syncRaftIterateNodeMap(&config->learnersNext, pNodeId)) {
SyncNodeId nodeId = *pNodeId;
if (!syncRaftJointConfigInOutgoing(&config->voters, nodeId)) {
syncError("[%d] is in LearnersNext, but not outgoing", nodeId);
return -1;
}
SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, nodeId);
@ -311,8 +313,8 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg
pNodeId = NULL;
while (!syncRaftIterateNodeMap(&config->learners, pNodeId)) {
SyncNodeId nodeId = *pNodeId;
if (syncRaftJointConfigInIncoming(&config->voters, nodeId)) {
syncError("%d is in Learners and voter.incoming", nodeId);
if (syncRaftJointConfigInOutgoing(&config->voters, nodeId)) {
syncError("%d is in Learners and outgoing", nodeId);
return -1;
}
SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, nodeId);
@ -327,7 +329,7 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg
if (!hasJointConfig(config)) {
// We enforce that empty maps are nil instead of zero.
if (syncRaftNodeMapSize(&config->learnersNext)) {
if (syncRaftNodeMapSize(&config->learnersNext) > 0) {
syncError("cfg.LearnersNext must be nil when not joint");
return -1;
}
@ -344,8 +346,8 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg
// the purposes of the Changer) and returns those copies. It returns an error
// if checkInvariants does.
static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
syncRaftCloneTrackerConfig(&changer->tracker->config, config);
int i;
syncRaftCopyTrackerConfig(&changer->tracker->config, config);
syncRaftClearProgressMap(progressMap);
SSyncRaftProgress* pProgress = NULL;
while (!syncRaftIterateProgressMap(&changer->tracker->progressMap, pProgress)) {

View File

@ -21,6 +21,10 @@ void syncRaftInitNodeMap(SSyncRaftNodeMap* nodeMap) {
nodeMap->nodeIdMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
}
void syncRaftFreeNodeMap(SSyncRaftNodeMap* nodeMap) {
taosHashCleanup(nodeMap->nodeIdMap);
}
void syncRaftClearNodeMap(SSyncRaftNodeMap* nodeMap) {
taosHashClear(nodeMap->nodeIdMap);
}
@ -51,7 +55,7 @@ bool syncRaftIterateNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId *pId) {
return false;
}
bool syncRaftIsAllInProgressMap(const SSyncRaftNodeMap* nodeMap, const SSyncRaftProgressMap* progressMap) {
bool syncRaftIsAllNodeInProgressMap(SSyncRaftNodeMap* nodeMap, SSyncRaftProgressMap* progressMap) {
SyncNodeId *pId = NULL;
while (!syncRaftIterateNodeMap(nodeMap, pId)) {
if (!syncRaftIsInProgressMap(progressMap, *pId)) {

View File

@ -20,6 +20,11 @@
#include "sync.h"
#include "syncInt.h"
static void copyProgress(SSyncRaftProgress* progress, void* arg);
static void refProgress(SSyncRaftProgress* progress);
static void unrefProgress(SSyncRaftProgress* progress, void*);
static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state);
static void probeAcked(SSyncRaftProgress* progress);
@ -125,6 +130,7 @@ SSyncRaftProgress* syncRaftFindProgressByNodeId(const SSyncRaftProgressMap* prog
}
int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SSyncRaftProgress* progress) {
refProgress(progress);
taosHashPut(progressMap->progressMap, &progress->id, sizeof(SyncNodeId*), &progress, sizeof(SSyncRaftProgress*));
}
@ -133,7 +139,8 @@ void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId
if (ppProgress == NULL) {
return;
}
free(*ppProgress);
unrefProgress(*ppProgress, NULL);
taosHashRemove(progressMap->progressMap, &id, sizeof(SyncNodeId*));
}
@ -182,6 +189,23 @@ void syncRaftCopyProgress(const SSyncRaftProgress* progress, SSyncRaftProgress*
memcpy(out, progress, sizeof(SSyncRaftProgress));
}
void syncRaftInitProgressMap(SSyncRaftProgressMap* progressMap) {
progressMap->progressMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
}
void syncRaftFreeProgressMap(SSyncRaftProgressMap* progressMap) {
syncRaftVisitProgressMap(progressMap, unrefProgress, NULL);
taosHashCleanup(progressMap->progressMap);
}
void syncRaftClearProgressMap(SSyncRaftProgressMap* progressMap) {
taosHashClear(progressMap->progressMap);
}
void syncRaftCopyProgressMap(SSyncRaftProgressMap* from, SSyncRaftProgressMap* to) {
syncRaftVisitProgressMap(from, copyProgress, to);
}
bool syncRaftIterateProgressMap(const SSyncRaftProgressMap* progressMap, SSyncRaftProgress *pProgress) {
SSyncRaftProgress **ppProgress = taosHashIterate(progressMap->progressMap, pProgress);
if (ppProgress == NULL) {
@ -199,6 +223,25 @@ bool syncRaftVisitProgressMap(SSyncRaftProgressMap* progressMap, visitProgressFp
}
}
static void copyProgress(SSyncRaftProgress* progress, void* arg) {
assert(progress->refCount > 0);
SSyncRaftProgressMap* to = (SSyncRaftProgressMap*)arg;
syncRaftAddToProgressMap(to, progress);
}
static void refProgress(SSyncRaftProgress* progress) {
progress->refCount += 1;
}
static void unrefProgress(SSyncRaftProgress* progress, void* arg) {
(void)arg;
progress->refCount -= 1;
assert(progress->refCount >= 0);
if (progress->refCount == 0) {
free(progress);
}
}
/**
* ResetState moves the Progress into the specified State, resetting ProbeSent,
* PendingSnapshot, and Inflights.

View File

@ -22,13 +22,26 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker(SSyncRaft* pRaft) {
return NULL;
}
syncRaftInitNodeMap(&tracker->config.learners);
syncRaftInitTrackConfig(&tracker->config);
syncRaftInitNodeMap(&tracker->config.learnersNext);
tracker->pRaft = pRaft;
return tracker;
}
void syncRaftInitTrackConfig(SSyncRaftProgressTrackerConfig* config) {
syncRaftInitNodeMap(&config->learners);
syncRaftInitNodeMap(&config->learnersNext);
syncRaftInitQuorumJointConfig(&config->voters);
config->autoLeave = false;
}
void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config) {
syncRaftFreeNodeMap(&config->learners);
syncRaftFreeNodeMap(&config->learnersNext);
syncRaftFreeQuorumJointConfig(&config->voters);
}
void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) {
taosHashClear(tracker->votesMap);
}
@ -47,21 +60,21 @@ void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool g
taosHashPut(tracker->votesMap, &id, sizeof(SyncNodeId), &type, sizeof(ESyncRaftVoteType*));
}
void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) {
void syncRaftCopyTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) {
memcpy(to, from, sizeof(SSyncRaftProgressTrackerConfig));
}
int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
int syncRaftCheckTrackerConfigInProgress(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
// NB: intentionally allow the empty config. In production we'll never see a
// non-empty config (we prevent it from being created) but we will need to
// be able to *create* an initial config, for example during bootstrap (or
// during tests). Instead of having to hand-code this, we allow
// transitioning from an empty config into any other legal and non-empty
// config.
if (!syncRaftIsAllInProgressMap(&config->voters.incoming, progressMap)) return -1;
if (!syncRaftIsAllInProgressMap(&config->voters.outgoing, progressMap)) return -1;
if (!syncRaftIsAllInProgressMap(&config->learners, progressMap)) return -1;
if (!syncRaftIsAllInProgressMap(&config->learnersNext, progressMap)) return -1;
if (!syncRaftIsAllNodeInProgressMap(&config->voters.incoming, progressMap)) return -1;
if (!syncRaftIsAllNodeInProgressMap(&config->voters.outgoing, progressMap)) return -1;
if (!syncRaftIsAllNodeInProgressMap(&config->learners, progressMap)) return -1;
if (!syncRaftIsAllNodeInProgressMap(&config->learnersNext, progressMap)) return -1;
return 0;
}

View File

@ -41,6 +41,16 @@ ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashOb
return SYNC_RAFT_VOTE_PENDING;
}
void syncRaftInitQuorumJointConfig(SSyncRaftQuorumJointConfig* config) {
syncRaftInitNodeMap(&config->incoming);
syncRaftInitNodeMap(&config->outgoing);
}
void syncRaftFreeQuorumJointConfig(SSyncRaftQuorumJointConfig* config) {
syncRaftFreeNodeMap(&config->incoming);
syncRaftFreeNodeMap(&config->outgoing);
}
void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
syncRaftAddToNodeMap(&config->incoming, id);
}

View File

@ -28,21 +28,26 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleA
// the Changer only needs a ProgressMap (not a whole Tracker) at which point
// this can just take LastIndex and MaxInflight directly instead and cook up
// the results from that alone.
int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) {
int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs,
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
SSyncConfChangeSingleArray outgoing;
SSyncConfChangeSingleArray incoming;
SSyncConfChangeSingleArray css;
SSyncRaftProgressTracker* tracker = changer->tracker;
SSyncRaftProgressTrackerConfig* config = &tracker->config;
SSyncRaftProgressMap* progressMap = &tracker->progressMap;
int i, ret;
syncRaftInitConfArray(&outgoing);
syncRaftInitConfArray(&incoming);
syncRaftInitTrackConfig(config);
syncRaftInitProgressMap(progressMap);
ret = toConfChangeSingle(cs, &outgoing, &incoming);
if (ret != 0) {
goto out;
}
if (outgoing.n == 0) {
if (syncRaftConfArrayIsEmpty(&outgoing)) {
// No outgoing config, so just apply the incoming changes one by one.
for (i = 0; i < incoming.n; ++i) {
css = (SSyncConfChangeSingleArray) {
@ -53,6 +58,9 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs)
if (ret != 0) {
goto out;
}
syncRaftCopyTrackerConfig(config, &changer->tracker->config);
syncRaftCopyProgressMap(progressMap, &changer->tracker->progressMap);
}
} else {
// The ConfState describes a joint configuration.
@ -69,6 +77,8 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs)
if (ret != 0) {
goto out;
}
syncRaftCopyTrackerConfig(config, &changer->tracker->config);
syncRaftCopyProgressMap(progressMap, &changer->tracker->progressMap);
}
ret = syncRaftChangerEnterJoint(changer, cs->autoLeave, &incoming, config, progressMap);
@ -78,8 +88,9 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs)
}
out:
if (incoming.n != 0) free(incoming.changes);
if (outgoing.n != 0) free(outgoing.changes);
syncRaftFreeConfArray(&incoming);
syncRaftFreeConfArray(&outgoing);
return ret;
}
@ -102,8 +113,6 @@ static void addToConfChangeSingleArray(SSyncConfChangeSingleArray* out, int* i,
static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in) {
int i;
out->n = in->n = 0;
out->n = syncRaftNodeMapSize(&cs->votersOutgoing);
out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * out->n);
if (out->changes == NULL) {