diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 1bbb969e7b..6e5eb478c3 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -241,7 +241,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointTickInterval = 10; +int32_t tsStreamCheckpointTickInterval = 300; int32_t tsStreamNodeCheckInterval = 30; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; @@ -264,8 +264,6 @@ char tsS3BucketName[TSDB_FQDN_LEN] = ""; char tsS3AppId[TSDB_FQDN_LEN] = ""; int8_t tsS3Enabled = false; -int32_t tsCheckpointInterval = 20; - #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { SConfigItem *pItem = cfgGetItem(pCfg, "dataDir"); @@ -642,7 +640,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt64(pCfg, "checkpointInterval", tsCheckpointInterval, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e821e952e0..82ae6a846c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1603,7 +1603,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - const char* sinkStr = "SinkData:%.2fMiB"; + const char* sinkStr = "%.2fMiB"; sprintf(buf, sinkStr, pe->sinkDataSize); } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 358a75b4d9..2583e6427f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -888,7 +888,6 @@ void metaHbToMnode(void* param, void* tmrId) { .nodeId = pMeta->vgId, .stage = pMeta->stage, .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)), -// .outputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->outputInfo.queue)), }; entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; @@ -897,8 +896,6 @@ void metaHbToMnode(void* param, void* tmrId) { entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); } -// entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; - if ((*pTask)->exec.pWalReader != NULL) { entry.offset = (*pTask)->chkInfo.nextProcessVer; walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);