From f078f5b30016821d400105acb0520dcabb0b9c52 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 26 May 2022 15:08:20 +0800 Subject: [PATCH] fix(sync): syncSetStandby --- include/libs/sync/sync.h | 1 + source/libs/sync/src/syncMain.c | 29 +++++++++++++++++++ .../libs/sync/test/syncConfigChangeTest.cpp | 14 +++++++-- 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 08640e25c2..2e04afdbdc 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -166,6 +166,7 @@ void syncCleanUp(); int64_t syncOpen(const SSyncInfo* pSyncInfo); void syncStart(int64_t rid); void syncStop(int64_t rid); +int32_t syncSetStandby(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); ESyncState syncGetMyRole(int64_t rid); const char* syncGetMyRoleStr(int64_t rid); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9e6765e1d5..99aac7991a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -141,9 +141,38 @@ void syncStop(int64_t rid) { taosRemoveRef(tsNodeRefId, rid); } +int32_t syncSetStandby(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return -1; + } + + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return -1; + } + + // state change + pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; + syncNodeStopHeartbeatTimer(pSyncNode); + + // reset elect timer, long enough + int32_t electMS = TIMER_MAX_MS; + int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS); + ASSERT(ret == 0); + + pSyncNode->pRaftCfg->isStandBy = 1; + raftCfgPersist(pSyncNode->pRaftCfg); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; +} + int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { int32_t ret = 0; char* configChange = syncCfg2Str((SSyncCfg*)pSyncCfg); + sInfo("==syncReconfig== newconfig:%s", configChange); + SRpcMsg rpcMsg = {0}; rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE; rpcMsg.info.noResp = 1; diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 580c4248a3..7efc3f50c0 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -77,13 +77,23 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } +void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { + sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); +} + SSyncFSM* createFsm() { SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); pFsm->FpCommitCb = CommitCb; pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; + pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpRestoreFinishCb = RestoreFinishCb; + pFsm->FpSnapshotApply = NULL; + pFsm->FpSnapshotRead = NULL; + + pFsm->FpReConfigCb = ReConfigCb; + return pFsm; } @@ -183,7 +193,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { int main(int argc, char** argv) { tsAsyncLog = 0; - sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_INFO; if (argc != 7) { usage(argv[0]); exit(-1); @@ -229,7 +239,7 @@ int main(int argc, char** argv) { assert(pSyncNode != NULL); if (isConfigChange) { - configChange(rid, 3, myIndex); + configChange(rid, 2, myIndex); } //---------------------------