test: add the frequency of printing consumer logs
This commit is contained in:
parent
3ebdaf1d82
commit
7e89000d4e
|
@ -24,7 +24,8 @@ class TaosConsumer:
|
||||||
def log_info(self, message):
|
def log_info(self, message):
|
||||||
if self.sub_log:
|
if self.sub_log:
|
||||||
tdLog.info(message)
|
tdLog.info(message)
|
||||||
|
|
||||||
|
#TODO merge sub_consumer and sub_consumer_once
|
||||||
def sub_consumer(self, consumer, group_id, topic_name):
|
def sub_consumer(self, consumer, group_id, topic_name):
|
||||||
group_id = int(group_id)
|
group_id = int(group_id)
|
||||||
if group_id < 100:
|
if group_id < 100:
|
||||||
|
@ -95,20 +96,23 @@ class TaosConsumer:
|
||||||
consumer.subscribe([topic_name])
|
consumer.subscribe([topic_name])
|
||||||
nrows = 0
|
nrows = 0
|
||||||
consumer_nrows = 0
|
consumer_nrows = 0
|
||||||
|
count = 0
|
||||||
while not stop_event.is_set():
|
while not stop_event.is_set():
|
||||||
start = datetime.datetime.now()
|
start = datetime.datetime.now()
|
||||||
self.log_info(
|
# self.log_info(
|
||||||
f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}"
|
# f"time:{start},consumer:{group_id}, start to consume,consumer_nrows:{consumer_nrows}"
|
||||||
)
|
# )
|
||||||
message = None
|
message = None
|
||||||
if consumer_nrows < self.once_consumer_rows:
|
if consumer_nrows < self.once_consumer_rows:
|
||||||
message = consumer.poll(timeout=1.0)
|
message = consumer.poll(timeout=1.0)
|
||||||
elif consumer_nrows >= self.once_consumer_rows:
|
elif consumer_nrows >= self.once_consumer_rows:
|
||||||
# when break the loop, the consumer will be closed, so we need to continue to keep consumer alive util the stop_event is set
|
if count == 0:
|
||||||
pass
|
# when break the loop, the consumer will be closed, so we need to continue to keep consumer alive util the stop_event is set
|
||||||
# tdLog.info("stop consumer when consumer all rows")
|
tdLog.info("stop consumer when consumer all rows")
|
||||||
|
count += 1
|
||||||
|
# tdLog.info("stop consumer when consumer all rows")
|
||||||
|
else:
|
||||||
|
continue
|
||||||
if message:
|
if message:
|
||||||
message_offset = message.offset()
|
message_offset = message.offset()
|
||||||
# topic = message.topic()
|
# topic = message.topic()
|
||||||
|
@ -122,12 +126,13 @@ class TaosConsumer:
|
||||||
end = datetime.datetime.now()
|
end = datetime.datetime.now()
|
||||||
elapsed_time = end - start
|
elapsed_time = end - start
|
||||||
|
|
||||||
self.log_info(
|
# self.log_info(
|
||||||
f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{message_offset}"
|
# f"time:{end},consumer:{group_id}, elapsed time:{elapsed_time},consumer_nrows:{nrows},consumer_addrows:{addrows}, consumer_ncols:{ncols},offset:{message_offset}"
|
||||||
)
|
# )
|
||||||
self.log_info(
|
self.log_info(
|
||||||
f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter}"
|
f"consumer:{group_id},consumer_nrows:{nrows},counter.counter:{self.safe_counter.counter},counter.get():{self.safe_counter.get()}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# consumer.commit()
|
# consumer.commit()
|
||||||
consumer_nrows = nrows
|
consumer_nrows = nrows
|
||||||
|
|
||||||
|
@ -204,7 +209,7 @@ class TDTestCase:
|
||||||
tdLog.info(
|
tdLog.info(
|
||||||
f"wait timeout count:{count} and check consumer status whether is closed"
|
f"wait timeout count:{count} and check consumer status whether is closed"
|
||||||
)
|
)
|
||||||
for _ in range(5):
|
for _ in range(2):
|
||||||
tdSql.query("show consumers")
|
tdSql.query("show consumers")
|
||||||
anser_rows = tdSql.getRows()
|
anser_rows = tdSql.getRows()
|
||||||
if anser_rows == rows:
|
if anser_rows == rows:
|
||||||
|
@ -227,10 +232,10 @@ class TDTestCase:
|
||||||
|
|
||||||
# start consumer and config some parameters
|
# start consumer and config some parameters
|
||||||
os.system(
|
os.system(
|
||||||
f"nohup python3 ./tmq/per_consumer.py -c {self.consumer_groups_num} -s {self.session_timeout_ms} -p {self.max_poll_interval_ms} > consumer.log &"
|
f"nohup python3 ./tmq/per_consumer.py -c {self.consumer_groups_num} -s {self.session_timeout_ms} -p {self.max_poll_interval_ms} -t {self.topic_name} > consumer.log &"
|
||||||
)
|
)
|
||||||
# wait 4s for consuming data
|
# wait 5s for consuming data
|
||||||
time.sleep(4)
|
time.sleep(5)
|
||||||
# kill consumer to simulate session_timeout_ms
|
# kill consumer to simulate session_timeout_ms
|
||||||
tdLog.info("kill per_consumer.py")
|
tdLog.info("kill per_consumer.py")
|
||||||
tdCom.kill_signal_process(
|
tdCom.kill_signal_process(
|
||||||
|
@ -268,18 +273,18 @@ class TDTestCase:
|
||||||
tr.start()
|
tr.start()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if self.safe_counter.get() < self.consumer_all_rows:
|
if self.safe_counter.counter < self.consumer_all_rows:
|
||||||
time.sleep(5)
|
# control print log frequency
|
||||||
|
time.sleep(1)
|
||||||
tdLog.info(
|
tdLog.info(
|
||||||
f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.get()}"
|
f"consumer_all_rows:{self.consumer_all_rows},counter.get():{self.safe_counter.counter}"
|
||||||
)
|
)
|
||||||
elif self.safe_counter.get() >= self.consumer_all_rows:
|
elif self.safe_counter.counter == self.consumer_all_rows:
|
||||||
# adding 5s is for heartbeat check
|
# adding 5s is for heartbeat check
|
||||||
self.check_consumer(int(self.max_poll_interval_ms / 1000 ) + 5, 0, stop_event)
|
self.check_consumer(int(self.max_poll_interval_ms / 1000 ) + 5, 0, stop_event)
|
||||||
stop_event.set()
|
stop_event.set()
|
||||||
break
|
break
|
||||||
for tr in threads:
|
|
||||||
tr.join()
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
tdSql.execute(f"drop topic if exists {self.topic_name};")
|
tdSql.execute(f"drop topic if exists {self.topic_name};")
|
||||||
|
|
||||||
|
@ -302,7 +307,7 @@ class TDTestCase:
|
||||||
"""
|
"""
|
||||||
tdLog.info("start to test max_poll_interval_ms=20s")
|
tdLog.info("start to test max_poll_interval_ms=20s")
|
||||||
# test max_poll_interval_ms=20s
|
# test max_poll_interval_ms=20s
|
||||||
self.session_timeout_ms = 180000
|
self.session_timeout_ms = 300000
|
||||||
self.max_poll_interval_ms = 20000
|
self.max_poll_interval_ms = 20000
|
||||||
self.drop_max_poll_timeout_consmuers()
|
self.drop_max_poll_timeout_consmuers()
|
||||||
tdLog.info("stop to test max_poll_interval_ms=20s and done ")
|
tdLog.info("stop to test max_poll_interval_ms=20s and done ")
|
||||||
|
@ -317,7 +322,7 @@ class TDTestCase:
|
||||||
)
|
)
|
||||||
# test case start here
|
# test case start here
|
||||||
self.topic_name = "select_d1"
|
self.topic_name = "select_d1"
|
||||||
self.case_session_timeout()
|
# self.case_session_timeout()
|
||||||
self.case_max_poll_timeout()
|
self.case_max_poll_timeout()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
import os
|
import os
|
||||||
import taos
|
import taos
|
||||||
import time
|
import sys
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from frame.log import *
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||||
|
from frame.log import tdLog
|
||||||
import subprocess
|
import subprocess
|
||||||
from multiprocessing import Process
|
from multiprocessing import Process
|
||||||
import threading
|
import threading
|
||||||
|
@ -40,28 +41,35 @@ except Exception as e:
|
||||||
default=180000,
|
default=180000,
|
||||||
help="max poll interval timeout:ms",
|
help="max poll interval timeout:ms",
|
||||||
)
|
)
|
||||||
def test_timeout_sub(consumer_group_num, session_timeout_ms, max_poll_interval_ms):
|
@click.option(
|
||||||
|
"-t",
|
||||||
|
"--topic-name",
|
||||||
|
"topic_name",
|
||||||
|
default="select_d1",
|
||||||
|
help="topic name",
|
||||||
|
)
|
||||||
|
def test_timeout_sub(consumer_group_num, session_timeout_ms, max_poll_interval_ms, topic_name):
|
||||||
threads = []
|
threads = []
|
||||||
tdLog.info(consumer_group_num, session_timeout_ms, max_poll_interval_ms)
|
tdLog.info(f"consumer_group_num:{consumer_group_num}, session_timeout_ms:{session_timeout_ms}, max_poll_interval_ms:{max_poll_interval_ms}")
|
||||||
for id in range(consumer_group_num):
|
for id in range(consumer_group_num):
|
||||||
conf = set_conf(
|
conf = set_conf(
|
||||||
group_id=id,
|
group_id=id,
|
||||||
session_timeout_ms=session_timeout_ms,
|
session_timeout_ms=session_timeout_ms,
|
||||||
max_poll_interval_ms=max_poll_interval_ms,
|
max_poll_interval_ms=max_poll_interval_ms,
|
||||||
)
|
)
|
||||||
tdLog.info(conf)
|
tdLog.info(f"conf:{conf}")
|
||||||
threads.append(threading.Thread(target=taosc_consumer, args=(conf,)))
|
threads.append(threading.Thread(target=taosc_consumer, args=(conf,topic_name)))
|
||||||
for tr in threads:
|
for tr in threads:
|
||||||
tr.start()
|
tr.start()
|
||||||
for tr in threads:
|
for tr in threads:
|
||||||
tr.join()
|
tr.join()
|
||||||
|
|
||||||
|
|
||||||
def sub_consumer(consumer, group_id):
|
def sub_consumer(consumer, group_id, topic_name):
|
||||||
group_id = int(group_id)
|
group_id = int(group_id)
|
||||||
if group_id < 100:
|
if group_id < 100:
|
||||||
try:
|
try:
|
||||||
consumer.subscribe(["select_d1"])
|
consumer.subscribe([topic_name])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
tdLog.info(f"subscribe error")
|
tdLog.info(f"subscribe error")
|
||||||
exit(1)
|
exit(1)
|
||||||
|
@ -93,10 +101,10 @@ def sub_consumer(consumer, group_id):
|
||||||
# consumer.close()
|
# consumer.close()
|
||||||
|
|
||||||
|
|
||||||
def sub_consumer_once(consumer, group_id):
|
def sub_consumer_once(consumer, group_id, topic_name):
|
||||||
group_id = int(group_id)
|
group_id = int(group_id)
|
||||||
if group_id < 100:
|
if group_id < 100:
|
||||||
consumer.subscribe(["select_d1"])
|
consumer.subscribe([topic_name])
|
||||||
nrows = 0
|
nrows = 0
|
||||||
consumer_nrows = 0
|
consumer_nrows = 0
|
||||||
while True:
|
while True:
|
||||||
|
@ -160,12 +168,12 @@ def set_conf(
|
||||||
return conf
|
return conf
|
||||||
|
|
||||||
|
|
||||||
def taosc_consumer(conf):
|
def taosc_consumer(conf,topic_name):
|
||||||
consumer = Consumer(conf)
|
consumer = Consumer(conf)
|
||||||
group_id = int(conf["group.id"])
|
group_id = int(conf["group.id"])
|
||||||
tdLog.info(f"{consumer},{group_id}")
|
tdLog.info(f"{consumer},{group_id}")
|
||||||
try:
|
try:
|
||||||
sub_consumer_once(consumer, group_id)
|
sub_consumer_once(consumer, group_id, topic_name)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
tdLog.info(str(e))
|
tdLog.info(str(e))
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue