Merge pull request #8621 from taosdata/feature/sync-implementation
[TD-10645][raft]<feature>refactor sync interface
This commit is contained in:
commit
d5e6553ca9
|
@ -29,10 +29,10 @@ typedef int64_t SyncIndex;
|
|||
typedef uint64_t SyncTerm;
|
||||
|
||||
typedef enum {
|
||||
TAOS_SYNC_ROLE_FOLLOWER = 0,
|
||||
TAOS_SYNC_ROLE_CANDIDATE = 1,
|
||||
TAOS_SYNC_ROLE_LEADER = 2,
|
||||
} ESyncRole;
|
||||
TAOS_SYNC_STATE_FOLLOWER = 0,
|
||||
TAOS_SYNC_STATE_CANDIDATE = 1,
|
||||
TAOS_SYNC_STATE_LEADER = 2,
|
||||
} ESyncState;
|
||||
|
||||
typedef struct {
|
||||
void* data;
|
||||
|
@ -55,7 +55,7 @@ typedef struct {
|
|||
int32_t selfIndex;
|
||||
int32_t replica;
|
||||
SNodeInfo node[TSDB_MAX_REPLICA];
|
||||
ESyncRole role[TSDB_MAX_REPLICA];
|
||||
ESyncState role[TSDB_MAX_REPLICA];
|
||||
} SNodesRole;
|
||||
|
||||
typedef struct SSyncFSM {
|
||||
|
@ -159,9 +159,9 @@ void syncStop(const SSyncNode*);
|
|||
|
||||
int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak);
|
||||
|
||||
// int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
||||
int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
||||
|
||||
// int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
||||
int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
||||
|
||||
extern int32_t sDebugFlag;
|
||||
|
||||
|
|
|
@ -68,7 +68,7 @@ struct SSyncRaft {
|
|||
int maxMsgSize;
|
||||
SSyncRaftProgressTracker *tracker;
|
||||
|
||||
ESyncRole state;
|
||||
ESyncState state;
|
||||
|
||||
// isLearner is true if the local raft node is a learner.
|
||||
bool isLearner;
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#include "raft_message.h"
|
||||
|
||||
int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
|
||||
if (pRaft->state == TAOS_SYNC_ROLE_LEADER) {
|
||||
if (pRaft->state == TAOS_SYNC_STATE_LEADER) {
|
||||
syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
|
|||
int quorum;
|
||||
int voterIndex;
|
||||
|
||||
assert(pRaft->state == TAOS_SYNC_ROLE_CANDIDATE);
|
||||
assert(pRaft->state == TAOS_SYNC_STATE_CANDIDATE);
|
||||
|
||||
voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
|
||||
if (voterIndex == -1) {
|
||||
|
@ -31,7 +31,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (pRaft->state != TAOS_SYNC_ROLE_CANDIDATE) {
|
||||
if (pRaft->state != TAOS_SYNC_STATE_CANDIDATE) {
|
||||
syncError("[%d:%d] is not candidate, ignore vote resp", pRaft->selfGroupId, pRaft->selfId);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex index, SyncTerm
|
|||
|
||||
int syncRaftReplicate(SSyncRaft* pRaft, int i) {
|
||||
#if 0
|
||||
assert(pRaft->state == TAOS_SYNC_ROLE_LEADER);
|
||||
assert(pRaft->state == TAOS_SYNC_STATE_LEADER);
|
||||
assert(i >= 0 && i < pRaft->leaderState.nProgress);
|
||||
|
||||
SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId;
|
||||
|
|
|
@ -167,6 +167,14 @@ int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, b
|
|||
|
||||
void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
|
||||
|
||||
int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// process rpc rsp message from other sync server
|
||||
static void syncProcessRsp(SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId
|
|||
resetRaft(pRaft, term);
|
||||
pRaft->tickFp = tickElection;
|
||||
pRaft->leaderId = leaderId;
|
||||
pRaft->state = TAOS_SYNC_ROLE_FOLLOWER;
|
||||
pRaft->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
syncInfo("[%d:%d] became followe at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft) {
|
|||
**/
|
||||
pRaft->stepFp = stepCandidate;
|
||||
pRaft->tickFp = tickElection;
|
||||
pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
|
||||
pRaft->state = TAOS_SYNC_STATE_CANDIDATE;
|
||||
pRaft->candidateState.inPreVote = true;
|
||||
syncInfo("[%d:%d] became pre-candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
|
||||
}
|
||||
|
@ -72,17 +72,17 @@ void syncRaftBecomeCandidate(SSyncRaft* pRaft) {
|
|||
resetRaft(pRaft, pRaft->term + 1);
|
||||
pRaft->tickFp = tickElection;
|
||||
pRaft->voteFor = pRaft->selfId;
|
||||
pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
|
||||
pRaft->state = TAOS_SYNC_STATE_CANDIDATE;
|
||||
syncInfo("[%d:%d] became candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
|
||||
}
|
||||
|
||||
void syncRaftBecomeLeader(SSyncRaft* pRaft) {
|
||||
assert(pRaft->state != TAOS_SYNC_ROLE_FOLLOWER);
|
||||
assert(pRaft->state != TAOS_SYNC_STATE_FOLLOWER);
|
||||
|
||||
pRaft->stepFp = stepLeader;
|
||||
resetRaft(pRaft, pRaft->term);
|
||||
pRaft->leaderId = pRaft->leaderId;
|
||||
pRaft->state = TAOS_SYNC_ROLE_LEADER;
|
||||
pRaft->state = TAOS_SYNC_STATE_LEADER;
|
||||
// TODO: check if there is pending config log
|
||||
int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log);
|
||||
if (nPendingConf > 1) {
|
||||
|
@ -263,7 +263,7 @@ static bool maybeCommit(SSyncRaft* pRaft) {
|
|||
* trigger I/O requests for newly appended log entries or heartbeats.
|
||||
**/
|
||||
static int triggerAll(SSyncRaft* pRaft) {
|
||||
assert(pRaft->state == TAOS_SYNC_ROLE_LEADER);
|
||||
assert(pRaft->state == TAOS_SYNC_STATE_LEADER);
|
||||
int i;
|
||||
|
||||
for (i = 0; i < pRaft->cluster.replica; ++i) {
|
||||
|
|
Loading…
Reference in New Issue