Merge pull request #26837 from taosdata/fix/TD-30989-scan1-12

Fix/td 30989-scan1-12
This commit is contained in:
Hongze Cheng 2024-07-30 09:14:37 +08:00 committed by GitHub
commit baed4ce9c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 97 additions and 94 deletions

View File

@ -76,10 +76,8 @@ int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) {
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore); SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
commitIndex = TMAX(commitIndex, ths->commitIndex); commitIndex = TMAX(commitIndex, ths->commitIndex);
ths->commitIndex = TMIN(commitIndex, lastVer); ths->commitIndex = TMIN(commitIndex, lastVer);
if ((code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex)) != 0) { // TODO add return when error
// TODO add return when error (void)ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex);
sError("failed to update commit index since %s", tstrerror(code));
}
return ths->commitIndex; return ths->commitIndex;
} }
@ -87,10 +85,8 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
int32_t code = 0; int32_t code = 0;
if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) { if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) {
SyncIndex commitIndex = indexLikely; SyncIndex commitIndex = indexLikely;
if ((code = syncNodeUpdateCommitIndex(ths, commitIndex)) != 0) { // TODO add return when error
// TODO add return when error (void)syncNodeUpdateCommitIndex(ths, commitIndex);
sError("failed to update commit index since %s", tstrerror(code));
}
sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state, sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state,
raftStoreGetTerm(ths), commitIndex); raftStoreGetTerm(ths), commitIndex);
} }
@ -102,9 +98,7 @@ int64_t syncNodeUpdateAssignedCommitIndex(SSyncNode* ths, SyncIndex assignedComm
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore); SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
assignedCommitIndex = TMAX(assignedCommitIndex, ths->assignedCommitIndex); assignedCommitIndex = TMAX(assignedCommitIndex, ths->assignedCommitIndex);
ths->assignedCommitIndex = TMIN(assignedCommitIndex, lastVer); ths->assignedCommitIndex = TMIN(assignedCommitIndex, lastVer);
if ((code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->assignedCommitIndex)) != 0) { // TODO add return when error
// TODO add return when error (void)ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->assignedCommitIndex);
sError("failed to update commit index since %s", tstrerror(code));
}
return ths->commitIndex; return ths->commitIndex;
} }

View File

