feat: support `alter dnode max_compact_tasks`
This commit is contained in:
parent
b86be02332
commit
9e484be276
|
@ -77,7 +77,6 @@ extern int32_t tsReadTimeout;
|
||||||
extern int32_t tsTimeToGetAvailableConn;
|
extern int32_t tsTimeToGetAvailableConn;
|
||||||
extern int32_t tsKeepAliveIdle;
|
extern int32_t tsKeepAliveIdle;
|
||||||
extern int32_t tsNumOfCommitThreads;
|
extern int32_t tsNumOfCommitThreads;
|
||||||
extern int32_t tsNumOfCompactThreads;
|
|
||||||
extern int32_t tsNumOfTaskQueueThreads;
|
extern int32_t tsNumOfTaskQueueThreads;
|
||||||
extern int32_t tsNumOfMnodeQueryThreads;
|
extern int32_t tsNumOfMnodeQueryThreads;
|
||||||
extern int32_t tsNumOfMnodeFetchThreads;
|
extern int32_t tsNumOfMnodeFetchThreads;
|
||||||
|
@ -93,6 +92,9 @@ extern int32_t tsNumOfSnodeWriteThreads;
|
||||||
extern int64_t tsQueueMemoryAllowed;
|
extern int64_t tsQueueMemoryAllowed;
|
||||||
extern int32_t tsRetentionSpeedLimitMB;
|
extern int32_t tsRetentionSpeedLimitMB;
|
||||||
|
|
||||||
|
extern const char *tsAlterCompactTaskKeywords;
|
||||||
|
extern int32_t tsNumOfCompactThreads;
|
||||||
|
|
||||||
// sync raft
|
// sync raft
|
||||||
extern int32_t tsElectInterval;
|
extern int32_t tsElectInterval;
|
||||||
extern int32_t tsHeartbeatInterval;
|
extern int32_t tsHeartbeatInterval;
|
||||||
|
|
|
@ -478,6 +478,7 @@ int32_t taosGetErrSize();
|
||||||
#define TSDB_CODE_DNODE_INVALID_EN_WHITELIST TAOS_DEF_ERROR_CODE(0, 0x0428)
|
#define TSDB_CODE_DNODE_INVALID_EN_WHITELIST TAOS_DEF_ERROR_CODE(0, 0x0428)
|
||||||
#define TSDB_CODE_DNODE_INVALID_MONITOR_PARAS TAOS_DEF_ERROR_CODE(0, 0x0429)
|
#define TSDB_CODE_DNODE_INVALID_MONITOR_PARAS TAOS_DEF_ERROR_CODE(0, 0x0429)
|
||||||
#define TSDB_CODE_MNODE_STOPPED TAOS_DEF_ERROR_CODE(0, 0x042A)
|
#define TSDB_CODE_MNODE_STOPPED TAOS_DEF_ERROR_CODE(0, 0x042A)
|
||||||
|
#define TSDB_CODE_DNODE_INVALID_COMPACT_TASKS TAOS_DEF_ERROR_CODE(0, 0x042B)
|
||||||
|
|
||||||
// anode
|
// anode
|
||||||
#define TSDB_CODE_MND_ANODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0430)
|
#define TSDB_CODE_MND_ANODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0430)
|
||||||
|
|
|
@ -61,7 +61,6 @@ int32_t tsReadTimeout = 900;
|
||||||
int32_t tsTimeToGetAvailableConn = 500000;
|
int32_t tsTimeToGetAvailableConn = 500000;
|
||||||
|
|
||||||
int32_t tsNumOfCommitThreads = 2;
|
int32_t tsNumOfCommitThreads = 2;
|
||||||
int32_t tsNumOfCompactThreads = 2;
|
|
||||||
int32_t tsNumOfTaskQueueThreads = 16;
|
int32_t tsNumOfTaskQueueThreads = 16;
|
||||||
int32_t tsNumOfMnodeQueryThreads = 16;
|
int32_t tsNumOfMnodeQueryThreads = 16;
|
||||||
int32_t tsNumOfMnodeFetchThreads = 1;
|
int32_t tsNumOfMnodeFetchThreads = 1;
|
||||||
|
@ -78,6 +77,9 @@ int32_t tsMaxStreamBackendCache = 128; // M
|
||||||
int32_t tsPQSortMemThreshold = 16; // M
|
int32_t tsPQSortMemThreshold = 16; // M
|
||||||
int32_t tsRetentionSpeedLimitMB = 0; // unlimited
|
int32_t tsRetentionSpeedLimitMB = 0; // unlimited
|
||||||
|
|
||||||
|
const char *tsAlterCompactTaskKeywords = "max_compact_tasks";
|
||||||
|
int32_t tsNumOfCompactThreads = 2;
|
||||||
|
|
||||||
// sync raft
|
// sync raft
|
||||||
int32_t tsElectInterval = 25 * 1000;
|
int32_t tsElectInterval = 25 * 1000;
|
||||||
int32_t tsHeartbeatInterval = 1000;
|
int32_t tsHeartbeatInterval = 1000;
|
||||||
|
|
|
@ -344,6 +344,23 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t dmAlterMaxCompactTask(const char *value) {
|
||||||
|
int32_t max_compact_tasks;
|
||||||
|
char *endptr = NULL;
|
||||||
|
|
||||||
|
max_compact_tasks = taosStr2Int32(value, &endptr, 10);
|
||||||
|
if (endptr == value || endptr[0] != '\0') {
|
||||||
|
return TSDB_CODE_INVALID_MSG;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (max_compact_tasks != tsNumOfCompactThreads) {
|
||||||
|
dInfo("alter max compact tasks from %d to %d", tsNumOfCompactThreads, max_compact_tasks);
|
||||||
|
tsNumOfCompactThreads = max_compact_tasks;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SDCfgDnodeReq cfgReq = {0};
|
SDCfgDnodeReq cfgReq = {0};
|
||||||
|
@ -351,6 +368,10 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return TSDB_CODE_INVALID_MSG;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (strncmp(cfgReq.config, tsAlterCompactTaskKeywords, strlen(tsAlterCompactTaskKeywords) + 1) == 0) {
|
||||||
|
return dmAlterMaxCompactTask(cfgReq.value);
|
||||||
|
}
|
||||||
|
|
||||||
dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
|
dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
|
||||||
|
|
||||||
SConfig *pCfg = taosGetCfg();
|
SConfig *pCfg = taosGetCfg();
|
||||||
|
|
|
@ -1536,6 +1536,9 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||||
strcpy(dcfgReq.config, "s3blocksize");
|
strcpy(dcfgReq.config, "s3blocksize");
|
||||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||||
#endif
|
#endif
|
||||||
|
} else if (strncasecmp(cfgReq.config, tsAlterCompactTaskKeywords, strlen(tsAlterCompactTaskKeywords) + 1) == 0) {
|
||||||
|
tstrncpy(dcfgReq.config, cfgReq.config, TSDB_DNODE_CONFIG_LEN);
|
||||||
|
tstrncpy(dcfgReq.value, cfgReq.value, TSDB_DNODE_VALUE_LEN);
|
||||||
} else {
|
} else {
|
||||||
TAOS_CHECK_GOTO (mndMCfg2DCfg(&cfgReq, &dcfgReq), NULL, _err_out);
|
TAOS_CHECK_GOTO (mndMCfg2DCfg(&cfgReq, &dcfgReq), NULL, _err_out);
|
||||||
if (strlen(dcfgReq.config) > TSDB_DNODE_CONFIG_LEN) {
|
if (strlen(dcfgReq.config) > TSDB_DNODE_CONFIG_LEN) {
|
||||||
|
|
|
@ -9718,6 +9718,9 @@ static int32_t translateDropDnode(STranslateContext* pCxt, SDropDnodeStmt* pStmt
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define MIN_MAX_COMPACT_TASKS 1
|
||||||
|
#define MAX_MAX_COMPACT_TASKS 100
|
||||||
|
|
||||||
static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pStmt) {
|
static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pStmt) {
|
||||||
SMCfgDnodeReq cfgReq = {0};
|
SMCfgDnodeReq cfgReq = {0};
|
||||||
cfgReq.dnodeId = pStmt->dnodeId;
|
cfgReq.dnodeId = pStmt->dnodeId;
|
||||||
|
@ -9725,7 +9728,12 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
|
||||||
strcpy(cfgReq.value, pStmt->value);
|
strcpy(cfgReq.value, pStmt->value);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (0 == strncasecmp(cfgReq.config, "encrypt_key", 12)) {
|
|
||||||
|
const char* validConfigs[] = {
|
||||||
|
"encrypt_key",
|
||||||
|
tsAlterCompactTaskKeywords,
|
||||||
|
};
|
||||||
|
if (0 == strncasecmp(cfgReq.config, validConfigs[0], strlen(validConfigs[0]) + 1)) {
|
||||||
int32_t klen = strlen(cfgReq.value);
|
int32_t klen = strlen(cfgReq.value);
|
||||||
if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
|
if (klen > ENCRYPT_KEY_LEN || klen < ENCRYPT_KEY_LEN_MIN) {
|
||||||
tFreeSMCfgDnodeReq(&cfgReq);
|
tFreeSMCfgDnodeReq(&cfgReq);
|
||||||
|
@ -9734,6 +9742,28 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
|
||||||
ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
|
ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
|
||||||
}
|
}
|
||||||
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_ENCRYPT_KEY, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
|
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_ENCRYPT_KEY, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
|
||||||
|
} else if (0 == strncasecmp(cfgReq.config, validConfigs[1], strlen(validConfigs[1]) + 1)) {
|
||||||
|
char* endptr = NULL;
|
||||||
|
int32_t maxCompactTasks = taosStr2Int32(cfgReq.value, &endptr, 10);
|
||||||
|
int32_t minMaxCompactTasks = MIN_MAX_COMPACT_TASKS;
|
||||||
|
int32_t maxMaxCompactTasks = MAX_MAX_COMPACT_TASKS;
|
||||||
|
|
||||||
|
// check format
|
||||||
|
if (endptr == cfgReq.value || endptr[0] != '\0') {
|
||||||
|
tFreeSMCfgDnodeReq(&cfgReq);
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_DNODE_INVALID_COMPACT_TASKS,
|
||||||
|
"Invalid max compact tasks: %s", cfgReq.value);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check range
|
||||||
|
if (maxCompactTasks < minMaxCompactTasks || maxCompactTasks > maxMaxCompactTasks) {
|
||||||
|
tFreeSMCfgDnodeReq(&cfgReq);
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_DNODE_INVALID_COMPACT_TASKS,
|
||||||
|
"Invalid max compact tasks: %d, valid range [%d,%d]", maxCompactTasks,
|
||||||
|
minMaxCompactTasks, maxMaxCompactTasks);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
|
||||||
} else {
|
} else {
|
||||||
code = buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
|
code = buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
|
||||||
}
|
}
|
||||||
|
|
|
@ -415,7 +415,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_CHARSET, "charset not match")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_LOCALE, "locale not match")
|
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_LOCALE, "locale not match")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR, "ttlChangeOnWrite not match")
|
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR, "ttlChangeOnWrite not match")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_EN_WHITELIST, "enableWhiteList not match")
|
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_EN_WHITELIST, "enableWhiteList not match")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_STOPPED, "Mnode stopped")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_STOPPED, "Mnode stopped")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_COMPACT_TASKS, "Invalid max compact tasks")
|
||||||
|
|
||||||
// vnode
|
// vnode
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed")
|
||||||
|
|
Loading…
Reference in New Issue