optimize session operator reload state

This commit is contained in:
liuyao 2023-07-27 09:59:58 +08:00
parent 05504023f5
commit 1894608b80
1 changed files with 12 additions and 7 deletions

View File

@ -3192,11 +3192,12 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj*
return pCur; return pCur;
} }
static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated, static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
SSHashObj* pStDeleted, bool addGap) { SSHashObj* pStDeleted, bool addGap) {
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
int32_t winNum = 0;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SResultRow* pCurResult = NULL; SResultRow* pCurResult = NULL;
@ -3230,7 +3231,9 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
doDeleteSessionWindow(pAggSup, &winInfo.sessionWin); doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
pAPI->stateStore.streamStateFreeCur(pCur); pAPI->stateStore.streamStateFreeCur(pCur);
taosMemoryFree(winInfo.pOutputBuf); taosMemoryFree(winInfo.pOutputBuf);
winNum++;
} }
return winNum;
} }
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
@ -3731,9 +3734,11 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < num; i++) { for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0}; SResultWindowInfo winInfo = {0};
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
saveSessionOutputBuf(pAggSup, &winInfo); if (winNum > 0) {
saveResult(winInfo, pInfo->pStUpdated); saveSessionOutputBuf(pAggSup, &winInfo);
saveResult(winInfo, pInfo->pStUpdated);
}
} }
taosMemoryFree(pBuf); taosMemoryFree(pBuf);