From 7eff0410e1c5ee0d9397fc60ff5517fdb9b8fadb Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Sun, 25 Jun 2023 15:25:46 +0800 Subject: [PATCH] test: save --- tests/system-test/7-tmq/tmqParamsTest.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py index fb61bf8fd8..9565beade3 100644 --- a/tests/system-test/7-tmq/tmqParamsTest.py +++ b/tests/system-test/7-tmq/tmqParamsTest.py @@ -21,11 +21,12 @@ class TDTestCase: tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) self.commit_value_list = ["true", "false"] - self.reset_value_list = ["", "earliest", "latest", "none"] + self.offset_value_list = ["", "earliest", "latest", "none"] + self.offset_value_list = ["", "earliest", "none"] self.tbname_value_list = ["true", "false"] self.commit_value_list = ["true"] - self.reset_value_list = ["earliest"] + self.offset_value_list = ["latest"] self.tbname_value_list = ["true"] def tmqParamsTest(self): @@ -58,11 +59,12 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topic_name, queryString) start_group_id = 1 for commit_value in self.commit_value_list: - for reset_value in self.reset_value_list: + for offset_value in self.offset_value_list: for tbname_value in self.tbname_value_list: tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) tdSql.query(queryString) + expected_res = tdSql.queryRows group_id = "csm_" + str(start_group_id) consumer_dict = { "group.id": group_id, @@ -70,18 +72,19 @@ class TDTestCase: "td.connect.pass": "taosdata", "auto.commit.interval.ms": paraDict["auto_commit_interval"], "enable.auto.commit": commit_value, - "auto.offset.reset": reset_value, + "auto.offset.reset": offset_value, "msg.with.table.name": tbname_value } + print(consumer_dict) consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0 consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0 - consumer_ret = "earliest" if reset_value == "" else reset_value + consumer_ret = "earliest" if offset_value == "" else offset_value expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}' - if len(reset_value) == 0: + if len(offset_value) == 0: del consumer_dict["auto.offset.reset"] - print(consumer_dict) consumer = Consumer(consumer_dict) consumer.subscribe([topic_name]) + tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],int(round(time.time()*1000))) try: while True: res = consumer.poll(1) @@ -92,8 +95,7 @@ class TDTestCase: err = res.error() if err is not None: raise err - val = res.value() - + # val = res.value() # for block in val: # print(block.fetchall()) finally: @@ -104,11 +106,12 @@ class TDTestCase: tdSql.query('show subscriptions;') subscription_info = tdSql.queryResult offset_value_list = list(map(lambda x: int(x[-2].replace("log:", "")), subscription_info)) + tdSql.checkEqual(sum(offset_value_list) > 0, True) print(offset_value_list) rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) print(rows_value_list) + tdSql.checkEqual(sum(rows_value_list), expected_res) tdSql.execute(f"drop topic if exists {topic_name}") - return def run(self): self.tmqParamsTest()