Merge pull request #29502 from taosdata/fix/main/TD-33457

Fix(cfg): resolve dynamic configuration update failure after rolling …
This commit is contained in:
Shengliang Guan 2025-01-08 17:31:13 +08:00 committed by GitHub
commit 37eb686d7c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 66 additions and 8 deletions

View File

@ -261,6 +261,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUDIT, "audit", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CONFIG, "init-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_SDB, "config-sdb", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8

View File

@ -176,7 +176,9 @@ typedef struct SSyncFSM {
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx);
void (*FpAfterRestoredCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx);
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SReConfigCbMeta* pMeta);
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm);
int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm);

View File

@ -114,6 +114,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_SDB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
@ -130,6 +132,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_SDB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -17,6 +17,7 @@
#include "audit.h"
#include "mndConfig.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndPrivilege.h"
#include "mndSync.h"
#include "mndTrans.h"
@ -33,8 +34,10 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessConfigReq(SRpcMsg *pReq);
static int32_t mndInitWriteCfg(SMnode *pMnode);
static int32_t mndTryRebuildCfg(SMnode *pMnode);
static int32_t mndSendRebuildReq(SMnode *pMnode);
static int32_t mndTryRebuildConfigSdbRsp(SRpcMsg *pRsp);
static int32_t initConfigArrayFromSdb(SMnode *pMnode, SArray *array);
static int32_t mndTryRebuildConfigSdb(SRpcMsg *pReq);
static void cfgArrayCleanUp(SArray *array);
static void cfgObjArrayCleanUp(SArray *array);
@ -59,6 +62,8 @@ int32_t mndInitConfig(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_SDB, mndTryRebuildConfigSdb);
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_SDB_RSP, mndTryRebuildConfigSdbRsp);
return sdbSetTable(pMnode->pSdb, table);
}
@ -214,7 +219,7 @@ static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigObj *pOld, SConfigObj *pNew
static int32_t mndCfgActionDeploy(SMnode *pMnode) { return mndInitWriteCfg(pMnode); }
static int32_t mndCfgActionAfterRestored(SMnode *pMnode) { return mndTryRebuildCfg(pMnode); }
static int32_t mndCfgActionAfterRestored(SMnode *pMnode) { return mndSendRebuildReq(pMnode); }
static int32_t mndProcessConfigReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
@ -340,7 +345,30 @@ _OVER:
return code;
}
int32_t mndTryRebuildCfg(SMnode *pMnode) {
int32_t mndSendRebuildReq(SMnode *pMnode) {
int32_t code = 0;
SRpcMsg rpcMsg = {.pCont = NULL,
.contLen = 0,
.msgType = TDMT_MND_CONFIG_SDB,
.info.ahandle = 0,
.info.notFreeAhandle = 1,
.info.refId = 0,
.info.noResp = 0,
.info.handle = 0};
SEpSet epSet = {0};
mndGetMnodeEpSet(pMnode, &epSet);
code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) {
mError("failed to send rebuild config req, since %s", tstrerror(code));
}
return code;
}
static int32_t mndTryRebuildConfigSdb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
if (!mndIsLeader(pMnode)) {
return TSDB_CODE_SUCCESS;
}
@ -518,6 +546,8 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
SMCfgDnodeReq cfgReq = {0};
SConfigObj *vObj = sdbAcquire(pMnode->pSdb, SDB_CFG, "tsmmConfigVersion");
if (vObj == NULL) {
code = TSDB_CODE_SDB_OBJ_NOT_THERE;
mInfo("failed to acquire mnd config version, since %s", tstrerror(code));
goto _err_out;
}
@ -609,6 +639,11 @@ static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
return 0;
}
static int32_t mndTryRebuildConfigSdbRsp(SRpcMsg *pRsp) {
mInfo("rebuild config sdb rsp");
return 0;
}
// get int32_t value from 'SMCfgDnodeReq'
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t optLen, int32_t *pOutValue) {
int32_t code = 0;
@ -770,7 +805,6 @@ static void cfgObjArrayCleanUp(SArray *array) {
for (int32_t i = 0; i < sz; ++i) {
SConfigObj *obj = taosArrayGet(array, i);
tFreeSConfigObj(obj);
taosMemoryFree(obj);
}
taosArrayDestroy(array);
}

View File

@ -309,9 +309,6 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
} else {
mInfo("vgId:1, sync restore finished, repeat call");
}
if (sdbAfterRestored(pMnode->pSdb) != 0) {
mError("failed to prepare sdb while start mnode");
}
} else {
mInfo("vgId:1, sync restore finished");
}
@ -329,6 +326,17 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
}
}
void mndAfterRestored(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
SMnode *pMnode = pFsm->data;
if (!pMnode->deploy) {
if (sdbAfterRestored(pMnode->pSdb) != 0) {
mError("failed to prepare sdb while start mnode");
}
mInfo("vgId:1, sync restore finished and restore sdb success");
}
}
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
mInfo("start to read snapshot from sdb");
SMnode *pMnode = pFsm->data;
@ -443,6 +451,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL;
pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpAfterRestoredCb = mndAfterRestored;
pFsm->FpLeaderTransferCb = NULL;
pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
pFsm->FpApplyQueueItems = mndApplyQueueItems;

View File

@ -697,6 +697,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpGetSnapshot = NULL;
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
pFsm->FpAfterRestoredCb = NULL;
pFsm->FpLeaderTransferCb = NULL;
pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
pFsm->FpApplyQueueItems = vnodeApplyQueueItems;

View File

@ -15,10 +15,10 @@
#define _DEFAULT_SOURCE
#include "syncPipeline.h"
#include "syncCommit.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncPipeline.h"
#include "syncRaftCfg.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
@ -787,6 +787,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
bool inBuf = false;
SSyncRaftEntry* pNextEntry = NULL;
bool nextInBuf = false;
bool restoreFinishAtThisCommit = false;
if (commitIndex <= pBuf->commitIndex) {
sDebug("vgId:%d, stale commit index. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex,
@ -907,6 +908,7 @@ _out:
currentTerm <= pEntry->term) {
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex);
pNode->restoreFinish = true;
restoreFinishAtThisCommit = true;
sInfo("vgId:%d, restore finished. term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
}
@ -920,6 +922,12 @@ _out:
pNextEntry = NULL;
}
(void)taosThreadMutexUnlock(&pBuf->mutex);
if (restoreFinishAtThisCommit && pNode->pFsm->FpAfterRestoredCb != NULL) {
pNode->pFsm->FpAfterRestoredCb(pNode->pFsm, pBuf->commitIndex);
sInfo("vgId:%d, after restore finished callback executed)", pNode->vgId);
}
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
TAOS_RETURN(code);
}