diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index a5ca819a50..f76616616d 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -33,7 +33,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb3.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py - +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 3 diff --git a/tests/system-test/7-tmq/tmqMaxTopic.py b/tests/system-test/7-tmq/tmqMaxTopic.py new file mode 100644 index 0000000000..5dc49fe48f --- /dev/null +++ b/tests/system-test/7-tmq/tmqMaxTopic.py @@ -0,0 +1,262 @@ + +import sys +import time +import threading +from taos.tmq import Consumer +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135} + + def __init__(self): + self.vgroups = 1 + self.ctbNum = 10 + self.rowsPerTbl = 10 + self.tmqMaxTopicNum = 20 + self.tmqMaxGroups = 100 + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def modifyMaxTopics(self, tmqMaxTopicNum): + # single dnode + cfgDir = tdDnodes.dnodes[0].cfgDir + + # cluster dnodes + # tdDnodes[1].dataDir + # tdDnodes[1].logDir + # tdDnodes[1].cfgDir + + cfgFile = f"%s/taos.cfg"%(cfgDir) + shellCmd = 'echo "tmqMaxTopicNum %d" >> %s'%(tmqMaxTopicNum, cfgFile) + tdLog.info(" shell cmd: %s"%(shellCmd)) + os.system(shellCmd) + tdDnodes.stoptaosd(1) + tdDnodes.starttaosd(1) + time.sleep(5) + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdSql.execute("alter database %s wal_retention_period 3600" % (paraDict['dbName'])) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("restart taosd to ensure that the data falls into the disk") + # tdDnodes.stop(1) + # tdDnodes.start(1) + tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def tmqSubscribe(self, **inputDict): + # create new connector for new tdSql instance in my thread + # newTdSql = tdCom.newTdSql() + # topicName = inputDict['topic_name'] + # group_id = inputDict['group_id'] + + consumer_dict = { + "group.id": inputDict['group_id_prefix'], + "client.id": "client", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.commit.interval.ms": "1000", + "enable.auto.commit": "true", + "auto.offset.reset": "earliest", + "experimental.snapshot.enable": "false", + "msg.with.table.name": "false" + } + + for j in range(self.tmqMaxGroups): + consumer_dict["group.id"] = f"%s_%d"%(inputDict['group_id_prefix'], j) + consumer_dict["client.id"] = f"%s_%d"%(inputDict['group_id_prefix'], j) + print("======grpid: %s"%(consumer_dict["group.id"])) + consumer = Consumer(consumer_dict) + # print("======%s"%(inputDict['topic_name'])) + consumer.subscribe([inputDict['topic_name']]) + # res = consumer.poll(inputDict['pollDelay']) + return + + def asyncSubscribe(self, inputDict): + pThread = threading.Thread(target=self.tmqSubscribe, kwargs=inputDict) + pThread.start() + return pThread + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNamePrefix = 'topicname_' + tdLog.info("create topics from stb") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + for i in range(self.tmqMaxTopicNum): + sqlString = "create topic %s%d as %s" %(topicNamePrefix, i, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + sqlString = "create topic %s%s as %s" %(topicNamePrefix, 'xyz', queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.error(sqlString) + + tdSql.query('show topics;') + topicNum = tdSql.queryRows + tdLog.info(" topic count: %d"%(topicNum)) + if topicNum != self.tmqMaxTopicNum: + tdLog.exit("show topics %d not equal expect num: %d"%(topicNum, self.tmqMaxTopicNum)) + + # self.updatecfgDict = {'tmqMaxTopicNum': 22} + # tdDnodes.stoptaosd(1) + # tdDnodes.deploy(1, self.updatecfgDict) + # tdDnodes.starttaosd(1) + # time.sleep(5) + + newTmqMaxTopicNum = 22 + self.modifyMaxTopics(newTmqMaxTopicNum) + + sqlString = "create topic %s%s as %s" %(topicNamePrefix, 'x', queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + sqlString = "create topic %s%s as %s" %(topicNamePrefix, 'y', queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + sqlString = "create topic %s%s as %s" %(topicNamePrefix, 'xyz', queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.error(sqlString) + + tdSql.query('show topics;') + topicNum = tdSql.queryRows + tdLog.info(" topic count: %d"%(topicNum)) + if topicNum != newTmqMaxTopicNum: + tdLog.exit("show topics %d not equal expect num: %d"%(topicNum, newTmqMaxTopicNum)) + + newTmqMaxTopicNum = 18 + self.modifyMaxTopics(newTmqMaxTopicNum) + + i = 0 + sqlString = "drop topic %s%d" %(topicNamePrefix, i) + tdLog.info("drop topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + i = 1 + sqlString = "drop topic %s%d" %(topicNamePrefix, i) + tdLog.info("drop topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + sqlString = "drop topic %s%s" %(topicNamePrefix, "x") + tdLog.info("drop topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + sqlString = "drop topic %s%s" %(topicNamePrefix, "y") + tdLog.info("drop topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + sqlString = "create topic %s%s as %s" %(topicNamePrefix, 'xyz', queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.error(sqlString) + + # pThreadList = [] + # for i in range(self.tmqMaxTopicNum): + # topic_name = f"%s%d" %(topicNamePrefix, i) + # print("======%s"%(topic_name)) + # group_id_prefix = f"grp_%d"%(i) + # inputDict = {'group_id_prefix': group_id_prefix, + # 'topic_name': topic_name, + # 'pollDelay': 1 + # } + + # pThread = self.asyncSubscribe(inputDict) + # pThreadList.append(pThread) + + # for j in range(self.tmqMaxGroups): + # pThreadList[j].join() + + # time.sleep(5) + # tdSql.query('show subscriptions;') + # subscribeNum = tdSql.queryRows + # expectNum = self.tmqMaxGroups * self.tmqMaxTopicNum + # tdLog.info("loop index: %d, ======subscriptions %d and expect num: %d"%(i, subscribeNum, expectNum)) + # if subscribeNum != expectNum: + # tdLog.exit("subscriptions %d not equal expect num: %d"%(subscribeNum, expectNum)) + + # # drop all topics + # for i in range(self.tmqMaxTopicNum): + # sqlString = "drop topic %s%d" %(topicNamePrefix, i) + # tdLog.info("drop topic sql: %s"%sqlString) + # tdSql.execute(sqlString) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + self.prepareTestEnv() + self.tmqCase1() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())