[TD-10645][raft]<feature>refactor sync interface
This commit is contained in:
parent
98a7907b76
commit
f10a91ff03
|
@ -29,10 +29,10 @@ typedef int64_t SyncIndex;
|
||||||
typedef uint64_t SyncTerm;
|
typedef uint64_t SyncTerm;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_SYNC_ROLE_FOLLOWER = 0,
|
TAOS_SYNC_STATE_FOLLOWER = 0,
|
||||||
TAOS_SYNC_ROLE_CANDIDATE = 1,
|
TAOS_SYNC_STATE_CANDIDATE = 1,
|
||||||
TAOS_SYNC_ROLE_LEADER = 2,
|
TAOS_SYNC_STATE_LEADER = 2,
|
||||||
} ESyncRole;
|
} ESyncState;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
void* data;
|
void* data;
|
||||||
|
@ -55,7 +55,7 @@ typedef struct {
|
||||||
int32_t selfIndex;
|
int32_t selfIndex;
|
||||||
int32_t replica;
|
int32_t replica;
|
||||||
SNodeInfo node[TSDB_MAX_REPLICA];
|
SNodeInfo node[TSDB_MAX_REPLICA];
|
||||||
ESyncRole role[TSDB_MAX_REPLICA];
|
ESyncState role[TSDB_MAX_REPLICA];
|
||||||
} SNodesRole;
|
} SNodesRole;
|
||||||
|
|
||||||
typedef struct SSyncFSM {
|
typedef struct SSyncFSM {
|
||||||
|
@ -159,9 +159,9 @@ void syncStop(const SSyncNode*);
|
||||||
|
|
||||||
int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak);
|
int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak);
|
||||||
|
|
||||||
// int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
||||||
|
|
||||||
// int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
||||||
|
|
||||||
extern int32_t sDebugFlag;
|
extern int32_t sDebugFlag;
|
||||||
|
|
||||||
|
|
|
@ -68,7 +68,7 @@ struct SSyncRaft {
|
||||||
int maxMsgSize;
|
int maxMsgSize;
|
||||||
SSyncRaftProgressTracker *tracker;
|
SSyncRaftProgressTracker *tracker;
|
||||||
|
|
||||||
ESyncRole state;
|
ESyncState state;
|
||||||
|
|
||||||
// isLearner is true if the local raft node is a learner.
|
// isLearner is true if the local raft node is a learner.
|
||||||
bool isLearner;
|
bool isLearner;
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
#include "raft_message.h"
|
#include "raft_message.h"
|
||||||
|
|
||||||
int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
|
int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
|
||||||
if (pRaft->state == TAOS_SYNC_ROLE_LEADER) {
|
if (pRaft->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId);
|
syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
|
||||||
int quorum;
|
int quorum;
|
||||||
int voterIndex;
|
int voterIndex;
|
||||||
|
|
||||||
assert(pRaft->state == TAOS_SYNC_ROLE_CANDIDATE);
|
assert(pRaft->state == TAOS_SYNC_STATE_CANDIDATE);
|
||||||
|
|
||||||
voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
|
voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
|
||||||
if (voterIndex == -1) {
|
if (voterIndex == -1) {
|
||||||
|
@ -31,7 +31,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRaft->state != TAOS_SYNC_ROLE_CANDIDATE) {
|
if (pRaft->state != TAOS_SYNC_STATE_CANDIDATE) {
|
||||||
syncError("[%d:%d] is not candidate, ignore vote resp", pRaft->selfGroupId, pRaft->selfId);
|
syncError("[%d:%d] is not candidate, ignore vote resp", pRaft->selfGroupId, pRaft->selfId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex index, SyncTerm
|
||||||
|
|
||||||
int syncRaftReplicate(SSyncRaft* pRaft, int i) {
|
int syncRaftReplicate(SSyncRaft* pRaft, int i) {
|
||||||
#if 0
|
#if 0
|
||||||
assert(pRaft->state == TAOS_SYNC_ROLE_LEADER);
|
assert(pRaft->state == TAOS_SYNC_STATE_LEADER);
|
||||||
assert(i >= 0 && i < pRaft->leaderState.nProgress);
|
assert(i >= 0 && i < pRaft->leaderState.nProgress);
|
||||||
|
|
||||||
SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId;
|
SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId;
|
||||||
|
|
|
@ -167,6 +167,14 @@ int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, b
|
||||||
|
|
||||||
void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
|
void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
|
||||||
|
|
||||||
|
int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// process rpc rsp message from other sync server
|
// process rpc rsp message from other sync server
|
||||||
static void syncProcessRsp(SRpcMsg *pMsg, SEpSet *pEpSet) {
|
static void syncProcessRsp(SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
|
||||||
|
|
|
@ -44,7 +44,7 @@ void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId
|
||||||
resetRaft(pRaft, term);
|
resetRaft(pRaft, term);
|
||||||
pRaft->tickFp = tickElection;
|
pRaft->tickFp = tickElection;
|
||||||
pRaft->leaderId = leaderId;
|
pRaft->leaderId = leaderId;
|
||||||
pRaft->state = TAOS_SYNC_ROLE_FOLLOWER;
|
pRaft->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||||
syncInfo("[%d:%d] became followe at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
|
syncInfo("[%d:%d] became followe at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft) {
|
||||||
**/
|
**/
|
||||||
pRaft->stepFp = stepCandidate;
|
pRaft->stepFp = stepCandidate;
|
||||||
pRaft->tickFp = tickElection;
|
pRaft->tickFp = tickElection;
|
||||||
pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
|
pRaft->state = TAOS_SYNC_STATE_CANDIDATE;
|
||||||
pRaft->candidateState.inPreVote = true;
|
pRaft->candidateState.inPreVote = true;
|
||||||
syncInfo("[%d:%d] became pre-candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
|
syncInfo("[%d:%d] became pre-candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
|
||||||
}
|
}
|
||||||
|
@ -72,17 +72,17 @@ void syncRaftBecomeCandidate(SSyncRaft* pRaft) {
|
||||||
resetRaft(pRaft, pRaft->term + 1);
|
resetRaft(pRaft, pRaft->term + 1);
|
||||||
pRaft->tickFp = tickElection;
|
pRaft->tickFp = tickElection;
|
||||||
pRaft->voteFor = pRaft->selfId;
|
pRaft->voteFor = pRaft->selfId;
|
||||||
pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
|
pRaft->state = TAOS_SYNC_STATE_CANDIDATE;
|
||||||
syncInfo("[%d:%d] became candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
|
syncInfo("[%d:%d] became candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRaftBecomeLeader(SSyncRaft* pRaft) {
|
void syncRaftBecomeLeader(SSyncRaft* pRaft) {
|
||||||
assert(pRaft->state != TAOS_SYNC_ROLE_FOLLOWER);
|
assert(pRaft->state != TAOS_SYNC_STATE_FOLLOWER);
|
||||||
|
|
||||||
pRaft->stepFp = stepLeader;
|
pRaft->stepFp = stepLeader;
|
||||||
resetRaft(pRaft, pRaft->term);
|
resetRaft(pRaft, pRaft->term);
|
||||||
pRaft->leaderId = pRaft->leaderId;
|
pRaft->leaderId = pRaft->leaderId;
|
||||||
pRaft->state = TAOS_SYNC_ROLE_LEADER;
|
pRaft->state = TAOS_SYNC_STATE_LEADER;
|
||||||
// TODO: check if there is pending config log
|
// TODO: check if there is pending config log
|
||||||
int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log);
|
int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log);
|
||||||
if (nPendingConf > 1) {
|
if (nPendingConf > 1) {
|
||||||
|
@ -263,7 +263,7 @@ static bool maybeCommit(SSyncRaft* pRaft) {
|
||||||
* trigger I/O requests for newly appended log entries or heartbeats.
|
* trigger I/O requests for newly appended log entries or heartbeats.
|
||||||
**/
|
**/
|
||||||
static int triggerAll(SSyncRaft* pRaft) {
|
static int triggerAll(SSyncRaft* pRaft) {
|
||||||
assert(pRaft->state == TAOS_SYNC_ROLE_LEADER);
|
assert(pRaft->state == TAOS_SYNC_STATE_LEADER);
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
for (i = 0; i < pRaft->cluster.replica; ++i) {
|
for (i = 0; i < pRaft->cluster.replica; ++i) {
|
||||||
|
|
Loading…
Reference in New Issue