diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 95b7591263..90ee6f7cc0 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -235,6 +235,7 @@ extern int32_t tsMqRebalanceInterval; extern int32_t tsStreamCheckpointInterval; extern float tsSinkDataRate; extern int32_t tsStreamNodeCheckInterval; +extern int32_t tsMaxConcurrentCheckpoint; extern int32_t tsTtlUnit; extern int32_t tsTtlPushIntervalSec; extern int32_t tsTtlBatchDropNum; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 691eccd174..2fefeb4cf2 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -274,6 +274,7 @@ int32_t tsMqRebalanceInterval = 2; int32_t tsStreamCheckpointInterval = 60; float tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 16; +int32_t tsMaxConcurrentCheckpoint = 1; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups @@ -621,114 +622,80 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { } static int32_t taosAddServerCfg(SConfig *pCfg) { - if (cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + tsNumOfCommitThreads = tsNumOfCores / 2; + tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); tsNumOfSupportVnodes = tsNumOfCores * 2 + 5; tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2); - if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) - return -1; - - if (cfgAddString(pCfg, "encryptAlgorithm", tsEncryptAlgorithm, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddString(pCfg, "encryptScope", tsEncryptScope, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - // if (cfgAddString(pCfg, "authCode", tsAuthCode, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - - if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) - return -1; - if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) - return -1; - - if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) - return -1; - if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; - - tsNumOfCommitThreads = tsNumOfCores / 2; - tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); - if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) - return -1; tsNumOfMnodeReadThreads = tsNumOfCores / 8; tsNumOfMnodeReadThreads = TRANGE(tsNumOfMnodeReadThreads, 1, 4); - if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) - return -1; tsNumOfVnodeQueryThreads = tsNumOfCores * 2; tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 16); - if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != - 0) - return -1; - - if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 4, CFG_SCOPE_SERVER, - CFG_DYN_NONE) != 0) - return -1; tsNumOfVnodeFetchThreads = tsNumOfCores / 4; tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); - if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != - 0) - return -1; tsNumOfVnodeRsmaThreads = tsNumOfCores / 4; tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4); - if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) - return -1; tsNumOfQnodeQueryThreads = tsNumOfCores * 2; tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 16); - if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != - 0) - return -1; + + tsNumOfSnodeStreamThreads = tsNumOfCores / 4; + tsNumOfSnodeStreamThreads = TRANGE(tsNumOfSnodeStreamThreads, 2, 4); + + tsNumOfSnodeWriteThreads = tsNumOfCores / 4; + tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4); + + tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; + tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); + + // clang-format off + if (cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + + if (cfgAddString(pCfg, "encryptAlgorithm", tsEncryptAlgorithm, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddString(pCfg, "encryptScope", tsEncryptScope, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + + if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; + if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; + + if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + + if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 4, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + + if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; // tsNumOfQnodeFetchThreads = tsNumOfCores / 2; // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); // if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; - tsNumOfSnodeStreamThreads = tsNumOfCores / 4; - tsNumOfSnodeStreamThreads = TRANGE(tsNumOfSnodeStreamThreads, 2, 4); - if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER, - CFG_DYN_NONE) != 0) - return -1; + if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - tsNumOfSnodeWriteThreads = tsNumOfCores / 4; - tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4); - if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != - 0) - return -1; + if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; - tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; - tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); - if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, - CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) - return -1; + if (cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != - 0) - return -1; - if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, - CFG_DYN_NONE) != 0) - return -1; - if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, - CFG_DYN_NONE) != 0) - return -1; - if (cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), - CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) - return -1; + if (cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "arbSetAssignedTimeoutSec", tsArbSetAssignedTimeoutSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, - CFG_DYN_NONE) != 0) - return -1; - if (cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, - CFG_DYN_NONE) != 0) - return -1; - if (cfgAddInt32(pCfg, "arbSetAssignedTimeoutSec", tsArbSetAssignedTimeoutSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, - CFG_DYN_NONE) != 0) - return -1; - - if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) - return -1; - if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) - return -1; + if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "skipGrant", tsMndSkipGrant, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -736,9 +703,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "monitorLogProtocol", tsMonitorLogProtocol, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "monitorIntervalForBasic", tsMonitorIntervalForBasic, 1, 200000, CFG_SCOPE_SERVER, - CFG_DYN_NONE) != 0) - return -1; + if (cfgAddInt32(pCfg, "monitorIntervalForBasic", tsMonitorIntervalForBasic, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "monitorForceV2", tsMonitorForceV2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -752,68 +717,43 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "rsyncPort", tsRsyncPort, 1, 65535, CFG_SCOPE_BOTH, CFG_DYN_SERVER) != 0) return -1; if (cfgAddString(pCfg, "snodeAddress", tsSnodeAddress, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; - if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) - return -1; + if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) - return -1; + if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "tmqRowSize", tmqRowSize, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "maxTsmaNum", tsMaxTsmaNum, 0, 3, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != - 0) - return -1; - if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, - CFG_DYN_ENT_SERVER) != 0) - return -1; - if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != - 0) - return -1; + if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) - return -1; - if (cfgAddInt32(pCfg, "ttlBatchDropNum", tsTtlBatchDropNum, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) - return -1; - if (cfgAddBool(pCfg, "ttlChangeOnWrite", tsTtlChangeOnWrite, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "ttlFlushThreshold", tsTtlFlushThreshold, -1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != - 0) - return -1; - if (cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != - 0) - return -1; - if (cfgAddInt32(pCfg, "s3MigrateIntervalSec", tsS3MigrateIntervalSec, 600, 100000, CFG_SCOPE_SERVER, - CFG_DYN_ENT_SERVER) != 0) - return -1; + if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "ttlBatchDropNum", tsTtlBatchDropNum, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddBool(pCfg, "ttlChangeOnWrite", tsTtlChangeOnWrite, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "ttlFlushThreshold", tsTtlFlushThreshold, -1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "s3MigrateIntervalSec", tsS3MigrateIntervalSec, 600, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "s3MigrateEnabled", tsS3MigrateEnabled, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) - return -1; - if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != - 0) - return -1; + if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; - if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, - CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) - return -1; + if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; - if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) - return -1; + if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 2, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER, - CFG_DYN_ENT_SERVER) != 0) - return -1; + if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddFloat(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "concurrentCheckpoint", tsMaxConcurrentCheckpoint, 1, 10, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER, - CFG_DYN_ENT_SERVER) != 0) - return -1; + if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddFloat(pCfg, "fPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddFloat(pCfg, "dPrecision", tsDPrecision, 0.0f, 1000000.0f, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -823,41 +763,23 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "compressor", tsCompressor, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, CFG_SCOPE_SERVER, - CFG_DYN_ENT_SERVER) != 0) - return -1; - if (cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) - return -1; - if (cfgAddInt32(pCfg, "resolveFQDNRetryTime", tsResolveFQDNRetryTime, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) - return -1; + if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "resolveFQDNRetryTime", tsResolveFQDNRetryTime, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - /* - if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -1, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) - return -1; - if (tsS3BlockSize > -1 && tsS3BlockSize < 1024) { - uError("failed to config s3blocksize since value:%d. Valid range: -1 or [1024, 1024 * 1024]", tsS3BlockSize); - return -1; - } - if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != - 0) - return -1; - */ - if (cfgAddInt32(pCfg, "s3PageCacheSize", tsS3PageCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, - CFG_DYN_ENT_SERVER) != 0) - return -1; - if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 1, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER, - CFG_DYN_ENT_SERVER) != 0) - return -1; + + if (cfgAddInt32(pCfg, "s3PageCacheSize", tsS3PageCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 1, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; // min free disk space used to check if the disk is full [50MB, 1GB] - if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, - CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) - return -1; + if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "enableWhiteList", tsEnableWhiteList, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + // clang-format on + // GRANT_CFG_ADD; return 0; } diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 5a4caf3348..bb10a9b9ad 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -97,6 +97,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream); int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId); +int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt); int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId); bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index a78edcb05e..a228166124 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -90,21 +90,6 @@ static void *mndBuildTimerMsg(int32_t *pContLen) { return pReq; } -static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) { - SMStreamTickReq timerReq = { - .tick = sec, - }; - - int32_t contLen = tSerializeSMStreamTickMsg(NULL, 0, &timerReq); - if (contLen <= 0) return NULL; - void *pReq = rpcMallocCont(contLen); - if (pReq == NULL) return NULL; - - tSerializeSMStreamTickMsg(pReq, contLen, &timerReq); - *pContLen = contLen; - return pReq; -} - static void mndPullupTrans(SMnode *pMnode) { mTrace("pullup trans msg"); int32_t contLen = 0; @@ -174,21 +159,12 @@ static void mndCalMqRebalance(SMnode *pMnode) { } } -static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { - int32_t contLen = 0; - void *pReq = mndBuildCheckpointTickMsg(&contLen, sec); - if (pReq != NULL) { - SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER, .pCont = pReq, .contLen = contLen}; - tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); - } -} - -static void mndStreamCheckpointRemain(SMnode *pMnode) { - int32_t contLen = 0; - void *pReq = mndBuildCheckpointTickMsg(&contLen, 0); - if (pReq != NULL) { - SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pReq, .contLen = contLen}; - tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); +static void mndStreamCheckpointTimer(SMnode *pMnode) { + SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); + if (pMsg != NULL) { + int32_t size = sizeof(SMStreamDoCheckpointMsg); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size}; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } } @@ -367,12 +343,8 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) { mndCalMqRebalance(pMnode); } - if (sec % tsStreamCheckpointInterval == 0) { - mndStreamCheckpointTick(pMnode, sec); - } - - if (sec % 5 == 0) { - mndStreamCheckpointRemain(pMnode); + if (sec % 30 == 0) { // send the checkpoint info every 10 sec + mndStreamCheckpointTimer(pMnode); } if (sec % tsStreamNodeCheckInterval == 0) { @@ -399,6 +371,7 @@ void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) { mndSyncCheckTimeout(pMnode); } } + static void *mndThreadFp(void *param) { SMnode *pMnode = param; int64_t lastTime = 0; @@ -832,10 +805,9 @@ _OVER: pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER || pMsg->msgType == TDMT_MND_COMPACT_TIMER || pMsg->msgType == TDMT_MND_NODECHECK_TIMER || - pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE || - pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_TIMER || pMsg->msgType == TDMT_MND_STREAM_REQ_CHKPT || - pMsg->msgType == TDMT_MND_S3MIGRATE_DB_TIMER || - pMsg->msgType == TDMT_MND_ARB_HEARTBEAT_TIMER || pMsg->msgType == TDMT_MND_ARB_CHECK_SYNC_TIMER) { + pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_REQ_CHKPT || + pMsg->msgType == TDMT_MND_S3MIGRATE_DB_TIMER || pMsg->msgType == TDMT_MND_ARB_HEARTBEAT_TIMER || + pMsg->msgType == TDMT_MND_ARB_CHECK_SYNC_TIMER) { mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, pMnode->stopped, state.restored, syncStr(state.state)); return -1; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 989d9970cd..ef29547e20 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -45,9 +45,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq); static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq); -static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq); static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq); -static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -114,10 +112,8 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); - mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, mndProcessStreamCheckpointInCandid); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq); @@ -886,26 +882,10 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) { } } - mDebug("generated checkpoint %" PRId64 "", maxChkptId + 1); + mDebug("generate new checkpointId:%" PRId64, maxChkptId + 1); return maxChkptId + 1; } -static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - if (sdbGetSize(pSdb, SDB_STREAM) <= 0) { - return 0; - } - - SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); - pMsg->checkpointId = mndStreamGenChkptId(pMnode, true); - - int32_t size = sizeof(SMStreamDoCheckpointMsg); - SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size}; - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - return 0; -} - static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) { SStreamCheckpointSourceReq req = {0}; @@ -1136,73 +1116,101 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) { return ready ? 0 : -1; } +typedef struct { + int64_t streamId; + int64_t duration; +} SCheckpointInterval; + +static int32_t streamWaitComparFn(const void* p1, const void* p2) { + const SCheckpointInterval* pInt1 = p1; + const SCheckpointInterval* pInt2 = p2; + if (pInt1->duration == pInt2->duration) { + return 0; + } + + return pInt1->duration > pInt2->duration? -1:1; +} + static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SStreamObj *pStream = NULL; int32_t code = 0; + int32_t numOfCheckpointTrans = 0; if ((code = mndCheckNodeStatus(pMnode)) != 0) { return code; } - // make sure the time interval between two consecutive checkpoint trans is long enough - SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; + SArray* pList = taosArrayInit(4, sizeof(SCheckpointInterval)); + int64_t now = taosGetTimestampMs(); while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { - code = mndProcessStreamCheckpointTrans(pMnode, pStream, pMsg->checkpointId, 1, true); - sdbRelease(pSdb, pStream); - if (code == -1) { - break; - } - } - - return code; -} - -static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - void *pIter = NULL; - int32_t code = 0; - - taosThreadMutexLock(&execInfo.lock); - int32_t num = taosHashGetSize(execInfo.transMgmt.pWaitingList); - taosThreadMutexUnlock(&execInfo.lock); - if (num == 0) { - return code; - } - - if ((code = mndCheckNodeStatus(pMnode)) != 0) { - return code; - } - - SArray *pList = taosArrayInit(4, sizeof(int64_t)); - while ((pIter = taosHashIterate(execInfo.transMgmt.pWaitingList, pIter)) != NULL) { - SCheckpointCandEntry *pEntry = pIter; - - SStreamObj *ps = mndAcquireStream(pMnode, pEntry->pName); - if (ps == NULL) { + int64_t duration = now - pStream->checkpointFreq; + if (duration < tsStreamCheckpointInterval * 1000) { + sdbRelease(pSdb, pStream); continue; } - mDebug("start to launch checkpoint for stream:%s %" PRIx64 " in candidate list", pEntry->pName, pEntry->streamId); + SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration}; + taosArrayPush(pList, &in); - code = mndProcessStreamCheckpointTrans(pMnode, ps, pEntry->checkpointId, 1, true); - mndReleaseStream(pMnode, ps); + int32_t currentSize = taosArrayGetSize(pList); + mDebug("stream:%s (uid:0x%" PRIx64 ") checkpoint interval beyond threshold: %" PRId64 "s(%" PRId64 + "s) beyond threshold:%d", + pStream->name, pStream->uid, tsStreamCheckpointInterval, duration / 1000, currentSize); - if (code == TSDB_CODE_SUCCESS) { - taosArrayPush(pList, &pEntry->streamId); + sdbRelease(pSdb, pStream); + } + + int32_t size = taosArrayGetSize(pList); + if (size == 0) { + taosArrayDestroy(pList); + return code; + } + + taosArraySort(pList, streamWaitComparFn); + mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans); + int32_t numOfQual = taosArrayGetSize(pList); + + if (numOfCheckpointTrans > tsMaxConcurrentCheckpoint) { + mDebug( + "%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new " + "checkpoint trans are not allowed, wait for 30s", + numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, tsMaxConcurrentCheckpoint); + taosArrayDestroy(pList); + return code; + } + + int32_t capacity = tsMaxConcurrentCheckpoint - numOfCheckpointTrans; + mDebug( + "%d stream(s) checkpoint interval longer than %ds, %d ongoing checkpoint trans, %d new checkpoint trans allowed, " + "concurrent trans threshold:%d", + numOfQual, tsStreamCheckpointInterval, numOfCheckpointTrans, capacity, tsMaxConcurrentCheckpoint); + + int32_t started = 0; + int64_t checkpointId = mndStreamGenChkptId(pMnode, true); + + for (int32_t i = 0; i < numOfQual; ++i) { + SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i); + + SStreamObj *p = mndGetStreamObj(pMnode, pCheckpointInfo->streamId); + if (p != NULL) { + code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true); + sdbRelease(pSdb, p); + + if (code != -1) { + started += 1; + + if (started >= capacity) { + mDebug("already start %d new checkpoint trans, current active checkpoint trans:%d", started, + (started + numOfCheckpointTrans)); + break; + } + } } } - for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { - int64_t *pId = taosArrayGet(pList, i); - - taosHashRemove(execInfo.transMgmt.pWaitingList, pId, sizeof(*pId)); - } - - int32_t remain = taosHashGetSize(execInfo.transMgmt.pWaitingList); - mDebug("%d in candidate list generated checkpoint, remaining:%d", (int32_t)taosArrayGetSize(pList), remain); taosArrayDestroy(pList); return code; } @@ -2349,10 +2357,10 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { return 0; } - SMStreamNodeCheckMsg *pMsg = rpcMallocCont(sizeof(SMStreamNodeCheckMsg)); + int32_t size = sizeof(SMStreamNodeCheckMsg); + SMStreamNodeCheckMsg *pMsg = rpcMallocCont(size); - SRpcMsg rpcMsg = { - .msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = sizeof(SMStreamNodeCheckMsg)}; + SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = size}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); return 0; } diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 74ad09c752..ac4cb08308 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -21,8 +21,6 @@ typedef struct SKeyInfo { int32_t keyLen; } SKeyInfo; -static int32_t clearFinishedTrans(SMnode* pMnode); - int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamId) { SStreamTransInfo info = { .transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamId = streamId}; @@ -30,41 +28,54 @@ int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t s return 0; } -int32_t clearFinishedTrans(SMnode* pMnode) { +int32_t mndStreamClearFinishedTrans(SMnode* pMnode, int32_t* pNumOfActiveChkpt) { size_t keyLen = 0; void* pIter = NULL; SArray* pList = taosArrayInit(4, sizeof(SKeyInfo)); + int32_t num = 0; while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { - SStreamTransInfo* pEntry = (SStreamTransInfo*)pIter; + SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter; // let's clear the finished trans - STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId); + STrans *pTrans = mndAcquireTrans(pMnode, pEntry->transId); if (pTrans == NULL) { - void* pKey = taosHashGetKey(pEntry, &keyLen); + void *pKey = taosHashGetKey(pEntry, &keyLen); // key is the name of src/dst db name SKeyInfo info = {.pKey = pKey, .keyLen = keyLen}; - mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name, - pEntry->startTime); + mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name, pEntry->startTime); taosArrayPush(pList, &info); } else { + if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) { + num++; + } mndReleaseTrans(pMnode, pTrans); } } - size_t num = taosArrayGetSize(pList); - for (int32_t i = 0; i < num; ++i) { + int32_t size = taosArrayGetSize(pList); + for (int32_t i = 0; i < size; ++i) { SKeyInfo* pKey = taosArrayGet(pList, i); taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen); } - mDebug("clear %d finished stream-trans, remained:%d", (int32_t)num, taosHashGetSize(execInfo.transMgmt.pDBTrans)); + mDebug("clear %d finished stream-trans, remained:%d, active checkpoint trans:%d", size, + taosHashGetSize(execInfo.transMgmt.pDBTrans), num); terrno = TSDB_CODE_SUCCESS; taosArrayDestroy(pList); + + if (pNumOfActiveChkpt != NULL) { + *pNumOfActiveChkpt = num; + } + return 0; } +// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream. +// For a given stream: +// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans. +// 2. create/drop/reset/update trans are conflict with any other trans. bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* pTransName, bool lock) { if (lock) { taosThreadMutexLock(&execInfo.lock); @@ -78,7 +89,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p return false; } - clearFinishedTrans(pMnode); + mndStreamClearFinishedTrans(pMnode, NULL); SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId)); if (pEntry != NULL) { @@ -95,7 +106,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p terrno = TSDB_CODE_MND_TRANS_CONFLICT; return true; } else { - mDebug("not conflict with checkpoint trans, name:%s, continue create trans", pTransName); + mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName); } } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) || @@ -106,7 +117,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p return true; } } else { - mDebug("stream:0x%"PRIx64" no conflict trans existed, continue create trans", streamId); + mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId); } if (lock) { @@ -124,7 +135,7 @@ int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) { return 0; } - clearFinishedTrans(pMnode); + mndStreamClearFinishedTrans(pMnode, NULL); SStreamTransInfo* pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid)); if (pEntry != NULL) { SStreamTransInfo tInfo = *pEntry; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 76bf139c3f..6b02ae485f 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -377,8 +377,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); - - // commit the update int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); }