diff --git a/docs/en/14-reference/12-config/index.md b/docs/en/14-reference/12-config/index.md index 5e4eadcceb..f50551b5de 100755 --- a/docs/en/14-reference/12-config/index.md +++ b/docs/en/14-reference/12-config/index.md @@ -432,7 +432,7 @@ The charset that takes effect is UTF-8. | Applicable | Server Only | | Meaning | Maximum number of threads to commit | | Value Range | 0-1024 | -| Default Value | | +| Default Value | 4 | ## Log Parameters diff --git a/docs/zh/14-reference/12-config/index.md b/docs/zh/14-reference/12-config/index.md index 6fce985927..effa72099a 100755 --- a/docs/zh/14-reference/12-config/index.md +++ b/docs/zh/14-reference/12-config/index.md @@ -430,7 +430,7 @@ charset 的有效值是 UTF-8。 | 适用范围 | 仅服务端适用 | | 含义 | 设置写入线程的最大数量 | | 取值范围 | 0-1024 | -| 缺省值 | | +| 缺省值 | 4 | ## 日志相关 diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 90ee6f7cc0..e7035fe297 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -86,6 +86,7 @@ extern int32_t tsNumOfQnodeFetchThreads; extern int32_t tsNumOfSnodeStreamThreads; extern int32_t tsNumOfSnodeWriteThreads; extern int64_t tsRpcQueueMemoryAllowed; +extern int32_t tsRetentionSpeedLimitMB; // sync raft extern int32_t tsElectInterval; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c68dc85c29..da692e78fd 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -74,6 +74,7 @@ int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeWriteThreads = 1; int32_t tsMaxStreamBackendCache = 128; // M int32_t tsPQSortMemThreshold = 16; // M +int32_t tsRetentionSpeedLimitMB = 0; // unlimited // sync raft int32_t tsElectInterval = 25 * 1000; @@ -667,6 +668,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -1117,6 +1119,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32; tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; + tsRetentionSpeedLimitMB = cfgGetItem(pCfg, "retentionSpeedLimitMB")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; tsRatioOfVnodeStreamThreads = cfgGetItem(pCfg, "ratioOfVnodeStreamThreads")->fval; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 3d53d1ada3..0d3994d78e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -38,6 +38,34 @@ static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) { return TARRAY2_APPEND(&rtner->fopArr, op); } +static int64_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) { + int64_t total = 0; + int64_t interval = 1000; // 1s + int64_t limit = limitMB ? limitMB * 1024 * 1024 : INT64_MAX; + int64_t offset = 0; + int64_t remain = size; + + while (remain > 0) { + int64_t n; + int64_t last = taosGetTimestampMs(); + if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) { + return -1; + } + + total += n; + remain -= n; + + if (remain > 0) { + int64_t elapsed = taosGetTimestampMs() - last; + if (elapsed < interval) { + taosMsleep(interval - elapsed); + } + } + } + + return total; +} + static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) { int32_t code = 0; int32_t lino = 0; @@ -98,7 +126,8 @@ static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile if (fdTo == NULL) code = terrno; TSDB_CHECK_CODE(code, lino, _exit); - int64_t n = taosFSendFile(fdTo, fdFrom, 0, tsdbLogicToFileSize(from->f->size, rtner->szPage)); + int64_t n = tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage), + tsRetentionSpeedLimitMB); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); TSDB_CHECK_CODE(code, lino, _exit);