fix test cases
This commit is contained in:
parent
458082578c
commit
14946d8b74
|
@ -56,7 +56,7 @@ class TDTestCase:
|
|||
return cur
|
||||
|
||||
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("use %s" %dbName)
|
||||
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
||||
pre_create = "create table"
|
||||
|
@ -69,7 +69,7 @@ class TDTestCase:
|
|||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
|
||||
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
|
||||
return
|
||||
|
||||
|
@ -96,7 +96,7 @@ class TDTestCase:
|
|||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
print ("input parameters:")
|
||||
print (parameterDict)
|
||||
|
@ -115,7 +115,7 @@ class TDTestCase:
|
|||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],\
|
||||
parameterDict["batchNum"],\
|
||||
parameterDict["startTs"])
|
||||
parameterDict["startTs"])
|
||||
return
|
||||
|
||||
|
||||
|
@ -135,34 +135,34 @@ class TDTestCase:
|
|||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
prepareEnvThread.start()
|
||||
prepareEnvThread.join()
|
||||
|
||||
|
||||
# wait stb ready
|
||||
while 1:
|
||||
tdSql.query("show %s.stables"%parameterDict['dbName'])
|
||||
if tdSql.getRows() == 1:
|
||||
tdSql.query("show %s.stables"%parameterDict['dbName'])
|
||||
if tdSql.getRows() == 1:
|
||||
break
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
tdLog.info("create topics from super table")
|
||||
topicFromStb = 'topic_stb_column'
|
||||
topicFromCtb = 'topic_ctb_column'
|
||||
|
||||
topicFromCtb = 'topic_ctb_column'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
|
||||
time.sleep(1)
|
||||
tdSql.query("show topics")
|
||||
#tdSql.checkRows(2)
|
||||
topic1 = tdSql.getData(0 , 0)
|
||||
topic2 = tdSql.getData(1 , 0)
|
||||
|
||||
|
||||
tdLog.info("show topics: %s, %s"%(topic1, topic2))
|
||||
if topic1 != topicFromStb and topic1 != topicFromCtb:
|
||||
tdLog.exit("topic error1")
|
||||
tdLog.exit("topic error1")
|
||||
if topic2 != topicFromStb and topic2 != topicFromCtb:
|
||||
tdLog.exit("topic error2")
|
||||
|
||||
tdLog.exit("topic error2")
|
||||
|
||||
tdLog.info("create consume info table and consume result table")
|
||||
cdbName = parameterDict["dbName"]
|
||||
tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)")
|
||||
|
@ -179,7 +179,7 @@ class TDTestCase:
|
|||
sql = "insert into consumeinfo values "
|
||||
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
|
||||
tdSql.query(sql)
|
||||
|
||||
|
||||
tdLog.info("check stb if there are data")
|
||||
while 1:
|
||||
tdSql.query("select count(*) from %s"%parameterDict["stbName"])
|
||||
|
@ -190,26 +190,26 @@ class TDTestCase:
|
|||
break
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 20
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
|
||||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
# wait for data ready
|
||||
# prepareEnvThread.join()
|
||||
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
while 1:
|
||||
tdSql.query("select * from consumeresult")
|
||||
|
@ -229,7 +229,7 @@ class TDTestCase:
|
|||
tdSql.query("drop topic %s"%topicFromCtb)
|
||||
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
|
||||
def tmqCase2(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 2: add child table with consuming ")
|
||||
# create and start thread
|
||||
|
@ -246,13 +246,13 @@ class TDTestCase:
|
|||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
prepareEnvThread.start()
|
||||
prepareEnvThread.join()
|
||||
|
||||
|
||||
# wait db ready
|
||||
while 1:
|
||||
tdSql.query("show databases")
|
||||
if tdSql.getRows() == 4:
|
||||
print ('==================================================')
|
||||
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0))
|
||||
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0))
|
||||
index = 0
|
||||
if tdSql.getData(0,0) == parameterDict['dbName']:
|
||||
index = 0
|
||||
|
@ -264,7 +264,7 @@ class TDTestCase:
|
|||
index = 3
|
||||
else:
|
||||
continue
|
||||
|
||||
|
||||
if tdSql.getData(index,15) == 'ready':
|
||||
print("******************** index: %d"%index)
|
||||
break
|
||||
|
@ -272,12 +272,12 @@ class TDTestCase:
|
|||
continue
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
tdSql.query("use %s"%parameterDict['dbName'])
|
||||
# wait stb ready
|
||||
while 1:
|
||||
tdSql.query("show %s.stables"%parameterDict['dbName'])
|
||||
if tdSql.getRows() == 1:
|
||||
if tdSql.getRows() == 1:
|
||||
break
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
@ -285,20 +285,20 @@ class TDTestCase:
|
|||
tdLog.info("create topics from super table")
|
||||
topicFromStb = 'topic_stb_column2'
|
||||
topicFromCtb = 'topic_ctb_column2'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
|
||||
time.sleep(1)
|
||||
tdSql.query("show topics")
|
||||
topic1 = tdSql.getData(0 , 0)
|
||||
topic2 = tdSql.getData(1 , 0)
|
||||
tdLog.info("show topics: %s, %s"%(topic1, topic2))
|
||||
if topic1 != topicFromStb and topic1 != topicFromCtb:
|
||||
tdLog.exit("topic error1")
|
||||
tdLog.exit("topic error1")
|
||||
if topic2 != topicFromStb and topic2 != topicFromCtb:
|
||||
tdLog.exit("topic error2")
|
||||
|
||||
tdLog.exit("topic error2")
|
||||
|
||||
tdLog.info("create consume info table and consume result table")
|
||||
cdbName = parameterDict["dbName"]
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
|
||||
|
@ -316,7 +316,7 @@ class TDTestCase:
|
|||
sql = "insert into consumeinfo values "
|
||||
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
|
||||
tdSql.query(sql)
|
||||
|
||||
|
||||
tdLog.info("check stb if there are data")
|
||||
while 1:
|
||||
tdSql.query("select count(*) from %s"%parameterDict["stbName"])
|
||||
|
@ -327,21 +327,21 @@ class TDTestCase:
|
|||
break
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 100
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
# create new child table and insert data
|
||||
newCtbName = 'newctb'
|
||||
|
@ -354,7 +354,7 @@ class TDTestCase:
|
|||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
while 1:
|
||||
tdSql.query("select * from consumeresult")
|
||||
|
@ -366,7 +366,7 @@ class TDTestCase:
|
|||
|
||||
tdSql.checkData(0 , 1, consumerId)
|
||||
tdSql.checkData(0 , 3, expectrowcnt)
|
||||
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb)
|
||||
tdSql.query("drop topic %s"%topicFromCtb)
|
||||
|
||||
|
@ -390,13 +390,13 @@ class TDTestCase:
|
|||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
prepareEnvThread.start()
|
||||
prepareEnvThread.join()
|
||||
|
||||
|
||||
# wait db ready
|
||||
while 1:
|
||||
tdSql.query("show databases")
|
||||
if tdSql.getRows() == 5:
|
||||
if tdSql.getRows() == 5:
|
||||
print ('==================================================dbname: %s'%parameterDict['dbName'])
|
||||
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),tdSql.getData(3,0),tdSql.getData(4,0))
|
||||
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),tdSql.getData(3,0),tdSql.getData(4,0))
|
||||
index = 0
|
||||
if tdSql.getData(0,0) == parameterDict['dbName']:
|
||||
index = 0
|
||||
|
@ -409,8 +409,8 @@ class TDTestCase:
|
|||
elif tdSql.getData(4,0) == parameterDict['dbName']:
|
||||
index = 4
|
||||
else:
|
||||
continue
|
||||
|
||||
continue
|
||||
|
||||
if tdSql.getData(index,15) == 'ready':
|
||||
print("******************** index: %d"%index)
|
||||
break
|
||||
|
@ -418,16 +418,16 @@ class TDTestCase:
|
|||
continue
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
tdSql.query("use %s"%parameterDict['dbName'])
|
||||
# wait stb ready
|
||||
while 1:
|
||||
tdSql.query("show %s.stables"%parameterDict['dbName'])
|
||||
if tdSql.getRows() == 1:
|
||||
if tdSql.getRows() == 1:
|
||||
break
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
tdLog.info("create stable2 for the seconde topic")
|
||||
parameterDict2 = {'cfg': '', \
|
||||
'dbName': 'db3', \
|
||||
|
@ -439,23 +439,23 @@ class TDTestCase:
|
|||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict2['cfg'] = cfgPath
|
||||
tdSql.execute("create stable if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict2['dbName'], parameterDict2['stbName']))
|
||||
|
||||
|
||||
tdLog.info("create topics from super table")
|
||||
topicFromStb = 'topic_stb_column3'
|
||||
topicFromStb2 = 'topic_stb_column32'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb2, parameterDict2['dbName'], parameterDict2['stbName']))
|
||||
|
||||
|
||||
tdSql.query("show topics")
|
||||
topic1 = tdSql.getData(0 , 0)
|
||||
topic2 = tdSql.getData(1 , 0)
|
||||
tdLog.info("show topics: %s, %s"%(topic1, topic2))
|
||||
if topic1 != topicFromStb and topic1 != topicFromStb2:
|
||||
tdLog.exit("topic error1")
|
||||
tdLog.exit("topic error1")
|
||||
if topic2 != topicFromStb and topic2 != topicFromStb2:
|
||||
tdLog.exit("topic error2")
|
||||
|
||||
tdLog.exit("topic error2")
|
||||
|
||||
tdLog.info("create consume info table and consume result table")
|
||||
cdbName = parameterDict["dbName"]
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
|
||||
|
@ -472,7 +472,7 @@ class TDTestCase:
|
|||
sql = "insert into consumeinfo values "
|
||||
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
|
||||
tdSql.query(sql)
|
||||
|
||||
|
||||
tdLog.info("check stb if there are data")
|
||||
while 1:
|
||||
tdSql.query("select count(*) from %s"%parameterDict["stbName"])
|
||||
|
@ -483,22 +483,22 @@ class TDTestCase:
|
|||
break
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 100
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
|
||||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
# start the second thread to create new child table and insert data
|
||||
prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2)
|
||||
|
@ -507,7 +507,7 @@ class TDTestCase:
|
|||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread2.join()
|
||||
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
while 1:
|
||||
tdSql.query("select * from consumeresult")
|
||||
|
@ -519,7 +519,7 @@ class TDTestCase:
|
|||
|
||||
tdSql.checkData(0 , 1, consumerId)
|
||||
tdSql.checkData(0 , 3, expectrowcnt)
|
||||
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb)
|
||||
tdSql.query("drop topic %s"%topicFromStb2)
|
||||
|
||||
|
@ -537,7 +537,7 @@ class TDTestCase:
|
|||
tdLog.info("cfgPath: %s" % cfgPath)
|
||||
|
||||
self.tmqCase1(cfgPath, buildPath)
|
||||
self.tmqCase2(cfgPath, buildPath)
|
||||
self.tmqCase2(cfgPath, buildPath)
|
||||
self.tmqCase3(cfgPath, buildPath)
|
||||
|
||||
def stop(self):
|
||||
|
|
|
@ -20,7 +20,7 @@ class TDTestCase:
|
|||
self.vgroups = 4
|
||||
self.ctbNum = 100
|
||||
self.rowsPerTbl = 1000
|
||||
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
@ -50,7 +50,7 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
|
@ -58,12 +58,12 @@ class TDTestCase:
|
|||
tdLog.info("create ctb")
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
|
||||
tdLog.info("insert data")
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
|
||||
tdLog.info("flush db to let data falls into the disk")
|
||||
tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
return
|
||||
|
@ -93,18 +93,18 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
# tdSql.query(queryString)
|
||||
# tdSql.query(queryString)
|
||||
# expectRowsList.append(tdSql.getRows())
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
|
@ -121,29 +121,29 @@ class TDTestCase:
|
|||
paraDict['batchNum'] = 100
|
||||
paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl
|
||||
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
pInsertThread.join()
|
||||
|
||||
tdSql.query(queryString)
|
||||
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
|
||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
|
||||
if expectRowsList[0] != resultList[0]:
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
# tmqCom.checkFileContent(consumerId, queryString)
|
||||
# tmqCom.checkFileContent(consumerId, queryString)
|
||||
|
||||
tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
|
||||
|
||||
for i in range(len(topicNameList)):
|
||||
tmqCom.waitSubscriptionExit(tdSql,topicNameList[i])
|
||||
tmqCom.waitSubscriptionExit(tdSql,topicNameList[i])
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
@ -173,18 +173,18 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
totalRowsInserted = expectRowsList[0]
|
||||
|
||||
|
@ -200,36 +200,36 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("start consume processor 0")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
actConsumeRows = resultList[0]
|
||||
|
||||
|
||||
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(actConsumeRows, expectrowcnt, totalRowsInserted))
|
||||
if not (expectrowcnt <= actConsumeRows and totalRowsInserted >= actConsumeRows):
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
|
||||
# reinit consume info, and start tmq_sim, then check consume result
|
||||
tmqCom.initConsumerTable()
|
||||
consumerId = 2
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
tdLog.info("start consume processor 1")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
|
||||
actConsumeRows = resultList[0]
|
||||
tdLog.info("act consume rows: %d, expect rows: %d, act insert rows: %d"%(actConsumeRows, expectrowcnt, totalRowsInserted))
|
||||
tdLog.info("act consume rows: %d, expect rows: %d, act insert rows: %d"%(actConsumeRows, expectrowcnt, totalRowsInserted))
|
||||
if not ((actConsumeRows >= expectrowcnt) and (totalRowsInserted > actConsumeRows)):
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
|
||||
for i in range(len(topicNameList)):
|
||||
tmqCom.waitSubscriptionExit(tdSql,topicNameList[i])
|
||||
tmqCom.waitSubscriptionExit(tdSql,topicNameList[i])
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
||||
|
|
|
@ -49,7 +49,7 @@ class TDTestCase:
|
|||
print(cur)
|
||||
return cur
|
||||
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
|
@ -58,7 +58,7 @@ class TDTestCase:
|
|||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
sql = "insert into %s.consumeinfo values "%cdbName
|
||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||
tdLog.info("consume info sql: %s"%sql)
|
||||
|
@ -73,11 +73,11 @@ class TDTestCase:
|
|||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
for i in range(expectRows):
|
||||
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||
resultList.append(tdSql.getData(i , 3))
|
||||
|
||||
|
||||
return resultList
|
||||
|
||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||
|
@ -85,20 +85,20 @@ class TDTestCase:
|
|||
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||
shellCmd = 'nohup valgrind --log-file=' + logFile
|
||||
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
|
||||
|
||||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum):
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("use %s" %dbName)
|
||||
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
||||
pre_create = "create table"
|
||||
|
@ -111,8 +111,8 @@ class TDTestCase:
|
|||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
event.set()
|
||||
|
||||
event.set()
|
||||
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
|
||||
return
|
||||
|
||||
|
@ -141,7 +141,7 @@ class TDTestCase:
|
|||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
print ("input parameters:")
|
||||
print (parameterDict)
|
||||
|
@ -159,7 +159,7 @@ class TDTestCase:
|
|||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],\
|
||||
parameterDict["batchNum"],\
|
||||
parameterDict["startTs"])
|
||||
parameterDict["startTs"])
|
||||
return
|
||||
|
||||
def tmqCase4(self, cfgPath, buildPath):
|
||||
|
@ -198,9 +198,9 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
|
||||
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||
topicList = topicName1
|
||||
|
@ -211,10 +211,10 @@ class TDTestCase:
|
|||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
consumerId = 1
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
event.wait()
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
|
@ -224,16 +224,16 @@ class TDTestCase:
|
|||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread2.join()
|
||||
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 2
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -275,9 +275,9 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
|
||||
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||
topicList = topicName1
|
||||
|
@ -288,10 +288,10 @@ class TDTestCase:
|
|||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
consumerId = 1
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
event.wait()
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
|
@ -304,16 +304,16 @@ class TDTestCase:
|
|||
prepareEnvThread2.start()
|
||||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread2.join()
|
||||
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 2
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows < expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
|
|
@ -49,7 +49,7 @@ class TDTestCase:
|
|||
print(cur)
|
||||
return cur
|
||||
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
|
@ -58,7 +58,7 @@ class TDTestCase:
|
|||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
sql = "insert into %s.consumeinfo values "%cdbName
|
||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||
tdLog.info("consume info sql: %s"%sql)
|
||||
|
@ -73,11 +73,11 @@ class TDTestCase:
|
|||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
for i in range(expectRows):
|
||||
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||
resultList.append(tdSql.getData(i , 3))
|
||||
|
||||
|
||||
return resultList
|
||||
|
||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||
|
@ -85,20 +85,20 @@ class TDTestCase:
|
|||
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||
shellCmd = 'nohup valgrind --log-file=' + logFile
|
||||
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
|
||||
|
||||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum):
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("use %s" %dbName)
|
||||
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
||||
pre_create = "create table"
|
||||
|
@ -111,8 +111,8 @@ class TDTestCase:
|
|||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
event.set()
|
||||
|
||||
event.set()
|
||||
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
|
||||
return
|
||||
|
||||
|
@ -141,7 +141,7 @@ class TDTestCase:
|
|||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
print ("input parameters:")
|
||||
print (parameterDict)
|
||||
|
@ -159,7 +159,7 @@ class TDTestCase:
|
|||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],\
|
||||
parameterDict["batchNum"],\
|
||||
parameterDict["startTs"])
|
||||
parameterDict["startTs"])
|
||||
return
|
||||
|
||||
def tmqCase6(self, cfgPath, buildPath):
|
||||
|
@ -201,10 +201,10 @@ class TDTestCase:
|
|||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db60'
|
||||
topicName2 = 'topic_db61'
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
tdSql.execute("create topic %s as database %s" %(topicName2, parameterDict2['dbName']))
|
||||
|
||||
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||
topicList = topicName1 + ',' + topicName2
|
||||
|
@ -215,10 +215,10 @@ class TDTestCase:
|
|||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
#consumerId = 1
|
||||
#self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
event.wait()
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
|
@ -228,16 +228,16 @@ class TDTestCase:
|
|||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread2.join()
|
||||
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -286,10 +286,10 @@ class TDTestCase:
|
|||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db60'
|
||||
topicName2 = 'topic_db61'
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
tdSql.execute("create topic %s as database %s" %(topicName2, parameterDict2['dbName']))
|
||||
|
||||
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||
topicList = topicName1 + ',' + topicName2
|
||||
|
@ -300,10 +300,10 @@ class TDTestCase:
|
|||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
consumerId = 1
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
event.wait()
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
|
@ -313,16 +313,16 @@ class TDTestCase:
|
|||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread2.join()
|
||||
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 2
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
|
|
@ -56,7 +56,7 @@ class TDTestCase:
|
|||
print(cur)
|
||||
return cur
|
||||
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
|
@ -65,12 +65,12 @@ class TDTestCase:
|
|||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
|
||||
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||
tdLog.info("drop consumeinfo table")
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
sql = "insert into %s.consumeinfo values "%cdbName
|
||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||
tdLog.info("consume info sql: %s"%sql)
|
||||
|
@ -85,11 +85,11 @@ class TDTestCase:
|
|||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
for i in range(expectRows):
|
||||
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||
resultList.append(tdSql.getData(i , 3))
|
||||
|
||||
|
||||
return resultList
|
||||
|
||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||
|
@ -97,14 +97,14 @@ class TDTestCase:
|
|||
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||
shellCmd = 'nohup valgrind --log-file=' + logFile
|
||||
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
|
||||
|
||||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
@ -134,7 +134,7 @@ class TDTestCase:
|
|||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
|
||||
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
|
||||
return
|
||||
|
||||
|
@ -149,7 +149,7 @@ class TDTestCase:
|
|||
startTs = int(round(t * 1000))
|
||||
|
||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||
rowsOfSql = 0
|
||||
rowsOfSql = 0
|
||||
for i in range(ctbNum):
|
||||
sql += " %s_%d values "%(stbName,i)
|
||||
for j in range(rowsPerTbl):
|
||||
|
@ -168,8 +168,8 @@ class TDTestCase:
|
|||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
# create new connector for my thread
|
||||
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
|
||||
|
||||
|
@ -188,8 +188,8 @@ class TDTestCase:
|
|||
return
|
||||
|
||||
def tmqCase10(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 10: ")
|
||||
|
||||
tdLog.printNoPrefix("======== test case 10: ")
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
|
@ -218,7 +218,7 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -243,7 +243,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != 0:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -267,7 +267,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt-10000:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt-10000))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -291,7 +291,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt*2:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -302,7 +302,7 @@ class TDTestCase:
|
|||
|
||||
def tmqCase11(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 11: ")
|
||||
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
|
@ -318,7 +318,7 @@ class TDTestCase:
|
|||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
|
@ -331,7 +331,7 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -356,7 +356,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != 0:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -378,7 +378,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != 0:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -389,7 +389,7 @@ class TDTestCase:
|
|||
|
||||
def tmqCase12(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 12: ")
|
||||
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
|
@ -405,7 +405,7 @@ class TDTestCase:
|
|||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
|
@ -418,7 +418,7 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -443,7 +443,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt/4:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -465,7 +465,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt/4:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -476,7 +476,7 @@ class TDTestCase:
|
|||
|
||||
def tmqCase13(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 13: ")
|
||||
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
|
@ -492,7 +492,7 @@ class TDTestCase:
|
|||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
|
@ -505,7 +505,7 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -530,7 +530,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt/4:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -553,7 +553,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt*(1/2+1/4):
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*(1/2+1/4)))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -576,7 +576,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -596,7 +596,7 @@ class TDTestCase:
|
|||
cfgPath = buildPath + "/../sim/psim/cfg"
|
||||
tdLog.info("cfgPath: %s" % cfgPath)
|
||||
|
||||
self.tmqCase10(cfgPath, buildPath)
|
||||
self.tmqCase10(cfgPath, buildPath)
|
||||
self.tmqCase11(cfgPath, buildPath)
|
||||
self.tmqCase12(cfgPath, buildPath)
|
||||
self.tmqCase13(cfgPath, buildPath)
|
||||
|
|
|
@ -56,7 +56,7 @@ class TDTestCase:
|
|||
print(cur)
|
||||
return cur
|
||||
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
|
@ -65,12 +65,12 @@ class TDTestCase:
|
|||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
|
||||
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||
tdLog.info("drop consumeinfo table")
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
sql = "insert into %s.consumeinfo values "%cdbName
|
||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||
tdLog.info("consume info sql: %s"%sql)
|
||||
|
@ -85,11 +85,11 @@ class TDTestCase:
|
|||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
for i in range(expectRows):
|
||||
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||
resultList.append(tdSql.getData(i , 3))
|
||||
|
||||
|
||||
return resultList
|
||||
|
||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||
|
@ -97,14 +97,14 @@ class TDTestCase:
|
|||
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||
shellCmd = 'nohup valgrind --log-file=' + logFile
|
||||
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
|
||||
|
||||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
@ -134,7 +134,7 @@ class TDTestCase:
|
|||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
|
||||
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
|
||||
return
|
||||
|
||||
|
@ -149,7 +149,7 @@ class TDTestCase:
|
|||
startTs = int(round(t * 1000))
|
||||
|
||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||
rowsOfSql = 0
|
||||
rowsOfSql = 0
|
||||
for i in range(ctbNum):
|
||||
sql += " %s_%d values "%(stbName,i)
|
||||
for j in range(rowsPerTbl):
|
||||
|
@ -168,8 +168,8 @@ class TDTestCase:
|
|||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
# create new connector for my thread
|
||||
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
|
||||
|
||||
|
@ -188,8 +188,8 @@ class TDTestCase:
|
|||
return
|
||||
|
||||
def tmqCase1(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
auotCtbNum = 5
|
||||
|
@ -208,7 +208,7 @@ class TDTestCase:
|
|||
'batchNum': 100,
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
|
@ -216,7 +216,7 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * (auotCtbNum + parameterDict["ctbNum"])
|
||||
|
@ -248,7 +248,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -258,8 +258,8 @@ class TDTestCase:
|
|||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
def tmqCase2(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 2: ")
|
||||
|
||||
tdLog.printNoPrefix("======== test case 2: ")
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
auotCtbNum = 10
|
||||
|
@ -278,7 +278,7 @@ class TDTestCase:
|
|||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
|
@ -322,7 +322,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -343,7 +343,7 @@ class TDTestCase:
|
|||
tdLog.info("cfgPath: %s" % cfgPath)
|
||||
|
||||
self.tmqCase1(cfgPath, buildPath)
|
||||
self.tmqCase2(cfgPath, buildPath)
|
||||
self.tmqCase2(cfgPath, buildPath)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
|
@ -20,7 +20,7 @@ class TDTestCase:
|
|||
self.vgroups = 4
|
||||
self.ctbNum = 3000
|
||||
self.rowsPerTbl = 70
|
||||
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
@ -50,7 +50,7 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
|
@ -62,7 +62,7 @@ class TDTestCase:
|
|||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
|
||||
tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
# tdDnodes.stop(1)
|
||||
# # tdDnodes.start(1)
|
||||
|
@ -95,18 +95,18 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
|
@ -121,18 +121,18 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
|
||||
if expectRowsList[0] != resultList[0]:
|
||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
# tmqCom.checkFileContent(consumerId, queryString)
|
||||
# tmqCom.checkFileContent(consumerId, queryString)
|
||||
|
||||
time.sleep(10)
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
|
@ -163,18 +163,18 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
totalRowsInserted = expectRowsList[0]
|
||||
|
||||
|
@ -190,15 +190,15 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("start consume processor 0")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
|
||||
if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]):
|
||||
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
|
||||
firstConsumeRows = resultList[0]
|
||||
|
||||
# reinit consume info, and start tmq_sim, then check consume result
|
||||
|
@ -206,22 +206,22 @@ class TDTestCase:
|
|||
consumerId = 2
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
tdLog.info("start consume processor 1")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
|
||||
actConsumeTotalRows = firstConsumeRows + resultList[0]
|
||||
|
||||
|
||||
if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows):
|
||||
tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
|
||||
tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
|
||||
tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
time.sleep(10)
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ class TDTestCase:
|
|||
self.vgroups = 4
|
||||
self.ctbNum = 10
|
||||
self.rowsPerTbl = 10000
|
||||
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
@ -50,7 +50,7 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
|
@ -62,7 +62,7 @@ class TDTestCase:
|
|||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
|
||||
tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
# tdDnodes.stop(1)
|
||||
# # tdDnodes.start(1)
|
||||
|
@ -95,18 +95,18 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
|
@ -121,18 +121,18 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
|
||||
if expectRowsList[0] != resultList[0]:
|
||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
# tmqCom.checkFileContent(consumerId, queryString)
|
||||
# tmqCom.checkFileContent(consumerId, queryString)
|
||||
|
||||
time.sleep(10)
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
|
@ -163,18 +163,18 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
totalRowsInserted = expectRowsList[0]
|
||||
|
||||
|
@ -190,15 +190,15 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("start consume processor 0")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
|
||||
if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]):
|
||||
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
|
||||
firstConsumeRows = resultList[0]
|
||||
|
||||
# reinit consume info, and start tmq_sim, then check consume result
|
||||
|
@ -206,22 +206,22 @@ class TDTestCase:
|
|||
consumerId = 2
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
tdLog.info("start consume processor 1")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
|
||||
actConsumeTotalRows = firstConsumeRows + resultList[0]
|
||||
|
||||
|
||||
if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows):
|
||||
tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
|
||||
tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
|
||||
tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
time.sleep(10)
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ class TDTestCase:
|
|||
self.vgroups = 1
|
||||
self.ctbNum = 1
|
||||
self.rowsPerTbl = 1000000
|
||||
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
@ -50,7 +50,7 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
|
@ -62,7 +62,7 @@ class TDTestCase:
|
|||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
|
||||
tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
# tdDnodes.stop(1)
|
||||
# tdDnodes.start(1)
|
||||
|
@ -93,11 +93,11 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
# queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9379)
|
||||
|
@ -105,7 +105,7 @@ class TDTestCase:
|
|||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
totalRowsInserted = expectRowsList[0]
|
||||
|
||||
|
@ -125,17 +125,17 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("start consume processor 0")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
expectRows = 2
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
actConsumeTotalRows = resultList[0] + resultList[1]
|
||||
|
||||
|
||||
if not (totalRowsInserted == actConsumeTotalRows):
|
||||
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
time.sleep(10)
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
|
@ -166,11 +166,11 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9379)
|
||||
# queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
|
@ -178,7 +178,7 @@ class TDTestCase:
|
|||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
totalRowsInserted = expectRowsList[0]
|
||||
|
||||
|
@ -194,35 +194,35 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("start consume processor 0")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
|
||||
tdLog.info("wait commit notify")
|
||||
tmqCom.getStartCommitNotifyFromTmqsim()
|
||||
|
||||
tdLog.info("pkill consume processor")
|
||||
tdCom.killProcessor("tmq_sim")
|
||||
|
||||
|
||||
# time.sleep(10)
|
||||
|
||||
|
||||
# reinit consume info, and start tmq_sim, then check consume result
|
||||
tmqCom.initConsumerTable()
|
||||
consumerId = 6
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
tdLog.info("start consume processor 1")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
|
||||
actConsumeTotalRows = resultList[0]
|
||||
|
||||
|
||||
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
|
||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
time.sleep(10)
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ class TDTestCase:
|
|||
self.vgroups = 1
|
||||
self.ctbNum = 1
|
||||
self.rowsPerTbl = 100000
|
||||
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
@ -50,7 +50,7 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
|
@ -62,7 +62,7 @@ class TDTestCase:
|
|||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
|
||||
tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
# tdDnodes.stop(1)
|
||||
# tdDnodes.start(1)
|
||||
|
@ -93,18 +93,18 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
totalRowsInserted = expectRowsList[0]
|
||||
|
||||
|
@ -124,17 +124,17 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("start consume processor 0")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
expectRows = 2
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
actConsumeTotalRows = resultList[0] + resultList[1]
|
||||
|
||||
|
||||
if not (totalRowsInserted == actConsumeTotalRows):
|
||||
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
time.sleep(10)
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
|
@ -165,18 +165,18 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
totalRowsInserted = expectRowsList[0]
|
||||
|
||||
|
@ -192,35 +192,35 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("start consume processor 0")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
|
||||
tdLog.info("wait commit notify")
|
||||
tmqCom.getStartCommitNotifyFromTmqsim()
|
||||
|
||||
tdLog.info("pkill consume processor")
|
||||
tdCom.killProcessor("tmq_sim")
|
||||
|
||||
|
||||
# time.sleep(10)
|
||||
|
||||
|
||||
# reinit consume info, and start tmq_sim, then check consume result
|
||||
tmqCom.initConsumerTable()
|
||||
consumerId = 6
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
tdLog.info("start consume processor 1")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
|
||||
actConsumeTotalRows = resultList[0]
|
||||
|
||||
|
||||
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
|
||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
time.sleep(10)
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
|
|
|
@ -20,11 +20,11 @@ class TDTestCase:
|
|||
self.vgroups = 4
|
||||
self.ctbNum = 1000
|
||||
self.rowsPerTbl = 10
|
||||
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
||||
|
||||
# drop some ntbs
|
||||
def tmqCase1(self):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
|
@ -51,8 +51,8 @@ class TDTestCase:
|
|||
paraDict['snapshot'] = self.snapshot
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdLog.info("start create database....")
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
|
@ -60,11 +60,11 @@ class TDTestCase:
|
|||
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
|
||||
tdLog.info("start insert data into normal tables....")
|
||||
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
|
||||
|
||||
|
||||
tdLog.info("create topics from database")
|
||||
topicFromDb = 'topic_dbt'
|
||||
topicFromDb = 'topic_dbt'
|
||||
tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
|
||||
|
||||
|
||||
if self.snapshot == 0:
|
||||
consumerId = 0
|
||||
elif self.snapshot == 1:
|
||||
|
@ -83,13 +83,13 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||
tdLog.info("drop some ntables")
|
||||
# drop 1/4 ctbls from half offset
|
||||
paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
|
||||
paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
|
||||
tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
|
||||
|
||||
|
||||
tdLog.info("start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
@ -98,19 +98,19 @@ class TDTestCase:
|
|||
totalConsumeRows += resultList[i]
|
||||
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
|
||||
|
||||
if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)):
|
||||
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
||||
|
||||
tdLog.info("wait subscriptions exit ....")
|
||||
|
||||
tdLog.info("wait subscriptions exit ....")
|
||||
tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
|
||||
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromDb)
|
||||
tdLog.info("success dorp topic: %s"%topicFromDb)
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# drop some ntbs and create some new ntbs
|
||||
def tmqCase2(self):
|
||||
tdLog.printNoPrefix("======== test case 2: ")
|
||||
|
@ -137,8 +137,8 @@ class TDTestCase:
|
|||
paraDict['snapshot'] = self.snapshot
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdLog.info("start create database....")
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
|
@ -146,11 +146,11 @@ class TDTestCase:
|
|||
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
|
||||
tdLog.info("start insert data into normal tables....")
|
||||
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
|
||||
|
||||
|
||||
tdLog.info("create topics from database")
|
||||
topicFromDb = 'topic_dbt'
|
||||
topicFromDb = 'topic_dbt'
|
||||
tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
|
||||
|
||||
|
||||
if self.snapshot == 0:
|
||||
consumerId = 2
|
||||
elif self.snapshot == 1:
|
||||
|
@ -169,20 +169,20 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||
tdLog.info("drop some ntables")
|
||||
# drop 1/4 ctbls from half offset
|
||||
paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
|
||||
paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
|
||||
tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
|
||||
|
||||
|
||||
tdLog.info("start create some new normal tables....")
|
||||
paraDict["ctbPrefix"] = 'newCtb'
|
||||
paraDict["ctbNum"] = self.ctbNum
|
||||
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
|
||||
tdLog.info("start insert data into these new normal tables....")
|
||||
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
|
||||
|
||||
|
||||
tdLog.info("start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
@ -191,24 +191,24 @@ class TDTestCase:
|
|||
totalConsumeRows += resultList[i]
|
||||
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
|
||||
|
||||
if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)):
|
||||
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
||||
|
||||
tdLog.info("wait subscriptions exit ....")
|
||||
|
||||
tdLog.info("wait subscriptions exit ....")
|
||||
tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
|
||||
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromDb)
|
||||
tdLog.info("success dorp topic: %s"%topicFromDb)
|
||||
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
||||
|
||||
def run(self):
|
||||
|
||||
def run(self):
|
||||
# tdLog.printNoPrefix("=============================================")
|
||||
# tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
|
||||
# self.snapshot = 0
|
||||
# self.tmqCase1()
|
||||
# self.tmqCase2()
|
||||
|
||||
# self.tmqCase2()
|
||||
|
||||
tdLog.printNoPrefix("====================================================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
|
||||
self.snapshot = 1
|
||||
|
|
|
@ -20,11 +20,11 @@ class TDTestCase:
|
|||
self.vgroups = 4
|
||||
self.ctbNum = 100
|
||||
self.rowsPerTbl = 1000
|
||||
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
||||
|
||||
def prepareTestEnv(self):
|
||||
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
|
@ -50,7 +50,7 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
# tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
|
@ -65,11 +65,11 @@ class TDTestCase:
|
|||
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
|
||||
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
# tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
|
||||
# tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
# tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
return
|
||||
|
||||
|
||||
# drop some ctbs
|
||||
def tmqCase1(self):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
|
@ -96,10 +96,10 @@ class TDTestCase:
|
|||
paraDict['snapshot'] = self.snapshot
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
|
||||
# again create one new stb1
|
||||
paraDict["stbName"] = 'stb1'
|
||||
paraDict['ctbPrefix'] = 'ctb1n_'
|
||||
|
@ -112,16 +112,16 @@ class TDTestCase:
|
|||
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||
|
||||
|
||||
tdLog.info("create topics from database")
|
||||
topicFromDb = 'topic_dbt'
|
||||
topicFromDb = 'topic_dbt'
|
||||
tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
|
||||
|
||||
|
||||
if self.snapshot == 0:
|
||||
consumerId = 0
|
||||
elif self.snapshot == 1:
|
||||
consumerId = 1
|
||||
|
||||
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
|
||||
topicList = topicFromDb
|
||||
ifcheckdata = 1
|
||||
|
@ -135,17 +135,17 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||
tdLog.info("drop some ctables")
|
||||
paraDict["stbName"] = 'stb'
|
||||
paraDict["stbName"] = 'stb'
|
||||
paraDict['ctbPrefix'] = 'ctb'
|
||||
paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 3 / 4) # drop 1/4 ctbls
|
||||
paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
|
||||
# tdSql.execute("drop table %s.%s" %(paraDict['dbName'], paraDict['stbName']))
|
||||
# tdSql.execute("drop table %s.%s" %(paraDict['dbName'], paraDict['stbName']))
|
||||
tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
|
||||
|
||||
|
||||
pInsertThread.join()
|
||||
|
||||
|
||||
tdLog.info("start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
@ -154,17 +154,17 @@ class TDTestCase:
|
|||
totalConsumeRows += resultList[i]
|
||||
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
|
||||
|
||||
if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows < expectrowcnt)):
|
||||
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
||||
|
||||
tdLog.info("wait subscriptions exit ....")
|
||||
tdLog.info("wait subscriptions exit ....")
|
||||
tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
|
||||
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromDb)
|
||||
tdLog.info("success dorp topic: %s"%topicFromDb)
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
|
||||
# drop one stb
|
||||
def tmqCase2(self):
|
||||
tdLog.printNoPrefix("======== test case 2: ")
|
||||
|
@ -191,10 +191,10 @@ class TDTestCase:
|
|||
paraDict['snapshot'] = self.snapshot
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
|
||||
# again create one new stb1
|
||||
paraDict["stbName"] = 'stb2'
|
||||
paraDict['ctbPrefix'] = 'ctb2n_'
|
||||
|
@ -207,16 +207,16 @@ class TDTestCase:
|
|||
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||
|
||||
|
||||
tdLog.info("create topics from database")
|
||||
topicFromDb = 'topic_dbt'
|
||||
topicFromDb = 'topic_dbt'
|
||||
tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
|
||||
|
||||
|
||||
if self.snapshot == 0:
|
||||
consumerId = 2
|
||||
elif self.snapshot == 1:
|
||||
consumerId = 3
|
||||
|
||||
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
|
||||
topicList = topicFromDb
|
||||
ifcheckdata = 1
|
||||
|
@ -230,13 +230,13 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||
tdLog.info("drop one stable")
|
||||
paraDict["stbName"] = 'stb1'
|
||||
tdSql.execute("drop table %s.%s" %(paraDict['dbName'], paraDict['stbName']))
|
||||
tdSql.execute("drop table %s.%s" %(paraDict['dbName'], paraDict['stbName']))
|
||||
|
||||
pInsertThread.join()
|
||||
|
||||
|
||||
tdLog.info("start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
@ -245,25 +245,25 @@ class TDTestCase:
|
|||
totalConsumeRows += resultList[i]
|
||||
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
|
||||
|
||||
if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows < expectrowcnt)):
|
||||
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
||||
|
||||
tdLog.info("wait subscriptions exit ....")
|
||||
tdLog.info("wait subscriptions exit ....")
|
||||
tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
|
||||
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromDb)
|
||||
tdLog.info("success dorp topic: %s"%topicFromDb)
|
||||
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
||||
|
||||
def run(self):
|
||||
def run(self):
|
||||
tdLog.printNoPrefix("=============================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
|
||||
self.snapshot = 0
|
||||
self.prepareTestEnv()
|
||||
self.tmqCase1()
|
||||
self.tmqCase2()
|
||||
|
||||
self.tmqCase2()
|
||||
|
||||
tdLog.printNoPrefix("====================================================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
|
||||
self.snapshot = 1
|
||||
|
|
|
@ -22,12 +22,12 @@ class TDTestCase:
|
|||
self.vgroups = 4
|
||||
self.ctbNum = 100
|
||||
self.rowsPerTbl = 1000
|
||||
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
|
||||
def prepare_udf_so(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
|
@ -66,20 +66,20 @@ class TDTestCase:
|
|||
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
|
||||
tdLog.info(cmdStr)
|
||||
os.system(cmdStr)
|
||||
|
||||
|
||||
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
|
||||
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
|
||||
|
||||
consumeFile = open(consumeRowsFile, mode='r')
|
||||
queryFile = open(dstFile, mode='r')
|
||||
|
||||
|
||||
# skip first line for it is schema
|
||||
queryFile.readline()
|
||||
|
||||
while True:
|
||||
dst = queryFile.readline()
|
||||
src = consumeFile.readline()
|
||||
|
||||
|
||||
if dst:
|
||||
if dst != src:
|
||||
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
|
||||
|
@ -112,7 +112,7 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
|
@ -127,11 +127,11 @@ class TDTestCase:
|
|||
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
|
||||
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
# tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
|
||||
# tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
# tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
return
|
||||
|
||||
|
||||
def tmqCase1(self):
|
||||
tdLog.printNoPrefix("======== test case 1: multi sub table")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
|
@ -168,13 +168,13 @@ class TDTestCase:
|
|||
# tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
|
||||
# tdLog.info("insert data")
|
||||
# tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
|
@ -190,10 +190,10 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
tdLog.info("wait the consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
|
||||
if expectRowsList[0] != resultList[0]:
|
||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
|
||||
tdLog.exit("0 tmq consume rows error!")
|
||||
|
@ -208,7 +208,7 @@ class TDTestCase:
|
|||
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
|
||||
consumerId = 1
|
||||
|
@ -218,7 +218,7 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
tdLog.info("wait the consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
if expectRowsList[1] != resultList[0]:
|
||||
|
@ -228,12 +228,12 @@ class TDTestCase:
|
|||
# self.checkFileContent(consumerId, queryString)
|
||||
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
|
||||
|
||||
time.sleep(10)
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
|
||||
def tmqCase2(self):
|
||||
tdLog.printNoPrefix("======== test case 2: multi sub table, consume with auto create tble and insert data")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
|
@ -270,13 +270,13 @@ class TDTestCase:
|
|||
# tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
|
||||
# tdLog.info("insert data")
|
||||
# tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
# tdSql.query(queryString)
|
||||
# tdSql.query(queryString)
|
||||
# expectRowsList.append(tdSql.getRows())
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
|
@ -292,18 +292,18 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl)
|
||||
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl)
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
tdLog.info("wait the consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
tdSql.query(queryString)
|
||||
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
|
||||
|
||||
if expectRowsList[0] != resultList[0]:
|
||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
|
||||
tdLog.exit("2 tmq consume rows error!")
|
||||
|
@ -318,7 +318,7 @@ class TDTestCase:
|
|||
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
|
||||
consumerId = 3
|
||||
|
@ -328,7 +328,7 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
tdLog.info("wait the consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
if expectRowsList[1] != resultList[0]:
|
||||
|
@ -338,7 +338,7 @@ class TDTestCase:
|
|||
# self.checkFileContent(consumerId, queryString)
|
||||
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
|
||||
|
||||
time.sleep(10)
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
|
@ -348,13 +348,13 @@ class TDTestCase:
|
|||
# tdSql.prepare()
|
||||
self.prepare_udf_so()
|
||||
self.create_udf_function()
|
||||
|
||||
|
||||
# tdLog.printNoPrefix("=============================================")
|
||||
# tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
|
||||
# self.prepareTestEnv()
|
||||
# self.tmqCase1()
|
||||
# self.tmqCase2()
|
||||
|
||||
|
||||
tdLog.printNoPrefix("====================================================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
|
||||
self.prepareTestEnv()
|
||||
|
|
|
@ -22,12 +22,12 @@ class TDTestCase:
|
|||
self.vgroups = 4
|
||||
self.ctbNum = 1
|
||||
self.rowsPerTbl = 1000
|
||||
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
|
||||
def prepare_udf_so(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
|
@ -66,20 +66,20 @@ class TDTestCase:
|
|||
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
|
||||
tdLog.info(cmdStr)
|
||||
os.system(cmdStr)
|
||||
|
||||
|
||||
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
|
||||
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
|
||||
|
||||
consumeFile = open(consumeRowsFile, mode='r')
|
||||
queryFile = open(dstFile, mode='r')
|
||||
|
||||
|
||||
# skip first line for it is schema
|
||||
queryFile.readline()
|
||||
|
||||
while True:
|
||||
dst = queryFile.readline()
|
||||
src = consumeFile.readline()
|
||||
|
||||
|
||||
if dst:
|
||||
if dst != src:
|
||||
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
|
||||
|
@ -112,7 +112,7 @@ class TDTestCase:
|
|||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
|
@ -127,11 +127,11 @@ class TDTestCase:
|
|||
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
|
||||
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
# tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
|
||||
# tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
# tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
return
|
||||
|
||||
|
||||
def tmqCase1(self):
|
||||
tdLog.printNoPrefix("======== test case 1: one sub table")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
|
@ -168,13 +168,13 @@ class TDTestCase:
|
|||
# tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
|
||||
# tdLog.info("insert data")
|
||||
# tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
|
@ -190,10 +190,10 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
tdLog.info("wait the consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
|
||||
if expectRowsList[0] != resultList[0]:
|
||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
|
||||
tdLog.exit("0 tmq consume rows error!")
|
||||
|
@ -209,7 +209,7 @@ class TDTestCase:
|
|||
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
|
||||
consumerId = 1
|
||||
|
@ -219,7 +219,7 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
tdLog.info("wait the consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
if expectRowsList[1] != resultList[0]:
|
||||
|
@ -229,12 +229,12 @@ class TDTestCase:
|
|||
self.checkFileContent(consumerId, queryString)
|
||||
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
|
||||
|
||||
time.sleep(10)
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
|
||||
def tmqCase2(self):
|
||||
tdLog.printNoPrefix("======== test case 2: one sub table, consume with auto create tble and insert data")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
|
@ -271,13 +271,13 @@ class TDTestCase:
|
|||
# tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
|
||||
# tdLog.info("insert data")
|
||||
# tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
|
||||
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
# tdSql.query(queryString)
|
||||
# tdSql.query(queryString)
|
||||
# expectRowsList.append(tdSql.getRows())
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
|
@ -293,18 +293,18 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl)
|
||||
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl)
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
tdLog.info("wait the consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
tdSql.query(queryString)
|
||||
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
|
||||
|
||||
if expectRowsList[0] != resultList[0]:
|
||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
|
||||
tdLog.exit("2 tmq consume rows error!")
|
||||
|
@ -319,7 +319,7 @@ class TDTestCase:
|
|||
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
|
||||
consumerId = 3
|
||||
|
@ -329,7 +329,7 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
tdLog.info("wait the consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
if expectRowsList[1] != resultList[0]:
|
||||
|
@ -339,7 +339,7 @@ class TDTestCase:
|
|||
self.checkFileContent(consumerId, queryString)
|
||||
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
|
||||
|
||||
time.sleep(10)
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
|
@ -349,13 +349,13 @@ class TDTestCase:
|
|||
# tdSql.prepare()
|
||||
self.prepare_udf_so()
|
||||
self.create_udf_function()
|
||||
|
||||
|
||||
tdLog.printNoPrefix("=============================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
|
||||
self.prepareTestEnv()
|
||||
self.tmqCase1()
|
||||
self.tmqCase2()
|
||||
|
||||
|
||||
tdLog.printNoPrefix("====================================================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
|
||||
self.prepareTestEnv()
|
||||
|
|
Loading…
Reference in New Issue