diff --git a/tests/system-test/7-tmq/TD-17803.py b/tests/system-test/7-tmq/TD-17803.py deleted file mode 100644 index 771ff83a29..0000000000 --- a/tests/system-test/7-tmq/TD-17803.py +++ /dev/null @@ -1,198 +0,0 @@ -from distutils.log import error -import taos -import sys -import time -import socket -import os -import threading -import subprocess -import platform - -from util.log import * -from util.sql import * -from util.cases import * -from util.dnodes import * -from util.common import * -sys.path.append("./7-tmq") -from tmqCommon import * - - - -class TDTestCase: - def __init__(self): - self.snapshot = 0 - self.replica = 3 - self.vgroups = 3 - self.ctbNum = 2 - self.rowsPerTbl = 2 - - def init(self, conn, logSql): - tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor()) - #tdSql.init(conn.cursor(), logSql) # output sql.txt file - - def checkFileContent(self, consumerId, queryString): - buildPath = tdCom.getBuildPath() - cfgPath = tdCom.getClientCfgPath() - dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId) - cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) - tdLog.info(cmdStr) - os.system(cmdStr) - - consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) - tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) - - consumeFile = open(consumeRowsFile, mode='r') - queryFile = open(dstFile, mode='r') - - # skip first line for it is schema - queryFile.readline() - - while True: - dst = queryFile.readline() - src = consumeFile.readline() - - if dst: - if dst != src: - tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId) - else: - break - return - - def prepareTestEnv(self): - tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") - paraDict = {'dbName': 'dbt', - 'dropFlag': 1, - 'event': '', - 'vgroups': 4, - 'stbName': 'stb', - 'colPrefix': 'c', - 'tagPrefix': 't', - 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1}], - 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1}], - 'ctbPrefix': 'ctb', - 'ctbStartIdx': 0, - 'ctbNum': 2, - 'rowsPerTbl': 1000, - 'batchNum': 10, - 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 3, - 'showMsg': 1, - 'showRow': 1, - 'snapshot': 0} - - paraDict['vgroups'] = self.vgroups - paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - - tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=self.replica) - tdLog.info("create stb") - tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) - tdLog.info("create ctb") - tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], - ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) - tdLog.info("insert data") - tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], - ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", - # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - # tmqCom.asyncInsertDataByInterlace(paraDict) - tdLog.printNoPrefix("11111111111111111111111") - tmqCom.create_ntable(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=1) - tdLog.printNoPrefix("222222222222222") - tmqCom.insert_rows_into_ntbl(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"], startTs=paraDict["startTs"], tblNum=1, rows=2) # tdLog.info("restart taosd to ensure that the data falls into the disk") - - tdLog.printNoPrefix("333333333333333333333") - tdSql.query("drop database %s"%paraDict["dbName"]) - tdLog.printNoPrefix("44444444444444444") - return - - def tmqCase1(self): - tdLog.printNoPrefix("======== test case 1: ") - - # create and start thread - paraDict = {'dbName': 'dbt', - 'dropFlag': 1, - 'event': '', - 'vgroups': 4, - 'stbName': 'stb', - 'colPrefix': 'c', - 'tagPrefix': 't', - 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], - 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], - 'ctbPrefix': 'ctb', - 'ctbStartIdx': 0, - 'ctbNum': 100, - 'rowsPerTbl': 1000, - 'batchNum': 100, - 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 3, - 'showMsg': 1, - 'showRow': 1, - 'snapshot': 1} - - paraDict['vgroups'] = self.vgroups - paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - - tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' - queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(paraDict['dbName'], paraDict['stbName']) - sqlString = "create topic %s as %s" %(topicFromStb1, queryString) - tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - - consumerId = 0 - expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] - topicList = topicFromStb1 - ifcheckdata = 0 - ifManualCommit = 0 - keyList = 'group.id:cgrp1,\ - enable.auto.commit:false,\ - auto.commit.interval.ms:6000,\ - auto.offset.reset:earliest' - tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - - tdLog.info("start consume processor") - pollDelay = 100 - showMsg = 1 - showRow = 1 - tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - - tdLog.info("start to check consume result") - expectRows = 1 - resultList = tmqCom.selectConsumeResult(expectRows) - totalConsumeRows = 0 - for i in range(expectRows): - totalConsumeRows += resultList[i] - - tdSql.query(queryString) - totalRowsInserted = tdSql.getRows() - - tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) - - if totalConsumeRows != expectrowcnt: - tdLog.exit("tmq consume rows error!") - - # tmqCom.checkFileContent(consumerId, queryString) - - tmqCom.waitSubscriptionExit(tdSql, topicFromStb1) - tdSql.query("drop topic %s"%topicFromStb1) - - tdLog.printNoPrefix("======== test case 1 end ...... ") - - def run(self): - self.prepareTestEnv() - # self.tmqCase1() - - def stop(self): - tdSql.close() - tdLog.success(f"{__file__} successfully executed") - -event = threading.Event() - -tdCases.addLinux(__file__, TDTestCase()) -tdCases.addWindows(__file__, TDTestCase())