Fix(cfg): resolve dynamic configuration update failure after rolling upgrade to 3.3.5.0
This commit is contained in:
parent
d148191bf2
commit
77359f26b3
|
@ -176,7 +176,9 @@ typedef struct SSyncFSM {
|
||||||
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
|
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
|
||||||
|
|
||||||
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx);
|
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx);
|
||||||
|
void (*FpAfterRestoredCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx);
|
||||||
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SReConfigCbMeta* pMeta);
|
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SReConfigCbMeta* pMeta);
|
||||||
|
|
||||||
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
|
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
|
||||||
bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm);
|
bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm);
|
||||||
int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm);
|
int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm);
|
||||||
|
|
|
@ -518,6 +518,8 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||||
SMCfgDnodeReq cfgReq = {0};
|
SMCfgDnodeReq cfgReq = {0};
|
||||||
SConfigObj *vObj = sdbAcquire(pMnode->pSdb, SDB_CFG, "tsmmConfigVersion");
|
SConfigObj *vObj = sdbAcquire(pMnode->pSdb, SDB_CFG, "tsmmConfigVersion");
|
||||||
if (vObj == NULL) {
|
if (vObj == NULL) {
|
||||||
|
code = TSDB_CODE_SDB_OBJ_NOT_THERE;
|
||||||
|
mInfo("failed to acquire mnd config version, since %s", tstrerror(code));
|
||||||
goto _err_out;
|
goto _err_out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -309,9 +309,6 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
|
||||||
} else {
|
} else {
|
||||||
mInfo("vgId:1, sync restore finished, repeat call");
|
mInfo("vgId:1, sync restore finished, repeat call");
|
||||||
}
|
}
|
||||||
if (sdbAfterRestored(pMnode->pSdb) != 0) {
|
|
||||||
mError("failed to prepare sdb while start mnode");
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
mInfo("vgId:1, sync restore finished");
|
mInfo("vgId:1, sync restore finished");
|
||||||
}
|
}
|
||||||
|
@ -329,6 +326,17 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mndAfterRestored(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
|
||||||
|
SMnode *pMnode = pFsm->data;
|
||||||
|
|
||||||
|
if (!pMnode->deploy) {
|
||||||
|
if (sdbAfterRestored(pMnode->pSdb) != 0) {
|
||||||
|
mError("failed to prepare sdb while start mnode");
|
||||||
|
}
|
||||||
|
mInfo("vgId:1, sync restore finished and restore sdb success");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
||||||
mInfo("start to read snapshot from sdb");
|
mInfo("start to read snapshot from sdb");
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
|
@ -443,6 +451,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||||
pFsm->FpPreCommitCb = NULL;
|
pFsm->FpPreCommitCb = NULL;
|
||||||
pFsm->FpRollBackCb = NULL;
|
pFsm->FpRollBackCb = NULL;
|
||||||
pFsm->FpRestoreFinishCb = mndRestoreFinish;
|
pFsm->FpRestoreFinishCb = mndRestoreFinish;
|
||||||
|
pFsm->FpAfterRestoredCb = mndAfterRestored;
|
||||||
pFsm->FpLeaderTransferCb = NULL;
|
pFsm->FpLeaderTransferCb = NULL;
|
||||||
pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
|
pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
|
||||||
pFsm->FpApplyQueueItems = mndApplyQueueItems;
|
pFsm->FpApplyQueueItems = mndApplyQueueItems;
|
||||||
|
|
|
@ -697,6 +697,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
pFsm->FpGetSnapshot = NULL;
|
pFsm->FpGetSnapshot = NULL;
|
||||||
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
|
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
|
||||||
pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
|
pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
|
||||||
|
pFsm->FpAfterRestoredCb = NULL;
|
||||||
pFsm->FpLeaderTransferCb = NULL;
|
pFsm->FpLeaderTransferCb = NULL;
|
||||||
pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
|
pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
|
||||||
pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
|
pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
|
||||||
|
|
|
@ -15,10 +15,10 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
|
|
||||||
#include "syncPipeline.h"
|
|
||||||
#include "syncCommit.h"
|
#include "syncCommit.h"
|
||||||
#include "syncIndexMgr.h"
|
#include "syncIndexMgr.h"
|
||||||
#include "syncInt.h"
|
#include "syncInt.h"
|
||||||
|
#include "syncPipeline.h"
|
||||||
#include "syncRaftCfg.h"
|
#include "syncRaftCfg.h"
|
||||||
#include "syncRaftEntry.h"
|
#include "syncRaftEntry.h"
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
|
@ -901,12 +901,14 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
||||||
} while (true);
|
} while (true);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
bool restoreFinishAtThisCommit = false;
|
||||||
_out:
|
_out:
|
||||||
// mark as restored if needed
|
// mark as restored if needed
|
||||||
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
|
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
|
||||||
currentTerm <= pEntry->term) {
|
currentTerm <= pEntry->term) {
|
||||||
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex);
|
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex);
|
||||||
pNode->restoreFinish = true;
|
pNode->restoreFinish = true;
|
||||||
|
restoreFinishAtThisCommit = true;
|
||||||
sInfo("vgId:%d, restore finished. term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
sInfo("vgId:%d, restore finished. term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
||||||
pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||||
}
|
}
|
||||||
|
@ -920,6 +922,12 @@ _out:
|
||||||
pNextEntry = NULL;
|
pNextEntry = NULL;
|
||||||
}
|
}
|
||||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
|
|
||||||
|
if (restoreFinishAtThisCommit && pNode->pFsm->FpAfterRestoredCb != NULL) {
|
||||||
|
pNode->pFsm->FpAfterRestoredCb(pNode->pFsm, pBuf->commitIndex);
|
||||||
|
sInfo("vgId:%d, after restore finished callback executed)", pNode->vgId);
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue