refactor(sync): adjust timer
This commit is contained in:
parent
eb7b9d3819
commit
64f539cacb
|
@ -155,7 +155,6 @@ typedef struct SSyncNode {
|
||||||
tmr_h pElectTimer;
|
tmr_h pElectTimer;
|
||||||
int32_t electTimerMS;
|
int32_t electTimerMS;
|
||||||
uint64_t electTimerLogicClock;
|
uint64_t electTimerLogicClock;
|
||||||
uint64_t electTimerLogicClockUser;
|
|
||||||
TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp
|
TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp
|
||||||
uint64_t electTimerCounter;
|
uint64_t electTimerCounter;
|
||||||
|
|
||||||
|
|
|
@ -1310,7 +1310,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
||||||
pSyncNode->pElectTimer = NULL;
|
pSyncNode->pElectTimer = NULL;
|
||||||
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
||||||
atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
|
atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
|
||||||
atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
|
|
||||||
pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
|
pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
|
||||||
pSyncNode->electTimerCounter = 0;
|
pSyncNode->electTimerCounter = 0;
|
||||||
|
|
||||||
|
@ -1567,15 +1566,6 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
||||||
pSyncNode->electTimerMS = ms;
|
pSyncNode->electTimerMS = ms;
|
||||||
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||||
&pSyncNode->pElectTimer);
|
&pSyncNode->pElectTimer);
|
||||||
atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
|
|
||||||
|
|
||||||
/*
|
|
||||||
do {
|
|
||||||
char logBuf[128];
|
|
||||||
snprintf(logBuf, sizeof(logBuf), "elect timer reset, ms:%d", ms);
|
|
||||||
syncNodeEventLog(pSyncNode, logBuf);
|
|
||||||
} while (0);
|
|
||||||
*/
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
|
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
|
||||||
|
@ -1585,11 +1575,10 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
||||||
|
|
||||||
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
|
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);
|
atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);
|
||||||
taosTmrStop(pSyncNode->pElectTimer);
|
taosTmrStop(pSyncNode->pElectTimer);
|
||||||
pSyncNode->pElectTimer = NULL;
|
pSyncNode->pElectTimer = NULL;
|
||||||
|
|
||||||
// sTrace("vgId:%d, sync %s stop elect timer", pSyncNode->vgId, syncUtilState2String(pSyncNode->state));
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1815,8 +1804,6 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
|
||||||
cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
|
cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerLogicClock);
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerLogicClock);
|
||||||
cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
|
cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerLogicClockUser);
|
|
||||||
cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf);
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
|
||||||
cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
|
cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerCounter);
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerCounter);
|
||||||
|
@ -1922,7 +1909,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
||||||
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
||||||
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
||||||
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
|
pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
|
||||||
} else {
|
} else {
|
||||||
snprintf(logBuf, sizeof(logBuf), "%s", str);
|
snprintf(logBuf, sizeof(logBuf), "%s", str);
|
||||||
}
|
}
|
||||||
|
@ -1946,7 +1933,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
||||||
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
||||||
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
||||||
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
|
pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
|
||||||
} else {
|
} else {
|
||||||
snprintf(s, len, "%s", str);
|
snprintf(s, len, "%s", str);
|
||||||
}
|
}
|
||||||
|
@ -2000,7 +1987,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
||||||
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
||||||
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
||||||
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr);
|
pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr);
|
||||||
} else {
|
} else {
|
||||||
snprintf(logBuf, sizeof(logBuf), "%s", str);
|
snprintf(logBuf, sizeof(logBuf), "%s", str);
|
||||||
}
|
}
|
||||||
|
@ -2022,7 +2009,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
|
||||||
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
|
||||||
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
|
||||||
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr);
|
pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr);
|
||||||
} else {
|
} else {
|
||||||
snprintf(s, len, "%s", str);
|
snprintf(s, len, "%s", str);
|
||||||
}
|
}
|
||||||
|
@ -2790,36 +2777,31 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
|
||||||
|
|
||||||
static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)param;
|
SSyncNode* pSyncNode = (SSyncNode*)param;
|
||||||
if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) {
|
|
||||||
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
|
|
||||||
pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode);
|
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
|
||||||
syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg);
|
|
||||||
if (pSyncNode->FpEqMsg != NULL) {
|
|
||||||
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
|
||||||
if (code != 0) {
|
|
||||||
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
|
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
|
||||||
syncTimeoutDestroy(pSyncMsg);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
sTrace("syncNodeEqElectTimer FpEqMsg is NULL");
|
|
||||||
}
|
|
||||||
syncTimeoutDestroy(pSyncMsg);
|
|
||||||
|
|
||||||
// reset timer ms
|
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
|
||||||
if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) {
|
pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode);
|
||||||
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
SRpcMsg rpcMsg;
|
||||||
taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
||||||
&pSyncNode->pElectTimer);
|
if (pSyncNode->FpEqMsg != NULL) {
|
||||||
} else {
|
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
||||||
sError("sync env is stop, syncNodeEqElectTimer");
|
if (code != 0) {
|
||||||
|
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64,
|
sTrace("syncNodeEqElectTimer FpEqMsg is NULL");
|
||||||
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
|
}
|
||||||
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
// reset timer ms
|
||||||
|
if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) {
|
||||||
|
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
||||||
|
taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||||
|
&pSyncNode->pElectTimer);
|
||||||
|
} else {
|
||||||
|
sError("sync env is stop, syncNodeEqElectTimer");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3000,16 +2982,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
||||||
// on message ----
|
// on message ----
|
||||||
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
|
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
|
||||||
// log state
|
// log state
|
||||||
char logBuf[1024] = {0};
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
|
||||||
"==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%" PRIu64 " electTimerLogicClock:%" PRIu64
|
|
||||||
", "
|
|
||||||
"electTimerLogicClockUser:%" PRIu64 ", electTimerMS:%d",
|
|
||||||
ths->vgId, ths->state, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm,
|
|
||||||
ths->electTimerLogicClock, ths->electTimerLogicClockUser, ths->electTimerMS);
|
|
||||||
|
|
||||||
int32_t ret = 0;
|
|
||||||
syncPingLog2(logBuf, pMsg);
|
|
||||||
SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
|
SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
|
syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
|
||||||
|
@ -3024,7 +2996,7 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
|
||||||
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
||||||
syncPingReplyDestroy(pMsgReply);
|
syncPingReplyDestroy(pMsgReply);
|
||||||
|
|
||||||
return ret;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
|
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
|
||||||
|
|
|
@ -113,10 +113,8 @@ int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
|
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
|
||||||
if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) {
|
if (atomic_load_64(&ths->electTimerLogicClock) <= pMsg->logicClock) {
|
||||||
++(ths->electTimerCounter);
|
++(ths->electTimerCounter);
|
||||||
sTrace("vgId:%d, sync timer, type:election count:%" PRIu64 ", lc-user:%" PRIu64, ths->vgId,
|
|
||||||
ths->electTimerCounter, ths->electTimerLogicClockUser);
|
|
||||||
|
|
||||||
syncNodeElect(ths);
|
syncNodeElect(ths);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue