Merge pull request #28977 from taosdata/fix/TD-33024

ci(stream): add ci for stream
This commit is contained in:
Shengliang Guan 2024-11-29 17:39:40 +08:00 committed by GitHub
commit 90c3b1928a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 79 additions and 0 deletions

View File

@ -93,6 +93,18 @@ class TDTestCase:
tdSql.error(
f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c11,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;"
)
tdSql.error(
f"create stream itp_1d_next_error_1 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;"
)
tdSql.error(
f"create stream itp_1d_next_error_1 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;"
)
tdSql.error(
f"create stream itp_1d_next_error_1 trigger window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;"
)
tdSql.error(
f"create stream itp_1d_next_error_1 trigger max_delay 5s FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;"
)
# function name : interp
trigger_mode = "force_window_close"

View File

@ -24,6 +24,7 @@ import time
import traceback
import os
from os import path
import psutil
class TDTestCase:
@ -117,6 +118,69 @@ class TDTestCase:
if not tdSql.getData(2, 0).startswith('new-t3_stb_'):
tdLog.exit("error6")
def caseDropStream(self):
tdLog.info(f"start caseDropStream")
sql = "drop database if exists d1;"
tdSql.query(sql)
sql = "drop database if exists db;"
tdSql.query(sql)
sql ="show streams;"
tdSql.query(sql)
tdSql.check_rows_loop(0, sql, loopCount=100, waitTime=0.5)
sql ="select * from information_schema.ins_stream_tasks;"
tdSql.query(sql)
tdSql.check_rows_loop(0, sql, loopCount=100, waitTime=0.5)
self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")
# create stream
tdSql.execute("use db;")
tdSql.execute("create stream stream4 fill_history 1 into sta4 as select _wstart, sum(current),avg(current),last(current),min(voltage),first(voltage),last(phase),max(phase),count(phase), _wend, _wduration from meters partition by tbname, ts interval(10a);", show=True)
time.sleep(10)
sql ="select * from information_schema.ins_stream_tasks where status == 'ready';"
tdSql.query(sql, show=True)
tdSql.check_rows_loop(4, sql, loopCount=100, waitTime=0.5)
pl = psutil.pids()
for pid in pl:
try:
if psutil.Process(pid).name() == 'taosd':
taosdPid = pid
break
except psutil.NoSuchProcess:
pass
tdLog.info("taosd pid:{}".format(taosdPid))
p = psutil.Process(taosdPid)
cpuInfo = p.cpu_percent(interval=5)
tdLog.info("taosd cpu:{}".format(cpuInfo))
tdSql.execute("drop stream stream4;", show=True)
sql ="show streams;"
tdSql.query(sql, show=True)
tdSql.check_rows_loop(0, sql, loopCount=100, waitTime=0.5)
sql ="select * from information_schema.ins_stream_tasks;"
tdSql.query(sql, show=True)
tdSql.check_rows_loop(0, sql, loopCount=100, waitTime=0.5)
for i in range(10):
cpuInfo = p.cpu_percent(interval=5)
tdLog.info("taosd cpu:{}".format(cpuInfo))
if cpuInfo < 10:
return
else:
time.sleep(1)
continue
cpuInfo = p.cpu_percent(interval=5)
tdLog.info("taosd cpu:{}".format(cpuInfo))
if cpuInfo > 10:
tdLog.exit("drop stream failed, stream tasks are still running")
# run
def run(self):
self.case1()
@ -145,6 +209,9 @@ class TDTestCase:
tdSql.query(sql)
tdSql.checkRows(0)
self.caseDropStream()
# stop
def stop(self):
tdSql.close()