enh: use reference id of sync node in timer callbacks of sync

This commit is contained in:
Benguang Zhao 2024-02-28 19:01:22 +08:00
parent 09a04ed0a4
commit dd7f375d8e
1 changed files with 17 additions and 8 deletions

View File

@ -1343,7 +1343,7 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
if (syncIsInit()) { if (syncIsInit()) {
taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager, taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid, syncEnv()->pTimerManager,
&pSyncNode->pPingTimer); &pSyncNode->pPingTimer);
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
} else { } else {
@ -1415,8 +1415,8 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
if (syncIsInit()) { if (syncIsInit()) {
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager, taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
&pSyncNode->pHeartbeatTimer); syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
} else { } else {
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
@ -2153,7 +2153,11 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex
static void syncNodeEqPingTimer(void* param, void* tmrId) { static void syncNodeEqPingTimer(void* param, void* tmrId) {
if (!syncIsInit()) return; if (!syncIsInit()) return;
SSyncNode* pNode = param; int64_t rid = (int64_t)param;
SSyncNode* pNode = syncNodeAcquire(rid);
if (pNode == NULL) return;
if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) { if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock), int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
@ -2173,7 +2177,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
} }
_out: _out:
taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer); taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
&pNode->pPingTimer);
} }
} }
@ -2224,7 +2229,11 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
if (!syncIsInit()) return; if (!syncIsInit()) return;
SSyncNode* pNode = param; int64_t rid = (int64_t)param;
SSyncNode* pNode = syncNodeAcquire(rid);
if (pNode == NULL) return;
if (pNode->totalReplicaNum > 1) { if (pNode->totalReplicaNum > 1) {
if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) { if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
@ -2245,7 +2254,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
} }
_out: _out:
taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager, taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
&pNode->pHeartbeatTimer); &pNode->pHeartbeatTimer);
} else { } else {
@ -3385,4 +3394,4 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
return true; return true;
} }
#endif #endif