diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 7cbe799bd9..210961a547 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -132,6 +132,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py +,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5 diff --git a/tests/system-test/7-tmq/tmqDataPrecisionUnit.py b/tests/system-test/7-tmq/tmqDataPrecisionUnit.py new file mode 100644 index 0000000000..f050116a1b --- /dev/null +++ b/tests/system-test/7-tmq/tmqDataPrecisionUnit.py @@ -0,0 +1,139 @@ +import sys +import re +import time +import threading +from taos.tmq import * +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + self.db_name = "tmq_db" + self.topic_name = "tmq_topic" + self.stable_name = "stb" + self.rows_per_table = 1000 + self.ctb_num = 100 + + def prepareData(self, precisionUnit="ms"): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + startTS = 1672502400000 + if precisionUnit == "us": + startTS = 1672502400000000 + elif precisionUnit == "ns": + startTS = 1672502400000000000 + + paraDict = { + 'dbName': self.db_name, + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': self.stable_name, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': self.ctb_num, + 'rowsPerTbl': self.rows_per_table, + 'batchNum': 100, + 'startTs': startTS, # 2023-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0 + } + + # init the consumer database + tmqCom.initConsumerTable() + + # create testing database、stable、ctables + tdCom.create_database(tdSql, paraDict["dbName"], paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, precision=precisionUnit) + tdLog.info("create database %s successfully" % paraDict["dbName"]) + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"], stbName=paraDict["stbName"]) + tdLog.info("create stable %s successfully" % paraDict["stbName"]) + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"], ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"], ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("create child tables successfully") + + # insert data into tables and wait the async thread exit + tdLog.info("insert data into tables") + pThread = tmqCom.asyncInsertDataByInterlace(paraDict) + pThread.join() + + def run(self): + """Check tmq feature for different data precision unit like "ms、us、ns" + """ + precision_unit = ["ms", "us", "ns"] + for unit in precision_unit: + tdLog.info(f"start to test precision unit {unit}") + self.prepareData(precisionUnit=unit) + # drop database if exists + tdSql.execute(f"drop database if exists {self.db_name}") + self.prepareData(unit) + + # create topic + tdLog.info("create topic from %s" % self.stable_name) + queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(self.db_name, self.stable_name) + sqlString = "create topic %s as %s" %(self.topic_name, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # save consumer info + consumerId = 0 + expectrowcnt = self.rows_per_table * self.ctb_num + topicList = self.topic_name + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt, topicList, keyList, ifcheckdata, ifManualCommit) + + # start consume processor + paraDict = { + 'pollDelay': 15, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0 + } + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'], dbName=self.db_name, showMsg=paraDict['showMsg'], showRow=paraDict['showRow'], snapshot=paraDict['snapshot']) + + tdLog.info("start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsFromQuery = tdSql.getRows() + tdLog.info("act consume rows: %d, act query rows: %d "%(totalConsumeRows, totalRowsFromQuery)) + + if totalConsumeRows < totalRowsFromQuery: + tdLog.exit("tmq consume rows error!") + + tmqCom.waitSubscriptionExit(tdSql, self.topic_name) + tdSql.query("drop topic %s" % self.topic_name) + tdSql.execute("drop database %s" % self.db_name) + + def stop(self): + tdSql.execute(f"drop database if exists {self.db_name}") + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())