test: use pylint to formate code

This commit is contained in:
chenhaoran 2024-10-29 16:21:44 +08:00
parent d99c915908
commit 3ebdaf1d82
1 changed files with 78 additions and 88 deletions

View File

@ -1,26 +1,20 @@
import taos
import sys
import time
import socket
import os
import threading
import multiprocessing
from multiprocessing import Process, Queue
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
from taos.tmq import *
from frame import etool
from datetime import datetime
import datetime
from taos.tmq import Consumer
from frame.common import *
from taos.error import TmqError
from frame.log import tdLog
from frame.cases import tdCases
from frame.sql import tdSql
from frame.caseBase import *
from frame import etool
from frame.common import tdCom
class TaosConsumer:
# TODo: add to tq.py and remove from here
# TODO: Move this class to tq.py and remove it from here
def __init__(self):
self.sub_once = True
self.once_consumer_rows = 0
@ -40,24 +34,26 @@ class TaosConsumer:
tdLog.exit(f"subscribe error")
nrows = 0
while True:
start = datetime.now()
start = datetime.datetime.now()
tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
message = consumer.poll(timeout=10.0)
if message:
id = message.offset()
topic = message.topic()
database = message.database()
message_offset = message.offset()
# topic = message.topic()
# database = message.database()
for block in message:
addrows = block.nrows()
nrows += block.nrows()
ncols = block.ncols()
values = block.fetchall
end = datetime.now()
# values = block.fetchall
end = datetime.datetime.now()
elapsed_time = end - start
tdLog.info(
f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}"
f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},"
f"consumer_nrows:{nrows},consumer_addrows:{addrows},"
f"consumer_ncols:{ncols},offset:{id}"
)
consumer.commit()
tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}")
@ -101,10 +97,11 @@ class TaosConsumer:
consumer_nrows = 0
while not stop_event.is_set():
start = datetime.now()
start = datetime.datetime.now()
self.log_info(
f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}"
)
message = None
if consumer_nrows < self.once_consumer_rows:
message = consumer.poll(timeout=1.0)
elif consumer_nrows >= self.once_consumer_rows:
@ -113,36 +110,32 @@ class TaosConsumer:
# tdLog.info("stop consumer when consumer all rows")
if message:
id = message.offset()
topic = message.topic()
database = message.database()
message_offset = message.offset()
# topic = message.topic()
# database = message.database()
for block in message:
addrows = block.nrows()
nrows += block.nrows()
self.safe_counter.rows(block.nrows())
ncols = block.ncols()
values = block.fetchall
end = datetime.now()
# values = block.fetchall
end = datetime.datetime.now()
elapsed_time = end - start
self.log_info(
f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}"
f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{message_offset}"
)
self.log_info(
f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}"
)
# consumer.commit()
consumer_nrows = nrows
# consumer.unsubscribe()
# consumer.close()
# break
tdLog.info("Consumer subscription thread is stopping.")
def taosc_consumer(self, conf: list, topic_name: str, stop_event):
def taosc_consumer(self, conf: list, topic_name: str, stop_event: threading.Event):
try:
tdLog.info(conf)
from taos.tmq import Consumer
tdLog.info("start to config consumer")
consumer = Consumer(conf)
tdLog.info("start to subscribe")
@ -175,12 +168,7 @@ class ThreadSafeCounter:
class TDTestCase:
# updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
self.consumer_instance = TaosConsumer()
def __init__(self):
# db parameter
self.table_number = 1000
self.rows_per_table = 1000
@ -193,7 +181,12 @@ class TDTestCase:
self.consumer_all_rows = (
self.consumer_rows_per_thread * self.consumer_groups_num
)
self.topic_name = "select_d1"
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), logSql)
self.consumer_instance = TaosConsumer()
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
def caseDescription(self):
@ -211,16 +204,15 @@ class TDTestCase:
tdLog.info(
f"wait timeout count:{count} and check consumer status whether is closed"
)
for ct in range(5):
tdSql.query(f"show consumers")
for _ in range(5):
tdSql.query("show consumers")
anser_rows = tdSql.getRows()
if anser_rows == rows:
break
else:
time.sleep(1)
tdLog.info(
f"wait for {count} seconds to check that consumers number is {anser_rows}"
)
time.sleep(1)
tdLog.info(
f"wait for {count} seconds to check that consumers number is {anser_rows}"
)
if anser_rows != rows:
if stop_event:
stop_event.set()
@ -228,37 +220,30 @@ class TDTestCase:
except Exception as e:
tdLog.exit(f"{e},check consumer error")
def drop_session_timeout_consmuers(
self,
consumer_groups_num,
session_timeout_ms,
max_poll_interval_ms,
topic_name,
timeout,
):
tdSql.execute(f"drop topic if exists {topic_name};")
tdSql.execute(f"use db_sub")
tdSql.execute(f"create topic {topic_name} as select * from db_sub.meters;")
def drop_session_timeout_consmuers(self):
tdSql.execute(f"drop topic if exists {self.topic_name};")
tdSql.execute("use db_sub")
tdSql.execute(f"create topic {self.topic_name} as select * from db_sub.meters;")
# start consumer and config some parameters
os.system(
f"nohup python3 ./tmq/per_consumer.py -c {consumer_groups_num} -s {session_timeout_ms} -p {max_poll_interval_ms} > consumer.log &"
f"nohup python3 ./tmq/per_consumer.py -c {self.consumer_groups_num} -s {self.session_timeout_ms} -p {self.max_poll_interval_ms} > consumer.log &"
)
# wait 4s for consuming data
time.sleep(4)
# kill consumer to simulate session_timeout_ms
tdLog.info("kill per_consumer.py")
tdCom.kill_signal_process(
signal=9, processor_name="python3\s*./tmq/per_consumer.py"
signal=9, processor_name=r"python3\s*./tmq/per_consumer.py"
)
self.check_consumer(timeout, 0)
tdSql.execute(f"drop topic if exists {topic_name};")
self.check_consumer(int(self.session_timeout_ms / 1000), 0)
tdSql.execute(f"drop topic if exists {self.topic_name};")
os.system("rm -rf consumer.log")
def drop_max_poll_timeout_consmuers(self, topic_name, timeout):
tdSql.execute(f"drop topic if exists {topic_name};")
tdSql.execute(f"use db_sub")
tdSql.execute(f"create topic {topic_name} as select * from db_sub.meters;")
def drop_max_poll_timeout_consmuers(self):
tdSql.execute(f"drop topic if exists {self.topic_name};")
tdSql.execute("use db_sub")
tdSql.execute(f"create topic {self.topic_name} as select * from db_sub.meters;")
threads = []
self.safe_counter = ThreadSafeCounter()
@ -267,16 +252,16 @@ class TDTestCase:
self.consumer_instance.once_consumer_rows = self.consumer_rows_per_thread
tdLog.info(f"consumer_rows:{self.consumer_instance.once_consumer_rows}")
self.consumer_instance.sub_once = True
for id in range(self.consumer_groups_num):
for group_id in range(self.consumer_groups_num):
conf = self.consumer_instance.set_conf(
group_id=id,
group_id=group_id,
session_timeout_ms=self.session_timeout_ms,
max_poll_interval_ms=self.max_poll_interval_ms,
)
threads.append(
threading.Thread(
target=self.consumer_instance.taosc_consumer,
args=(conf, topic_name, stop_event),
args=(conf, self.topic_name, stop_event),
)
)
for tr in threads:
@ -290,50 +275,55 @@ class TDTestCase:
)
elif self.safe_counter.get() >= self.consumer_all_rows:
# adding 5s is for heartbeat check
self.check_consumer(timeout + 5, 0, stop_event)
self.check_consumer(int(self.max_poll_interval_ms / 1000 ) + 5, 0, stop_event)
stop_event.set()
tr.join()
break
for tr in threads:
tr.join()
time.sleep(1)
tdSql.execute(f"drop topic if exists {topic_name};")
tdSql.execute(f"drop topic if exists {self.topic_name};")
def case_session_timeout(self):
"""
TEST CASE: verifying that the boundary and valid values of session_timeout_ms are in effect
"""
tdLog.info("start to test session_timeout_ms=12s")
# test session_timeout_ms=12s
self.session_timeout_ms = 12000
self.max_poll_interval_ms = 180000
topic_name = "select_d1"
self.drop_session_timeout_consmuers(
consumer_groups_num=1,
session_timeout_ms=self.session_timeout_ms,
max_poll_interval_ms=self.max_poll_interval_ms,
topic_name=topic_name,
timeout=int(self.session_timeout_ms / 1000),
)
# self.set_session_timeout = int(self.session_timeout_ms / 1000)
self.drop_session_timeout_consmuers()
tdLog.info("stop to test session_timeout_ms=12s and done ")
def case_max_poll_timeout(self):
"""
TEST CASE: verifying that the boundary and valid values of max_poll_interval_ms are in effect
"""
tdLog.info("start to test max_poll_interval_ms=20s")
# test max_poll_interval_ms=20s
self.session_timeout_ms = 180000
self.max_poll_interval_ms = 20000
topic_name = "select_d1"
self.drop_max_poll_timeout_consmuers(
topic_name=topic_name, timeout=int(self.max_poll_interval_ms / 1000)
)
self.drop_max_poll_timeout_consmuers()
tdLog.info("stop to test max_poll_interval_ms=20s and done ")
def run(self):
"""
Run the test cases for session timeout and max poll timeout.
"""
vgroups = 4
etool.benchMark(
command=f"-d db_sub -t {self.table_number} -n {self.rows_per_table} -v {vgroups} -a {self.replicaVar} -y"
)
# test case start here
self.topic_name = "select_d1"
self.case_session_timeout()
self.case_max_poll_timeout()
def stop(self):
"""
Closes the taos connection and logs the success message.
"""
tdSql.close()
tdLog.success(f"{__file__} successfully executed")