Merge pull request #15944 from taosdata/feature/3.0_mhli
refactor(sync): add leader, follower call back
This commit is contained in:
commit
d59a0b390b
|
@ -129,6 +129,9 @@ typedef struct SSyncFSM {
|
|||
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
|
||||
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||
|
||||
void (*FpBecomeLeaderCb)(struct SSyncFSM* pFsm);
|
||||
void (*FpBecomeFollowerCb)(struct SSyncFSM* pFsm);
|
||||
|
||||
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
|
||||
int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
||||
|
||||
|
|
|
@ -166,6 +166,18 @@ void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
|
|||
mDebug("vgId:1, mnode leader transfer finish");
|
||||
}
|
||||
|
||||
static void mndBecomeFollower(struct SSyncFSM *pFsm) {
|
||||
SMnode *pMnode = pFsm->data;
|
||||
mDebug("vgId:1, become follower");
|
||||
|
||||
// clear old leader resource
|
||||
}
|
||||
|
||||
static void mndBecomeLeader(struct SSyncFSM *pFsm) {
|
||||
SMnode *pMnode = pFsm->data;
|
||||
mDebug("vgId:1, become leader");
|
||||
}
|
||||
|
||||
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||
pFsm->data = pMnode;
|
||||
|
@ -175,6 +187,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
|||
pFsm->FpRestoreFinishCb = mndRestoreFinish;
|
||||
pFsm->FpLeaderTransferCb = mndLeaderTransfer;
|
||||
pFsm->FpReConfigCb = mndReConfig;
|
||||
pFsm->FpBecomeLeaderCb = mndBecomeLeader;
|
||||
pFsm->FpBecomeFollowerCb = mndBecomeFollower;
|
||||
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
|
||||
pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
|
||||
pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
|
||||
|
|
|
@ -672,6 +672,18 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
|
|||
vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
|
||||
}
|
||||
|
||||
static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
|
||||
SVnode *pVnode = pFsm->data;
|
||||
vDebug("vgId:%d, become follower", pVnode->config.vgId);
|
||||
|
||||
// clear old leader resource
|
||||
}
|
||||
|
||||
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
|
||||
SVnode *pVnode = pFsm->data;
|
||||
vDebug("vgId:%d, become leader", pVnode->config.vgId);
|
||||
}
|
||||
|
||||
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||
pFsm->data = pVnode;
|
||||
|
@ -681,6 +693,8 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
|||
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
|
||||
pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
|
||||
pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
|
||||
pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
|
||||
pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
|
||||
pFsm->FpReConfigCb = vnodeSyncReconfig;
|
||||
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
|
||||
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
|
||||
|
|
|
@ -2028,6 +2028,11 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
// reset elect timer
|
||||
syncNodeResetElectTimer(pSyncNode);
|
||||
|
||||
// call back
|
||||
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
|
||||
pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
|
||||
}
|
||||
|
||||
// trace log
|
||||
do {
|
||||
int32_t debugStrLen = strlen(debugStr);
|
||||
|
@ -2109,6 +2114,11 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
// start heartbeat timer
|
||||
syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
|
||||
// call back
|
||||
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
|
||||
pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
|
||||
}
|
||||
|
||||
// trace log
|
||||
do {
|
||||
int32_t debugStrLen = strlen(debugStr);
|
||||
|
@ -3100,7 +3110,7 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries
|
|||
syncNodeEventLog(pSyncNode, logBuf);
|
||||
}
|
||||
|
||||
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
|
||||
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||
|
|
Loading…
Reference in New Issue