Merge pull request #22238 from taosdata/fix/TD-25423
save compact window info
This commit is contained in:
commit
71e16d1eeb
|
@ -3736,7 +3736,6 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
|
||||||
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
|
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
|
||||||
int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
|
int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
|
||||||
if (winNum > 0) {
|
if (winNum > 0) {
|
||||||
saveSessionOutputBuf(pAggSup, &winInfo);
|
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||||
saveResult(winInfo, pInfo->pStUpdated);
|
saveResult(winInfo, pInfo->pStUpdated);
|
||||||
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||||
|
@ -3747,9 +3746,8 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
|
||||||
getSessionHashKey(&winInfo.sessionWin, &key);
|
getSessionHashKey(&winInfo.sessionWin, &key);
|
||||||
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)winInfo.pOutputBuf, &pAggSup->stateStore);
|
|
||||||
}
|
}
|
||||||
|
saveSessionOutputBuf(pAggSup, &winInfo);
|
||||||
}
|
}
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
|
|
||||||
|
@ -4398,7 +4396,6 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
|
||||||
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
|
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
|
||||||
if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
|
if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
|
||||||
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated);
|
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated);
|
||||||
saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
|
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||||
saveResult(curInfo.winInfo, pInfo->pSeUpdated);
|
saveResult(curInfo.winInfo, pInfo->pSeUpdated);
|
||||||
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||||
|
@ -4412,7 +4409,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
|
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
|
||||||
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore);
|
saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) {
|
if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) {
|
||||||
|
|
Loading…
Reference in New Issue