test: add test case for tmq

This commit is contained in:
plum-lihui 2022-05-12 15:36:30 +08:00
parent 4511f5828f
commit 3309dd71db
1 changed files with 91 additions and 52 deletions

View File

@ -13,14 +13,12 @@ from util.dnodes import *
class TDTestCase: class TDTestCase:
hostname = socket.gethostname() hostname = socket.gethostname()
rpcDebugFlagVal = '143' #rpcDebugFlagVal = '143'
clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal #print ("===================: ", updatecfgDict)
print ("===================: ", updatecfgDict)
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
@ -43,27 +41,35 @@ class TDTestCase:
break break
return buildPath return buildPath
def create_tables(self,dbName,vgroups,stbName,ctbNum,rowsPerTbl): def newcur(self,cfg,host,port):
tdSql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) user = "root"
tdSql.execute("use %s" %dbName) password = "taosdata"
tdSql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
cur=con.cursor()
print(cur)
return cur
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tsql.execute("use %s" %dbName)
tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
pre_create = "create table" pre_create = "create table"
sql = pre_create sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for i in range(ctbNum): for i in range(ctbNum):
sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1) sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
if (i > 0) and (i%100 == 0): if (i > 0) and (i%100 == 0):
tdSql.execute(sql) tsql.execute(sql)
sql = pre_create sql = pre_create
if sql != pre_create: if sql != pre_create:
tdSql.execute(sql) tsql.execute(sql)
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
return return
def insert_data(self,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs): def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............") tdLog.debug("start to insert data ............")
tdSql.execute("use %s" %dbName) tsql.execute("use %s" %dbName)
pre_insert = "insert into " pre_insert = "insert into "
sql = pre_insert sql = pre_insert
@ -73,7 +79,7 @@ class TDTestCase:
for j in range(rowsPerTbl): for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tdSql.execute(sql) tsql.execute(sql)
if j < rowsPerTbl - 1: if j < rowsPerTbl - 1:
sql = "insert into %s_%d values " %(stbName,i) sql = "insert into %s_%d values " %(stbName,i)
else: else:
@ -81,25 +87,29 @@ class TDTestCase:
#end sql #end sql
if sql != pre_insert: if sql != pre_insert:
#print("insert sql:%s"%sql) #print("insert sql:%s"%sql)
tdSql.execute(sql) tsql.execute(sql)
tdLog.debug("insert data ............ [OK]") tdLog.debug("insert data ............ [OK]")
return return
def prepareEnv(self, **parameterDict): def prepareEnv(self, **parameterDict):
print ("input parameters:") print ("input parameters:")
print (parameterDict) print (parameterDict)
self.create_tables(parameterDict["dbName"],\ # create new connector for my thread
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
self.create_tables(tsql,\
parameterDict["dbName"],\
parameterDict["vgroups"],\ parameterDict["vgroups"],\
parameterDict["stbName"],\ parameterDict["stbName"],\
parameterDict["ctbNum"],\ parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"]) parameterDict["rowsPerTbl"])
self.insert_data(parameterDict["dbName"],\ self.insert_data(tsql,\
parameterDict["stbName"],\ parameterDict["dbName"],\
parameterDict["ctbNum"],\ parameterDict["stbName"],\
parameterDict["rowsPerTbl"],\ parameterDict["ctbNum"],\
parameterDict["batchNum"],\ parameterDict["rowsPerTbl"],\
parameterDict["startTs"]) parameterDict["batchNum"],\
parameterDict["startTs"])
return return
def run(self): def run(self):
@ -116,24 +126,29 @@ class TDTestCase:
tdLog.printNoPrefix("======== test scenario 1: ") tdLog.printNoPrefix("======== test scenario 1: ")
tdLog.info("step 1: create database, stb, ctb and insert data") tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread # create and start thread
parameterDict = {'dbName': 'db', \ parameterDict = {'cfg': '', \
'dbName': 'db', \
'vgroups': 1, \ 'vgroups': 1, \
'stbName': 'stb', \ 'stbName': 'stb', \
'ctbNum': 10, \ 'ctbNum': 10, \
'rowsPerTbl': 10000, \ 'rowsPerTbl': 10000, \
'batchNum': 10, \ 'batchNum': 10, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000 'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start() prepareEnvThread.start()
time.sleep(1) time.sleep(2)
# wait stb ready # wait stb ready
while 1: while 1:
tdSql.query("show %s.stables"%parameterDict['dbName']) #tdSql.query("show %s.stables"%parameterDict['dbName'])
if tdSql.getRows() == 1: tdSql.query("show db.stables")
#if (self.queryRows == 1): #print (self.queryResult)
time.sleep(1) #print (tdSql.getRows())
if tdSql.getRows() == 1:
break break
else:
time.sleep(1)
tdLog.info("create topics from super table") tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column' topicFromStb = 'topic_stb_column'
@ -141,57 +156,81 @@ class TDTestCase:
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
time.sleep(1)
tdSql.query("show topics") tdSql.query("show topics")
tdSql.checkRows(2) print ("======================================")
#print (self.queryResult)
#tdSql.checkRows(2)
topic1 = tdSql.getData(0 , 0) topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0) topic2 = tdSql.getData(1 , 0)
if topic1 != topicFromStb or topic1 != topicFromCtb: print (topic1)
tdLog.exit("topic error") print (topic2)
if topic2 != topicFromStb or topic2 != topicFromCtb:
tdLog.exit("topic error") print (topicFromStb)
print (topicFromCtb)
#tdLog.info("show topics: %s, %s"%topic1, topic2)
#if topic1 != topicFromStb or topic1 != topicFromCtb:
# tdLog.exit("topic error1")
#if topic2 != topicFromStb or topic2 != topicFromCtb:
# tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table") tdLog.info("create consume info table and consume result table")
cdbName = 'cdb' cdbName = parameterDict["dbName"]
tdSql.query("create database %s"%cdbName) #tdSql.query("create database %s"%cdbName)
#tdSql.query("use %s"%cdbName)
tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)") tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)")
tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)") tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)")
consumerId = 0 consumerId = 0
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] + 1) * parameterDict["ctbNum"] expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
topicList = topicFromStb topicList = topicFromStb
ifcheckdata = 0 ifcheckdata = 0
keyList = 'group.id:cgrp1, \ keyList = 'group.id:cgrp1,\
enable.auto.commit:false, \ enable.auto.commit:false,\
auto.commit.interval.ms:6000, \ auto.commit.interval.ms:6000,\
auto.offset.reset:none' auto.offset.reset:earliest'
sql = "insert into consumeinfo values " sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %l64d, %d)"%(consumerId, topicList, keyList, expectmsgcnt, ifcheckdata) sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
tdSql.query(sql) tdSql.query(sql)
tdLog.info("check stb if there are data")
while 1:
tdSql.query("select count(*) from %s"%parameterDict["stbName"])
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
countOfStb = tdSql.getData(0, 0)
if countOfStb != 0:
tdLog.info("count from stb: %d"%countOfStb)
break
else:
time.sleep(1)
tdLog.info("start consume processor") tdLog.info("start consume processor")
pollDelay = 5 pollDelay = 5
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s, -g %d, -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(taosCmd) os.system(shellCmd)
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
tdLog.info("check consume result") tdLog.info("insert process end, and start to check consume result")
while 1: while 1:
tdSql.query("select * from consumeresult") tdSql.query("select * from consumeresult")
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 1: if tdSql.getRows() == 1:
#if (self.queryRows == 1):
time.sleep(1)
break break
else:
time.sleep(5)
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt) tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt) tdSql.checkData(0 , 3, expectrowcnt)