From 114c9ae44b7775eb3f33fbd7fa8aeb7738907f28 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 29 Nov 2024 09:56:21 +0800 Subject: [PATCH] add ci --- .../8-stream/force_window_close_interp.py | 12 ++++ tests/system-test/8-stream/stream_basic.py | 67 +++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/tests/system-test/8-stream/force_window_close_interp.py b/tests/system-test/8-stream/force_window_close_interp.py index f39ad82ed7..f78330411b 100644 --- a/tests/system-test/8-stream/force_window_close_interp.py +++ b/tests/system-test/8-stream/force_window_close_interp.py @@ -93,6 +93,18 @@ class TDTestCase: tdSql.error( f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c11,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;" ) + tdSql.error( + f"create stream itp_1d_next_error_1 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;" + ) + tdSql.error( + f"create stream itp_1d_next_error_1 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;" + ) + tdSql.error( + f"create stream itp_1d_next_error_1 trigger window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;" + ) + tdSql.error( + f"create stream itp_1d_next_error_1 trigger max_delay 5s FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;" + ) # function name : interp trigger_mode = "force_window_close" diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py index 95f4f1addc..750943ba81 100644 --- a/tests/system-test/8-stream/stream_basic.py +++ b/tests/system-test/8-stream/stream_basic.py @@ -24,6 +24,7 @@ import time import traceback import os from os import path +import psutil class TDTestCase: @@ -117,6 +118,69 @@ class TDTestCase: if not tdSql.getData(2, 0).startswith('new-t3_stb_'): tdLog.exit("error6") + def caseDropStream(self): + tdLog.info(f"start caseDropStream") + sql = "drop database if exists d1;" + tdSql.query(sql) + sql = "drop database if exists db;" + tdSql.query(sql) + + sql ="show streams;" + tdSql.query(sql) + tdSql.check_rows_loop(0, sql, loopCount=100, waitTime=0.5) + + sql ="select * from information_schema.ins_stream_tasks;" + tdSql.query(sql) + tdSql.check_rows_loop(0, sql, loopCount=100, waitTime=0.5) + + self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y") + # create stream + tdSql.execute("use db;") + tdSql.execute("create stream stream4 fill_history 1 into sta4 as select _wstart, sum(current),avg(current),last(current),min(voltage),first(voltage),last(phase),max(phase),count(phase), _wend, _wduration from meters partition by tbname, ts interval(10a);", show=True) + + time.sleep(10) + + sql ="select * from information_schema.ins_stream_tasks where status == 'ready';" + tdSql.query(sql, show=True) + tdSql.check_rows_loop(4, sql, loopCount=100, waitTime=0.5) + + pl = psutil.pids() + for pid in pl: + try: + if psutil.Process(pid).name() == 'taosd': + taosdPid = pid + break + except psutil.NoSuchProcess: + pass + tdLog.info("taosd pid:{}".format(taosdPid)) + p = psutil.Process(taosdPid) + + cpuInfo = p.cpu_percent(interval=5) + tdLog.info("taosd cpu:{}".format(cpuInfo)) + + tdSql.execute("drop stream stream4;", show=True) + + sql ="show streams;" + tdSql.query(sql, show=True) + tdSql.check_rows_loop(0, sql, loopCount=100, waitTime=0.5) + + sql ="select * from information_schema.ins_stream_tasks;" + tdSql.query(sql, show=True) + tdSql.check_rows_loop(0, sql, loopCount=100, waitTime=0.5) + + for i in range(10): + cpuInfo = p.cpu_percent(interval=5) + tdLog.info("taosd cpu:{}".format(cpuInfo)) + if cpuInfo < 10: + return + else: + time.sleep(1) + continue + cpuInfo = p.cpu_percent(interval=5) + tdLog.info("taosd cpu:{}".format(cpuInfo)) + if cpuInfo > 10: + tdLog.exit("drop stream failed, stream tasks are still running") + # run def run(self): self.case1() @@ -145,6 +209,9 @@ class TDTestCase: tdSql.query(sql) tdSql.checkRows(0) + self.caseDropStream() + + # stop def stop(self): tdSql.close()