From 1392711e66ccbd144c1ae16cf0f1ffc818a50872 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 9 Mar 2023 18:12:40 +0800 Subject: [PATCH] enh: add fsm FpAppliedIndexCb to get the current applied index --- include/libs/sync/sync.h | 1 + source/dnode/mnode/impl/inc/mndInt.h | 1 + source/dnode/mnode/impl/src/mndSync.c | 14 ++++++++++++++ source/dnode/vnode/src/vnd/vnodeSvr.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeSync.c | 6 ++++++ 5 files changed, 24 insertions(+), 2 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 189484d1a6..5ea90906a8 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -143,6 +143,7 @@ typedef struct SSyncFSM { void* data; int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); + SyncIndex (*FpAppliedIndexCb)(const struct SSyncFSM* pFsm); int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 006c74ef3d..ffb2443808 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -113,6 +113,7 @@ typedef struct SMnode { bool deploy; char *path; int64_t checkTime; + SyncIndex applied; SSdb *pSdb; SArray *pSteps; SQHandle *pQuery; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index f702d8f148..ce3caaad6c 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -129,9 +129,13 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { int32_t code = 0; + SMnode *pMnode = pFsm->data; + atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex); + if (!syncUtilUserCommit(pMsg->msgType)) { goto _out; } + code = mndProcessWriteMsg(pFsm, pMsg, pMeta); _out: @@ -140,6 +144,11 @@ _out: return code; } +SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) { + SMnode *pMnode = pFSM->data; + return atomic_load_64(&pMnode->applied); +} + int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { mInfo("start to read snapshot from sdb in atomic way"); SMnode *pMnode = pFsm->data; @@ -253,6 +262,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pMnode; pFsm->FpCommitCb = mndSyncCommitMsg; + pFsm->FpAppliedIndexCb = mndSyncAppliedIndex; pFsm->FpPreCommitCb = NULL; pFsm->FpRollBackCb = NULL; pFsm->FpRestoreFinishCb = mndRestoreFinish; @@ -321,6 +331,10 @@ int32_t mndInitSync(SMnode *pMnode) { } pMnode->pSdb->sync = pMgmt->sync; + SSnapshot snap = {0}; + sdbGetCommitInfo(pMnode->pSdb, &snap.lastApplyIndex, &snap.lastApplyTerm, &snap.lastConfigIndex); + atomic_store_64(&pMnode->applied, snap.lastApplyIndex); + mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 222e795424..bfcf401ca8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -320,8 +320,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm); ASSERT(pVnode->state.applied + 1 == version); - pVnode->state.applied = version; - pVnode->state.applyTerm = pMsg->info.conn.applyTerm; + atomic_store_64(&pVnode->state.applied, version); + atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm); if (!syncUtilUserCommit(pMsg->msgType)) goto _exit; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 989fdaf515..9613afd837 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -454,6 +454,11 @@ static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const return 0; } +static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) { + SVnode *pVnode = pFSM->data; + return atomic_load_64(&pVnode->state.applied); +} + static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { SVnode *pVnode = pFsm->data; vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", @@ -580,6 +585,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pVnode; pFsm->FpCommitCb = vnodeSyncCommitMsg; + pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;