From 139136bc9889b1d8738d2a254609abd25ce95420 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 11 Aug 2022 16:21:31 +0800 Subject: [PATCH] fix test cases --- tests/system-test/99-TDcase/TD-16025.py | 54 ++++++++++++------------- tests/system-test/99-TDcase/TD-16821.py | 30 +++++++------- tests/system-test/99-TDcase/TD-17255.py | 42 +++++++++---------- tests/system-test/99-TDcase/TD-17699.py | 26 ++++++------ 4 files changed, 76 insertions(+), 76 deletions(-) diff --git a/tests/system-test/99-TDcase/TD-16025.py b/tests/system-test/99-TDcase/TD-16025.py index 6016b56192..4cf94f5daf 100644 --- a/tests/system-test/99-TDcase/TD-16025.py +++ b/tests/system-test/99-TDcase/TD-16025.py @@ -56,7 +56,7 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) @@ -65,12 +65,12 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - def initConsumerInfoTable(self,cdbName='cdb'): + def initConsumerInfoTable(self,cdbName='cdb'): tdLog.info("drop consumeinfo table") tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -85,11 +85,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -98,9 +98,9 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -130,7 +130,7 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) return @@ -149,7 +149,7 @@ class TDTestCase: ctbDict[i] = 0 #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfCtb = 0 + rowsOfCtb = 0 while rowsOfCtb < rowsPerTbl: for i in range(ctbNum): sql += " %s.%s_%d values "%(dbName,ctbPrefix,i) @@ -176,7 +176,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s_%d values "%(ctbPrefix,i) for j in range(rowsPerTbl): @@ -207,7 +207,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i) for j in range(rowsPerTbl): @@ -226,8 +226,8 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - - def prepareEnv(self, **parameterDict): + + def prepareEnv(self, **parameterDict): # create new connector for my thread tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) @@ -246,8 +246,8 @@ class TDTestCase: return def tmqCase1(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 1: ") - + tdLog.printNoPrefix("======== test case 1: ") + self.initConsumerTable() # create and start thread @@ -264,7 +264,7 @@ class TDTestCase: 'batchNum': 23, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + self.create_database(tdSql, parameterDict["dbName"]) self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbPrefix"], parameterDict["ctbNum"]) @@ -272,7 +272,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -303,7 +303,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -313,8 +313,8 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 2: ") - + tdLog.printNoPrefix("======== test case 2: ") + self.initConsumerTable() # create and start thread @@ -339,7 +339,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] * 2 @@ -373,7 +373,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -387,8 +387,8 @@ class TDTestCase: # 自动建表完成数据插入,启动消费 def tmqCase3(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 3: ") - + tdLog.printNoPrefix("======== test case 3: ") + self.initConsumerTable() # create and start thread @@ -414,7 +414,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -444,7 +444,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -466,7 +466,7 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) # self.tmqCase1(cfgPath, buildPath) - # self.tmqCase2(cfgPath, buildPath) + # self.tmqCase2(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath) # self.tmqCase4(cfgPath, buildPath) # self.tmqCase5(cfgPath, buildPath) diff --git a/tests/system-test/99-TDcase/TD-16821.py b/tests/system-test/99-TDcase/TD-16821.py index ef74515792..31c1c34485 100644 --- a/tests/system-test/99-TDcase/TD-16821.py +++ b/tests/system-test/99-TDcase/TD-16821.py @@ -27,26 +27,26 @@ class TDTestCase: cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) tdLog.info(cmdStr) os.system(cmdStr) - + consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) consumeFile = open(consumeRowsFile, mode='r') queryFile = open(dstFile, mode='r') - + # skip first line for it is schema queryFile.readline() while True: dst = queryFile.readline() src = consumeFile.readline() - + if dst: if dst != src: tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId) else: break - return + return def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") @@ -78,7 +78,7 @@ class TDTestCase: tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) tdLog.info("insert data") tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) - + tdLog.info("create topics from stb with filter") # queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) # queryString = "select ts, c1, c2 from %s.%s" %(paraDict['dbName'], paraDict['stbName']) @@ -88,7 +88,7 @@ class TDTestCase: tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) # queryString = 'select * from %s.%s'%(paraDict["dbName"],paraDict["stbName"]) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -104,15 +104,15 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("0 tmq consume rows error!") - self.checkFileContent(consumerId, queryString) + self.checkFileContent(consumerId, queryString) # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() @@ -121,7 +121,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[1], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) consumerId = 1 @@ -131,7 +131,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) if expectRowsList[1] != resultList[0]: @@ -147,8 +147,8 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[2], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) - expectRowsList.append(tdSql.getRows()) + tdSql.query(queryString) + expectRowsList.append(tdSql.getRows()) consumerId = 2 topicList = topicNameList[2] @@ -157,7 +157,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) # if expectRowsList[2] != resultList[0]: @@ -166,7 +166,7 @@ class TDTestCase: # self.checkFileContent(consumerId, queryString) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/99-TDcase/TD-17255.py b/tests/system-test/99-TDcase/TD-17255.py index 28a1a1fd4a..5d18fee8d2 100644 --- a/tests/system-test/99-TDcase/TD-17255.py +++ b/tests/system-test/99-TDcase/TD-17255.py @@ -19,7 +19,7 @@ class TDTestCase: self.vgroups = 2 self.ctbNum = 100 self.rowsPerTbl = 10000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -49,7 +49,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -61,7 +61,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # tdDnodes.start(1) @@ -93,7 +93,7 @@ class TDTestCase: # paraDict['vgroups'] = self.vgroups # paraDict['ctbNum'] = self.ctbNum # paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -107,12 +107,12 @@ class TDTestCase: startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + consumerId = 0 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] topicList = topicFromStb1 @@ -126,7 +126,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + # time.sleep(3) tmqCom.getStartCommitNotifyFromTmqsim() tdLog.info("================= restart dnode ===========================") @@ -143,7 +143,7 @@ class TDTestCase: tdSql.query(queryString) totalRowsInserted = tdSql.getRows() - + if totalConsumeRows != totalRowsInserted: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) tdLog.exit("tmq consume rows error!") @@ -153,7 +153,7 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self): - tdLog.printNoPrefix("======== test case 2: ") + tdLog.printNoPrefix("======== test case 2: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -177,7 +177,7 @@ class TDTestCase: # paraDict['vgroups'] = self.vgroups # paraDict['ctbNum'] = self.ctbNum # paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -190,12 +190,12 @@ class TDTestCase: ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - + consumerId = 0 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 topicList = topicFromStb1 @@ -212,7 +212,7 @@ class TDTestCase: tdLog.info("create some new child table and insert data ") tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) - + tmqCom.getStartCommitNotifyFromTmqsim() tdLog.info("================= restart dnode ===========================") tdDnodes.stop(1) @@ -228,7 +228,7 @@ class TDTestCase: tdSql.query(queryString) totalRowsInserted = tdSql.getRows() - + if totalConsumeRows != totalRowsInserted: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) tdLog.exit("tmq consume rows error!") @@ -239,7 +239,7 @@ class TDTestCase: # 自动建表完成数据插入,启动消费 def tmqCase3(self): - tdLog.printNoPrefix("======== test case 3: ") + tdLog.printNoPrefix("======== test case 3: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', @@ -263,7 +263,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -272,12 +272,12 @@ class TDTestCase: tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - + tdSql.execute(sqlString) + consumerId = 0 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] topicList = topicFromStb1 @@ -306,7 +306,7 @@ class TDTestCase: tdSql.query(queryString) totalRowsInserted = tdSql.getRows() - + if totalConsumeRows != totalRowsInserted: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) tdLog.exit("tmq consume rows error!") @@ -320,7 +320,7 @@ class TDTestCase: tdSql.prepare() self.tmqCase1() - self.tmqCase2() + self.tmqCase2() self.tmqCase3() def stop(self): diff --git a/tests/system-test/99-TDcase/TD-17699.py b/tests/system-test/99-TDcase/TD-17699.py index cf5e7797ec..40650e4b92 100644 --- a/tests/system-test/99-TDcase/TD-17699.py +++ b/tests/system-test/99-TDcase/TD-17699.py @@ -48,7 +48,7 @@ class TDTestCase: pollDelay = 20 showMsg = 1 - showRow = 1 + showRow = 1 hostname = socket.gethostname() @@ -60,7 +60,7 @@ class TDTestCase: def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") tdLog.info("step 1: create database, stb, ctb and insert data") - + tmqCom.initConsumerTable(self.cdbName) tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"]) @@ -70,37 +70,37 @@ class TDTestCase: tdCom.create_ctable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],tag_elm_list=self.paraDict['tagSchema'],count=self.paraDict["ctbNum"],default_ctbname_prefix=self.paraDict["ctbPrefix"]) tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"]) # pThread1 = tmqCom.asyncInsertData(paraDict=self.paraDict) - + self.paraDict["stbName"] = 'stb2' self.paraDict["ctbPrefix"] = 'newctb' self.paraDict["batchNum"] = 1000 tdCom.create_stable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],column_elm_list=self.paraDict["colSchema"],tag_elm_list=self.paraDict["tagSchema"],count=1, default_stbname_prefix=self.paraDict["stbName"]) tdCom.create_ctable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],tag_elm_list=self.paraDict['tagSchema'],count=self.paraDict["ctbNum"],default_ctbname_prefix=self.paraDict["ctbPrefix"]) # tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"]) - pThread2 = tmqCom.asyncInsertData(paraDict=self.paraDict) + pThread2 = tmqCom.asyncInsertData(paraDict=self.paraDict) tdLog.info("create topics from db") - topicName1 = 'UpperCasetopic_%s'%(self.paraDict['dbName']) + topicName1 = 'UpperCasetopic_%s'%(self.paraDict['dbName']) tdSql.execute("create topic %s as database %s" %(topicName1, self.paraDict['dbName'])) - + topicList = topicName1 + ',' +topicName1 keyList = '%s,%s,%s,%s'%(self.groupId,self.autoCommit,self.autoCommitInterval,self.autoOffset) self.expectrowcnt = self.paraDict["rowsPerTbl"] * self.paraDict["ctbNum"] * 2 tmqCom.insertConsumerInfo(self.consumerId, self.expectrowcnt,topicList,keyList,self.ifcheckdata,self.ifManualCommit) - - tdLog.info("start consume processor") + + tdLog.info("start consume processor") # tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName) tmqCom.startTmqSimProcess(pollDelay=self.pollDelay,dbName=self.paraDict["dbName"],showMsg=self.showMsg, showRow=self.showRow,snapshot=self.paraDict['snapshot']) - tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartConsumeNotifyFromTmqsim() tdLog.info("drop one stable") - self.paraDict["stbName"] = 'stb1' - tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName'])) + self.paraDict["stbName"] = 'stb1' + tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName'])) dropTblNum = int(self.paraDict["ctbNum"] / 4) tmqCom.drop_ctable(tdSql, dbname=self.paraDict['dbName'], count=dropTblNum, default_ctbname_prefix=self.paraDict["ctbPrefix"]) # pThread2.join() - + tdLog.info("wait result from consumer, then check it") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -108,7 +108,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if not (totalConsumeRows >= self.expectrowcnt*3/8 and totalConsumeRows <= self.expectrowcnt): tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt)) tdLog.exit("tmq consume rows error!")