diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py index 05a913f313..82000ba02a 100644 --- a/tests/system-test/7-tmq/tmqParamsTest.py +++ b/tests/system-test/7-tmq/tmqParamsTest.py @@ -1,11 +1,7 @@ -import taos import sys import time -import socket -import os import threading -import math from taos.tmq import Consumer from util.log import * from util.sql import * @@ -24,18 +20,16 @@ class TDTestCase: self.wal_retention_period1 = 3600 self.wal_retention_period2 = 1 self.commit_value_list = ["true", "false"] - self.commit_value_list = ["true"] self.offset_value_list = ["", "earliest", "latest", "none"] self.tbname_value_list = ["true", "false"] self.snapshot_value_list = ["true", "false"] # self.commit_value_list = ["true"] - # self.offset_value_list = ["latest"] - # self.offset_value_list = ["earliest"] + # self.offset_value_list = ["none"] # self.tbname_value_list = ["true"] + # self.snapshot_value_list = ["true"] def tmqParamsTest(self): - tdLog.printNoPrefix("======== test case 1: ") paraDict = {'dbName': 'db1', 'dropFlag': 1, 'vgroups': 4, @@ -49,7 +43,7 @@ class TDTestCase: 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'auto_commit_interval': "100"} - + start_group_id = 1 for snapshot_value in self.snapshot_value_list: for commit_value in self.commit_value_list: @@ -64,15 +58,13 @@ class TDTestCase: tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) tdLog.info("insert data") tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) - - + + tdLog.info("create topics from stb with filter") queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topic_name, queryString) - print("----", snapshot_value) tdSql.query(f'select * from information_schema.ins_databases') db_wal_retention_period_list = list(map(lambda x:x[-8] if x[0] == paraDict['dbName'] else None, tdSql.queryResult)) - print("---db_wal_retention_period_list", db_wal_retention_period_list) for i in range(len(db_wal_retention_period_list)): if db_wal_retention_period_list[0] is None or db_wal_retention_period_list[-1] is None: db_wal_retention_period_list.remove(None) @@ -82,7 +74,6 @@ class TDTestCase: time.sleep(self.wal_retention_period2+1) tdSql.execute(f'flush database {paraDict["dbName"]}') else: - print("iinininini") if db_wal_retention_period_list[0] != self.wal_retention_period1: tdSql.execute(f"alter database {paraDict['dbName']} wal_retention_period {self.wal_retention_period1}") tdLog.info("create topic sql: %s"%sqlString) @@ -100,35 +91,19 @@ class TDTestCase: "experimental.snapshot.enable": snapshot_value, "msg.with.table.name": tbname_value } - print(consumer_dict) consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0 consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0 - # consumer_snapshot = 1 if consumer_dict["experimental.snapshot.enable"] == "true" else 0 consumer_ret = "earliest" if offset_value == "" else offset_value expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}' if len(offset_value) == 0: del consumer_dict["auto.offset.reset"] consumer = Consumer(consumer_dict) consumer.subscribe([topic_name]) + tdLog.info(f"enable.auto.commit: {commit_value}, auto.offset.reset: {offset_value}, experimental.snapshot.enable: {snapshot_value}, msg.with.table.name: {tbname_value}") stop_flag = 0 - # try: - # while True: - # res = consumer.poll(1) - - # tdSql.query('show consumers;') - # consumer_info = tdSql.queryResult[0][-1] - # if not res: - # break - # # err = res.error() - # # if err is not None: - # # raise err - # # val = res.value() - # # for block in val: - # # print(block.fetchall()) try: while True: res = consumer.poll(1) - tdSql.query('show consumers;') consumer_info = tdSql.queryResult[0][-1] if offset_value == "latest": @@ -153,42 +128,40 @@ class TDTestCase: start_group_id += 1 tdSql.query('show subscriptions;') subscription_info = tdSql.queryResult - print(subscription_info) if snapshot_value == "true": - if offset_value != "earliest": - pass + if offset_value != "earliest" and offset_value != "": + if offset_value == "latest": + offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0")), subscription_info)) + tdSql.checkEqual(sum(offset_value_list) > 0, True) + rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) + tdSql.checkEqual(sum(rows_value_list), expected_res) + elif offset_value == "none": + offset_value_list = list(map(lambda x: x[-2], subscription_info)) + tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info)) + rows_value_list = list(map(lambda x: x[-1], subscription_info)) + tdSql.checkEqual(rows_value_list, [0]*len(subscription_info)) else: if offset_value != "none": offset_value_str = ",".join(list(map(lambda x: x[-2], subscription_info))) - print(offset_value_str) - tdSql.checkEqual("snapshot" in offset_value_str, True) + tdSql.checkEqual("tsdb" in offset_value_str, True) rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) tdSql.checkEqual(sum(rows_value_list), expected_res) else: offset_value_list = list(map(lambda x: x[-2], subscription_info)) tdSql.checkEqual(offset_value_list, [None]*len(subscription_info)) - print(offset_value_list) rows_value_list = list(map(lambda x: x[-1], subscription_info)) tdSql.checkEqual(rows_value_list, [None]*len(subscription_info)) else: - print("====offset_value----", offset_value) if offset_value != "none": - offset_value_list = list(map(lambda x: int(x[-2].replace("log:", "")), subscription_info)) + offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0")), subscription_info)) tdSql.checkEqual(sum(offset_value_list) > 0, True) - print(offset_value_list) rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) tdSql.checkEqual(sum(rows_value_list), expected_res) else: offset_value_list = list(map(lambda x: x[-2], subscription_info)) - tdSql.checkEqual(offset_value_list, [None]*len(subscription_info)) - print(offset_value_list) + tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info)) - tdSql.checkEqual(rows_value_list, [None]*len(subscription_info)) - # tdSql.checkEqual(sum(rows_value_list), expected_res) - # if offset_value == "latest": - # tdSql.checkEqual(sum(rows_value_list), expected_res) - # else: - # tdSql.checkEqual(sum(rows_value_list), expected_res) + tdSql.checkEqual(rows_value_list, [0]*len(subscription_info)) tdSql.execute(f"drop topic if exists {topic_name}") tdSql.execute(f'drop database if exists {paraDict["dbName"]}')