From 020cdf4d2753b38033e7ab64271ee6984b56cec8 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 12 Jul 2023 14:06:36 +0800 Subject: [PATCH] get checkpoint id --- source/libs/executor/inc/operator.h | 4 ++-- source/libs/executor/src/operator.c | 6 +++--- source/libs/executor/src/timewindowoperator.c | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index e6c3405d7f..b9ddecd3be 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -105,7 +105,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); +SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo); @@ -133,7 +133,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle); -SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle); SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle); diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 2db5ea2f1e..47e82314ad 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -468,7 +468,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { - pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo); + pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) { SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode; pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); @@ -477,10 +477,10 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { int32_t children = 0; - pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); + pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) { int32_t children = pHandle->numOfVgroups; - pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); + pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6ec14bc218..55c1b89c30 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2959,7 +2959,7 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) { } SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild) { + SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -5614,7 +5614,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo) { + SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) { SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) {