diff --git a/tests/army/tmq/drop_lost_comsumers.py b/tests/army/tmq/drop_lost_comsumers.py
index f455931089..cef67ddfcc 100644
--- a/tests/army/tmq/drop_lost_comsumers.py
+++ b/tests/army/tmq/drop_lost_comsumers.py
@@ -1,26 +1,20 @@
-import taos
-import sys
 import time
-import socket
 import os
 import threading
-import multiprocessing
-from multiprocessing import Process, Queue
-
-from frame.log import *
-from frame.cases import *
-from frame.sql import *
-from frame.caseBase import *
-from frame import *
-from taos.tmq import *
-from frame import etool
-from datetime import datetime
+import datetime
 from taos.tmq import Consumer
-from frame.common import *
+from taos.error import TmqError
+
+from frame.log import tdLog
+from frame.cases import tdCases
+from frame.sql import tdSql
+from frame.caseBase import *
+from frame import etool
+from frame.common import tdCom
 
 
 class TaosConsumer:
-    # TODo: add to tq.py and remove from here
+    # TODO: Move this class to tq.py and remove it from here
     def __init__(self):
         self.sub_once = True
         self.once_consumer_rows = 0
@@ -40,24 +34,26 @@ class TaosConsumer:
             tdLog.exit(f"subscribe error")
         nrows = 0
         while True:
-            start = datetime.now()
+            start = datetime.datetime.now()
             tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
             message = consumer.poll(timeout=10.0)
             if message:
-                id = message.offset()
-                topic = message.topic()
-                database = message.database()
+                message_offset = message.offset()
+                # topic = message.topic()
+                # database = message.database()
                 for block in message:
                     addrows = block.nrows()
                     nrows += block.nrows()
                     ncols = block.ncols()
-                    values = block.fetchall
-                end = datetime.now()
+                    # values = block.fetchall
+                end = datetime.datetime.now()
                 elapsed_time = end - start
                 tdLog.info(
-                    f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}"
+                    f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},"
+                    f"consumer_nrows:{nrows},consumer_addrows:{addrows},"
+                    f"consumer_ncols:{ncols},offset:{message_offset}"
                 )
             consumer.commit()
             tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}")
@@ -101,10 +97,11 @@
         consumer_nrows = 0
 
         while not stop_event.is_set():
-            start = datetime.now()
+            start = datetime.datetime.now()
             self.log_info(
                 f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}"
            )
+            message = None
             if consumer_nrows < self.once_consumer_rows:
                 message = consumer.poll(timeout=1.0)
             elif consumer_nrows >= self.once_consumer_rows:
@@ -113,36 +110,32 @@
                 # tdLog.info("stop consumer when consumer all rows")
 
             if message:
-                id = message.offset()
-                topic = message.topic()
-                database = message.database()
+                message_offset = message.offset()
+                # topic = message.topic()
+                # database = message.database()
                 for block in message:
                     addrows = block.nrows()
                     nrows += block.nrows()
                     self.safe_counter.rows(block.nrows())
                     ncols = block.ncols()
-                    values = block.fetchall
-                end = datetime.now()
+                    # values = block.fetchall
+                end = datetime.datetime.now()
                 elapsed_time = end - start
                 self.log_info(
-                    f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}"
+                    f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{message_offset}"
                 )
                 self.log_info(
                     f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}"
                 )
                 # consumer.commit()
                 consumer_nrows = nrows
-            # consumer.unsubscribe()
-            # consumer.close()
-            # break
+        tdLog.info("Consumer subscription thread is stopping.")
 
-    def taosc_consumer(self, conf: list, topic_name: str, stop_event):
+    def taosc_consumer(self, conf: list, topic_name: str, stop_event: threading.Event):
         try:
             tdLog.info(conf)
-            from taos.tmq import Consumer
-
             tdLog.info("start to config consumer")
             consumer = Consumer(conf)
             tdLog.info("start to subscribe")