@ -69,13 +69,13 @@ void syncCleanUp() {
if (gNodeRefId != -1) { if (gNodeRefId != -1) {
sDebug("sync rsetId:%d is closed", gNodeRefId); sDebug("sync rsetId:%d is closed", gNodeRefId);
taosCloseRef(gNodeRefId); (void)taosCloseRef(gNodeRefId);
gNodeRefId = -1; gNodeRefId = -1;
} }
if (gHbDataRefId != -1) { if (gHbDataRefId != -1) {
sDebug("sync rsetId:%d is closed", gHbDataRefId); sDebug("sync rsetId:%d is closed", gHbDataRefId);
taosCloseRef(gHbDataRefId); (void)taosCloseRef(gHbDataRefId);
gHbDataRefId = -1; gHbDataRefId = -1;
} }
} }
@ -88,7 +88,7 @@ int64_t syncNodeAdd(SSyncNode *pNode) {
return pNode->rid; return pNode->rid;
} }
void syncNodeRemove(int64_t rid) { taosRemoveRef(gNodeRefId, rid); } void syncNodeRemove(int64_t rid) { (void)taosRemoveRef(gNodeRefId, rid); }
SSyncNode *syncNodeAcquire(int64_t rid) { SSyncNode *syncNodeAcquire(int64_t rid) {
SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid); SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid);
@ -101,7 +101,7 @@ SSyncNode *syncNodeAcquire(int64_t rid) {
} }
void syncNodeRelease(SSyncNode *pNode) { void syncNodeRelease(SSyncNode *pNode) {
if (pNode) taosReleaseRef(gNodeRefId, pNode->rid); if (pNode) (void)taosReleaseRef(gNodeRefId, pNode->rid);
} }
int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) { int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
@ -110,7 +110,7 @@ int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
return pData->rid; return pData->rid;
} }
void syncHbTimerDataRemove(int64_t rid) { taosRemoveRef(gHbDataRefId, rid); } void syncHbTimerDataRemove(int64_t rid) { (void)taosRemoveRef(gHbDataRefId, rid); }
SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) { SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) {
SSyncHbTimerData *pData = taosAcquireRef(gHbDataRefId, rid); SSyncHbTimerData *pData = taosAcquireRef(gHbDataRefId, rid);
@ -122,7 +122,7 @@ SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) {
return pData; return pData;
} }
void syncHbTimerDataRelease(SSyncHbTimerData *pData) { taosReleaseRef(gHbDataRefId, pData->rid); } void syncHbTimerDataRelease(SSyncHbTimerData *pData) { (void)taosReleaseRef(gHbDataRefId, pData->rid); }
#if 0 #if 0
void syncEnvStartTimer() { void syncEnvStartTimer() {

View File

@ -185,13 +185,14 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex); syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
syncNodeStopHeartbeatTimer(pSyncNode); // TODO check return value
(void)syncNodeStopHeartbeatTimer(pSyncNode);
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]); (void)syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
} }
syncNodeStartHeartbeatTimer(pSyncNode); (void)syncNodeStartHeartbeatTimer(pSyncNode);
// syncNodeReplicate(pSyncNode); // syncNodeReplicate(pSyncNode);
} }
@ -394,7 +395,8 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
syncNodeRelease(pNode); syncNodeRelease(pNode);
if (ret == 1) { if (ret == 1) {
sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle); sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
rpcSendResponse(&rpcMsg); // TODO check return value
(void)rpcSendResponse(&rpcMsg);
return 0; return 0;
} else { } else {
sError("no message handle to send timeout response, seq:%" PRId64, seq); sError("no message handle to send timeout response, seq:%" PRId64, seq);
@ -674,9 +676,9 @@ int32_t syncGetArbToken(int64_t rid, char* outToken) {
} }
memset(outToken, 0, TSDB_ARB_TOKEN_SIZE); memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
taosThreadMutexLock(&pSyncNode->arbTokenMutex); (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
strncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE); strncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
taosThreadMutexUnlock(&pSyncNode->arbTokenMutex); (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
TAOS_RETURN(code); TAOS_RETURN(code);
@ -956,8 +958,8 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, pData->rid, pData->destId.addr); sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, pData->rid, pData->destId.addr);
taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid), (void)taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
syncEnv()->pTimerManager, &pSyncTimer->pTimer); syncEnv()->pTimerManager, &pSyncTimer->pTimer);
} else { } else {
ret = TSDB_CODE_SYN_INTERNAL_ERROR; ret = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
@ -967,7 +969,7 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
int32_t ret = 0; int32_t ret = 0;
atomic_add_fetch_64(&pSyncTimer->logicClock, 1); (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);
if (!taosTmrStop(pSyncTimer->pTimer)) { if (!taosTmrStop(pSyncTimer->pTimer)) {
return TSDB_CODE_SYN_INTERNAL_ERROR; return TSDB_CODE_SYN_INTERNAL_ERROR;
} }
@ -983,7 +985,8 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
ASSERTS(pNode->pFsm != NULL, "pFsm not registered"); ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); // TODO check return value
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
SyncIndex commitIndex = snapshot.lastApplyIndex; SyncIndex commitIndex = snapshot.lastApplyIndex;
SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
@ -1112,7 +1115,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
} }
pSyncNode->arbTerm = -1; pSyncNode->arbTerm = -1;
taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL); (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken); syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
sInfo("vgId:%d, arb token:%s", pSyncNode->vgId, pSyncNode->arbToken); sInfo("vgId:%d, arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);
@ -1220,7 +1223,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
SyncIndex commitIndex = SYNC_INDEX_INVALID; SyncIndex commitIndex = SYNC_INDEX_INVALID;
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); // TODO check return value
(void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex > commitIndex) { if (snapshot.lastApplyIndex > commitIndex) {
commitIndex = snapshot.lastApplyIndex; commitIndex = snapshot.lastApplyIndex;
sNTrace(pSyncNode, "reset commit index by snapshot"); sNTrace(pSyncNode, "reset commit index by snapshot");
@ -1373,7 +1377,7 @@ _error:
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex > pSyncNode->commitIndex) { if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
pSyncNode->commitIndex = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex;
} }
@ -1386,11 +1390,11 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
ASSERTS(pSyncNode->pLogStore != NULL, "log store not created"); ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created"); ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");
taosThreadMutexLock(&pSyncNode->pLogBuf->mutex); (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore); SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
SyncIndex endIndex = pSyncNode->pLogBuf->endIndex; SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex); (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);
if (lastVer != -1 && endIndex != lastVer + 1) { if (lastVer != -1 && endIndex != lastVer + 1) {
code = TSDB_CODE_WAL_LOG_INCOMPLETE; code = TSDB_CODE_WAL_LOG_INCOMPLETE;
@ -1439,7 +1443,8 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs(); pSyncNode->roleTimeMs = taosGetTimestampMs();
syncNodeStopHeartbeatTimer(pSyncNode); // TODO check return value
(void)syncNodeStopHeartbeatTimer(pSyncNode);
// reset elect timer, long enough // reset elect timer, long enough
int32_t electMS = TIMER_MAX_MS; int32_t electMS = TIMER_MAX_MS;
@ -1464,13 +1469,13 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL); ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);
// stop elect timer // stop elect timer
syncNodeStopElectTimer(pSyncNode); (void)syncNodeStopElectTimer(pSyncNode);
// stop heartbeat timer // stop heartbeat timer
syncNodeStopHeartbeatTimer(pSyncNode); (void)syncNodeStopHeartbeatTimer(pSyncNode);
// stop ping timer // stop ping timer
syncNodeStopPingTimer(pSyncNode); (void)syncNodeStopPingTimer(pSyncNode);
// clean rsp // clean rsp
syncRespCleanRsp(pSyncNode->pSyncRespMgr); syncRespCleanRsp(pSyncNode->pSyncRespMgr);
@ -1497,9 +1502,9 @@ void syncNodeClose(SSyncNode* pSyncNode) {
syncRespCleanRsp(pSyncNode->pSyncRespMgr); syncRespCleanRsp(pSyncNode->pSyncRespMgr);
syncNodeStopPingTimer(pSyncNode); (void)syncNodeStopPingTimer(pSyncNode);
syncNodeStopElectTimer(pSyncNode); (void)syncNodeStopElectTimer(pSyncNode);
syncNodeStopHeartbeatTimer(pSyncNode); (void)syncNodeStopHeartbeatTimer(pSyncNode);
syncNodeLogReplDestroy(pSyncNode); syncNodeLogReplDestroy(pSyncNode);
syncRespMgrDestroy(pSyncNode->pSyncRespMgr); syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
@ -1517,7 +1522,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
syncLogBufferDestroy(pSyncNode->pLogBuf); syncLogBufferDestroy(pSyncNode->pLogBuf);
pSyncNode->pLogBuf = NULL; pSyncNode->pLogBuf = NULL;
taosThreadMutexDestroy(&pSyncNode->arbTokenMutex); (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
if (pSyncNode->senders[i] != NULL) { if (pSyncNode->senders[i] != NULL) {
@ -1557,8 +1562,8 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
if (syncIsInit()) { if (syncIsInit()) {
taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid, syncEnv()->pTimerManager, (void)taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
&pSyncNode->pPingTimer); syncEnv()->pTimerManager, &pSyncNode->pPingTimer);
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
} else { } else {
sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
@ -1568,8 +1573,9 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1); (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
taosTmrStop(pSyncNode->pPingTimer); // TODO check return value
(void)taosTmrStop(pSyncNode->pPingTimer);
pSyncNode->pPingTimer = NULL; pSyncNode->pPingTimer = NULL;
return ret; return ret;
} }
@ -1585,11 +1591,8 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
pSyncNode->electTimerParam.pSyncNode = pSyncNode; pSyncNode->electTimerParam.pSyncNode = pSyncNode;
pSyncNode->electTimerParam.pData = NULL; pSyncNode->electTimerParam.pData = NULL;
if (!taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), (void)taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
syncEnv()->pTimerManager, &pSyncNode->pElectTimer)) { syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
ret = TSDB_CODE_SYN_INTERNAL_ERROR;
}
} else { } else {
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
} }
@ -1598,8 +1601,9 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1); (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);
taosTmrStop(pSyncNode->pElectTimer); // TODO check return value
(void)taosTmrStop(pSyncNode->pElectTimer);
pSyncNode->pElectTimer = NULL; pSyncNode->pElectTimer = NULL;
return ret; return ret;
@ -1622,9 +1626,8 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
} }
if ((code = syncNodeRestartElectTimer(pSyncNode, electMS)) != 0) { // TODO check return value
sError("failed to restart elect timer since %s", tstrerror(code)); (void)syncNodeRestartElectTimer(pSyncNode, electMS);
}
sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine, sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
electMS); electMS);
@ -1634,8 +1637,8 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
if (syncIsInit()) { if (syncIsInit()) {
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid, (void)taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer); syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
} else { } else {
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
@ -1668,8 +1671,9 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
#if 0 #if 0
atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1); //TODO check return value
taosTmrStop(pSyncNode->pHeartbeatTimer); (void)atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
(void)taosTmrStop(pSyncNode->pHeartbeatTimer);
pSyncNode->pHeartbeatTimer = NULL; pSyncNode->pHeartbeatTimer = NULL;
#endif #endif
@ -1685,8 +1689,9 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
#ifdef BUILD_NO_CALL #ifdef BUILD_NO_CALL
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
syncNodeStopHeartbeatTimer(pSyncNode); // TODO check return value
syncNodeStartHeartbeatTimer(pSyncNode); (void)syncNodeStopHeartbeatTimer(pSyncNode);
(void)syncNodeStartHeartbeatTimer(pSyncNode);
return 0; return 0;
} }
#endif #endif
@ -1801,7 +1806,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
} }
// add last config index // add last config index
syncAddCfgIndex(pSyncNode, lastConfigChangeIndex); (void)syncAddCfgIndex(pSyncNode, lastConfigChangeIndex);
if (IamInNew) { if (IamInNew) {
//----------------------------------------- //-----------------------------------------
@ -1818,7 +1823,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// init internal // init internal
pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex]; pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); (void)syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
// init peersNum, peers, peersId // init peersNum, peers, peersId
pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1; pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
@ -1831,14 +1836,14 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
} }
} }
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); (void)syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
} }
// init replicaNum, replicasId // init replicaNum, replicasId
pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum; pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum; pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) { for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); (void)syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
} }
// update quorum first // update quorum first
@ -1884,7 +1889,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// create new // create new
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
if (pSyncNode->senders[i] == NULL) { if (pSyncNode->senders[i] == NULL) {
snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]); (void)snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]);
if (pSyncNode->senders[i] == NULL) { if (pSyncNode->senders[i] == NULL) {
// will be created later while send snapshot // will be created later while send snapshot
sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig"); sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
@ -1906,10 +1911,10 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
} }
// persist cfg // persist cfg
syncWriteCfgFile(pSyncNode); (void)syncWriteCfgFile(pSyncNode);
} else { } else {
// persist cfg // persist cfg
syncWriteCfgFile(pSyncNode); (void)syncWriteCfgFile(pSyncNode);
sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum); sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
} }
@ -1937,10 +1942,10 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
} while (0); } while (0);
if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
taosThreadMutexLock(&pSyncNode->arbTokenMutex); (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken); syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken); sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
taosThreadMutexUnlock(&pSyncNode->arbTokenMutex); (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
} }
if (currentTerm < newTerm) { if (currentTerm < newTerm) {
@ -1969,7 +1974,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs(); pSyncNode->roleTimeMs = taosGetTimestampMs();
syncNodeStopHeartbeatTimer(pSyncNode); (void)syncNodeStopHeartbeatTimer(pSyncNode);
// trace log // trace log
sNTrace(pSyncNode, "become follower %s", debugStr); sNTrace(pSyncNode, "become follower %s", debugStr);
@ -1986,7 +1991,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
// reset log buffer // reset log buffer
syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); (void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
// reset elect timer // reset elect timer
syncNodeResetElectTimer(pSyncNode); syncNodeResetElectTimer(pSyncNode);
@ -2011,7 +2016,7 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
// reset log buffer // reset log buffer
syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); (void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
} }
// TLA+ Spec // TLA+ Spec
@ -2061,7 +2066,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
} }
// init peer mgr // init peer mgr
syncNodePeerStateInit(pSyncNode); (void)syncNodePeerStateInit(pSyncNode);
#if 0 #if 0
// update sender private term // update sender private term
@ -2082,13 +2087,13 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
} }
// stop elect timer // stop elect timer
syncNodeStopElectTimer(pSyncNode); (void)syncNodeStopElectTimer(pSyncNode);
// start heartbeat timer // start heartbeat timer
syncNodeStartHeartbeatTimer(pSyncNode); (void)syncNodeStartHeartbeatTimer(pSyncNode);
// send heartbeat right now // send heartbeat right now
syncNodeHeartbeatPeers(pSyncNode); (void)syncNodeHeartbeatPeers(pSyncNode);
// call back // call back
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) { if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
@ -2099,7 +2104,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
// reset log buffer // reset log buffer
syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); (void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
// trace log // trace log
sNInfo(pSyncNode, "become leader %s", debugStr); sNInfo(pSyncNode, "become leader %s", debugStr);
@ -2134,7 +2139,7 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
} }
// init peer mgr // init peer mgr
syncNodePeerStateInit(pSyncNode); (void)syncNodePeerStateInit(pSyncNode);
// close receiver // close receiver
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
@ -2142,13 +2147,13 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
} }
// stop elect timer // stop elect timer
syncNodeStopElectTimer(pSyncNode); (void)syncNodeStopElectTimer(pSyncNode);
// start heartbeat timer // start heartbeat timer
syncNodeStartHeartbeatTimer(pSyncNode); (void)syncNodeStartHeartbeatTimer(pSyncNode);
// send heartbeat right now // send heartbeat right now
syncNodeHeartbeatPeers(pSyncNode); (void)syncNodeHeartbeatPeers(pSyncNode);
// call back // call back
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) { if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
@ -2159,7 +2164,7 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
// reset log buffer // reset log buffer
syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); (void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
// trace log // trace log
sNInfo(pSyncNode, "become assigned leader"); sNInfo(pSyncNode, "become assigned leader");
@ -2262,7 +2267,8 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
bool ret = false; bool ret = false;
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); // TODO check return value
(void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) { if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
ret = true; ret = true;
} }
@ -2275,7 +2281,8 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) { SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); // TODO check return value
(void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
} }
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
@ -2291,7 +2298,8 @@ SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
// has snapshot // has snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); // TODO check return value
(void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
} }
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
@ -2386,7 +2394,8 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
return preTerm; return preTerm;
} else { } else {
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); // TODO check return value
(void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex == preIndex) { if (snapshot.lastApplyIndex == preIndex) {
return snapshot.lastApplyTerm; return snapshot.lastApplyTerm;
} }
@ -2433,8 +2442,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
} }
_out: _out:
taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager, (void)taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
&pNode->pPingTimer); &pNode->pPingTimer);
} }
} }
@ -2510,8 +2519,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
} }
_out: _out:
taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager, (void)taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
&pNode->pHeartbeatTimer); &pNode->pHeartbeatTimer);
} else { } else {
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64, sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
@ -2585,14 +2594,14 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
// send msg // send msg
sTrace("vgId:%d, send heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId))); sTrace("vgId:%d, send heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime); syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); (void)syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
} else { } else {
} }
if (syncIsInit()) { if (syncIsInit()) {
// sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId); // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid, (void)taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
syncEnv()->pTimerManager, &pSyncTimer->pTimer); syncEnv()->pTimerManager, &pSyncTimer->pTimer);
} else { } else {
sError("sync env is stop, reset peer hb timer error"); sError("sync env is stop, reset peer hb timer error");
} }