rebuild session window

This commit is contained in:
liuyao 2023-10-08 18:27:05 +08:00
parent ed8cad22cb
commit 1d952ca8ea
1 changed files with 7 additions and 1 deletions

View File

@ -1944,6 +1944,8 @@ static void compactSessionSemiWindow(SOperatorInfo* pOperator, SResultWindowInfo
}
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
qDebug("===stream===try save session result skey:%" PRId64 ", ekey:%" PRId64 ".pos%d",
pWinInfo->sessionWin.win.skey, pWinInfo->sessionWin.win.ekey, pWinInfo->pStatePos->needFree);
return pAggSup->stateStore.streamStateSessionPut(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pStatePos, pAggSup->resultRowSize);
}
@ -2154,12 +2156,15 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
}
}
num++;
parentWin.sessionWin.win.skey = TMIN(parentWin.sessionWin.win.skey, childWin.sessionWin.win.skey);
parentWin.sessionWin.win.ekey = TMAX(parentWin.sessionWin.win.ekey, childWin.sessionWin.win.ekey);
memcpy(parentWin.pStatePos->pKey, &parentWin.sessionWin, sizeof(SSessionKey));
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, pAggSup->gap);
initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput,
pChild->exprSupp.rowEntryInfoOffset);
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true);
saveResult(parentWin, pStUpdated);
releaseOutputBuf(pAggSup->pState, childWin.pStatePos, &pAggSup->stateStore);
} else {
releaseOutputBuf(pAggSup->pState, childWin.pStatePos, &pAggSup->stateStore);
@ -2169,6 +2174,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
pAPI->stateStore.streamStateFreeCur(pCur);
}
if (num > 0) {
saveResult(parentWin, pStUpdated);
saveSessionOutputBuf(pAggSup, &parentWin);
}
}