refactor(sync): if eqmsg error, return
This commit is contained in:
parent
6c7072286d
commit
c9f6c20410
|
@ -945,9 +945,13 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) {
|
||||||
// timer control --------------
|
// timer control --------------
|
||||||
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
|
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
if (syncEnvIsStart()) {
|
||||||
&pSyncNode->pPingTimer);
|
taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||||
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
|
&pSyncNode->pPingTimer);
|
||||||
|
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
|
||||||
|
} else {
|
||||||
|
sError("sync env is stop, syncNodeStartPingTimer");
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -961,10 +965,14 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
pSyncNode->electTimerMS = ms;
|
if (syncEnvIsStart()) {
|
||||||
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
pSyncNode->electTimerMS = ms;
|
||||||
&pSyncNode->pElectTimer);
|
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||||
atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
|
&pSyncNode->pElectTimer);
|
||||||
|
atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
|
||||||
|
} else {
|
||||||
|
sError("sync env is stop, syncNodeStartElectTimer");
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -998,9 +1006,13 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
|
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
if (syncEnvIsStart()) {
|
||||||
&pSyncNode->pHeartbeatTimer);
|
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||||
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
|
&pSyncNode->pHeartbeatTimer);
|
||||||
|
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
|
||||||
|
} else {
|
||||||
|
sError("sync env is stop, syncNodeStartHeartbeatTimer");
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1720,14 +1732,25 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
|
||||||
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
||||||
syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
|
syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
|
||||||
if (pSyncNode->FpEqMsg != NULL) {
|
if (pSyncNode->FpEqMsg != NULL) {
|
||||||
pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
||||||
|
if (code != 0) {
|
||||||
|
sError("vgId:%d sync enqueue ping msg error, code:%d", pSyncNode->vgId, code);
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL");
|
sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL");
|
||||||
}
|
}
|
||||||
syncTimeoutDestroy(pSyncMsg);
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
|
||||||
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
if (syncEnvIsStart()) {
|
||||||
&pSyncNode->pPingTimer);
|
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||||
|
&pSyncNode->pPingTimer);
|
||||||
|
} else {
|
||||||
|
sError("sync env is stop, syncNodeEqPingTimer");
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64 "",
|
sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64 "",
|
||||||
pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
|
pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
|
||||||
|
@ -1756,12 +1779,12 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
||||||
syncTimeoutDestroy(pSyncMsg);
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
|
||||||
// reset timer ms
|
// reset timer ms
|
||||||
if (gSyncEnv != NULL) {
|
if (syncEnvIsStart()) {
|
||||||
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
||||||
taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||||
&pSyncNode->pElectTimer);
|
&pSyncNode->pElectTimer);
|
||||||
} else {
|
} else {
|
||||||
sError("sync env elect is already stop");
|
sError("sync env is stop, syncNodeEqElectTimer");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "",
|
sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "",
|
||||||
|
@ -1792,11 +1815,11 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
syncTimeoutDestroy(pSyncMsg);
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
|
||||||
if (gSyncEnv != NULL) {
|
if (syncEnvIsStart()) {
|
||||||
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||||
&pSyncNode->pHeartbeatTimer);
|
&pSyncNode->pHeartbeatTimer);
|
||||||
} else {
|
} else {
|
||||||
sError("sync env heartbeat is already stop");
|
sError("sync env is stop, syncNodeEqHeartbeatTimer");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
|
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
|
||||||
|
|
Loading…
Reference in New Issue