From 7979d4532bfd9ce3e0c6291e7bd80ad3eec20ed9 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 9 Jan 2024 16:49:27 +0800 Subject: [PATCH] enh: let max num of concurrent msgs of snap replication be configurable --- include/common/tglobal.h | 1 + source/common/src/tglobal.c | 5 +++++ source/libs/sync/src/syncSnapshot.c | 3 ++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index f006779a48..8f8ddc632f 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -74,6 +74,7 @@ extern int64_t tsRpcQueueMemoryAllowed; extern int32_t tsElectInterval; extern int32_t tsHeartbeatInterval; extern int32_t tsHeartbeatTimeout; +extern int32_t tsSnapReplicationMsgNLimit; // vnode extern int64_t tsVndCommitMaxIntervalMs; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 79d21955d4..5228226235 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -72,6 +72,7 @@ int32_t tsPQSortMemThreshold = 16; // M int32_t tsElectInterval = 25 * 1000; int32_t tsHeartbeatInterval = 1000; int32_t tsHeartbeatTimeout = 20 * 1000; +int32_t tsSnapReplicationMsgNLimit = 128; // mnode int64_t tsMndSdbWriteDelta = 200; @@ -673,6 +674,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "syncSnapReplicationMsgNLimit", tsSnapReplicationMsgNLimit, 16, + (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + return -1; if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -1183,6 +1187,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsElectInterval = cfgGetItem(pCfg, "syncElectInterval")->i32; tsHeartbeatInterval = cfgGetItem(pCfg, "syncHeartbeatInterval")->i32; tsHeartbeatTimeout = cfgGetItem(pCfg, "syncHeartbeatTimeout")->i32; + tsSnapReplicationMsgNLimit = cfgGetItem(pCfg, "syncSnapReplicationMsgNLimit")->i32; tsMndSdbWriteDelta = cfgGetItem(pCfg, "mndSdbWriteDelta")->i64; tsMndLogRetention = cfgGetItem(pCfg, "mndLogRetention")->i64; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index f0e457ef8d..564d28b0f4 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -22,6 +22,7 @@ #include "syncRaftStore.h" #include "syncReplication.h" #include "syncUtil.h" +#include "tglobal.h" static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths); @@ -1186,7 +1187,7 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp pSndBuf->start = ack + 1; } - while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) { + while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < tsSnapReplicationMsgNLimit) { if (snapshotSend(pSender) != 0) { code = terrno; goto _out;