From 64f539cacb1462a19441474f0e4e9e233255d1f0 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 25 Oct 2022 18:03:22 +0800 Subject: [PATCH 1/7] refactor(sync): adjust timer --- source/libs/sync/inc/syncInt.h | 1 - source/libs/sync/src/syncMain.c | 84 ++++++++++-------------------- source/libs/sync/src/syncTimeout.c | 4 +- 3 files changed, 29 insertions(+), 60 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index a158430a0f..5f4a6f21b4 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -155,7 +155,6 @@ typedef struct SSyncNode { tmr_h pElectTimer; int32_t electTimerMS; uint64_t electTimerLogicClock; - uint64_t electTimerLogicClockUser; TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp uint64_t electTimerCounter; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 09584077f1..29c82bcd8c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1310,7 +1310,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { pSyncNode->pElectTimer = NULL; pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); atomic_store_64(&pSyncNode->electTimerLogicClock, 0); - atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0); pSyncNode->FpElectTimerCB = syncNodeEqElectTimer; pSyncNode->electTimerCounter = 0; @@ -1567,15 +1566,6 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { pSyncNode->electTimerMS = ms; taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pElectTimer); - atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); - - /* - do { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "elect timer reset, ms:%d", ms); - syncNodeEventLog(pSyncNode, logBuf); - } while (0); - */ } else { sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId); @@ -1585,11 +1575,10 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1); + atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1); taosTmrStop(pSyncNode->pElectTimer); pSyncNode->pElectTimer = NULL; - // sTrace("vgId:%d, sync %s stop elect timer", pSyncNode->vgId, syncUtilState2String(pSyncNode->state)); return ret; } @@ -1815,8 +1804,6 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerLogicClock); cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerLogicClockUser); - cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB); cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerCounter); @@ -1922,7 +1909,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), - pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr); + pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr); } else { snprintf(logBuf, sizeof(logBuf), "%s", str); } @@ -1946,7 +1933,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), - pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr); + pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr); } else { snprintf(s, len, "%s", str); } @@ -2000,7 +1987,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), - pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr); + pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr); } else { snprintf(logBuf, sizeof(logBuf), "%s", str); } @@ -2022,7 +2009,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), - pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr); + pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr); } else { snprintf(s, len, "%s", str); } @@ -2790,36 +2777,31 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { static void syncNodeEqElectTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; - if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) { - SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock), - pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode); - SRpcMsg rpcMsg; - syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); - syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg); - if (pSyncNode->FpEqMsg != NULL) { - int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); - if (code != 0) { - sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); - rpcFreeCont(rpcMsg.pCont); - syncTimeoutDestroy(pSyncMsg); - return; - } - } else { - sTrace("syncNodeEqElectTimer FpEqMsg is NULL"); - } - syncTimeoutDestroy(pSyncMsg); - // reset timer ms - if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) { - pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); - taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pElectTimer); - } else { - sError("sync env is stop, syncNodeEqElectTimer"); + SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock), + pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode); + SRpcMsg rpcMsg; + syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); + if (pSyncNode->FpEqMsg != NULL) { + int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + if (code != 0) { + sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); + rpcFreeCont(rpcMsg.pCont); + syncTimeoutDestroy(pSyncMsg); + return; } } else { - sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64, - pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); + sTrace("syncNodeEqElectTimer FpEqMsg is NULL"); + } + syncTimeoutDestroy(pSyncMsg); + + // reset timer ms + if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) { + pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); + taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pElectTimer); + } else { + sError("sync env is stop, syncNodeEqElectTimer"); } } @@ -3000,16 +2982,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { // on message ---- int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { // log state - char logBuf[1024] = {0}; - snprintf(logBuf, sizeof(logBuf), - "==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%" PRIu64 " electTimerLogicClock:%" PRIu64 - ", " - "electTimerLogicClockUser:%" PRIu64 ", electTimerMS:%d", - ths->vgId, ths->state, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, - ths->electTimerLogicClock, ths->electTimerLogicClockUser, ths->electTimerMS); - - int32_t ret = 0; - syncPingLog2(logBuf, pMsg); SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId); SRpcMsg rpcMsg; syncPingReply2RpcMsg(pMsgReply, &rpcMsg); @@ -3024,7 +2996,7 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); syncPingReplyDestroy(pMsgReply); - return ret; + return 0; } int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 17c8c14136..5ff73a6406 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -113,10 +113,8 @@ int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) { } } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { - if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) { + if (atomic_load_64(&ths->electTimerLogicClock) <= pMsg->logicClock) { ++(ths->electTimerCounter); - sTrace("vgId:%d, sync timer, type:election count:%" PRIu64 ", lc-user:%" PRIu64, ths->vgId, - ths->electTimerCounter, ths->electTimerLogicClockUser); syncNodeElect(ths); } From 01b712fbfd43c7037b105298bfadebbc0077d6e6 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 25 Oct 2022 19:11:28 +0800 Subject: [PATCH 2/7] refactor(sync): add trace log --- source/libs/sync/src/syncMain.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 29c82bcd8c..6496beea93 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2790,6 +2790,12 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { syncTimeoutDestroy(pSyncMsg); return; } + + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "eq elect timer lc:%ld", pSyncMsg->logicClock); + } while (0); + } else { sTrace("syncNodeEqElectTimer FpEqMsg is NULL"); } From 96a8a80cd79605e9e634d98ef1d5d77453325359 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 25 Oct 2022 19:22:25 +0800 Subject: [PATCH 3/7] refactor(sync): add trace log --- source/libs/sync/src/syncMain.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 6496beea93..b4cf27a15c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2794,6 +2794,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { do { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "eq elect timer lc:%ld", pSyncMsg->logicClock); + syncNodeEventLog(pSyncNode, logBuf); } while (0); } else { From 9caea0f9445ee2b98fe85a5f0daf5a47ef53f0ca Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 25 Oct 2022 19:43:07 +0800 Subject: [PATCH 4/7] refactor(sync): adjust elect timer --- source/libs/sync/src/syncMain.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b4cf27a15c..f2d1eaf3e7 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2802,6 +2802,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { } syncTimeoutDestroy(pSyncMsg); +#if 0 // reset timer ms if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) { pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); @@ -2810,6 +2811,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { } else { sError("sync env is stop, syncNodeEqElectTimer"); } +#endif } static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { From 49af601e19f4ef26f33d082b043ba1826dc76ec1 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 25 Oct 2022 19:56:49 +0800 Subject: [PATCH 5/7] refactor(sync): add SElectTimer --- source/libs/sync/inc/syncInt.h | 6 ++++++ source/libs/sync/src/syncMain.c | 18 ++++++++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 5f4a6f21b4..ae053328ab 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -79,6 +79,12 @@ typedef struct SSyncTimer { void* pData; } SSyncTimer; +typedef struct SElectTimer { + uint64_t logicClock; + SSyncNode* pSyncNode; + void* pData; +} SElectTimer; + int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId); int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f2d1eaf3e7..f2e25a023b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1564,7 +1564,13 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t ret = 0; if (syncEnvIsStart()) { pSyncNode->electTimerMS = ms; - taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + + SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer)); + pElectTimer->logicClock = pSyncNode->electTimerLogicClock; + pElectTimer->pSyncNode = pSyncNode; + pElectTimer->pData = NULL; + + taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, gSyncEnv->pTimerManager, &pSyncNode->pElectTimer); } else { @@ -2776,10 +2782,11 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { } static void syncNodeEqElectTimer(void* param, void* tmrId) { - SSyncNode* pSyncNode = (SSyncNode*)param; + SElectTimer* pElectTimer = (SElectTimer*)param; + SSyncNode* pSyncNode = pElectTimer->pSyncNode; - SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock), - pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode); + SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pSyncNode->electTimerMS, + pSyncNode->vgId, pSyncNode); SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { @@ -2788,6 +2795,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); syncTimeoutDestroy(pSyncMsg); + taosMemoryFree(pElectTimer); return; } @@ -2800,7 +2808,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { } else { sTrace("syncNodeEqElectTimer FpEqMsg is NULL"); } + syncTimeoutDestroy(pSyncMsg); + taosMemoryFree(pElectTimer); #if 0 // reset timer ms From 16e6273d9d429fad788aacd1e43aa4fa05969493 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 25 Oct 2022 20:31:20 +0800 Subject: [PATCH 6/7] refactor(sync): delete assert --- source/libs/sync/src/syncVoteMgr.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 39d62b957a..ce72935221 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -66,7 +66,12 @@ bool voteGrantedMajority(SVotesGranted *pVotesGranted) { void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { ASSERT(pMsg->voteGranted == true); - ASSERT(pMsg->term == pVotesGranted->term); + + if (pMsg->term != pVotesGranted->term) { + syncNodeEventLog(pVotesGranted->pSyncNode, "vote grant vnode error"); + return; + } + ASSERT(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId)); int j = -1; @@ -201,7 +206,11 @@ bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { } void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) { - ASSERT(pVotesRespond->term == pMsg->term); + if (pVotesRespond->term != pMsg->term) { + syncNodeEventLog(pVotesRespond->pSyncNode, "vote respond add error"); + return; + } + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) { // ASSERT(pVotesRespond->isRespond[i] == false); From 7723a9ac28d5c17c582aaeb921022ecf2f96bae3 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 25 Oct 2022 23:17:52 +0800 Subject: [PATCH 7/7] refactor(sync): delete %ld --- source/libs/sync/src/syncMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f2e25a023b..44c19f5431 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2801,7 +2801,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { do { char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "eq elect timer lc:%ld", pSyncMsg->logicClock); + snprintf(logBuf, sizeof(logBuf), "eq elect timer lc:%" PRIu64, pSyncMsg->logicClock); syncNodeEventLog(pSyncNode, logBuf); } while (0);