tetst:add testecase for clear consumer with session and poll timout

This commit is contained in:
chenhaoran 2024-10-29 14:32:40 +08:00
parent e4547fd5a7
commit 5261c87620
2 changed files with 106 additions and 89 deletions

View File

@ -20,9 +20,17 @@ from taos.tmq import Consumer
from frame.common import * from frame.common import *
class TaosConsumer: class TaosConsumer:
#TODo: add to tq.py and remove from here
def __init__(self): def __init__(self):
pass self.sub_once = True
self.once_consumer_rows = 0
self.sub_log = False
self.safe_counter = ThreadSafeCounter()
def log_info(self, message):
if self.sub_log:
tdLog.info(message)
def sub_consumer(self ,consumer ,group_id ,topic_name ): def sub_consumer(self ,consumer ,group_id ,topic_name ):
group_id = int(group_id) group_id = int(group_id)
if group_id < 100 : if group_id < 100 :
@ -33,9 +41,9 @@ class TaosConsumer:
nrows = 0 nrows = 0
while True: while True:
start = datetime.now() start = datetime.now()
print(f"time:{start},consumer:{group_id}, start to consume") tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
message = consumer.poll(timeout=10.0) message = consumer.poll(timeout=10.0)
if message: if message:
id = message.offset() id = message.offset()
topic = message.topic() topic = message.topic()
@ -48,14 +56,11 @@ class TaosConsumer:
values = block.fetchall values = block.fetchall
end = datetime.now() end = datetime.now()
elapsed_time = end -start elapsed_time = end -start
print(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}")
consumer.commit() consumer.commit()
print(f"consumer:{group_id},consumer_nrows:{nrows}") tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}")
# consumer.unsubscribe() # consumer.unsubscribe()
# consumer.close() # consumer.close()
# break
# if nrows >= 1000000:
# break
def set_conf(self,td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",enable_auto_commit="false",auto_commit_interval_ms="1000",auto_offset_reset="earliest",msg_with_table_name="true",session_timeout_ms=10000,max_poll_interval_ms=180000,experimental_snapshot_enable="false"): def set_conf(self,td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",enable_auto_commit="false",auto_commit_interval_ms="1000",auto_offset_reset="earliest",msg_with_table_name="true",session_timeout_ms=10000,max_poll_interval_ms=180000,experimental_snapshot_enable="false"):
conf = { conf = {
@ -74,7 +79,7 @@ class TaosConsumer:
} }
return conf return conf
def sub_consumer_once(self,consumer, group_id, topic_name, counter, stop_event): def sub_consumer_once(self, consumer, group_id, topic_name, stop_event):
group_id = int(group_id) group_id = int(group_id)
if group_id < 100 : if group_id < 100 :
consumer.subscribe([topic_name]) consumer.subscribe([topic_name])
@ -83,11 +88,12 @@ class TaosConsumer:
while not stop_event.is_set(): while not stop_event.is_set():
start = datetime.now() start = datetime.now()
tdLog.info(f"time:{start},consumer:{group_id}, start to consume") self.log_info(f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}")
#start = datetime.now() if consumer_nrows < self.once_consumer_rows:
#print(f"time:{start},consumer:{group_id}, start to consume") message = consumer.poll(timeout=1.0)
tdLog.info(f"consumer_nrows:{consumer_nrows}") elif consumer_nrows >= self.once_consumer_rows:
message = consumer.poll(timeout=10.0) pass
# tdLog.info("stop consumer when consumer all rows")
if message: if message:
id = message.offset() id = message.offset()
@ -96,43 +102,35 @@ class TaosConsumer:
for block in message: for block in message:
addrows = block.nrows() addrows = block.nrows()
nrows += block.nrows() nrows += block.nrows()
counter.rows(block.nrows()) self.safe_counter.rows(block.nrows())
ncols = block.ncols() ncols = block.ncols()
values = block.fetchall values = block.fetchall
end = datetime.now() end = datetime.now()
elapsed_time = end -start elapsed_time = end -start
# tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}")
consumer.commit()
# tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}")
consumer_nrows = nrows
# consumer.unsubscribe()
# consumer.close()
# break
print("Consumer subscription thread is stopping.") self.log_info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}")
def taosc_consumer(self, conf, topic_name, counter,stop_event): self.log_info(f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}")
# consumer.commit()
consumer_nrows = nrows
# consumer.unsubscribe()
# consumer.close()
# break
tdLog.info("Consumer subscription thread is stopping.")
def taosc_consumer(self, conf: list, topic_name: str, stop_event):
try: try:
print(conf) tdLog.info(conf)
from taos.tmq import Consumer from taos.tmq import Consumer
print("3333") tdLog.info("start to config consumer")
consumer = Consumer(conf) consumer = Consumer(conf)
print("456") tdLog.info("start to subscribe")
group_id = int(conf["group.id"]) group_id = int(conf["group.id"])
tdLog.info(f"{consumer},{group_id}") tdLog.info(f"{consumer},{group_id}")
except Exception as e: if self.sub_once:
tdLog.exit(f"{e}") self.sub_consumer_once(consumer, group_id, topic_name, stop_event)
#counsmer sub: else:
# while True: self.sub_consumer(consumer, group_id, topic_name)
# try: # only consumer once
# self.sub_consumer_once(consumer,group_id)
# except Exception as e:
# print(str(e))
# time.sleep(1)
# break
# only consumer once
try:
self.sub_consumer_once(consumer, group_id, topic_name, counter, stop_event)
except Exception as e: except Exception as e:
tdLog.exit(f"{e}") tdLog.exit(f"{e}")
@ -162,6 +160,17 @@ class TDTestCase:
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
self.consumer_instance = TaosConsumer() self.consumer_instance = TaosConsumer()
#db parameter
self.table_number = 1000
self.rows_per_table = 1000
#consumer parameter
self.consumer_groups_num = 2
self.session_timeout_ms= 180000
self.max_poll_interval_ms= 180000
#case consumer parameter
self.consumer_rows_per_thread = self.table_number * self.rows_per_table
self.consumer_all_rows = self.consumer_rows_per_thread * self.consumer_groups_num
#tdSql.init(conn.cursor(), logSql) # output sql.txt file #tdSql.init(conn.cursor(), logSql) # output sql.txt file
def caseDescription(self): def caseDescription(self):
''' '''
@ -172,20 +181,22 @@ class TDTestCase:
''' '''
return return
def check_consumer(self,count,rows): def check_consumer(self, count, rows, stop_event=None):
time.sleep(count) time.sleep(count)
print(count)
try: try:
tdLog.info(f"wait timeout count:{count} and check consumer status whether is closed")
for ct in range(5): for ct in range(5):
tdSql.query(f'show consumers') tdSql.query(f'show consumers')
anser_rows=tdSql.getRows() anser_rows=tdSql.getRows()
if tdSql.checkRows(rows): if anser_rows == rows:
break break
else: else:
time.sleep(1) time.sleep(1)
tdLog.info(f"wait for {count} seconds to check that consumers number is {rows}") tdLog.info(f"wait for {count} seconds to check that consumers number is {anser_rows}")
if anser_rows != rows: if anser_rows != rows:
tdLog.exit(f"consumer number is not {rows}") if stop_event:
stop_event.set()
tdLog.exit(f"consumer number is {anser_rows } but not expected {rows}")
except Exception as e: except Exception as e:
tdLog.exit(f"{e},check consumer error") tdLog.exit(f"{e},check consumer error")
@ -206,58 +217,63 @@ class TDTestCase:
os.system("rm -rf consumer.log") os.system("rm -rf consumer.log")
def drop_max_poll_timeout_consmuers(self, consumer_groups_num, consumer_rows, topic_name, timeout): def drop_max_poll_timeout_consmuers(self, topic_name, timeout):
tdSql.execute(f'drop topic if exists {topic_name};') tdSql.execute(f'drop topic if exists {topic_name};')
tdSql.execute(f'use db_sub') tdSql.execute(f'use db_sub')
tdSql.execute(f'create topic {topic_name} as select * from db_sub.meters;') tdSql.execute(f'create topic {topic_name} as select * from db_sub.meters;')
threads = [] threads = []
counter = ThreadSafeCounter() self.safe_counter = ThreadSafeCounter()
self.consumer_instance.safe_counter = self.safe_counter
stop_event = threading.Event() stop_event = threading.Event()
for id in range(consumer_groups_num): self.consumer_instance.once_consumer_rows = self.consumer_rows_per_thread
tdLog.info(f"consumer_rows:{self.consumer_instance.once_consumer_rows}")
self.consumer_instance.sub_once = True
for id in range(self.consumer_groups_num):
conf = self.consumer_instance.set_conf(group_id=id, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms) conf = self.consumer_instance.set_conf(group_id=id, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms)
threads.append(threading.Thread(target=self.consumer_instance.taosc_consumer, args=(conf, topic_name, counter, stop_event))) threads.append(threading.Thread(target=self.consumer_instance.taosc_consumer, args=(conf, topic_name, stop_event)))
for tr in threads: for tr in threads:
tr.start() tr.start()
consumer_all_rows = consumer_rows * consumer_groups_num
while True: while True:
if counter.get() < consumer_all_rows: if self.safe_counter.get() < self.consumer_all_rows:
time.sleep(5) time.sleep(5)
print(f"consumer_all_rows:{consumer_all_rows},counter.get():{counter.get()}") tdLog.info(f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.get()}")
elif counter.get() >= consumer_all_rows: elif self.safe_counter.get() >= self.consumer_all_rows:
self.check_consumer(timeout+20, 0) # adding 5s is for heartbeat check
self.check_consumer(timeout+5, 0, stop_event)
stop_event.set() stop_event.set()
tr.join() tr.join()
break break
time.sleep(2) time.sleep(1)
tdSql.execute(f'drop topic if exists {topic_name};') tdSql.execute(f'drop topic if exists {topic_name};')
def case_session_12s(self): def case_session_timeout(self):
tdLog.info("start to test session_timeout_ms=12s")
#test session_timeout_ms=12s #test session_timeout_ms=12s
session_timeout_ms=12000 self.session_timeout_ms=12000
max_poll_interval_ms=180000 self.max_poll_interval_ms=180000
topic_name = "select_d1" topic_name = "select_d1"
self.drop_session_timeout_consmuers(consumer_groups_num=1, session_timeout_ms=session_timeout_ms, max_poll_interval_ms=max_poll_interval_ms, topic_name=topic_name , timeout=int(session_timeout_ms/1000)) self.drop_session_timeout_consmuers(consumer_groups_num=1, session_timeout_ms=self.session_timeout_ms, max_poll_interval_ms=self.max_poll_interval_ms, topic_name=topic_name , timeout=int(self.session_timeout_ms/1000))
tdLog.info("stop to test session_timeout_ms=12s and done ")
def case_max_poll_12s(self,consumer_rows): def case_max_poll_timeout(self):
#test max_poll_interval_ms=12s tdLog.info("start to test max_poll_interval_ms=20s")
#test max_poll_interval_ms=20s
self.session_timeout_ms=180000 self.session_timeout_ms=180000
self.max_poll_interval_ms=12000 self.max_poll_interval_ms=20000
topic_name = "select_d1" topic_name = "select_d1"
self.drop_max_poll_timeout_consmuers(consumer_groups_num=1, topic_name=topic_name, consumer_rows=consumer_rows, timeout=int(self.max_poll_interval_ms/1000)) self.drop_max_poll_timeout_consmuers(topic_name=topic_name, timeout=int(self.max_poll_interval_ms/1000))
tdLog.info("stop to test max_poll_interval_ms=20s and done ")
def run(self): def run(self):
table_number = 1000
rows_per_table = 1000
vgroups = 4 vgroups = 4
etool.benchMark(command=f"-d db_sub -t {table_number} -n {rows_per_table} -v {vgroups} -y") etool.benchMark(command=f"-d db_sub -t {self.table_number} -n {self.rows_per_table} -v {vgroups} -a {self.replicaVar} -y")
consumer_rows = table_number * rows_per_table # 消费的目标行数 # test case start here
# self.case_session_12s()
self.case_max_poll_12s(consumer_rows) self.case_session_timeout()
remaining_threads = threading.Lock() self.case_max_poll_timeout()
def stop(self): def stop(self):
tdSql.close() tdSql.close()

