From 6e6a9acc430b21782a68211669ccd5e926737c52 Mon Sep 17 00:00:00 2001 From: bitcapybara Date: Wed, 26 Jun 2024 03:03:59 +0000 Subject: [PATCH] test: [TS-5067] add test cases for the drop consumer group statement --- tests/system-test/7-tmq/tmq_taosx.py | 67 +++++++++++++++++++++++++--- 1 file changed, 60 insertions(+), 7 deletions(-) diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 49b62d8abb..d15c9d61b8 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -516,17 +516,17 @@ class TDTestCase: "td.connect.pass": "taosdata", "auto.offset.reset": "earliest", } - consumer = Consumer(consumer_dict) + consumer1 = Consumer(consumer_dict) try: - consumer.subscribe(["t1"]) + consumer1.subscribe(["t1"]) except TmqError: tdLog.exit(f"subscribe error") index = 0 try: while True: - res = consumer.poll(1) + res = consumer1.poll(1) if not res: if index != 1: tdLog.exit("consume error") @@ -543,18 +543,71 @@ class TDTestCase: index += 1 finally: - consumer.close() + consumer1.close() - consumer1 = Consumer(consumer_dict) + consumer2 = Consumer(consumer_dict) try: - consumer1.subscribe(["t2"]) + consumer2.subscribe(["t2"]) except TmqError: tdLog.exit(f"subscribe error") + tdSql.query(f'show subscriptions') + tdSql.checkRows(2) + + tdSql.query(f'show consumers') + tdSql.checkRows(1) tdSql.execute(f'drop consumer group g1 on t1') tdSql.query(f'show consumers') tdSql.checkRows(1) - consumer1.close() + + tdSql.query(f'show subscriptions') + tdSql.checkRows(1) + + index = 0 + try: + while True: + res = consumer2.poll(1) + if not res: + if index != 1: + tdLog.exit("consume error") + break + val = res.value() + if val is None: + continue + cnt = 0; + for block in val: + cnt += len(block.fetchall()) + + if cnt != 8: + tdLog.exit("consume error") + + index += 1 + finally: + consumer2.close() + + consumer3 = Consumer(consumer_dict) + try: + consumer3.subscribe(["t2"]) + except TmqError: + tdLog.exit(f"subscribe error") + + tdSql.query(f'show consumers') + tdSql.checkRows(1) + + tdSql.execute(f'insert into t4 using st tags(3) values(now, 1)') + try: + res = consumer3.poll(1) + if not res: + tdLog.exit("consume1 error") + finally: + consumer3.close() + + tdSql.query(f'show consumers') + tdSql.checkRows(0) + + tdSql.query(f'show subscriptions') + tdSql.checkRows(1) + tdSql.execute(f'drop topic t1') tdSql.execute(f'drop topic t2') tdSql.execute(f'drop database d1')