From 7979d4532bfd9ce3e0c6291e7bd80ad3eec20ed9 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 9 Jan 2024 16:49:27 +0800 Subject: [PATCH 1/2] enh: let max num of concurrent msgs of snap replication be configurable --- include/common/tglobal.h | 1 + source/common/src/tglobal.c | 5 +++++ source/libs/sync/src/syncSnapshot.c | 3 ++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index f006779a48..8f8ddc632f 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -74,6 +74,7 @@ extern int64_t tsRpcQueueMemoryAllowed; extern int32_t tsElectInterval; extern int32_t tsHeartbeatInterval; extern int32_t tsHeartbeatTimeout; +extern int32_t tsSnapReplicationMsgNLimit; // vnode extern int64_t tsVndCommitMaxIntervalMs; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 79d21955d4..5228226235 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -72,6 +72,7 @@ int32_t tsPQSortMemThreshold = 16; // M int32_t tsElectInterval = 25 * 1000; int32_t tsHeartbeatInterval = 1000; int32_t tsHeartbeatTimeout = 20 * 1000; +int32_t tsSnapReplicationMsgNLimit = 128; // mnode int64_t tsMndSdbWriteDelta = 200; @@ -673,6 +674,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "syncSnapReplicationMsgNLimit", tsSnapReplicationMsgNLimit, 16, + (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + return -1; if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -1183,6 +1187,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsElectInterval = cfgGetItem(pCfg, "syncElectInterval")->i32; tsHeartbeatInterval = cfgGetItem(pCfg, "syncHeartbeatInterval")->i32; tsHeartbeatTimeout = cfgGetItem(pCfg, "syncHeartbeatTimeout")->i32; + tsSnapReplicationMsgNLimit = cfgGetItem(pCfg, "syncSnapReplicationMsgNLimit")->i32; tsMndSdbWriteDelta = cfgGetItem(pCfg, "mndSdbWriteDelta")->i64; tsMndLogRetention = cfgGetItem(pCfg, "mndLogRetention")->i64; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index f0e457ef8d..564d28b0f4 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -22,6 +22,7 @@ #include "syncRaftStore.h" #include "syncReplication.h" #include "syncUtil.h" +#include "tglobal.h" static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths); @@ -1186,7 +1187,7 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp pSndBuf->start = ack + 1; } - while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) { + while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < tsSnapReplicationMsgNLimit) { if (snapshotSend(pSender) != 0) { code = terrno; goto _out; From 485bad1ebd1fc90651b48b31687fb2f5533f8de1 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 10 Jan 2024 11:30:42 +0800 Subject: [PATCH 2/2] refact: use config syncSnapReplMaxWaitN instead --- include/common/tglobal.h | 2 +- source/common/src/tglobal.c | 6 +++--- source/libs/sync/src/syncSnapshot.c | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 8f8ddc632f..f23bb4d51b 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -74,7 +74,7 @@ extern int64_t tsRpcQueueMemoryAllowed; extern int32_t tsElectInterval; extern int32_t tsHeartbeatInterval; extern int32_t tsHeartbeatTimeout; -extern int32_t tsSnapReplicationMsgNLimit; +extern int32_t tsSnapReplMaxWaitN; // vnode extern int64_t tsVndCommitMaxIntervalMs; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 5228226235..dd51c5267a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -72,7 +72,7 @@ int32_t tsPQSortMemThreshold = 16; // M int32_t tsElectInterval = 25 * 1000; int32_t tsHeartbeatInterval = 1000; int32_t tsHeartbeatTimeout = 20 * 1000; -int32_t tsSnapReplicationMsgNLimit = 128; +int32_t tsSnapReplMaxWaitN = 128; // mnode int64_t tsMndSdbWriteDelta = 200; @@ -674,7 +674,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "syncSnapReplicationMsgNLimit", tsSnapReplicationMsgNLimit, 16, + if (cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -1187,7 +1187,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsElectInterval = cfgGetItem(pCfg, "syncElectInterval")->i32; tsHeartbeatInterval = cfgGetItem(pCfg, "syncHeartbeatInterval")->i32; tsHeartbeatTimeout = cfgGetItem(pCfg, "syncHeartbeatTimeout")->i32; - tsSnapReplicationMsgNLimit = cfgGetItem(pCfg, "syncSnapReplicationMsgNLimit")->i32; + tsSnapReplMaxWaitN = cfgGetItem(pCfg, "syncSnapReplMaxWaitN")->i32; tsMndSdbWriteDelta = cfgGetItem(pCfg, "mndSdbWriteDelta")->i64; tsMndLogRetention = cfgGetItem(pCfg, "mndLogRetention")->i64; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 564d28b0f4..578e6798e0 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -1187,7 +1187,7 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp pSndBuf->start = ack + 1; } - while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < tsSnapReplicationMsgNLimit) { + while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < tsSnapReplMaxWaitN) { if (snapshotSend(pSender) != 0) { code = terrno; goto _out;