feat:add configure for agg cnt

This commit is contained in:
wangmm0220 2023-12-08 19:00:10 +08:00
parent 2a506b3e5e
commit d6114f90de
3 changed files with 9 additions and 7 deletions

View File

@ -208,6 +208,7 @@ extern int32_t tsUptimeInterval;
extern bool tsDisableStream; extern bool tsDisableStream;
extern int64_t tsStreamBufferSize; extern int64_t tsStreamBufferSize;
extern int tsStreamAggCnt;
extern bool tsFilterScalarMode; extern bool tsFilterScalarMode;
extern int32_t tsMaxStreamBackendCache; extern int32_t tsMaxStreamBackendCache;
extern int32_t tsPQSortMemThreshold; extern int32_t tsPQSortMemThreshold;

View File

@ -262,6 +262,7 @@ bool tsDisableStream = false;
int64_t tsStreamBufferSize = 128 * 1024 * 1024; int64_t tsStreamBufferSize = 128 * 1024 * 1024;
bool tsFilterScalarMode = false; bool tsFilterScalarMode = false;
int tsResolveFQDNRetryTime = 100; // seconds int tsResolveFQDNRetryTime = 100; // seconds
int tsStreamAggCnt = 10;
char tsS3Endpoint[TSDB_FQDN_LEN] = "<endpoint>"; char tsS3Endpoint[TSDB_FQDN_LEN] = "<endpoint>";
char tsS3AccessKey[TSDB_FQDN_LEN] = "<accesskey>"; char tsS3AccessKey[TSDB_FQDN_LEN] = "<accesskey>";
@ -737,6 +738,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1; return -1;
if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER, if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER,
CFG_DYN_ENT_SERVER) != 0) CFG_DYN_ENT_SERVER) != 0)
@ -1192,6 +1195,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64; tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64;
tsStreamAggCnt = cfgGetItem(pCfg, "streamAggCnt")->i32;
tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64;
tsStreamCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i32; tsStreamCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i32;
tsSinkDataRate = cfgGetItem(pCfg, "streamSinkDataRate")->fval; tsSinkDataRate = cfgGetItem(pCfg, "streamSinkDataRate")->fval;

View File

@ -24,9 +24,6 @@
#include "tname.h" #include "tname.h"
#include "tuuid.h" #include "tuuid.h"
#define SINK_NODE_LEVEL (0)
#define SOURCE_NODE_LEVEL (0)
#define SINK_NODE_LEVEL (0)
#define SINK_NODE_LEVEL (0) #define SINK_NODE_LEVEL (0)
extern bool tsDeployOnSnode; extern bool tsDeployOnSnode;
@ -553,7 +550,6 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
} }
} }
#define AGGNUM 2
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) { static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
@ -607,7 +603,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
do{ do{
SArray** list = taosArrayGetLast(pStream->tasks); SArray** list = taosArrayGetLast(pStream->tasks);
float size = (float)taosArrayGetSize(*list); float size = (float)taosArrayGetSize(*list);
size_t cnt = (int)(size/AGGNUM + 0.5); size_t cnt = (int)(size/tsStreamAggCnt + 0.5);
if(cnt <= 1) break; if(cnt <= 1) break;
addNewTaskList(pStream->tasks); addNewTaskList(pStream->tasks);
@ -621,9 +617,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
return code; return code;
} }
bindTwoLevel(pStream->tasks, j*AGGNUM, (j+1)*AGGNUM); bindTwoLevel(pStream->tasks, j*tsStreamAggCnt, (j+1)*tsStreamAggCnt);
if (pStream->conf.fillHistory) { if (pStream->conf.fillHistory) {
bindTwoLevel(pStream->pHTasksList, j*AGGNUM, (j+1)*AGGNUM); bindTwoLevel(pStream->pHTasksList, j*tsStreamAggCnt, (j+1)*tsStreamAggCnt);
} }
} }
}while(1); }while(1);