fix:aggCnt must bigger than 1 & move msg code to the end to be compatible with old version

This commit is contained in:
wangmm0220 2023-12-19 10:53:46 +08:00
parent 64f98744e2
commit 3a0af1ea49
4 changed files with 6 additions and 9 deletions

View File

@ -395,7 +395,6 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_FILL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION,
@ -419,7 +418,8 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT,
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN,
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE,
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL
} ENodeType;
typedef struct {

View File

@ -738,7 +738,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 2, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER,

View File

@ -534,11 +534,8 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
SArray* pUpTaskList = taosArrayGetP(tasks, size - 2);
SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList);
for(int i = begin; i < end; i++){
for(int i = begin; i < end && i < taosArrayGetSize(pUpTaskList); i++){
SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
if(pUpTask == NULL) { // out of range
break;
}
streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
streamTaskSetUpstreamInfo(*pDownTask, pUpTask);
}
@ -600,7 +597,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
do{
SArray** list = taosArrayGetLast(pStream->tasks);
float size = (float)taosArrayGetSize(*list);
size_t cnt = (int)(size/tsStreamAggCnt + 0.5);
size_t cnt = (size_t)ceil(size/tsStreamAggCnt);
if(cnt <= 1) break;
mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);

View File

@ -54,7 +54,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory
}
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);
sprintf(buf, "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
pTask->id.idStr = taosStrdup(buf);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;