fix(sync): syncSetStandby
This commit is contained in:
parent
4e1aa6b5c5
commit
f078f5b300
|
@ -166,6 +166,7 @@ void syncCleanUp();
|
||||||
int64_t syncOpen(const SSyncInfo* pSyncInfo);
|
int64_t syncOpen(const SSyncInfo* pSyncInfo);
|
||||||
void syncStart(int64_t rid);
|
void syncStart(int64_t rid);
|
||||||
void syncStop(int64_t rid);
|
void syncStop(int64_t rid);
|
||||||
|
int32_t syncSetStandby(int64_t rid);
|
||||||
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
|
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
|
||||||
ESyncState syncGetMyRole(int64_t rid);
|
ESyncState syncGetMyRole(int64_t rid);
|
||||||
const char* syncGetMyRoleStr(int64_t rid);
|
const char* syncGetMyRoleStr(int64_t rid);
|
||||||
|
|
|
@ -141,9 +141,38 @@ void syncStop(int64_t rid) {
|
||||||
taosRemoveRef(tsNodeRefId, rid);
|
taosRemoveRef(tsNodeRefId, rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncSetStandby(int64_t rid) {
|
||||||
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
|
if (pSyncNode == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// state change
|
||||||
|
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||||
|
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||||
|
|
||||||
|
// reset elect timer, long enough
|
||||||
|
int32_t electMS = TIMER_MAX_MS;
|
||||||
|
int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
|
pSyncNode->pRaftCfg->isStandBy = 1;
|
||||||
|
raftCfgPersist(pSyncNode->pRaftCfg);
|
||||||
|
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
|
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
char* configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
|
char* configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
|
||||||
|
sInfo("==syncReconfig== newconfig:%s", configChange);
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE;
|
rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE;
|
||||||
rpcMsg.info.noResp = 1;
|
rpcMsg.info.noResp = 1;
|
||||||
|
|
|
@ -77,13 +77,23 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
|
||||||
|
|
||||||
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
|
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
|
||||||
|
|
||||||
|
void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
|
||||||
|
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
|
||||||
|
}
|
||||||
|
|
||||||
SSyncFSM* createFsm() {
|
SSyncFSM* createFsm() {
|
||||||
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
|
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
|
||||||
pFsm->FpCommitCb = CommitCb;
|
pFsm->FpCommitCb = CommitCb;
|
||||||
pFsm->FpPreCommitCb = PreCommitCb;
|
pFsm->FpPreCommitCb = PreCommitCb;
|
||||||
pFsm->FpRollBackCb = RollBackCb;
|
pFsm->FpRollBackCb = RollBackCb;
|
||||||
|
|
||||||
pFsm->FpGetSnapshot = GetSnapshotCb;
|
pFsm->FpGetSnapshot = GetSnapshotCb;
|
||||||
pFsm->FpRestoreFinishCb = RestoreFinishCb;
|
pFsm->FpRestoreFinishCb = RestoreFinishCb;
|
||||||
|
pFsm->FpSnapshotApply = NULL;
|
||||||
|
pFsm->FpSnapshotRead = NULL;
|
||||||
|
|
||||||
|
pFsm->FpReConfigCb = ReConfigCb;
|
||||||
|
|
||||||
return pFsm;
|
return pFsm;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,7 +193,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
tsAsyncLog = 0;
|
tsAsyncLog = 0;
|
||||||
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_INFO;
|
||||||
if (argc != 7) {
|
if (argc != 7) {
|
||||||
usage(argv[0]);
|
usage(argv[0]);
|
||||||
exit(-1);
|
exit(-1);
|
||||||
|
@ -229,7 +239,7 @@ int main(int argc, char** argv) {
|
||||||
assert(pSyncNode != NULL);
|
assert(pSyncNode != NULL);
|
||||||
|
|
||||||
if (isConfigChange) {
|
if (isConfigChange) {
|
||||||
configChange(rid, 3, myIndex);
|
configChange(rid, 2, myIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
//---------------------------
|
//---------------------------
|
||||||
|
|
Loading…
Reference in New Issue