From d6114f90de9679ad6b609e24ab193c91f58fb4c9 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 8 Dec 2023 19:00:10 +0800 Subject: [PATCH] feat:add configure for agg cnt --- include/common/tglobal.h | 1 + source/common/src/tglobal.c | 5 +++++ source/dnode/mnode/impl/src/mndScheduler.c | 10 +++------- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 33cfada338..31b98a0121 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -208,6 +208,7 @@ extern int32_t tsUptimeInterval; extern bool tsDisableStream; extern int64_t tsStreamBufferSize; +extern int tsStreamAggCnt; extern bool tsFilterScalarMode; extern int32_t tsMaxStreamBackendCache; extern int32_t tsPQSortMemThreshold; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 98d79c5295..4f2ae1b11f 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -262,6 +262,7 @@ bool tsDisableStream = false; int64_t tsStreamBufferSize = 128 * 1024 * 1024; bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds +int tsStreamAggCnt = 10; char tsS3Endpoint[TSDB_FQDN_LEN] = ""; char tsS3AccessKey[TSDB_FQDN_LEN] = ""; @@ -737,6 +738,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + return -1; if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) @@ -1192,6 +1195,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64; + tsStreamAggCnt = cfgGetItem(pCfg, "streamAggCnt")->i32; + tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64; tsStreamCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i32; tsSinkDataRate = cfgGetItem(pCfg, "streamSinkDataRate")->fval; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 8213a21d7f..3de7a55820 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -24,9 +24,6 @@ #include "tname.h" #include "tuuid.h" -#define SINK_NODE_LEVEL (0) -#define SOURCE_NODE_LEVEL (0) -#define SINK_NODE_LEVEL (0) #define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; @@ -553,7 +550,6 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { } } -#define AGGNUM 2 static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) { SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); @@ -607,7 +603,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* do{ SArray** list = taosArrayGetLast(pStream->tasks); float size = (float)taosArrayGetSize(*list); - size_t cnt = (int)(size/AGGNUM + 0.5); + size_t cnt = (int)(size/tsStreamAggCnt + 0.5); if(cnt <= 1) break; addNewTaskList(pStream->tasks); @@ -621,9 +617,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* return code; } - bindTwoLevel(pStream->tasks, j*AGGNUM, (j+1)*AGGNUM); + bindTwoLevel(pStream->tasks, j*tsStreamAggCnt, (j+1)*tsStreamAggCnt); if (pStream->conf.fillHistory) { - bindTwoLevel(pStream->pHTasksList, j*AGGNUM, (j+1)*AGGNUM); + bindTwoLevel(pStream->pHTasksList, j*tsStreamAggCnt, (j+1)*tsStreamAggCnt); } } }while(1);