From 5261c87620ff5f48d2ee2605211f4f07c47df4fd Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Tue, 29 Oct 2024 14:32:40 +0800 Subject: [PATCH] tetst:add testecase for clear consumer with session and poll timout --- tests/army/tmq/drop_lost_comsumers.py | 162 ++++++++++++++------------ tests/army/tmq/per_consumer.py | 33 +++--- 2 files changed, 106 insertions(+), 89 deletions(-) diff --git a/tests/army/tmq/drop_lost_comsumers.py b/tests/army/tmq/drop_lost_comsumers.py index b88aae8c03..d2ec796516 100644 --- a/tests/army/tmq/drop_lost_comsumers.py +++ b/tests/army/tmq/drop_lost_comsumers.py @@ -20,9 +20,17 @@ from taos.tmq import Consumer from frame.common import * class TaosConsumer: + #TODo: add to tq.py and remove from here def __init__(self): - pass - + self.sub_once = True + self.once_consumer_rows = 0 + self.sub_log = False + self.safe_counter = ThreadSafeCounter() + + def log_info(self, message): + if self.sub_log: + tdLog.info(message) + def sub_consumer(self ,consumer ,group_id ,topic_name ): group_id = int(group_id) if group_id < 100 : @@ -33,9 +41,9 @@ class TaosConsumer: nrows = 0 while True: start = datetime.now() - print(f"time:{start},consumer:{group_id}, start to consume") + tdLog.info(f"time:{start},consumer:{group_id}, start to consume") message = consumer.poll(timeout=10.0) - + if message: id = message.offset() topic = message.topic() @@ -48,14 +56,11 @@ class TaosConsumer: values = block.fetchall end = datetime.now() elapsed_time = end -start - print(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") consumer.commit() - print(f"consumer:{group_id},consumer_nrows:{nrows}") + tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") # consumer.unsubscribe() # consumer.close() - # break - # if nrows >= 1000000: - # break def set_conf(self,td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",enable_auto_commit="false",auto_commit_interval_ms="1000",auto_offset_reset="earliest",msg_with_table_name="true",session_timeout_ms=10000,max_poll_interval_ms=180000,experimental_snapshot_enable="false"): conf = { @@ -74,7 +79,7 @@ class TaosConsumer: } return conf - def sub_consumer_once(self,consumer, group_id, topic_name, counter, stop_event): + def sub_consumer_once(self, consumer, group_id, topic_name, stop_event): group_id = int(group_id) if group_id < 100 : consumer.subscribe([topic_name]) @@ -83,11 +88,12 @@ class TaosConsumer: while not stop_event.is_set(): start = datetime.now() - tdLog.info(f"time:{start},consumer:{group_id}, start to consume") - #start = datetime.now() - #print(f"time:{start},consumer:{group_id}, start to consume") - tdLog.info(f"consumer_nrows:{consumer_nrows}") - message = consumer.poll(timeout=10.0) + self.log_info(f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}") + if consumer_nrows < self.once_consumer_rows: + message = consumer.poll(timeout=1.0) + elif consumer_nrows >= self.once_consumer_rows: + pass + # tdLog.info("stop consumer when consumer all rows") if message: id = message.offset() @@ -96,43 +102,35 @@ class TaosConsumer: for block in message: addrows = block.nrows() nrows += block.nrows() - counter.rows(block.nrows()) + self.safe_counter.rows(block.nrows()) ncols = block.ncols() values = block.fetchall end = datetime.now() elapsed_time = end -start - # tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") - consumer.commit() - # tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") - consumer_nrows = nrows - # consumer.unsubscribe() - # consumer.close() - # break - print("Consumer subscription thread is stopping.") - def taosc_consumer(self, conf, topic_name, counter,stop_event): + self.log_info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + self.log_info(f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}") + # consumer.commit() + consumer_nrows = nrows + # consumer.unsubscribe() + # consumer.close() + # break + tdLog.info("Consumer subscription thread is stopping.") + + def taosc_consumer(self, conf: list, topic_name: str, stop_event): try: - print(conf) + tdLog.info(conf) from taos.tmq import Consumer - print("3333") + tdLog.info("start to config consumer") consumer = Consumer(conf) - print("456") + tdLog.info("start to subscribe") group_id = int(conf["group.id"]) tdLog.info(f"{consumer},{group_id}") - except Exception as e: - tdLog.exit(f"{e}") - #counsmer sub: - # while True: - # try: - # self.sub_consumer_once(consumer,group_id) - # except Exception as e: - # print(str(e)) - # time.sleep(1) - # break - # only consumer once - try: - self.sub_consumer_once(consumer, group_id, topic_name, counter, stop_event) - + if self.sub_once: + self.sub_consumer_once(consumer, group_id, topic_name, stop_event) + else: + self.sub_consumer(consumer, group_id, topic_name) + # only consumer once except Exception as e: tdLog.exit(f"{e}") @@ -162,6 +160,17 @@ class TDTestCase: tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) self.consumer_instance = TaosConsumer() + #db parameter + self.table_number = 1000 + self.rows_per_table = 1000 + #consumer parameter + self.consumer_groups_num = 2 + self.session_timeout_ms= 180000 + self.max_poll_interval_ms= 180000 + #case consumer parameter + self.consumer_rows_per_thread = self.table_number * self.rows_per_table + self.consumer_all_rows = self.consumer_rows_per_thread * self.consumer_groups_num + #tdSql.init(conn.cursor(), logSql) # output sql.txt file def caseDescription(self): ''' @@ -172,20 +181,22 @@ class TDTestCase: ''' return - def check_consumer(self,count,rows): + def check_consumer(self, count, rows, stop_event=None): time.sleep(count) - print(count) try: + tdLog.info(f"wait timeout count:{count} and check consumer status whether is closed") for ct in range(5): tdSql.query(f'show consumers') anser_rows=tdSql.getRows() - if tdSql.checkRows(rows): + if anser_rows == rows: break else: time.sleep(1) - tdLog.info(f"wait for {count} seconds to check that consumers number is {rows}") + tdLog.info(f"wait for {count} seconds to check that consumers number is {anser_rows}") if anser_rows != rows: - tdLog.exit(f"consumer number is not {rows}") + if stop_event: + stop_event.set() + tdLog.exit(f"consumer number is {anser_rows } but not expected {rows}") except Exception as e: tdLog.exit(f"{e},check consumer error") @@ -206,58 +217,63 @@ class TDTestCase: os.system("rm -rf consumer.log") - def drop_max_poll_timeout_consmuers(self, consumer_groups_num, consumer_rows, topic_name, timeout): + def drop_max_poll_timeout_consmuers(self, topic_name, timeout): tdSql.execute(f'drop topic if exists {topic_name};') tdSql.execute(f'use db_sub') tdSql.execute(f'create topic {topic_name} as select * from db_sub.meters;') threads = [] - counter = ThreadSafeCounter() + self.safe_counter = ThreadSafeCounter() + self.consumer_instance.safe_counter = self.safe_counter stop_event = threading.Event() - for id in range(consumer_groups_num): + self.consumer_instance.once_consumer_rows = self.consumer_rows_per_thread + tdLog.info(f"consumer_rows:{self.consumer_instance.once_consumer_rows}") + self.consumer_instance.sub_once = True + for id in range(self.consumer_groups_num): conf = self.consumer_instance.set_conf(group_id=id, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms) - threads.append(threading.Thread(target=self.consumer_instance.taosc_consumer, args=(conf, topic_name, counter, stop_event))) + threads.append(threading.Thread(target=self.consumer_instance.taosc_consumer, args=(conf, topic_name, stop_event))) for tr in threads: tr.start() - consumer_all_rows = consumer_rows * consumer_groups_num while True: - if counter.get() < consumer_all_rows: + if self.safe_counter.get() < self.consumer_all_rows: time.sleep(5) - print(f"consumer_all_rows:{consumer_all_rows},counter.get():{counter.get()}") - elif counter.get() >= consumer_all_rows: - self.check_consumer(timeout+20, 0) + tdLog.info(f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.get()}") + elif self.safe_counter.get() >= self.consumer_all_rows: + # adding 5s is for heartbeat check + self.check_consumer(timeout+5, 0, stop_event) stop_event.set() tr.join() break - time.sleep(2) + time.sleep(1) tdSql.execute(f'drop topic if exists {topic_name};') - def case_session_12s(self): + def case_session_timeout(self): + tdLog.info("start to test session_timeout_ms=12s") #test session_timeout_ms=12s - session_timeout_ms=12000 - max_poll_interval_ms=180000 + self.session_timeout_ms=12000 + self.max_poll_interval_ms=180000 topic_name = "select_d1" - self.drop_session_timeout_consmuers(consumer_groups_num=1, session_timeout_ms=session_timeout_ms, max_poll_interval_ms=max_poll_interval_ms, topic_name=topic_name , timeout=int(session_timeout_ms/1000)) - + self.drop_session_timeout_consmuers(consumer_groups_num=1, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms, topic_name=topic_name , timeout=int(self.session_timeout_ms/1000)) + tdLog.info("stop to test session_timeout_ms=12s and done ") - def case_max_poll_12s(self,consumer_rows): - #test max_poll_interval_ms=12s + def case_max_poll_timeout(self): + tdLog.info("start to test max_poll_interval_ms=20s") + #test max_poll_interval_ms=20s self.session_timeout_ms=180000 - self.max_poll_interval_ms=12000 + self.max_poll_interval_ms=20000 topic_name = "select_d1" - self.drop_max_poll_timeout_consmuers(consumer_groups_num=1, topic_name=topic_name, consumer_rows=consumer_rows, timeout=int(self.max_poll_interval_ms/1000)) + self.drop_max_poll_timeout_consmuers(topic_name=topic_name, timeout=int(self.max_poll_interval_ms/1000)) + tdLog.info("stop to test max_poll_interval_ms=20s and done ") def run(self): - table_number = 1000 - rows_per_table = 1000 vgroups = 4 - etool.benchMark(command=f"-d db_sub -t {table_number} -n {rows_per_table} -v {vgroups} -y") - consumer_rows = table_number * rows_per_table # 消费的目标行数 - # self.case_session_12s() - self.case_max_poll_12s(consumer_rows) - remaining_threads = threading.Lock() + etool.benchMark(command=f"-d db_sub -t {self.table_number} -n {self.rows_per_table} -v {vgroups} -a {self.replicaVar} -y") + # test case start here + + self.case_session_timeout() + self.case_max_poll_timeout() def stop(self): tdSql.close() diff --git a/tests/army/tmq/per_consumer.py b/tests/army/tmq/per_consumer.py index 67c82d5d3e..810d7d44f8 100644 --- a/tests/army/tmq/per_consumer.py +++ b/tests/army/tmq/per_consumer.py @@ -2,6 +2,7 @@ import os import taos import time from datetime import datetime +from frame.log import * import subprocess from multiprocessing import Process import threading @@ -11,7 +12,7 @@ import click try: conn = taos.connect() except Exception as e: - print(str(e)) + tdLog.info(str(e)) @click.command() @click.option('-c', '--consumer-groups-num', "consumer_group_num", default=1, help='Number of consumer group.') @@ -20,10 +21,10 @@ except Exception as e: def test_timeout_sub(consumer_group_num,session_timeout_ms,max_poll_interval_ms): threads = [] - print(consumer_group_num,session_timeout_ms,max_poll_interval_ms) + tdLog.info(consumer_group_num,session_timeout_ms,max_poll_interval_ms) for id in range(consumer_group_num): conf = set_conf(group_id=id,session_timeout_ms=session_timeout_ms,max_poll_interval_ms=max_poll_interval_ms) - print(conf) + tdLog.info(conf) threads.append(threading.Thread(target=taosc_consumer, args=(conf,))) for tr in threads: tr.start() @@ -36,13 +37,13 @@ def sub_consumer(consumer,group_id): try: consumer.subscribe(["select_d1"]) except Exception as e: - print(f"subscribe error") + tdLog.info(f"subscribe error") exit(1) nrows = 0 while True: start = datetime.now() - print(f"time:{start},consumer:{group_id}, start to consume") + tdLog.info(f"time:{start},consumer:{group_id}, start to consume") message = consumer.poll(timeout=10.0) if message: @@ -57,9 +58,9 @@ def sub_consumer(consumer,group_id): values = block.fetchall end = datetime.now() elapsed_time = end -start - print(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") consumer.commit() - print(f"consumer:{group_id},consumer_nrows:{nrows}") + tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") # consumer.unsubscribe() # consumer.close() # break @@ -73,14 +74,14 @@ def sub_consumer_once(consumer,group_id): consumer_nrows = 0 while True: start = datetime.now() - print(f"time:{start},consumer:{group_id}, start to consume") + tdLog.info(f"time:{start},consumer:{group_id}, start to consume") #start = datetime.now() - #print(f"time:{start},consumer:{group_id}, start to consume") - print(f"consumer_nrows:{consumer_nrows}") + #tdLog.info(f"time:{start},consumer:{group_id}, start to consume") + tdLog.info(f"consumer_nrows:{consumer_nrows}") if consumer_nrows < 1000000: message = consumer.poll(timeout=10.0) else: - print(" stop consumer when consumer all rows") + tdLog.info(" stop consumer when consumer all rows") if message: id = message.offset() @@ -94,9 +95,9 @@ def sub_consumer_once(consumer,group_id): values = block.fetchall end = datetime.now() elapsed_time = end -start - # print(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") + # tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") consumer.commit() - # print(f"consumer:{group_id},consumer_nrows:{nrows}") + # tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}") consumer_nrows = nrows # consumer.unsubscribe() # consumer.close() @@ -122,20 +123,20 @@ def set_conf(td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",e def taosc_consumer(conf): consumer = Consumer(conf) group_id = int(conf["group.id"]) - print(f"{consumer},{group_id}") + tdLog.info(f"{consumer},{group_id}") #counsmer sub: # while True: # try: # self.sub_consumer_once(consumer,group_id) # except Exception as e: - # print(str(e)) + # tdLog.info(str(e)) # time.sleep(1) # break # only consumer once try: sub_consumer_once(consumer,group_id) except Exception as e: - print(str(e)) + tdLog.info(str(e)) #consumer.close()