From 62b07dbe5d8724294bc8116a269cb47247c4938a Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 8 Aug 2022 15:10:32 +0800 Subject: [PATCH] refactor(sync): speed up replicate --- source/libs/sync/inc/syncInt.h | 2 ++ source/libs/sync/src/syncMain.c | 17 ++++++++++++++--- source/libs/sync/src/syncReplication.c | 25 +++++++++++++++++++++++-- 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 82399f52b9..250b294c19 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -192,9 +192,11 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode); +int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartNowHeartbeatTimer(SSyncNode* pSyncNode); +int32_t syncNodeRestartNowHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms); // utils -------------- int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index cdd729999f..5395b72e27 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1322,10 +1322,10 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { return ret; } -int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode) { +int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms) { int32_t ret = 0; if (syncEnvIsStart()) { - taosTmrReset(pSyncNode->FpHeartbeatTimerCB, 1, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer); + taosTmrReset(pSyncNode->FpHeartbeatTimerCB, ms, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); } else { sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); @@ -1333,13 +1333,18 @@ int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode) { do { char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", 1); + snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", ms); syncNodeEventLog(pSyncNode, logBuf); } while (0); return ret; } +int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode) { + int32_t ret = syncNodeStartHeartbeatTimerMS(pSyncNode, 1); + return ret; +} + int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1); @@ -1363,6 +1368,12 @@ int32_t syncNodeRestartNowHeartbeatTimer(SSyncNode* pSyncNode) { return 0; } +int32_t syncNodeRestartNowHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms) { + syncNodeStopHeartbeatTimer(pSyncNode); + syncNodeStartHeartbeatTimerMS(pSyncNode, ms); + return 0; +} + // utils -------------- int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index bc703e519c..1a147e391d 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -200,9 +200,23 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { // send msg syncNodeAppendEntriesBatch(pSyncNode, pDestId, pMsg); syncAppendEntriesBatchDestroy(pMsg); + + // speed up + if (pMsg->dataCount > 0 && pMsg->prevLogIndex < pSyncNode->commitIndex) { + ret = 1; + + do { + char logBuf[128]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); + snprintf(logBuf, sizeof(logBuf), "speed up for %s:%d, pre-index:%ld", host, port, pMsg->prevLogIndex); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); + } } - return 0; + return ret; } int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { @@ -309,7 +323,14 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { break; } - syncNodeRestartHeartbeatTimer(pSyncNode); + if (ret > 0) { + // speed up replicate + int32_t ms = pSyncNode->heartbeatTimerMS < 50 ? pSyncNode->heartbeatTimerMS : 50; + syncNodeRestartNowHeartbeatTimerMS(pSyncNode, ms); + + } else { + syncNodeRestartHeartbeatTimer(pSyncNode); + } return ret; }