From 91ebee80b01e056b79122d88d51643f4e87de465 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Thu, 25 Apr 2024 19:59:58 +0800 Subject: [PATCH] test: add tmq and stream in compatibility --- tests/system-test/0-others/compatibility.py | 130 +++++++++++++------- 1 file changed, 85 insertions(+), 45 deletions(-) diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py index 4c2fe652b8..71adf6eaac 100644 --- a/tests/system-test/0-others/compatibility.py +++ b/tests/system-test/0-others/compatibility.py @@ -185,8 +185,8 @@ class TDTestCase: # baseVersion = "3.0.1.8" tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}") - tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") - os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") + tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -y ") + os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -y ") os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database test '") os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/com_alltypedata.json -y") os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database curdb '") @@ -196,49 +196,81 @@ class TDTestCase: os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select min(ui) from curdb.meters '") os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select max(bi) from curdb.meters '") - # os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ") - # os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ') - # os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ') + os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ") + os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream trigger at_once into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ') + os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ') + self.alter_string_in_file("0-others/tmqBasic.json", "/etc/taos/", cPath) # os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/tmqBasic.json -y ") - os.system('LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists tmq_test_topic as select current,voltage,phase from test.meters where voltage <= 106 and current <= 5;" ') + # create db/stb/select topic + + db_topic = "db_test_topic" + os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {db_topic} with meta as database test" ') + + stable_topic = "stable_test_meters_topic" + os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {stable_topic} as stable test.meters where tbname like \\"d3\\";" ') + + select_topic = "select_test_meters_topic" + os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {select_topic} as select current,voltage,phase from test.meters where voltage >= 170;" ') + os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ') os.system(f" /usr/bin/taosadapter --version " ) consumer_dict = { "group.id": "g1", + "td.connect.websocket.scheme": "ws", "td.connect.user": "root", "td.connect.pass": "taosdata", "auto.offset.reset": "earliest", + "enable.auto.commit": "false", } - consumer = taosws.Consumer(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) + consumer = taosws.Consumer(consumer_dict) try: - consumer.subscribe(["tmq_test_topic"]) + consumer.subscribe([select_topic]) except TmqError: tdLog.exit(f"subscribe error") - + first_consumer_rows = 0 while True: message = consumer.poll(timeout=1.0) if message: - print("message") - id = message.vgroup() - topic = message.topic() - database = message.database() - for block in message: - nrows = block.nrows() - ncols = block.ncols() - for row in block: - print(row) - values = block.fetchall() - print(nrows, ncols) - - consumer.commit(message) + first_consumer_rows += block.nrows() else: - print("break") + tdLog.notice("message is null and break") break + consumer.commit(message) + tdLog.debug(f"topic:{select_topic} ,first consumer rows is {first_consumer_rows} in old version") + break consumer.close() + # consumer_dict2 = { + # "group.id": "g2", + # "td.connect.websocket.scheme": "ws", + # "td.connect.user": "root", + # "td.connect.pass": "taosdata", + # "auto.offset.reset": "earliest", + # "enable.auto.commit": "false", + # } + # consumer = taosws.Consumer(consumer_dict2) + # try: + # consumer.subscribe([db_topic,stable_topic]) + # except TmqError: + # tdLog.exit(f"subscribe error") + # first_consumer_rows = 0 + # while True: + # message = consumer.poll(timeout=1.0) + # if message: + # for block in message: + # first_consumer_rows += block.nrows() + # else: + # tdLog.notice("message is null and break") + # break + # consumer.commit(message) + # tdLog.debug(f"topic:{select_topic} ,first consumer rows is {first_consumer_rows} in old version") + # break + + + tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ") os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y") os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '") @@ -279,11 +311,10 @@ class TDTestCase: tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}") tdsql.query(f"select count(*) from {stb}") tdsql.checkData(0,0,tableNumbers*recordNumbers1) - # tdsql.query("show streams;") - # os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ") - # tdsql.query("show streams;") - # tdsql.query(f"select count(*) from {stb}") - # tdsql.checkData(0,0,tableNumbers*recordNumbers2) + tdsql.query("show streams;") + tdsql.checkRows(2) + + # checkout db4096 tdsql.query("select count(*) from db4096.stb0") @@ -334,7 +365,7 @@ class TDTestCase: # check stream tdsql.query("show streams;") - tdsql.checkRows(0) + tdsql.checkRows(2) #check TS-3131 tdsql.query("select *,tbname from d0.almlog where mcid='m0103';") @@ -348,39 +379,48 @@ class TDTestCase: print("The unordered list is the same as the ordered list.") else: tdLog.exit("The unordered list is not the same as the ordered list.") - tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);") - tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);") # check tmq + tdsql.execute("insert into test.d80 values (now+1s, 11, 190, 0.21);") + tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);") conn = taos.connect() consumer = Consumer( { - "group.id": "tg75", - "client.id": "124", + "group.id": "g1", "td.connect.user": "root", "td.connect.pass": "taosdata", "enable.auto.commit": "true", "experimental.snapshot.enable": "true", } ) - consumer.subscribe(["tmq_test_topic"]) - + consumer.subscribe([select_topic]) + consumer_rows = 0 while True: - res = consumer.poll(10) - if not res: + message = consumer.poll(timeout=1.0) + tdLog.info(f" null {message}") + if message: + for block in message: + consumer_rows += block.nrows() + tdLog.info(f"consumer rows is {consumer_rows}") + else: + print("consumer has completed and break") break - err = res.error() - if err is not None: - raise err - val = res.value() - - for block in val: - print(block.fetchall()) + consumer.close() + tdsql.query("select current,voltage,phase from test.meters where voltage >= 170;") + all_rows = tdsql.queryRows + if consumer_rows < all_rows - first_consumer_rows : + tdLog.exit(f"consumer rows is {consumer_rows}, less than {all_rows - first_consumer_rows}") tdsql.query("show topics;") - tdsql.checkRows(1) + tdsql.checkRows(3) + tdsql.execute(f"drop topic {select_topic};",queryTimes=10) + tdsql.execute(f"drop topic {db_topic};",queryTimes=10) + tdsql.execute(f"drop topic {stable_topic};",queryTimes=10) + os.system(f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ") + tdsql.query(f"select count(*) from {stb}") + tdsql.checkData(0,0,tableNumbers*recordNumbers2) def stop(self): tdSql.close()