From e8aa1555ca9ef4a303ef371dd1e73a960300f304 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 1 Nov 2023 09:48:53 +0800 Subject: [PATCH 1/3] session window max delay --- source/libs/executor/inc/executorInt.h | 2 ++ .../executor/src/streamtimewindowoperator.c | 25 +++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c5f077d57f..78e835df59 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -580,6 +580,7 @@ typedef struct SStreamSessionAggOperatorInfo { bool reCkBlock; SSDataBlock* pCheckpointRes; bool clearState; + bool recvGetAll; } SStreamSessionAggOperatorInfo; typedef struct SStreamStateAggOperatorInfo { @@ -603,6 +604,7 @@ typedef struct SStreamStateAggOperatorInfo { SArray* historyWins; bool reCkBlock; SSDataBlock* pCheckpointRes; + bool recvGetAll; } SStreamStateAggOperatorInfo; typedef struct SStreamPartitionOperatorInfo { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 2eb6fb2d64..cef5a728a8 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2532,6 +2532,15 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) { taosMemoryFree(buf); } +static void resetUnCloseSessionWinInfo(SSHashObj* winMap) { + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) { + SResultWindowInfo* pResInfo = pIte; + pResInfo->pStatePos->beUsed = true; + } +} + static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; @@ -2546,6 +2555,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (opRes) { return opRes; } + + if (pInfo->recvGetAll) { + pInfo->recvGetAll = false; + resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows); + } + setOperatorCompleted(pOperator); return NULL; } @@ -2583,6 +2598,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { taosArrayDestroy(pWins); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { + pInfo->recvGetAll = true; getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { @@ -2838,6 +2854,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->clearState = false; + pInfo->recvGetAll = false; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; // for stream void* buff = NULL; @@ -3454,6 +3472,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { return resBlock; } + if (pInfo->recvGetAll) { + pInfo->recvGetAll = false; + resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows); + } + setOperatorCompleted(pOperator); return NULL; } @@ -3482,6 +3505,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { taosArrayDestroy(pWins); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { + pInfo->recvGetAll = true; getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { @@ -3713,6 +3737,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys } pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + pInfo->recvGetAll = false; // for stream void* buff = NULL; From 5ed8da42464ed943799ae5dd3dd3cb197dd498e6 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 1 Nov 2023 14:00:49 +0800 Subject: [PATCH 2/3] opt test case --- tests/script/tsim/stream/sliding.sim | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index 18893245fa..97b2464bc8 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -582,6 +582,7 @@ sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s); +sleep 1000 sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791223001,2,2,2,1.1); From 1400b0c5723617318bf678b6411d8fd030bb7923 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 1 Nov 2023 14:10:38 +0800 Subject: [PATCH 3/3] opt test case --- tests/script/tsim/stream/sliding.sim | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index 97b2464bc8..26aedc9f4d 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -22,6 +22,8 @@ sql create stream streams2 trigger at_once watermark 1d IGNORE EXPIRED 0 IGNORE sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s); sql create stream stream_t2 trigger at_once watermark 1d IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s); +sleep 1000 + sql insert into t1 values(1648791210000,1,2,3,1.0); sql insert into t1 values(1648791216000,2,2,3,1.1); sql insert into t1 values(1648791220000,3,2,3,2.1); @@ -312,6 +314,8 @@ sql create table t2 using st tags(2,2,2); sql create stream streams11 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s); sql create stream streams12 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s); +sleep 1000 + sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t1 values(1648791223001,2,2,3,1.1); sql insert into t1 values(1648791233002,3,2,3,2.1); @@ -445,6 +449,8 @@ sql create table t2 using st tags(2,2,2); sql create stream streams21 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt21 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s); sql create stream streams22 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt22 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s); +sleep 1000 + sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791223001,2,2,2,1.1); sql insert into t1 values(1648791233002,3,3,3,2.1); @@ -582,6 +588,7 @@ sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s); + sleep 1000 sql insert into t1 values(1648791213000,1,1,1,1.0); @@ -708,6 +715,8 @@ sql create table t2 using st tags(2,2,2); sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart as ts, count(*),min(a) c1 from st interval(10s) sliding(5s); +sleep 1000 + sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791243000,2,1,1,1.0);