fix/TD-31891-remove-void-sync1
This commit is contained in:
parent
706479bdb2
commit
d79db33bab
|
@ -86,7 +86,9 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
|
|||
if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) {
|
||||
SyncIndex commitIndex = indexLikely;
|
||||
// TODO add return when error
|
||||
(void)syncNodeUpdateCommitIndex(ths, commitIndex);
|
||||
if ((code = syncNodeUpdateCommitIndex(ths, commitIndex)) != 0) {
|
||||
sError("vgId:%d, failed to update commit index:%" PRId64 ", since %s", ths->vgId, commitIndex, tstrerror(code));
|
||||
}
|
||||
sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state,
|
||||
raftStoreGetTerm(ths), commitIndex);
|
||||
}
|
||||
|
|
|
@ -201,13 +201,13 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
|
|||
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
||||
// TODO check return value
|
||||
(void)syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
|
||||
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
|
||||
(void)syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
|
||||
TAOS_CHECK_RETURN(syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]));
|
||||
}
|
||||
|
||||
(void)syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
|
||||
// syncNodeReplicate(pSyncNode);
|
||||
}
|
||||
|
||||
|
@ -410,9 +410,8 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
|
|||
syncNodeRelease(pNode);
|
||||
if (ret == 1) {
|
||||
sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
|
||||
// TODO check return value
|
||||
(void)rpcSendResponse(&rpcMsg);
|
||||
return 0;
|
||||
code = rpcSendResponse(&rpcMsg);
|
||||
return code;
|
||||
} else {
|
||||
sError("no message handle to send timeout response, seq:%" PRId64, seq);
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
|
@ -933,7 +932,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
|
|||
int32_t code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
|
||||
if (code != 0) {
|
||||
sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
|
||||
(void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
|
||||
code = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
|
@ -941,7 +940,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
|
|||
code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
|
||||
if (code != 0) {
|
||||
sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
|
||||
(void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
|
||||
TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
|
||||
}
|
||||
|
||||
if (seq != NULL) *seq = seqNum;
|
||||
|
@ -961,7 +960,7 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa
|
|||
}
|
||||
|
||||
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
||||
int32_t ret = 0;
|
||||
int32_t code = 0;
|
||||
int64_t tsNow = taosGetTimestampMs();
|
||||
if (syncIsInit()) {
|
||||
SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
|
||||
|
@ -980,13 +979,13 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
|||
|
||||
sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, pData->rid, pData->destId.addr);
|
||||
|
||||
(void)taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
|
||||
syncEnv()->pTimerManager, &pSyncTimer->pTimer);
|
||||
TAOS_CHECK_RETURN(taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
|
||||
syncEnv()->pTimerManager, &pSyncTimer->pTimer));
|
||||
} else {
|
||||
ret = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
|
||||
}
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
||||
|
@ -1308,7 +1307,10 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
|||
}
|
||||
|
||||
// tools
|
||||
(void)syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr); // TODO: check return value
|
||||
if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
|
||||
sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
|
||||
goto _error;
|
||||
}
|
||||
if (pSyncNode->pSyncRespMgr == NULL) {
|
||||
sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
|
||||
goto _error;
|
||||
|
@ -1471,29 +1473,31 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) {
|
|||
#ifdef BUILD_NO_CALL
|
||||
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
||||
// state change
|
||||
int32_t code = 0;
|
||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
||||
// TODO check return value
|
||||
(void)syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
|
||||
|
||||
// reset elect timer, long enough
|
||||
int32_t electMS = TIMER_MAX_MS;
|
||||
int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);
|
||||
if (ret < 0) {
|
||||
code = syncNodeRestartElectTimer(pSyncNode, electMS);
|
||||
if (code < 0) {
|
||||
sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = syncNodeStartPingTimer(pSyncNode);
|
||||
if (ret < 0) {
|
||||
code = syncNodeStartPingTimer(pSyncNode);
|
||||
if (code < 0) {
|
||||
sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
|
||||
void syncNodePreClose(SSyncNode* pSyncNode) {
|
||||
int32_t code = 0;
|
||||
if (pSyncNode == NULL) {
|
||||
sError("failed to pre close sync node since sync node is null");
|
||||
return;
|
||||
|
@ -1508,13 +1512,22 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
// stop elect timer
|
||||
(void)syncNodeStopElectTimer(pSyncNode);
|
||||
if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// stop heartbeat timer
|
||||
(void)syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// stop ping timer
|
||||
(void)syncNodeStopPingTimer(pSyncNode);
|
||||
if ((code = syncNodeStopPingTimer(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// clean rsp
|
||||
syncRespCleanRsp(pSyncNode->pSyncRespMgr);
|
||||
|
@ -1536,14 +1549,24 @@ void syncNodePostClose(SSyncNode* pSyncNode) {
|
|||
void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); }
|
||||
|
||||
void syncNodeClose(SSyncNode* pSyncNode) {
|
||||
int32_t code = 0;
|
||||
if (pSyncNode == NULL) return;
|
||||
sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);
|
||||
|
||||
syncRespCleanRsp(pSyncNode->pSyncRespMgr);
|
||||
|
||||
(void)syncNodeStopPingTimer(pSyncNode);
|
||||
(void)syncNodeStopElectTimer(pSyncNode);
|
||||
(void)syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
if ((code = syncNodeStopPingTimer(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
syncNodeLogReplDestroy(pSyncNode);
|
||||
|
||||
syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
|
||||
|
@ -1599,28 +1622,28 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg
|
|||
|
||||
// timer control --------------
|
||||
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
int32_t code = 0;
|
||||
if (syncIsInit()) {
|
||||
(void)taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
|
||||
syncEnv()->pTimerManager, &pSyncNode->pPingTimer);
|
||||
TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
|
||||
syncEnv()->pTimerManager, &pSyncNode->pPingTimer));
|
||||
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
|
||||
} else {
|
||||
sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
|
||||
}
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
int32_t code = 0;
|
||||
(void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
|
||||
// TODO check return value
|
||||
(void)taosTmrStop(pSyncNode->pPingTimer);
|
||||
TAOS_CHECK_RETURN(taosTmrStop(pSyncNode->pPingTimer));
|
||||
pSyncNode->pPingTimer = NULL;
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
||||
int32_t ret = 0;
|
||||
int32_t code = 0;
|
||||
if (syncIsInit()) {
|
||||
pSyncNode->electTimerMS = ms;
|
||||
|
||||
|
@ -1630,22 +1653,22 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
|||
pSyncNode->electTimerParam.pSyncNode = pSyncNode;
|
||||
pSyncNode->electTimerParam.pData = NULL;
|
||||
|
||||
(void)taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
|
||||
syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
|
||||
TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
|
||||
syncEnv()->pTimerManager, &pSyncNode->pElectTimer));
|
||||
} else {
|
||||
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
|
||||
}
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
int32_t code = 0;
|
||||
(void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);
|
||||
// TODO check return value
|
||||
(void)taosTmrStop(pSyncNode->pElectTimer);
|
||||
TAOS_CHECK_RETURN(taosTmrStop(pSyncNode->pElectTimer));
|
||||
pSyncNode->pElectTimer = NULL;
|
||||
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
||||
|
@ -1666,7 +1689,10 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
// TODO check return value
|
||||
(void)syncNodeRestartElectTimer(pSyncNode, electMS);
|
||||
if ((code = syncNodeRestartElectTimer(pSyncNode, electMS)) != 0) {
|
||||
sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
|
||||
return;
|
||||
};
|
||||
|
||||
sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
|
||||
electMS);
|
||||
|
@ -1674,17 +1700,17 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
|
|||
|
||||
#ifdef BUILD_NO_CALL
|
||||
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
int32_t code = 0;
|
||||
if (syncIsInit()) {
|
||||
(void)taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
|
||||
syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
|
||||
TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
|
||||
syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer));
|
||||
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
|
||||
} else {
|
||||
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
|
||||
}
|
||||
|
||||
sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -1707,12 +1733,12 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
#if 0
|
||||
//TODO check return value
|
||||
(void)atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
|
||||
(void)taosTmrStop(pSyncNode->pHeartbeatTimer);
|
||||
TAOS_CHECK_RETURN(atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1));
|
||||
TAOS_CHECK_RETURN(taosTmrStop(pSyncNode->pHeartbeatTimer));
|
||||
pSyncNode->pHeartbeatTimer = NULL;
|
||||
#endif
|
||||
|
||||
|
@ -1723,14 +1749,15 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
|
|||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
|
||||
// TODO check return value
|
||||
(void)syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
(void)syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
int32_t code = 0;
|
||||
TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
|
||||
TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
@ -1806,6 +1833,7 @@ static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg
|
|||
}
|
||||
|
||||
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
|
||||
int32_t code = 0;
|
||||
SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
|
||||
if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
|
||||
sInfo("vgId:1, sync not reconfig since not changed");
|
||||
|
@ -1873,7 +1901,7 @@ int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncI
|
|||
|
||||
// init internal
|
||||
pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
|
||||
(void)syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
|
||||
TAOS_CHECK_RETURN(syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId));
|
||||
|
||||
// init peersNum, peers, peersId
|
||||
pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
|
||||
|
@ -1886,14 +1914,15 @@ int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncI
|
|||
}
|
||||
}
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
(void)syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
|
||||
TAOS_CHECK_RETURN(syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]));
|
||||
}
|
||||
|
||||
// init replicaNum, replicasId
|
||||
pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
|
||||
pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
|
||||
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
|
||||
(void)syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
|
||||
TAOS_CHECK_RETURN(
|
||||
syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]));
|
||||
}
|
||||
|
||||
// update quorum first
|
||||
|
@ -1939,7 +1968,7 @@ int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncI
|
|||
// create new
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
|
||||
if (pSyncNode->senders[i] == NULL) {
|
||||
(void)snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]);
|
||||
TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
|
||||
if (pSyncNode->senders[i] == NULL) {
|
||||
// will be created later while send snapshot
|
||||
sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
|
||||
|
@ -1961,10 +1990,10 @@ int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncI
|
|||
}
|
||||
|
||||
// persist cfg
|
||||
(void)syncWriteCfgFile(pSyncNode);
|
||||
TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
|
||||
} else {
|
||||
// persist cfg
|
||||
(void)syncWriteCfgFile(pSyncNode);
|
||||
TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
|
||||
sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
|
||||
}
|
||||
|
||||
|
@ -2015,7 +2044,7 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
|
|||
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); }
|
||||
|
||||
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
||||
// maybe clear leader cache
|
||||
int32_t code = 0; // maybe clear leader cache
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
pSyncNode->leaderCache = EMPTY_RAFT_ID;
|
||||
}
|
||||
|
@ -2025,7 +2054,10 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
// state change
|
||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
||||
(void)syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// trace log
|
||||
sNTrace(pSyncNode, "become follower %s", debugStr);
|
||||
|
@ -2042,7 +2074,10 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
// reset log buffer
|
||||
(void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||
if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// reset elect timer
|
||||
syncNodeResetElectTimer(pSyncNode);
|
||||
|
@ -2069,7 +2104,11 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
// reset log buffer
|
||||
(void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||
int32_t code = 0;
|
||||
if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
};
|
||||
}
|
||||
|
||||
// TLA+ Spec
|
||||
|
@ -2091,6 +2130,7 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
|
||||
//
|
||||
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
||||
int32_t code = 0;
|
||||
pSyncNode->becomeLeaderNum++;
|
||||
pSyncNode->hbrSlowNum = 0;
|
||||
|
||||
|
@ -2122,7 +2162,10 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
}
|
||||
|
||||
// init peer mgr
|
||||
(void)syncNodePeerStateInit(pSyncNode);
|
||||
if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
#if 0
|
||||
// update sender private term
|
||||
|
@ -2143,13 +2186,22 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
}
|
||||
|
||||
// stop elect timer
|
||||
(void)syncNodeStopElectTimer(pSyncNode);
|
||||
if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// start heartbeat timer
|
||||
(void)syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// send heartbeat right now
|
||||
(void)syncNodeHeartbeatPeers(pSyncNode);
|
||||
if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// call back
|
||||
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
|
||||
|
@ -2160,13 +2212,17 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
// reset log buffer
|
||||
(void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||
if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// trace log
|
||||
sNInfo(pSyncNode, "become leader %s", debugStr);
|
||||
}
|
||||
|
||||
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
|
||||
int32_t code = 0;
|
||||
pSyncNode->becomeAssignedLeaderNum++;
|
||||
pSyncNode->hbrSlowNum = 0;
|
||||
|
||||
|
@ -2198,7 +2254,10 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
// init peer mgr
|
||||
(void)syncNodePeerStateInit(pSyncNode);
|
||||
if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// close receiver
|
||||
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
|
||||
|
@ -2206,13 +2265,22 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
// stop elect timer
|
||||
(void)syncNodeStopElectTimer(pSyncNode);
|
||||
if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// start heartbeat timer
|
||||
(void)syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// send heartbeat right now
|
||||
(void)syncNodeHeartbeatPeers(pSyncNode);
|
||||
if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// call back
|
||||
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
|
||||
|
@ -2223,7 +2291,10 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
|
|||
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
// reset log buffer
|
||||
(void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||
if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
|
||||
sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// trace log
|
||||
sNInfo(pSyncNode, "become assigned leader");
|
||||
|
@ -2513,8 +2584,10 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
_out:
|
||||
(void)taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
|
||||
&pNode->pPingTimer);
|
||||
if ((code = taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
|
||||
&pNode->pPingTimer)) != 0) {
|
||||
sError("failed to reset ping timer since %s", tstrerror(code));
|
||||
};
|
||||
}
|
||||
syncNodeRelease(pNode);
|
||||
}
|
||||
|
@ -2591,8 +2664,9 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
_out:
|
||||
(void)taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
|
||||
&pNode->pHeartbeatTimer);
|
||||
if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
|
||||
&pNode->pHeartbeatTimer) != 0)
|
||||
return;
|
||||
|
||||
} else {
|
||||
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
|
||||
|
@ -2603,6 +2677,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
|||
#endif
|
||||
|
||||
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||
int32_t code = 0;
|
||||
int64_t hbDataRid = (int64_t)param;
|
||||
int64_t tsNow = taosGetTimestampMs();
|
||||
|
||||
|
@ -2646,7 +2721,12 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
|||
pData->execTime += pSyncTimer->timerMS;
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
|
||||
if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
|
||||
sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
|
||||
syncNodeRelease(pSyncNode);
|
||||
syncHbTimerDataRelease(pData);
|
||||
return;
|
||||
}
|
||||
|
||||
pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||
|
||||
|
@ -2668,14 +2748,25 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
|||
STraceId* trace = &(rpcMsg.info.traceId);
|
||||
sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
|
||||
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
|
||||
(void)syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||
if ((code = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg)) != 0) {
|
||||
sError("vgId:%d, failed to send heartbeat to dnode:%d since %s", pSyncNode->vgId, DID(&(pSyncMsg->destId)),
|
||||
tstrerror(code));
|
||||
syncNodeRelease(pSyncNode);
|
||||
syncHbTimerDataRelease(pData);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
}
|
||||
|
||||
if (syncIsInit()) {
|
||||
// sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
|
||||
(void)taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
|
||||
syncEnv()->pTimerManager, &pSyncTimer->pTimer);
|
||||
if ((code = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM,
|
||||
(void*)hbDataRid, syncEnv()->pTimerManager, &pSyncTimer->pTimer)) != 0) {
|
||||
sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
|
||||
syncNodeRelease(pSyncNode);
|
||||
syncHbTimerDataRelease(pData);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
sError("sync env is stop, reset peer hb timer error");
|
||||
}
|
||||
|
@ -2715,6 +2806,7 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
|
|||
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TODO SAlterVnodeReplicaReq name is proper?
|
||||
cfg->replicaNum = 0;
|
||||
cfg->totalReplicaNum = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
for (int i = 0; i < pReq->replica; ++i) {
|
||||
SNodeInfo* pNode = &cfg->nodeInfo[i];
|
||||
|
@ -2722,7 +2814,9 @@ void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TO
|
|||
pNode->nodePort = pReq->replicas[i].port;
|
||||
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
|
||||
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
||||
if ((code = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) != 0) {
|
||||
sError("vgId:%d, failed to update dnode info since %s", pReq->vgId, tstrerror(code));
|
||||
}
|
||||
sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort,
|
||||
pNode->nodeId, pNode->nodeRole);
|
||||
cfg->replicaNum++;
|
||||
|
@ -2736,7 +2830,9 @@ void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TO
|
|||
pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
|
||||
pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
||||
tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
|
||||
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
||||
if ((code = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) != 0) {
|
||||
sError("vgId:%d, failed to update dnode info, %s", pReq->vgId, tstrerror(code));
|
||||
}
|
||||
sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort,
|
||||
pNode->nodeId, pNode->nodeRole);
|
||||
cfg->totalReplicaNum++;
|
||||
|
@ -3287,7 +3383,10 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
|||
// append to log buffer
|
||||
if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
|
||||
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
|
||||
(void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
|
||||
int32_t ret = 0;
|
||||
if ((ret = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false)) != 0) {
|
||||
sError("vgId:%d, failed to execute fsm, since %s", ths->vgId, tstrerror(ret));
|
||||
}
|
||||
syncEntryDestroy(pEntry);
|
||||
pEntry = NULL;
|
||||
goto _out;
|
||||
|
@ -3305,7 +3404,7 @@ _out:;
|
|||
ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
|
||||
|
||||
if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
||||
(void)syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
|
||||
TAOS_CHECK_RETURN(syncNodeUpdateAssignedCommitIndex(ths, matchIndex));
|
||||
|
||||
if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
|
||||
syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex) < 0) {
|
||||
|
@ -3320,7 +3419,7 @@ _out:;
|
|||
}
|
||||
|
||||
// single replica
|
||||
(void)syncNodeUpdateCommitIndex(ths, matchIndex);
|
||||
TAOS_CHECK_RETURN(syncNodeUpdateCommitIndex(ths, matchIndex));
|
||||
|
||||
if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
|
||||
(code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex)) < 0) {
|
||||
|
@ -3442,7 +3541,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
}
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
(void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
|
||||
TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
|
||||
SyncTerm currentTerm = raftStoreGetTerm(ths);
|
||||
|
||||
SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
|
||||
|
@ -3470,7 +3569,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
|
||||
if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
|
||||
SRpcMsg rpcMsgLocalCmd = {0};
|
||||
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
|
||||
TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
|
||||
|
||||
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
|
||||
pSyncMsg->cmd =
|
||||
|
@ -3494,7 +3593,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
if (pMsg->term >= currentTerm &&
|
||||
(ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
|
||||
SRpcMsg rpcMsgLocalCmd = {0};
|
||||
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
|
||||
TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
|
||||
|
||||
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
|
||||
pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
|
||||
|
@ -3577,7 +3676,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
if (pMsg->currentTerm == matchTerm) {
|
||||
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
|
||||
TAOS_CHECK_RETURN(syncNodeUpdateCommitIndex(ths, pMsg->commitIndex));
|
||||
}
|
||||
if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
|
||||
sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
|
||||
|
@ -3644,7 +3743,11 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
|
|||
|
||||
if (code > 0) {
|
||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||
(void)syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
|
||||
if ((code = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info)) != 0) {
|
||||
syncEntryDestroy(pEntry);
|
||||
pEntry = NULL;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (rsp.info.handle != NULL) {
|
||||
tmsgSendRsp(&rsp);
|
||||
}
|
||||
|
|
|
@ -163,7 +163,7 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) {
|
|||
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
|
||||
(void)taosCloseFile(&pFile);
|
||||
TAOS_CHECK_EXIT(taosCloseFile(&pFile));
|
||||
TAOS_CHECK_EXIT(taosRenameFile(file, realfile));
|
||||
|
||||
sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
|
||||
|
|
|
@ -134,7 +134,7 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) {
|
|||
|
||||
if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(terrno, &lino, _OVER);
|
||||
|
||||
(void)taosCloseFile(&pFile);
|
||||
TAOS_CHECK_GOTO(taosCloseFile(&pFile), &lino, _OVER);
|
||||
if (taosRenameFile(file, realfile) != 0) TAOS_CHECK_GOTO(terrno, &lino, _OVER);
|
||||
|
||||
code = 0;
|
||||
|
@ -168,21 +168,30 @@ bool raftStoreHasVoted(SSyncNode *pNode) {
|
|||
void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId) {
|
||||
(void)taosThreadMutexLock(&pNode->raftStore.mutex);
|
||||
pNode->raftStore.voteFor = *pRaftId;
|
||||
(void)raftStoreWriteFile(pNode);
|
||||
int32_t code = 0;
|
||||
if ((code = raftStoreWriteFile(pNode)) != 0) {
|
||||
sError("vgId:%d, failed to write raft store file since %s", pNode->vgId, tstrerror(code));
|
||||
}
|
||||
(void)taosThreadMutexUnlock(&pNode->raftStore.mutex);
|
||||
}
|
||||
|
||||
void raftStoreClearVote(SSyncNode *pNode) {
|
||||
(void)taosThreadMutexLock(&pNode->raftStore.mutex);
|
||||
pNode->raftStore.voteFor = EMPTY_RAFT_ID;
|
||||
(void)raftStoreWriteFile(pNode);
|
||||
int32_t code = 0;
|
||||
if ((code = raftStoreWriteFile(pNode)) != 0) {
|
||||
sError("vgId:%d, failed to write raft store file since %s", pNode->vgId, tstrerror(code));
|
||||
}
|
||||
(void)taosThreadMutexUnlock(&pNode->raftStore.mutex);
|
||||
}
|
||||
|
||||
void raftStoreNextTerm(SSyncNode *pNode) {
|
||||
(void)taosThreadMutexLock(&pNode->raftStore.mutex);
|
||||
pNode->raftStore.currentTerm++;
|
||||
(void)raftStoreWriteFile(pNode);
|
||||
int32_t code = 0;
|
||||
if ((code = raftStoreWriteFile(pNode)) != 0) {
|
||||
sError("vgId:%d, failed to write raft store file since %s", pNode->vgId, tstrerror(code));
|
||||
}
|
||||
(void)taosThreadMutexUnlock(&pNode->raftStore.mutex);
|
||||
}
|
||||
|
||||
|
@ -190,7 +199,10 @@ void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term) {
|
|||
(void)taosThreadMutexLock(&pNode->raftStore.mutex);
|
||||
if (pNode->raftStore.currentTerm < term) {
|
||||
pNode->raftStore.currentTerm = term;
|
||||
(void)raftStoreWriteFile(pNode);
|
||||
int32_t code = 0;
|
||||
if ((code = raftStoreWriteFile(pNode)) != 0) {
|
||||
sError("vgId:%d, failed to write raft store file since %s", pNode->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
(void)taosThreadMutexUnlock(&pNode->raftStore.mutex);
|
||||
}
|
||||
|
|
|
@ -137,7 +137,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
// trace log
|
||||
syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "", "proceed");
|
||||
syncLogSendRequestVoteReply(ths, pReply, "");
|
||||
(void)syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||
TAOS_CHECK_RETURN(syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg));
|
||||
|
||||
if (resetElect) syncNodeResetElectTimer(ths);
|
||||
|
||||
|
|
Loading…
Reference in New Issue