View File

@ -2,6 +2,7 @@ import os
import taos import taos
import time import time
from datetime import datetime from datetime import datetime
from frame.log import *
import subprocess import subprocess
from multiprocessing import Process from multiprocessing import Process
import threading import threading
@ -11,7 +12,7 @@ import click
try: try:
conn = taos.connect() conn = taos.connect()
except Exception as e: except Exception as e:
print(str(e)) tdLog.info(str(e))
@click.command() @click.command()
@click.option('-c', '--consumer-groups-num', "consumer_group_num", default=1, help='Number of consumer group.') @click.option('-c', '--consumer-groups-num', "consumer_group_num", default=1, help='Number of consumer group.')
@ -20,10 +21,10 @@ except Exception as e:
def test_timeout_sub(consumer_group_num,session_timeout_ms,max_poll_interval_ms): def test_timeout_sub(consumer_group_num,session_timeout_ms,max_poll_interval_ms):
threads = [] threads = []
print(consumer_group_num,session_timeout_ms,max_poll_interval_ms) tdLog.info(consumer_group_num,session_timeout_ms,max_poll_interval_ms)
for id in range(consumer_group_num): for id in range(consumer_group_num):
conf = set_conf(group_id=id,session_timeout_ms=session_timeout_ms,max_poll_interval_ms=max_poll_interval_ms) conf = set_conf(group_id=id,session_timeout_ms=session_timeout_ms,max_poll_interval_ms=max_poll_interval_ms)
print(conf) tdLog.info(conf)
threads.append(threading.Thread(target=taosc_consumer, args=(conf,))) threads.append(threading.Thread(target=taosc_consumer, args=(conf,)))
for tr in threads: for tr in threads:
tr.start() tr.start()
@ -36,13 +37,13 @@ def sub_consumer(consumer,group_id):
try: try:
consumer.subscribe(["select_d1"]) consumer.subscribe(["select_d1"])
except Exception as e: except Exception as e:
print(f"subscribe error") tdLog.info(f"subscribe error")
exit(1) exit(1)
nrows = 0 nrows = 0
while True: while True:
start = datetime.now() start = datetime.now()
print(f"time:{start},consumer:{group_id}, start to consume") tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
message = consumer.poll(timeout=10.0) message = consumer.poll(timeout=10.0)
if message: if message:
@ -57,9 +58,9 @@ def sub_consumer(consumer,group_id):
values = block.fetchall values = block.fetchall
end = datetime.now() end = datetime.now()
elapsed_time = end -start elapsed_time = end -start
print(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}")
consumer.commit() consumer.commit()
print(f"consumer:{group_id},consumer_nrows:{nrows}") tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}")
# consumer.unsubscribe() # consumer.unsubscribe()
# consumer.close() # consumer.close()
# break # break
@ -73,14 +74,14 @@ def sub_consumer_once(consumer,group_id):
consumer_nrows = 0 consumer_nrows = 0
while True: while True:
start = datetime.now() start = datetime.now()
print(f"time:{start},consumer:{group_id}, start to consume") tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
#start = datetime.now() #start = datetime.now()
#print(f"time:{start},consumer:{group_id}, start to consume") #tdLog.info(f"time:{start},consumer:{group_id}, start to consume")
print(f"consumer_nrows:{consumer_nrows}") tdLog.info(f"consumer_nrows:{consumer_nrows}")
if consumer_nrows < 1000000: if consumer_nrows < 1000000:
message = consumer.poll(timeout=10.0) message = consumer.poll(timeout=10.0)
else: else:
print(" stop consumer when consumer all rows") tdLog.info(" stop consumer when consumer all rows")
if message: if message:
id = message.offset() id = message.offset()
@ -94,9 +95,9 @@ def sub_consumer_once(consumer,group_id):
values = block.fetchall values = block.fetchall
end = datetime.now() end = datetime.now()
elapsed_time = end -start elapsed_time = end -start
# print(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}") # tdLog.info(f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{id}")
consumer.commit() consumer.commit()
# print(f"consumer:{group_id},consumer_nrows:{nrows}") # tdLog.info(f"consumer:{group_id},consumer_nrows:{nrows}")
consumer_nrows = nrows consumer_nrows = nrows
# consumer.unsubscribe() # consumer.unsubscribe()
# consumer.close() # consumer.close()
@ -122,20 +123,20 @@ def set_conf(td_connect_ip="localhost",group_id=1,client_id="test_consumer_py",e
def taosc_consumer(conf): def taosc_consumer(conf):
consumer = Consumer(conf) consumer = Consumer(conf)
group_id = int(conf["group.id"]) group_id = int(conf["group.id"])
print(f"{consumer},{group_id}") tdLog.info(f"{consumer},{group_id}")
#counsmer sub: #counsmer sub:
# while True: # while True:
# try: # try:
# self.sub_consumer_once(consumer,group_id) # self.sub_consumer_once(consumer,group_id)
# except Exception as e: # except Exception as e:
# print(str(e)) # tdLog.info(str(e))
# time.sleep(1) # time.sleep(1)
# break # break
# only consumer once # only consumer once
try: try:
sub_consumer_once(consumer,group_id) sub_consumer_once(consumer,group_id)
except Exception as e: except Exception as e:
print(str(e)) tdLog.info(str(e))
#consumer.close() #consumer.close()