diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index d258eb08ff..eacb2fcfc8 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -251,6 +251,11 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW pWin->ekey = pTsData[i]; pWinInfo->pWinFlag->endFlag = ends[i]; } else if (pWin->ekey == pTsData[i]) { + if (pWinInfo->pWinFlag->endFlag == true && ends[i] == false) { + (*pWinRow) = i + 1 - start; + *pRebuild = true; + goto _end; + } pWinInfo->pWinFlag->endFlag |= ends[i]; } else if (ends[i] && !pWinInfo->pWinFlag->endFlag) { *pRebuild = true; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 747133379f..33efd0cfb1 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -474,14 +474,12 @@ void destroyFlusedppPos(void* ppRes) { } void clearGroupResInfo(SGroupResInfo* pGroupResInfo) { - if (pGroupResInfo->freeItem) { - int32_t size = taosArrayGetSize(pGroupResInfo->pRows); - for (int32_t i = pGroupResInfo->index; i < size; i++) { - void* pPos = taosArrayGetP(pGroupResInfo->pRows, i); - destroyFlusedPos(pPos); - } - pGroupResInfo->freeItem = false; + int32_t size = taosArrayGetSize(pGroupResInfo->pRows); + for (int32_t i = pGroupResInfo->index; i < size; i++) { + void* pPos = taosArrayGetP(pGroupResInfo->pRows, i); + destroyFlusedPos(pPos); } + pGroupResInfo->freeItem = false; taosArrayDestroy(pGroupResInfo->pRows); pGroupResInfo->pRows = NULL; pGroupResInfo->index = 0;