From 7e89000d4e10d76c4200bda0cbb445e20e24fb68 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Tue, 29 Oct 2024 18:02:14 +0800 Subject: [PATCH] test: add the frequency of printing consumer logs --- tests/army/tmq/drop_lost_comsumers.py | 55 +++++++++++++++------------ tests/army/tmq/per_consumer.py | 32 ++++++++++------ 2 files changed, 50 insertions(+), 37 deletions(-) diff --git a/tests/army/tmq/drop_lost_comsumers.py b/tests/army/tmq/drop_lost_comsumers.py index cef67ddfcc..a5e8140c4a 100644 --- a/tests/army/tmq/drop_lost_comsumers.py +++ b/tests/army/tmq/drop_lost_comsumers.py @@ -24,7 +24,8 @@ class TaosConsumer: def log_info(self, message): if self.sub_log: tdLog.info(message) - + + #TODO merge sub_consumer and sub_consumer_once def sub_consumer(self, consumer, group_id, topic_name): group_id = int(group_id) if group_id < 100: @@ -95,20 +96,23 @@ class TaosConsumer: consumer.subscribe([topic_name]) nrows = 0 consumer_nrows = 0 - + count = 0 while not stop_event.is_set(): start = datetime.datetime.now() - self.log_info( - f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}" - ) + # self.log_info( + # f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}" + # ) message = None if consumer_nrows < self.once_consumer_rows: message = consumer.poll(timeout=1.0) elif consumer_nrows >= self.once_consumer_rows: - # when break the loop, the consumer will be closed, so we need to continue to keep consumer alive util the stop_event is set - pass - # tdLog.info("stop consumer when consumer all rows") - + if count == 0: + # when break the loop, the consumer will be closed, so we need to continue to keep consumer alive util the stop_event is set + tdLog.info("stop consumer when consumer all rows") + count += 1 + # tdLog.info("stop consumer when consumer all rows") + else: + continue if message: message_offset = message.offset() # topic = message.topic() @@ -122,12 +126,13 @@ class TaosConsumer: end = datetime.datetime.now() elapsed_time = end - start - self.log_info( - f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{message_offset}" - ) + # self.log_info( + # f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{message_offset}" + # ) self.log_info( - f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}" + f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter},counter.get():{self.safe_counter.get()}" ) + # consumer.commit() consumer_nrows = nrows @@ -204,7 +209,7 @@ class TDTestCase: tdLog.info( f"wait timeout count:{count} and check consumer status whether is closed" ) - for _ in range(5): + for _ in range(2): tdSql.query("show consumers") anser_rows = tdSql.getRows() if anser_rows == rows: @@ -227,10 +232,10 @@ class TDTestCase: # start consumer and config some parameters os.system( - f"nohup python3 ./tmq/per_consumer.py -c {self.consumer_groups_num} -s {self.session_timeout_ms} -p {self.max_poll_interval_ms} > consumer.log &" + f"nohup python3 ./tmq/per_consumer.py -c {self.consumer_groups_num} -s {self.session_timeout_ms} -p {self.max_poll_interval_ms} -t {self.topic_name} > consumer.log &" ) - # wait 4s for consuming data - time.sleep(4) + # wait 5s for consuming data + time.sleep(5) # kill consumer to simulate session_timeout_ms tdLog.info("kill per_consumer.py") tdCom.kill_signal_process( @@ -268,18 +273,18 @@ class TDTestCase: tr.start() while True: - if self.safe_counter.get() < self.consumer_all_rows: - time.sleep(5) + if self.safe_counter.counter < self.consumer_all_rows: + # control print log frequency + time.sleep(1) tdLog.info( - f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.get()}" + f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.counter}" ) - elif self.safe_counter.get() >= self.consumer_all_rows: + elif self.safe_counter.counter == self.consumer_all_rows: # adding 5s is for heartbeat check self.check_consumer(int(self.max_poll_interval_ms / 1000 ) + 5, 0, stop_event) stop_event.set() break - for tr in threads: - tr.join() + time.sleep(1) tdSql.execute(f"drop topic if exists {self.topic_name};") @@ -302,7 +307,7 @@ class TDTestCase: """ tdLog.info("start to test max_poll_interval_ms=20s") # test max_poll_interval_ms=20s - self.session_timeout_ms = 180000 + self.session_timeout_ms = 300000 self.max_poll_interval_ms = 20000 self.drop_max_poll_timeout_consmuers() tdLog.info("stop to test max_poll_interval_ms=20s and done ") @@ -317,7 +322,7 @@ class TDTestCase: ) # test case start here self.topic_name = "select_d1" - self.case_session_timeout() + # self.case_session_timeout() self.case_max_poll_timeout() def stop(self): diff --git a/tests/army/tmq/per_consumer.py b/tests/army/tmq/per_consumer.py index 418c88dead..b8f409d710 100644 --- a/tests/army/tmq/per_consumer.py +++ b/tests/army/tmq/per_consumer.py @@ -1,8 +1,9 @@ import os import taos -import time +import sys from datetime import datetime -from frame.log import * +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from frame.log import tdLog import subprocess from multiprocessing import Process import threading @@ -40,28 +41,35 @@ except Exception as e: default=180000, help="max poll interval timeout:ms", ) -def test_timeout_sub(consumer_group_num, session_timeout_ms, max_poll_interval_ms): +@click.option( + "-t", + "--topic-name", + "topic_name", + default="select_d1", + help="topic name", +) +def test_timeout_sub(consumer_group_num, session_timeout_ms, max_poll_interval_ms, topic_name): threads = [] - tdLog.info(consumer_group_num, session_timeout_ms, max_poll_interval_ms) + tdLog.info(f"consumer_group_num:{consumer_group_num}, session_timeout_ms:{session_timeout_ms}, max_poll_interval_ms:{max_poll_interval_ms}") for id in range(consumer_group_num): conf = set_conf( group_id=id, session_timeout_ms=session_timeout_ms, max_poll_interval_ms=max_poll_interval_ms, ) - tdLog.info(conf) - threads.append(threading.Thread(target=taosc_consumer, args=(conf,))) + tdLog.info(f"conf:{conf}") + threads.append(threading.Thread(target=taosc_consumer, args=(conf,topic_name))) for tr in threads: tr.start() for tr in threads: tr.join() -def sub_consumer(consumer, group_id): +def sub_consumer(consumer, group_id, topic_name): group_id = int(group_id) if group_id < 100: try: - consumer.subscribe(["select_d1"]) + consumer.subscribe([topic_name]) except Exception as e: tdLog.info(f"subscribe error") exit(1) @@ -93,10 +101,10 @@ def sub_consumer(consumer, group_id): # consumer.close() -def sub_consumer_once(consumer, group_id): +def sub_consumer_once(consumer, group_id, topic_name): group_id = int(group_id) if group_id < 100: - consumer.subscribe(["select_d1"]) + consumer.subscribe([topic_name]) nrows = 0 consumer_nrows = 0 while True: @@ -160,12 +168,12 @@ def set_conf( return conf -def taosc_consumer(conf): +def taosc_consumer(conf,topic_name): consumer = Consumer(conf) group_id = int(conf["group.id"]) tdLog.info(f"{consumer},{group_id}") try: - sub_consumer_once(consumer, group_id) + sub_consumer_once(consumer, group_id, topic_name) except Exception as e: tdLog.info(str(e))