From e1be1c8d0f3e84897ec8239422ff174156d64b3c Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Tue, 7 Mar 2023 11:53:45 +0800 Subject: [PATCH] test: add stream and topic in compatibility case --- tests/system-test/0-others/compa4096.json | 2 +- tests/system-test/0-others/compatibility.py | 91 ++++++++++++++++----- tests/system-test/0-others/testRoll.py | 83 +++++++++++++++++++ 3 files changed, 154 insertions(+), 22 deletions(-) create mode 100644 tests/system-test/0-others/testRoll.py diff --git a/tests/system-test/0-others/compa4096.json b/tests/system-test/0-others/compa4096.json index 5cc5d2084d..49e0ec1d8a 100644 --- a/tests/system-test/0-others/compa4096.json +++ b/tests/system-test/0-others/compa4096.json @@ -41,7 +41,7 @@ "interlace_rows": 0, "line_protocol": null, "tcp_transfer": "no", - "insert_rows": 10000, + "insert_rows": 1000, "childtable_limit": 0, "childtable_offset": 0, "rows_per_tbl": 0, diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py index a5cded7a6b..8764a169fa 100644 --- a/tests/system-test/0-others/compatibility.py +++ b/tests/system-test/0-others/compatibility.py @@ -3,6 +3,8 @@ import taos import sys import os import time +import inspect +from taos.tmq import Consumer from pathlib import Path from util.log import * @@ -14,7 +16,7 @@ from util.dnodes import TDDnode from util.cluster import * import subprocess -BASEVERSION = "3.0.1.8" +BASEVERSION = "3.0.2.5" class TDTestCase: def caseDescription(self): ''' @@ -99,6 +101,7 @@ class TDTestCase: def run(self): + scriptsPath = os.path.dirname(os.path.realpath(__file__)) distro_id = distro.id() if distro_id == "alpine": tdLog.info(f"alpine skip compatibility test") @@ -128,19 +131,18 @@ class TDTestCase: tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}") tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") - sleep(3) + os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ") + os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ') + os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ') + os.system(f"cd {scriptsPath} && python3 testRoll.py") + os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ') - # tdsqlF.query(f"select count(*) from {stb}") - # tdsqlF.checkData(0,0,tableNumbers*recordNumbers1) - os.system("pkill taosd") - self.checkProcessPid("taosd") - - print(f"start taosd: nohup taosd -c {cPath} & ") - os.system(f" nohup taosd -c {cPath} & " ) - sleep(10) + # print(f"start taosd: nohup taosd -c {cPath} & ") + # os.system(f" nohup taosd -c {cPath} & " ) tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ") os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y") - os.system("pkill taosd") # make sure all the data are saved in disk. + os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '") + os.system("pkill taosd") # make sure all the data are saved in disk. self.checkProcessPid("taosd") @@ -161,12 +163,14 @@ class TDTestCase: tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}") tdsql.query(f"select count(*) from {stb}") - tdsql.checkData(0,0,tableNumbers*recordNumbers1) - os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ") - tdsql.query(f"select count(*) from {stb}") - tdsql.checkData(0,0,tableNumbers*recordNumbers2) + tdsql.checkData(0,0,tableNumbers*recordNumbers1) + # tdsql.query("show streams;") + # os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ") + # tdsql.query("show streams;") + # tdsql.query(f"select count(*) from {stb}") + # tdsql.checkData(0,0,tableNumbers*recordNumbers2) tdsql.query(f"select count(*) from db4096.stb0") - tdsql.checkData(0,0,50000) + tdsql.checkData(0,0,5000) tdsql=tdCom.newTdSql() tdLog.printNoPrefix(f"==========step4:verify backticks in taos Sql-TD18542") @@ -183,13 +187,58 @@ class TDTestCase: tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);") tdsql.query("select * from db.ct4") tdsql.checkData(0,1,14) + print(1) + tdsql=tdCom.newTdSql() tdsql.query("describe information_schema.ins_databases;") qRows=tdsql.queryRows - for i in range(qRows) : - if tdsql.queryResult[i][0]=="retentions" : - return True - else: - return False + comFlag=True + j=0 + while comFlag: + for i in range(qRows) : + if tdsql.queryResult[i][0] == "retentions" : + print("parameters include retentions") + comFlag=False + break + else : + comFlag=True + j=j+1 + if j == qRows: + print("parameters don't include retentions") + caller = inspect.getframeinfo(inspect.stack()[0][0]) + args = (caller.filename, caller.lineno) + tdLog.exit("%s(%d) failed" % args) + tdsql.query("show streams;") + tdsql.checkRows(2) + tdsql.execute("insert into tmq_test.tb1 values (now, 11, 3.0, 'tmq test1');") + tdsql.execute("insert into tmq_test.tb2 values (now, 22, 3.0, 'tmq test2');") + + conn = taos.connect() + + consumer = Consumer( + { + "group.id": "tg75", + "client.id": "124", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "enable.auto.commit": "true", + "experimental.snapshot.enable": "true", + } + ) + consumer.subscribe(["tmq_test_topic"]) + + while True: + res = consumer.poll(10) + if not res: + break + err = res.error() + if err is not None: + raise err + val = res.value() + + for block in val: + print(block.fetchall()) + tdsql.query("show topics;") + tdsql.checkRows(1) def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") diff --git a/tests/system-test/0-others/testRoll.py b/tests/system-test/0-others/testRoll.py new file mode 100644 index 0000000000..56e5b3630a --- /dev/null +++ b/tests/system-test/0-others/testRoll.py @@ -0,0 +1,83 @@ +from taos.tmq import Consumer +import taos +import taosrest +import socket + + +def init_tmq_env(db, topic): + conn = taos.connect() + # conn.execute("create dnode test209") + # conn.execute("create dnode test216") + # conn.execute("create mnode on dnode 2") + # conn.execute("create mnode on dnode 3") + + conn.execute("drop topic if exists {}".format(topic)) + conn.execute("drop database if exists {}".format(db)) + conn.execute("create database if not exists {} replica 1 ".format(db)) + conn.select_db(db) + conn.execute( + "create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))") + conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')") + conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')") + conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')") + conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic)) + conn.execute("insert into tb1 values (now+10s, 1, 1.0, 'tmq test')") + conn.execute("insert into tb2 values (now+100s, 2, 2.0, 'tmq test')") + conn.execute("insert into tb3 values (now+20s, 3, 3.0, 'tmq test')") + conn.execute("insert into tb3 values (now+30s, 4, 4.0, 'tmq test4')") + +def init_tmq_rest_env(db, topic): + host = socket.gethostname() + conn = taosrest.connect(url=f"http://{host}:6041") + + # conn.execute("create dnode test209") + # conn.execute("create dnode test216") + # conn.execute("create mnode on dnode 2") + # conn.execute("create mnode on dnode 3") + + conn.execute("drop topic if exists {}".format(topic)) + conn.execute("drop database if exists {}".format(db)) + conn.execute("create database if not exists {} replica 3 ".format(db)) + conn.select_db(db) + conn.execute( + "create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))") + conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')") + conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')") + conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')") + conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic)) + conn.execute("insert into tb1 values (now+10s, 1, 1.0, 'tmq test')") + conn.execute("insert into tb2 values (now+100s, 2, 2.0, 'tmq test')") + conn.execute("insert into tb3 values (now+20s, 3, 3.0, 'tmq test')") + conn.execute("insert into tb3 values (now+30s, 4, 4.0, 'tmq test4')") + + + + +if __name__ == '__main__': + conn = taos.connect() + + init_tmq_env("tmq_test", "tmq_test_topic") # init env + # init_tmq_rest_env("tmq_test", "tmq_test_topic") + consumer = Consumer( + { + "group.id": "tg75", + "client.id": "124", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "enable.auto.commit": "true", + "experimental.snapshot.enable": "true", + } + ) + consumer.subscribe(["tmq_test_topic"]) + + while True: + res = consumer.poll(10) + if not res: + break + err = res.error() + if err is not None: + raise err + val = res.value() + + for block in val: + print(block.fetchall()) \ No newline at end of file