diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 8c591702c1..919b6545dc 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -28,6 +28,7 @@ #include "ttime.h" #define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) +#define IS_MID_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) #define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) #define DEAULT_DELETE_MARK INT64_MAX #define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState" @@ -237,7 +238,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa dumyInfo.cur.pageId = -1; STimeWindow win = {0}; - if (IS_FINAL_INTERVAL_OP(pOperator)) { + if (IS_FINAL_INTERVAL_OP(pOperator) || IS_MID_INTERVAL_OP(pOperator)) { win.skey = startTsCols[i]; win.ekey = endTsCols[i]; } else { @@ -4113,8 +4114,7 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS STimeWindow nextWin = getFinalTimeWindow(ts, &pInfo->interval); while (1) { - bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); - if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { + if (!inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCol, startPos, nextWin.ekey, &nextWin); if (startPos < 0) { break;