diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 7e5028b969..c9da3c99e7 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1944,6 +1944,8 @@ static void compactSessionSemiWindow(SOperatorInfo* pOperator, SResultWindowInfo } int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { + qDebug("===stream===try save session result skey:%" PRId64 ", ekey:%" PRId64 ".pos%d", + pWinInfo->sessionWin.win.skey, pWinInfo->sessionWin.win.ekey, pWinInfo->pStatePos->needFree); return pAggSup->stateStore.streamStateSessionPut(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pStatePos, pAggSup->resultRowSize); } @@ -2154,12 +2156,15 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS } } num++; + parentWin.sessionWin.win.skey = TMIN(parentWin.sessionWin.win.skey, childWin.sessionWin.win.skey); + parentWin.sessionWin.win.ekey = TMAX(parentWin.sessionWin.win.ekey, childWin.sessionWin.win.ekey); + memcpy(parentWin.pStatePos->pKey, &parentWin.sessionWin, sizeof(SSessionKey)); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, pAggSup->gap); initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput, pChild->exprSupp.rowEntryInfoOffset); compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true); - saveResult(parentWin, pStUpdated); releaseOutputBuf(pAggSup->pState, childWin.pStatePos, &pAggSup->stateStore); } else { releaseOutputBuf(pAggSup->pState, childWin.pStatePos, &pAggSup->stateStore); @@ -2169,6 +2174,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS pAPI->stateStore.streamStateFreeCur(pCur); } if (num > 0) { + saveResult(parentWin, pStUpdated); saveSessionOutputBuf(pAggSup, &parentWin); } }