diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fe29b8eda9..582181b891 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1610,10 +1610,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); - destroyStreamFinalIntervalOperatorInfo(pChildOp->info); - taosMemoryFree(pChildOp->pDownstream); - cleanupExprSupp(&pChildOp->exprSupp); - taosMemoryFreeClear(pChildOp); + destroyOperatorInfo(pChildOp); } taosArrayDestroy(pInfo->pChildren); } @@ -3418,11 +3415,10 @@ void destroyStreamSessionAggOperatorInfo(void* param) { if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { - SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i); - SStreamSessionAggOperatorInfo* pChInfo = pChild->info; - destroyStreamSessionAggOperatorInfo(pChInfo); - taosMemoryFreeClear(pChild); + SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i); + destroyOperatorInfo(pChild); } + taosArrayDestroy(pInfo->pChildren); } colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); @@ -3470,7 +3466,9 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int } SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; - pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark); + if (!pScanInfo->pUpdateInfo) { + pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark); + } } int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap, @@ -4374,9 +4372,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pInfo->pGroupIdTbNameMap = - taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); - pOperator->operatorType = pPhyNode->type; if (numOfChild > 0) { pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); @@ -4415,11 +4410,10 @@ void destroyStreamStateOperatorInfo(void* param) { if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { - SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i); - SStreamSessionAggOperatorInfo* pChInfo = pChild->info; - destroyStreamSessionAggOperatorInfo(pChInfo); - taosMemoryFreeClear(pChild); + SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i); + destroyOperatorInfo(pChild); } + taosArrayDestroy(pInfo->pChildren); } colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); diff --git a/tests/script/tsim/stream/basic1.sim b/tests/script/tsim/stream/basic1.sim index bc076a194b..bb2ea42383 100644 --- a/tests/script/tsim/stream/basic1.sim +++ b/tests/script/tsim/stream/basic1.sim @@ -710,6 +710,11 @@ sleep 200 sql select * from streamt4; +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + # row 0 if $rows != 0 then print =====rows=$rows diff --git a/tests/script/tsim/stream/basic3.sim b/tests/script/tsim/stream/basic3.sim new file mode 100644 index 0000000000..48fb860a72 --- /dev/null +++ b/tests/script/tsim/stream/basic3.sim @@ -0,0 +1,55 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c debugflag -v 131 +system sh/exec.sh -n dnode1 -s start -v + +sleep 5000 + +sql connect + +print ========== interval\session\state window + +sql CREATE DATABASE test1 BUFFER 96 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 STRICT 'off' WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0; +sql use test1; +sql CREATE STABLE st (time TIMESTAMP, ca DOUBLE, cb DOUBLE, cc int) TAGS (ta VARCHAR(10) ); + +print ========== create table before stream + +sql CREATE TABLE t1 using st TAGS ('aaa'); +sql CREATE TABLE t2 using st TAGS ('bbb'); +sql CREATE TABLE t3 using st TAGS ('ccc'); +sql CREATE TABLE t4 using st TAGS ('ddd'); + +sql create stream streamd1 into streamt1 as select ca, _wstart,_wend, count(*) from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(linear); +sql create stream streamd2 into streamt2 as select tbname, _wstart,_wend, count(*) from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by tbname interval(60m) fill(linear); + +sql create stream streamd3 into streamt3 as select ca, _wstart,_wend, count(*), max(ca), min(cb), APERCENTILE(cc, 20) from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca session(time, 60m); +sql create stream streamd4 into streamt4 as select tbname, _wstart,_wend, count(*), max(ca), min(cb), APERCENTILE(cc, 20) from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by tbname session(time, 60m); + +sql create stream streamd5 into streamt5 as select tbname, _wstart,_wend, count(*), max(ca), min(cb) from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by tbname state_window(cc); +sql create stream streamd6 into streamt6 as select ca, _wstart,_wend, count(*), max(ca), min(cb) from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca state_window(cc); + +sleep 3000 + +sql drop stream if exists streamd1; +sql drop stream if exists streamd2; +sql drop stream if exists streamd3; +sql drop stream if exists streamd4; +sql drop stream if exists streamd5; +sql drop stream if exists streamd6; + + +_OVER: +system sh/exec.sh -n dnode1 -s stop -x SIGINT +print =============== check +$null= + +system_content sh/checkValgrind.sh -n dnode1 +print cmd return result ----> [ $system_content ] +if $system_content > 0 then + return -1 +endi + +if $system_content == $null then + return -1 +endi