From f0c6515e362df3d96a49fa418b1d32eb96a89e8d Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Tue, 19 Mar 2024 15:01:39 +0800 Subject: [PATCH] add testcase of disorder and null data in tmq test --- tests/system-test/7-tmq/tmq_ts4563.py | 38 ++++++++++++++++++++------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/tests/system-test/7-tmq/tmq_ts4563.py b/tests/system-test/7-tmq/tmq_ts4563.py index fc1cc259ce..a4b739f1ab 100644 --- a/tests/system-test/7-tmq/tmq_ts4563.py +++ b/tests/system-test/7-tmq/tmq_ts4563.py @@ -29,9 +29,11 @@ class TDTestCase: tdSql.execute(f'use db_stmt') tdSql.query("select ts,k from st") - tdSql.checkRows(2) + tdSql.checkRows(self.expected_affected_rows) tdSql.execute(f'create topic t_unorder_data as select ts,k from st') + tdSql.execute(f'create topic t_unorder_data_none as select i,k from st') + consumer_dict = { "group.id": "g1", "td.connect.user": "root", @@ -41,7 +43,7 @@ class TDTestCase: consumer = Consumer(consumer_dict) try: - consumer.subscribe(["t_unorder_data"]) + consumer.subscribe(["t_unorder_data", "t_unorder_data_none"]) except TmqError: tdLog.exit(f"subscribe error") @@ -49,19 +51,23 @@ class TDTestCase: try: while True: res = consumer.poll(1) - print(res) + print(cnt,res) if not res: - if cnt == 0: + print("111",cnt) + if cnt == 0 or cnt != 2*self.expected_affected_rows: tdLog.exit("consume error") break val = res.value() + print(val) if val is None: continue for block in val: + print(block.fetchall(),len(block.fetchall())) cnt += len(block.fetchall()) - - if cnt != 2: - tdLog.exit("consume error") + # print(cnt) + # if cnt != 3: + # tdLog.exit("consume error") + # print("polling") finally: consumer.close() @@ -110,20 +116,32 @@ class TDTestCase: params = new_multi_binds(2) params[0].timestamp((1626861392589, 1626861392590)) params[1].int([3, None]) - + # print(type(stmt)) tdLog.debug("bind_param_batch start") stmt.bind_param_batch(params) + tdLog.debug("bind_param_batch end") stmt.execute() tdLog.debug("execute end") + conn.execute("flush database %s" % dbname) + + params1 = new_multi_binds(2) + params1[0].timestamp((1626861392587,1626861392586)) + params1[1].int([None,3]) + stmt.bind_param_batch(params1) + stmt.execute() + end = datetime.now() print("elapsed time: ", end - start) - assert stmt.affected_rows == 2 + print(stmt.affected_rows) + self.expected_affected_rows = 4 + if stmt.affected_rows != self.expected_affected_rows : + tdLog.exit("affected_rows error") tdLog.debug("close start") stmt.close() - + # conn.execute("drop database if exists %s" % dbname) conn.close()