fix test cases

This commit is contained in:
Ganlin Zhao 2022-07-30 11:33:39 +08:00
parent d00dd1673b
commit b7fae566eb
5 changed files with 178 additions and 178 deletions

View File

@ -47,7 +47,7 @@ class TDTestCase:
pollDelay = 20
showMsg = 1
showRow = 1
showRow = 1
hostname = socket.gethostname()
@ -59,7 +59,7 @@ class TDTestCase:
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
tdLog.info("step 1: create database, stb, ctb and insert data")
tmqCom.initConsumerTable(self.cdbName)
tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"])
@ -69,35 +69,35 @@ class TDTestCase:
tdCom.create_ctable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],tag_elm_list=self.paraDict['tagSchema'],count=self.paraDict["ctbNum"],default_ctbname_prefix=self.paraDict["ctbPrefix"])
tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"])
# pThread1 = tmqCom.asyncInsertData(paraDict=self.paraDict)
self.paraDict["stbName"] = 'stb2'
self.paraDict["ctbPrefix"] = 'newctb'
self.paraDict["batchNum"] = 10000
tdCom.create_stable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],column_elm_list=self.paraDict["colSchema"],tag_elm_list=self.paraDict["tagSchema"],count=1, default_stbname_prefix=self.paraDict["stbName"])
tdCom.create_ctable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],tag_elm_list=self.paraDict['tagSchema'],count=self.paraDict["ctbNum"],default_ctbname_prefix=self.paraDict["ctbPrefix"])
# tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"])
pThread2 = tmqCom.asyncInsertData(paraDict=self.paraDict)
pThread2 = tmqCom.asyncInsertData(paraDict=self.paraDict)
tdLog.info("create topics from db")
topicName1 = 'UpperCasetopic_%s'%(self.paraDict['dbName'])
topicName1 = 'UpperCasetopic_%s'%(self.paraDict['dbName'])
tdSql.execute("create topic %s as database %s" %(topicName1, self.paraDict['dbName']))
topicList = topicName1 + ',' +topicName1
keyList = '%s,%s,%s,%s'%(self.groupId,self.autoCommit,self.autoCommitInterval,self.autoOffset)
self.expectrowcnt = self.paraDict["rowsPerTbl"] * self.paraDict["ctbNum"] * 2
tmqCom.insertConsumerInfo(self.consumerId, self.expectrowcnt,topicList,keyList,self.ifcheckdata,self.ifManualCommit)
tdLog.info("start consume processor")
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName)
tmqCom.getStartConsumeNotifyFromTmqsim()
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("drop one stable")
self.paraDict["stbName"] = 'stb1'
tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName']))
self.paraDict["stbName"] = 'stb1'
tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName']))
# tmqCom.drop_ctable(tdSql, dbname=self.paraDict['dbName'], count=self.paraDict["ctbNum"], default_ctbname_prefix=self.paraDict["ctbPrefix"])
pThread2.join()
tdLog.info("wait result from consumer, then check it")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
@ -105,7 +105,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if not (totalConsumeRows >= self.expectrowcnt/2 and totalConsumeRows <= self.expectrowcnt):
tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt))
tdLog.exit("tmq consume rows error!")

View File

