enh(sync) sync/mnode integration, syncStart async -> sync

This commit is contained in:
Minghao Li 2022-05-23 16:00:03 +08:00
parent 4535722957
commit dc82c1ba22
1 changed files with 32 additions and 7 deletions

View File

@ -128,6 +128,18 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg;
pApplyMsg->info.node = pFsm->data;
mndProcessApplyMsg(pApplyMsg);
//sdbUpdateVer(pMnode->pSdb, 1); ==> sdbSetVer(cbMeta.index, cbMeta.term);
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
SMnode *pMnode = pFsm->data;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
tsem_post(&pMgmt->syncSem);
}
#if 0
SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) {
SSnapshot snapshot = {0};
@ -166,8 +178,23 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
}
}
#endif
}
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
SMnode *pMnode = pFsm->data;
pSnapshot->lastApplyIndex = pMnode->pSdb->lastCommitVer;
//pSnapshot->lastApplyTerm = ...;
return 0;
}
void mndRestoreFinish(struct SSyncFSM* pFsm) {
SMnode *pMnode = pFsm->data;
mndTransPullup(pMnode);
}
#if 0
void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
// strict consistent, do nothing
}
@ -175,19 +202,17 @@ void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
void mndSyncRollBackMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
// strict consistent, do nothing
}
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
// snapshot
return 0;
}
#endif
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pMnode;
pFsm->FpCommitCb = mndSyncCommitMsg;
pFsm->FpPreCommitCb = mndSyncPreCommitMsg;
pFsm->FpRollBackCb = mndSyncRollBackMsg;
pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL;
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
pFsm->FpRestoreFinish = mndRestoreFinish;
pFsm->FpRestoreSnapshot = NULL;
return pFsm;
}