diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 56bf4f818d..9b987ff1a4 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -144,6 +144,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI _end: pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur); + pNextWinKey->groupId = groupId; code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pNextWinKey, NULL, 0); if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_KEY_INVALID(pNextWinKey); @@ -160,7 +161,7 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start, SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) { *pRebuild = false; - if (!pWinInfo->pWinFlag->startFlag) { + if (!pWinInfo->pWinFlag->startFlag && !(starts[start]) ) { return 1; } @@ -303,7 +304,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl int32_t winIndex = 0; bool allEqual = true; SEventWindowInfo curWin = {0}; - SSessionKey nextWinKey; + SSessionKey nextWinKey = {0}; setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin, &nextWinKey); setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); bool rebuild = false;