From d99c9159082d2047cf10d008420c50749735aff5 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Tue, 29 Oct 2024 14:38:33 +0800 Subject: [PATCH] test:use the black tool to format the code style --- tests/army/tmq/drop_lost_comsumers.py | 211 ++++++++++++++++---------- tests/army/tmq/per_consumer.py | 123 +++++++++------ 2 files changed, 211 insertions(+), 123 deletions(-) diff --git a/tests/army/tmq/drop_lost_comsumers.py b/tests/army/tmq/drop_lost_comsumers.py index d2ec796516..f455931089 100644 --- a/tests/army/tmq/drop_lost_comsumers.py +++ b/tests/army/tmq/drop_lost_comsumers.py @@ -1,4 +1,3 @@ - import taos import sys import time @@ -19,8 +18,9 @@ from datetime import datetime from taos.tmq import Consumer from frame.common import * + class TaosConsumer: - #TODo: add to tq.py and remove from here + # TODo: add to tq.py and remove from here def __init__(self): self.sub_once = True self.once_consumer_rows = 0 @@ -31,43 +31,57 @@ class TaosConsumer: if self.sub_log: tdLog.info(message) - def sub_consumer(self ,consumer ,group_id ,topic_name ): + def sub_consumer(self, consumer, group_id, topic_name): group_id = int(group_id) - if group_id < 100 : + if group_id < 100: try: consumer.subscribe([topic_name]) except TmqError: - tdLog.exit(f"subscribe error") + tdLog.exit(f"subscribe error") nrows = 0 while True: start = datetime.now() tdLog.info(f"time:{start},consumer:{group_id}, start to consume") message = consumer.poll(timeout=10.0) - + if message: id = message.offset() topic = message.topic() database = message.database() - + for block in message: addrows = block.nrows() nrows += block.nrows() ncols = block.ncols() values = block.fetchall end = datetime.now() - elapsed_time = end -start - tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + elapsed_time = end - start + tdLog.info( + f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}" + ) consumer.commit() tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") - # consumer.unsubscribe() - # consumer.close() + # consumer.unsubscribe() + # consumer.close() - def set_conf(self,td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",enable_auto_commit="false",auto_commit_interval_ms="1000",auto_offset_reset="earliest",msg_with_table_name="true",session_timeout_ms=10000,max_poll_interval_ms=180000,experimental_snapshot_enable="false"): + def set_conf( + self, + td_connect_ip="localhost", + group_id=1, + client_id="test_consumer_py", + enable_auto_commit="false", + auto_commit_interval_ms="1000", + auto_offset_reset="earliest", + msg_with_table_name="true", + session_timeout_ms=10000, + max_poll_interval_ms=180000, + experimental_snapshot_enable="false", + ): conf = { # auth options # consume options "td.connect.ip": f"{td_connect_ip}", - "group.id": f"{group_id}", + "group.id": f"{group_id}", "client.id": f"{client_id}", "enable.auto.commit": f"{enable_auto_commit}", "auto.commit.interval.ms": f"{auto_commit_interval_ms}", @@ -75,25 +89,28 @@ class TaosConsumer: "msg.with.table.name": f"{msg_with_table_name}", "session.timeout.ms": f"{session_timeout_ms}", "max.poll.interval.ms": f"{max_poll_interval_ms}", - "experimental.snapshot.enable" :f"{experimental_snapshot_enable}", + "experimental.snapshot.enable": f"{experimental_snapshot_enable}", } return conf - + def sub_consumer_once(self, consumer, group_id, topic_name, stop_event): group_id = int(group_id) - if group_id < 100 : + if group_id < 100: consumer.subscribe([topic_name]) nrows = 0 consumer_nrows = 0 - + while not stop_event.is_set(): start = datetime.now() - self.log_info(f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}") + self.log_info( + f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}" + ) if consumer_nrows < self.once_consumer_rows: - message = consumer.poll(timeout=1.0) + message = consumer.poll(timeout=1.0) elif consumer_nrows >= self.once_consumer_rows: + # when break the loop, the consumer will be closed, so we need to continue to keep consumer alive util the stop_event is set pass - # tdLog.info("stop consumer when consumer all rows") + # tdLog.info("stop consumer when consumer all rows") if message: id = message.offset() @@ -106,37 +123,42 @@ class TaosConsumer: ncols = block.ncols() values = block.fetchall end = datetime.now() - elapsed_time = end -start + elapsed_time = end - start - self.log_info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") - self.log_info(f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}") + self.log_info( + f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}" + ) + self.log_info( + f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}" + ) # consumer.commit() consumer_nrows = nrows # consumer.unsubscribe() # consumer.close() - # break + # break tdLog.info("Consumer subscription thread is stopping.") def taosc_consumer(self, conf: list, topic_name: str, stop_event): try: tdLog.info(conf) from taos.tmq import Consumer + tdLog.info("start to config consumer") consumer = Consumer(conf) tdLog.info("start to subscribe") group_id = int(conf["group.id"]) tdLog.info(f"{consumer},{group_id}") if self.sub_once: - self.sub_consumer_once(consumer, group_id, topic_name, stop_event) + self.sub_consumer_once(consumer, group_id, topic_name, stop_event) else: self.sub_consumer(consumer, group_id, topic_name) # only consumer once except Exception as e: tdLog.exit(f"{e}") - - #consumer.close() - + # consumer.close() + + class ThreadSafeCounter: def __init__(self): self.counter = 0 @@ -154,45 +176,51 @@ class ThreadSafeCounter: class TDTestCase: # updatecfgDict = {'debugFlag': 135, 'asynclog': 0} - def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) self.consumer_instance = TaosConsumer() - #db parameter + # db parameter self.table_number = 1000 self.rows_per_table = 1000 - #consumer parameter + # consumer parameter self.consumer_groups_num = 2 - self.session_timeout_ms= 180000 - self.max_poll_interval_ms= 180000 - #case consumer parameter + self.session_timeout_ms = 180000 + self.max_poll_interval_ms = 180000 + # case consumer parameter self.consumer_rows_per_thread = self.table_number * self.rows_per_table - self.consumer_all_rows = self.consumer_rows_per_thread * self.consumer_groups_num + self.consumer_all_rows = ( + self.consumer_rows_per_thread * self.consumer_groups_num + ) + + # tdSql.init(conn.cursor(), logSql) # output sql.txt file - #tdSql.init(conn.cursor(), logSql) # output sql.txt file def caseDescription(self): - ''' - drop_lost_consmuers: + """ + drop_lost_consmuers: 1. verifying that the boundary and valid values of session_timeout_ms are in effect 2. verifying that the boundary and valid values of max_poll_interval_ms are in effect 3. verifying that consumer will be closed when the session_timeout_ms and max_poll_interval_ms is expired - ''' + """ return - + def check_consumer(self, count, rows, stop_event=None): time.sleep(count) try: - tdLog.info(f"wait timeout count:{count} and check consumer status whether is closed") + tdLog.info( + f"wait timeout count:{count} and check consumer status whether is closed" + ) for ct in range(5): - tdSql.query(f'show consumers') - anser_rows=tdSql.getRows() + tdSql.query(f"show consumers") + anser_rows = tdSql.getRows() if anser_rows == rows: break else: time.sleep(1) - tdLog.info(f"wait for {count} seconds to check that consumers number is {anser_rows}") + tdLog.info( + f"wait for {count} seconds to check that consumers number is {anser_rows}" + ) if anser_rows != rows: if stop_event: stop_event.set() @@ -200,27 +228,37 @@ class TDTestCase: except Exception as e: tdLog.exit(f"{e},check consumer error") - def drop_session_timeout_consmuers(self, consumer_groups_num, session_timeout_ms, max_poll_interval_ms, topic_name, timeout): - tdSql.execute(f'drop topic if exists {topic_name};') - tdSql.execute(f'use db_sub') - tdSql.execute(f'create topic {topic_name} as select * from db_sub.meters;') + def drop_session_timeout_consmuers( + self, + consumer_groups_num, + session_timeout_ms, + max_poll_interval_ms, + topic_name, + timeout, + ): + tdSql.execute(f"drop topic if exists {topic_name};") + tdSql.execute(f"use db_sub") + tdSql.execute(f"create topic {topic_name} as select * from db_sub.meters;") # start consumer and config some parameters - os.system(f"nohup python3 ./tmq/per_consumer.py -c {consumer_groups_num} -s {session_timeout_ms} -p {max_poll_interval_ms} > consumer.log &") + os.system( + f"nohup python3 ./tmq/per_consumer.py -c {consumer_groups_num} -s {session_timeout_ms} -p {max_poll_interval_ms} > consumer.log &" + ) # wait 4s for consuming data time.sleep(4) # kill consumer to simulate session_timeout_ms tdLog.info("kill per_consumer.py") - tdCom.kill_signal_process(signal=9,processor_name="python3\s*./tmq/per_consumer.py") - self.check_consumer(timeout,0) - tdSql.execute(f'drop topic if exists {topic_name};') + tdCom.kill_signal_process( + signal=9, processor_name="python3\s*./tmq/per_consumer.py" + ) + self.check_consumer(timeout, 0) + tdSql.execute(f"drop topic if exists {topic_name};") os.system("rm -rf consumer.log") - - + def drop_max_poll_timeout_consmuers(self, topic_name, timeout): - tdSql.execute(f'drop topic if exists {topic_name};') - tdSql.execute(f'use db_sub') - tdSql.execute(f'create topic {topic_name} as select * from db_sub.meters;') + tdSql.execute(f"drop topic if exists {topic_name};") + tdSql.execute(f"use db_sub") + tdSql.execute(f"create topic {topic_name} as select * from db_sub.meters;") threads = [] self.safe_counter = ThreadSafeCounter() @@ -230,48 +268,68 @@ class TDTestCase: tdLog.info(f"consumer_rows:{self.consumer_instance.once_consumer_rows}") self.consumer_instance.sub_once = True for id in range(self.consumer_groups_num): - conf = self.consumer_instance.set_conf(group_id=id, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms) - threads.append(threading.Thread(target=self.consumer_instance.taosc_consumer, args=(conf, topic_name, stop_event))) + conf = self.consumer_instance.set_conf( + group_id=id, + session_timeout_ms=self.session_timeout_ms, + max_poll_interval_ms=self.max_poll_interval_ms, + ) + threads.append( + threading.Thread( + target=self.consumer_instance.taosc_consumer, + args=(conf, topic_name, stop_event), + ) + ) for tr in threads: tr.start() - + while True: if self.safe_counter.get() < self.consumer_all_rows: time.sleep(5) - tdLog.info(f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.get()}") + tdLog.info( + f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.get()}" + ) elif self.safe_counter.get() >= self.consumer_all_rows: - # adding 5s is for heartbeat check - self.check_consumer(timeout+5, 0, stop_event) + # adding 5s is for heartbeat check + self.check_consumer(timeout + 5, 0, stop_event) stop_event.set() tr.join() break time.sleep(1) - tdSql.execute(f'drop topic if exists {topic_name};') - + tdSql.execute(f"drop topic if exists {topic_name};") + def case_session_timeout(self): tdLog.info("start to test session_timeout_ms=12s") - #test session_timeout_ms=12s - self.session_timeout_ms=12000 - self.max_poll_interval_ms=180000 + # test session_timeout_ms=12s + self.session_timeout_ms = 12000 + self.max_poll_interval_ms = 180000 topic_name = "select_d1" - self.drop_session_timeout_consmuers(consumer_groups_num=1, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms, topic_name=topic_name , timeout=int(self.session_timeout_ms/1000)) + self.drop_session_timeout_consmuers( + consumer_groups_num=1, + session_timeout_ms=self.session_timeout_ms, + max_poll_interval_ms=self.max_poll_interval_ms, + topic_name=topic_name, + timeout=int(self.session_timeout_ms / 1000), + ) tdLog.info("stop to test session_timeout_ms=12s and done ") def case_max_poll_timeout(self): tdLog.info("start to test max_poll_interval_ms=20s") - #test max_poll_interval_ms=20s - self.session_timeout_ms=180000 - self.max_poll_interval_ms=20000 + # test max_poll_interval_ms=20s + self.session_timeout_ms = 180000 + self.max_poll_interval_ms = 20000 topic_name = "select_d1" - self.drop_max_poll_timeout_consmuers(topic_name=topic_name, timeout=int(self.max_poll_interval_ms/1000)) + self.drop_max_poll_timeout_consmuers( + topic_name=topic_name, timeout=int(self.max_poll_interval_ms / 1000) + ) tdLog.info("stop to test max_poll_interval_ms=20s and done ") - def run(self): vgroups = 4 - etool.benchMark(command=f"-d db_sub -t {self.table_number} -n {self.rows_per_table} -v {vgroups} -a {self.replicaVar} -y") + etool.benchMark( + command=f"-d db_sub -t {self.table_number} -n {self.rows_per_table} -v {vgroups} -a {self.replicaVar} -y" + ) # test case start here - + self.case_session_timeout() self.case_max_poll_timeout() @@ -279,5 +337,6 @@ class TDTestCase: tdSql.close() tdLog.success(f"{__file__} successfully executed") + tdCases.addLinux(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/army/tmq/per_consumer.py b/tests/army/tmq/per_consumer.py index 810d7d44f8..418c88dead 100644 --- a/tests/army/tmq/per_consumer.py +++ b/tests/army/tmq/per_consumer.py @@ -9,21 +9,46 @@ import threading from taos.tmq import Consumer import click +# TDDO +# 1. using tmq common class to replace the function, file drop_lost_consumers.py has the same function + try: conn = taos.connect() except Exception as e: tdLog.info(str(e)) - -@click.command() -@click.option('-c', '--consumer-groups-num', "consumer_group_num", default=1, help='Number of consumer group.') -@click.option('-s', '--session-timeout-ms', "session_timeout_ms", default=60000, help='session timeout:ms') -@click.option('-p', '--max-poll-interval-ms',"max_poll_interval_ms", default=180000, help='max poll interval timeout:ms') -def test_timeout_sub(consumer_group_num,session_timeout_ms,max_poll_interval_ms): + +@click.command() +@click.option( + "-c", + "--consumer-groups-num", + "consumer_group_num", + default=1, + help="Number of consumer group.", +) +@click.option( + "-s", + "--session-timeout-ms", + "session_timeout_ms", + default=60000, + help="session timeout:ms", +) +@click.option( + "-p", + "--max-poll-interval-ms", + "max_poll_interval_ms", + default=180000, + help="max poll interval timeout:ms", +) +def test_timeout_sub(consumer_group_num, session_timeout_ms, max_poll_interval_ms): threads = [] - tdLog.info(consumer_group_num,session_timeout_ms,max_poll_interval_ms) + tdLog.info(consumer_group_num, session_timeout_ms, max_poll_interval_ms) for id in range(consumer_group_num): - conf = set_conf(group_id=id,session_timeout_ms=session_timeout_ms,max_poll_interval_ms=max_poll_interval_ms) + conf = set_conf( + group_id=id, + session_timeout_ms=session_timeout_ms, + max_poll_interval_ms=max_poll_interval_ms, + ) tdLog.info(conf) threads.append(threading.Thread(target=taosc_consumer, args=(conf,))) for tr in threads: @@ -31,52 +56,54 @@ def test_timeout_sub(consumer_group_num,session_timeout_ms,max_poll_interval_ms) for tr in threads: tr.join() -def sub_consumer(consumer,group_id): + +def sub_consumer(consumer, group_id): group_id = int(group_id) - if group_id < 100 : + if group_id < 100: try: consumer.subscribe(["select_d1"]) except Exception as e: - tdLog.info(f"subscribe error") - exit(1) + tdLog.info(f"subscribe error") + exit(1) nrows = 0 while True: start = datetime.now() tdLog.info(f"time:{start},consumer:{group_id}, start to consume") message = consumer.poll(timeout=10.0) - + if message: id = message.offset() topic = message.topic() database = message.database() - + for block in message: addrows = block.nrows() nrows += block.nrows() ncols = block.ncols() values = block.fetchall end = datetime.now() - elapsed_time = end -start - tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + elapsed_time = end - start + tdLog.info( + f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}" + ) consumer.commit() tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") - # consumer.unsubscribe() - # consumer.close() - # break - # if nrows >= 1000000: - # break -def sub_consumer_once(consumer,group_id): + # consumer.unsubscribe() + # consumer.close() + + +def sub_consumer_once(consumer, group_id): group_id = int(group_id) - if group_id < 100 : + if group_id < 100: consumer.subscribe(["select_d1"]) nrows = 0 consumer_nrows = 0 while True: start = datetime.now() tdLog.info(f"time:{start},consumer:{group_id}, start to consume") - #start = datetime.now() - #tdLog.info(f"time:{start},consumer:{group_id}, start to consume") + # start = datetime.now() + # tdLog.info(f"time:{start},consumer:{group_id}, start to consume") tdLog.info(f"consumer_nrows:{consumer_nrows}") if consumer_nrows < 1000000: message = consumer.poll(timeout=10.0) @@ -87,28 +114,40 @@ def sub_consumer_once(consumer,group_id): id = message.offset() topic = message.topic() database = message.database() - + for block in message: addrows = block.nrows() nrows += block.nrows() ncols = block.ncols() values = block.fetchall end = datetime.now() - elapsed_time = end -start + elapsed_time = end - start # tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") consumer.commit() # tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") consumer_nrows = nrows - # consumer.unsubscribe() - # consumer.close() - # break + # consumer.unsubscribe() + # consumer.close() + # break -def set_conf(td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",enable_auto_commit="false",auto_commit_interval_ms="1000",auto_offset_reset="earliest",msg_with_table_name="true",session_timeout_ms=10000,max_poll_interval_ms=20000,experimental_snapshot_enable="false"): + +def set_conf( + td_connect_ip="localhost", + group_id=1, + client_id="test_consumer_py", + enable_auto_commit="false", + auto_commit_interval_ms="1000", + auto_offset_reset="earliest", + msg_with_table_name="true", + session_timeout_ms=10000, + max_poll_interval_ms=20000, + experimental_snapshot_enable="false", +): conf = { # auth options # consume options "td.connect.ip": f"{td_connect_ip}", - "group.id": f"{group_id}", + "group.id": f"{group_id}", "client.id": f"{client_id}", "enable.auto.commit": f"{enable_auto_commit}", "auto.commit.interval.ms": f"{auto_commit_interval_ms}", @@ -116,30 +155,20 @@ def set_conf(td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",e "msg.with.table.name": f"{msg_with_table_name}", "session.timeout.ms": f"{session_timeout_ms}", "max.poll.interval.ms": f"{max_poll_interval_ms}", - "experimental.snapshot.enable" :f"{experimental_snapshot_enable}", + "experimental.snapshot.enable": f"{experimental_snapshot_enable}", } return conf + def taosc_consumer(conf): consumer = Consumer(conf) group_id = int(conf["group.id"]) tdLog.info(f"{consumer},{group_id}") - #counsmer sub: - # while True: - # try: - # self.sub_consumer_once(consumer,group_id) - # except Exception as e: - # tdLog.info(str(e)) - # time.sleep(1) - # break - # only consumer once try: - sub_consumer_once(consumer,group_id) + sub_consumer_once(consumer, group_id) except Exception as e: tdLog.info(str(e)) - - #consumer.close() -if __name__ == '__main__': - test_timeout_sub() \ No newline at end of file +if __name__ == "__main__": + test_timeout_sub()