From 3a0af1ea49c4898913f9ab5007a4366df9d64b5e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 19 Dec 2023 10:53:46 +0800 Subject: [PATCH] fix:aggCnt must bigger than 1 & move msg code to the end to be compatible with old version --- include/common/tmsg.h | 4 ++-- source/common/src/tglobal.c | 2 +- source/dnode/mnode/impl/src/mndScheduler.c | 7 ++----- source/libs/stream/src/streamTask.c | 2 +- 4 files changed, 6 insertions(+), 9 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e6ed0f250c..bd759df621 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -395,7 +395,6 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL, - QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_FILL, QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, @@ -419,7 +418,8 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, - QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL + QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, + QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL } ENodeType; typedef struct { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 4f2ae1b11f..865528086a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -738,7 +738,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 2, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER, diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index e91253e689..47461d7c1b 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -534,11 +534,8 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { SArray* pUpTaskList = taosArrayGetP(tasks, size - 2); SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList); - for(int i = begin; i < end; i++){ + for(int i = begin; i < end && i < taosArrayGetSize(pUpTaskList); i++){ SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i); - if(pUpTask == NULL) { // out of range - break; - } streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask); streamTaskSetUpstreamInfo(*pDownTask, pUpTask); } @@ -600,7 +597,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* do{ SArray** list = taosArrayGetLast(pStream->tasks); float size = (float)taosArrayGetSize(*list); - size_t cnt = (int)(size/tsStreamAggCnt + 0.5); + size_t cnt = (size_t)ceil(size/tsStreamAggCnt); if(cnt <= 1) break; mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index db0217f000..1c5d39f92c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -54,7 +54,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory } char buf[128] = {0}; - sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId); + sprintf(buf, "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId); pTask->id.idStr = taosStrdup(buf); pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;