diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 689ba54ca6..294c2730df 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -99,6 +99,9 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, (void**)&pCurWin->winInfo.pStatePos, &size); } } + if (ts < pCurWin->winInfo.sessionWin.win.ekey) { + pBuffInfo->rebuildWindow = true; + } } else { code = pAggSup->stateStore.streamStateCountWinAddIfNotExist( pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size); diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index e2aae130e5..3d0241df75 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -488,6 +488,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS void* pFileStore = getStateFileStore(pFileState); SStreamStateCur* pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey); if (pCur) { + pCur->pStreamFileState = pFileState; SSessionKey key = {0}; void* pVal = NULL; int len = 0;