Merge pull request #8691 from taosdata/feature/sync-implementation
[TD-10645][raft]<feature>fix compile error
This commit is contained in:
commit
8440483cef
|
@ -112,4 +112,6 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r
|
|||
|
||||
void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs);
|
||||
|
||||
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
|
||||
|
||||
#endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */
|
||||
|
|
|
@ -36,34 +36,25 @@ typedef struct SSyncRaftQuorumJointConfig {
|
|||
**/
|
||||
ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes);
|
||||
|
||||
static FORCE_INLINE bool syncRaftJointConfigInCluster(const SSyncCluster* cluster, SyncNodeId id) {
|
||||
int i;
|
||||
for (i = 0; i < cluster->replica; ++i) {
|
||||
if (cluster->nodeInfo[i].nodeId == id) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
|
||||
|
||||
static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
|
||||
return syncRaftJointConfigInCluster(&config->outgoing, id);
|
||||
return syncRaftIsInNodeMap(&config->outgoing, id);
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool syncRaftJointConfigInIncoming(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
|
||||
return syncRaftJointConfigInCluster(&config->incoming, id);
|
||||
return syncRaftIsInNodeMap(&config->incoming, id);
|
||||
}
|
||||
|
||||
void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id);
|
||||
|
||||
void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id);
|
||||
|
||||
static FORCE_INLINE const SSyncCluster* syncRaftJointConfigIncoming(const SSyncRaftQuorumJointConfig* config) {
|
||||
static FORCE_INLINE const SSyncRaftNodeMap* syncRaftJointConfigIncoming(const SSyncRaftQuorumJointConfig* config) {
|
||||
return &config->incoming;
|
||||
}
|
||||
|
||||
static FORCE_INLINE const SSyncCluster* syncRaftJointConfigOutgoing(const SSyncRaftQuorumJointConfig* config) {
|
||||
static FORCE_INLINE const SSyncRaftNodeMap* syncRaftJointConfigOutgoing(const SSyncRaftQuorumJointConfig* config) {
|
||||
return &config->outgoing;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,6 @@
|
|||
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
|
||||
* quorum of no has been reached).
|
||||
**/
|
||||
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteType* votes);
|
||||
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes);
|
||||
|
||||
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */
|
||||
|
|
|
@ -30,6 +30,8 @@ static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const ch
|
|||
static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfig* config,
|
||||
const SSyncRaftProgressMap* progressMap, SSyncConfigState* cs);
|
||||
|
||||
static void abortLeaderTransfer(SSyncRaft* pRaft);
|
||||
|
||||
static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
|
||||
static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
|
||||
static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
|
||||
|
@ -109,38 +111,6 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
|
|||
|
||||
syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
|
||||
|
||||
|
||||
#if 0
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
// restore fsm state from snapshot index + 1 until commitIndex
|
||||
++initIndex;
|
||||
while (initIndex <= serverState.commitIndex) {
|
||||
limit = MIN(RAFT_READ_LOG_MAX_NUM, serverState.commitIndex - initIndex + 1);
|
||||
|
||||
if (logStore->logRead(logStore, initIndex, limit, buffer, &nBuf) != 0) {
|
||||
return -1;
|
||||
}
|
||||
assert(limit == nBuf);
|
||||
|
||||
for (i = 0; i < limit; ++i) {
|
||||
fsm->applyLog(fsm, initIndex + i, &(buffer[i]), NULL);
|
||||
free(buffer[i].data);
|
||||
}
|
||||
initIndex += nBuf;
|
||||
}
|
||||
assert(initIndex == serverState.commitIndex);
|
||||
|
||||
//pRaft->heartbeatTimeoutTick = 1;
|
||||
|
||||
syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
|
||||
|
||||
pRaft->selfIndex = pRaft->cluster.selfIndex;
|
||||
#endif
|
||||
|
||||
syncInfo("[%d:%d] restore vgid %d state: snapshot index success",
|
||||
pRaft->selfGroupId, pRaft->selfId, pInfo->vgId);
|
||||
return 0;
|
||||
|
@ -242,12 +212,16 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi
|
|||
|
||||
// If the the leadTransferee was removed or demoted, abort the leadership transfer.
|
||||
SyncNodeId leadTransferee = pRaft->leadTransferee;
|
||||
if (leadTransferee != SYNC_NON_NODE_ID) {
|
||||
|
||||
if (leadTransferee != SYNC_NON_NODE_ID && !syncRaftIsInNodeMap(&pRaft->tracker->config.voters, leadTransferee)) {
|
||||
abortLeaderTransfer(pRaft);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void abortLeaderTransfer(SSyncRaft* pRaft) {
|
||||
pRaft->leadTransferee = SYNC_NON_NODE_ID;
|
||||
}
|
||||
|
||||
/**
|
||||
* pre-handle message, return true means no need to continue
|
||||
* Handle the message term, which may result in our stepping down to a follower.
|
||||
|
|
|
@ -26,8 +26,8 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg
|
|||
static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config);
|
||||
static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||
SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css);
|
||||
static int symDiff(const SSyncCluster* l, const SSyncCluster* r);
|
||||
|
||||
static int symDiff(const SSyncRaftNodeMap* l, const SSyncRaftNodeMap* r);
|
||||
|
||||
static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||
SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner);
|
||||
|
@ -237,27 +237,27 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig
|
|||
|
||||
// symdiff returns the count of the symmetric difference between the sets of
|
||||
// uint64s, i.e. len( (l - r) \union (r - l)).
|
||||
static int symDiff(const SSyncCluster* l, const SSyncCluster* r) {
|
||||
static int symDiff(const SSyncRaftNodeMap* l, const SSyncRaftNodeMap* r) {
|
||||
int n;
|
||||
int i;
|
||||
int j0, j1;
|
||||
const SSyncCluster* pairs[2][2] = {
|
||||
const SSyncRaftNodeMap* pairs[2][2] = {
|
||||
{l, r}, // count elems in l but not in r
|
||||
{r, l}, // count elems in r but not in l
|
||||
};
|
||||
|
||||
for (n = 0, i = 0; i < 2; ++i) {
|
||||
const SSyncCluster** pp = pairs[i];
|
||||
const SSyncRaftNodeMap** pp = pairs[i];
|
||||
|
||||
const SSyncCluster* p0 = pp[0];
|
||||
const SSyncCluster* p1 = pp[1];
|
||||
for (j0 = 0; j0 < p0->replica; ++j0) {
|
||||
SyncNodeId id = p0->nodeInfo[j0].nodeId;
|
||||
const SSyncRaftNodeMap* p0 = pp[0];
|
||||
const SSyncRaftNodeMap* p1 = pp[1];
|
||||
for (j0 = 0; j0 < TSDB_MAX_REPLICA; ++j0) {
|
||||
SyncNodeId id = p0->nodeId[j0];
|
||||
if (id == SYNC_NON_NODE_ID) {
|
||||
continue;
|
||||
}
|
||||
for (j1 = 0; j1 < p1->replica; ++j1) {
|
||||
if (p1->nodeInfo[j1].nodeId != SYNC_NON_NODE_ID && p1->nodeInfo[j1].nodeId != id) {
|
||||
if (p1->nodeId[j1] != SYNC_NON_NODE_ID && p1->nodeId[j1] != id) {
|
||||
n+=1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -185,8 +185,17 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) {
|
|||
pRaft->voteFor = serverState->voteFor;
|
||||
}
|
||||
|
||||
void syncRaftBroadcastAppend(SSyncRaft* pRaft) {
|
||||
static void visitProgressSendAppend(int i, SSyncRaftProgress* progress, void* arg) {
|
||||
SSyncRaft* pRaft = (SSyncRaft*)arg;
|
||||
if (pRaft->selfId == progress->id) {
|
||||
return;
|
||||
}
|
||||
|
||||
syncRaftReplicate(arg, progress, true);
|
||||
}
|
||||
|
||||
void syncRaftBroadcastAppend(SSyncRaft* pRaft) {
|
||||
syncRaftProgressVisit(pRaft->tracker, visitProgressSendAppend, pRaft);
|
||||
}
|
||||
|
||||
static int convertClear(SSyncRaft* pRaft) {
|
||||
|
@ -279,6 +288,7 @@ bool syncRaftMaybeCommit(SSyncRaft* pRaft) {
|
|||
* trigger I/O requests for newly appended log entries or heartbeats.
|
||||
**/
|
||||
static int triggerAll(SSyncRaft* pRaft) {
|
||||
#if 0
|
||||
assert(pRaft->state == TAOS_SYNC_STATE_LEADER);
|
||||
int i;
|
||||
|
||||
|
@ -287,8 +297,10 @@ static int triggerAll(SSyncRaft* pRaft) {
|
|||
continue;
|
||||
}
|
||||
|
||||
syncRaftReplicate(pRaft, i);
|
||||
syncRaftReplicate(pRaft, pRaft->tracker->progressMap.progress[i], true);
|
||||
}
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void abortLeaderTransfer(SSyncRaft* pRaft) {
|
||||
|
|
|
@ -44,16 +44,16 @@ void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNo
|
|||
int i, min;
|
||||
|
||||
for (i = 0, min = -1; i < TSDB_MAX_REPLICA; ++i) {
|
||||
if (config->incoming.nodeInfo[i].nodeId == id) {
|
||||
if (config->incoming.nodeId[i] == id) {
|
||||
return;
|
||||
}
|
||||
if (min == -1 && config->incoming.nodeInfo[i].nodeId == SYNC_NON_NODE_ID) {
|
||||
if (min == -1 && config->incoming.nodeId[i] == SYNC_NON_NODE_ID) {
|
||||
min = i;
|
||||
}
|
||||
}
|
||||
|
||||
assert(min != -1);
|
||||
config->incoming.nodeInfo[min].nodeId = id;
|
||||
config->incoming.nodeId[min] = id;
|
||||
config->incoming.replica += 1;
|
||||
}
|
||||
|
||||
|
@ -61,12 +61,25 @@ void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, S
|
|||
int i;
|
||||
|
||||
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
if (config->incoming.nodeInfo[i].nodeId == id) {
|
||||
if (config->incoming.nodeId[i] == id) {
|
||||
config->incoming.replica -= 1;
|
||||
config->incoming.nodeInfo[i].nodeId = SYNC_NON_NODE_ID;
|
||||
config->incoming.nodeId[i] = SYNC_NON_NODE_ID;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert(config->incoming.replica >= 0);
|
||||
}
|
||||
|
||||
|
||||
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) {
|
||||
int i;
|
||||
|
||||
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
if (nodeId == nodeMap->nodeId[i]) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
|
@ -22,14 +22,14 @@
|
|||
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
|
||||
* quorum of no has been reached).
|
||||
**/
|
||||
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteType* votes) {
|
||||
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes) {
|
||||
if (config->replica == 0) {
|
||||
return SYNC_RAFT_VOTE_WON;
|
||||
}
|
||||
|
||||
int i, g, r, missing;
|
||||
for (i = g = r = missing = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
if (config->nodeInfo[i].nodeId == SYNC_NON_NODE_ID) {
|
||||
if (config->nodeId[i] == SYNC_NON_NODE_ID) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue