diff --git a/tests/system-test/7-tmq/basic5.py b/tests/system-test/7-tmq/basic5.py index 70813eba96..94201a335d 100644 --- a/tests/system-test/7-tmq/basic5.py +++ b/tests/system-test/7-tmq/basic5.py @@ -56,7 +56,7 @@ class TDTestCase: return cur def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): - tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) pre_create = "create table" @@ -69,7 +69,7 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return @@ -96,7 +96,7 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - + def prepareEnv(self, **parameterDict): print ("input parameters:") print (parameterDict) @@ -115,7 +115,7 @@ class TDTestCase: parameterDict["ctbNum"],\ parameterDict["rowsPerTbl"],\ parameterDict["batchNum"],\ - parameterDict["startTs"]) + parameterDict["startTs"]) return @@ -135,34 +135,34 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() prepareEnvThread.join() - + # wait stb ready while 1: - tdSql.query("show %s.stables"%parameterDict['dbName']) - if tdSql.getRows() == 1: + tdSql.query("show %s.stables"%parameterDict['dbName']) + if tdSql.getRows() == 1: break else: time.sleep(1) tdLog.info("create topics from super table") topicFromStb = 'topic_stb_column' - topicFromCtb = 'topic_ctb_column' - + topicFromCtb = 'topic_ctb_column' + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) - + time.sleep(1) tdSql.query("show topics") #tdSql.checkRows(2) topic1 = tdSql.getData(0 , 0) topic2 = tdSql.getData(1 , 0) - + tdLog.info("show topics: %s, %s"%(topic1, topic2)) if topic1 != topicFromStb and topic1 != topicFromCtb: - tdLog.exit("topic error1") + tdLog.exit("topic error1") if topic2 != topicFromStb and topic2 != topicFromCtb: - tdLog.exit("topic error2") - + tdLog.exit("topic error2") + tdLog.info("create consume info table and consume result table") cdbName = parameterDict["dbName"] tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)") @@ -179,7 +179,7 @@ class TDTestCase: sql = "insert into consumeinfo values " sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) tdSql.query(sql) - + tdLog.info("check stb if there are data") while 1: tdSql.query("select count(*) from %s"%parameterDict["stbName"]) @@ -190,26 +190,26 @@ class TDTestCase: break else: time.sleep(1) - + tdLog.info("start consume processor") pollDelay = 20 showMsg = 1 showRow = 1 - + if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) - os.system(shellCmd) + os.system(shellCmd) # wait for data ready # prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") while 1: tdSql.query("select * from consumeresult") @@ -229,7 +229,7 @@ class TDTestCase: tdSql.query("drop topic %s"%topicFromCtb) tdLog.printNoPrefix("======== test case 1 end ...... ") - + def tmqCase2(self, cfgPath, buildPath): tdLog.printNoPrefix("======== test case 2: add child table with consuming ") # create and start thread @@ -246,13 +246,13 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() prepareEnvThread.join() - + # wait db ready while 1: tdSql.query("show databases") if tdSql.getRows() == 4: print ('==================================================') - print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0)) + print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0)) index = 0 if tdSql.getData(0,0) == parameterDict['dbName']: index = 0 @@ -264,7 +264,7 @@ class TDTestCase: index = 3 else: continue - + if tdSql.getData(index,15) == 'ready': print("******************** index: %d"%index) break @@ -272,12 +272,12 @@ class TDTestCase: continue else: time.sleep(1) - + tdSql.query("use %s"%parameterDict['dbName']) # wait stb ready while 1: tdSql.query("show %s.stables"%parameterDict['dbName']) - if tdSql.getRows() == 1: + if tdSql.getRows() == 1: break else: time.sleep(1) @@ -285,20 +285,20 @@ class TDTestCase: tdLog.info("create topics from super table") topicFromStb = 'topic_stb_column2' topicFromCtb = 'topic_ctb_column2' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) - + time.sleep(1) tdSql.query("show topics") topic1 = tdSql.getData(0 , 0) topic2 = tdSql.getData(1 , 0) tdLog.info("show topics: %s, %s"%(topic1, topic2)) if topic1 != topicFromStb and topic1 != topicFromCtb: - tdLog.exit("topic error1") + tdLog.exit("topic error1") if topic2 != topicFromStb and topic2 != topicFromCtb: - tdLog.exit("topic error2") - + tdLog.exit("topic error2") + tdLog.info("create consume info table and consume result table") cdbName = parameterDict["dbName"] tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) @@ -316,7 +316,7 @@ class TDTestCase: sql = "insert into consumeinfo values " sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) tdSql.query(sql) - + tdLog.info("check stb if there are data") while 1: tdSql.query("select count(*) from %s"%parameterDict["stbName"]) @@ -327,21 +327,21 @@ class TDTestCase: break else: time.sleep(1) - + tdLog.info("start consume processor") pollDelay = 100 showMsg = 1 showRow = 1 if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) - os.system(shellCmd) + os.system(shellCmd) # create new child table and insert data newCtbName = 'newctb' @@ -354,7 +354,7 @@ class TDTestCase: # wait for data ready prepareEnvThread.join() - + tdLog.info("insert process end, and start to check consume result") while 1: tdSql.query("select * from consumeresult") @@ -366,7 +366,7 @@ class TDTestCase: tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 3, expectrowcnt) - + tdSql.query("drop topic %s"%topicFromStb) tdSql.query("drop topic %s"%topicFromCtb) @@ -390,13 +390,13 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() prepareEnvThread.join() - + # wait db ready while 1: tdSql.query("show databases") - if tdSql.getRows() == 5: + if tdSql.getRows() == 5: print ('==================================================dbname: %s'%parameterDict['dbName']) - print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),tdSql.getData(3,0),tdSql.getData(4,0)) + print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),tdSql.getData(3,0),tdSql.getData(4,0)) index = 0 if tdSql.getData(0,0) == parameterDict['dbName']: index = 0 @@ -409,8 +409,8 @@ class TDTestCase: elif tdSql.getData(4,0) == parameterDict['dbName']: index = 4 else: - continue - + continue + if tdSql.getData(index,15) == 'ready': print("******************** index: %d"%index) break @@ -418,16 +418,16 @@ class TDTestCase: continue else: time.sleep(1) - + tdSql.query("use %s"%parameterDict['dbName']) # wait stb ready while 1: tdSql.query("show %s.stables"%parameterDict['dbName']) - if tdSql.getRows() == 1: + if tdSql.getRows() == 1: break else: time.sleep(1) - + tdLog.info("create stable2 for the seconde topic") parameterDict2 = {'cfg': '', \ 'dbName': 'db3', \ @@ -439,23 +439,23 @@ class TDTestCase: 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict2['cfg'] = cfgPath tdSql.execute("create stable if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict2['dbName'], parameterDict2['stbName'])) - + tdLog.info("create topics from super table") topicFromStb = 'topic_stb_column3' topicFromStb2 = 'topic_stb_column32' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb2, parameterDict2['dbName'], parameterDict2['stbName'])) - + tdSql.query("show topics") topic1 = tdSql.getData(0 , 0) topic2 = tdSql.getData(1 , 0) tdLog.info("show topics: %s, %s"%(topic1, topic2)) if topic1 != topicFromStb and topic1 != topicFromStb2: - tdLog.exit("topic error1") + tdLog.exit("topic error1") if topic2 != topicFromStb and topic2 != topicFromStb2: - tdLog.exit("topic error2") - + tdLog.exit("topic error2") + tdLog.info("create consume info table and consume result table") cdbName = parameterDict["dbName"] tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) @@ -472,7 +472,7 @@ class TDTestCase: sql = "insert into consumeinfo values " sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) tdSql.query(sql) - + tdLog.info("check stb if there are data") while 1: tdSql.query("select count(*) from %s"%parameterDict["stbName"]) @@ -483,22 +483,22 @@ class TDTestCase: break else: time.sleep(1) - + tdLog.info("start consume processor") pollDelay = 100 showMsg = 1 showRow = 1 - + if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) - os.system(shellCmd) + os.system(shellCmd) # start the second thread to create new child table and insert data prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) @@ -507,7 +507,7 @@ class TDTestCase: # wait for data ready prepareEnvThread.join() prepareEnvThread2.join() - + tdLog.info("insert process end, and start to check consume result") while 1: tdSql.query("select * from consumeresult") @@ -519,7 +519,7 @@ class TDTestCase: tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 3, expectrowcnt) - + tdSql.query("drop topic %s"%topicFromStb) tdSql.query("drop topic %s"%topicFromStb2) @@ -537,7 +537,7 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) self.tmqCase1(cfgPath, buildPath) - self.tmqCase2(cfgPath, buildPath) + self.tmqCase2(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath) def stop(self): diff --git a/tests/system-test/7-tmq/dataFromTsdbNWal-multiCtb.py b/tests/system-test/7-tmq/dataFromTsdbNWal-multiCtb.py index 2216000214..4add73ec2b 100644 --- a/tests/system-test/7-tmq/dataFromTsdbNWal-multiCtb.py +++ b/tests/system-test/7-tmq/dataFromTsdbNWal-multiCtb.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 100 self.rowsPerTbl = 1000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -58,12 +58,12 @@ class TDTestCase: tdLog.info("create ctb") tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("insert data") tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("flush db to let data falls into the disk") tdSql.query("flush database %s"%(paraDict['dbName'])) return @@ -93,18 +93,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - # tdSql.query(queryString) + # tdSql.query(queryString) # expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -121,29 +121,29 @@ class TDTestCase: paraDict['batchNum'] = 100 paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) - + tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) pInsertThread.join() - - tdSql.query(queryString) + + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) - - tdLog.info("wait the consume result") + + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) if expectRowsList[0] != resultList[0]: tdLog.exit("%d tmq consume rows error!"%consumerId) - # tmqCom.checkFileContent(consumerId, queryString) + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("flush database %s"%(paraDict['dbName'])) - + for i in range(len(topicNameList)): - tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) + tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) tdSql.query("drop topic %s"%topicNameList[i]) tdLog.printNoPrefix("======== test case 1 end ...... ") @@ -173,18 +173,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -200,36 +200,36 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) actConsumeRows = resultList[0] - + tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(actConsumeRows, expectrowcnt, totalRowsInserted)) if not (expectrowcnt <= actConsumeRows and totalRowsInserted >= actConsumeRows): tdLog.exit("%d tmq consume rows error!"%consumerId) - + # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() consumerId = 2 expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + actConsumeRows = resultList[0] - tdLog.info("act consume rows: %d, expect rows: %d, act insert rows: %d"%(actConsumeRows, expectrowcnt, totalRowsInserted)) + tdLog.info("act consume rows: %d, expect rows: %d, act insert rows: %d"%(actConsumeRows, expectrowcnt, totalRowsInserted)) if not ((actConsumeRows >= expectrowcnt) and (totalRowsInserted > actConsumeRows)): tdLog.exit("%d tmq consume rows error!"%consumerId) - + for i in range(len(topicNameList)): - tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) + tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) tdSql.query("drop topic %s"%topicNameList[i]) tdLog.printNoPrefix("======== test case 2 end ...... ") diff --git a/tests/system-test/7-tmq/subscribeDb0.py b/tests/system-test/7-tmq/subscribeDb0.py index 4e8fb04517..7720001fbb 100644 --- a/tests/system-test/7-tmq/subscribeDb0.py +++ b/tests/system-test/7-tmq/subscribeDb0.py @@ -49,7 +49,7 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) @@ -58,7 +58,7 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -73,11 +73,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -85,20 +85,20 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum): - tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) pre_create = "create table" @@ -111,8 +111,8 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - - event.set() + + event.set() tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return @@ -141,7 +141,7 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - + def prepareEnv(self, **parameterDict): print ("input parameters:") print (parameterDict) @@ -159,7 +159,7 @@ class TDTestCase: parameterDict["ctbNum"],\ parameterDict["rowsPerTbl"],\ parameterDict["batchNum"],\ - parameterDict["startTs"]) + parameterDict["startTs"]) return def tmqCase4(self, cfgPath, buildPath): @@ -198,9 +198,9 @@ class TDTestCase: tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) - + consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] topicList = topicName1 @@ -211,10 +211,10 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + consumerId = 1 self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") @@ -224,16 +224,16 @@ class TDTestCase: self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) # wait for data ready - prepareEnvThread.join() + prepareEnvThread.join() prepareEnvThread2.join() - + tdLog.info("insert process end, and start to check consume result") expectRows = 2 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -275,9 +275,9 @@ class TDTestCase: tdLog.info("create topics from db") topicName1 = 'topic_db1' - + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) - + consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] topicList = topicName1 @@ -288,10 +288,10 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + consumerId = 1 self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") @@ -304,16 +304,16 @@ class TDTestCase: prepareEnvThread2.start() # wait for data ready - prepareEnvThread.join() + prepareEnvThread.join() prepareEnvThread2.join() - + tdLog.info("insert process end, and start to check consume result") expectRows = 2 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows < expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") diff --git a/tests/system-test/7-tmq/subscribeDb1.py b/tests/system-test/7-tmq/subscribeDb1.py index 28a341f8f3..404938158f 100644 --- a/tests/system-test/7-tmq/subscribeDb1.py +++ b/tests/system-test/7-tmq/subscribeDb1.py @@ -49,7 +49,7 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) @@ -58,7 +58,7 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -73,11 +73,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -85,20 +85,20 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum): - tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) pre_create = "create table" @@ -111,8 +111,8 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - - event.set() + + event.set() tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return @@ -141,7 +141,7 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - + def prepareEnv(self, **parameterDict): print ("input parameters:") print (parameterDict) @@ -159,7 +159,7 @@ class TDTestCase: parameterDict["ctbNum"],\ parameterDict["rowsPerTbl"],\ parameterDict["batchNum"],\ - parameterDict["startTs"]) + parameterDict["startTs"]) return def tmqCase6(self, cfgPath, buildPath): @@ -201,10 +201,10 @@ class TDTestCase: tdLog.info("create topics from db") topicName1 = 'topic_db60' topicName2 = 'topic_db61' - - tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) + + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName2, parameterDict2['dbName'])) - + consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] topicList = topicName1 + ',' + topicName2 @@ -215,10 +215,10 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + #consumerId = 1 #self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") @@ -228,16 +228,16 @@ class TDTestCase: self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) # wait for data ready - prepareEnvThread.join() + prepareEnvThread.join() prepareEnvThread2.join() - + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -286,10 +286,10 @@ class TDTestCase: tdLog.info("create topics from db") topicName1 = 'topic_db60' topicName2 = 'topic_db61' - - tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) + + tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName'])) tdSql.execute("create topic %s as database %s" %(topicName2, parameterDict2['dbName'])) - + consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] topicList = topicName1 + ',' + topicName2 @@ -300,10 +300,10 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + consumerId = 1 self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + event.wait() tdLog.info("start consume processor") @@ -313,16 +313,16 @@ class TDTestCase: self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) # wait for data ready - prepareEnvThread.join() + prepareEnvThread.join() prepareEnvThread2.join() - + tdLog.info("insert process end, and start to check consume result") expectRows = 2 resultList = self.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") diff --git a/tests/system-test/7-tmq/subscribeStb3.py b/tests/system-test/7-tmq/subscribeStb3.py index e6eaa17564..9c1b3fd241 100644 --- a/tests/system-test/7-tmq/subscribeStb3.py +++ b/tests/system-test/7-tmq/subscribeStb3.py @@ -56,7 +56,7 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) @@ -65,12 +65,12 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - def initConsumerInfoTable(self,cdbName='cdb'): + def initConsumerInfoTable(self,cdbName='cdb'): tdLog.info("drop consumeinfo table") tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -85,11 +85,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -97,14 +97,14 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -134,7 +134,7 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) return @@ -149,7 +149,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s_%d values "%(stbName,i) for j in range(rowsPerTbl): @@ -168,8 +168,8 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - - def prepareEnv(self, **parameterDict): + + def prepareEnv(self, **parameterDict): # create new connector for my thread tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) @@ -188,8 +188,8 @@ class TDTestCase: return def tmqCase10(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 10: ") - + tdLog.printNoPrefix("======== test case 10: ") + self.initConsumerTable() # create and start thread @@ -218,7 +218,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -243,7 +243,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != 0: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0)) tdLog.exit("tmq consume rows error!") @@ -267,7 +267,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt-10000: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt-10000)) tdLog.exit("tmq consume rows error!") @@ -291,7 +291,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt*2: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2)) tdLog.exit("tmq consume rows error!") @@ -302,7 +302,7 @@ class TDTestCase: def tmqCase11(self, cfgPath, buildPath): tdLog.printNoPrefix("======== test case 11: ") - + self.initConsumerTable() # create and start thread @@ -318,7 +318,7 @@ class TDTestCase: 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + self.create_database(tdSql, parameterDict["dbName"]) self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) @@ -331,7 +331,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -356,7 +356,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != 0: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0)) tdLog.exit("tmq consume rows error!") @@ -378,7 +378,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != 0: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0)) tdLog.exit("tmq consume rows error!") @@ -389,7 +389,7 @@ class TDTestCase: def tmqCase12(self, cfgPath, buildPath): tdLog.printNoPrefix("======== test case 12: ") - + self.initConsumerTable() # create and start thread @@ -405,7 +405,7 @@ class TDTestCase: 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + self.create_database(tdSql, parameterDict["dbName"]) self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) @@ -418,7 +418,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -443,7 +443,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt/4: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4)) tdLog.exit("tmq consume rows error!") @@ -465,7 +465,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt/4: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4)) tdLog.exit("tmq consume rows error!") @@ -476,7 +476,7 @@ class TDTestCase: def tmqCase13(self, cfgPath, buildPath): tdLog.printNoPrefix("======== test case 13: ") - + self.initConsumerTable() # create and start thread @@ -492,7 +492,7 @@ class TDTestCase: 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + self.create_database(tdSql, parameterDict["dbName"]) self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) @@ -505,7 +505,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] @@ -530,7 +530,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt/4: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4)) tdLog.exit("tmq consume rows error!") @@ -553,7 +553,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt*(1/2+1/4): tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*(1/2+1/4))) tdLog.exit("tmq consume rows error!") @@ -576,7 +576,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -596,7 +596,7 @@ class TDTestCase: cfgPath = buildPath + "/../sim/psim/cfg" tdLog.info("cfgPath: %s" % cfgPath) - self.tmqCase10(cfgPath, buildPath) + self.tmqCase10(cfgPath, buildPath) self.tmqCase11(cfgPath, buildPath) self.tmqCase12(cfgPath, buildPath) self.tmqCase13(cfgPath, buildPath) diff --git a/tests/system-test/7-tmq/subscribeStb4.py b/tests/system-test/7-tmq/subscribeStb4.py index f3982c8f1f..33f4c4af1a 100644 --- a/tests/system-test/7-tmq/subscribeStb4.py +++ b/tests/system-test/7-tmq/subscribeStb4.py @@ -56,7 +56,7 @@ class TDTestCase: print(cur) return cur - def initConsumerTable(self,cdbName='cdb'): + def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) @@ -65,12 +65,12 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - def initConsumerInfoTable(self,cdbName='cdb'): + def initConsumerInfoTable(self,cdbName='cdb'): tdLog.info("drop consumeinfo table") tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) - def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) tdLog.info("consume info sql: %s"%sql) @@ -85,11 +85,11 @@ class TDTestCase: break else: time.sleep(5) - + for i in range(expectRows): tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) resultList.append(tdSql.getData(i , 3)) - + return resultList def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): @@ -97,14 +97,14 @@ class TDTestCase: logFile = cfgPath + '/../log/valgrind-tmq.log' shellCmd = 'nohup valgrind --log-file=' + logFile shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' - + if (platform.system().lower() == 'windows'): shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) - shellCmd += "> nul 2>&1 &" + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" else: shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) os.system(shellCmd) @@ -134,7 +134,7 @@ class TDTestCase: sql = pre_create if sql != pre_create: tsql.execute(sql) - + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) return @@ -149,7 +149,7 @@ class TDTestCase: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsOfSql = 0 for i in range(ctbNum): sql += " %s_%d values "%(stbName,i) for j in range(rowsPerTbl): @@ -168,8 +168,8 @@ class TDTestCase: tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return - - def prepareEnv(self, **parameterDict): + + def prepareEnv(self, **parameterDict): # create new connector for my thread tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) @@ -188,8 +188,8 @@ class TDTestCase: return def tmqCase1(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 1: ") - + tdLog.printNoPrefix("======== test case 1: ") + self.initConsumerTable() auotCtbNum = 5 @@ -208,7 +208,7 @@ class TDTestCase: 'batchNum': 100, 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + self.create_database(tdSql, parameterDict["dbName"]) self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) @@ -216,7 +216,7 @@ class TDTestCase: tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 expectrowcnt = parameterDict["rowsPerTbl"] * (auotCtbNum + parameterDict["ctbNum"]) @@ -248,7 +248,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -258,8 +258,8 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 2: ") - + tdLog.printNoPrefix("======== test case 2: ") + self.initConsumerTable() auotCtbNum = 10 @@ -278,7 +278,7 @@ class TDTestCase: 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath - + self.create_database(tdSql, parameterDict["dbName"]) self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) @@ -322,7 +322,7 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] - + if totalConsumeRows != expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -343,7 +343,7 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) self.tmqCase1(cfgPath, buildPath) - self.tmqCase2(cfgPath, buildPath) + self.tmqCase2(cfgPath, buildPath) def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py index 882989bfb6..e4ce3b0f77 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 3000 self.rowsPerTbl = 70 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -62,7 +62,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # # tdDnodes.start(1) @@ -95,18 +95,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -121,18 +121,18 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) - # tmqCom.checkFileContent(consumerId, queryString) + # tmqCom.checkFileContent(consumerId, queryString) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -163,18 +163,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -190,15 +190,15 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]): tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - + firstConsumeRows = resultList[0] # reinit consume info, and start tmq_sim, then check consume result @@ -206,22 +206,22 @@ class TDTestCase: consumerId = 2 expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + actConsumeTotalRows = firstConsumeRows + resultList[0] - + if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows): - tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0])) + tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0])) tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py index c334ff752b..da7d9e4651 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 10 self.rowsPerTbl = 10000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -62,7 +62,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # # tdDnodes.start(1) @@ -95,18 +95,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -121,18 +121,18 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) - # tmqCom.checkFileContent(consumerId, queryString) + # tmqCom.checkFileContent(consumerId, queryString) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -163,18 +163,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -190,15 +190,15 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]): tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - + firstConsumeRows = resultList[0] # reinit consume info, and start tmq_sim, then check consume result @@ -206,22 +206,22 @@ class TDTestCase: consumerId = 2 expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + actConsumeTotalRows = firstConsumeRows + resultList[0] - + if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows): - tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0])) + tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0])) tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py index ec11b3286c..07fb9c7751 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 1 self.ctbNum = 1 self.rowsPerTbl = 1000000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -62,7 +62,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # tdDnodes.start(1) @@ -93,11 +93,11 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") # queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9379) @@ -105,7 +105,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -125,17 +125,17 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 2 resultList = tmqCom.selectConsumeResult(expectRows) actConsumeTotalRows = resultList[0] + resultList[1] - + if not (totalRowsInserted == actConsumeTotalRows): tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -166,11 +166,11 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9379) # queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) @@ -178,7 +178,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -194,35 +194,35 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + tdLog.info("wait commit notify") tmqCom.getStartCommitNotifyFromTmqsim() tdLog.info("pkill consume processor") tdCom.killProcessor("tmq_sim") - + # time.sleep(10) - + # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() consumerId = 6 tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tdLog.info("wait the consume result") - + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + actConsumeTotalRows = resultList[0] - + if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): - tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) + tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py index 4878652593..ecdb0a4358 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py @@ -20,7 +20,7 @@ class TDTestCase: self.vgroups = 1 self.ctbNum = 1 self.rowsPerTbl = 100000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -62,7 +62,7 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - + tdLog.info("restart taosd to ensure that the data falls into the disk") # tdDnodes.stop(1) # tdDnodes.start(1) @@ -93,18 +93,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -124,17 +124,17 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - + tdLog.info("wait the consume result") + expectRows = 2 resultList = tmqCom.selectConsumeResult(expectRows) actConsumeTotalRows = resultList[0] + resultList[1] - + if not (totalRowsInserted == actConsumeTotalRows): tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -165,18 +165,18 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - + tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) totalRowsInserted = expectRowsList[0] @@ -192,35 +192,35 @@ class TDTestCase: tdLog.info("start consume processor 0") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - + tdLog.info("wait commit notify") tmqCom.getStartCommitNotifyFromTmqsim() tdLog.info("pkill consume processor") tdCom.killProcessor("tmq_sim") - + # time.sleep(10) - + # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() consumerId = 6 tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - + tdLog.info("start consume processor 1") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tdLog.info("wait the consume result") - + expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + actConsumeTotalRows = resultList[0] - + if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): - tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) + tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) diff --git a/tests/system-test/7-tmq/tmqDropNtb-snapshot1.py b/tests/system-test/7-tmq/tmqDropNtb-snapshot1.py index b23f422585..20e363341f 100644 --- a/tests/system-test/7-tmq/tmqDropNtb-snapshot1.py +++ b/tests/system-test/7-tmq/tmqDropNtb-snapshot1.py @@ -20,11 +20,11 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 1000 self.rowsPerTbl = 10 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) - + # drop some ntbs def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") @@ -51,8 +51,8 @@ class TDTestCase: paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - + paraDict['rowsPerTbl'] = self.rowsPerTbl + tmqCom.initConsumerTable() tdLog.info("start create database....") tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) @@ -60,11 +60,11 @@ class TDTestCase: tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) tdLog.info("start insert data into normal tables....") tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) - + tdLog.info("create topics from database") - topicFromDb = 'topic_dbt' + topicFromDb = 'topic_dbt' tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) - + if self.snapshot == 0: consumerId = 0 elif self.snapshot == 1: @@ -83,13 +83,13 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartConsumeNotifyFromTmqsim() tdLog.info("drop some ntables") # drop 1/4 ctbls from half offset paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2) paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4) tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"]) - + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -98,19 +98,19 @@ class TDTestCase: totalConsumeRows += resultList[i] tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - + if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") - - tdLog.info("wait subscriptions exit ....") + + tdLog.info("wait subscriptions exit ....") tmqCom.waitSubscriptionExit(tdSql, topicFromDb) - + tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) tdLog.printNoPrefix("======== test case 1 end ...... ") - - + + # drop some ntbs and create some new ntbs def tmqCase2(self): tdLog.printNoPrefix("======== test case 2: ") @@ -137,8 +137,8 @@ class TDTestCase: paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - + paraDict['rowsPerTbl'] = self.rowsPerTbl + tmqCom.initConsumerTable() tdLog.info("start create database....") tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) @@ -146,11 +146,11 @@ class TDTestCase: tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) tdLog.info("start insert data into normal tables....") tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) - + tdLog.info("create topics from database") - topicFromDb = 'topic_dbt' + topicFromDb = 'topic_dbt' tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) - + if self.snapshot == 0: consumerId = 2 elif self.snapshot == 1: @@ -169,20 +169,20 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartConsumeNotifyFromTmqsim() tdLog.info("drop some ntables") # drop 1/4 ctbls from half offset paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2) paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4) tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"]) - + tdLog.info("start create some new normal tables....") paraDict["ctbPrefix"] = 'newCtb' paraDict["ctbNum"] = self.ctbNum tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) tdLog.info("start insert data into these new normal tables....") tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) - + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -191,24 +191,24 @@ class TDTestCase: totalConsumeRows += resultList[i] tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - + if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") - - tdLog.info("wait subscriptions exit ....") + + tdLog.info("wait subscriptions exit ....") tmqCom.waitSubscriptionExit(tdSql, topicFromDb) - + tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) tdLog.printNoPrefix("======== test case 2 end ...... ") - - def run(self): + + def run(self): # tdLog.printNoPrefix("=============================================") # tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") # self.snapshot = 0 # self.tmqCase1() - # self.tmqCase2() - + # self.tmqCase2() + tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 diff --git a/tests/system-test/7-tmq/tmqDropStbCtb.py b/tests/system-test/7-tmq/tmqDropStbCtb.py index f86d1295f4..992a128ac0 100644 --- a/tests/system-test/7-tmq/tmqDropStbCtb.py +++ b/tests/system-test/7-tmq/tmqDropStbCtb.py @@ -20,11 +20,11 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 100 self.rowsPerTbl = 1000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) - + def prepareTestEnv(self): tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") paraDict = {'dbName': 'dbt', @@ -50,7 +50,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + # tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -65,11 +65,11 @@ class TDTestCase: # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) return - + # drop some ctbs def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") @@ -96,10 +96,10 @@ class TDTestCase: paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - + paraDict['rowsPerTbl'] = self.rowsPerTbl + tmqCom.initConsumerTable() - + # again create one new stb1 paraDict["stbName"] = 'stb1' paraDict['ctbPrefix'] = 'ctb1n_' @@ -112,16 +112,16 @@ class TDTestCase: # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) - + tdLog.info("create topics from database") - topicFromDb = 'topic_dbt' + topicFromDb = 'topic_dbt' tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) - + if self.snapshot == 0: consumerId = 0 elif self.snapshot == 1: consumerId = 1 - + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2) topicList = topicFromDb ifcheckdata = 1 @@ -135,17 +135,17 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartConsumeNotifyFromTmqsim() tdLog.info("drop some ctables") - paraDict["stbName"] = 'stb' + paraDict["stbName"] = 'stb' paraDict['ctbPrefix'] = 'ctb' paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 3 / 4) # drop 1/4 ctbls paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4) - # tdSql.execute("drop table %s.%s" %(paraDict['dbName'], paraDict['stbName'])) + # tdSql.execute("drop table %s.%s" %(paraDict['dbName'], paraDict['stbName'])) tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"]) - + pInsertThread.join() - + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -154,17 +154,17 @@ class TDTestCase: totalConsumeRows += resultList[i] tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - + if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows < expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") - tdLog.info("wait subscriptions exit ....") + tdLog.info("wait subscriptions exit ....") tmqCom.waitSubscriptionExit(tdSql, topicFromDb) - + tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) tdLog.printNoPrefix("======== test case 1 end ...... ") - + # drop one stb def tmqCase2(self): tdLog.printNoPrefix("======== test case 2: ") @@ -191,10 +191,10 @@ class TDTestCase: paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - + paraDict['rowsPerTbl'] = self.rowsPerTbl + tmqCom.initConsumerTable() - + # again create one new stb1 paraDict["stbName"] = 'stb2' paraDict['ctbPrefix'] = 'ctb2n_' @@ -207,16 +207,16 @@ class TDTestCase: # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) - + tdLog.info("create topics from database") - topicFromDb = 'topic_dbt' + topicFromDb = 'topic_dbt' tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) - + if self.snapshot == 0: consumerId = 2 elif self.snapshot == 1: consumerId = 3 - + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2) topicList = topicFromDb ifcheckdata = 1 @@ -230,13 +230,13 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartConsumeNotifyFromTmqsim() tdLog.info("drop one stable") paraDict["stbName"] = 'stb1' - tdSql.execute("drop table %s.%s" %(paraDict['dbName'], paraDict['stbName'])) + tdSql.execute("drop table %s.%s" %(paraDict['dbName'], paraDict['stbName'])) pInsertThread.join() - + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -245,25 +245,25 @@ class TDTestCase: totalConsumeRows += resultList[i] tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - + if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows < expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") - tdLog.info("wait subscriptions exit ....") + tdLog.info("wait subscriptions exit ....") tmqCom.waitSubscriptionExit(tdSql, topicFromDb) - + tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) tdLog.printNoPrefix("======== test case 2 end ...... ") - def run(self): + def run(self): tdLog.printNoPrefix("=============================================") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.snapshot = 0 self.prepareTestEnv() self.tmqCase1() - self.tmqCase2() - + self.tmqCase2() + tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 diff --git a/tests/system-test/7-tmq/tmqUdf-multCtb-snapshot1.py b/tests/system-test/7-tmq/tmqUdf-multCtb-snapshot1.py index 5b964e1d38..1c999b86c7 100644 --- a/tests/system-test/7-tmq/tmqUdf-multCtb-snapshot1.py +++ b/tests/system-test/7-tmq/tmqUdf-multCtb-snapshot1.py @@ -22,12 +22,12 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 100 self.rowsPerTbl = 1000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) #tdSql.init(conn.cursor(), logSql) # output sql.txt file - + def prepare_udf_so(self): selfPath = os.path.dirname(os.path.realpath(__file__)) @@ -66,20 +66,20 @@ class TDTestCase: cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) tdLog.info(cmdStr) os.system(cmdStr) - + consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) consumeFile = open(consumeRowsFile, mode='r') queryFile = open(dstFile, mode='r') - + # skip first line for it is schema queryFile.readline() while True: dst = queryFile.readline() src = consumeFile.readline() - + if dst: if dst != src: tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId) @@ -112,7 +112,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -127,11 +127,11 @@ class TDTestCase: # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) return - + def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: multi sub table") paraDict = {'dbName': 'dbt', @@ -168,13 +168,13 @@ class TDTestCase: # tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) # tdLog.info("insert data") # tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) - + tdLog.info("create topics from stb with filter") queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -190,10 +190,10 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("0 tmq consume rows error!") @@ -208,7 +208,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[1], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) consumerId = 1 @@ -218,7 +218,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) if expectRowsList[1] != resultList[0]: @@ -228,12 +228,12 @@ class TDTestCase: # self.checkFileContent(consumerId, queryString) # tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) tdLog.printNoPrefix("======== test case 1 end ...... ") - + def tmqCase2(self): tdLog.printNoPrefix("======== test case 2: multi sub table, consume with auto create tble and insert data") paraDict = {'dbName': 'dbt', @@ -270,13 +270,13 @@ class TDTestCase: # tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) # tdLog.info("insert data") # tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) - + tdLog.info("create topics from stb with filter") queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - # tdSql.query(queryString) + # tdSql.query(queryString) # expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -292,18 +292,18 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot']) - paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl) + paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl) tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - - tdSql.query(queryString) + + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("2 tmq consume rows error!") @@ -318,7 +318,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[1], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) consumerId = 3 @@ -328,7 +328,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) if expectRowsList[1] != resultList[0]: @@ -338,7 +338,7 @@ class TDTestCase: # self.checkFileContent(consumerId, queryString) # tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -348,13 +348,13 @@ class TDTestCase: # tdSql.prepare() self.prepare_udf_so() self.create_udf_function() - + # tdLog.printNoPrefix("=============================================") # tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") # self.prepareTestEnv() # self.tmqCase1() # self.tmqCase2() - + tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.prepareTestEnv() diff --git a/tests/system-test/7-tmq/tmqUdf.py b/tests/system-test/7-tmq/tmqUdf.py index 04067ccf65..6e1843404e 100644 --- a/tests/system-test/7-tmq/tmqUdf.py +++ b/tests/system-test/7-tmq/tmqUdf.py @@ -22,12 +22,12 @@ class TDTestCase: self.vgroups = 4 self.ctbNum = 1 self.rowsPerTbl = 1000 - + def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) #tdSql.init(conn.cursor(), logSql) # output sql.txt file - + def prepare_udf_so(self): selfPath = os.path.dirname(os.path.realpath(__file__)) @@ -66,20 +66,20 @@ class TDTestCase: cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) tdLog.info(cmdStr) os.system(cmdStr) - + consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) consumeFile = open(consumeRowsFile, mode='r') queryFile = open(dstFile, mode='r') - + # skip first line for it is schema queryFile.readline() while True: dst = queryFile.readline() src = consumeFile.readline() - + if dst: if dst != src: tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId) @@ -112,7 +112,7 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - + tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") @@ -127,11 +127,11 @@ class TDTestCase: # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") + + # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) return - + def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: one sub table") paraDict = {'dbName': 'dbt', @@ -168,13 +168,13 @@ class TDTestCase: # tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) # tdLog.info("insert data") # tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) - + tdLog.info("create topics from stb with filter") queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -190,10 +190,10 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("0 tmq consume rows error!") @@ -209,7 +209,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[1], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) consumerId = 1 @@ -219,7 +219,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) if expectRowsList[1] != resultList[0]: @@ -229,12 +229,12 @@ class TDTestCase: self.checkFileContent(consumerId, queryString) tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) tdLog.printNoPrefix("======== test case 1 end ...... ") - + def tmqCase2(self): tdLog.printNoPrefix("======== test case 2: one sub table, consume with auto create tble and insert data") paraDict = {'dbName': 'dbt', @@ -271,13 +271,13 @@ class TDTestCase: # tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) # tdLog.info("insert data") # tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) - + tdLog.info("create topics from stb with filter") queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - # tdSql.query(queryString) + # tdSql.query(queryString) # expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result @@ -293,18 +293,18 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot']) - paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl) + paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl) tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - - tdSql.query(queryString) + + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) - + if expectRowsList[0] != resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("2 tmq consume rows error!") @@ -319,7 +319,7 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[1], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) consumerId = 3 @@ -329,7 +329,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") + tdLog.info("wait the consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) if expectRowsList[1] != resultList[0]: @@ -339,7 +339,7 @@ class TDTestCase: self.checkFileContent(consumerId, queryString) tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) - time.sleep(10) + time.sleep(10) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -349,13 +349,13 @@ class TDTestCase: # tdSql.prepare() self.prepare_udf_so() self.create_udf_function() - + tdLog.printNoPrefix("=============================================") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.prepareTestEnv() self.tmqCase1() self.tmqCase2() - + tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.prepareTestEnv()