From b1d9cd1c4a3faea447f509970defa8f433312bcf Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 21 Sep 2023 10:12:43 +0800 Subject: [PATCH] release state --- source/libs/executor/src/streamtimewindowoperator.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 1f98eb1210..aa9ba4e4b0 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2332,8 +2332,8 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { if (size == 0) { return; } - - SSessionKey* pSeKey = taosArrayGet(pAllWins, size - 1); + SRowBuffPos* pPos = taosArrayGetP(pAllWins, size - 1); + SSessionKey* pSeKey = pPos->pKey; taosArrayPush(pMaxWins, pSeKey); if (pSeKey->groupId == 0) { return; @@ -2599,6 +2599,7 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) { pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize); + pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { downstream->fpSet.releaseStreamStateFn(downstream); @@ -3404,6 +3405,7 @@ void streamStateReleaseState(SOperatorInfo* pOperator) { pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize); + pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { downstream->fpSet.releaseStreamStateFn(downstream);