enh(stream): limit the concurrent started checkpoint tasks.
This commit is contained in:
parent
46ca55ac74
commit
becc4cb368
|
@ -235,6 +235,7 @@ extern int32_t tsMqRebalanceInterval;
|
|||
extern int32_t tsStreamCheckpointInterval;
|
||||
extern float tsSinkDataRate;
|
||||
extern int32_t tsStreamNodeCheckInterval;
|
||||
extern int32_t tsMaxConcurrentCheckpoint;
|
||||
extern int32_t tsTtlUnit;
|
||||
extern int32_t tsTtlPushIntervalSec;
|
||||
extern int32_t tsTtlBatchDropNum;
|
||||
|
|
|
@ -274,6 +274,7 @@ int32_t tsMqRebalanceInterval = 2;
|
|||
int32_t tsStreamCheckpointInterval = 60;
|
||||
float tsSinkDataRate = 2.0;
|
||||
int32_t tsStreamNodeCheckInterval = 16;
|
||||
int32_t tsMaxConcurrentCheckpoint = 1;
|
||||
int32_t tsTtlUnit = 86400;
|
||||
int32_t tsTtlPushIntervalSec = 10;
|
||||
int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups
|
||||
|
@ -621,114 +622,80 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) {
|
|||
}
|
||||
|
||||
static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||
if (cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
tsNumOfCommitThreads = tsNumOfCores / 2;
|
||||
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
|
||||
|
||||
tsNumOfSupportVnodes = tsNumOfCores * 2 + 5;
|
||||
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
|
||||
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
|
||||
if (cfgAddString(pCfg, "encryptAlgorithm", tsEncryptAlgorithm, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "encryptScope", tsEncryptScope, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
// if (cfgAddString(pCfg, "authCode", tsAuthCode, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
|
||||
return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
tsNumOfCommitThreads = tsNumOfCores / 2;
|
||||
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
|
||||
tsNumOfMnodeReadThreads = tsNumOfCores / 8;
|
||||
tsNumOfMnodeReadThreads = TRANGE(tsNumOfMnodeReadThreads, 1, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
|
||||
tsNumOfVnodeQueryThreads = tsNumOfCores * 2;
|
||||
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 16);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) !=
|
||||
0)
|
||||
return -1;
|
||||
|
||||
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 4, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
|
||||
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
|
||||
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) !=
|
||||
0)
|
||||
return -1;
|
||||
|
||||
tsNumOfVnodeRsmaThreads = tsNumOfCores / 4;
|
||||
tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
|
||||
tsNumOfQnodeQueryThreads = tsNumOfCores * 2;
|
||||
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 16);
|
||||
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) !=
|
||||
0)
|
||||
return -1;
|
||||
|
||||
tsNumOfSnodeStreamThreads = tsNumOfCores / 4;
|
||||
tsNumOfSnodeStreamThreads = TRANGE(tsNumOfSnodeStreamThreads, 2, 4);
|
||||
|
||||
tsNumOfSnodeWriteThreads = tsNumOfCores / 4;
|
||||
tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4);
|
||||
|
||||
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||
|
||||
// clang-format off
|
||||
if (cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
if (cfgAddString(pCfg, "encryptAlgorithm", tsEncryptAlgorithm, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "encryptScope", tsEncryptScope, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 4, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
// tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
|
||||
// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
|
||||
// if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfSnodeStreamThreads = tsNumOfCores / 4;
|
||||
tsNumOfSnodeStreamThreads = TRANGE(tsNumOfSnodeStreamThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
tsNumOfSnodeWriteThreads = tsNumOfCores / 4;
|
||||
tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) !=
|
||||
0)
|
||||
return -1;
|
||||
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX,
|
||||
CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) !=
|
||||
0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2),
|
||||
CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "arbSetAssignedTimeoutSec", tsArbSetAssignedTimeoutSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "arbSetAssignedTimeoutSec", tsArbSetAssignedTimeoutSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
|
||||
if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "skipGrant", tsMndSkipGrant, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
@ -736,9 +703,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "monitorLogProtocol", tsMonitorLogProtocol, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "monitorIntervalForBasic", tsMonitorIntervalForBasic, 1, 200000, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "monitorIntervalForBasic", tsMonitorIntervalForBasic, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "monitorForceV2", tsMonitorForceV2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
@ -752,68 +717,43 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
|
||||
if (cfgAddInt32(pCfg, "rsyncPort", tsRsyncPort, 1, 65535, CFG_SCOPE_BOTH, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "snodeAddress", tsSnodeAddress, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "tmqRowSize", tmqRowSize, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "maxTsmaNum", tsMaxTsmaNum, 0, 3, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
|
||||
0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
|
||||
0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "ttlBatchDropNum", tsTtlBatchDropNum, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddBool(pCfg, "ttlChangeOnWrite", tsTtlChangeOnWrite, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "ttlFlushThreshold", tsTtlFlushThreshold, -1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
|
||||
0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
|
||||
0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "s3MigrateIntervalSec", tsS3MigrateIntervalSec, 600, 100000, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "ttlBatchDropNum", tsTtlBatchDropNum, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "ttlChangeOnWrite", tsTtlChangeOnWrite, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "ttlFlushThreshold", tsTtlFlushThreshold, -1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "s3MigrateIntervalSec", tsS3MigrateIntervalSec, 600, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "s3MigrateEnabled", tsS3MigrateEnabled, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
|
||||
0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX,
|
||||
CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "udf", tsStartUdfd, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 2, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "concurrentCheckpoint", tsMaxConcurrentCheckpoint, 1, 10, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
if (cfgAddFloat(pCfg, "fPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "dPrecision", tsDPrecision, 0.0f, 1000000.0f, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
@ -823,41 +763,23 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddString(pCfg, "compressor", tsCompressor, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "resolveFQDNRetryTime", tsResolveFQDNRetryTime, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "resolveFQDNRetryTime", tsResolveFQDNRetryTime, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
/*
|
||||
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -1, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (tsS3BlockSize > -1 && tsS3BlockSize < 1024) {
|
||||
uError("failed to config s3blocksize since value:%d. Valid range: -1 or [1024, 1024 * 1024]", tsS3BlockSize);
|
||||
return -1;
|
||||
}
|
||||
if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
|
||||
0)
|
||||
return -1;
|
||||
*/
|
||||
if (cfgAddInt32(pCfg, "s3PageCacheSize", tsS3PageCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 1, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER,
|
||||
CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "s3PageCacheSize", tsS3PageCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 1, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
// min free disk space used to check if the disk is full [50MB, 1GB]
|
||||
if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024,
|
||||
CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "enableWhiteList", tsEnableWhiteList, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
// clang-format on
|
||||
|
||||
// GRANT_CFG_ADD;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -97,6 +97,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
|
|||
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId);
|
||||
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt);
|
||||
int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
|
||||
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock);
|
||||
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
|
||||
|
|
|
@ -90,21 +90,6 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
|
|||
return pReq;
|
||||
}
|
||||
|
||||
static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) {
|
||||
SMStreamTickReq timerReq = {
|
||||
.tick = sec,
|
||||
};
|
||||
|
||||
int32_t contLen = tSerializeSMStreamTickMsg(NULL, 0, &timerReq);
|
||||
if (contLen <= 0) return NULL;
|
||||
void *pReq = rpcMallocCont(contLen);
|
||||
if (pReq == NULL) return NULL;
|
||||
|
||||
tSerializeSMStreamTickMsg(pReq, contLen, &timerReq);
|
||||
*pContLen = contLen;
|
||||
return pReq;
|
||||
}
|
||||
|
||||
static void mndPullupTrans(SMnode *pMnode) {
|
||||
mTrace("pullup trans msg");
|
||||
int32_t contLen = 0;
|
||||
|
@ -174,21 +159,12 @@ static void mndCalMqRebalance(SMnode *pMnode) {
|
|||
}
|
||||
}
|
||||
|
||||
static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildCheckpointTickMsg(&contLen, sec);
|
||||
if (pReq != NULL) {
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER, .pCont = pReq, .contLen = contLen};
|
||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static void mndStreamCheckpointRemain(SMnode *pMnode) {
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildCheckpointTickMsg(&contLen, 0);
|
||||
if (pReq != NULL) {
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pReq, .contLen = contLen};
|
||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||
static void mndStreamCheckpointTimer(SMnode *pMnode) {
|
||||
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
|
||||
if (pMsg != NULL) {
|
||||
int32_t size = sizeof(SMStreamDoCheckpointMsg);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -367,12 +343,8 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
|
|||
mndCalMqRebalance(pMnode);
|
||||
}
|
||||
|
||||
if (sec % tsStreamCheckpointInterval == 0) {
|
||||
mndStreamCheckpointTick(pMnode, sec);
|
||||
}
|
||||
|
||||
if (sec % 5 == 0) {
|
||||
mndStreamCheckpointRemain(pMnode);
|
||||
if (sec % 30 == 0) { // send the checkpoint info every 10 sec
|
||||
mndStreamCheckpointTimer(pMnode);
|
||||
}
|
||||
|
||||
if (sec % tsStreamNodeCheckInterval == 0) {
|
||||
|
@ -399,6 +371,7 @@ void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) {
|
|||
mndSyncCheckTimeout(pMnode);
|
||||
}
|
||||
}
|
||||
|
||||
static void *mndThreadFp(void *param) {
|
||||
SMnode *pMnode = param;
|
||||
int64_t lastTime = 0;
|
||||
|
@ -832,10 +805,9 @@ _OVER:
|
|||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
|
||||
pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER ||
|
||||
pMsg->msgType == TDMT_MND_COMPACT_TIMER || pMsg->msgType == TDMT_MND_NODECHECK_TIMER ||
|
||||
pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE ||
|
||||
pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_TIMER || pMsg->msgType == TDMT_MND_STREAM_REQ_CHKPT ||
|
||||
pMsg->msgType == TDMT_MND_S3MIGRATE_DB_TIMER ||
|
||||
pMsg->msgType == TDMT_MND_ARB_HEARTBEAT_TIMER || pMsg->msgType == TDMT_MND_ARB_CHECK_SYNC_TIMER) {
|
||||
pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_REQ_CHKPT ||
|
||||
pMsg->msgType == TDMT_MND_S3MIGRATE_DB_TIMER || pMsg->msgType == TDMT_MND_ARB_HEARTBEAT_TIMER ||
|
||||
pMsg->msgType == TDMT_MND_ARB_CHECK_SYNC_TIMER) {
|
||||
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
|
||||
pMnode->stopped, state.restored, syncStr(state.state));
|
||||
return -1;
|
||||
|
|
|
@ -45,9 +45,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
|
|||
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
|
||||
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
|
||||
|
||||
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq);
|
||||
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
|
||||
static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq);
|
||||
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
|
||||
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
|
@ -114,10 +112,8 @@ int32_t mndInitStream(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, mndProcessStreamCheckpointInCandid);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
|
||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
|
||||
|
@ -886,26 +882,10 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
|
|||
}
|
||||
}
|
||||
|
||||
mDebug("generated checkpoint %" PRId64 "", maxChkptId + 1);
|
||||
mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1);
|
||||
return maxChkptId + 1;
|
||||
}
|
||||
|
||||
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
|
||||
pMsg->checkpointId = mndStreamGenChkptId(pMnode, true);
|
||||
|
||||
int32_t size = sizeof(SMStreamDoCheckpointMsg);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
|
||||
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) {
|
||||
SStreamCheckpointSourceReq req = {0};
|
||||
|
@ -1136,73 +1116,101 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
|
|||
return ready ? 0 : -1;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int64_t duration;
|
||||
} SCheckpointInterval;
|
||||
|
||||
static int32_t streamWaitComparFn(const void* p1, const void* p2) {
|
||||
const SCheckpointInterval* pInt1 = p1;
|
||||
const SCheckpointInterval* pInt2 = p2;
|
||||
if (pInt1->duration == pInt2->duration) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return pInt1->duration > pInt2->duration? -1:1;
|
||||
}
|
||||
|
||||
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SStreamObj *pStream = NULL;
|
||||
int32_t code = 0;
|
||||
int32_t numOfCheckpointTrans = 0;
|
||||
|
||||
if ((code = mndCheckNodeStatus(pMnode)) != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// make sure the time interval between two consecutive checkpoint trans is long enough
|
||||
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
|
||||
SArray* pList = taosArrayInit(4, sizeof(SCheckpointInterval));
|
||||
int64_t now = taosGetTimestampMs();
|
||||
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
|
||||
code = mndProcessStreamCheckpointTrans(pMnode, pStream, pMsg->checkpointId, 1, true);
|
||||
sdbRelease(pSdb, pStream);
|
||||
if (code == -1) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
void *pIter = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
taosThreadMutexLock(&execInfo.lock);
|
||||
int32_t num = taosHashGetSize(execInfo.transMgmt.pWaitingList);
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
if (num == 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if ((code = mndCheckNodeStatus(pMnode)) != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SArray *pList = taosArrayInit(4, sizeof(int64_t));
|
||||
while ((pIter = taosHashIterate(execInfo.transMgmt.pWaitingList, pIter)) != NULL) {
|
||||
SCheckpointCandEntry *pEntry = pIter;
|
||||
|
||||
SStreamObj *ps = mndAcquireStream(pMnode, pEntry->pName);
|
||||
if (ps == NULL) {
|
||||
int64_t duration = now - pStream->checkpointFreq;
|
||||
if (duration < tsStreamCheckpointInterval * 1000) {
|
||||
sdbRelease(pSdb, pStream);
|
||||
continue;
|
||||
}
|
||||
|
||||
mDebug("start to launch checkpoint for stream:%s %" PRIx64 " in candidate list", pEntry->pName, pEntry->streamId);
|
||||
SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
|
||||
taosArrayPush(pList, &in);
|
||||
|
||||
code = mndProcessStreamCheckpointTrans(pMnode, ps, pEntry->checkpointId, 1, true);
|
||||
mndReleaseStream(pMnode, ps);
|
||||
int32_t currentSize = taosArrayGetSize(pList);
|
||||
mDebug("stream:%s (uid:0x%" PRIx64 ") checkpoint interval beyond threshold: %" PRId64 "s(%" PRId64
|
||||
"s) beyond threshold:%d",
|
||||
pStream->name, pStream->uid, tsStreamCheckpointInterval, duration / 1000, currentSize);
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
taosArrayPush(pList, &pEntry->streamId);
|
||||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
|
||||
int32_t size = taosArrayGetSize(pList);
|
||||
if (size == 0) {
|
||||
taosArrayDestroy(pList);
|
||||
return code;
|
||||
}
|
||||
|
||||
taosArraySort(pList, streamWaitComparFn);
|
||||
mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans);
|
||||
int32_t numOfQual = taosArrayGetSize(pList);
|
||||
|
||||
if (numOfCheckpointTrans > tsMaxConcurrentCheckpoint) {
|
||||
mDebug(
|
||||
"%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
|
||||
"checkpoint trans are not allowed, wait for 30s",
|
||||
numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint);
|
||||
taosArrayDestroy(pList);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans;
|
||||
mDebug(
|
||||
"%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, "
|
||||
"concurrent trans threshold:%d",
|
||||
numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint);
|
||||
|
||||
int32_t started = 0;
|
||||
int64_t checkpointId = mndStreamGenChkptId(pMnode, true);
|
||||
|
||||
for (int32_t i = 0; i < numOfQual; ++i) {
|
||||
SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i);
|
||||
|
||||
SStreamObj *p = mndGetStreamObj(pMnode, pCheckpointInfo->streamId);
|
||||
if (p != NULL) {
|
||||
code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
|
||||
sdbRelease(pSdb, p);
|
||||
|
||||
if (code != -1) {
|
||||
started += 1;
|
||||
|
||||
if (started >= capacity) {
|
||||
mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started,
|
||||
(started + numOfCheckpointTrans));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
||||
int64_t *pId = taosArrayGet(pList, i);
|
||||
|
||||
taosHashRemove(execInfo.transMgmt.pWaitingList, pId, sizeof(*pId));
|
||||
}
|
||||
|
||||
int32_t remain = taosHashGetSize(execInfo.transMgmt.pWaitingList);
|
||||
mDebug("%d in candidate list generated checkpoint, remaining:%d", (int32_t)taosArrayGetSize(pList), remain);
|
||||
taosArrayDestroy(pList);
|
||||
return code;
|
||||
}
|
||||
|
@ -2349,10 +2357,10 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
SMStreamNodeCheckMsg *pMsg = rpcMallocCont(sizeof(SMStreamNodeCheckMsg));
|
||||
int32_t size = sizeof(SMStreamNodeCheckMsg);
|
||||
SMStreamNodeCheckMsg *pMsg = rpcMallocCont(size);
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = sizeof(SMStreamNodeCheckMsg)};
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = size};
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -21,8 +21,6 @@ typedef struct SKeyInfo {
|
|||
int32_t keyLen;
|
||||
} SKeyInfo;
|
||||
|
||||
static int32_t clearFinishedTrans(SMnode* pMnode);
|
||||
|
||||
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamId) {
|
||||
SStreamTransInfo info = {
|
||||
.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamId = streamId};
|
||||
|
@ -30,41 +28,54 @@ int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t s
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t clearFinishedTrans(SMnode* pMnode) {
|
||||
int32_t mndStreamClearFinishedTrans(SMnode* pMnode, int32_t* pNumOfActiveChkpt) {
|
||||
size_t keyLen = 0;
|
||||
void* pIter = NULL;
|
||||
SArray* pList = taosArrayInit(4, sizeof(SKeyInfo));
|
||||
int32_t num = 0;
|
||||
|
||||
while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
|
||||
SStreamTransInfo* pEntry = (SStreamTransInfo*)pIter;
|
||||
SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter;
|
||||
|
||||
// let's clear the finished trans
|
||||
STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId);
|
||||
STrans *pTrans = mndAcquireTrans(pMnode, pEntry->transId);
|
||||
if (pTrans == NULL) {
|
||||
void* pKey = taosHashGetKey(pEntry, &keyLen);
|
||||
void *pKey = taosHashGetKey(pEntry, &keyLen);
|
||||
// key is the name of src/dst db name
|
||||
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
|
||||
mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name,
|
||||
pEntry->startTime);
|
||||
mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name, pEntry->startTime);
|
||||
taosArrayPush(pList, &info);
|
||||
} else {
|
||||
if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
|
||||
num++;
|
||||
}
|
||||
mndReleaseTrans(pMnode, pTrans);
|
||||
}
|
||||
}
|
||||
|
||||
size_t num = taosArrayGetSize(pList);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
int32_t size = taosArrayGetSize(pList);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SKeyInfo* pKey = taosArrayGet(pList, i);
|
||||
taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
|
||||
}
|
||||
|
||||
mDebug("clear %d finished stream-trans, remained:%d", (int32_t)num, taosHashGetSize(execInfo.transMgmt.pDBTrans));
|
||||
mDebug("clear %d finished stream-trans, remained:%d, active checkpoint trans:%d", size,
|
||||
taosHashGetSize(execInfo.transMgmt.pDBTrans), num);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
taosArrayDestroy(pList);
|
||||
|
||||
if (pNumOfActiveChkpt != NULL) {
|
||||
*pNumOfActiveChkpt = num;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
|
||||
// For a given stream:
|
||||
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
|
||||
// 2. create/drop/reset/update trans are conflict with any other trans.
|
||||
bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* pTransName, bool lock) {
|
||||
if (lock) {
|
||||
taosThreadMutexLock(&execInfo.lock);
|
||||
|
@ -78,7 +89,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p
|
|||
return false;
|
||||
}
|
||||
|
||||
clearFinishedTrans(pMnode);
|
||||
mndStreamClearFinishedTrans(pMnode, NULL);
|
||||
|
||||
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
|
||||
if (pEntry != NULL) {
|
||||
|
@ -95,7 +106,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p
|
|||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
return true;
|
||||
} else {
|
||||
mDebug("not conflict with checkpoint trans, name:%s, continue create trans", pTransName);
|
||||
mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName);
|
||||
}
|
||||
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
|
||||
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
|
||||
|
@ -106,7 +117,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p
|
|||
return true;
|
||||
}
|
||||
} else {
|
||||
mDebug("stream:0x%"PRIx64" no conflict trans existed, continue create trans", streamId);
|
||||
mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
|
||||
}
|
||||
|
||||
if (lock) {
|
||||
|
@ -124,7 +135,7 @@ int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
clearFinishedTrans(pMnode);
|
||||
mndStreamClearFinishedTrans(pMnode, NULL);
|
||||
SStreamTransInfo* pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid));
|
||||
if (pEntry != NULL) {
|
||||
SStreamTransInfo tInfo = *pEntry;
|
||||
|
|
|
@ -377,8 +377,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
|
|||
// drop task should not in the meta-lock, and drop the related fill-history task now
|
||||
if (pReq->dropRelHTask) {
|
||||
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
|
||||
|
||||
// commit the update
|
||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue