other: merge stream fix.

This commit is contained in:
Haojun Liao 2023-10-07 09:56:03 +08:00
parent 11d8c8da39
commit 9d5a3b8d78
3 changed files with 3 additions and 8 deletions

View File

@ -241,7 +241,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
// internal
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 10;
int32_t tsStreamCheckpointTickInterval = 300;
int32_t tsStreamNodeCheckInterval = 30;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushIntervalSec = 10;
@ -264,8 +264,6 @@ char tsS3BucketName[TSDB_FQDN_LEN] = "<bucketname>";
char tsS3AppId[TSDB_FQDN_LEN] = "<appid>";
int8_t tsS3Enabled = false;
int32_t tsCheckpointInterval = 20;
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {
SConfigItem *pItem = cfgGetItem(pCfg, "dataDir");
@ -642,7 +640,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "checkpointInterval", tsCheckpointInterval, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0)
return -1;

View File

@ -1603,7 +1603,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
const char* sinkStr = "SinkData:%.2fMiB";
const char* sinkStr = "%.2fMiB";
sprintf(buf, sinkStr, pe->sinkDataSize);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// offset info

View File

@ -888,7 +888,6 @@ void metaHbToMnode(void* param, void* tmrId) {
.nodeId = pMeta->vgId,
.stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)),
// .outputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->outputInfo.queue)),
};
entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
@ -897,8 +896,6 @@ void metaHbToMnode(void* param, void* tmrId) {
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}
// entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
if ((*pTask)->exec.pWalReader != NULL) {
entry.offset = (*pTask)->chkInfo.nextProcessVer;
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);