@@ -175,12 +168,7 @@ class ThreadSafeCounter:
 
 class TDTestCase:
     # updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
-
-    def init(self, conn, logSql, replicaVar=1):
-        self.replicaVar = int(replicaVar)
-        tdLog.debug(f"start to excute {__file__}")
-        tdSql.init(conn.cursor())
-        self.consumer_instance = TaosConsumer()
+    def __init__(self):
         # db parameter
         self.table_number = 1000
         self.rows_per_table = 1000
@@ -193,7 +181,12 @@ class TDTestCase:
         self.consumer_all_rows = (
             self.consumer_rows_per_thread * self.consumer_groups_num
         )
-
+        self.topic_name = "select_d1"
+    def init(self, conn, logSql, replicaVar=1):
+        self.replicaVar = int(replicaVar)
+        tdLog.debug(f"start to execute {__file__}")
+        tdSql.init(conn.cursor(), logSql)
+        self.consumer_instance = TaosConsumer()
         # tdSql.init(conn.cursor(), logSql)  # output sql.txt file
 
     def caseDescription(self):
@@ -211,16 +204,15 @@
             tdLog.info(
                 f"wait timeout count:{count} and check consumer status whether is closed"
             )
-            for ct in range(5):
-                tdSql.query(f"show consumers")
+            for _ in range(5):
+                tdSql.query("show consumers")
                 anser_rows = tdSql.getRows()
                 if anser_rows == rows:
                     break
-                else:
-                    time.sleep(1)
-                    tdLog.info(
-                        f"wait for {count} seconds to check that consumers number is {anser_rows}"
-                    )
+                time.sleep(1)
+                tdLog.info(
+                    f"wait for {count} seconds to check that consumers number is {anser_rows}"
+                )
             if anser_rows != rows:
                 if stop_event:
                     stop_event.set()
@@ -228,37 +220,30 @@
         except Exception as e:
             tdLog.exit(f"{e},check consumer error")
 
-    def drop_session_timeout_consmuers(
-        self,
-        consumer_groups_num,
-        session_timeout_ms,
-        max_poll_interval_ms,
-        topic_name,
-        timeout,
-    ):
-        tdSql.execute(f"drop topic if exists {topic_name};")
-        tdSql.execute(f"use db_sub")
-        tdSql.execute(f"create topic {topic_name} as select * from db_sub.meters;")
+    def drop_session_timeout_consmuers(self):
+        tdSql.execute(f"drop topic if exists {self.topic_name};")
+        tdSql.execute("use db_sub")
+        tdSql.execute(f"create topic {self.topic_name} as select * from db_sub.meters;")
         # start consumer and config some parameters
         os.system(
-            f"nohup python3 ./tmq/per_consumer.py -c {consumer_groups_num} -s {session_timeout_ms} -p {max_poll_interval_ms} > consumer.log &"
+            f"nohup python3 ./tmq/per_consumer.py -c {self.consumer_groups_num} -s {self.session_timeout_ms} -p {self.max_poll_interval_ms} > consumer.log &"
         )
         # wait 4s for consuming data
         time.sleep(4)
         # kill consumer to simulate session_timeout_ms
         tdLog.info("kill per_consumer.py")
         tdCom.kill_signal_process(
-            signal=9, processor_name="python3\s*./tmq/per_consumer.py"
+            signal=9, processor_name=r"python3\s*./tmq/per_consumer.py"
         )
-        self.check_consumer(timeout, 0)
-        tdSql.execute(f"drop topic if exists {topic_name};")
+        self.check_consumer(int(self.session_timeout_ms / 1000), 0)
+        tdSql.execute(f"drop topic if exists {self.topic_name};")
         os.system("rm -rf consumer.log")
 
