Merge pull request #23852 from taosdata/fix/TD-27562

compact history window
This commit is contained in:
Haojun Liao 2023-11-29 16:31:10 +08:00 committed by GitHub
commit c5ff908e4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 8 additions and 5 deletions

View File

@ -603,13 +603,20 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
pSeKeyBuf[i].groupId, i); pSeKeyBuf[i].groupId, i);
getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo); getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo);
setEventWindowFlag(pAggSup, &curInfo); setEventWindowFlag(pAggSup, &curInfo);
if (!curInfo.pWinFlag->startFlag || !curInfo.pWinFlag->endFlag) { if (!curInfo.pWinFlag->startFlag || curInfo.pWinFlag->endFlag) {
continue; continue;
} }
compactEventWindow(pOperator, &curInfo, pInfo->pSeUpdated, pInfo->pSeDeleted, false); compactEventWindow(pOperator, &curInfo, pInfo->pSeUpdated, pInfo->pSeDeleted, false);
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey, qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey,
curInfo.winInfo.sessionWin.groupId); curInfo.winInfo.sessionWin.groupId);
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
}
if (!curInfo.pWinFlag->endFlag) {
continue;
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(curInfo.winInfo, pInfo->pSeUpdated); saveResult(curInfo.winInfo, pInfo->pSeUpdated);
@ -621,10 +628,6 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
getSessionHashKey(&curInfo.winInfo.sessionWin, &key); getSessionHashKey(&curInfo.winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo)); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo));
} }
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
}
} }
taosMemoryFree(pBuf); taosMemoryFree(pBuf);