From 77359f26b34dd108b9e2fad074167c0b3f9759f5 Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Tue, 7 Jan 2025 15:26:01 +0800 Subject: [PATCH] Fix(cfg): resolve dynamic configuration update failure after rolling upgrade to 3.3.5.0 --- include/libs/sync/sync.h | 2 ++ source/dnode/mnode/impl/src/mndConfig.c | 2 ++ source/dnode/mnode/impl/src/mndSync.c | 15 ++++++++++++--- source/dnode/vnode/src/vnd/vnodeSync.c | 1 + source/libs/sync/src/syncPipeline.c | 10 +++++++++- 5 files changed, 26 insertions(+), 4 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 50c096258e..f1f907ce37 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -176,7 +176,9 @@ typedef struct SSyncFSM { void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta); void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx); + void (*FpAfterRestoredCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx); void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SReConfigCbMeta* pMeta); + void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta); bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm); int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm); diff --git a/source/dnode/mnode/impl/src/mndConfig.c b/source/dnode/mnode/impl/src/mndConfig.c index 74bb0561cd..10b9add0a3 100644 --- a/source/dnode/mnode/impl/src/mndConfig.c +++ b/source/dnode/mnode/impl/src/mndConfig.c @@ -518,6 +518,8 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { SMCfgDnodeReq cfgReq = {0}; SConfigObj *vObj = sdbAcquire(pMnode->pSdb, SDB_CFG, "tsmmConfigVersion"); if (vObj == NULL) { + code = TSDB_CODE_SDB_OBJ_NOT_THERE; + mInfo("failed to acquire mnd config version, since %s", tstrerror(code)); goto _err_out; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 7ed970be62..76642d5e58 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -309,9 +309,6 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { } else { mInfo("vgId:1, sync restore finished, repeat call"); } - if (sdbAfterRestored(pMnode->pSdb) != 0) { - mError("failed to prepare sdb while start mnode"); - } } else { mInfo("vgId:1, sync restore finished"); } @@ -329,6 +326,17 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { } } +void mndAfterRestored(const SSyncFSM *pFsm, const SyncIndex commitIdx) { + SMnode *pMnode = pFsm->data; + + if (!pMnode->deploy) { + if (sdbAfterRestored(pMnode->pSdb) != 0) { + mError("failed to prepare sdb while start mnode"); + } + mInfo("vgId:1, sync restore finished and restore sdb success"); + } +} + int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { mInfo("start to read snapshot from sdb"); SMnode *pMnode = pFsm->data; @@ -443,6 +451,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { pFsm->FpPreCommitCb = NULL; pFsm->FpRollBackCb = NULL; pFsm->FpRestoreFinishCb = mndRestoreFinish; + pFsm->FpAfterRestoredCb = mndAfterRestored; pFsm->FpLeaderTransferCb = NULL; pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty; pFsm->FpApplyQueueItems = mndApplyQueueItems; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index cea82c13ff..068f4dec3d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -697,6 +697,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpGetSnapshot = NULL; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo; pFsm->FpRestoreFinishCb = vnodeRestoreFinish; + pFsm->FpAfterRestoredCb = NULL; pFsm->FpLeaderTransferCb = NULL; pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty; pFsm->FpApplyQueueItems = vnodeApplyQueueItems; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index af6aab5d2b..5a5d1f02f8 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -15,10 +15,10 @@ #define _DEFAULT_SOURCE -#include "syncPipeline.h" #include "syncCommit.h" #include "syncIndexMgr.h" #include "syncInt.h" +#include "syncPipeline.h" #include "syncRaftCfg.h" #include "syncRaftEntry.h" #include "syncRaftStore.h" @@ -901,12 +901,14 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } while (true); code = 0; + bool restoreFinishAtThisCommit = false; _out: // mark as restored if needed if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL && currentTerm <= pEntry->term) { pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex); pNode->restoreFinish = true; + restoreFinishAtThisCommit = true; sInfo("vgId:%d, restore finished. term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); } @@ -920,6 +922,12 @@ _out: pNextEntry = NULL; } (void)taosThreadMutexUnlock(&pBuf->mutex); + + if (restoreFinishAtThisCommit && pNode->pFsm->FpAfterRestoredCb != NULL) { + pNode->pFsm->FpAfterRestoredCb(pNode->pFsm, pBuf->commitIndex); + sInfo("vgId:%d, after restore finished callback executed)", pNode->vgId); + } + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); TAOS_RETURN(code); }