From ecaa66e53f8b31ae2feb21a651a2f277f6293395 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 20 Sep 2024 15:30:44 +0800 Subject: [PATCH] set watermark and interval --- source/libs/executor/src/executor.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c8180261c0..bfa4a56a77 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1093,8 +1093,20 @@ _end: return code; } +static int32_t getOpratorIntervalInfo(SOperatorInfo* pOperator, int64_t* pWaterMark, SInterval* pInterval) { + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + return getOpratorIntervalInfo(pOperator->pDownstream[0], pWaterMark, pInterval); + } + SStreamScanInfo* pScanOp = (SStreamScanInfo*) pOperator->info; + *pWaterMark = pScanOp->twAggSup.waterMark; + *pInterval = pScanOp->interval; + return TSDB_CODE_SUCCESS; +} + int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval) { - return 0; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + SOperatorInfo* pOperator = pTaskInfo->pRoot; + return getOpratorIntervalInfo(pOperator, pWaterMark, pInterval); } int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {