This commit is contained in:
dmchen 2024-07-29 11:17:15 +00:00
parent 81a44ee6b7
commit 598ed867c3
1 changed files with 33 additions and 20 deletions

View File

@ -181,17 +181,18 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
TAOS_RETURN(code); TAOS_RETURN(code);
} }
TAOS_CHECK_RETURN(syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg)); syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex); syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode)); // TODO check return value
(void)syncNodeStopHeartbeatTimer(pSyncNode);
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
TAOS_CHECK_RETURN(syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i])); (void)syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
} }
TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode)); (void)syncNodeStartHeartbeatTimer(pSyncNode);
// syncNodeReplicate(pSyncNode); // syncNodeReplicate(pSyncNode);
} }
@ -394,7 +395,9 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
syncNodeRelease(pNode); syncNodeRelease(pNode);
if (ret == 1) { if (ret == 1) {
sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle); sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
return rpcSendResponse(&rpcMsg); // TODO check return value
(void)rpcSendResponse(&rpcMsg);
return 0;
} else { } else {
sError("no message handle to send timeout response, seq:%" PRId64, seq); sError("no message handle to send timeout response, seq:%" PRId64, seq);
return TSDB_CODE_SYN_INTERNAL_ERROR; return TSDB_CODE_SYN_INTERNAL_ERROR;
@ -982,7 +985,8 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
ASSERTS(pNode->pFsm != NULL, "pFsm not registered"); ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
TAOS_CHECK_RETURN(pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot)); // TODO check return value
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
SyncIndex commitIndex = snapshot.lastApplyIndex; SyncIndex commitIndex = snapshot.lastApplyIndex;
SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
@ -1219,7 +1223,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
SyncIndex commitIndex = SYNC_INDEX_INVALID; SyncIndex commitIndex = SYNC_INDEX_INVALID;
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
TAOS_CHECK_GOTO(pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot), NULL, _error); // TODO check return value
(void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex > commitIndex) { if (snapshot.lastApplyIndex > commitIndex) {
commitIndex = snapshot.lastApplyIndex; commitIndex = snapshot.lastApplyIndex;
sNTrace(pSyncNode, "reset commit index by snapshot"); sNTrace(pSyncNode, "reset commit index by snapshot");
@ -1438,7 +1443,8 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs(); pSyncNode->roleTimeMs = taosGetTimestampMs();
TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode)); // TODO check return value
(void)syncNodeStopHeartbeatTimer(pSyncNode);
// reset elect timer, long enough // reset elect timer, long enough
int32_t electMS = TIMER_MAX_MS; int32_t electMS = TIMER_MAX_MS;
@ -1568,7 +1574,8 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
(void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1); (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
TAOS_CHECK_RETURN(taosTmrStop(pSyncNode->pPingTimer)); // TODO check return value
(void)taosTmrStop(pSyncNode->pPingTimer);
pSyncNode->pPingTimer = NULL; pSyncNode->pPingTimer = NULL;
return ret; return ret;
} }
@ -1595,7 +1602,8 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
(void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1); (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);
TAOS_CHECK_RETURN(taosTmrStop(pSyncNode->pElectTimer)); // TODO check return value
(void)taosTmrStop(pSyncNode->pElectTimer);
pSyncNode->pElectTimer = NULL; pSyncNode->pElectTimer = NULL;
return ret; return ret;
@ -1618,9 +1626,8 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
} }
if ((code = syncNodeRestartElectTimer(pSyncNode, electMS)) != 0) { // TODO check return value
sError("failed to restart elect timer since %s", tstrerror(code)); (void)syncNodeRestartElectTimer(pSyncNode, electMS);
}
sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine, sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
electMS); electMS);
@ -1664,8 +1671,9 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
#if 0 #if 0
//TODO check return value
(void)atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1); (void)atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
TAOS_CHECK_RETURN(taosTmrStop(pSyncNode->pHeartbeatTimer)); (void)taosTmrStop(pSyncNode->pHeartbeatTimer);
pSyncNode->pHeartbeatTimer = NULL; pSyncNode->pHeartbeatTimer = NULL;
#endif #endif
@ -1681,8 +1689,9 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
#ifdef BUILD_NO_CALL #ifdef BUILD_NO_CALL
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode)); // TODO check return value
TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode)); (void)syncNodeStopHeartbeatTimer(pSyncNode);
(void)syncNodeStartHeartbeatTimer(pSyncNode);
return 0; return 0;
} }
#endif #endif
@ -2258,7 +2267,8 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
bool ret = false; bool ret = false;
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
if ((terrno = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot)) != 0) return false; // TODO check return value
(void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) { if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
ret = true; ret = true;
} }
@ -2271,7 +2281,8 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) { SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
if ((terrno = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot)) != 0) return -1; // TODO check return value
(void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
} }
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
@ -2287,7 +2298,8 @@ SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
// has snapshot // has snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
if ((terrno = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot)) != 0) return lastTerm; // TODO check return value
(void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
} }
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
@ -2382,7 +2394,8 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
return preTerm; return preTerm;
} else { } else {
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
if ((terrno = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot)) != 0) return SYNC_TERM_INVALID; // TODO check return value
(void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex == preIndex) { if (snapshot.lastApplyIndex == preIndex) {
return snapshot.lastApplyTerm; return snapshot.lastApplyTerm;
} }