From e89362fb36c802ad290c9859f8dae306ed49ade8 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 7 Feb 2023 15:29:05 +0800 Subject: [PATCH] feat:stream ignore check update data --- include/libs/nodes/plannodes.h | 1 + source/libs/executor/inc/executorimpl.h | 2 + source/libs/executor/src/groupoperator.c | 2 +- source/libs/executor/src/scanoperator.c | 4 ++ source/libs/executor/src/timewindowoperator.c | 4 +- .../script/tsim/stream/ignoreCheckUpdate.sim | 62 +++++++++++++++++++ 6 files changed, 72 insertions(+), 3 deletions(-) create mode 100644 tests/script/tsim/stream/ignoreCheckUpdate.sim diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index d885665117..799f4e368b 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -357,6 +357,7 @@ typedef struct STableScanPhysiNode { int64_t watermark; int8_t igExpired; bool assignBlockUid; + int8_t igCheckUpdate; } STableScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4ae178d508..999a7965fb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -474,6 +474,8 @@ typedef struct SStreamScanInfo { int32_t blockRecoverContiCnt; int32_t blockRecoverTotCnt; + int8_t igCheckUpdate; + int8_t igExpired; } SStreamScanInfo; typedef struct { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5febfbb2f3..9d88126220 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1109,7 +1109,7 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->partitionSup = *pParSup; pScanInfo->pPartScalarSup = pExpr; - if (!pScanInfo->pUpdateInfo) { + if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0); } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6d3eb67f16..ab1810398b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2368,6 +2368,10 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->partitionSup.needCalc = false; + //todo(liuyao) for test + pTableScanNode->igCheckUpdate = true; + pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; + pInfo->igExpired = pTableScanNode->igExpired; setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d320ef6e9e..0dbed103c8 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1726,7 +1726,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.pIntervalAggSup = pSup; - if (!pScanInfo->pUpdateInfo) { + if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark); } pScanInfo->interval = *pInterval; @@ -2893,7 +2893,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin } SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; - if (!pScanInfo->pUpdateInfo) { + if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark); } pScanInfo->twAggSup = *pTwSup; diff --git a/tests/script/tsim/stream/ignoreCheckUpdate.sim b/tests/script/tsim/stream/ignoreCheckUpdate.sim new file mode 100644 index 0000000000..d380a97d16 --- /dev/null +++ b/tests/script/tsim/stream/ignoreCheckUpdate.sim @@ -0,0 +1,62 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +sql drop stream if exists streams0; +sql drop database if exists test; +sql create database test vgroups 1; +sql use test; +sql create table t1(ts timestamp, a int, b int , c int); +sql create stream streams0 trigger at_once ignore update 1 into streamt as select _wstart c1, count(*) c2, max(b) c3 from t1 interval(10s); + +sql insert into t1 values(1648791213000,1,1,1); +sql insert into t1 values(1648791213000,2,2,2); + +$loop_count = 0 + +loop0: +sleep 300 +sql select * from streamt order by 1,2,3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop0 +endi + + +sql insert into t1 values(1648791213000,3,3,3); + +$loop_count = 0 + +loop1: +sleep 300 +sql select * from streamt order by 1,2,3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 3 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop1 +endi + +system sh/stop_dnodes.sh