diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 49b62d8abb..d30d88bb1c 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -516,17 +516,17 @@ class TDTestCase: "td.connect.pass": "taosdata", "auto.offset.reset": "earliest", } - consumer = Consumer(consumer_dict) + consumer1 = Consumer(consumer_dict) try: - consumer.subscribe(["t1"]) + consumer1.subscribe(["t1"]) except TmqError: tdLog.exit(f"subscribe error") index = 0 try: while True: - res = consumer.poll(1) + res = consumer1.poll(1) if not res: if index != 1: tdLog.exit("consume error") @@ -543,18 +543,85 @@ class TDTestCase: index += 1 finally: - consumer.close() + consumer1.close() - consumer1 = Consumer(consumer_dict) + consumer2 = Consumer(consumer_dict) try: - consumer1.subscribe(["t2"]) + consumer2.subscribe(["t2"]) except TmqError: tdLog.exit(f"subscribe error") + tdSql.query(f'show subscriptions') + tdSql.checkRows(2) + tdSql.checkData(0, 0, "t2") + tdSql.checkData(0, 1, 'g1') + tdSql.checkData(1, 0, 't1') + tdSql.checkData(1, 1, 'g1') + + tdSql.query(f'show consumers') + tdSql.checkRows(1) + tdSql.checkData(0, 1, 'g1') + tdSql.checkData(0, 4, 't2') tdSql.execute(f'drop consumer group g1 on t1') tdSql.query(f'show consumers') tdSql.checkRows(1) - consumer1.close() + tdSql.checkData(0, 1, 'g1') + tdSql.checkData(0, 4, 't2') + + tdSql.query(f'show subscriptions') + tdSql.checkRows(1) + tdSql.checkData(0, 0, "t2") + tdSql.checkData(0, 1, 'g1') + + index = 0 + try: + while True: + res = consumer2.poll(1) + if not res: + if index != 1: + tdLog.exit("consume error") + break + val = res.value() + if val is None: + continue + cnt = 0; + for block in val: + cnt += len(block.fetchall()) + + if cnt != 8: + tdLog.exit("consume error") + + index += 1 + finally: + consumer2.close() + + consumer3 = Consumer(consumer_dict) + try: + consumer3.subscribe(["t2"]) + except TmqError: + tdLog.exit(f"subscribe error") + + tdSql.query(f'show consumers') + tdSql.checkRows(1) + tdSql.checkData(0, 1, 'g1') + tdSql.checkData(0, 4, 't2') + + tdSql.execute(f'insert into t4 using st tags(3) values(now, 1)') + try: + res = consumer3.poll(1) + if not res: + tdLog.exit("consume1 error") + finally: + consumer3.close() + + tdSql.query(f'show consumers') + tdSql.checkRows(0) + + tdSql.query(f'show subscriptions') + tdSql.checkRows(1) + tdSql.checkData(0, 0, "t2") + tdSql.checkData(0, 1, 'g1') + tdSql.execute(f'drop topic t1') tdSql.execute(f'drop topic t2') tdSql.execute(f'drop database d1')