diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e06ea70f70..7ff6116137 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1343,7 +1343,7 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t ret = 0; if (syncIsInit()) { - taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager, + taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid, syncEnv()->pTimerManager, &pSyncNode->pPingTimer); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); } else { @@ -1415,8 +1415,8 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) { static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; if (syncIsInit()) { - taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager, - &pSyncNode->pHeartbeatTimer); + taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid, + syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); } else { sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); @@ -2153,7 +2153,11 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex static void syncNodeEqPingTimer(void* param, void* tmrId) { if (!syncIsInit()) return; - SSyncNode* pNode = param; + int64_t rid = (int64_t)param; + SSyncNode* pNode = syncNodeAcquire(rid); + + if (pNode == NULL) return; + if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) { SRpcMsg rpcMsg = {0}; int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock), @@ -2173,7 +2177,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { } _out: - taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer); + taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager, + &pNode->pPingTimer); } } @@ -2224,7 +2229,11 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { if (!syncIsInit()) return; - SSyncNode* pNode = param; + int64_t rid = (int64_t)param; + SSyncNode* pNode = syncNodeAcquire(rid); + + if (pNode == NULL) return; + if (pNode->totalReplicaNum > 1) { if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) { SRpcMsg rpcMsg = {0}; @@ -2245,7 +2254,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { } _out: - taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager, + taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager, &pNode->pHeartbeatTimer); } else { @@ -3385,4 +3394,4 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { return true; } -#endif \ No newline at end of file +#endif