refactor(sync): add leader, follower call back
This commit is contained in:
parent
6cabe5847e
commit
b4c8685766
|
@ -129,6 +129,9 @@ typedef struct SSyncFSM {
|
||||||
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
|
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
|
||||||
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||||
|
|
||||||
|
void (*FpBecomeLeaderCb)(struct SSyncFSM* pFsm);
|
||||||
|
void (*FpBecomeFollowerCb)(struct SSyncFSM* pFsm);
|
||||||
|
|
||||||
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
|
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
|
||||||
int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
||||||
|
|
||||||
|
|
|
@ -166,6 +166,18 @@ void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
|
||||||
mDebug("vgId:1, mnode leader transfer finish");
|
mDebug("vgId:1, mnode leader transfer finish");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void mndBecomeFollower(struct SSyncFSM *pFsm) {
|
||||||
|
SMnode *pMnode = pFsm->data;
|
||||||
|
mDebug("vgId:1, become follower");
|
||||||
|
|
||||||
|
// clear old leader resource
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mndBecomeLeader(struct SSyncFSM *pFsm) {
|
||||||
|
SMnode *pMnode = pFsm->data;
|
||||||
|
mDebug("vgId:1, become leader");
|
||||||
|
}
|
||||||
|
|
||||||
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||||
pFsm->data = pMnode;
|
pFsm->data = pMnode;
|
||||||
|
@ -175,6 +187,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||||
pFsm->FpRestoreFinishCb = mndRestoreFinish;
|
pFsm->FpRestoreFinishCb = mndRestoreFinish;
|
||||||
pFsm->FpLeaderTransferCb = mndLeaderTransfer;
|
pFsm->FpLeaderTransferCb = mndLeaderTransfer;
|
||||||
pFsm->FpReConfigCb = mndReConfig;
|
pFsm->FpReConfigCb = mndReConfig;
|
||||||
|
pFsm->FpBecomeLeaderCb = mndBecomeLeader;
|
||||||
|
pFsm->FpBecomeFollowerCb = mndBecomeFollower;
|
||||||
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
|
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
|
||||||
pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
|
pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
|
||||||
pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
|
pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
|
||||||
|
|
|
@ -672,6 +672,18 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
|
||||||
vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
|
vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
|
||||||
|
SVnode *pVnode = pFsm->data;
|
||||||
|
vDebug("vgId:%d, become follower", pVnode->config.vgId);
|
||||||
|
|
||||||
|
// clear old leader resource
|
||||||
|
}
|
||||||
|
|
||||||
|
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
|
||||||
|
SVnode *pVnode = pFsm->data;
|
||||||
|
vDebug("vgId:%d, become leader", pVnode->config.vgId);
|
||||||
|
}
|
||||||
|
|
||||||
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||||
pFsm->data = pVnode;
|
pFsm->data = pVnode;
|
||||||
|
@ -681,6 +693,8 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
|
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
|
||||||
pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
|
pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
|
||||||
pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
|
pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
|
||||||
|
pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
|
||||||
|
pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
|
||||||
pFsm->FpReConfigCb = vnodeSyncReconfig;
|
pFsm->FpReConfigCb = vnodeSyncReconfig;
|
||||||
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
|
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
|
||||||
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
|
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
|
||||||
|
|
|
@ -2028,6 +2028,11 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
// reset elect timer
|
// reset elect timer
|
||||||
syncNodeResetElectTimer(pSyncNode);
|
syncNodeResetElectTimer(pSyncNode);
|
||||||
|
|
||||||
|
// call back
|
||||||
|
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
|
||||||
|
pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
|
||||||
|
}
|
||||||
|
|
||||||
// trace log
|
// trace log
|
||||||
do {
|
do {
|
||||||
int32_t debugStrLen = strlen(debugStr);
|
int32_t debugStrLen = strlen(debugStr);
|
||||||
|
@ -2109,6 +2114,11 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
// start heartbeat timer
|
// start heartbeat timer
|
||||||
syncNodeStartHeartbeatTimer(pSyncNode);
|
syncNodeStartHeartbeatTimer(pSyncNode);
|
||||||
|
|
||||||
|
// call back
|
||||||
|
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
|
||||||
|
pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
|
||||||
|
}
|
||||||
|
|
||||||
// trace log
|
// trace log
|
||||||
do {
|
do {
|
||||||
int32_t debugStrLen = strlen(debugStr);
|
int32_t debugStrLen = strlen(debugStr);
|
||||||
|
@ -3100,7 +3110,7 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries
|
||||||
syncNodeEventLog(pSyncNode, logBuf);
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
|
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
|
||||||
char host[64];
|
char host[64];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
|
Loading…
Reference in New Issue