From 891249191e1cd20aa63ba0a32b4dde54da97c57e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 5 Feb 2024 16:01:46 +0800 Subject: [PATCH] stream count window --- source/libs/executor/src/countwindowoperator.c | 2 +- source/libs/executor/src/operator.c | 2 ++ source/libs/executor/src/scanoperator.c | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index e91afb2e38..ffed6a7788 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -66,7 +66,7 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pBlock->info.rows; i++) { - // 1.如果group id发生变化,获取新group id上一次的window的缓存,并把旧group id的信息存入缓存。 + // todo(liuyao) 1.如果group id发生变化,获取新group id上一次的window的缓存,并把旧group id的信息存入缓存。 // 2.计算 当前需要合并的行数 // 3.做聚集计算。 // 4.达到行数,将结果存入pInfo->res中。 diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 4c4e83fced..bf5d1b2019 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -541,6 +541,8 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR pOptr = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT == type) { pOptr = createStreamCountAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT == type) { + pOptr = createCountwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else { terrno = TSDB_CODE_INVALID_PARA; pTaskInfo->code = terrno; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c056f33504..8747082c08 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1284,7 +1284,7 @@ static bool isSlidingWindow(SStreamScanInfo* pInfo) { } static bool isCountSlidingWindow(SStreamScanInfo* pInfo) { - return pInfo->windowSup.pStreamAggSup->windowCount != pInfo->windowSup.pStreamAggSup->windowSliding; + return pInfo->windowSup.pStreamAggSup && (pInfo->windowSup.pStreamAggSup->windowCount != pInfo->windowSup.pStreamAggSup->windowSliding); } static bool isCountWindow(SStreamScanInfo* pInfo) {