Merge pull request #26119 from taosdata/feat/TS-4724-3.0
feat: data migration speed limit
This commit is contained in:
commit
470b0e6730
|
@ -432,7 +432,7 @@ The charset that takes effect is UTF-8.
|
||||||
| Applicable | Server Only |
|
| Applicable | Server Only |
|
||||||
| Meaning | Maximum number of threads to commit |
|
| Meaning | Maximum number of threads to commit |
|
||||||
| Value Range | 0-1024 |
|
| Value Range | 0-1024 |
|
||||||
| Default Value | |
|
| Default Value | 4 |
|
||||||
|
|
||||||
## Log Parameters
|
## Log Parameters
|
||||||
|
|
||||||
|
|
|
@ -430,7 +430,7 @@ charset 的有效值是 UTF-8。
|
||||||
| 适用范围 | 仅服务端适用 |
|
| 适用范围 | 仅服务端适用 |
|
||||||
| 含义 | 设置写入线程的最大数量 |
|
| 含义 | 设置写入线程的最大数量 |
|
||||||
| 取值范围 | 0-1024 |
|
| 取值范围 | 0-1024 |
|
||||||
| 缺省值 | |
|
| 缺省值 | 4 |
|
||||||
|
|
||||||
## 日志相关
|
## 日志相关
|
||||||
|
|
||||||
|
|
|
@ -86,6 +86,7 @@ extern int32_t tsNumOfQnodeFetchThreads;
|
||||||
extern int32_t tsNumOfSnodeStreamThreads;
|
extern int32_t tsNumOfSnodeStreamThreads;
|
||||||
extern int32_t tsNumOfSnodeWriteThreads;
|
extern int32_t tsNumOfSnodeWriteThreads;
|
||||||
extern int64_t tsRpcQueueMemoryAllowed;
|
extern int64_t tsRpcQueueMemoryAllowed;
|
||||||
|
extern int32_t tsRetentionSpeedLimitMB;
|
||||||
|
|
||||||
// sync raft
|
// sync raft
|
||||||
extern int32_t tsElectInterval;
|
extern int32_t tsElectInterval;
|
||||||
|
|
|
@ -74,6 +74,7 @@ int32_t tsNumOfSnodeStreamThreads = 4;
|
||||||
int32_t tsNumOfSnodeWriteThreads = 1;
|
int32_t tsNumOfSnodeWriteThreads = 1;
|
||||||
int32_t tsMaxStreamBackendCache = 128; // M
|
int32_t tsMaxStreamBackendCache = 128; // M
|
||||||
int32_t tsPQSortMemThreshold = 16; // M
|
int32_t tsPQSortMemThreshold = 16; // M
|
||||||
|
int32_t tsRetentionSpeedLimitMB = 0; // unlimited
|
||||||
|
|
||||||
// sync raft
|
// sync raft
|
||||||
int32_t tsElectInterval = 25 * 1000;
|
int32_t tsElectInterval = 25 * 1000;
|
||||||
|
@ -667,6 +668,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||||
|
if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||||
|
|
||||||
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||||
|
@ -1117,6 +1119,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
|
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
|
||||||
|
|
||||||
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
|
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
|
||||||
|
tsRetentionSpeedLimitMB = cfgGetItem(pCfg, "retentionSpeedLimitMB")->i32;
|
||||||
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
|
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
|
||||||
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
|
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
|
||||||
tsRatioOfVnodeStreamThreads = cfgGetItem(pCfg, "ratioOfVnodeStreamThreads")->fval;
|
tsRatioOfVnodeStreamThreads = cfgGetItem(pCfg, "ratioOfVnodeStreamThreads")->fval;
|
||||||
|
|
|
@ -38,6 +38,34 @@ static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
|
||||||
return TARRAY2_APPEND(&rtner->fopArr, op);
|
return TARRAY2_APPEND(&rtner->fopArr, op);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int64_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) {
|
||||||
|
int64_t total = 0;
|
||||||
|
int64_t interval = 1000; // 1s
|
||||||
|
int64_t limit = limitMB ? limitMB * 1024 * 1024 : INT64_MAX;
|
||||||
|
int64_t offset = 0;
|
||||||
|
int64_t remain = size;
|
||||||
|
|
||||||
|
while (remain > 0) {
|
||||||
|
int64_t n;
|
||||||
|
int64_t last = taosGetTimestampMs();
|
||||||
|
if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
total += n;
|
||||||
|
remain -= n;
|
||||||
|
|
||||||
|
if (remain > 0) {
|
||||||
|
int64_t elapsed = taosGetTimestampMs() - last;
|
||||||
|
if (elapsed < interval) {
|
||||||
|
taosMsleep(interval - elapsed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
|
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -98,7 +126,8 @@ static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile
|
||||||
if (fdTo == NULL) code = terrno;
|
if (fdTo == NULL) code = terrno;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
int64_t n = taosFSendFile(fdTo, fdFrom, 0, tsdbLogicToFileSize(from->f->size, rtner->szPage));
|
int64_t n = tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage),
|
||||||
|
tsRetentionSpeedLimitMB);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
Loading…
Reference in New Issue