release state

This commit is contained in:
liuyao 2023-09-21 10:12:43 +08:00
parent 3b3c5d7cb8
commit b1d9cd1c4a
1 changed files with 4 additions and 2 deletions

View File

@ -2332,8 +2332,8 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
if (size == 0) { if (size == 0) {
return; return;
} }
SRowBuffPos* pPos = taosArrayGetP(pAllWins, size - 1);
SSessionKey* pSeKey = taosArrayGet(pAllWins, size - 1); SSessionKey* pSeKey = pPos->pKey;
taosArrayPush(pMaxWins, pSeKey); taosArrayPush(pMaxWins, pSeKey);
if (pSeKey->groupId == 0) { if (pSeKey->groupId == 0) {
return; return;
@ -2599,6 +2599,7 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) {
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData,
resSize); resSize);
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) { if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream); downstream->fpSet.releaseStreamStateFn(downstream);
@ -3404,6 +3405,7 @@ void streamStateReleaseState(SOperatorInfo* pOperator) {
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME,
strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData,
resSize); resSize);
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) { if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream); downstream->fpSet.releaseStreamStateFn(downstream);