From 9d3a00920e5a2c2c78fcf31657865119f6a0a4e5 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 27 Feb 2025 11:23:40 +0800 Subject: [PATCH] fix(stream): fix mem leak && rebuild event window when the window end flag is modified --- source/libs/executor/src/streameventwindowoperator.c | 5 +++++ source/libs/executor/src/streamtimewindowoperator.c | 12 +++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index fa6008eba7..a2304a2e6c 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -234,6 +234,11 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW pWin->ekey = pTsData[i]; pWinInfo->pWinFlag->endFlag = ends[i]; } else if (pWin->ekey == pTsData[i]) { + if (pWinInfo->pWinFlag->endFlag == true && ends[i] == false) { + (*pWinRow) = i + 1 - start; + *pRebuild = true; + goto _end; + } pWinInfo->pWinFlag->endFlag |= ends[i]; } else if (ends[i] && !pWinInfo->pWinFlag->endFlag) { *pRebuild = true; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 5cab26b9d3..3b6c1963dc 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -479,14 +479,12 @@ void destroyFlusedppPos(void* ppRes) { } void clearGroupResInfo(SGroupResInfo* pGroupResInfo) { - if (pGroupResInfo->freeItem) { - int32_t size = taosArrayGetSize(pGroupResInfo->pRows); - for (int32_t i = pGroupResInfo->index; i < size; i++) { - void* pPos = taosArrayGetP(pGroupResInfo->pRows, i); - destroyFlusedPos(pPos); - } - pGroupResInfo->freeItem = false; + int32_t size = taosArrayGetSize(pGroupResInfo->pRows); + for (int32_t i = pGroupResInfo->index; i < size; i++) { + void* pPos = taosArrayGetP(pGroupResInfo->pRows, i); + destroyFlusedPos(pPos); } + pGroupResInfo->freeItem = false; taosArrayDestroy(pGroupResInfo->pRows); pGroupResInfo->pRows = NULL; pGroupResInfo->index = 0;