From 246ad22e0ba4febf1491dc5a5d86e2972dd36fba Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 21 Dec 2023 14:34:37 +0800 Subject: [PATCH 1/2] mid operator igore expired data --- source/libs/executor/src/streamtimewindowoperator.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 8c591702c1..8870a5b1c4 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -4113,8 +4113,7 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS STimeWindow nextWin = getFinalTimeWindow(ts, &pInfo->interval); while (1) { - bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); - if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { + if (!inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCol, startPos, nextWin.ekey, &nextWin); if (startPos < 0) { break; From 036fa8d0d951c635960dd488d4427bfd93a735f0 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 21 Dec 2023 19:51:09 +0800 Subject: [PATCH 2/2] sliding window --- source/libs/executor/src/streamtimewindowoperator.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 8870a5b1c4..919b6545dc 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -28,6 +28,7 @@ #include "ttime.h" #define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) +#define IS_MID_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) #define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) #define DEAULT_DELETE_MARK INT64_MAX #define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState" @@ -237,7 +238,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa dumyInfo.cur.pageId = -1; STimeWindow win = {0}; - if (IS_FINAL_INTERVAL_OP(pOperator)) { + if (IS_FINAL_INTERVAL_OP(pOperator) || IS_MID_INTERVAL_OP(pOperator)) { win.skey = startTsCols[i]; win.ekey = endTsCols[i]; } else {