From d79db33bab126a7eec0111aa000d0c8060757fa9 Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 20 Sep 2024 04:13:08 +0000 Subject: [PATCH] fix/TD-31891-remove-void-sync1 --- source/libs/sync/src/syncCommit.c | 4 +- source/libs/sync/src/syncMain.c | 281 +++++++++++++++++-------- source/libs/sync/src/syncRaftCfg.c | 2 +- source/libs/sync/src/syncRaftStore.c | 22 +- source/libs/sync/src/syncRequestVote.c | 2 +- 5 files changed, 214 insertions(+), 97 deletions(-) diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 5054339e8e..409277e2da 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -86,7 +86,9 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) { SyncIndex commitIndex = indexLikely; // TODO add return when error - (void)syncNodeUpdateCommitIndex(ths, commitIndex); + if ((code = syncNodeUpdateCommitIndex(ths, commitIndex)) != 0) { + sError("vgId:%d, failed to update commit index:%" PRId64 ", since %s", ths->vgId, commitIndex, tstrerror(code)); + } sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state, raftStoreGetTerm(ths), commitIndex); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4dce54fc1a..71234e589f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -201,13 +201,13 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { // TODO check return value - (void)syncNodeStopHeartbeatTimer(pSyncNode); + TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode)); for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { - (void)syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]); + TAOS_CHECK_RETURN(syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i])); } - (void)syncNodeStartHeartbeatTimer(pSyncNode); + TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode)); // syncNodeReplicate(pSyncNode); } @@ -410,9 +410,8 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) { syncNodeRelease(pNode); if (ret == 1) { sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle); - // TODO check return value - (void)rpcSendResponse(&rpcMsg); - return 0; + code = rpcSendResponse(&rpcMsg); + return code; } else { sError("no message handle to send timeout response, seq:%" PRId64, seq); return TSDB_CODE_SYN_INTERNAL_ERROR; @@ -933,7 +932,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_ int32_t code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId); if (code != 0) { sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr()); - (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); + code = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); TAOS_RETURN(code); } @@ -941,7 +940,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_ code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg); if (code != 0) { sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr()); - (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); + TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum)); } if (seq != NULL) *seq = seqNum; @@ -961,7 +960,7 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa } static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { - int32_t ret = 0; + int32_t code = 0; int64_t tsNow = taosGetTimestampMs(); if (syncIsInit()) { SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid); @@ -980,13 +979,13 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, pData->rid, pData->destId.addr); - (void)taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid), - syncEnv()->pTimerManager, &pSyncTimer->pTimer); + TAOS_CHECK_RETURN(taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid), + syncEnv()->pTimerManager, &pSyncTimer->pTimer)); } else { - ret = TSDB_CODE_SYN_INTERNAL_ERROR; + code = TSDB_CODE_SYN_INTERNAL_ERROR; sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); } - return ret; + return code; } static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { @@ -1308,7 +1307,10 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { } // tools - (void)syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr); // TODO: check return value + if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) { + sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId); + goto _error; + } if (pSyncNode->pSyncRespMgr == NULL) { sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId); goto _error; @@ -1471,29 +1473,31 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) { #ifdef BUILD_NO_CALL int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) { // state change + int32_t code = 0; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->roleTimeMs = taosGetTimestampMs(); // TODO check return value - (void)syncNodeStopHeartbeatTimer(pSyncNode); + TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode)); // reset elect timer, long enough int32_t electMS = TIMER_MAX_MS; - int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS); - if (ret < 0) { + code = syncNodeRestartElectTimer(pSyncNode, electMS); + if (code < 0) { sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr()); return -1; } - ret = syncNodeStartPingTimer(pSyncNode); - if (ret < 0) { + code = syncNodeStartPingTimer(pSyncNode); + if (code < 0) { sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr()); return -1; } - return ret; + return code; } #endif void syncNodePreClose(SSyncNode* pSyncNode) { + int32_t code = 0; if (pSyncNode == NULL) { sError("failed to pre close sync node since sync node is null"); return; @@ -1508,13 +1512,22 @@ void syncNodePreClose(SSyncNode* pSyncNode) { } // stop elect timer - (void)syncNodeStopElectTimer(pSyncNode); + if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) { + sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // stop heartbeat timer - (void)syncNodeStopHeartbeatTimer(pSyncNode); + if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) { + sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // stop ping timer - (void)syncNodeStopPingTimer(pSyncNode); + if ((code = syncNodeStopPingTimer(pSyncNode)) != 0) { + sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // clean rsp syncRespCleanRsp(pSyncNode->pSyncRespMgr); @@ -1536,14 +1549,24 @@ void syncNodePostClose(SSyncNode* pSyncNode) { void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); } void syncNodeClose(SSyncNode* pSyncNode) { + int32_t code = 0; if (pSyncNode == NULL) return; sNInfo(pSyncNode, "sync close, node:%p", pSyncNode); syncRespCleanRsp(pSyncNode->pSyncRespMgr); - (void)syncNodeStopPingTimer(pSyncNode); - (void)syncNodeStopElectTimer(pSyncNode); - (void)syncNodeStopHeartbeatTimer(pSyncNode); + if ((code = syncNodeStopPingTimer(pSyncNode)) != 0) { + sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } + if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) { + sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } + if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) { + sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } syncNodeLogReplDestroy(pSyncNode); syncRespMgrDestroy(pSyncNode->pSyncRespMgr); @@ -1599,28 +1622,28 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg // timer control -------------- int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { - int32_t ret = 0; + int32_t code = 0; if (syncIsInit()) { - (void)taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid, - syncEnv()->pTimerManager, &pSyncNode->pPingTimer); + TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid, + syncEnv()->pTimerManager, &pSyncNode->pPingTimer)); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); } else { sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId); } - return ret; + return code; } int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { - int32_t ret = 0; + int32_t code = 0; (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1); // TODO check return value - (void)taosTmrStop(pSyncNode->pPingTimer); + TAOS_CHECK_RETURN(taosTmrStop(pSyncNode->pPingTimer)); pSyncNode->pPingTimer = NULL; - return ret; + return code; } int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { - int32_t ret = 0; + int32_t code = 0; if (syncIsInit()) { pSyncNode->electTimerMS = ms; @@ -1630,22 +1653,22 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { pSyncNode->electTimerParam.pSyncNode = pSyncNode; pSyncNode->electTimerParam.pData = NULL; - (void)taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), - syncEnv()->pTimerManager, &pSyncNode->pElectTimer); + TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), + syncEnv()->pTimerManager, &pSyncNode->pElectTimer)); } else { sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId); } - return ret; + return code; } int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { - int32_t ret = 0; + int32_t code = 0; (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1); // TODO check return value - (void)taosTmrStop(pSyncNode->pElectTimer); + TAOS_CHECK_RETURN(taosTmrStop(pSyncNode->pElectTimer)); pSyncNode->pElectTimer = NULL; - return ret; + return code; } int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) { @@ -1666,7 +1689,10 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) { } // TODO check return value - (void)syncNodeRestartElectTimer(pSyncNode, electMS); + if ((code = syncNodeRestartElectTimer(pSyncNode, electMS)) != 0) { + sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr()); + return; + }; sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine, electMS); @@ -1674,17 +1700,17 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) { #ifdef BUILD_NO_CALL static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { - int32_t ret = 0; + int32_t code = 0; if (syncIsInit()) { - (void)taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid, - syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer); + TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid, + syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer)); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); } else { sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); } sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS); - return ret; + return code; } #endif @@ -1707,12 +1733,12 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { } int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { - int32_t ret = 0; + int32_t code = 0; #if 0 //TODO check return value - (void)atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1); - (void)taosTmrStop(pSyncNode->pHeartbeatTimer); + TAOS_CHECK_RETURN(atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1)); + TAOS_CHECK_RETURN(taosTmrStop(pSyncNode->pHeartbeatTimer)); pSyncNode->pHeartbeatTimer = NULL; #endif @@ -1723,14 +1749,15 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { } } - return ret; + return code; } #ifdef BUILD_NO_CALL int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) { // TODO check return value - (void)syncNodeStopHeartbeatTimer(pSyncNode); - (void)syncNodeStartHeartbeatTimer(pSyncNode); + int32_t code = 0; + TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode)); + TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode)); return 0; } #endif @@ -1806,6 +1833,7 @@ static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg } int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) { + int32_t code = 0; SSyncCfg oldConfig = pSyncNode->raftCfg.cfg; if (!syncIsConfigChanged(&oldConfig, pNewConfig)) { sInfo("vgId:1, sync not reconfig since not changed"); @@ -1873,7 +1901,7 @@ int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncI // init internal pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex]; - (void)syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); + TAOS_CHECK_RETURN(syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)); // init peersNum, peers, peersId pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1; @@ -1886,14 +1914,15 @@ int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncI } } for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { - (void)syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); + TAOS_CHECK_RETURN(syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])); } // init replicaNum, replicasId pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum; pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum; for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) { - (void)syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); + TAOS_CHECK_RETURN( + syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])); } // update quorum first @@ -1939,7 +1968,7 @@ int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncI // create new for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { if (pSyncNode->senders[i] == NULL) { - (void)snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]); + TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i])); if (pSyncNode->senders[i] == NULL) { // will be created later while send snapshot sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig"); @@ -1961,10 +1990,10 @@ int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncI } // persist cfg - (void)syncWriteCfgFile(pSyncNode); + TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode)); } else { // persist cfg - (void)syncWriteCfgFile(pSyncNode); + TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode)); sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum); } @@ -2015,7 +2044,7 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); } void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { - // maybe clear leader cache + int32_t code = 0; // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { pSyncNode->leaderCache = EMPTY_RAFT_ID; } @@ -2025,7 +2054,10 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // state change pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->roleTimeMs = taosGetTimestampMs(); - (void)syncNodeStopHeartbeatTimer(pSyncNode); + if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) { + sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // trace log sNTrace(pSyncNode, "become follower %s", debugStr); @@ -2042,7 +2074,10 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; // reset log buffer - (void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); + if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) { + sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // reset elect timer syncNodeResetElectTimer(pSyncNode); @@ -2069,7 +2104,11 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; // reset log buffer - (void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); + int32_t code = 0; + if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) { + sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code)); + return; + }; } // TLA+ Spec @@ -2091,6 +2130,7 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) { // /\ UNCHANGED <> // void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { + int32_t code = 0; pSyncNode->becomeLeaderNum++; pSyncNode->hbrSlowNum = 0; @@ -2122,7 +2162,10 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { } // init peer mgr - (void)syncNodePeerStateInit(pSyncNode); + if ((code = syncNodePeerStateInit(pSyncNode)) != 0) { + sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code)); + return; + } #if 0 // update sender private term @@ -2143,13 +2186,22 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { } // stop elect timer - (void)syncNodeStopElectTimer(pSyncNode); + if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) { + sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // start heartbeat timer - (void)syncNodeStartHeartbeatTimer(pSyncNode); + if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) { + sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // send heartbeat right now - (void)syncNodeHeartbeatPeers(pSyncNode); + if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) { + sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // call back if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) { @@ -2160,13 +2212,17 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; // reset log buffer - (void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); + if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) { + sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // trace log sNInfo(pSyncNode, "become leader %s", debugStr); } void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) { + int32_t code = 0; pSyncNode->becomeAssignedLeaderNum++; pSyncNode->hbrSlowNum = 0; @@ -2198,7 +2254,10 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) { } // init peer mgr - (void)syncNodePeerStateInit(pSyncNode); + if ((code = syncNodePeerStateInit(pSyncNode)) != 0) { + sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // close receiver if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { @@ -2206,13 +2265,22 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) { } // stop elect timer - (void)syncNodeStopElectTimer(pSyncNode); + if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) { + sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // start heartbeat timer - (void)syncNodeStartHeartbeatTimer(pSyncNode); + if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) { + sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // send heartbeat right now - (void)syncNodeHeartbeatPeers(pSyncNode); + if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) { + sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // call back if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) { @@ -2223,7 +2291,10 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) { pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; // reset log buffer - (void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); + if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) { + sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code)); + return; + } // trace log sNInfo(pSyncNode, "become assigned leader"); @@ -2513,8 +2584,10 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { } _out: - (void)taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager, - &pNode->pPingTimer); + if ((code = taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager, + &pNode->pPingTimer)) != 0) { + sError("failed to reset ping timer since %s", tstrerror(code)); + }; } syncNodeRelease(pNode); } @@ -2591,8 +2664,9 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { } _out: - (void)taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager, - &pNode->pHeartbeatTimer); + if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager, + &pNode->pHeartbeatTimer) != 0) + return; } else { sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64, @@ -2603,6 +2677,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { #endif static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { + int32_t code = 0; int64_t hbDataRid = (int64_t)param; int64_t tsNow = taosGetTimestampMs(); @@ -2646,7 +2721,12 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { pData->execTime += pSyncTimer->timerMS; SRpcMsg rpcMsg = {0}; - (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId); + if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) { + sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code)); + syncNodeRelease(pSyncNode); + syncHbTimerDataRelease(pData); + return; + } pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode); @@ -2668,14 +2748,25 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { STraceId* trace = &(rpcMsg.info.traceId); sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId))); syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime); - (void)syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); + if ((code = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg)) != 0) { + sError("vgId:%d, failed to send heartbeat to dnode:%d since %s", pSyncNode->vgId, DID(&(pSyncMsg->destId)), + tstrerror(code)); + syncNodeRelease(pSyncNode); + syncHbTimerDataRelease(pData); + return; + } } else { } if (syncIsInit()) { // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId); - (void)taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid, - syncEnv()->pTimerManager, &pSyncTimer->pTimer); + if ((code = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, + (void*)hbDataRid, syncEnv()->pTimerManager, &pSyncTimer->pTimer)) != 0) { + sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code)); + syncNodeRelease(pSyncNode); + syncHbTimerDataRelease(pData); + return; + } } else { sError("sync env is stop, reset peer hb timer error"); } @@ -2715,6 +2806,7 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TODO SAlterVnodeReplicaReq name is proper? cfg->replicaNum = 0; cfg->totalReplicaNum = 0; + int32_t code = 0; for (int i = 0; i < pReq->replica; ++i) { SNodeInfo* pNode = &cfg->nodeInfo[i]; @@ -2722,7 +2814,9 @@ void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TO pNode->nodePort = pReq->replicas[i].port; tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); pNode->nodeRole = TAOS_SYNC_ROLE_VOTER; - (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + if ((code = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) != 0) { + sError("vgId:%d, failed to update dnode info since %s", pReq->vgId, tstrerror(code)); + } sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->nodeRole); cfg->replicaNum++; @@ -2736,7 +2830,9 @@ void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TO pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port; pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER; tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn)); - (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + if ((code = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) != 0) { + sError("vgId:%d, failed to update dnode info, %s", pReq->vgId, tstrerror(code)); + } sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->nodeRole); cfg->totalReplicaNum++; @@ -3287,7 +3383,10 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { // append to log buffer if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) { sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index); - (void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false); + int32_t ret = 0; + if ((ret = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false)) != 0) { + sError("vgId:%d, failed to execute fsm, since %s", ths->vgId, tstrerror(ret)); + } syncEntryDestroy(pEntry); pEntry = NULL; goto _out; @@ -3305,7 +3404,7 @@ _out:; ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { - (void)syncNodeUpdateAssignedCommitIndex(ths, matchIndex); + TAOS_CHECK_RETURN(syncNodeUpdateAssignedCommitIndex(ths, matchIndex)); if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex) < 0) { @@ -3320,7 +3419,7 @@ _out:; } // single replica - (void)syncNodeUpdateCommitIndex(ths, matchIndex); + TAOS_CHECK_RETURN(syncNodeUpdateCommitIndex(ths, matchIndex)); if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex)) < 0) { @@ -3442,7 +3541,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } SRpcMsg rpcMsg = {0}; - (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId); + TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId)); SyncTerm currentTerm = raftStoreGetTerm(ths); SyncHeartbeatReply* pMsgReply = rpcMsg.pCont; @@ -3470,7 +3569,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) { SRpcMsg rpcMsgLocalCmd = {0}; - (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); + TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId)); SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; pSyncMsg->cmd = @@ -3494,7 +3593,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (pMsg->term >= currentTerm && (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) { SRpcMsg rpcMsgLocalCmd = {0}; - (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); + TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId)); SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; @@ -3577,7 +3676,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { return TSDB_CODE_SYN_INTERNAL_ERROR; } if (pMsg->currentTerm == matchTerm) { - (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); + TAOS_CHECK_RETURN(syncNodeUpdateCommitIndex(ths, pMsg->commitIndex)); } if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(), @@ -3644,7 +3743,11 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn if (code > 0) { SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; - (void)syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info); + if ((code = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info)) != 0) { + syncEntryDestroy(pEntry); + pEntry = NULL; + TAOS_RETURN(code); + } if (rsp.info.handle != NULL) { tmsgSendRsp(&rsp); } diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index e8ed73b089..338a56b17f 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -163,7 +163,7 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) { TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); } - (void)taosCloseFile(&pFile); + TAOS_CHECK_EXIT(taosCloseFile(&pFile)); TAOS_CHECK_EXIT(taosRenameFile(file, realfile)); sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d", diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 3bb40286bc..717c26dbed 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -134,7 +134,7 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) { if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(terrno, &lino, _OVER); - (void)taosCloseFile(&pFile); + TAOS_CHECK_GOTO(taosCloseFile(&pFile), &lino, _OVER); if (taosRenameFile(file, realfile) != 0) TAOS_CHECK_GOTO(terrno, &lino, _OVER); code = 0; @@ -168,21 +168,30 @@ bool raftStoreHasVoted(SSyncNode *pNode) { void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId) { (void)taosThreadMutexLock(&pNode->raftStore.mutex); pNode->raftStore.voteFor = *pRaftId; - (void)raftStoreWriteFile(pNode); + int32_t code = 0; + if ((code = raftStoreWriteFile(pNode)) != 0) { + sError("vgId:%d, failed to write raft store file since %s", pNode->vgId, tstrerror(code)); + } (void)taosThreadMutexUnlock(&pNode->raftStore.mutex); } void raftStoreClearVote(SSyncNode *pNode) { (void)taosThreadMutexLock(&pNode->raftStore.mutex); pNode->raftStore.voteFor = EMPTY_RAFT_ID; - (void)raftStoreWriteFile(pNode); + int32_t code = 0; + if ((code = raftStoreWriteFile(pNode)) != 0) { + sError("vgId:%d, failed to write raft store file since %s", pNode->vgId, tstrerror(code)); + } (void)taosThreadMutexUnlock(&pNode->raftStore.mutex); } void raftStoreNextTerm(SSyncNode *pNode) { (void)taosThreadMutexLock(&pNode->raftStore.mutex); pNode->raftStore.currentTerm++; - (void)raftStoreWriteFile(pNode); + int32_t code = 0; + if ((code = raftStoreWriteFile(pNode)) != 0) { + sError("vgId:%d, failed to write raft store file since %s", pNode->vgId, tstrerror(code)); + } (void)taosThreadMutexUnlock(&pNode->raftStore.mutex); } @@ -190,7 +199,10 @@ void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term) { (void)taosThreadMutexLock(&pNode->raftStore.mutex); if (pNode->raftStore.currentTerm < term) { pNode->raftStore.currentTerm = term; - (void)raftStoreWriteFile(pNode); + int32_t code = 0; + if ((code = raftStoreWriteFile(pNode)) != 0) { + sError("vgId:%d, failed to write raft store file since %s", pNode->vgId, tstrerror(code)); + } } (void)taosThreadMutexUnlock(&pNode->raftStore.mutex); } diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index c8e81b13df..fe5b3eb7ad 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -137,7 +137,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { // trace log syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "", "proceed"); syncLogSendRequestVoteReply(ths, pReply, ""); - (void)syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + TAOS_CHECK_RETURN(syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg)); if (resetElect) syncNodeResetElectTimer(ths);