From 84d48bb7fad3b7c95c94dbf1ffce1a3990c86476 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Wed, 21 Jun 2023 15:23:39 +0800 Subject: [PATCH 1/5] test: add show-consumer-parameters --- tests/system-test/7-tmq/tmqParamsTest.py | 123 +++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 tests/system-test/7-tmq/tmqParamsTest.py diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py new file mode 100644 index 0000000000..fb61bf8fd8 --- /dev/null +++ b/tests/system-test/7-tmq/tmqParamsTest.py @@ -0,0 +1,123 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math +from taos.tmq import Consumer +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + self.commit_value_list = ["true", "false"] + self.reset_value_list = ["", "earliest", "latest", "none"] + self.tbname_value_list = ["true", "false"] + + self.commit_value_list = ["true"] + self.reset_value_list = ["earliest"] + self.tbname_value_list = ["true"] + + def tmqParamsTest(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db1', + 'dropFlag': 1, + 'vgroups': 4, + 'stbName': 'stb', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'auto_commit_interval': "5000"} + + topic_name = 'topic1' + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1) + tdLog.info("create stb") + tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema']) + tdLog.info("create ctb") + tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) + tdLog.info("insert data") + tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) + tdSql.execute("alter database %s wal_retention_period 3600" % (paraDict['dbName'])) + tdLog.info("create topics from stb with filter") + queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topic_name, queryString) + start_group_id = 1 + for commit_value in self.commit_value_list: + for reset_value in self.reset_value_list: + for tbname_value in self.tbname_value_list: + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + tdSql.query(queryString) + group_id = "csm_" + str(start_group_id) + consumer_dict = { + "group.id": group_id, + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.commit.interval.ms": paraDict["auto_commit_interval"], + "enable.auto.commit": commit_value, + "auto.offset.reset": reset_value, + "msg.with.table.name": tbname_value + } + consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0 + consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0 + consumer_ret = "earliest" if reset_value == "" else reset_value + expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}' + if len(reset_value) == 0: + del consumer_dict["auto.offset.reset"] + print(consumer_dict) + consumer = Consumer(consumer_dict) + consumer.subscribe([topic_name]) + try: + while True: + res = consumer.poll(1) + tdSql.query('show consumers;') + consumer_info = tdSql.queryResult[0][-1] + if not res: + break + err = res.error() + if err is not None: + raise err + val = res.value() + + # for block in val: + # print(block.fetchall()) + finally: + consumer.unsubscribe() + consumer.close() + tdSql.checkEqual(consumer_info, expected_parameters) + start_group_id += 1 + tdSql.query('show subscriptions;') + subscription_info = tdSql.queryResult + offset_value_list = list(map(lambda x: int(x[-2].replace("log:", "")), subscription_info)) + print(offset_value_list) + rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) + print(rows_value_list) + tdSql.execute(f"drop topic if exists {topic_name}") + return + + def run(self): + self.tmqParamsTest() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) From 7eff0410e1c5ee0d9397fc60ff5517fdb9b8fadb Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Sun, 25 Jun 2023 15:25:46 +0800 Subject: [PATCH 2/5] test: save --- tests/system-test/7-tmq/tmqParamsTest.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py index fb61bf8fd8..9565beade3 100644 --- a/tests/system-test/7-tmq/tmqParamsTest.py +++ b/tests/system-test/7-tmq/tmqParamsTest.py @@ -21,11 +21,12 @@ class TDTestCase: tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) self.commit_value_list = ["true", "false"] - self.reset_value_list = ["", "earliest", "latest", "none"] + self.offset_value_list = ["", "earliest", "latest", "none"] + self.offset_value_list = ["", "earliest", "none"] self.tbname_value_list = ["true", "false"] self.commit_value_list = ["true"] - self.reset_value_list = ["earliest"] + self.offset_value_list = ["latest"] self.tbname_value_list = ["true"] def tmqParamsTest(self): @@ -58,11 +59,12 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topic_name, queryString) start_group_id = 1 for commit_value in self.commit_value_list: - for reset_value in self.reset_value_list: + for offset_value in self.offset_value_list: for tbname_value in self.tbname_value_list: tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) tdSql.query(queryString) + expected_res = tdSql.queryRows group_id = "csm_" + str(start_group_id) consumer_dict = { "group.id": group_id, @@ -70,18 +72,19 @@ class TDTestCase: "td.connect.pass": "taosdata", "auto.commit.interval.ms": paraDict["auto_commit_interval"], "enable.auto.commit": commit_value, - "auto.offset.reset": reset_value, + "auto.offset.reset": offset_value, "msg.with.table.name": tbname_value } + print(consumer_dict) consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0 consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0 - consumer_ret = "earliest" if reset_value == "" else reset_value + consumer_ret = "earliest" if offset_value == "" else offset_value expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}' - if len(reset_value) == 0: + if len(offset_value) == 0: del consumer_dict["auto.offset.reset"] - print(consumer_dict) consumer = Consumer(consumer_dict) consumer.subscribe([topic_name]) + tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],int(round(time.time()*1000))) try: while True: res = consumer.poll(1) @@ -92,8 +95,7 @@ class TDTestCase: err = res.error() if err is not None: raise err - val = res.value() - + # val = res.value() # for block in val: # print(block.fetchall()) finally: @@ -104,11 +106,12 @@ class TDTestCase: tdSql.query('show subscriptions;') subscription_info = tdSql.queryResult offset_value_list = list(map(lambda x: int(x[-2].replace("log:", "")), subscription_info)) + tdSql.checkEqual(sum(offset_value_list) > 0, True) print(offset_value_list) rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) print(rows_value_list) + tdSql.checkEqual(sum(rows_value_list), expected_res) tdSql.execute(f"drop topic if exists {topic_name}") - return def run(self): self.tmqParamsTest() From faa421eddb27486924e2c0d302ce25a52bfff7fe Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Sun, 25 Jun 2023 20:15:00 +0800 Subject: [PATCH 3/5] test: save --- tests/system-test/7-tmq/tmqParamsTest.py | 227 +++++++++++++++-------- 1 file changed, 153 insertions(+), 74 deletions(-) diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py index 9565beade3..05a913f313 100644 --- a/tests/system-test/7-tmq/tmqParamsTest.py +++ b/tests/system-test/7-tmq/tmqParamsTest.py @@ -16,18 +16,23 @@ sys.path.append("./7-tmq") from tmqCommon import * class TDTestCase: + updatecfgDict = {'debugFlag': 135} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) + self.wal_retention_period1 = 3600 + self.wal_retention_period2 = 1 self.commit_value_list = ["true", "false"] - self.offset_value_list = ["", "earliest", "latest", "none"] - self.offset_value_list = ["", "earliest", "none"] - self.tbname_value_list = ["true", "false"] - self.commit_value_list = ["true"] - self.offset_value_list = ["latest"] - self.tbname_value_list = ["true"] + self.offset_value_list = ["", "earliest", "latest", "none"] + self.tbname_value_list = ["true", "false"] + self.snapshot_value_list = ["true", "false"] + + # self.commit_value_list = ["true"] + # self.offset_value_list = ["latest"] + # self.offset_value_list = ["earliest"] + # self.tbname_value_list = ["true"] def tmqParamsTest(self): tdLog.printNoPrefix("======== test case 1: ") @@ -42,76 +47,150 @@ class TDTestCase: 'rowsPerTbl': 10000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'auto_commit_interval': "5000"} + 'auto_commit_interval': "100"} - topic_name = 'topic1' - tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1) - tdLog.info("create stb") - tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema']) - tdLog.info("create ctb") - tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) - tdLog.info("insert data") - tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) - tdSql.execute("alter database %s wal_retention_period 3600" % (paraDict['dbName'])) - tdLog.info("create topics from stb with filter") - queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) - sqlString = "create topic %s as %s" %(topic_name, queryString) + start_group_id = 1 - for commit_value in self.commit_value_list: - for offset_value in self.offset_value_list: - for tbname_value in self.tbname_value_list: - tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - tdSql.query(queryString) - expected_res = tdSql.queryRows - group_id = "csm_" + str(start_group_id) - consumer_dict = { - "group.id": group_id, - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.commit.interval.ms": paraDict["auto_commit_interval"], - "enable.auto.commit": commit_value, - "auto.offset.reset": offset_value, - "msg.with.table.name": tbname_value - } - print(consumer_dict) - consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0 - consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0 - consumer_ret = "earliest" if offset_value == "" else offset_value - expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}' - if len(offset_value) == 0: - del consumer_dict["auto.offset.reset"] - consumer = Consumer(consumer_dict) - consumer.subscribe([topic_name]) - tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],int(round(time.time()*1000))) - try: - while True: - res = consumer.poll(1) - tdSql.query('show consumers;') - consumer_info = tdSql.queryResult[0][-1] - if not res: - break - err = res.error() - if err is not None: - raise err - # val = res.value() - # for block in val: - # print(block.fetchall()) - finally: - consumer.unsubscribe() - consumer.close() - tdSql.checkEqual(consumer_info, expected_parameters) - start_group_id += 1 - tdSql.query('show subscriptions;') - subscription_info = tdSql.queryResult - offset_value_list = list(map(lambda x: int(x[-2].replace("log:", "")), subscription_info)) - tdSql.checkEqual(sum(offset_value_list) > 0, True) - print(offset_value_list) - rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) - print(rows_value_list) - tdSql.checkEqual(sum(rows_value_list), expected_res) - tdSql.execute(f"drop topic if exists {topic_name}") + for snapshot_value in self.snapshot_value_list: + for commit_value in self.commit_value_list: + for offset_value in self.offset_value_list: + for tbname_value in self.tbname_value_list: + topic_name = 'topic1' + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1) + tdLog.info("create stb") + tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema']) + tdLog.info("create ctb") + tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) + tdLog.info("insert data") + tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) + + + tdLog.info("create topics from stb with filter") + queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topic_name, queryString) + print("----", snapshot_value) + tdSql.query(f'select * from information_schema.ins_databases') + db_wal_retention_period_list = list(map(lambda x:x[-8] if x[0] == paraDict['dbName'] else None, tdSql.queryResult)) + print("---db_wal_retention_period_list", db_wal_retention_period_list) + for i in range(len(db_wal_retention_period_list)): + if db_wal_retention_period_list[0] is None or db_wal_retention_period_list[-1] is None: + db_wal_retention_period_list.remove(None) + if snapshot_value =="true": + if db_wal_retention_period_list[0] != self.wal_retention_period2: + tdSql.execute(f"alter database {paraDict['dbName']} wal_retention_period {self.wal_retention_period2}") + time.sleep(self.wal_retention_period2+1) + tdSql.execute(f'flush database {paraDict["dbName"]}') + else: + print("iinininini") + if db_wal_retention_period_list[0] != self.wal_retention_period1: + tdSql.execute(f"alter database {paraDict['dbName']} wal_retention_period {self.wal_retention_period1}") + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + tdSql.query(queryString) + expected_res = tdSql.queryRows + group_id = "csm_" + str(start_group_id) + consumer_dict = { + "group.id": group_id, + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.commit.interval.ms": paraDict["auto_commit_interval"], + "enable.auto.commit": commit_value, + "auto.offset.reset": offset_value, + "experimental.snapshot.enable": snapshot_value, + "msg.with.table.name": tbname_value + } + print(consumer_dict) + consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0 + consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0 + # consumer_snapshot = 1 if consumer_dict["experimental.snapshot.enable"] == "true" else 0 + consumer_ret = "earliest" if offset_value == "" else offset_value + expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}' + if len(offset_value) == 0: + del consumer_dict["auto.offset.reset"] + consumer = Consumer(consumer_dict) + consumer.subscribe([topic_name]) + stop_flag = 0 + # try: + # while True: + # res = consumer.poll(1) + + # tdSql.query('show consumers;') + # consumer_info = tdSql.queryResult[0][-1] + # if not res: + # break + # # err = res.error() + # # if err is not None: + # # raise err + # # val = res.value() + # # for block in val: + # # print(block.fetchall()) + try: + while True: + res = consumer.poll(1) + + tdSql.query('show consumers;') + consumer_info = tdSql.queryResult[0][-1] + if offset_value == "latest": + if not res and stop_flag == 1: + break + else: + if not res: + break + # err = res.error() + # if err is not None: + # raise err + # val = res.value() + # for block in val: + # print(block.fetchall()) + if offset_value == "latest" and stop_flag == 0: + tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],int(round(time.time()*1000))) + stop_flag = 1 + finally: + consumer.unsubscribe() + consumer.close() + tdSql.checkEqual(consumer_info, expected_parameters) + start_group_id += 1 + tdSql.query('show subscriptions;') + subscription_info = tdSql.queryResult + print(subscription_info) + if snapshot_value == "true": + if offset_value != "earliest": + pass + else: + if offset_value != "none": + offset_value_str = ",".join(list(map(lambda x: x[-2], subscription_info))) + print(offset_value_str) + tdSql.checkEqual("snapshot" in offset_value_str, True) + rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) + tdSql.checkEqual(sum(rows_value_list), expected_res) + else: + offset_value_list = list(map(lambda x: x[-2], subscription_info)) + tdSql.checkEqual(offset_value_list, [None]*len(subscription_info)) + print(offset_value_list) + rows_value_list = list(map(lambda x: x[-1], subscription_info)) + tdSql.checkEqual(rows_value_list, [None]*len(subscription_info)) + else: + print("====offset_value----", offset_value) + if offset_value != "none": + offset_value_list = list(map(lambda x: int(x[-2].replace("log:", "")), subscription_info)) + tdSql.checkEqual(sum(offset_value_list) > 0, True) + print(offset_value_list) + rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) + tdSql.checkEqual(sum(rows_value_list), expected_res) + else: + offset_value_list = list(map(lambda x: x[-2], subscription_info)) + tdSql.checkEqual(offset_value_list, [None]*len(subscription_info)) + print(offset_value_list) + rows_value_list = list(map(lambda x: x[-1], subscription_info)) + tdSql.checkEqual(rows_value_list, [None]*len(subscription_info)) + # tdSql.checkEqual(sum(rows_value_list), expected_res) + # if offset_value == "latest": + # tdSql.checkEqual(sum(rows_value_list), expected_res) + # else: + # tdSql.checkEqual(sum(rows_value_list), expected_res) + tdSql.execute(f"drop topic if exists {topic_name}") + tdSql.execute(f'drop database if exists {paraDict["dbName"]}') def run(self): self.tmqParamsTest() From 7445a8aba9ecbc8cbe274f4a2134d812b00c8c07 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Mon, 26 Jun 2023 18:23:34 +0800 Subject: [PATCH 4/5] test: finish testcases for TS-3495 --- tests/system-test/7-tmq/tmqParamsTest.py | 69 ++++++++---------------- 1 file changed, 21 insertions(+), 48 deletions(-) diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py index 05a913f313..82000ba02a 100644 --- a/tests/system-test/7-tmq/tmqParamsTest.py +++ b/tests/system-test/7-tmq/tmqParamsTest.py @@ -1,11 +1,7 @@ -import taos import sys import time -import socket -import os import threading -import math from taos.tmq import Consumer from util.log import * from util.sql import * @@ -24,18 +20,16 @@ class TDTestCase: self.wal_retention_period1 = 3600 self.wal_retention_period2 = 1 self.commit_value_list = ["true", "false"] - self.commit_value_list = ["true"] self.offset_value_list = ["", "earliest", "latest", "none"] self.tbname_value_list = ["true", "false"] self.snapshot_value_list = ["true", "false"] # self.commit_value_list = ["true"] - # self.offset_value_list = ["latest"] - # self.offset_value_list = ["earliest"] + # self.offset_value_list = ["none"] # self.tbname_value_list = ["true"] + # self.snapshot_value_list = ["true"] def tmqParamsTest(self): - tdLog.printNoPrefix("======== test case 1: ") paraDict = {'dbName': 'db1', 'dropFlag': 1, 'vgroups': 4, @@ -49,7 +43,7 @@ class TDTestCase: 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'auto_commit_interval': "100"} - + start_group_id = 1 for snapshot_value in self.snapshot_value_list: for commit_value in self.commit_value_list: @@ -64,15 +58,13 @@ class TDTestCase: tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) tdLog.info("insert data") tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) - - + + tdLog.info("create topics from stb with filter") queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topic_name, queryString) - print("----", snapshot_value) tdSql.query(f'select * from information_schema.ins_databases') db_wal_retention_period_list = list(map(lambda x:x[-8] if x[0] == paraDict['dbName'] else None, tdSql.queryResult)) - print("---db_wal_retention_period_list", db_wal_retention_period_list) for i in range(len(db_wal_retention_period_list)): if db_wal_retention_period_list[0] is None or db_wal_retention_period_list[-1] is None: db_wal_retention_period_list.remove(None) @@ -82,7 +74,6 @@ class TDTestCase: time.sleep(self.wal_retention_period2+1) tdSql.execute(f'flush database {paraDict["dbName"]}') else: - print("iinininini") if db_wal_retention_period_list[0] != self.wal_retention_period1: tdSql.execute(f"alter database {paraDict['dbName']} wal_retention_period {self.wal_retention_period1}") tdLog.info("create topic sql: %s"%sqlString) @@ -100,35 +91,19 @@ class TDTestCase: "experimental.snapshot.enable": snapshot_value, "msg.with.table.name": tbname_value } - print(consumer_dict) consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0 consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0 - # consumer_snapshot = 1 if consumer_dict["experimental.snapshot.enable"] == "true" else 0 consumer_ret = "earliest" if offset_value == "" else offset_value expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}' if len(offset_value) == 0: del consumer_dict["auto.offset.reset"] consumer = Consumer(consumer_dict) consumer.subscribe([topic_name]) + tdLog.info(f"enable.auto.commit: {commit_value}, auto.offset.reset: {offset_value}, experimental.snapshot.enable: {snapshot_value}, msg.with.table.name: {tbname_value}") stop_flag = 0 - # try: - # while True: - # res = consumer.poll(1) - - # tdSql.query('show consumers;') - # consumer_info = tdSql.queryResult[0][-1] - # if not res: - # break - # # err = res.error() - # # if err is not None: - # # raise err - # # val = res.value() - # # for block in val: - # # print(block.fetchall()) try: while True: res = consumer.poll(1) - tdSql.query('show consumers;') consumer_info = tdSql.queryResult[0][-1] if offset_value == "latest": @@ -153,42 +128,40 @@ class TDTestCase: start_group_id += 1 tdSql.query('show subscriptions;') subscription_info = tdSql.queryResult - print(subscription_info) if snapshot_value == "true": - if offset_value != "earliest": - pass + if offset_value != "earliest" and offset_value != "": + if offset_value == "latest": + offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0")), subscription_info)) + tdSql.checkEqual(sum(offset_value_list) > 0, True) + rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) + tdSql.checkEqual(sum(rows_value_list), expected_res) + elif offset_value == "none": + offset_value_list = list(map(lambda x: x[-2], subscription_info)) + tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info)) + rows_value_list = list(map(lambda x: x[-1], subscription_info)) + tdSql.checkEqual(rows_value_list, [0]*len(subscription_info)) else: if offset_value != "none": offset_value_str = ",".join(list(map(lambda x: x[-2], subscription_info))) - print(offset_value_str) - tdSql.checkEqual("snapshot" in offset_value_str, True) + tdSql.checkEqual("tsdb" in offset_value_str, True) rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) tdSql.checkEqual(sum(rows_value_list), expected_res) else: offset_value_list = list(map(lambda x: x[-2], subscription_info)) tdSql.checkEqual(offset_value_list, [None]*len(subscription_info)) - print(offset_value_list) rows_value_list = list(map(lambda x: x[-1], subscription_info)) tdSql.checkEqual(rows_value_list, [None]*len(subscription_info)) else: - print("====offset_value----", offset_value) if offset_value != "none": - offset_value_list = list(map(lambda x: int(x[-2].replace("log:", "")), subscription_info)) + offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0")), subscription_info)) tdSql.checkEqual(sum(offset_value_list) > 0, True) - print(offset_value_list) rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) tdSql.checkEqual(sum(rows_value_list), expected_res) else: offset_value_list = list(map(lambda x: x[-2], subscription_info)) - tdSql.checkEqual(offset_value_list, [None]*len(subscription_info)) - print(offset_value_list) + tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info)) - tdSql.checkEqual(rows_value_list, [None]*len(subscription_info)) - # tdSql.checkEqual(sum(rows_value_list), expected_res) - # if offset_value == "latest": - # tdSql.checkEqual(sum(rows_value_list), expected_res) - # else: - # tdSql.checkEqual(sum(rows_value_list), expected_res) + tdSql.checkEqual(rows_value_list, [0]*len(subscription_info)) tdSql.execute(f"drop topic if exists {topic_name}") tdSql.execute(f'drop database if exists {paraDict["dbName"]}') From a7c937e202abaa998b5113b58c8457dedcf28f25 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Mon, 26 Jun 2023 18:30:39 +0800 Subject: [PATCH 5/5] test: finish testcases for TS-3495 --- tests/parallel_test/cases.task | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 9bb3f75b3b..e24045207a 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -33,6 +33,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb3.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py