From 1e02f823a42e023c7d439fafc2e1db72030ec83f Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 29 Nov 2023 14:59:22 +0800 Subject: [PATCH] compact history window --- .../libs/executor/src/streameventwindowoperator.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 8029b9b156..9f1610e08d 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -603,13 +603,20 @@ void streamEventReloadState(SOperatorInfo* pOperator) { pSeKeyBuf[i].groupId, i); getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo); setEventWindowFlag(pAggSup, &curInfo); - if (!curInfo.pWinFlag->startFlag || !curInfo.pWinFlag->endFlag) { + if (!curInfo.pWinFlag->startFlag || curInfo.pWinFlag->endFlag) { continue; } compactEventWindow(pOperator, &curInfo, pInfo->pSeUpdated, pInfo->pSeDeleted, false); qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey, curInfo.winInfo.sessionWin.groupId); + if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { + saveSessionOutputBuf(pAggSup, &curInfo.winInfo); + } + + if (!curInfo.pWinFlag->endFlag) { + continue; + } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResult(curInfo.winInfo, pInfo->pSeUpdated); @@ -621,10 +628,6 @@ void streamEventReloadState(SOperatorInfo* pOperator) { getSessionHashKey(&curInfo.winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo)); } - - if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { - saveSessionOutputBuf(pAggSup, &curInfo.winInfo); - } } taosMemoryFree(pBuf);