diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index fb348e4416..7b68b65c17 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -436,6 +436,18 @@ static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { return true; } +void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo* pInfo) { + SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore; + + if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + reloadFromDownStream(downstream->pDownstream[0], pInfo); + return; + } + + SStreamScanInfo* pScanInfo = downstream->info; + pInfo->pUpdateInfo = pScanInfo->pUpdateInfo; +} + void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) { SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore; @@ -1401,8 +1413,8 @@ static void streamIntervalReleaseState(SOperatorInfo* pOperator) { } void streamIntervalReloadState(SOperatorInfo* pOperator) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { - SStreamIntervalOperatorInfo* pInfo = pOperator->info; int32_t size = 0; void* pBuf = NULL; int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, @@ -1418,6 +1430,7 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) { if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); } + reloadFromDownStream(downstream, pInfo); } SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,