From e4547fd5a7f225c03c029e3a0be0f6f8b8153e23 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Wed, 23 Oct 2024 16:56:11 +0800 Subject: [PATCH 1/6] tetst:add testecase for clear consumer with session and poll timout --- tests/army/frame/common.py | 8 +- tests/army/tmq/drop_lost_comsumers.py | 267 ++++++++++++++++++++++++++ tests/army/tmq/per_consumer.py | 144 ++++++++++++++ tests/parallel_test/cases.task | 2 +- 4 files changed, 417 insertions(+), 4 deletions(-) create mode 100644 tests/army/tmq/drop_lost_comsumers.py create mode 100644 tests/army/tmq/per_consumer.py diff --git a/tests/army/frame/common.py b/tests/army/frame/common.py index b816095817..bad86c828f 100644 --- a/tests/army/frame/common.py +++ b/tests/army/frame/common.py @@ -803,11 +803,13 @@ class TDCom: else: tdLog.exit(f"getOneRow out of range: row_index={location} row_count={self.query_row}") - def killProcessor(self, processorName): + def kill_signal_process(self, signal=15, processor_name: str = "taosd"): if (platform.system().lower() == 'windows'): - os.system("TASKKILL /F /IM %s.exe"%processorName) + os.system(f"TASKKILL /F /IM {processor_name}.exe") else: - os.system("unset LD_PRELOAD; pkill %s " % processorName) + tdLog.debug(f"unset LD_PRELOAD; sudo pkill -f -{signal} '{processor_name}' ") + os.system(f"unset LD_PRELOAD; sudo pkill -f -{signal} '{processor_name}' ") + def gen_tag_col_str(self, gen_type, data_type, count): """ diff --git a/tests/army/tmq/drop_lost_comsumers.py b/tests/army/tmq/drop_lost_comsumers.py new file mode 100644 index 0000000000..b88aae8c03 --- /dev/null +++ b/tests/army/tmq/drop_lost_comsumers.py @@ -0,0 +1,267 @@ + +import taos +import sys +import time +import socket +import os +import threading +import multiprocessing +from multiprocessing import Process, Queue + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame import * +from taos.tmq import * +from frame import etool +from datetime import datetime +from taos.tmq import Consumer +from frame.common import * + +class TaosConsumer: + def __init__(self): + pass + + def sub_consumer(self ,consumer ,group_id ,topic_name ): + group_id = int(group_id) + if group_id < 100 : + try: + consumer.subscribe([topic_name]) + except TmqError: + tdLog.exit(f"subscribe error") + nrows = 0 + while True: + start = datetime.now() + print(f"time:{start},consumer:{group_id}, start to consume") + message = consumer.poll(timeout=10.0) + + if message: + id = message.offset() + topic = message.topic() + database = message.database() + + for block in message: + addrows = block.nrows() + nrows += block.nrows() + ncols = block.ncols() + values = block.fetchall + end = datetime.now() + elapsed_time = end -start + print(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + consumer.commit() + print(f"consumer:{group_id},consumer_nrows:{nrows}") + # consumer.unsubscribe() + # consumer.close() + # break + # if nrows >= 1000000: + # break + + def set_conf(self,td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",enable_auto_commit="false",auto_commit_interval_ms="1000",auto_offset_reset="earliest",msg_with_table_name="true",session_timeout_ms=10000,max_poll_interval_ms=180000,experimental_snapshot_enable="false"): + conf = { + # auth options + # consume options + "td.connect.ip": f"{td_connect_ip}", + "group.id": f"{group_id}", + "client.id": f"{client_id}", + "enable.auto.commit": f"{enable_auto_commit}", + "auto.commit.interval.ms": f"{auto_commit_interval_ms}", + "auto.offset.reset": f"{auto_offset_reset}", + "msg.with.table.name": f"{msg_with_table_name}", + "session.timeout.ms": f"{session_timeout_ms}", + "max.poll.interval.ms": f"{max_poll_interval_ms}", + "experimental.snapshot.enable" :f"{experimental_snapshot_enable}", + } + return conf + + def sub_consumer_once(self,consumer, group_id, topic_name, counter, stop_event): + group_id = int(group_id) + if group_id < 100 : + consumer.subscribe([topic_name]) + nrows = 0 + consumer_nrows = 0 + + while not stop_event.is_set(): + start = datetime.now() + tdLog.info(f"time:{start},consumer:{group_id}, start to consume") + #start = datetime.now() + #print(f"time:{start},consumer:{group_id}, start to consume") + tdLog.info(f"consumer_nrows:{consumer_nrows}") + message = consumer.poll(timeout=10.0) + + if message: + id = message.offset() + topic = message.topic() + database = message.database() + for block in message: + addrows = block.nrows() + nrows += block.nrows() + counter.rows(block.nrows()) + ncols = block.ncols() + values = block.fetchall + end = datetime.now() + elapsed_time = end -start + # tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + consumer.commit() + # tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") + consumer_nrows = nrows + # consumer.unsubscribe() + # consumer.close() + # break + + print("Consumer subscription thread is stopping.") + def taosc_consumer(self, conf, topic_name, counter,stop_event): + try: + print(conf) + from taos.tmq import Consumer + print("3333") + consumer = Consumer(conf) + print("456") + group_id = int(conf["group.id"]) + tdLog.info(f"{consumer},{group_id}") + except Exception as e: + tdLog.exit(f"{e}") + #counsmer sub: + # while True: + # try: + # self.sub_consumer_once(consumer,group_id) + # except Exception as e: + # print(str(e)) + # time.sleep(1) + # break + # only consumer once + try: + self.sub_consumer_once(consumer, group_id, topic_name, counter, stop_event) + + except Exception as e: + tdLog.exit(f"{e}") + + #consumer.close() + + +class ThreadSafeCounter: + def __init__(self): + self.counter = 0 + self.lock = threading.Lock() + + def rows(self, rows): + with self.lock: + self.counter += rows + + def get(self): + with self.lock: + return self.counter + + +class TDTestCase: + # updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + self.consumer_instance = TaosConsumer() + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + def caseDescription(self): + ''' + drop_lost_consmuers: + 1. verifying that the boundary and valid values of session_timeout_ms are in effect + 2. verifying that the boundary and valid values of max_poll_interval_ms are in effect + 3. verifying that consumer will be closed when the session_timeout_ms and max_poll_interval_ms is expired + ''' + return + + def check_consumer(self,count,rows): + time.sleep(count) + print(count) + try: + for ct in range(5): + tdSql.query(f'show consumers') + anser_rows=tdSql.getRows() + if tdSql.checkRows(rows): + break + else: + time.sleep(1) + tdLog.info(f"wait for {count} seconds to check that consumers number is {rows}") + if anser_rows != rows: + tdLog.exit(f"consumer number is not {rows}") + except Exception as e: + tdLog.exit(f"{e},check consumer error") + + def drop_session_timeout_consmuers(self, consumer_groups_num, session_timeout_ms, max_poll_interval_ms, topic_name, timeout): + tdSql.execute(f'drop topic if exists {topic_name};') + tdSql.execute(f'use db_sub') + tdSql.execute(f'create topic {topic_name} as select * from db_sub.meters;') + + # start consumer and config some parameters + os.system(f"nohup python3 ./tmq/per_consumer.py -c {consumer_groups_num} -s {session_timeout_ms} -p {max_poll_interval_ms} > consumer.log &") + # wait 4s for consuming data + time.sleep(4) + # kill consumer to simulate session_timeout_ms + tdLog.info("kill per_consumer.py") + tdCom.kill_signal_process(signal=9,processor_name="python3\s*./tmq/per_consumer.py") + self.check_consumer(timeout,0) + tdSql.execute(f'drop topic if exists {topic_name};') + os.system("rm -rf consumer.log") + + + def drop_max_poll_timeout_consmuers(self, consumer_groups_num, consumer_rows, topic_name, timeout): + tdSql.execute(f'drop topic if exists {topic_name};') + tdSql.execute(f'use db_sub') + tdSql.execute(f'create topic {topic_name} as select * from db_sub.meters;') + + threads = [] + counter = ThreadSafeCounter() + stop_event = threading.Event() + for id in range(consumer_groups_num): + conf = self.consumer_instance.set_conf(group_id=id, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms) + threads.append(threading.Thread(target=self.consumer_instance.taosc_consumer, args=(conf, topic_name, counter, stop_event))) + for tr in threads: + tr.start() + + consumer_all_rows = consumer_rows * consumer_groups_num + while True: + if counter.get() < consumer_all_rows: + time.sleep(5) + print(f"consumer_all_rows:{consumer_all_rows},counter.get():{counter.get()}") + elif counter.get() >= consumer_all_rows: + self.check_consumer(timeout+20, 0) + stop_event.set() + tr.join() + break + time.sleep(2) + tdSql.execute(f'drop topic if exists {topic_name};') + + def case_session_12s(self): + #test session_timeout_ms=12s + session_timeout_ms=12000 + max_poll_interval_ms=180000 + topic_name = "select_d1" + self.drop_session_timeout_consmuers(consumer_groups_num=1, session_timeout_ms=session_timeout_ms, max_poll_interval_ms=max_poll_interval_ms, topic_name=topic_name , timeout=int(session_timeout_ms/1000)) + + + def case_max_poll_12s(self,consumer_rows): + #test max_poll_interval_ms=12s + self.session_timeout_ms=180000 + self.max_poll_interval_ms=12000 + topic_name = "select_d1" + self.drop_max_poll_timeout_consmuers(consumer_groups_num=1, topic_name=topic_name, consumer_rows=consumer_rows, timeout=int(self.max_poll_interval_ms/1000)) + + + def run(self): + table_number = 1000 + rows_per_table = 1000 + vgroups = 4 + etool.benchMark(command=f"-d db_sub -t {table_number} -n {rows_per_table} -v {vgroups} -y") + consumer_rows = table_number * rows_per_table # 消费的目标行数 + # self.case_session_12s() + self.case_max_poll_12s(consumer_rows) + remaining_threads = threading.Lock() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/army/tmq/per_consumer.py b/tests/army/tmq/per_consumer.py new file mode 100644 index 0000000000..67c82d5d3e --- /dev/null +++ b/tests/army/tmq/per_consumer.py @@ -0,0 +1,144 @@ +import os +import taos +import time +from datetime import datetime +import subprocess +from multiprocessing import Process +import threading +from taos.tmq import Consumer +import click + +try: + conn = taos.connect() +except Exception as e: + print(str(e)) + +@click.command() +@click.option('-c', '--consumer-groups-num', "consumer_group_num", default=1, help='Number of consumer group.') +@click.option('-s', '--session-timeout-ms', "session_timeout_ms", default=60000, help='session timeout:ms') +@click.option('-p', '--max-poll-interval-ms',"max_poll_interval_ms", default=180000, help='max poll interval timeout:ms') + +def test_timeout_sub(consumer_group_num,session_timeout_ms,max_poll_interval_ms): + threads = [] + print(consumer_group_num,session_timeout_ms,max_poll_interval_ms) + for id in range(consumer_group_num): + conf = set_conf(group_id=id,session_timeout_ms=session_timeout_ms,max_poll_interval_ms=max_poll_interval_ms) + print(conf) + threads.append(threading.Thread(target=taosc_consumer, args=(conf,))) + for tr in threads: + tr.start() + for tr in threads: + tr.join() + +def sub_consumer(consumer,group_id): + group_id = int(group_id) + if group_id < 100 : + try: + consumer.subscribe(["select_d1"]) + except Exception as e: + print(f"subscribe error") + exit(1) + + nrows = 0 + while True: + start = datetime.now() + print(f"time:{start},consumer:{group_id}, start to consume") + message = consumer.poll(timeout=10.0) + + if message: + id = message.offset() + topic = message.topic() + database = message.database() + + for block in message: + addrows = block.nrows() + nrows += block.nrows() + ncols = block.ncols() + values = block.fetchall + end = datetime.now() + elapsed_time = end -start + print(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + consumer.commit() + print(f"consumer:{group_id},consumer_nrows:{nrows}") + # consumer.unsubscribe() + # consumer.close() + # break + # if nrows >= 1000000: + # break +def sub_consumer_once(consumer,group_id): + group_id = int(group_id) + if group_id < 100 : + consumer.subscribe(["select_d1"]) + nrows = 0 + consumer_nrows = 0 + while True: + start = datetime.now() + print(f"time:{start},consumer:{group_id}, start to consume") + #start = datetime.now() + #print(f"time:{start},consumer:{group_id}, start to consume") + print(f"consumer_nrows:{consumer_nrows}") + if consumer_nrows < 1000000: + message = consumer.poll(timeout=10.0) + else: + print(" stop consumer when consumer all rows") + + if message: + id = message.offset() + topic = message.topic() + database = message.database() + + for block in message: + addrows = block.nrows() + nrows += block.nrows() + ncols = block.ncols() + values = block.fetchall + end = datetime.now() + elapsed_time = end -start + # print(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + consumer.commit() + # print(f"consumer:{group_id},consumer_nrows:{nrows}") + consumer_nrows = nrows + # consumer.unsubscribe() + # consumer.close() + # break + +def set_conf(td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",enable_auto_commit="false",auto_commit_interval_ms="1000",auto_offset_reset="earliest",msg_with_table_name="true",session_timeout_ms=10000,max_poll_interval_ms=20000,experimental_snapshot_enable="false"): + conf = { + # auth options + # consume options + "td.connect.ip": f"{td_connect_ip}", + "group.id": f"{group_id}", + "client.id": f"{client_id}", + "enable.auto.commit": f"{enable_auto_commit}", + "auto.commit.interval.ms": f"{auto_commit_interval_ms}", + "auto.offset.reset": f"{auto_offset_reset}", + "msg.with.table.name": f"{msg_with_table_name}", + "session.timeout.ms": f"{session_timeout_ms}", + "max.poll.interval.ms": f"{max_poll_interval_ms}", + "experimental.snapshot.enable" :f"{experimental_snapshot_enable}", + } + return conf + +def taosc_consumer(conf): + consumer = Consumer(conf) + group_id = int(conf["group.id"]) + print(f"{consumer},{group_id}") + #counsmer sub: + # while True: + # try: + # self.sub_consumer_once(consumer,group_id) + # except Exception as e: + # print(str(e)) + # time.sleep(1) + # break + # only consumer once + try: + sub_consumer_once(consumer,group_id) + except Exception as e: + print(str(e)) + + #consumer.close() + + +if __name__ == '__main__': + test_timeout_sub() \ No newline at end of file diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index cfe88138ef..ef3e9c9c56 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -47,7 +47,7 @@ ,,y,army,./pytest.sh python3 ./test.py -f query/window/base.py ,,y,army,./pytest.sh python3 ./test.py -f query/sys/tb_perf_queries_exist_test.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f query/test_having.py - +,,n,army,python3 ./test.py -f tmq/drop_lost_comsumers.py # # system test # From 5261c87620ff5f48d2ee2605211f4f07c47df4fd Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Tue, 29 Oct 2024 14:32:40 +0800 Subject: [PATCH 2/6] tetst:add testecase for clear consumer with session and poll timout --- tests/army/tmq/drop_lost_comsumers.py | 162 ++++++++++++++------------ tests/army/tmq/per_consumer.py | 33 +++--- 2 files changed, 106 insertions(+), 89 deletions(-) diff --git a/tests/army/tmq/drop_lost_comsumers.py b/tests/army/tmq/drop_lost_comsumers.py index b88aae8c03..d2ec796516 100644 --- a/tests/army/tmq/drop_lost_comsumers.py +++ b/tests/army/tmq/drop_lost_comsumers.py @@ -20,9 +20,17 @@ from taos.tmq import Consumer from frame.common import * class TaosConsumer: + #TODo: add to tq.py and remove from here def __init__(self): - pass - + self.sub_once = True + self.once_consumer_rows = 0 + self.sub_log = False + self.safe_counter = ThreadSafeCounter() + + def log_info(self, message): + if self.sub_log: + tdLog.info(message) + def sub_consumer(self ,consumer ,group_id ,topic_name ): group_id = int(group_id) if group_id < 100 : @@ -33,9 +41,9 @@ class TaosConsumer: nrows = 0 while True: start = datetime.now() - print(f"time:{start},consumer:{group_id}, start to consume") + tdLog.info(f"time:{start},consumer:{group_id}, start to consume") message = consumer.poll(timeout=10.0) - + if message: id = message.offset() topic = message.topic() @@ -48,14 +56,11 @@ class TaosConsumer: values = block.fetchall end = datetime.now() elapsed_time = end -start - print(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") consumer.commit() - print(f"consumer:{group_id},consumer_nrows:{nrows}") + tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") # consumer.unsubscribe() # consumer.close() - # break - # if nrows >= 1000000: - # break def set_conf(self,td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",enable_auto_commit="false",auto_commit_interval_ms="1000",auto_offset_reset="earliest",msg_with_table_name="true",session_timeout_ms=10000,max_poll_interval_ms=180000,experimental_snapshot_enable="false"): conf = { @@ -74,7 +79,7 @@ class TaosConsumer: } return conf - def sub_consumer_once(self,consumer, group_id, topic_name, counter, stop_event): + def sub_consumer_once(self, consumer, group_id, topic_name, stop_event): group_id = int(group_id) if group_id < 100 : consumer.subscribe([topic_name]) @@ -83,11 +88,12 @@ class TaosConsumer: while not stop_event.is_set(): start = datetime.now() - tdLog.info(f"time:{start},consumer:{group_id}, start to consume") - #start = datetime.now() - #print(f"time:{start},consumer:{group_id}, start to consume") - tdLog.info(f"consumer_nrows:{consumer_nrows}") - message = consumer.poll(timeout=10.0) + self.log_info(f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}") + if consumer_nrows < self.once_consumer_rows: + message = consumer.poll(timeout=1.0) + elif consumer_nrows >= self.once_consumer_rows: + pass + # tdLog.info("stop consumer when consumer all rows") if message: id = message.offset() @@ -96,43 +102,35 @@ class TaosConsumer: for block in message: addrows = block.nrows() nrows += block.nrows() - counter.rows(block.nrows()) + self.safe_counter.rows(block.nrows()) ncols = block.ncols() values = block.fetchall end = datetime.now() elapsed_time = end -start - # tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") - consumer.commit() - # tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") - consumer_nrows = nrows - # consumer.unsubscribe() - # consumer.close() - # break - print("Consumer subscription thread is stopping.") - def taosc_consumer(self, conf, topic_name, counter,stop_event): + self.log_info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + self.log_info(f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}") + # consumer.commit() + consumer_nrows = nrows + # consumer.unsubscribe() + # consumer.close() + # break + tdLog.info("Consumer subscription thread is stopping.") + + def taosc_consumer(self, conf: list, topic_name: str, stop_event): try: - print(conf) + tdLog.info(conf) from taos.tmq import Consumer - print("3333") + tdLog.info("start to config consumer") consumer = Consumer(conf) - print("456") + tdLog.info("start to subscribe") group_id = int(conf["group.id"]) tdLog.info(f"{consumer},{group_id}") - except Exception as e: - tdLog.exit(f"{e}") - #counsmer sub: - # while True: - # try: - # self.sub_consumer_once(consumer,group_id) - # except Exception as e: - # print(str(e)) - # time.sleep(1) - # break - # only consumer once - try: - self.sub_consumer_once(consumer, group_id, topic_name, counter, stop_event) - + if self.sub_once: + self.sub_consumer_once(consumer, group_id, topic_name, stop_event) + else: + self.sub_consumer(consumer, group_id, topic_name) + # only consumer once except Exception as e: tdLog.exit(f"{e}") @@ -162,6 +160,17 @@ class TDTestCase: tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) self.consumer_instance = TaosConsumer() + #db parameter + self.table_number = 1000 + self.rows_per_table = 1000 + #consumer parameter + self.consumer_groups_num = 2 + self.session_timeout_ms= 180000 + self.max_poll_interval_ms= 180000 + #case consumer parameter + self.consumer_rows_per_thread = self.table_number * self.rows_per_table + self.consumer_all_rows = self.consumer_rows_per_thread * self.consumer_groups_num + #tdSql.init(conn.cursor(), logSql) # output sql.txt file def caseDescription(self): ''' @@ -172,20 +181,22 @@ class TDTestCase: ''' return - def check_consumer(self,count,rows): + def check_consumer(self, count, rows, stop_event=None): time.sleep(count) - print(count) try: + tdLog.info(f"wait timeout count:{count} and check consumer status whether is closed") for ct in range(5): tdSql.query(f'show consumers') anser_rows=tdSql.getRows() - if tdSql.checkRows(rows): + if anser_rows == rows: break else: time.sleep(1) - tdLog.info(f"wait for {count} seconds to check that consumers number is {rows}") + tdLog.info(f"wait for {count} seconds to check that consumers number is {anser_rows}") if anser_rows != rows: - tdLog.exit(f"consumer number is not {rows}") + if stop_event: + stop_event.set() + tdLog.exit(f"consumer number is {anser_rows } but not expected {rows}") except Exception as e: tdLog.exit(f"{e},check consumer error") @@ -206,58 +217,63 @@ class TDTestCase: os.system("rm -rf consumer.log") - def drop_max_poll_timeout_consmuers(self, consumer_groups_num, consumer_rows, topic_name, timeout): + def drop_max_poll_timeout_consmuers(self, topic_name, timeout): tdSql.execute(f'drop topic if exists {topic_name};') tdSql.execute(f'use db_sub') tdSql.execute(f'create topic {topic_name} as select * from db_sub.meters;') threads = [] - counter = ThreadSafeCounter() + self.safe_counter = ThreadSafeCounter() + self.consumer_instance.safe_counter = self.safe_counter stop_event = threading.Event() - for id in range(consumer_groups_num): + self.consumer_instance.once_consumer_rows = self.consumer_rows_per_thread + tdLog.info(f"consumer_rows:{self.consumer_instance.once_consumer_rows}") + self.consumer_instance.sub_once = True + for id in range(self.consumer_groups_num): conf = self.consumer_instance.set_conf(group_id=id, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms) - threads.append(threading.Thread(target=self.consumer_instance.taosc_consumer, args=(conf, topic_name, counter, stop_event))) + threads.append(threading.Thread(target=self.consumer_instance.taosc_consumer, args=(conf, topic_name, stop_event))) for tr in threads: tr.start() - consumer_all_rows = consumer_rows * consumer_groups_num while True: - if counter.get() < consumer_all_rows: + if self.safe_counter.get() < self.consumer_all_rows: time.sleep(5) - print(f"consumer_all_rows:{consumer_all_rows},counter.get():{counter.get()}") - elif counter.get() >= consumer_all_rows: - self.check_consumer(timeout+20, 0) + tdLog.info(f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.get()}") + elif self.safe_counter.get() >= self.consumer_all_rows: + # adding 5s is for heartbeat check + self.check_consumer(timeout+5, 0, stop_event) stop_event.set() tr.join() break - time.sleep(2) + time.sleep(1) tdSql.execute(f'drop topic if exists {topic_name};') - def case_session_12s(self): + def case_session_timeout(self): + tdLog.info("start to test session_timeout_ms=12s") #test session_timeout_ms=12s - session_timeout_ms=12000 - max_poll_interval_ms=180000 + self.session_timeout_ms=12000 + self.max_poll_interval_ms=180000 topic_name = "select_d1" - self.drop_session_timeout_consmuers(consumer_groups_num=1, session_timeout_ms=session_timeout_ms, max_poll_interval_ms=max_poll_interval_ms, topic_name=topic_name , timeout=int(session_timeout_ms/1000)) - + self.drop_session_timeout_consmuers(consumer_groups_num=1, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms, topic_name=topic_name , timeout=int(self.session_timeout_ms/1000)) + tdLog.info("stop to test session_timeout_ms=12s and done ") - def case_max_poll_12s(self,consumer_rows): - #test max_poll_interval_ms=12s + def case_max_poll_timeout(self): + tdLog.info("start to test max_poll_interval_ms=20s") + #test max_poll_interval_ms=20s self.session_timeout_ms=180000 - self.max_poll_interval_ms=12000 + self.max_poll_interval_ms=20000 topic_name = "select_d1" - self.drop_max_poll_timeout_consmuers(consumer_groups_num=1, topic_name=topic_name, consumer_rows=consumer_rows, timeout=int(self.max_poll_interval_ms/1000)) + self.drop_max_poll_timeout_consmuers(topic_name=topic_name, timeout=int(self.max_poll_interval_ms/1000)) + tdLog.info("stop to test max_poll_interval_ms=20s and done ") def run(self): - table_number = 1000 - rows_per_table = 1000 vgroups = 4 - etool.benchMark(command=f"-d db_sub -t {table_number} -n {rows_per_table} -v {vgroups} -y") - consumer_rows = table_number * rows_per_table # 消费的目标行数 - # self.case_session_12s() - self.case_max_poll_12s(consumer_rows) - remaining_threads = threading.Lock() + etool.benchMark(command=f"-d db_sub -t {self.table_number} -n {self.rows_per_table} -v {vgroups} -a {self.replicaVar} -y") + # test case start here + + self.case_session_timeout() + self.case_max_poll_timeout() def stop(self): tdSql.close() diff --git a/tests/army/tmq/per_consumer.py b/tests/army/tmq/per_consumer.py index 67c82d5d3e..810d7d44f8 100644 --- a/tests/army/tmq/per_consumer.py +++ b/tests/army/tmq/per_consumer.py @@ -2,6 +2,7 @@ import os import taos import time from datetime import datetime +from frame.log import * import subprocess from multiprocessing import Process import threading @@ -11,7 +12,7 @@ import click try: conn = taos.connect() except Exception as e: - print(str(e)) + tdLog.info(str(e)) @click.command() @click.option('-c', '--consumer-groups-num', "consumer_group_num", default=1, help='Number of consumer group.') @@ -20,10 +21,10 @@ except Exception as e: def test_timeout_sub(consumer_group_num,session_timeout_ms,max_poll_interval_ms): threads = [] - print(consumer_group_num,session_timeout_ms,max_poll_interval_ms) + tdLog.info(consumer_group_num,session_timeout_ms,max_poll_interval_ms) for id in range(consumer_group_num): conf = set_conf(group_id=id,session_timeout_ms=session_timeout_ms,max_poll_interval_ms=max_poll_interval_ms) - print(conf) + tdLog.info(conf) threads.append(threading.Thread(target=taosc_consumer, args=(conf,))) for tr in threads: tr.start() @@ -36,13 +37,13 @@ def sub_consumer(consumer,group_id): try: consumer.subscribe(["select_d1"]) except Exception as e: - print(f"subscribe error") + tdLog.info(f"subscribe error") exit(1) nrows = 0 while True: start = datetime.now() - print(f"time:{start},consumer:{group_id}, start to consume") + tdLog.info(f"time:{start},consumer:{group_id}, start to consume") message = consumer.poll(timeout=10.0) if message: @@ -57,9 +58,9 @@ def sub_consumer(consumer,group_id): values = block.fetchall end = datetime.now() elapsed_time = end -start - print(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") consumer.commit() - print(f"consumer:{group_id},consumer_nrows:{nrows}") + tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") # consumer.unsubscribe() # consumer.close() # break @@ -73,14 +74,14 @@ def sub_consumer_once(consumer,group_id): consumer_nrows = 0 while True: start = datetime.now() - print(f"time:{start},consumer:{group_id}, start to consume") + tdLog.info(f"time:{start},consumer:{group_id}, start to consume") #start = datetime.now() - #print(f"time:{start},consumer:{group_id}, start to consume") - print(f"consumer_nrows:{consumer_nrows}") + #tdLog.info(f"time:{start},consumer:{group_id}, start to consume") + tdLog.info(f"consumer_nrows:{consumer_nrows}") if consumer_nrows < 1000000: message = consumer.poll(timeout=10.0) else: - print(" stop consumer when consumer all rows") + tdLog.info(" stop consumer when consumer all rows") if message: id = message.offset() @@ -94,9 +95,9 @@ def sub_consumer_once(consumer,group_id): values = block.fetchall end = datetime.now() elapsed_time = end -start - # print(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + # tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") consumer.commit() - # print(f"consumer:{group_id},consumer_nrows:{nrows}") + # tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") consumer_nrows = nrows # consumer.unsubscribe() # consumer.close() @@ -122,20 +123,20 @@ def set_conf(td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",e def taosc_consumer(conf): consumer = Consumer(conf) group_id = int(conf["group.id"]) - print(f"{consumer},{group_id}") + tdLog.info(f"{consumer},{group_id}") #counsmer sub: # while True: # try: # self.sub_consumer_once(consumer,group_id) # except Exception as e: - # print(str(e)) + # tdLog.info(str(e)) # time.sleep(1) # break # only consumer once try: sub_consumer_once(consumer,group_id) except Exception as e: - print(str(e)) + tdLog.info(str(e)) #consumer.close() From d99c9159082d2047cf10d008420c50749735aff5 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Tue, 29 Oct 2024 14:38:33 +0800 Subject: [PATCH 3/6] test:use the black tool to format the code style --- tests/army/tmq/drop_lost_comsumers.py | 211 ++++++++++++++++---------- tests/army/tmq/per_consumer.py | 123 +++++++++------ 2 files changed, 211 insertions(+), 123 deletions(-) diff --git a/tests/army/tmq/drop_lost_comsumers.py b/tests/army/tmq/drop_lost_comsumers.py index d2ec796516..f455931089 100644 --- a/tests/army/tmq/drop_lost_comsumers.py +++ b/tests/army/tmq/drop_lost_comsumers.py @@ -1,4 +1,3 @@ - import taos import sys import time @@ -19,8 +18,9 @@ from datetime import datetime from taos.tmq import Consumer from frame.common import * + class TaosConsumer: - #TODo: add to tq.py and remove from here + # TODo: add to tq.py and remove from here def __init__(self): self.sub_once = True self.once_consumer_rows = 0 @@ -31,43 +31,57 @@ class TaosConsumer: if self.sub_log: tdLog.info(message) - def sub_consumer(self ,consumer ,group_id ,topic_name ): + def sub_consumer(self, consumer, group_id, topic_name): group_id = int(group_id) - if group_id < 100 : + if group_id < 100: try: consumer.subscribe([topic_name]) except TmqError: - tdLog.exit(f"subscribe error") + tdLog.exit(f"subscribe error") nrows = 0 while True: start = datetime.now() tdLog.info(f"time:{start},consumer:{group_id}, start to consume") message = consumer.poll(timeout=10.0) - + if message: id = message.offset() topic = message.topic() database = message.database() - + for block in message: addrows = block.nrows() nrows += block.nrows() ncols = block.ncols() values = block.fetchall end = datetime.now() - elapsed_time = end -start - tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + elapsed_time = end - start + tdLog.info( + f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}" + ) consumer.commit() tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") - # consumer.unsubscribe() - # consumer.close() + # consumer.unsubscribe() + # consumer.close() - def set_conf(self,td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",enable_auto_commit="false",auto_commit_interval_ms="1000",auto_offset_reset="earliest",msg_with_table_name="true",session_timeout_ms=10000,max_poll_interval_ms=180000,experimental_snapshot_enable="false"): + def set_conf( + self, + td_connect_ip="localhost", + group_id=1, + client_id="test_consumer_py", + enable_auto_commit="false", + auto_commit_interval_ms="1000", + auto_offset_reset="earliest", + msg_with_table_name="true", + session_timeout_ms=10000, + max_poll_interval_ms=180000, + experimental_snapshot_enable="false", + ): conf = { # auth options # consume options "td.connect.ip": f"{td_connect_ip}", - "group.id": f"{group_id}", + "group.id": f"{group_id}", "client.id": f"{client_id}", "enable.auto.commit": f"{enable_auto_commit}", "auto.commit.interval.ms": f"{auto_commit_interval_ms}", @@ -75,25 +89,28 @@ class TaosConsumer: "msg.with.table.name": f"{msg_with_table_name}", "session.timeout.ms": f"{session_timeout_ms}", "max.poll.interval.ms": f"{max_poll_interval_ms}", - "experimental.snapshot.enable" :f"{experimental_snapshot_enable}", + "experimental.snapshot.enable": f"{experimental_snapshot_enable}", } return conf - + def sub_consumer_once(self, consumer, group_id, topic_name, stop_event): group_id = int(group_id) - if group_id < 100 : + if group_id < 100: consumer.subscribe([topic_name]) nrows = 0 consumer_nrows = 0 - + while not stop_event.is_set(): start = datetime.now() - self.log_info(f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}") + self.log_info( + f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}" + ) if consumer_nrows < self.once_consumer_rows: - message = consumer.poll(timeout=1.0) + message = consumer.poll(timeout=1.0) elif consumer_nrows >= self.once_consumer_rows: + # when break the loop, the consumer will be closed, so we need to continue to keep consumer alive util the stop_event is set pass - # tdLog.info("stop consumer when consumer all rows") + # tdLog.info("stop consumer when consumer all rows") if message: id = message.offset() @@ -106,37 +123,42 @@ class TaosConsumer: ncols = block.ncols() values = block.fetchall end = datetime.now() - elapsed_time = end -start + elapsed_time = end - start - self.log_info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") - self.log_info(f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}") + self.log_info( + f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}" + ) + self.log_info( + f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}" + ) # consumer.commit() consumer_nrows = nrows # consumer.unsubscribe() # consumer.close() - # break + # break tdLog.info("Consumer subscription thread is stopping.") def taosc_consumer(self, conf: list, topic_name: str, stop_event): try: tdLog.info(conf) from taos.tmq import Consumer + tdLog.info("start to config consumer") consumer = Consumer(conf) tdLog.info("start to subscribe") group_id = int(conf["group.id"]) tdLog.info(f"{consumer},{group_id}") if self.sub_once: - self.sub_consumer_once(consumer, group_id, topic_name, stop_event) + self.sub_consumer_once(consumer, group_id, topic_name, stop_event) else: self.sub_consumer(consumer, group_id, topic_name) # only consumer once except Exception as e: tdLog.exit(f"{e}") - - #consumer.close() - + # consumer.close() + + class ThreadSafeCounter: def __init__(self): self.counter = 0 @@ -154,45 +176,51 @@ class ThreadSafeCounter: class TDTestCase: # updatecfgDict = {'debugFlag': 135, 'asynclog': 0} - def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) self.consumer_instance = TaosConsumer() - #db parameter + # db parameter self.table_number = 1000 self.rows_per_table = 1000 - #consumer parameter + # consumer parameter self.consumer_groups_num = 2 - self.session_timeout_ms= 180000 - self.max_poll_interval_ms= 180000 - #case consumer parameter + self.session_timeout_ms = 180000 + self.max_poll_interval_ms = 180000 + # case consumer parameter self.consumer_rows_per_thread = self.table_number * self.rows_per_table - self.consumer_all_rows = self.consumer_rows_per_thread * self.consumer_groups_num + self.consumer_all_rows = ( + self.consumer_rows_per_thread * self.consumer_groups_num + ) + + # tdSql.init(conn.cursor(), logSql) # output sql.txt file - #tdSql.init(conn.cursor(), logSql) # output sql.txt file def caseDescription(self): - ''' - drop_lost_consmuers: + """ + drop_lost_consmuers: 1. verifying that the boundary and valid values of session_timeout_ms are in effect 2. verifying that the boundary and valid values of max_poll_interval_ms are in effect 3. verifying that consumer will be closed when the session_timeout_ms and max_poll_interval_ms is expired - ''' + """ return - + def check_consumer(self, count, rows, stop_event=None): time.sleep(count) try: - tdLog.info(f"wait timeout count:{count} and check consumer status whether is closed") + tdLog.info( + f"wait timeout count:{count} and check consumer status whether is closed" + ) for ct in range(5): - tdSql.query(f'show consumers') - anser_rows=tdSql.getRows() + tdSql.query(f"show consumers") + anser_rows = tdSql.getRows() if anser_rows == rows: break else: time.sleep(1) - tdLog.info(f"wait for {count} seconds to check that consumers number is {anser_rows}") + tdLog.info( + f"wait for {count} seconds to check that consumers number is {anser_rows}" + ) if anser_rows != rows: if stop_event: stop_event.set() @@ -200,27 +228,37 @@ class TDTestCase: except Exception as e: tdLog.exit(f"{e},check consumer error") - def drop_session_timeout_consmuers(self, consumer_groups_num, session_timeout_ms, max_poll_interval_ms, topic_name, timeout): - tdSql.execute(f'drop topic if exists {topic_name};') - tdSql.execute(f'use db_sub') - tdSql.execute(f'create topic {topic_name} as select * from db_sub.meters;') + def drop_session_timeout_consmuers( + self, + consumer_groups_num, + session_timeout_ms, + max_poll_interval_ms, + topic_name, + timeout, + ): + tdSql.execute(f"drop topic if exists {topic_name};") + tdSql.execute(f"use db_sub") + tdSql.execute(f"create topic {topic_name} as select * from db_sub.meters;") # start consumer and config some parameters - os.system(f"nohup python3 ./tmq/per_consumer.py -c {consumer_groups_num} -s {session_timeout_ms} -p {max_poll_interval_ms} > consumer.log &") + os.system( + f"nohup python3 ./tmq/per_consumer.py -c {consumer_groups_num} -s {session_timeout_ms} -p {max_poll_interval_ms} > consumer.log &" + ) # wait 4s for consuming data time.sleep(4) # kill consumer to simulate session_timeout_ms tdLog.info("kill per_consumer.py") - tdCom.kill_signal_process(signal=9,processor_name="python3\s*./tmq/per_consumer.py") - self.check_consumer(timeout,0) - tdSql.execute(f'drop topic if exists {topic_name};') + tdCom.kill_signal_process( + signal=9, processor_name="python3\s*./tmq/per_consumer.py" + ) + self.check_consumer(timeout, 0) + tdSql.execute(f"drop topic if exists {topic_name};") os.system("rm -rf consumer.log") - - + def drop_max_poll_timeout_consmuers(self, topic_name, timeout): - tdSql.execute(f'drop topic if exists {topic_name};') - tdSql.execute(f'use db_sub') - tdSql.execute(f'create topic {topic_name} as select * from db_sub.meters;') + tdSql.execute(f"drop topic if exists {topic_name};") + tdSql.execute(f"use db_sub") + tdSql.execute(f"create topic {topic_name} as select * from db_sub.meters;") threads = [] self.safe_counter = ThreadSafeCounter() @@ -230,48 +268,68 @@ class TDTestCase: tdLog.info(f"consumer_rows:{self.consumer_instance.once_consumer_rows}") self.consumer_instance.sub_once = True for id in range(self.consumer_groups_num): - conf = self.consumer_instance.set_conf(group_id=id, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms) - threads.append(threading.Thread(target=self.consumer_instance.taosc_consumer, args=(conf, topic_name, stop_event))) + conf = self.consumer_instance.set_conf( + group_id=id, + session_timeout_ms=self.session_timeout_ms, + max_poll_interval_ms=self.max_poll_interval_ms, + ) + threads.append( + threading.Thread( + target=self.consumer_instance.taosc_consumer, + args=(conf, topic_name, stop_event), + ) + ) for tr in threads: tr.start() - + while True: if self.safe_counter.get() < self.consumer_all_rows: time.sleep(5) - tdLog.info(f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.get()}") + tdLog.info( + f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.get()}" + ) elif self.safe_counter.get() >= self.consumer_all_rows: - # adding 5s is for heartbeat check - self.check_consumer(timeout+5, 0, stop_event) + # adding 5s is for heartbeat check + self.check_consumer(timeout + 5, 0, stop_event) stop_event.set() tr.join() break time.sleep(1) - tdSql.execute(f'drop topic if exists {topic_name};') - + tdSql.execute(f"drop topic if exists {topic_name};") + def case_session_timeout(self): tdLog.info("start to test session_timeout_ms=12s") - #test session_timeout_ms=12s - self.session_timeout_ms=12000 - self.max_poll_interval_ms=180000 + # test session_timeout_ms=12s + self.session_timeout_ms = 12000 + self.max_poll_interval_ms = 180000 topic_name = "select_d1" - self.drop_session_timeout_consmuers(consumer_groups_num=1, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms, topic_name=topic_name , timeout=int(self.session_timeout_ms/1000)) + self.drop_session_timeout_consmuers( + consumer_groups_num=1, + session_timeout_ms=self.session_timeout_ms, + max_poll_interval_ms=self.max_poll_interval_ms, + topic_name=topic_name, + timeout=int(self.session_timeout_ms / 1000), + ) tdLog.info("stop to test session_timeout_ms=12s and done ") def case_max_poll_timeout(self): tdLog.info("start to test max_poll_interval_ms=20s") - #test max_poll_interval_ms=20s - self.session_timeout_ms=180000 - self.max_poll_interval_ms=20000 + # test max_poll_interval_ms=20s + self.session_timeout_ms = 180000 + self.max_poll_interval_ms = 20000 topic_name = "select_d1" - self.drop_max_poll_timeout_consmuers(topic_name=topic_name, timeout=int(self.max_poll_interval_ms/1000)) + self.drop_max_poll_timeout_consmuers( + topic_name=topic_name, timeout=int(self.max_poll_interval_ms / 1000) + ) tdLog.info("stop to test max_poll_interval_ms=20s and done ") - def run(self): vgroups = 4 - etool.benchMark(command=f"-d db_sub -t {self.table_number} -n {self.rows_per_table} -v {vgroups} -a {self.replicaVar} -y") + etool.benchMark( + command=f"-d db_sub -t {self.table_number} -n {self.rows_per_table} -v {vgroups} -a {self.replicaVar} -y" + ) # test case start here - + self.case_session_timeout() self.case_max_poll_timeout() @@ -279,5 +337,6 @@ class TDTestCase: tdSql.close() tdLog.success(f"{__file__} successfully executed") + tdCases.addLinux(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/army/tmq/per_consumer.py b/tests/army/tmq/per_consumer.py index 810d7d44f8..418c88dead 100644 --- a/tests/army/tmq/per_consumer.py +++ b/tests/army/tmq/per_consumer.py @@ -9,21 +9,46 @@ import threading from taos.tmq import Consumer import click +# TDDO +# 1. using tmq common class to replace the function, file drop_lost_consumers.py has the same function + try: conn = taos.connect() except Exception as e: tdLog.info(str(e)) - -@click.command() -@click.option('-c', '--consumer-groups-num', "consumer_group_num", default=1, help='Number of consumer group.') -@click.option('-s', '--session-timeout-ms', "session_timeout_ms", default=60000, help='session timeout:ms') -@click.option('-p', '--max-poll-interval-ms',"max_poll_interval_ms", default=180000, help='max poll interval timeout:ms') -def test_timeout_sub(consumer_group_num,session_timeout_ms,max_poll_interval_ms): + +@click.command() +@click.option( + "-c", + "--consumer-groups-num", + "consumer_group_num", + default=1, + help="Number of consumer group.", +) +@click.option( + "-s", + "--session-timeout-ms", + "session_timeout_ms", + default=60000, + help="session timeout:ms", +) +@click.option( + "-p", + "--max-poll-interval-ms", + "max_poll_interval_ms", + default=180000, + help="max poll interval timeout:ms", +) +def test_timeout_sub(consumer_group_num, session_timeout_ms, max_poll_interval_ms): threads = [] - tdLog.info(consumer_group_num,session_timeout_ms,max_poll_interval_ms) + tdLog.info(consumer_group_num, session_timeout_ms, max_poll_interval_ms) for id in range(consumer_group_num): - conf = set_conf(group_id=id,session_timeout_ms=session_timeout_ms,max_poll_interval_ms=max_poll_interval_ms) + conf = set_conf( + group_id=id, + session_timeout_ms=session_timeout_ms, + max_poll_interval_ms=max_poll_interval_ms, + ) tdLog.info(conf) threads.append(threading.Thread(target=taosc_consumer, args=(conf,))) for tr in threads: @@ -31,52 +56,54 @@ def test_timeout_sub(consumer_group_num,session_timeout_ms,max_poll_interval_ms) for tr in threads: tr.join() -def sub_consumer(consumer,group_id): + +def sub_consumer(consumer, group_id): group_id = int(group_id) - if group_id < 100 : + if group_id < 100: try: consumer.subscribe(["select_d1"]) except Exception as e: - tdLog.info(f"subscribe error") - exit(1) + tdLog.info(f"subscribe error") + exit(1) nrows = 0 while True: start = datetime.now() tdLog.info(f"time:{start},consumer:{group_id}, start to consume") message = consumer.poll(timeout=10.0) - + if message: id = message.offset() topic = message.topic() database = message.database() - + for block in message: addrows = block.nrows() nrows += block.nrows() ncols = block.ncols() values = block.fetchall end = datetime.now() - elapsed_time = end -start - tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + elapsed_time = end - start + tdLog.info( + f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}" + ) consumer.commit() tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") - # consumer.unsubscribe() - # consumer.close() - # break - # if nrows >= 1000000: - # break -def sub_consumer_once(consumer,group_id): + # consumer.unsubscribe() + # consumer.close() + + +def sub_consumer_once(consumer, group_id): group_id = int(group_id) - if group_id < 100 : + if group_id < 100: consumer.subscribe(["select_d1"]) nrows = 0 consumer_nrows = 0 while True: start = datetime.now() tdLog.info(f"time:{start},consumer:{group_id}, start to consume") - #start = datetime.now() - #tdLog.info(f"time:{start},consumer:{group_id}, start to consume") + # start = datetime.now() + # tdLog.info(f"time:{start},consumer:{group_id}, start to consume") tdLog.info(f"consumer_nrows:{consumer_nrows}") if consumer_nrows < 1000000: message = consumer.poll(timeout=10.0) @@ -87,28 +114,40 @@ def sub_consumer_once(consumer,group_id): id = message.offset() topic = message.topic() database = message.database() - + for block in message: addrows = block.nrows() nrows += block.nrows() ncols = block.ncols() values = block.fetchall end = datetime.now() - elapsed_time = end -start + elapsed_time = end - start # tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") consumer.commit() # tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") consumer_nrows = nrows - # consumer.unsubscribe() - # consumer.close() - # break + # consumer.unsubscribe() + # consumer.close() + # break -def set_conf(td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",enable_auto_commit="false",auto_commit_interval_ms="1000",auto_offset_reset="earliest",msg_with_table_name="true",session_timeout_ms=10000,max_poll_interval_ms=20000,experimental_snapshot_enable="false"): + +def set_conf( + td_connect_ip="localhost", + group_id=1, + client_id="test_consumer_py", + enable_auto_commit="false", + auto_commit_interval_ms="1000", + auto_offset_reset="earliest", + msg_with_table_name="true", + session_timeout_ms=10000, + max_poll_interval_ms=20000, + experimental_snapshot_enable="false", +): conf = { # auth options # consume options "td.connect.ip": f"{td_connect_ip}", - "group.id": f"{group_id}", + "group.id": f"{group_id}", "client.id": f"{client_id}", "enable.auto.commit": f"{enable_auto_commit}", "auto.commit.interval.ms": f"{auto_commit_interval_ms}", @@ -116,30 +155,20 @@ def set_conf(td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",e "msg.with.table.name": f"{msg_with_table_name}", "session.timeout.ms": f"{session_timeout_ms}", "max.poll.interval.ms": f"{max_poll_interval_ms}", - "experimental.snapshot.enable" :f"{experimental_snapshot_enable}", + "experimental.snapshot.enable": f"{experimental_snapshot_enable}", } return conf + def taosc_consumer(conf): consumer = Consumer(conf) group_id = int(conf["group.id"]) tdLog.info(f"{consumer},{group_id}") - #counsmer sub: - # while True: - # try: - # self.sub_consumer_once(consumer,group_id) - # except Exception as e: - # tdLog.info(str(e)) - # time.sleep(1) - # break - # only consumer once try: - sub_consumer_once(consumer,group_id) + sub_consumer_once(consumer, group_id) except Exception as e: tdLog.info(str(e)) - - #consumer.close() -if __name__ == '__main__': - test_timeout_sub() \ No newline at end of file +if __name__ == "__main__": + test_timeout_sub() From 3ebdaf1d82f8ed8c4e3f74903fd4fcb8a5ca0aff Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Tue, 29 Oct 2024 16:21:44 +0800 Subject: [PATCH 4/6] test: use pylint to formate code --- tests/army/tmq/drop_lost_comsumers.py | 166 ++++++++++++-------------- 1 file changed, 78 insertions(+), 88 deletions(-) diff --git a/tests/army/tmq/drop_lost_comsumers.py b/tests/army/tmq/drop_lost_comsumers.py index f455931089..cef67ddfcc 100644 --- a/tests/army/tmq/drop_lost_comsumers.py +++ b/tests/army/tmq/drop_lost_comsumers.py @@ -1,26 +1,20 @@ -import taos -import sys import time -import socket import os import threading -import multiprocessing -from multiprocessing import Process, Queue - -from frame.log import * -from frame.cases import * -from frame.sql import * -from frame.caseBase import * -from frame import * -from taos.tmq import * -from frame import etool -from datetime import datetime +import datetime from taos.tmq import Consumer -from frame.common import * +from taos.error import TmqError + +from frame.log import tdLog +from frame.cases import tdCases +from frame.sql import tdSql +from frame.caseBase import * +from frame import etool +from frame.common import tdCom class TaosConsumer: - # TODo: add to tq.py and remove from here + # TODO: Move this class to tq.py and remove it from here def __init__(self): self.sub_once = True self.once_consumer_rows = 0 @@ -40,24 +34,26 @@ class TaosConsumer: tdLog.exit(f"subscribe error") nrows = 0 while True: - start = datetime.now() + start = datetime.datetime.now() tdLog.info(f"time:{start},consumer:{group_id}, start to consume") message = consumer.poll(timeout=10.0) if message: - id = message.offset() - topic = message.topic() - database = message.database() + message_offset = message.offset() + # topic = message.topic() + # database = message.database() for block in message: addrows = block.nrows() nrows += block.nrows() ncols = block.ncols() - values = block.fetchall - end = datetime.now() + # values = block.fetchall + end = datetime.datetime.now() elapsed_time = end - start tdLog.info( - f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}" + f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time}," + f"consumer_nrows:{nrows},consumer_addrows:{addrows}," + f"consumer_ncols:{ncols},offset:{id}" ) consumer.commit() tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") @@ -101,10 +97,11 @@ class TaosConsumer: consumer_nrows = 0 while not stop_event.is_set(): - start = datetime.now() + start = datetime.datetime.now() self.log_info( f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}" ) + message = None if consumer_nrows < self.once_consumer_rows: message = consumer.poll(timeout=1.0) elif consumer_nrows >= self.once_consumer_rows: @@ -113,36 +110,32 @@ class TaosConsumer: # tdLog.info("stop consumer when consumer all rows") if message: - id = message.offset() - topic = message.topic() - database = message.database() + message_offset = message.offset() + # topic = message.topic() + # database = message.database() for block in message: addrows = block.nrows() nrows += block.nrows() self.safe_counter.rows(block.nrows()) ncols = block.ncols() - values = block.fetchall - end = datetime.now() + # values = block.fetchall + end = datetime.datetime.now() elapsed_time = end - start self.log_info( - f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}" + f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{message_offset}" ) self.log_info( f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}" ) # consumer.commit() consumer_nrows = nrows - # consumer.unsubscribe() - # consumer.close() - # break + tdLog.info("Consumer subscription thread is stopping.") - def taosc_consumer(self, conf: list, topic_name: str, stop_event): + def taosc_consumer(self, conf: list, topic_name: str, stop_event: threading.Event): try: tdLog.info(conf) - from taos.tmq import Consumer - tdLog.info("start to config consumer") consumer = Consumer(conf) tdLog.info("start to subscribe") @@ -175,12 +168,7 @@ class ThreadSafeCounter: class TDTestCase: # updatecfgDict = {'debugFlag': 135, 'asynclog': 0} - - def init(self, conn, logSql, replicaVar=1): - self.replicaVar = int(replicaVar) - tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor()) - self.consumer_instance = TaosConsumer() + def __init__(self): # db parameter self.table_number = 1000 self.rows_per_table = 1000 @@ -193,7 +181,12 @@ class TDTestCase: self.consumer_all_rows = ( self.consumer_rows_per_thread * self.consumer_groups_num ) - + self.topic_name = "select_d1" + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), logSql) + self.consumer_instance = TaosConsumer() # tdSql.init(conn.cursor(), logSql) # output sql.txt file def caseDescription(self): @@ -211,16 +204,15 @@ class TDTestCase: tdLog.info( f"wait timeout count:{count} and check consumer status whether is closed" ) - for ct in range(5): - tdSql.query(f"show consumers") + for _ in range(5): + tdSql.query("show consumers") anser_rows = tdSql.getRows() if anser_rows == rows: break - else: - time.sleep(1) - tdLog.info( - f"wait for {count} seconds to check that consumers number is {anser_rows}" - ) + time.sleep(1) + tdLog.info( + f"wait for {count} seconds to check that consumers number is {anser_rows}" + ) if anser_rows != rows: if stop_event: stop_event.set() @@ -228,37 +220,30 @@ class TDTestCase: except Exception as e: tdLog.exit(f"{e},check consumer error") - def drop_session_timeout_consmuers( - self, - consumer_groups_num, - session_timeout_ms, - max_poll_interval_ms, - topic_name, - timeout, - ): - tdSql.execute(f"drop topic if exists {topic_name};") - tdSql.execute(f"use db_sub") - tdSql.execute(f"create topic {topic_name} as select * from db_sub.meters;") + def drop_session_timeout_consmuers(self): + tdSql.execute(f"drop topic if exists {self.topic_name};") + tdSql.execute("use db_sub") + tdSql.execute(f"create topic {self.topic_name} as select * from db_sub.meters;") # start consumer and config some parameters os.system( - f"nohup python3 ./tmq/per_consumer.py -c {consumer_groups_num} -s {session_timeout_ms} -p {max_poll_interval_ms} > consumer.log &" + f"nohup python3 ./tmq/per_consumer.py -c {self.consumer_groups_num} -s {self.session_timeout_ms} -p {self.max_poll_interval_ms} > consumer.log &" ) # wait 4s for consuming data time.sleep(4) # kill consumer to simulate session_timeout_ms tdLog.info("kill per_consumer.py") tdCom.kill_signal_process( - signal=9, processor_name="python3\s*./tmq/per_consumer.py" + signal=9, processor_name=r"python3\s*./tmq/per_consumer.py" ) - self.check_consumer(timeout, 0) - tdSql.execute(f"drop topic if exists {topic_name};") + self.check_consumer(int(self.session_timeout_ms / 1000), 0) + tdSql.execute(f"drop topic if exists {self.topic_name};") os.system("rm -rf consumer.log") - def drop_max_poll_timeout_consmuers(self, topic_name, timeout): - tdSql.execute(f"drop topic if exists {topic_name};") - tdSql.execute(f"use db_sub") - tdSql.execute(f"create topic {topic_name} as select * from db_sub.meters;") + def drop_max_poll_timeout_consmuers(self): + tdSql.execute(f"drop topic if exists {self.topic_name};") + tdSql.execute("use db_sub") + tdSql.execute(f"create topic {self.topic_name} as select * from db_sub.meters;") threads = [] self.safe_counter = ThreadSafeCounter() @@ -267,16 +252,16 @@ class TDTestCase: self.consumer_instance.once_consumer_rows = self.consumer_rows_per_thread tdLog.info(f"consumer_rows:{self.consumer_instance.once_consumer_rows}") self.consumer_instance.sub_once = True - for id in range(self.consumer_groups_num): + for group_id in range(self.consumer_groups_num): conf = self.consumer_instance.set_conf( - group_id=id, + group_id=group_id, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms, ) threads.append( threading.Thread( target=self.consumer_instance.taosc_consumer, - args=(conf, topic_name, stop_event), + args=(conf, self.topic_name, stop_event), ) ) for tr in threads: @@ -290,50 +275,55 @@ class TDTestCase: ) elif self.safe_counter.get() >= self.consumer_all_rows: # adding 5s is for heartbeat check - self.check_consumer(timeout + 5, 0, stop_event) + self.check_consumer(int(self.max_poll_interval_ms / 1000 ) + 5, 0, stop_event) stop_event.set() - tr.join() break + for tr in threads: + tr.join() time.sleep(1) - tdSql.execute(f"drop topic if exists {topic_name};") + tdSql.execute(f"drop topic if exists {self.topic_name};") def case_session_timeout(self): + """ + TEST CASE: verifying that the boundary and valid values of session_timeout_ms are in effect + """ + tdLog.info("start to test session_timeout_ms=12s") # test session_timeout_ms=12s self.session_timeout_ms = 12000 self.max_poll_interval_ms = 180000 - topic_name = "select_d1" - self.drop_session_timeout_consmuers( - consumer_groups_num=1, - session_timeout_ms=self.session_timeout_ms, - max_poll_interval_ms=self.max_poll_interval_ms, - topic_name=topic_name, - timeout=int(self.session_timeout_ms / 1000), - ) + # self.set_session_timeout = int(self.session_timeout_ms / 1000) + self.drop_session_timeout_consmuers() tdLog.info("stop to test session_timeout_ms=12s and done ") def case_max_poll_timeout(self): + """ + TEST CASE: verifying that the boundary and valid values of max_poll_interval_ms are in effect + """ tdLog.info("start to test max_poll_interval_ms=20s") # test max_poll_interval_ms=20s self.session_timeout_ms = 180000 self.max_poll_interval_ms = 20000 - topic_name = "select_d1" - self.drop_max_poll_timeout_consmuers( - topic_name=topic_name, timeout=int(self.max_poll_interval_ms / 1000) - ) + self.drop_max_poll_timeout_consmuers() tdLog.info("stop to test max_poll_interval_ms=20s and done ") def run(self): + """ + Run the test cases for session timeout and max poll timeout. + """ vgroups = 4 etool.benchMark( command=f"-d db_sub -t {self.table_number} -n {self.rows_per_table} -v {vgroups} -a {self.replicaVar} -y" ) # test case start here - + self.topic_name = "select_d1" self.case_session_timeout() self.case_max_poll_timeout() def stop(self): + """ + Closes the taos connection and logs the success message. + """ tdSql.close() tdLog.success(f"{__file__} successfully executed") From 7e89000d4e10d76c4200bda0cbb445e20e24fb68 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Tue, 29 Oct 2024 18:02:14 +0800 Subject: [PATCH 5/6] test: add the frequency of printing consumer logs --- tests/army/tmq/drop_lost_comsumers.py | 55 +++++++++++++++------------ tests/army/tmq/per_consumer.py | 32 ++++++++++------ 2 files changed, 50 insertions(+), 37 deletions(-) diff --git a/tests/army/tmq/drop_lost_comsumers.py b/tests/army/tmq/drop_lost_comsumers.py index cef67ddfcc..a5e8140c4a 100644 --- a/tests/army/tmq/drop_lost_comsumers.py +++ b/tests/army/tmq/drop_lost_comsumers.py @@ -24,7 +24,8 @@ class TaosConsumer: def log_info(self, message): if self.sub_log: tdLog.info(message) - + + #TODO merge sub_consumer and sub_consumer_once def sub_consumer(self, consumer, group_id, topic_name): group_id = int(group_id) if group_id < 100: @@ -95,20 +96,23 @@ class TaosConsumer: consumer.subscribe([topic_name]) nrows = 0 consumer_nrows = 0 - + count = 0 while not stop_event.is_set(): start = datetime.datetime.now() - self.log_info( - f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}" - ) + # self.log_info( + # f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}" + # ) message = None if consumer_nrows < self.once_consumer_rows: message = consumer.poll(timeout=1.0) elif consumer_nrows >= self.once_consumer_rows: - # when break the loop, the consumer will be closed, so we need to continue to keep consumer alive util the stop_event is set - pass - # tdLog.info("stop consumer when consumer all rows") - + if count == 0: + # when break the loop, the consumer will be closed, so we need to continue to keep consumer alive util the stop_event is set + tdLog.info("stop consumer when consumer all rows") + count += 1 + # tdLog.info("stop consumer when consumer all rows") + else: + continue if message: message_offset = message.offset() # topic = message.topic() @@ -122,12 +126,13 @@ class TaosConsumer: end = datetime.datetime.now() elapsed_time = end - start - self.log_info( - f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{message_offset}" - ) + # self.log_info( + # f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{message_offset}" + # ) self.log_info( - f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}" + f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter},counter.get():{self.safe_counter.get()}" ) + # consumer.commit() consumer_nrows = nrows @@ -204,7 +209,7 @@ class TDTestCase: tdLog.info( f"wait timeout count:{count} and check consumer status whether is closed" ) - for _ in range(5): + for _ in range(2): tdSql.query("show consumers") anser_rows = tdSql.getRows() if anser_rows == rows: @@ -227,10 +232,10 @@ class TDTestCase: # start consumer and config some parameters os.system( - f"nohup python3 ./tmq/per_consumer.py -c {self.consumer_groups_num} -s {self.session_timeout_ms} -p {self.max_poll_interval_ms} > consumer.log &" + f"nohup python3 ./tmq/per_consumer.py -c {self.consumer_groups_num} -s {self.session_timeout_ms} -p {self.max_poll_interval_ms} -t {self.topic_name} > consumer.log &" ) - # wait 4s for consuming data - time.sleep(4) + # wait 5s for consuming data + time.sleep(5) # kill consumer to simulate session_timeout_ms tdLog.info("kill per_consumer.py") tdCom.kill_signal_process( @@ -268,18 +273,18 @@ class TDTestCase: tr.start() while True: - if self.safe_counter.get() < self.consumer_all_rows: - time.sleep(5) + if self.safe_counter.counter < self.consumer_all_rows: + # control print log frequency + time.sleep(1) tdLog.info( - f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.get()}" + f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.counter}" ) - elif self.safe_counter.get() >= self.consumer_all_rows: + elif self.safe_counter.counter == self.consumer_all_rows: # adding 5s is for heartbeat check self.check_consumer(int(self.max_poll_interval_ms / 1000 ) + 5, 0, stop_event) stop_event.set() break - for tr in threads: - tr.join() + time.sleep(1) tdSql.execute(f"drop topic if exists {self.topic_name};") @@ -302,7 +307,7 @@ class TDTestCase: """ tdLog.info("start to test max_poll_interval_ms=20s") # test max_poll_interval_ms=20s - self.session_timeout_ms = 180000 + self.session_timeout_ms = 300000 self.max_poll_interval_ms = 20000 self.drop_max_poll_timeout_consmuers() tdLog.info("stop to test max_poll_interval_ms=20s and done ") @@ -317,7 +322,7 @@ class TDTestCase: ) # test case start here self.topic_name = "select_d1" - self.case_session_timeout() + # self.case_session_timeout() self.case_max_poll_timeout() def stop(self): diff --git a/tests/army/tmq/per_consumer.py b/tests/army/tmq/per_consumer.py index 418c88dead..b8f409d710 100644 --- a/tests/army/tmq/per_consumer.py +++ b/tests/army/tmq/per_consumer.py @@ -1,8 +1,9 @@ import os import taos -import time +import sys from datetime import datetime -from frame.log import * +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from frame.log import tdLog import subprocess from multiprocessing import Process import threading @@ -40,28 +41,35 @@ except Exception as e: default=180000, help="max poll interval timeout:ms", ) -def test_timeout_sub(consumer_group_num, session_timeout_ms, max_poll_interval_ms): +@click.option( + "-t", + "--topic-name", + "topic_name", + default="select_d1", + help="topic name", +) +def test_timeout_sub(consumer_group_num, session_timeout_ms, max_poll_interval_ms, topic_name): threads = [] - tdLog.info(consumer_group_num, session_timeout_ms, max_poll_interval_ms) + tdLog.info(f"consumer_group_num:{consumer_group_num}, session_timeout_ms:{session_timeout_ms}, max_poll_interval_ms:{max_poll_interval_ms}") for id in range(consumer_group_num): conf = set_conf( group_id=id, session_timeout_ms=session_timeout_ms, max_poll_interval_ms=max_poll_interval_ms, ) - tdLog.info(conf) - threads.append(threading.Thread(target=taosc_consumer, args=(conf,))) + tdLog.info(f"conf:{conf}") + threads.append(threading.Thread(target=taosc_consumer, args=(conf,topic_name))) for tr in threads: tr.start() for tr in threads: tr.join() -def sub_consumer(consumer, group_id): +def sub_consumer(consumer, group_id, topic_name): group_id = int(group_id) if group_id < 100: try: - consumer.subscribe(["select_d1"]) + consumer.subscribe([topic_name]) except Exception as e: tdLog.info(f"subscribe error") exit(1) @@ -93,10 +101,10 @@ def sub_consumer(consumer, group_id): # consumer.close() -def sub_consumer_once(consumer, group_id): +def sub_consumer_once(consumer, group_id, topic_name): group_id = int(group_id) if group_id < 100: - consumer.subscribe(["select_d1"]) + consumer.subscribe([topic_name]) nrows = 0 consumer_nrows = 0 while True: @@ -160,12 +168,12 @@ def set_conf( return conf -def taosc_consumer(conf): +def taosc_consumer(conf,topic_name): consumer = Consumer(conf) group_id = int(conf["group.id"]) tdLog.info(f"{consumer},{group_id}") try: - sub_consumer_once(consumer, group_id) + sub_consumer_once(consumer, group_id, topic_name) except Exception as e: tdLog.info(str(e)) From 25ea22e0b17268099de9dcd0b174d59ebbdaa30e Mon Sep 17 00:00:00 2001 From: haoranchen Date: Wed, 30 Oct 2024 14:24:59 +0800 Subject: [PATCH 6/6] Update tests/army/frame/common.py Co-authored-by: WANG Xu --- tests/army/frame/common.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/army/frame/common.py b/tests/army/frame/common.py index bad86c828f..a82bf4c94f 100644 --- a/tests/army/frame/common.py +++ b/tests/army/frame/common.py @@ -807,8 +807,9 @@ class TDCom: if (platform.system().lower() == 'windows'): os.system(f"TASKKILL /F /IM {processor_name}.exe") else: - tdLog.debug(f"unset LD_PRELOAD; sudo pkill -f -{signal} '{processor_name}' ") - os.system(f"unset LD_PRELOAD; sudo pkill -f -{signal} '{processor_name}' ") + command = f"unset LD_PRELOAD; sudo pkill -f -{signal} '{processor_name}'" + tdLog.debug(f"command: {command}") + os.system(command) def gen_tag_col_str(self, gen_type, data_type, count):