From afc2a346e5719a9529a038b7d6a36d31461a623d Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 2 Jan 2024 14:16:34 +0800 Subject: [PATCH] reset update info --- source/libs/executor/inc/executorInt.h | 1 + .../libs/executor/src/streameventwindowoperator.c | 1 + .../libs/executor/src/streamtimewindowoperator.c | 15 +++++++++++++++ 3 files changed, 17 insertions(+) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index e3e504cdbc..a2ced573f7 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -845,6 +845,7 @@ void resetWinRange(STimeWindow* winRange); bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts); int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval); void resetUnCloseSessionWinInfo(SSHashObj* winMap); +void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup); int32_t encodeSSessionKey(void** buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index bd247eba07..85f15cc368 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -654,6 +654,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) { if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); } + reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup); } SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index c9490e2c55..946b02f2d4 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2759,6 +2759,18 @@ void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, } } +void reloadAggSupFromDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup) { + SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore; + + if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + reloadAggSupFromDownStream(downstream->pDownstream[0], pAggSup); + return; + } + + SStreamScanInfo* pScanInfo = downstream->info; + pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo; +} + void streamSessionSemiReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; @@ -2790,6 +2802,7 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) { if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); } + reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup); } void streamSessionReloadState(SOperatorInfo* pOperator) { @@ -2842,6 +2855,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); } + reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup); } SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, @@ -3724,6 +3738,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) { if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); } + reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup); } SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,