fix/TD-30989
This commit is contained in:
parent
82421f5833
commit
de557c92f0
|
@ -69,13 +69,13 @@ void syncCleanUp() {
|
|||
|
||||
if (gNodeRefId != -1) {
|
||||
sDebug("sync rsetId:%d is closed", gNodeRefId);
|
||||
taosCloseRef(gNodeRefId);
|
||||
(void)taosCloseRef(gNodeRefId);
|
||||
gNodeRefId = -1;
|
||||
}
|
||||
|
||||
if (gHbDataRefId != -1) {
|
||||
sDebug("sync rsetId:%d is closed", gHbDataRefId);
|
||||
taosCloseRef(gHbDataRefId);
|
||||
(void)taosCloseRef(gHbDataRefId);
|
||||
gHbDataRefId = -1;
|
||||
}
|
||||
}
|
||||
|
@ -88,7 +88,7 @@ int64_t syncNodeAdd(SSyncNode *pNode) {
|
|||
return pNode->rid;
|
||||
}
|
||||
|
||||
void syncNodeRemove(int64_t rid) { taosRemoveRef(gNodeRefId, rid); }
|
||||
void syncNodeRemove(int64_t rid) { (void)taosRemoveRef(gNodeRefId, rid); }
|
||||
|
||||
SSyncNode *syncNodeAcquire(int64_t rid) {
|
||||
SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid);
|
||||
|
@ -101,7 +101,7 @@ SSyncNode *syncNodeAcquire(int64_t rid) {
|
|||
}
|
||||
|
||||
void syncNodeRelease(SSyncNode *pNode) {
|
||||
if (pNode) taosReleaseRef(gNodeRefId, pNode->rid);
|
||||
if (pNode) (void)taosReleaseRef(gNodeRefId, pNode->rid);
|
||||
}
|
||||
|
||||
int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
|
||||
|
@ -110,7 +110,7 @@ int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
|
|||
return pData->rid;
|
||||
}
|
||||
|
||||
void syncHbTimerDataRemove(int64_t rid) { taosRemoveRef(gHbDataRefId, rid); }
|
||||
void syncHbTimerDataRemove(int64_t rid) { (void)taosRemoveRef(gHbDataRefId, rid); }
|
||||
|
||||
SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) {
|
||||
SSyncHbTimerData *pData = taosAcquireRef(gHbDataRefId, rid);
|
||||
|
@ -122,7 +122,7 @@ SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) {
|
|||
return pData;
|
||||
}
|
||||
|
||||
void syncHbTimerDataRelease(SSyncHbTimerData *pData) { taosReleaseRef(gHbDataRefId, pData->rid); }
|
||||
void syncHbTimerDataRelease(SSyncHbTimerData *pData) { (void)taosReleaseRef(gHbDataRefId, pData->rid); }
|
||||
|
||||
#if 0
|
||||
void syncEnvStartTimer() {
|
||||
|
|
|
@ -181,17 +181,17 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
|
||||
TAOS_CHECK_RETURN(syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg));
|
||||
syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex);
|
||||
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
|
||||
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
|
||||
syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
|
||||
TAOS_CHECK_RETURN(syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]));
|
||||
}
|
||||
|
||||
syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
|
||||
// syncNodeReplicate(pSyncNode);
|
||||
}
|
||||
|
||||
|
@ -394,8 +394,7 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
|
|||
syncNodeRelease(pNode);
|
||||
if (ret == 1) {
|
||||
sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
|
||||
rpcSendResponse(&rpcMsg);
|
||||
return 0;
|
||||
return rpcSendResponse(&rpcMsg);
|
||||
} else {
|
||||
sError("no message handle to send timeout response, seq:%" PRId64, seq);
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
|
@ -674,9 +673,9 @@ int32_t syncGetArbToken(int64_t rid, char* outToken) {
|
|||
}
|
||||
|
||||
memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
|
||||
taosThreadMutexLock(&pSyncNode->arbTokenMutex);
|
||||
(void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
|
||||
strncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
|
||||
taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
|
||||
(void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
TAOS_RETURN(code);
|
||||
|
@ -956,8 +955,9 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
|||
|
||||
sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, pData->rid, pData->destId.addr);
|
||||
|
||||
taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
|
||||
syncEnv()->pTimerManager, &pSyncTimer->pTimer);
|
||||
if (!taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
|
||||
syncEnv()->pTimerManager, &pSyncTimer->pTimer))
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
} else {
|
||||
ret = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
|
||||
|
@ -967,7 +967,7 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
|||
|
||||
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
||||
int32_t ret = 0;
|
||||
atomic_add_fetch_64(&pSyncTimer->logicClock, 1);
|
||||
(void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);
|
||||
if (!taosTmrStop(pSyncTimer->pTimer)) {
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
|
@ -983,7 +983,7 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
|
|||
ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
|
||||
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
|
||||
SSnapshot snapshot = {0};
|
||||
pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
|
||||
TAOS_CHECK_RETURN(pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot));
|
||||
|
||||
SyncIndex commitIndex = snapshot.lastApplyIndex;
|
||||
SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
|
||||
|
@ -1112,7 +1112,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
|||
}
|
||||
|
||||
pSyncNode->arbTerm = -1;
|
||||
taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
|
||||
(void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
|
||||
syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
|
||||
sInfo("vgId:%d, arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);
|
||||
|
||||
|
@ -1220,7 +1220,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
|||
SyncIndex commitIndex = SYNC_INDEX_INVALID;
|
||||
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||
SSnapshot snapshot = {0};
|
||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||
TAOS_CHECK_GOTO(pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot), NULL, _error);
|
||||
if (snapshot.lastApplyIndex > commitIndex) {
|
||||
commitIndex = snapshot.lastApplyIndex;
|
||||
sNTrace(pSyncNode, "reset commit index by snapshot");
|
||||
|
@ -1373,7 +1373,7 @@ _error:
|
|||
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||
SSnapshot snapshot = {0};
|
||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||
(void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||
if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
|
||||
pSyncNode->commitIndex = snapshot.lastApplyIndex;
|
||||
}
|
||||
|
@ -1386,11 +1386,11 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
|
|||
ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
|
||||
ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");
|
||||
|
||||
taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
|
||||
(void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
|
||||
SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||
SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
|
||||
SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
|
||||
taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);
|
||||
(void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);
|
||||
|
||||
if (lastVer != -1 && endIndex != lastVer + 1) {
|
||||
code = TSDB_CODE_WAL_LOG_INCOMPLETE;
|
||||
|
@ -1439,7 +1439,7 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
|||
// state change
|
||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
|
||||
|
||||
// reset elect timer, long enough
|
||||
int32_t electMS = TIMER_MAX_MS;
|
||||
|
@ -1464,13 +1464,13 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
|
|||
ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);
|
||||
|
||||
// stop elect timer
|
||||
syncNodeStopElectTimer(pSyncNode);
|
||||
(void)syncNodeStopElectTimer(pSyncNode);
|
||||
|
||||
// stop heartbeat timer
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
(void)syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
|
||||
// stop ping timer
|
||||
syncNodeStopPingTimer(pSyncNode);
|
||||
(void)syncNodeStopPingTimer(pSyncNode);
|
||||
|
||||
// clean rsp
|
||||
syncRespCleanRsp(pSyncNode->pSyncRespMgr);
|
||||
|
@ -1497,9 +1497,9 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
|||
|
||||
syncRespCleanRsp(pSyncNode->pSyncRespMgr);
|
||||
|
||||
syncNodeStopPingTimer(pSyncNode);
|
||||
syncNodeStopElectTimer(pSyncNode);
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
(void)syncNodeStopPingTimer(pSyncNode);
|
||||
(void)syncNodeStopElectTimer(pSyncNode);
|
||||
(void)syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
syncNodeLogReplDestroy(pSyncNode);
|
||||
|
||||
syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
|
||||
|
@ -1517,7 +1517,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
|||
syncLogBufferDestroy(pSyncNode->pLogBuf);
|
||||
pSyncNode->pLogBuf = NULL;
|
||||
|
||||
taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);
|
||||
(void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);
|
||||
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
|
||||
if (pSyncNode->senders[i] != NULL) {
|
||||
|
@ -1557,8 +1557,10 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg
|
|||
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
if (syncIsInit()) {
|
||||
taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid, syncEnv()->pTimerManager,
|
||||
&pSyncNode->pPingTimer);
|
||||
if (!taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid, syncEnv()->pTimerManager,
|
||||
&pSyncNode->pPingTimer)) {
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
};
|
||||
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
|
||||
} else {
|
||||
sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
|
||||
|
@ -1568,8 +1570,8 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
|
|||
|
||||
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
|
||||
taosTmrStop(pSyncNode->pPingTimer);
|
||||
(void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
|
||||
TAOS_CHECK_RETURN(taosTmrStop(pSyncNode->pPingTimer));
|
||||
pSyncNode->pPingTimer = NULL;
|
||||
return ret;
|
||||
}
|
||||
|
@ -1598,8 +1600,8 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
|||
|
||||
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);
|
||||
taosTmrStop(pSyncNode->pElectTimer);
|
||||
(void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);
|
||||
TAOS_CHECK_RETURN(taosTmrStop(pSyncNode->pElectTimer));
|
||||
pSyncNode->pElectTimer = NULL;
|
||||
|
||||
return ret;
|
||||
|
@ -1634,8 +1636,9 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
|
|||
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
|
||||
int32_t ret = 0;
|
||||
if (syncIsInit()) {
|
||||
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
|
||||
syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
|
||||
if (!taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
|
||||
syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer))
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
|
||||
} else {
|
||||
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
|
||||
|
@ -1668,8 +1671,8 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
|
|||
int32_t ret = 0;
|
||||
|
||||
#if 0
|
||||
atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
|
||||
taosTmrStop(pSyncNode->pHeartbeatTimer);
|
||||
(void)atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
|
||||
TAOS_CHECK_RETURN(taosTmrStop(pSyncNode->pHeartbeatTimer));
|
||||
pSyncNode->pHeartbeatTimer = NULL;
|
||||
#endif
|
||||
|
||||
|
@ -1685,8 +1688,8 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
|
|||
|
||||
#ifdef BUILD_NO_CALL
|
||||
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
|
||||
TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
@ -1801,7 +1804,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
|||
}
|
||||
|
||||
// add last config index
|
||||
syncAddCfgIndex(pSyncNode, lastConfigChangeIndex);
|
||||
(void)syncAddCfgIndex(pSyncNode, lastConfigChangeIndex);
|
||||
|
||||
if (IamInNew) {
|
||||
//-----------------------------------------
|
||||
|
@ -1818,7 +1821,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
|||
|
||||
// init internal
|
||||
pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
|
||||
syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
|
||||
(void)syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
|
||||
|
||||
// init peersNum, peers, peersId
|
||||
pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
|
||||
|
@ -1831,14 +1834,14 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
|||
}
|
||||
}
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
|
||||
(void)syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
|
||||
}
|
||||
|
||||
// init replicaNum, replicasId
|
||||
pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
|
||||
pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
|
||||
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
|
||||
syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
|
||||
(void)syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
|
||||
}
|
||||
|
||||
// update quorum first
|
||||
|
@ -1884,7 +1887,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
|||
// create new
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
|
||||
if (pSyncNode->senders[i] == NULL) {
|
||||
snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]);
|
||||
(void)snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]);
|
||||
if (pSyncNode->senders[i] == NULL) {
|
||||
// will be created later while send snapshot
|
||||
sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
|
||||
|
@ -1906,10 +1909,10 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
|||
}
|
||||
|
||||
// persist cfg
|
||||
syncWriteCfgFile(pSyncNode);
|
||||
(void)syncWriteCfgFile(pSyncNode);
|
||||
} else {
|
||||
// persist cfg
|
||||
syncWriteCfgFile(pSyncNode);
|
||||
(void)syncWriteCfgFile(pSyncNode);
|
||||
sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
|
||||
}
|
||||
|
||||
|
@ -1937,10 +1940,10 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
|
|||
} while (0);
|
||||
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
||||
taosThreadMutexLock(&pSyncNode->arbTokenMutex);
|
||||
(void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
|
||||
syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
|
||||
sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
|
||||
taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
|
||||
(void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
|
||||
}
|
||||
|
||||
if (currentTerm < newTerm) {
|
||||
|
@ -1969,7 +1972,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
// state change
|
||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
(void)syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
|
||||
// trace log
|
||||
sNTrace(pSyncNode, "become follower %s", debugStr);
|
||||
|
@ -1986,7 +1989,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
// reset log buffer
|
||||
syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||
(void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||
|
||||
// reset elect timer
|
||||
syncNodeResetElectTimer(pSyncNode);
|
||||
|
@ -2011,7 +2014,7 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
// reset log buffer
|
||||
syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||
(void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||
}
|
||||
|
||||
// TLA+ Spec
|
||||
|
@ -2061,7 +2064,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
}
|
||||
|
||||
// init peer mgr
|
||||
syncNodePeerStateInit(pSyncNode);
|
||||
(void)syncNodePeerStateInit(pSyncNode);
|
||||
|
||||
#if 0
|
||||
// update sender private term
|
||||
|
@ -2082,13 +2085,13 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
}
|
||||
|
||||
// stop elect timer
|
||||
syncNodeStopElectTimer(pSyncNode);
|
||||
(void)syncNodeStopElectTimer(pSyncNode);
|
||||
|
||||
// start heartbeat timer
|
||||
syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
(void)syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
|
||||
// send heartbeat right now
|
||||
syncNodeHeartbeatPeers(pSyncNode);
|
||||
(void)syncNodeHeartbeatPeers(pSyncNode);
|
||||
|
||||
// call back
|
||||
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
|
||||
|
@ -2099,7 +2102,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
// reset log buffer
|
||||
syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||
(void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||
|
||||
// trace log
|
||||
sNInfo(pSyncNode, "become leader %s", debugStr);
|
||||
|
@ -2134,7 +2137,7 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
// init peer mgr
|
||||
syncNodePeerStateInit(pSyncNode);
|
||||
(void)syncNodePeerStateInit(pSyncNode);
|
||||
|
||||
// close receiver
|
||||
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
|
||||
|
@ -2142,13 +2145,13 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
// stop elect timer
|
||||
syncNodeStopElectTimer(pSyncNode);
|
||||
(void)syncNodeStopElectTimer(pSyncNode);
|
||||
|
||||
// start heartbeat timer
|
||||
syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
(void)syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
|
||||
// send heartbeat right now
|
||||
syncNodeHeartbeatPeers(pSyncNode);
|
||||
(void)syncNodeHeartbeatPeers(pSyncNode);
|
||||
|
||||
// call back
|
||||
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
|
||||
|
@ -2159,7 +2162,7 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
|
|||
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
// reset log buffer
|
||||
syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||
(void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||
|
||||
// trace log
|
||||
sNInfo(pSyncNode, "become assigned leader");
|
||||
|
@ -2262,7 +2265,7 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
|
|||
bool ret = false;
|
||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
|
||||
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||
if ((terrno = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot)) != 0) return false;
|
||||
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
|
||||
ret = true;
|
||||
}
|
||||
|
@ -2275,7 +2278,7 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
|
|||
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
|
||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
|
||||
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||
if ((terrno = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot)) != 0) return -1;
|
||||
}
|
||||
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||
|
||||
|
@ -2291,7 +2294,7 @@ SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
|
|||
// has snapshot
|
||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
|
||||
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||
if ((terrno = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot)) != 0) return lastTerm;
|
||||
}
|
||||
|
||||
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||
|
@ -2386,7 +2389,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
|
|||
return preTerm;
|
||||
} else {
|
||||
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||
if ((terrno = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot)) != 0) return SYNC_TERM_INVALID;
|
||||
if (snapshot.lastApplyIndex == preIndex) {
|
||||
return snapshot.lastApplyTerm;
|
||||
}
|
||||
|
@ -2433,8 +2436,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
_out:
|
||||
taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
|
||||
&pNode->pPingTimer);
|
||||
(void)taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
|
||||
&pNode->pPingTimer);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2510,8 +2513,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
_out:
|
||||
taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
|
||||
&pNode->pHeartbeatTimer);
|
||||
(void)taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
|
||||
&pNode->pHeartbeatTimer);
|
||||
|
||||
} else {
|
||||
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
|
||||
|
@ -2585,14 +2588,14 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
|||
// send msg
|
||||
sTrace("vgId:%d, send heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
|
||||
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
|
||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||
(void)syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||
} else {
|
||||
}
|
||||
|
||||
if (syncIsInit()) {
|
||||
// sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
|
||||
taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
|
||||
syncEnv()->pTimerManager, &pSyncTimer->pTimer);
|
||||
(void)taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
|
||||
syncEnv()->pTimerManager, &pSyncTimer->pTimer);
|
||||
} else {
|
||||
sError("sync env is stop, reset peer hb timer error");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue