diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 11d8d1487a..ef5c2572d9 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -99,6 +99,11 @@ int32_t getEndCondIndex(bool* pEnd, int32_t start, int32_t rows) { return -1; } +int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { + pAPI->streamStateReleaseBuf(pState, pPos, true); + return TSDB_CODE_SUCCESS; +} + void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd, int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) { int32_t code = TSDB_CODE_SUCCESS; int32_t size = pAggSup->resultRowSize; @@ -143,6 +148,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI pCurWin->winInfo.isOutput = false; _end: + reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore); pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur); pNextWinKey->groupId = groupId; code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pNextWinKey, NULL, 0); @@ -341,6 +347,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } if (isWindowIncomplete(&curWin)) { + releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAggSup->stateStore); continue; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 7a1bb2729c..9085694aed 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1845,11 +1845,6 @@ int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { return TSDB_CODE_SUCCESS; } -int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { - pAPI->streamStateReleaseBuf(pState, pPos, true); - return TSDB_CODE_SUCCESS; -} - void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) { SSessionKey key = {0}; getSessionHashKey(pKey, &key); @@ -2495,7 +2490,7 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { return; } SResultWindowInfo* pWinInfo = taosArrayGet(pAllWins, size - 1); - SSessionKey* pSeKey = pWinInfo->pStatePos->pKey; + SSessionKey* pSeKey = &pWinInfo->sessionWin; taosArrayPush(pMaxWins, pSeKey); if (pSeKey->groupId == 0) { return; @@ -2503,7 +2498,7 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { uint64_t preGpId = pSeKey->groupId; for (int32_t i = size - 2; i >= 0; i--) { pWinInfo = taosArrayGet(pAllWins, i); - pSeKey = pWinInfo->pStatePos->pKey; + pSeKey = &pWinInfo->sessionWin; if (preGpId != pSeKey->groupId) { taosArrayPush(pMaxWins, pSeKey); preGpId = pSeKey->groupId; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 723f04c499..06f7b6a268 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -310,7 +310,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream int32_t size = taosArrayGetSize(pWinStates); if (pCur->buffIndex >= 0) { if (pCur->buffIndex >= size) { - pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, size); + pNewPos = addNewSessionWindow(pFileState, pWinStates, pWinKey); goto _end; } pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex); @@ -332,7 +332,6 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream } _end: - memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey)); (*ppVal) = pNewPos; return TSDB_CODE_SUCCESS; }