-    def drop_max_poll_timeout_consmuers(self, topic_name, timeout):
-        tdSql.execute(f"drop topic if exists {topic_name};")
-        tdSql.execute(f"use db_sub")
-        tdSql.execute(f"create topic {topic_name} as select * from db_sub.meters;")
+    def drop_max_poll_timeout_consmuers(self):
+        tdSql.execute(f"drop topic if exists {self.topic_name};")
+        tdSql.execute("use db_sub")
+        tdSql.execute(f"create topic {self.topic_name} as select * from db_sub.meters;")
         threads = []
         self.safe_counter = ThreadSafeCounter()
@@ -267,16 +252,16 @@
         self.consumer_instance.once_consumer_rows = self.consumer_rows_per_thread
         tdLog.info(f"consumer_rows:{self.consumer_instance.once_consumer_rows}")
         self.consumer_instance.sub_once = True
-        for id in range(self.consumer_groups_num):
+        for group_id in range(self.consumer_groups_num):
             conf = self.consumer_instance.set_conf(
-                group_id=id,
+                group_id=group_id,
                 session_timeout_ms=self.session_timeout_ms,
                 max_poll_interval_ms=self.max_poll_interval_ms,
             )
             threads.append(
                 threading.Thread(
                     target=self.consumer_instance.taosc_consumer,
-                    args=(conf, topic_name, stop_event),
+                    args=(conf, self.topic_name, stop_event),
                 )
             )
         for tr in threads:
@@ -290,50 +275,55 @@
                 )
             elif self.safe_counter.get() >= self.consumer_all_rows:
                 # adding 5s is for heartbeat check
-                self.check_consumer(timeout + 5, 0, stop_event)
+                self.check_consumer(int(self.max_poll_interval_ms / 1000) + 5, 0, stop_event)
                 stop_event.set()
-                tr.join()
                 break
+        for tr in threads:
+            tr.join()
         time.sleep(1)
-        tdSql.execute(f"drop topic if exists {topic_name};")
+        tdSql.execute(f"drop topic if exists {self.topic_name};")
 
     def case_session_timeout(self):
+        """
+        TEST CASE: verify that boundary and valid values of session_timeout_ms take effect
+        """
+        tdLog.info("start to test session_timeout_ms=12s")
         # test session_timeout_ms=12s
         self.session_timeout_ms = 12000
         self.max_poll_interval_ms = 180000
-        topic_name = "select_d1"
-        self.drop_session_timeout_consmuers(
-            consumer_groups_num=1,
-            session_timeout_ms=self.session_timeout_ms,
-            max_poll_interval_ms=self.max_poll_interval_ms,
-            topic_name=topic_name,
-            timeout=int(self.session_timeout_ms / 1000),
-        )
+        # self.set_session_timeout = int(self.session_timeout_ms / 1000)
+        self.drop_session_timeout_consmuers()
         tdLog.info("stop to test session_timeout_ms=12s and done ")
 
     def case_max_poll_timeout(self):
+        """
+        TEST CASE: verify that boundary and valid values of max_poll_interval_ms take effect
+        """
         tdLog.info("start to test max_poll_interval_ms=20s")
         # test max_poll_interval_ms=20s
         self.session_timeout_ms = 180000
         self.max_poll_interval_ms = 20000
-        topic_name = "select_d1"
-        self.drop_max_poll_timeout_consmuers(
-            topic_name=topic_name, timeout=int(self.max_poll_interval_ms / 1000)
-        )
+        self.drop_max_poll_timeout_consmuers()
         tdLog.info("stop to test max_poll_interval_ms=20s and done ")
 
     def run(self):
+        """
+        Run the test cases for session timeout and max poll timeout.
+        """
         vgroups = 4
         etool.benchMark(
             command=f"-d db_sub -t {self.table_number} -n {self.rows_per_table} -v {vgroups} -a {self.replicaVar} -y"
        )
         # test case start here
-
+        self.topic_name = "select_d1"
         self.case_session_timeout()
         self.case_max_poll_timeout()
 
     def stop(self):
+        """
+        Closes the taos connection and logs the success message.
+        """
         tdSql.close()
         tdLog.success(f"{__file__} successfully executed")