[test: modify case for tmq]

This commit is contained in:
plum-lihui 2022-05-12 20:53:52 +08:00
parent a3eaf53bf5
commit c4c81c83af
1 changed files with 145 additions and 32 deletions

View File

@ -110,19 +110,10 @@ class TDTestCase:
parameterDict["rowsPerTbl"],\ parameterDict["rowsPerTbl"],\
parameterDict["batchNum"],\ parameterDict["batchNum"],\
parameterDict["startTs"]) parameterDict["startTs"])
return return
def run(self):
tdSql.prepare()
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
cfgPath = buildPath + "/../sim/psim/cfg"
tdLog.info("cfgPath: %s" % cfgPath)
def tmqCase1(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test scenario 1: ") tdLog.printNoPrefix("======== test scenario 1: ")
tdLog.info("step 1: create database, stb, ctb and insert data") tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread # create and start thread
@ -131,7 +122,7 @@ class TDTestCase:
'vgroups': 1, \ 'vgroups': 1, \
'stbName': 'stb', \ 'stbName': 'stb', \
'ctbNum': 10, \ 'ctbNum': 10, \
'rowsPerTbl': 10000, \ 'rowsPerTbl': 100, \
'batchNum': 10, \ 'batchNum': 10, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000 'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath parameterDict['cfg'] = cfgPath
@ -141,10 +132,7 @@ class TDTestCase:
# wait stb ready # wait stb ready
while 1: while 1:
#tdSql.query("show %s.stables"%parameterDict['dbName']) tdSql.query("show %s.stables"%parameterDict['dbName'])
tdSql.query("show db.stables")
#print (self.queryResult)
#print (tdSql.getRows())
if tdSql.getRows() == 1: if tdSql.getRows() == 1:
break break
else: else:
@ -159,26 +147,18 @@ class TDTestCase:
time.sleep(1) time.sleep(1)
tdSql.query("show topics") tdSql.query("show topics")
print ("======================================")
#print (self.queryResult)
#tdSql.checkRows(2) #tdSql.checkRows(2)
topic1 = tdSql.getData(0 , 0) topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0) topic2 = tdSql.getData(1 , 0)
print (topic1)
print (topic2)
print (topicFromStb) tdLog.info("show topics: %s, %s"%(topic1, topic2))
print (topicFromCtb) if topic1 != topicFromStb and topic1 != topicFromCtb:
#tdLog.info("show topics: %s, %s"%topic1, topic2) tdLog.exit("topic error1")
#if topic1 != topicFromStb or topic1 != topicFromCtb: if topic2 != topicFromStb and topic2 != topicFromCtb:
# tdLog.exit("topic error1") tdLog.exit("topic error2")
#if topic2 != topicFromStb or topic2 != topicFromCtb:
# tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table") tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"] cdbName = parameterDict["dbName"]
#tdSql.query("create database %s"%cdbName)
#tdSql.query("use %s"%cdbName)
tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)") tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)")
tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)") tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)")
@ -234,14 +214,147 @@ class TDTestCase:
tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt) tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt) tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromCtb)
tdLog.printNoPrefix("======== test scenario 2: ")
def tmqCase2(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test scenario 2: add child table with consuming ")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db2', \
'vgroups': 1, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start()
# wait db ready
while 1:
tdSql.query("show databases")
if tdSql.getRows() == 4:
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
break
else:
time.sleep(1)
tdSql.query("use %s"%parameterDict['dbName'])
# wait stb ready
while 1:
tdSql.query("show %s.stables"%parameterDict['dbName'])
if tdSql.getRows() == 1:
break
else:
time.sleep(1)
tdLog.printNoPrefix("======== test scenario 3: ") tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column2'
topicFromCtb = 'topic_ctb_column2'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
time.sleep(1)
tdSql.query("show topics")
topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0)
tdLog.info("show topics: %s, %s"%(topic1, topic2))
if topic1 != topicFromStb and topic1 != topicFromCtb:
tdLog.exit("topic error1")
if topic2 != topicFromStb and topic2 != topicFromCtb:
tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
#os.system('pkill tmq_sim') consumerId = 0
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
tdSql.query(sql)
tdLog.info("check stb if there are data")
while 1:
tdSql.query("select count(*) from %s"%parameterDict["stbName"])
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
countOfStb = tdSql.getData(0, 0)
if countOfStb != 0:
tdLog.info("count from stb: %d"%countOfStb)
break
else:
time.sleep(1)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
# create new child table and insert data
newCtbName = 'newctb'
rowsOfNewCtb = 1000
tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"]))
startTs = parameterDict["startTs"]
for j in range(rowsOfNewCtb):
sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j)
tdSql.execute(sql)
tdLog.debug("insert data into new child table ............ [OK]")
# wait for data ready
prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result")
while 1:
tdSql.query("select * from consumeresult")
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 1:
break
else:
time.sleep(5)
expectmsgcnt += rowsOfNewCtb
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt)
tdLog.printNoPrefix("======== test scenario 2 end ...... ")
def run(self):
tdSql.prepare()
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
cfgPath = buildPath + "/../sim/psim/cfg"
tdLog.info("cfgPath: %s" % cfgPath)
self.tmqCase1(cfgPath, buildPath)
#self.tmqCase2(cfgPath, buildPath)
#self.tmqCase3(cfgPath, buildPath)
def stop(self): def stop(self):
tdSql.close() tdSql.close()