From dc82c1ba226f74805edbd519b959b4254b805572 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 16:00:03 +0800 Subject: [PATCH] enh(sync) sync/mnode integration, syncStart async -> sync --- source/dnode/mnode/impl/src/mndSync.c | 39 ++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 5eec46056b..16c6b7f7fa 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -128,6 +128,18 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; + pApplyMsg->info.node = pFsm->data; + mndProcessApplyMsg(pApplyMsg); + //sdbUpdateVer(pMnode->pSdb, 1); ==> sdbSetVer(cbMeta.index, cbMeta.term); + + if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { + SMnode *pMnode = pFsm->data; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + tsem_post(&pMgmt->syncSem); + } + +#if 0 SyncIndex beginIndex = SYNC_INDEX_INVALID; if (pFsm->FpGetSnapshot != NULL) { SSnapshot snapshot = {0}; @@ -166,8 +178,23 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM } } +#endif + } +int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { + SMnode *pMnode = pFsm->data; + pSnapshot->lastApplyIndex = pMnode->pSdb->lastCommitVer; + //pSnapshot->lastApplyTerm = ...; + return 0; +} + +void mndRestoreFinish(struct SSyncFSM* pFsm) { + SMnode *pMnode = pFsm->data; + mndTransPullup(pMnode); +} + +#if 0 void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } @@ -175,19 +202,17 @@ void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta void mndSyncRollBackMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } - -int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { - // snapshot - return 0; -} +#endif SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pMnode; pFsm->FpCommitCb = mndSyncCommitMsg; - pFsm->FpPreCommitCb = mndSyncPreCommitMsg; - pFsm->FpRollBackCb = mndSyncRollBackMsg; + pFsm->FpPreCommitCb = NULL; + pFsm->FpRollBackCb = NULL; pFsm->FpGetSnapshot = mndSyncGetSnapshot; + pFsm->FpRestoreFinish = mndRestoreFinish; + pFsm->FpRestoreSnapshot = NULL; return pFsm; }