From c7a50acd632af1789f2a195ce4652c268d359a32 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 23 Feb 2024 09:05:30 +0800 Subject: [PATCH] rebuild sliding window --- source/libs/executor/src/streamcountwindowoperator.c | 3 +++ source/libs/stream/src/streamSessionState.c | 1 + 2 files changed, 4 insertions(+) diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 689ba54ca6..294c2730df 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -99,6 +99,9 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, (void**)&pCurWin->winInfo.pStatePos, &size); } } + if (ts < pCurWin->winInfo.sessionWin.win.ekey) { + pBuffInfo->rebuildWindow = true; + } } else { code = pAggSup->stateStore.streamStateCountWinAddIfNotExist( pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size); diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index e2aae130e5..3d0241df75 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -488,6 +488,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS void* pFileStore = getStateFileStore(pFileState); SStreamStateCur* pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey); if (pCur) { + pCur->pStreamFileState = pFileState; SSessionKey key = {0}; void* pVal = NULL; int len = 0;