diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 1f98eb1210..aa9ba4e4b0 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2332,8 +2332,8 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { if (size == 0) { return; } - - SSessionKey* pSeKey = taosArrayGet(pAllWins, size - 1); + SRowBuffPos* pPos = taosArrayGetP(pAllWins, size - 1); + SSessionKey* pSeKey = pPos->pKey; taosArrayPush(pMaxWins, pSeKey); if (pSeKey->groupId == 0) { return; @@ -2599,6 +2599,7 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) { pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize); + pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { downstream->fpSet.releaseStreamStateFn(downstream); @@ -3404,6 +3405,7 @@ void streamStateReleaseState(SOperatorInfo* pOperator) { pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize); + pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { downstream->fpSet.releaseStreamStateFn(downstream);