From b27faba7efb06f9789c240be1285e15d4453af8a Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 23 Nov 2022 17:07:45 +0800 Subject: [PATCH] fix:initialize maxts --- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/timewindowoperator.c | 15 +++-- tests/script/tsim/stream/state0.sim | 60 +++++++++++++++++++ 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index de2a7b9dac..8db450ad50 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1849,12 +1849,12 @@ FETCH_NEXT_BLOCK: prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; printDataBlock(pDelBlock, "stream scan delete data"); if (pInfo->tqReader) { blockDataDestroy(pDelBlock); } if (pInfo->pDeleteDataRes->info.rows > 0) { + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; return pInfo->pDeleteDataRes; } else { goto FETCH_NEXT_BLOCK; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index dd02ce9cd4..7fd2fcb5b8 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3364,22 +3364,23 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num } } -void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t waterMark, uint16_t type, - int32_t tsColIndex) { +void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, + STimeWindowAggSupp* pTwSup) { if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) { SStreamPartitionOperatorInfo* pScanInfo = downstream->info; pScanInfo->tsColIndex = tsColIndex; } if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - initDownStream(downstream->pDownstream[0], pAggSup, waterMark, type, tsColIndex); + initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup); return; } SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; if (!pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark); + pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark); } + pScanInfo->twAggSup = *pTwSup; } int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap, @@ -4102,8 +4103,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, NULL); if (downstream) { - initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, - pInfo->primaryTsIndex); + initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); code = appendDownstream(pOperator, &downstream, 1); } return pOperator; @@ -4606,8 +4606,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, NULL); - initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, - pInfo->primaryTsIndex); + initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/tests/script/tsim/stream/state0.sim b/tests/script/tsim/stream/state0.sim index 87d7d4b7fc..7c922658c9 100644 --- a/tests/script/tsim/stream/state0.sim +++ b/tests/script/tsim/stream/state0.sim @@ -731,5 +731,65 @@ if $data32 != 1 then goto loop9 endi +sql drop stream if exists streams5; +sql drop database if exists test5; +sql create database test5; +sql use test5; +sql create table tb (ts timestamp, a int); +sql insert into tb values (now + 1m , 1 ); +sql create table b (c timestamp, d int, e int , f int, g double); +sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3 from tb state_window(a); +sql insert into b values(1648791213000,NULL,NULL,NULL,NULL); +sql select * from streamt order by c1, c2, c3; + +print data00:$data00 +print data01:$data01 + +sql insert into b values(1648791213000,NULL,NULL,NULL,NULL); +sql select * from streamt order by c1, c2, c3; + +print data00:$data00 +print data01:$data01 + +sql insert into b values(1648791213001,1,2,2,2.0); +sql insert into b values(1648791213002,1,3,3,3.0); +sql insert into tb values(1648791213003,1); + +sql select * from streamt; +print data00:$data00 +print data01:$data01 + +sql delete from b where c >= 1648791213001 and c <= 1648791213002; +sql insert into b values(1648791223003,2,2,3,1.0); insert into b values(1648791223002,2,3,3,3.0); +sql insert into tb values (now + 1m , 1 ); + +sql select * from streamt; +print data00:$data00 +print data01:$data01 + +sql insert into b(c,d) values (now + 6m , 6 ); +sql delete from b where c >= 1648791213001 and c <= 1648791233005;; + +$loop_count = 0 +loop10: + +sleep 200 + +sql select c2 from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop10 +endi + +if $data00 != 2 then + print =====data00=$data00 + goto loop10 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT