test:use the black tool to format the code style
This commit is contained in:
parent
5261c87620
commit
d99c915908
|
@ -1,4 +1,3 @@
|
||||||
|
|
||||||
import taos
|
import taos
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
@ -19,8 +18,9 @@ from datetime import datetime
|
||||||
from taos.tmq import Consumer
|
from taos.tmq import Consumer
|
||||||
from frame.common import *
|
from frame.common import *
|
||||||
|
|
||||||
|
|
||||||
class TaosConsumer:
|
class TaosConsumer:
|
||||||
#TODo: add to tq.py and remove from here
|
# TODo: add to tq.py and remove from here
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.sub_once = True
|
self.sub_once = True
|
||||||
self.once_consumer_rows = 0
|
self.once_consumer_rows = 0
|
||||||
|
@ -31,43 +31,57 @@ class TaosConsumer:
|
||||||
if self.sub_log:
|
if self.sub_log:
|
||||||
tdLog.info(message)
|
tdLog.info(message)
|
||||||
|
|
||||||
def sub_consumer(self ,consumer ,group_id ,topic_name ):
|
def sub_consumer(self, consumer, group_id, topic_name):
|
||||||
group_id = int(group_id)
|
group_id = int(group_id)
|
||||||
if group_id < 100 :
|
if group_id < 100:
|
||||||
try:
|
try:
|
||||||
consumer.subscribe([topic_name])
|
consumer.subscribe([topic_name])
|
||||||
except TmqError:
|
except TmqError:
|
||||||
tdLog.exit(f"subscribe error")
|
tdLog.exit(f"subscribe error")
|
||||||
nrows = 0
|
nrows = 0
|
||||||
while True:
|
while True:
|
||||||
start = datetime.now()
|
start = datetime.now()
|
||||||
tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
|
tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
|
||||||
message = consumer.poll(timeout=10.0)
|
message = consumer.poll(timeout=10.0)
|
||||||
|
|
||||||
if message:
|
if message:
|
||||||
id = message.offset()
|
id = message.offset()
|
||||||
topic = message.topic()
|
topic = message.topic()
|
||||||
database = message.database()
|
database = message.database()
|
||||||
|
|
||||||
for block in message:
|
for block in message:
|
||||||
addrows = block.nrows()
|
addrows = block.nrows()
|
||||||
nrows += block.nrows()
|
nrows += block.nrows()
|
||||||
ncols = block.ncols()
|
ncols = block.ncols()
|
||||||
values = block.fetchall
|
values = block.fetchall
|
||||||
end = datetime.now()
|
end = datetime.now()
|
||||||
elapsed_time = end -start
|
elapsed_time = end - start
|
||||||
tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}")
|
tdLog.info(
|
||||||
|
f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}"
|
||||||
|
)
|
||||||
consumer.commit()
|
consumer.commit()
|
||||||
tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}")
|
tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}")
|
||||||
# consumer.unsubscribe()
|
# consumer.unsubscribe()
|
||||||
# consumer.close()
|
# consumer.close()
|
||||||
|
|
||||||
def set_conf(self,td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",enable_auto_commit="false",auto_commit_interval_ms="1000",auto_offset_reset="earliest",msg_with_table_name="true",session_timeout_ms=10000,max_poll_interval_ms=180000,experimental_snapshot_enable="false"):
|
def set_conf(
|
||||||
|
self,
|
||||||
|
td_connect_ip="localhost",
|
||||||
|
group_id=1,
|
||||||
|
client_id="test_consumer_py",
|
||||||
|
enable_auto_commit="false",
|
||||||
|
auto_commit_interval_ms="1000",
|
||||||
|
auto_offset_reset="earliest",
|
||||||
|
msg_with_table_name="true",
|
||||||
|
session_timeout_ms=10000,
|
||||||
|
max_poll_interval_ms=180000,
|
||||||
|
experimental_snapshot_enable="false",
|
||||||
|
):
|
||||||
conf = {
|
conf = {
|
||||||
# auth options
|
# auth options
|
||||||
# consume options
|
# consume options
|
||||||
"td.connect.ip": f"{td_connect_ip}",
|
"td.connect.ip": f"{td_connect_ip}",
|
||||||
"group.id": f"{group_id}",
|
"group.id": f"{group_id}",
|
||||||
"client.id": f"{client_id}",
|
"client.id": f"{client_id}",
|
||||||
"enable.auto.commit": f"{enable_auto_commit}",
|
"enable.auto.commit": f"{enable_auto_commit}",
|
||||||
"auto.commit.interval.ms": f"{auto_commit_interval_ms}",
|
"auto.commit.interval.ms": f"{auto_commit_interval_ms}",
|
||||||
|
@ -75,25 +89,28 @@ class TaosConsumer:
|
||||||
"msg.with.table.name": f"{msg_with_table_name}",
|
"msg.with.table.name": f"{msg_with_table_name}",
|
||||||
"session.timeout.ms": f"{session_timeout_ms}",
|
"session.timeout.ms": f"{session_timeout_ms}",
|
||||||
"max.poll.interval.ms": f"{max_poll_interval_ms}",
|
"max.poll.interval.ms": f"{max_poll_interval_ms}",
|
||||||
"experimental.snapshot.enable" :f"{experimental_snapshot_enable}",
|
"experimental.snapshot.enable": f"{experimental_snapshot_enable}",
|
||||||
}
|
}
|
||||||
return conf
|
return conf
|
||||||
|
|
||||||
def sub_consumer_once(self, consumer, group_id, topic_name, stop_event):
|
def sub_consumer_once(self, consumer, group_id, topic_name, stop_event):
|
||||||
group_id = int(group_id)
|
group_id = int(group_id)
|
||||||
if group_id < 100 :
|
if group_id < 100:
|
||||||
consumer.subscribe([topic_name])
|
consumer.subscribe([topic_name])
|
||||||
nrows = 0
|
nrows = 0
|
||||||
consumer_nrows = 0
|
consumer_nrows = 0
|
||||||
|
|
||||||
while not stop_event.is_set():
|
while not stop_event.is_set():
|
||||||
start = datetime.now()
|
start = datetime.now()
|
||||||
self.log_info(f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}")
|
self.log_info(
|
||||||
|
f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}"
|
||||||
|
)
|
||||||
if consumer_nrows < self.once_consumer_rows:
|
if consumer_nrows < self.once_consumer_rows:
|
||||||
message = consumer.poll(timeout=1.0)
|
message = consumer.poll(timeout=1.0)
|
||||||
elif consumer_nrows >= self.once_consumer_rows:
|
elif consumer_nrows >= self.once_consumer_rows:
|
||||||
|
# when break the loop, the consumer will be closed, so we need to continue to keep consumer alive util the stop_event is set
|
||||||
pass
|
pass
|
||||||
# tdLog.info("stop consumer when consumer all rows")
|
# tdLog.info("stop consumer when consumer all rows")
|
||||||
|
|
||||||
if message:
|
if message:
|
||||||
id = message.offset()
|
id = message.offset()
|
||||||
|
@ -106,37 +123,42 @@ class TaosConsumer:
|
||||||
ncols = block.ncols()
|
ncols = block.ncols()
|
||||||
values = block.fetchall
|
values = block.fetchall
|
||||||
end = datetime.now()
|
end = datetime.now()
|
||||||
elapsed_time = end -start
|
elapsed_time = end - start
|
||||||
|
|
||||||
self.log_info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}")
|
self.log_info(
|
||||||
self.log_info(f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}")
|
f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}"
|
||||||
|
)
|
||||||
|
self.log_info(
|
||||||
|
f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}"
|
||||||
|
)
|
||||||
# consumer.commit()
|
# consumer.commit()
|
||||||
consumer_nrows = nrows
|
consumer_nrows = nrows
|
||||||
# consumer.unsubscribe()
|
# consumer.unsubscribe()
|
||||||
# consumer.close()
|
# consumer.close()
|
||||||
# break
|
# break
|
||||||
tdLog.info("Consumer subscription thread is stopping.")
|
tdLog.info("Consumer subscription thread is stopping.")
|
||||||
|
|
||||||
def taosc_consumer(self, conf: list, topic_name: str, stop_event):
|
def taosc_consumer(self, conf: list, topic_name: str, stop_event):
|
||||||
try:
|
try:
|
||||||
tdLog.info(conf)
|
tdLog.info(conf)
|
||||||
from taos.tmq import Consumer
|
from taos.tmq import Consumer
|
||||||
|
|
||||||
tdLog.info("start to config consumer")
|
tdLog.info("start to config consumer")
|
||||||
consumer = Consumer(conf)
|
consumer = Consumer(conf)
|
||||||
tdLog.info("start to subscribe")
|
tdLog.info("start to subscribe")
|
||||||
group_id = int(conf["group.id"])
|
group_id = int(conf["group.id"])
|
||||||
tdLog.info(f"{consumer},{group_id}")
|
tdLog.info(f"{consumer},{group_id}")
|
||||||
if self.sub_once:
|
if self.sub_once:
|
||||||
self.sub_consumer_once(consumer, group_id, topic_name, stop_event)
|
self.sub_consumer_once(consumer, group_id, topic_name, stop_event)
|
||||||
else:
|
else:
|
||||||
self.sub_consumer(consumer, group_id, topic_name)
|
self.sub_consumer(consumer, group_id, topic_name)
|
||||||
# only consumer once
|
# only consumer once
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
tdLog.exit(f"{e}")
|
tdLog.exit(f"{e}")
|
||||||
|
|
||||||
#consumer.close()
|
|
||||||
|
|
||||||
|
# consumer.close()
|
||||||
|
|
||||||
|
|
||||||
class ThreadSafeCounter:
|
class ThreadSafeCounter:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.counter = 0
|
self.counter = 0
|
||||||
|
@ -154,45 +176,51 @@ class ThreadSafeCounter:
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
# updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
|
# updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
|
||||||
|
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
tdLog.debug(f"start to excute {__file__}")
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
tdSql.init(conn.cursor())
|
tdSql.init(conn.cursor())
|
||||||
self.consumer_instance = TaosConsumer()
|
self.consumer_instance = TaosConsumer()
|
||||||
#db parameter
|
# db parameter
|
||||||
self.table_number = 1000
|
self.table_number = 1000
|
||||||
self.rows_per_table = 1000
|
self.rows_per_table = 1000
|
||||||
#consumer parameter
|
# consumer parameter
|
||||||
self.consumer_groups_num = 2
|
self.consumer_groups_num = 2
|
||||||
self.session_timeout_ms= 180000
|
self.session_timeout_ms = 180000
|
||||||
self.max_poll_interval_ms= 180000
|
self.max_poll_interval_ms = 180000
|
||||||
#case consumer parameter
|
# case consumer parameter
|
||||||
self.consumer_rows_per_thread = self.table_number * self.rows_per_table
|
self.consumer_rows_per_thread = self.table_number * self.rows_per_table
|
||||||
self.consumer_all_rows = self.consumer_rows_per_thread * self.consumer_groups_num
|
self.consumer_all_rows = (
|
||||||
|
self.consumer_rows_per_thread * self.consumer_groups_num
|
||||||
|
)
|
||||||
|
|
||||||
|
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||||
|
|
||||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
|
||||||
def caseDescription(self):
|
def caseDescription(self):
|
||||||
'''
|
"""
|
||||||
drop_lost_consmuers<hrchen>:
|
drop_lost_consmuers<hrchen>:
|
||||||
1. verifying that the boundary and valid values of session_timeout_ms are in effect
|
1. verifying that the boundary and valid values of session_timeout_ms are in effect
|
||||||
2. verifying that the boundary and valid values of max_poll_interval_ms are in effect
|
2. verifying that the boundary and valid values of max_poll_interval_ms are in effect
|
||||||
3. verifying that consumer will be closed when the session_timeout_ms and max_poll_interval_ms is expired
|
3. verifying that consumer will be closed when the session_timeout_ms and max_poll_interval_ms is expired
|
||||||
'''
|
"""
|
||||||
return
|
return
|
||||||
|
|
||||||
def check_consumer(self, count, rows, stop_event=None):
|
def check_consumer(self, count, rows, stop_event=None):
|
||||||
time.sleep(count)
|
time.sleep(count)
|
||||||
try:
|
try:
|
||||||
tdLog.info(f"wait timeout count:{count} and check consumer status whether is closed")
|
tdLog.info(
|
||||||
|
f"wait timeout count:{count} and check consumer status whether is closed"
|
||||||
|
)
|
||||||
for ct in range(5):
|
for ct in range(5):
|
||||||
tdSql.query(f'show consumers')
|
tdSql.query(f"show consumers")
|
||||||
anser_rows=tdSql.getRows()
|
anser_rows = tdSql.getRows()
|
||||||
if anser_rows == rows:
|
if anser_rows == rows:
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
tdLog.info(f"wait for {count} seconds to check that consumers number is {anser_rows}")
|
tdLog.info(
|
||||||
|
f"wait for {count} seconds to check that consumers number is {anser_rows}"
|
||||||
|
)
|
||||||
if anser_rows != rows:
|
if anser_rows != rows:
|
||||||
if stop_event:
|
if stop_event:
|
||||||
stop_event.set()
|
stop_event.set()
|
||||||
|
@ -200,27 +228,37 @@ class TDTestCase:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
tdLog.exit(f"{e},check consumer error")
|
tdLog.exit(f"{e},check consumer error")
|
||||||
|
|
||||||
def drop_session_timeout_consmuers(self, consumer_groups_num, session_timeout_ms, max_poll_interval_ms, topic_name, timeout):
|
def drop_session_timeout_consmuers(
|
||||||
tdSql.execute(f'drop topic if exists {topic_name};')
|
self,
|
||||||
tdSql.execute(f'use db_sub')
|
consumer_groups_num,
|
||||||
tdSql.execute(f'create topic {topic_name} as select * from db_sub.meters;')
|
session_timeout_ms,
|
||||||
|
max_poll_interval_ms,
|
||||||
|
topic_name,
|
||||||
|
timeout,
|
||||||
|
):
|
||||||
|
tdSql.execute(f"drop topic if exists {topic_name};")
|
||||||
|
tdSql.execute(f"use db_sub")
|
||||||
|
tdSql.execute(f"create topic {topic_name} as select * from db_sub.meters;")
|
||||||
|
|
||||||
# start consumer and config some parameters
|
# start consumer and config some parameters
|
||||||
os.system(f"nohup python3 ./tmq/per_consumer.py -c {consumer_groups_num} -s {session_timeout_ms} -p {max_poll_interval_ms} > consumer.log &")
|
os.system(
|
||||||
|
f"nohup python3 ./tmq/per_consumer.py -c {consumer_groups_num} -s {session_timeout_ms} -p {max_poll_interval_ms} > consumer.log &"
|
||||||
|
)
|
||||||
# wait 4s for consuming data
|
# wait 4s for consuming data
|
||||||
time.sleep(4)
|
time.sleep(4)
|
||||||
# kill consumer to simulate session_timeout_ms
|
# kill consumer to simulate session_timeout_ms
|
||||||
tdLog.info("kill per_consumer.py")
|
tdLog.info("kill per_consumer.py")
|
||||||
tdCom.kill_signal_process(signal=9,processor_name="python3\s*./tmq/per_consumer.py")
|
tdCom.kill_signal_process(
|
||||||
self.check_consumer(timeout,0)
|
signal=9, processor_name="python3\s*./tmq/per_consumer.py"
|
||||||
tdSql.execute(f'drop topic if exists {topic_name};')
|
)
|
||||||
|
self.check_consumer(timeout, 0)
|
||||||
|
tdSql.execute(f"drop topic if exists {topic_name};")
|
||||||
os.system("rm -rf consumer.log")
|
os.system("rm -rf consumer.log")
|
||||||
|
|
||||||
|
|
||||||
def drop_max_poll_timeout_consmuers(self, topic_name, timeout):
|
def drop_max_poll_timeout_consmuers(self, topic_name, timeout):
|
||||||
tdSql.execute(f'drop topic if exists {topic_name};')
|
tdSql.execute(f"drop topic if exists {topic_name};")
|
||||||
tdSql.execute(f'use db_sub')
|
tdSql.execute(f"use db_sub")
|
||||||
tdSql.execute(f'create topic {topic_name} as select * from db_sub.meters;')
|
tdSql.execute(f"create topic {topic_name} as select * from db_sub.meters;")
|
||||||
|
|
||||||
threads = []
|
threads = []
|
||||||
self.safe_counter = ThreadSafeCounter()
|
self.safe_counter = ThreadSafeCounter()
|
||||||
|
@ -230,48 +268,68 @@ class TDTestCase:
|
||||||
tdLog.info(f"consumer_rows:{self.consumer_instance.once_consumer_rows}")
|
tdLog.info(f"consumer_rows:{self.consumer_instance.once_consumer_rows}")
|
||||||
self.consumer_instance.sub_once = True
|
self.consumer_instance.sub_once = True
|
||||||
for id in range(self.consumer_groups_num):
|
for id in range(self.consumer_groups_num):
|
||||||
conf = self.consumer_instance.set_conf(group_id=id, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms)
|
conf = self.consumer_instance.set_conf(
|
||||||
threads.append(threading.Thread(target=self.consumer_instance.taosc_consumer, args=(conf, topic_name, stop_event)))
|
group_id=id,
|
||||||
|
session_timeout_ms=self.session_timeout_ms,
|
||||||
|
max_poll_interval_ms=self.max_poll_interval_ms,
|
||||||
|
)
|
||||||
|
threads.append(
|
||||||
|
threading.Thread(
|
||||||
|
target=self.consumer_instance.taosc_consumer,
|
||||||
|
args=(conf, topic_name, stop_event),
|
||||||
|
)
|
||||||
|
)
|
||||||
for tr in threads:
|
for tr in threads:
|
||||||
tr.start()
|
tr.start()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if self.safe_counter.get() < self.consumer_all_rows:
|
if self.safe_counter.get() < self.consumer_all_rows:
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
tdLog.info(f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.get()}")
|
tdLog.info(
|
||||||
|
f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.get()}"
|
||||||
|
)
|
||||||
elif self.safe_counter.get() >= self.consumer_all_rows:
|
elif self.safe_counter.get() >= self.consumer_all_rows:
|
||||||
# adding 5s is for heartbeat check
|
# adding 5s is for heartbeat check
|
||||||
self.check_consumer(timeout+5, 0, stop_event)
|
self.check_consumer(timeout + 5, 0, stop_event)
|
||||||
stop_event.set()
|
stop_event.set()
|
||||||
tr.join()
|
tr.join()
|
||||||
break
|
break
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
tdSql.execute(f'drop topic if exists {topic_name};')
|
tdSql.execute(f"drop topic if exists {topic_name};")
|
||||||
|
|
||||||
def case_session_timeout(self):
|
def case_session_timeout(self):
|
||||||
tdLog.info("start to test session_timeout_ms=12s")
|
tdLog.info("start to test session_timeout_ms=12s")
|
||||||
#test session_timeout_ms=12s
|
# test session_timeout_ms=12s
|
||||||
self.session_timeout_ms=12000
|
self.session_timeout_ms = 12000
|
||||||
self.max_poll_interval_ms=180000
|
self.max_poll_interval_ms = 180000
|
||||||
topic_name = "select_d1"
|
topic_name = "select_d1"
|
||||||
self.drop_session_timeout_consmuers(consumer_groups_num=1, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms, topic_name=topic_name , timeout=int(self.session_timeout_ms/1000))
|
self.drop_session_timeout_consmuers(
|
||||||
|
consumer_groups_num=1,
|
||||||
|
session_timeout_ms=self.session_timeout_ms,
|
||||||
|
max_poll_interval_ms=self.max_poll_interval_ms,
|
||||||
|
topic_name=topic_name,
|
||||||
|
timeout=int(self.session_timeout_ms / 1000),
|
||||||
|
)
|
||||||
tdLog.info("stop to test session_timeout_ms=12s and done ")
|
tdLog.info("stop to test session_timeout_ms=12s and done ")
|
||||||
|
|
||||||
def case_max_poll_timeout(self):
|
def case_max_poll_timeout(self):
|
||||||
tdLog.info("start to test max_poll_interval_ms=20s")
|
tdLog.info("start to test max_poll_interval_ms=20s")
|
||||||
#test max_poll_interval_ms=20s
|
# test max_poll_interval_ms=20s
|
||||||
self.session_timeout_ms=180000
|
self.session_timeout_ms = 180000
|
||||||
self.max_poll_interval_ms=20000
|
self.max_poll_interval_ms = 20000
|
||||||
topic_name = "select_d1"
|
topic_name = "select_d1"
|
||||||
self.drop_max_poll_timeout_consmuers(topic_name=topic_name, timeout=int(self.max_poll_interval_ms/1000))
|
self.drop_max_poll_timeout_consmuers(
|
||||||
|
topic_name=topic_name, timeout=int(self.max_poll_interval_ms / 1000)
|
||||||
|
)
|
||||||
tdLog.info("stop to test max_poll_interval_ms=20s and done ")
|
tdLog.info("stop to test max_poll_interval_ms=20s and done ")
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
vgroups = 4
|
vgroups = 4
|
||||||
etool.benchMark(command=f"-d db_sub -t {self.table_number} -n {self.rows_per_table} -v {vgroups} -a {self.replicaVar} -y")
|
etool.benchMark(
|
||||||
|
command=f"-d db_sub -t {self.table_number} -n {self.rows_per_table} -v {vgroups} -a {self.replicaVar} -y"
|
||||||
|
)
|
||||||
# test case start here
|
# test case start here
|
||||||
|
|
||||||
self.case_session_timeout()
|
self.case_session_timeout()
|
||||||
self.case_max_poll_timeout()
|
self.case_max_poll_timeout()
|
||||||
|
|
||||||
|
@ -279,5 +337,6 @@ class TDTestCase:
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success(f"{__file__} successfully executed")
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
|
||||||
tdCases.addLinux(__file__, TDTestCase())
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
tdCases.addWindows(__file__, TDTestCase())
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
|
|
@ -9,21 +9,46 @@ import threading
|
||||||
from taos.tmq import Consumer
|
from taos.tmq import Consumer
|
||||||
import click
|
import click
|
||||||
|
|
||||||
|
# TDDO
|
||||||
|
# 1. using tmq common class to replace the function, file drop_lost_consumers.py has the same function
|
||||||
|
|
||||||
try:
|
try:
|
||||||
conn = taos.connect()
|
conn = taos.connect()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
tdLog.info(str(e))
|
tdLog.info(str(e))
|
||||||
|
|
||||||
@click.command()
|
|
||||||
@click.option('-c', '--consumer-groups-num', "consumer_group_num", default=1, help='Number of consumer group.')
|
|
||||||
@click.option('-s', '--session-timeout-ms', "session_timeout_ms", default=60000, help='session timeout:ms')
|
|
||||||
@click.option('-p', '--max-poll-interval-ms',"max_poll_interval_ms", default=180000, help='max poll interval timeout:ms')
|
|
||||||
|
|
||||||
def test_timeout_sub(consumer_group_num,session_timeout_ms,max_poll_interval_ms):
|
|
||||||
|
@click.command()
|
||||||
|
@click.option(
|
||||||
|
"-c",
|
||||||
|
"--consumer-groups-num",
|
||||||
|
"consumer_group_num",
|
||||||
|
default=1,
|
||||||
|
help="Number of consumer group.",
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"-s",
|
||||||
|
"--session-timeout-ms",
|
||||||
|
"session_timeout_ms",
|
||||||
|
default=60000,
|
||||||
|
help="session timeout:ms",
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"-p",
|
||||||
|
"--max-poll-interval-ms",
|
||||||
|
"max_poll_interval_ms",
|
||||||
|
default=180000,
|
||||||
|
help="max poll interval timeout:ms",
|
||||||
|
)
|
||||||
|
def test_timeout_sub(consumer_group_num, session_timeout_ms, max_poll_interval_ms):
|
||||||
threads = []
|
threads = []
|
||||||
tdLog.info(consumer_group_num,session_timeout_ms,max_poll_interval_ms)
|
tdLog.info(consumer_group_num, session_timeout_ms, max_poll_interval_ms)
|
||||||
for id in range(consumer_group_num):
|
for id in range(consumer_group_num):
|
||||||
conf = set_conf(group_id=id,session_timeout_ms=session_timeout_ms,max_poll_interval_ms=max_poll_interval_ms)
|
conf = set_conf(
|
||||||
|
group_id=id,
|
||||||
|
session_timeout_ms=session_timeout_ms,
|
||||||
|
max_poll_interval_ms=max_poll_interval_ms,
|
||||||
|
)
|
||||||
tdLog.info(conf)
|
tdLog.info(conf)
|
||||||
threads.append(threading.Thread(target=taosc_consumer, args=(conf,)))
|
threads.append(threading.Thread(target=taosc_consumer, args=(conf,)))
|
||||||
for tr in threads:
|
for tr in threads:
|
||||||
|
@ -31,52 +56,54 @@ def test_timeout_sub(consumer_group_num,session_timeout_ms,max_poll_interval_ms)
|
||||||
for tr in threads:
|
for tr in threads:
|
||||||
tr.join()
|
tr.join()
|
||||||
|
|
||||||
def sub_consumer(consumer,group_id):
|
|
||||||
|
def sub_consumer(consumer, group_id):
|
||||||
group_id = int(group_id)
|
group_id = int(group_id)
|
||||||
if group_id < 100 :
|
if group_id < 100:
|
||||||
try:
|
try:
|
||||||
consumer.subscribe(["select_d1"])
|
consumer.subscribe(["select_d1"])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
tdLog.info(f"subscribe error")
|
tdLog.info(f"subscribe error")
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
nrows = 0
|
nrows = 0
|
||||||
while True:
|
while True:
|
||||||
start = datetime.now()
|
start = datetime.now()
|
||||||
tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
|
tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
|
||||||
message = consumer.poll(timeout=10.0)
|
message = consumer.poll(timeout=10.0)
|
||||||
|
|
||||||
if message:
|
if message:
|
||||||
id = message.offset()
|
id = message.offset()
|
||||||
topic = message.topic()
|
topic = message.topic()
|
||||||
database = message.database()
|
database = message.database()
|
||||||
|
|
||||||
for block in message:
|
for block in message:
|
||||||
addrows = block.nrows()
|
addrows = block.nrows()
|
||||||
nrows += block.nrows()
|
nrows += block.nrows()
|
||||||
ncols = block.ncols()
|
ncols = block.ncols()
|
||||||
values = block.fetchall
|
values = block.fetchall
|
||||||
end = datetime.now()
|
end = datetime.now()
|
||||||
elapsed_time = end -start
|
elapsed_time = end - start
|
||||||
tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}")
|
tdLog.info(
|
||||||
|
f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}"
|
||||||
|
)
|
||||||
consumer.commit()
|
consumer.commit()
|
||||||
tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}")
|
tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}")
|
||||||
# consumer.unsubscribe()
|
# consumer.unsubscribe()
|
||||||
# consumer.close()
|
# consumer.close()
|
||||||
# break
|
|
||||||
# if nrows >= 1000000:
|
|
||||||
# break
|
def sub_consumer_once(consumer, group_id):
|
||||||
def sub_consumer_once(consumer,group_id):
|
|
||||||
group_id = int(group_id)
|
group_id = int(group_id)
|
||||||
if group_id < 100 :
|
if group_id < 100:
|
||||||
consumer.subscribe(["select_d1"])
|
consumer.subscribe(["select_d1"])
|
||||||
nrows = 0
|
nrows = 0
|
||||||
consumer_nrows = 0
|
consumer_nrows = 0
|
||||||
while True:
|
while True:
|
||||||
start = datetime.now()
|
start = datetime.now()
|
||||||
tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
|
tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
|
||||||
#start = datetime.now()
|
# start = datetime.now()
|
||||||
#tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
|
# tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
|
||||||
tdLog.info(f"consumer_nrows:{consumer_nrows}")
|
tdLog.info(f"consumer_nrows:{consumer_nrows}")
|
||||||
if consumer_nrows < 1000000:
|
if consumer_nrows < 1000000:
|
||||||
message = consumer.poll(timeout=10.0)
|
message = consumer.poll(timeout=10.0)
|
||||||
|
@ -87,28 +114,40 @@ def sub_consumer_once(consumer,group_id):
|
||||||
id = message.offset()
|
id = message.offset()
|
||||||
topic = message.topic()
|
topic = message.topic()
|
||||||
database = message.database()
|
database = message.database()
|
||||||
|
|
||||||
for block in message:
|
for block in message:
|
||||||
addrows = block.nrows()
|
addrows = block.nrows()
|
||||||
nrows += block.nrows()
|
nrows += block.nrows()
|
||||||
ncols = block.ncols()
|
ncols = block.ncols()
|
||||||
values = block.fetchall
|
values = block.fetchall
|
||||||
end = datetime.now()
|
end = datetime.now()
|
||||||
elapsed_time = end -start
|
elapsed_time = end - start
|
||||||
# tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}")
|
# tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}")
|
||||||
consumer.commit()
|
consumer.commit()
|
||||||
# tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}")
|
# tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}")
|
||||||
consumer_nrows = nrows
|
consumer_nrows = nrows
|
||||||
# consumer.unsubscribe()
|
# consumer.unsubscribe()
|
||||||
# consumer.close()
|
# consumer.close()
|
||||||
# break
|
# break
|
||||||
|
|
||||||
def set_conf(td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",enable_auto_commit="false",auto_commit_interval_ms="1000",auto_offset_reset="earliest",msg_with_table_name="true",session_timeout_ms=10000,max_poll_interval_ms=20000,experimental_snapshot_enable="false"):
|
|
||||||
|
def set_conf(
|
||||||
|
td_connect_ip="localhost",
|
||||||
|
group_id=1,
|
||||||
|
client_id="test_consumer_py",
|
||||||
|
enable_auto_commit="false",
|
||||||
|
auto_commit_interval_ms="1000",
|
||||||
|
auto_offset_reset="earliest",
|
||||||
|
msg_with_table_name="true",
|
||||||
|
session_timeout_ms=10000,
|
||||||
|
max_poll_interval_ms=20000,
|
||||||
|
experimental_snapshot_enable="false",
|
||||||
|
):
|
||||||
conf = {
|
conf = {
|
||||||
# auth options
|
# auth options
|
||||||
# consume options
|
# consume options
|
||||||
"td.connect.ip": f"{td_connect_ip}",
|
"td.connect.ip": f"{td_connect_ip}",
|
||||||
"group.id": f"{group_id}",
|
"group.id": f"{group_id}",
|
||||||
"client.id": f"{client_id}",
|
"client.id": f"{client_id}",
|
||||||
"enable.auto.commit": f"{enable_auto_commit}",
|
"enable.auto.commit": f"{enable_auto_commit}",
|
||||||
"auto.commit.interval.ms": f"{auto_commit_interval_ms}",
|
"auto.commit.interval.ms": f"{auto_commit_interval_ms}",
|
||||||
|
@ -116,30 +155,20 @@ def set_conf(td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",e
|
||||||
"msg.with.table.name": f"{msg_with_table_name}",
|
"msg.with.table.name": f"{msg_with_table_name}",
|
||||||
"session.timeout.ms": f"{session_timeout_ms}",
|
"session.timeout.ms": f"{session_timeout_ms}",
|
||||||
"max.poll.interval.ms": f"{max_poll_interval_ms}",
|
"max.poll.interval.ms": f"{max_poll_interval_ms}",
|
||||||
"experimental.snapshot.enable" :f"{experimental_snapshot_enable}",
|
"experimental.snapshot.enable": f"{experimental_snapshot_enable}",
|
||||||
}
|
}
|
||||||
return conf
|
return conf
|
||||||
|
|
||||||
|
|
||||||
def taosc_consumer(conf):
|
def taosc_consumer(conf):
|
||||||
consumer = Consumer(conf)
|
consumer = Consumer(conf)
|
||||||
group_id = int(conf["group.id"])
|
group_id = int(conf["group.id"])
|
||||||
tdLog.info(f"{consumer},{group_id}")
|
tdLog.info(f"{consumer},{group_id}")
|
||||||
#counsmer sub:
|
|
||||||
# while True:
|
|
||||||
# try:
|
|
||||||
# self.sub_consumer_once(consumer,group_id)
|
|
||||||
# except Exception as e:
|
|
||||||
# tdLog.info(str(e))
|
|
||||||
# time.sleep(1)
|
|
||||||
# break
|
|
||||||
# only consumer once
|
|
||||||
try:
|
try:
|
||||||
sub_consumer_once(consumer,group_id)
|
sub_consumer_once(consumer, group_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
tdLog.info(str(e))
|
tdLog.info(str(e))
|
||||||
|
|
||||||
#consumer.close()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
test_timeout_sub()
|
test_timeout_sub()
|
||||||
|
|
Loading…
Reference in New Issue