enh: let max num of concurrent msgs of snap replication be configurable

This commit is contained in:
Benguang Zhao 2024-01-09 16:49:27 +08:00
parent a518cba133
commit 7979d4532b
3 changed files with 8 additions and 1 deletions

View File

@ -74,6 +74,7 @@ extern int64_t tsRpcQueueMemoryAllowed;
extern int32_t tsElectInterval; extern int32_t tsElectInterval;
extern int32_t tsHeartbeatInterval; extern int32_t tsHeartbeatInterval;
extern int32_t tsHeartbeatTimeout; extern int32_t tsHeartbeatTimeout;
extern int32_t tsSnapReplicationMsgNLimit;
// vnode // vnode
extern int64_t tsVndCommitMaxIntervalMs; extern int64_t tsVndCommitMaxIntervalMs;

View File

@ -72,6 +72,7 @@ int32_t tsPQSortMemThreshold = 16; // M
int32_t tsElectInterval = 25 * 1000; int32_t tsElectInterval = 25 * 1000;
int32_t tsHeartbeatInterval = 1000; int32_t tsHeartbeatInterval = 1000;
int32_t tsHeartbeatTimeout = 20 * 1000; int32_t tsHeartbeatTimeout = 20 * 1000;
int32_t tsSnapReplicationMsgNLimit = 128;
// mnode // mnode
int64_t tsMndSdbWriteDelta = 200; int64_t tsMndSdbWriteDelta = 200;
@ -673,6 +674,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER,
CFG_DYN_NONE) != 0) CFG_DYN_NONE) != 0)
return -1; return -1;
if (cfgAddInt32(pCfg, "syncSnapReplicationMsgNLimit", tsSnapReplicationMsgNLimit, 16,
(TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
return -1; return -1;
@ -1183,6 +1187,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsElectInterval = cfgGetItem(pCfg, "syncElectInterval")->i32; tsElectInterval = cfgGetItem(pCfg, "syncElectInterval")->i32;
tsHeartbeatInterval = cfgGetItem(pCfg, "syncHeartbeatInterval")->i32; tsHeartbeatInterval = cfgGetItem(pCfg, "syncHeartbeatInterval")->i32;
tsHeartbeatTimeout = cfgGetItem(pCfg, "syncHeartbeatTimeout")->i32; tsHeartbeatTimeout = cfgGetItem(pCfg, "syncHeartbeatTimeout")->i32;
tsSnapReplicationMsgNLimit = cfgGetItem(pCfg, "syncSnapReplicationMsgNLimit")->i32;
tsMndSdbWriteDelta = cfgGetItem(pCfg, "mndSdbWriteDelta")->i64; tsMndSdbWriteDelta = cfgGetItem(pCfg, "mndSdbWriteDelta")->i64;
tsMndLogRetention = cfgGetItem(pCfg, "mndLogRetention")->i64; tsMndLogRetention = cfgGetItem(pCfg, "mndLogRetention")->i64;

View File

@ -22,6 +22,7 @@
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncReplication.h" #include "syncReplication.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "tglobal.h"
static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths); static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths);
@ -1186,7 +1187,7 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp
pSndBuf->start = ack + 1; pSndBuf->start = ack + 1;
} }
while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) { while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < tsSnapReplicationMsgNLimit) {
if (snapshotSend(pSender) != 0) { if (snapshotSend(pSender) != 0) {
code = terrno; code = terrno;
goto _out; goto _out;