@ -56,7 +56,7 @@ class TDTestCase:
print(cur)
return cur
def initConsumerTable(self,cdbName='cdb'):
def initConsumerTable(self,cdbName='cdb'):
tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
@ -65,12 +65,12 @@ class TDTestCase:
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
def initConsumerInfoTable(self,cdbName='cdb'):
def initConsumerInfoTable(self,cdbName='cdb'):
tdLog.info("drop consumeinfo table")
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
tdLog.info("consume info sql: %s"%sql)
@ -85,11 +85,11 @@ class TDTestCase:
break
else:
time.sleep(5)
for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 3))
return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
@ -97,14 +97,14 @@ class TDTestCase:
logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
if (platform.system().lower() == 'windows'):
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
@ -134,7 +134,7 @@ class TDTestCase:
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
return
@ -149,7 +149,7 @@ class TDTestCase:
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql = 0
rowsOfSql = 0
for i in range(ctbNum):
sql += " %s_%d values "%(stbName,i)
for j in range(rowsPerTbl):
@ -168,8 +168,8 @@ class TDTestCase:
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def prepareEnv(self, **parameterDict):
def prepareEnv(self, **parameterDict):
# create new connector for my thread
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
@ -189,11 +189,11 @@ class TDTestCase:
def tmqCase1(self, cfgPath, buildPath):
'''
Leave a TMQ process. Stop taosd, delete the data directory, restart taosd,
Leave a TMQ process. Stop taosd, delete the data directory, restart taosd,
and restart a consumption process to complete a consumption
'''
tdLog.printNoPrefix("======== test case 1: ")
self.initConsumerTable()
# create and start thread
@ -217,7 +217,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
# expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -236,7 +236,7 @@ class TDTestCase:
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
time.sleep(3)
tdLog.info("================= stop dnode, and remove data file, then start dnode ===========================")
tdDnodes.stop(1)
@ -248,7 +248,7 @@ class TDTestCase:
tdDnodes.start(1)
time.sleep(2)
######### redo to consume
######### redo to consume
self.initConsumerTable()
self.create_database(tdSql, parameterDict["dbName"])
@ -258,7 +258,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -282,7 +282,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if not (totalConsumeRows == expectrowcnt):
tdLog.exit("tmq consume rows error!")
@ -293,7 +293,7 @@ class TDTestCase:
else:
os.system('pkill tmq_sim')
tdLog.printNoPrefix("======== test case 1 end ...... ")
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()

View File

