refactor: do some internal refactor W.R.T. global configurations.

This commit is contained in:
Haojun Liao 2023-10-17 08:17:49 +08:00
parent 164159f293
commit 4badb445c4
4 changed files with 22 additions and 24 deletions

View File

@ -193,7 +193,7 @@ extern int64_t tsWalFsyncDataSizeLimit;
// internal // internal
extern int32_t tsTransPullupInterval; extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval; extern int32_t tsMqRebalanceInterval;
extern int32_t tsStreamCheckpointTickInterval; extern int32_t tsStreamCheckpointInterval;
extern double tsSinkDataRate; extern double tsSinkDataRate;
extern int32_t tsStreamNodeCheckInterval; extern int32_t tsStreamNodeCheckInterval;
extern int32_t tsTtlUnit; extern int32_t tsTtlUnit;

View File

@ -244,7 +244,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
// internal // internal
int32_t tsTransPullupInterval = 2; int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 300; int32_t tsStreamCheckpointInterval = 300;
double tsSinkDataRate = 2.0; double tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 30; int32_t tsStreamNodeCheckInterval = 30;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;
@ -270,8 +270,6 @@ int8_t tsS3Enabled = false;
int32_t tsS3BlockSize = 4096; // number of tsdb pages int32_t tsS3BlockSize = 4096; // number of tsdb pages
int32_t tsS3BlockCacheSize = 16; // number of blocks int32_t tsS3BlockCacheSize = 16; // number of blocks
int32_t tsCheckpointInterval = 300;
#ifndef _STORAGE #ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg) {
SConfigItem *pItem = cfgGetItem(pCfg, "dataDir"); SConfigItem *pItem = cfgGetItem(pCfg, "dataDir");
@ -651,19 +649,18 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamcheckpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddDouble(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddDouble(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1;
return -1;
if (cfgAddString(pCfg, "LossyColumns", tsLossyColumns, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "lossyColumns", tsLossyColumns, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddFloat(pCfg, "FPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddFloat(pCfg, "fPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddDouble(pCfg, "DPrecision", tsDPrecision, 0.0f, 1000000.0f, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddDouble(pCfg, "dPrecision", tsDPrecision, 0.0f, 1000000.0f, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "MaxRange", tsMaxRange, 0, 65536, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "maxRange", tsMaxRange, 0, 65536, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "CurRange", tsCurRange, 0, 65536, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "curRange", tsCurRange, 0, 65536, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "IfAdtFse", tsIfAdtFse, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "ifAdtFse", tsIfAdtFse, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "Compressor", tsCompressor, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "compressor", tsCompressor, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, CFG_SCOPE_SERVER) != 0) return -1;
@ -1082,17 +1079,18 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsCacheLazyLoadThreshold = cfgGetItem(pCfg, "cacheLazyLoadThreshold")->i32; tsCacheLazyLoadThreshold = cfgGetItem(pCfg, "cacheLazyLoadThreshold")->i32;
tstrncpy(tsLossyColumns, cfgGetItem(pCfg, "LossyColumns")->str, sizeof(tsLossyColumns)); tstrncpy(tsLossyColumns, cfgGetItem(pCfg, "lossyColumns")->str, sizeof(tsLossyColumns));
tsFPrecision = cfgGetItem(pCfg, "FPrecision")->fval; tsFPrecision = cfgGetItem(pCfg, "fPrecision")->fval;
tsDPrecision = cfgGetItem(pCfg, "DPrecision")->dval; tsDPrecision = cfgGetItem(pCfg, "dPrecision")->dval;
tsMaxRange = cfgGetItem(pCfg, "MaxRange")->i32; tsMaxRange = cfgGetItem(pCfg, "maxRange")->i32;
tsCurRange = cfgGetItem(pCfg, "CurRange")->i32; tsCurRange = cfgGetItem(pCfg, "curRange")->i32;
tsIfAdtFse = cfgGetItem(pCfg, "IfAdtFse")->bval; tsIfAdtFse = cfgGetItem(pCfg, "ifAdtFse")->bval;
tstrncpy(tsCompressor, cfgGetItem(pCfg, "Compressor")->str, sizeof(tsCompressor)); tstrncpy(tsCompressor, cfgGetItem(pCfg, "compressor")->str, sizeof(tsCompressor));
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64; tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64;
tsStreamCheckpointInterval = cfgGetItem(pCfg, "streamcheckpointInterval")->i32;
tsSinkDataRate = cfgGetItem(pCfg, "streamSinkDataRate")->dval;
tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval; tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval;
tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32; tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32;

View File

@ -281,7 +281,7 @@ static void *mndThreadFp(void *param) {
mndCalMqRebalance(pMnode); mndCalMqRebalance(pMnode);
} }
if (sec % tsStreamCheckpointTickInterval == 0) { if (sec % tsStreamCheckpointInterval == 0) {
mndStreamCheckpointTick(pMnode, sec); mndStreamCheckpointTick(pMnode, sec);
} }

View File

@ -937,7 +937,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
} }
// static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) { // static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) {
// int64_t timestampMs = taosGetTimestampMs(); // int64_t timestampMs = taosGetTimestampMs();
// if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) { // if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000) {
// return -1; // return -1;
// } // }