diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 93bfe306b6..8bdc9a9346 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -261,6 +261,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUDIT, "audit", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONFIG, "init-config", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_SDB, "config-sdb", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 50c096258e..f1f907ce37 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -176,7 +176,9 @@ typedef struct SSyncFSM { void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta); void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx); + void (*FpAfterRestoredCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx); void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SReConfigCbMeta* pMeta); + void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta); bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm); int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 165437ed28..80ef0d31de 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -114,6 +114,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_SDB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -130,6 +132,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_SDB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndConfig.c b/source/dnode/mnode/impl/src/mndConfig.c index 74bb0561cd..0729b2a26e 100644 --- a/source/dnode/mnode/impl/src/mndConfig.c +++ b/source/dnode/mnode/impl/src/mndConfig.c @@ -17,6 +17,7 @@ #include "audit.h" #include "mndConfig.h" #include "mndDnode.h" +#include "mndMnode.h" #include "mndPrivilege.h" #include "mndSync.h" #include "mndTrans.h" @@ -33,8 +34,10 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq); static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp); static int32_t mndProcessConfigReq(SRpcMsg *pReq); static int32_t mndInitWriteCfg(SMnode *pMnode); -static int32_t mndTryRebuildCfg(SMnode *pMnode); +static int32_t mndSendRebuildReq(SMnode *pMnode); +static int32_t mndTryRebuildConfigSdbRsp(SRpcMsg *pRsp); static int32_t initConfigArrayFromSdb(SMnode *pMnode, SArray *array); +static int32_t mndTryRebuildConfigSdb(SRpcMsg *pReq); static void cfgArrayCleanUp(SArray *array); static void cfgObjArrayCleanUp(SArray *array); @@ -59,6 +62,8 @@ int32_t mndInitConfig(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq); mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp); mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq); + mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_SDB, mndTryRebuildConfigSdb); + mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_SDB_RSP, mndTryRebuildConfigSdbRsp); return sdbSetTable(pMnode->pSdb, table); } @@ -214,7 +219,7 @@ static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigObj *pOld, SConfigObj *pNew static int32_t mndCfgActionDeploy(SMnode *pMnode) { return mndInitWriteCfg(pMnode); } -static int32_t mndCfgActionAfterRestored(SMnode *pMnode) { return mndTryRebuildCfg(pMnode); } +static int32_t mndCfgActionAfterRestored(SMnode *pMnode) { return mndSendRebuildReq(pMnode); } static int32_t mndProcessConfigReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; @@ -340,7 +345,30 @@ _OVER: return code; } -int32_t mndTryRebuildCfg(SMnode *pMnode) { +int32_t mndSendRebuildReq(SMnode *pMnode) { + int32_t code = 0; + + SRpcMsg rpcMsg = {.pCont = NULL, + .contLen = 0, + .msgType = TDMT_MND_CONFIG_SDB, + .info.ahandle = 0, + .info.notFreeAhandle = 1, + .info.refId = 0, + .info.noResp = 0, + .info.handle = 0}; + SEpSet epSet = {0}; + + mndGetMnodeEpSet(pMnode, &epSet); + + code = tmsgSendReq(&epSet, &rpcMsg); + if (code != 0) { + mError("failed to send rebuild config req, since %s", tstrerror(code)); + } + return code; +} + +static int32_t mndTryRebuildConfigSdb(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; if (!mndIsLeader(pMnode)) { return TSDB_CODE_SUCCESS; } @@ -518,6 +546,8 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { SMCfgDnodeReq cfgReq = {0}; SConfigObj *vObj = sdbAcquire(pMnode->pSdb, SDB_CFG, "tsmmConfigVersion"); if (vObj == NULL) { + code = TSDB_CODE_SDB_OBJ_NOT_THERE; + mInfo("failed to acquire mnd config version, since %s", tstrerror(code)); goto _err_out; } @@ -609,6 +639,11 @@ static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) { return 0; } +static int32_t mndTryRebuildConfigSdbRsp(SRpcMsg *pRsp) { + mInfo("rebuild config sdb rsp"); + return 0; +} + // get int32_t value from 'SMCfgDnodeReq' static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t optLen, int32_t *pOutValue) { int32_t code = 0; @@ -770,7 +805,6 @@ static void cfgObjArrayCleanUp(SArray *array) { for (int32_t i = 0; i < sz; ++i) { SConfigObj *obj = taosArrayGet(array, i); tFreeSConfigObj(obj); - taosMemoryFree(obj); } taosArrayDestroy(array); } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 7ed970be62..76642d5e58 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -309,9 +309,6 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { } else { mInfo("vgId:1, sync restore finished, repeat call"); } - if (sdbAfterRestored(pMnode->pSdb) != 0) { - mError("failed to prepare sdb while start mnode"); - } } else { mInfo("vgId:1, sync restore finished"); } @@ -329,6 +326,17 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { } } +void mndAfterRestored(const SSyncFSM *pFsm, const SyncIndex commitIdx) { + SMnode *pMnode = pFsm->data; + + if (!pMnode->deploy) { + if (sdbAfterRestored(pMnode->pSdb) != 0) { + mError("failed to prepare sdb while start mnode"); + } + mInfo("vgId:1, sync restore finished and restore sdb success"); + } +} + int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { mInfo("start to read snapshot from sdb"); SMnode *pMnode = pFsm->data; @@ -443,6 +451,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { pFsm->FpPreCommitCb = NULL; pFsm->FpRollBackCb = NULL; pFsm->FpRestoreFinishCb = mndRestoreFinish; + pFsm->FpAfterRestoredCb = mndAfterRestored; pFsm->FpLeaderTransferCb = NULL; pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty; pFsm->FpApplyQueueItems = mndApplyQueueItems; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index cea82c13ff..068f4dec3d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -697,6 +697,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpGetSnapshot = NULL; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo; pFsm->FpRestoreFinishCb = vnodeRestoreFinish; + pFsm->FpAfterRestoredCb = NULL; pFsm->FpLeaderTransferCb = NULL; pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty; pFsm->FpApplyQueueItems = vnodeApplyQueueItems; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index af6aab5d2b..3022a1f8ac 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -15,10 +15,10 @@ #define _DEFAULT_SOURCE -#include "syncPipeline.h" #include "syncCommit.h" #include "syncIndexMgr.h" #include "syncInt.h" +#include "syncPipeline.h" #include "syncRaftCfg.h" #include "syncRaftEntry.h" #include "syncRaftStore.h" @@ -787,6 +787,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm bool inBuf = false; SSyncRaftEntry* pNextEntry = NULL; bool nextInBuf = false; + bool restoreFinishAtThisCommit = false; if (commitIndex <= pBuf->commitIndex) { sDebug("vgId:%d, stale commit index. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex, @@ -907,6 +908,7 @@ _out: currentTerm <= pEntry->term) { pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex); pNode->restoreFinish = true; + restoreFinishAtThisCommit = true; sInfo("vgId:%d, restore finished. term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); } @@ -920,6 +922,12 @@ _out: pNextEntry = NULL; } (void)taosThreadMutexUnlock(&pBuf->mutex); + + if (restoreFinishAtThisCommit && pNode->pFsm->FpAfterRestoredCb != NULL) { + pNode->pFsm->FpAfterRestoredCb(pNode->pFsm, pBuf->commitIndex); + sInfo("vgId:%d, after restore finished callback executed)", pNode->vgId); + } + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); TAOS_RETURN(code); }