@ -56,7 +56,7 @@ class TDTestCase:
print(cur)
return cur
def initConsumerTable(self,cdbName='cdb'):
def initConsumerTable(self,cdbName='cdb'):
tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
@ -65,12 +65,12 @@ class TDTestCase:
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
def initConsumerInfoTable(self,cdbName='cdb'):
def initConsumerInfoTable(self,cdbName='cdb'):
tdLog.info("drop consumeinfo table")
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
tdLog.info("consume info sql: %s"%sql)
@ -85,11 +85,11 @@ class TDTestCase:
break
else:
time.sleep(5)
for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 3))
return resultList
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
@ -98,9 +98,9 @@ class TDTestCase:
logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
@ -130,7 +130,7 @@ class TDTestCase:
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
return
@ -149,7 +149,7 @@ class TDTestCase:
ctbDict[i] = 0
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfCtb = 0
rowsOfCtb = 0
while rowsOfCtb < rowsPerTbl:
for i in range(ctbNum):
sql += " %s.%s_%d values "%(dbName,ctbPrefix,i)
@ -176,7 +176,7 @@ class TDTestCase:
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql = 0
rowsOfSql = 0
for i in range(ctbNum):
sql += " %s_%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
@ -207,7 +207,7 @@ class TDTestCase:
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql = 0
rowsOfSql = 0
for i in range(ctbNum):
sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i)
for j in range(rowsPerTbl):
@ -226,8 +226,8 @@ class TDTestCase:
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def prepareEnv(self, **parameterDict):
def prepareEnv(self, **parameterDict):
# create new connector for my thread
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
@ -246,8 +246,8 @@ class TDTestCase:
return
def tmqCase1(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 1: ")
tdLog.printNoPrefix("======== test case 1: ")
self.initConsumerTable()
# create and start thread
@ -263,7 +263,7 @@ class TDTestCase:
'batchNum': 33, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
@ -271,7 +271,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -296,7 +296,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
@ -306,8 +306,8 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 2: ")
tdLog.printNoPrefix("======== test case 2: ")
self.initConsumerTable()
# create and start thread
@ -343,7 +343,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -392,7 +392,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
@ -402,8 +402,8 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 2 end ...... ")
def tmqCase3(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 3: ")
tdLog.printNoPrefix("======== test case 3: ")
self.initConsumerTable()
# create and start thread
@ -427,7 +427,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -445,7 +445,7 @@ class TDTestCase:
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
time.sleep(5)
tdLog.info("drop som child table of stb1")
dropTblNum = 4
@ -460,7 +460,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
remaindrowcnt = parameterDict["rowsPerTbl"] * (parameterDict["ctbNum"] - dropTblNum)
if not (totalConsumeRows < expectrowcnt and totalConsumeRows > remaindrowcnt):
@ -473,7 +473,7 @@ class TDTestCase:
def tmqCase4(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 4: ")
self.initConsumerTable()
# create and start thread
@ -489,7 +489,7 @@ class TDTestCase:
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
@ -502,7 +502,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -527,7 +527,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt/4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
tdLog.exit("tmq consume rows error!")
@ -545,7 +545,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
@ -556,7 +556,7 @@ class TDTestCase:
def tmqCase5(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 5: ")
self.initConsumerTable()
# create and start thread
@ -572,7 +572,7 @@ class TDTestCase:
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
@ -585,7 +585,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -610,7 +610,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt/4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
tdLog.exit("tmq consume rows error!")
@ -628,7 +628,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != (expectrowcnt * (1 + 1/4)):
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
@ -639,7 +639,7 @@ class TDTestCase:
def tmqCase6(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 6: ")
self.initConsumerTable()
# create and start thread
@ -655,7 +655,7 @@ class TDTestCase:
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
@ -668,7 +668,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -693,7 +693,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt/4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
tdLog.exit("tmq consume rows error!")
@ -715,7 +715,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
@ -726,7 +726,7 @@ class TDTestCase:
def tmqCase7(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 7: ")
self.initConsumerTable()
# create and start thread
@ -742,7 +742,7 @@ class TDTestCase:
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
@ -755,7 +755,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -780,7 +780,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != 0:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
tdLog.exit("tmq consume rows error!")
@ -798,7 +798,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != 0:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
tdLog.exit("tmq consume rows error!")
@ -808,8 +808,8 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 7 end ...... ")
def tmqCase8(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 8: ")
tdLog.printNoPrefix("======== test case 8: ")
self.initConsumerTable()
# create and start thread
@ -838,7 +838,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -863,7 +863,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != 0:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
tdLog.exit("tmq consume rows error!")
@ -883,7 +883,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
@ -903,7 +903,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt*2:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2))
tdLog.exit("tmq consume rows error!")
@ -913,8 +913,8 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 8 end ...... ")
def tmqCase9(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 9: ")
tdLog.printNoPrefix("======== test case 9: ")
self.initConsumerTable()
# create and start thread
@ -943,7 +943,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -968,7 +968,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != 0:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
tdLog.exit("tmq consume rows error!")
@ -992,7 +992,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
@ -1012,7 +1012,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt*2:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2))
tdLog.exit("tmq consume rows error!")
@ -1022,8 +1022,8 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 9 end ...... ")
def tmqCase10(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 10: ")
tdLog.printNoPrefix("======== test case 10: ")
self.initConsumerTable()
# create and start thread
@ -1052,7 +1052,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -1077,7 +1077,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != 0:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
tdLog.exit("tmq consume rows error!")
@ -1101,7 +1101,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt-10000:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt-10000))
tdLog.exit("tmq consume rows error!")
@ -1125,7 +1125,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt*2:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2))
tdLog.exit("tmq consume rows error!")
@ -1136,7 +1136,7 @@ class TDTestCase:
def tmqCase11(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 11: ")
self.initConsumerTable()
# create and start thread
@ -1152,7 +1152,7 @@ class TDTestCase:
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
@ -1165,7 +1165,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -1190,7 +1190,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != 0:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
tdLog.exit("tmq consume rows error!")
@ -1212,7 +1212,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != 0:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
tdLog.exit("tmq consume rows error!")
@ -1223,7 +1223,7 @@ class TDTestCase:
def tmqCase12(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 12: ")
self.initConsumerTable()
# create and start thread
@ -1239,7 +1239,7 @@ class TDTestCase:
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
@ -1252,7 +1252,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -1277,7 +1277,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt/4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
tdLog.exit("tmq consume rows error!")
@ -1299,7 +1299,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt/4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
tdLog.exit("tmq consume rows error!")
@ -1310,7 +1310,7 @@ class TDTestCase:
def tmqCase13(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 13: ")
self.initConsumerTable()
# create and start thread
@ -1326,7 +1326,7 @@ class TDTestCase:
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
@ -1339,7 +1339,7 @@ class TDTestCase:
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
@ -1364,7 +1364,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt/4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
tdLog.exit("tmq consume rows error!")
@ -1387,7 +1387,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt*(1/2+1/4):
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*(1/2+1/4)))
tdLog.exit("tmq consume rows error!")
@ -1410,7 +1410,7 @@ class TDTestCase:
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
@ -1431,7 +1431,7 @@ class TDTestCase:
tdLog.info("cfgPath: %s" % cfgPath)
self.tmqCase1(cfgPath, buildPath)
# self.tmqCase2(cfgPath, buildPath)
# self.tmqCase2(cfgPath, buildPath)
# self.tmqCase3(cfgPath, buildPath)
# self.tmqCase4(cfgPath, buildPath)
# self.tmqCase5(cfgPath, buildPath)

View File

@ -39,20 +39,20 @@ class TDTestCase:
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
@ -85,7 +85,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=self.replica)
tdLog.info("create stb")
@ -101,7 +101,7 @@ class TDTestCase:
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.asyncInsertDataByInterlace(paraDict)
tdLog.info("wait some data inserted")
exitFlag = 1
while exitFlag:
@ -112,7 +112,7 @@ class TDTestCase:
if (rowsInserted > ((self.ctbNum * self.rowsPerTbl)/5)):
exitFlag = 0
time.sleep(0.1)
tdLog.info("inserted rows: %d"%tdSql.getData(0,0))
# tdDnodes=cluster.dnodes
tdLog.info("================= restart dnode 2===========================")
@ -123,18 +123,18 @@ class TDTestCase:
cluster.dnodes[2].starttaosd()
tdLog.info("================= restart dnode 4===========================")
cluster.dnodes[3].stoptaosd()
cluster.dnodes[3].starttaosd()
cluster.dnodes[3].starttaosd()
tdLog.info("================= restart dnode 5===========================")
cluster.dnodes[4].stoptaosd()
cluster.dnodes[4].starttaosd()
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
tdLog.printNoPrefix("======== test case 1: ")
# create and start thread
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
@ -159,14 +159,14 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicFromStb1
@ -178,9 +178,9 @@ class TDTestCase:
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
@ -190,13 +190,13 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
tmqCom.waitSubscriptionExit(tdSql, topicFromStb1)
tdSql.query("drop topic %s"%topicFromStb1)
@ -204,8 +204,8 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
tdLog.printNoPrefix("======== test case 2: ")
# create and start thread
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
@ -230,14 +230,14 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicFromStb1
@ -249,7 +249,7 @@ class TDTestCase:
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("================= restart dnode 2===========================")
@ -260,11 +260,11 @@ class TDTestCase:
cluster.dnodes[2].starttaosd()
tdLog.info("================= restart dnode 4===========================")
cluster.dnodes[3].stoptaosd()
cluster.dnodes[3].starttaosd()
cluster.dnodes[3].starttaosd()
tdLog.info("================= restart dnode 5===========================")
cluster.dnodes[4].stoptaosd()
cluster.dnodes[4].starttaosd()
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
@ -274,13 +274,13 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
tmqCom.waitSubscriptionExit(tdSql, topicFromStb1)
tdSql.query("drop topic %s"%topicFromStb1)

View File

@ -22,12 +22,12 @@ class TDTestCase:
self.vgroups = 4
self.ctbNum = 100
self.rowsPerTbl = 1000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def prepare_udf_so(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
@ -66,20 +66,20 @@ class TDTestCase:
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
@ -112,7 +112,7 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
@ -127,11 +127,11 @@ class TDTestCase:
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: multi sub table")
paraDict = {'dbName': 'dbt',
@ -168,13 +168,13 @@ class TDTestCase:
# tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
# tdLog.info("insert data")
# tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
@ -190,10 +190,10 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
@ -208,7 +208,7 @@ class TDTestCase:
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
consumerId = 1
@ -218,7 +218,7 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[1] != resultList[0]:
@ -228,12 +228,12 @@ class TDTestCase:
# self.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: multi sub table, consume with auto create tble and insert data")
paraDict = {'dbName': 'dbt',
@ -270,13 +270,13 @@ class TDTestCase:
# tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
# tdLog.info("insert data")
# tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# tdSql.query(queryString)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
@ -292,18 +292,18 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl)
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl)
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
tdSql.query(queryString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("2 tmq consume rows error!")
@ -318,7 +318,7 @@ class TDTestCase:
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
consumerId = 3
@ -328,7 +328,7 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[1] != resultList[0]:
@ -338,7 +338,7 @@ class TDTestCase:
# self.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
@ -348,13 +348,13 @@ class TDTestCase:
# tdSql.prepare()
self.prepare_udf_so()
self.create_udf_function()
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
# tdLog.printNoPrefix("====================================================================")
# tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
# self.prepareTestEnv()