test: add test case for tmq
This commit is contained in:
parent
c0d5086c12
commit
c2e006ac1d
|
@ -49,7 +49,38 @@ class TDTestCase:
|
||||||
print(cur)
|
print(cur)
|
||||||
return cur
|
return cur
|
||||||
|
|
||||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg,showRow,cdbName,valgrind=0):
|
def initConsumerTable(self,cdbName='cdb'):
|
||||||
|
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||||
|
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||||
|
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||||
|
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
|
||||||
|
|
||||||
|
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||||
|
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||||
|
|
||||||
|
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||||
|
sql = "insert into %s.consumeinfo values "%cdbName
|
||||||
|
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||||
|
tdLog.info("consume info sql: %s"%sql)
|
||||||
|
tdSql.query(sql)
|
||||||
|
|
||||||
|
def selectConsumeResult(self,expectRows,cdbName='cdb'):
|
||||||
|
resultList=[]
|
||||||
|
while 1:
|
||||||
|
tdSql.query("select * from %s.consumeresult"%cdbName)
|
||||||
|
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
||||||
|
if tdSql.getRows() == expectRows:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
for i in range(expectRows):
|
||||||
|
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||||
|
resultList.append(tdSql.getData(i , 3))
|
||||||
|
|
||||||
|
return resultList
|
||||||
|
|
||||||
|
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||||
shellCmd = 'nohup '
|
shellCmd = 'nohup '
|
||||||
if valgrind == 1:
|
if valgrind == 1:
|
||||||
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||||
|
@ -58,7 +89,7 @@ class TDTestCase:
|
||||||
|
|
||||||
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||||
shellCmd += "> /dev/null 2>&1 &"
|
shellCmd += "> /dev/null 2>&1 &"
|
||||||
tdLog.info(shellCmd)
|
tdLog.info(shellCmd)
|
||||||
os.system(shellCmd)
|
os.system(shellCmd)
|
||||||
|
|
||||||
|
@ -87,6 +118,8 @@ class TDTestCase:
|
||||||
pre_insert = "insert into "
|
pre_insert = "insert into "
|
||||||
sql = pre_insert
|
sql = pre_insert
|
||||||
|
|
||||||
|
t = time.time()
|
||||||
|
startTs = int(round(t * 1000))
|
||||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||||
for i in range(ctbNum):
|
for i in range(ctbNum):
|
||||||
sql += " %s_%d values "%(stbName,i)
|
sql += " %s_%d values "%(stbName,i)
|
||||||
|
@ -127,7 +160,7 @@ class TDTestCase:
|
||||||
return
|
return
|
||||||
|
|
||||||
def tmqCase1(self, cfgPath, buildPath):
|
def tmqCase1(self, cfgPath, buildPath):
|
||||||
tdLog.printNoPrefix("======== test case 1: Produce while one consume to subscribe one db")
|
tdLog.printNoPrefix("======== test case 1: Produce while one consume to subscribe one db, inclue 1 stb")
|
||||||
tdLog.info("step 1: create database, stb, ctb and insert data")
|
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||||
# create and start thread
|
# create and start thread
|
||||||
parameterDict = {'cfg': '', \
|
parameterDict = {'cfg': '', \
|
||||||
|
@ -135,11 +168,13 @@ class TDTestCase:
|
||||||
'vgroups': 4, \
|
'vgroups': 4, \
|
||||||
'stbName': 'stb', \
|
'stbName': 'stb', \
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 100000, \
|
'rowsPerTbl': 10000, \
|
||||||
'batchNum': 200, \
|
'batchNum': 100, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
self.initConsumerTable()
|
||||||
|
|
||||||
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
||||||
|
|
||||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
|
@ -149,23 +184,16 @@ class TDTestCase:
|
||||||
topicName1 = 'topic_db1'
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
tdLog.info("create consume info table and consume result table")
|
|
||||||
cdbName = parameterDict["dbName"]
|
|
||||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
|
|
||||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||||
topicList = topicName1
|
topicList = topicName1
|
||||||
ifcheckdata = 0
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
keyList = 'group.id:cgrp1,\
|
keyList = 'group.id:cgrp1,\
|
||||||
enable.auto.commit:false,\
|
enable.auto.commit:false,\
|
||||||
auto.commit.interval.ms:6000,\
|
auto.commit.interval.ms:6000,\
|
||||||
auto.offset.reset:earliest'
|
auto.offset.reset:earliest'
|
||||||
sql = "insert into %s.consumeinfo values "%cdbName
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
|
|
||||||
tdSql.query(sql)
|
|
||||||
|
|
||||||
event.wait()
|
event.wait()
|
||||||
|
|
||||||
|
@ -173,32 +201,28 @@ class TDTestCase:
|
||||||
pollDelay = 5
|
pollDelay = 5
|
||||||
showMsg = 1
|
showMsg = 1
|
||||||
showRow = 1
|
showRow = 1
|
||||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName)
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
# wait for data ready
|
# wait for data ready
|
||||||
prepareEnvThread.join()
|
prepareEnvThread.join()
|
||||||
|
|
||||||
tdLog.info("insert process end, and start to check consume result")
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
while 1:
|
expectRows = 1
|
||||||
tdSql.query("select * from %s.consumeresult"%cdbName)
|
resultList = self.selectConsumeResult(expectRows)
|
||||||
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
totalConsumeRows = 0
|
||||||
if tdSql.getRows() == 1:
|
for i in range(expectRows):
|
||||||
break
|
totalConsumeRows += resultList[i]
|
||||||
else:
|
|
||||||
time.sleep(5)
|
if totalConsumeRows != expectrowcnt:
|
||||||
|
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||||
tdLog.info("consumer result: %d, %d"%(tdSql.getData(0 , 2), tdSql.getData(0 , 3)))
|
tdLog.exit("tmq consume rows error!")
|
||||||
tdSql.checkData(0 , 1, consumerId)
|
|
||||||
# mulit rows and mulit tables in one sql, this num of msg is not sure
|
|
||||||
#tdSql.checkData(0 , 2, expectmsgcnt)
|
|
||||||
tdSql.checkData(0 , 3, expectrowcnt+1)
|
|
||||||
|
|
||||||
tdSql.query("drop topic %s"%topicName1)
|
tdSql.query("drop topic %s"%topicName1)
|
||||||
|
|
||||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||||
|
|
||||||
def tmqCase2(self, cfgPath, buildPath):
|
def tmqCase2(self, cfgPath, buildPath):
|
||||||
tdLog.printNoPrefix("======== test case 2: Produce while two consumers to subscribe one db")
|
tdLog.printNoPrefix("======== test case 2: Produce while two consumers to subscribe one db, inclue 1 stb")
|
||||||
tdLog.info("step 1: create database, stb, ctb and insert data")
|
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||||
# create and start thread
|
# create and start thread
|
||||||
parameterDict = {'cfg': '', \
|
parameterDict = {'cfg': '', \
|
||||||
|
@ -206,11 +230,13 @@ class TDTestCase:
|
||||||
'vgroups': 4, \
|
'vgroups': 4, \
|
||||||
'stbName': 'stb', \
|
'stbName': 'stb', \
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 100000, \
|
'rowsPerTbl': 10000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
self.initConsumerTable()
|
||||||
|
|
||||||
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
||||||
|
|
||||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
|
@ -221,27 +247,19 @@ class TDTestCase:
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
tdLog.info("create consume info table and consume result table")
|
|
||||||
cdbName = parameterDict["dbName"]
|
|
||||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
|
|
||||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||||
topicList = topicName1
|
topicList = topicName1
|
||||||
ifcheckdata = 0
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
keyList = 'group.id:cgrp1,\
|
keyList = 'group.id:cgrp1,\
|
||||||
enable.auto.commit:false,\
|
enable.auto.commit:false,\
|
||||||
auto.commit.interval.ms:6000,\
|
auto.commit.interval.ms:6000,\
|
||||||
auto.offset.reset:earliest'
|
auto.offset.reset:earliest'
|
||||||
sql = "insert into %s.consumeinfo values "%cdbName
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
|
|
||||||
tdSql.query(sql)
|
|
||||||
|
|
||||||
consumerId = 1
|
consumerId = 1
|
||||||
sql = "insert into %s.consumeinfo values "%cdbName
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
|
|
||||||
tdSql.query(sql)
|
|
||||||
|
|
||||||
event.wait()
|
event.wait()
|
||||||
|
|
||||||
|
@ -249,30 +267,20 @@ class TDTestCase:
|
||||||
pollDelay = 5
|
pollDelay = 5
|
||||||
showMsg = 1
|
showMsg = 1
|
||||||
showRow = 1
|
showRow = 1
|
||||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName)
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
# wait for data ready
|
# wait for data ready
|
||||||
prepareEnvThread.join()
|
prepareEnvThread.join()
|
||||||
|
|
||||||
tdLog.info("insert process end, and start to check consume result")
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
while 1:
|
expectRows = 2
|
||||||
tdSql.query("select * from %s.consumeresult"%cdbName)
|
resultList = self.selectConsumeResult(expectRows)
|
||||||
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
totalConsumeRows = 0
|
||||||
if tdSql.getRows() == 2:
|
for i in range(expectRows):
|
||||||
break
|
totalConsumeRows += resultList[i]
|
||||||
else:
|
|
||||||
time.sleep(5)
|
if totalConsumeRows != expectrowcnt:
|
||||||
|
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||||
consumerId0 = tdSql.getData(0 , 1)
|
|
||||||
consumerId1 = tdSql.getData(1 , 1)
|
|
||||||
actConsumeRows0 = tdSql.getData(0 , 3)
|
|
||||||
actConsumeRows1 = tdSql.getData(1 , 3)
|
|
||||||
|
|
||||||
tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0))
|
|
||||||
tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1))
|
|
||||||
|
|
||||||
totalConsumeRows = actConsumeRows0 + actConsumeRows1
|
|
||||||
if totalConsumeRows != expectrowcnt + 2:
|
|
||||||
tdLog.exit("tmq consume rows error!")
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
tdSql.query("drop topic %s"%topicName1)
|
tdSql.query("drop topic %s"%topicName1)
|
||||||
|
@ -288,11 +296,13 @@ class TDTestCase:
|
||||||
'vgroups': 4, \
|
'vgroups': 4, \
|
||||||
'stbName': 'stb', \
|
'stbName': 'stb', \
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 100000, \
|
'rowsPerTbl': 10000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
self.initConsumerTable()
|
||||||
|
|
||||||
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
||||||
|
|
||||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
|
@ -303,7 +313,7 @@ class TDTestCase:
|
||||||
'vgroups': 4, \
|
'vgroups': 4, \
|
||||||
'stbName': 'stb2', \
|
'stbName': 'stb2', \
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 100000, \
|
'rowsPerTbl': 10000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
@ -316,65 +326,377 @@ class TDTestCase:
|
||||||
|
|
||||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
tdLog.info("create consume info table and consume result table")
|
|
||||||
cdbName = parameterDict["dbName"]
|
|
||||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
|
|
||||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||||
topicList = topicName1
|
topicList = topicName1
|
||||||
ifcheckdata = 0
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
keyList = 'group.id:cgrp1,\
|
keyList = 'group.id:cgrp1,\
|
||||||
enable.auto.commit:false,\
|
enable.auto.commit:false,\
|
||||||
auto.commit.interval.ms:6000,\
|
auto.commit.interval.ms:6000,\
|
||||||
auto.offset.reset:earliest'
|
auto.offset.reset:earliest'
|
||||||
sql = "insert into %s.consumeinfo values "%cdbName
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
|
|
||||||
tdSql.query(sql)
|
|
||||||
|
|
||||||
# consumerId = 1
|
# consumerId = 1
|
||||||
# sql = "insert into %s.consumeinfo values "%cdbName
|
# self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
# sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
|
|
||||||
# tdSql.query(sql)
|
|
||||||
|
|
||||||
event.wait()
|
event.wait()
|
||||||
|
|
||||||
tdLog.info("start consume processor")
|
tdLog.info("start consume processor")
|
||||||
pollDelay = 5
|
pollDelay = 5
|
||||||
showMsg = 1
|
showMsg = 1
|
||||||
showRow = 1
|
showRow = 1
|
||||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName)
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
# wait for data ready
|
# wait for data ready
|
||||||
prepareEnvThread.join()
|
prepareEnvThread.join()
|
||||||
prepareEnvThread2.join()
|
prepareEnvThread2.join()
|
||||||
|
|
||||||
tdLog.info("insert process end, and start to check consume result")
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
while 1:
|
expectRows = 1
|
||||||
tdSql.query("select * from %s.consumeresult"%cdbName)
|
resultList = self.selectConsumeResult(expectRows)
|
||||||
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
totalConsumeRows = 0
|
||||||
if tdSql.getRows() == 1:
|
for i in range(expectRows):
|
||||||
break
|
totalConsumeRows += resultList[i]
|
||||||
else:
|
|
||||||
time.sleep(5)
|
if totalConsumeRows != expectrowcnt:
|
||||||
|
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||||
consumerId0 = tdSql.getData(0 , 1)
|
|
||||||
#consumerId1 = tdSql.getData(1 , 1)
|
|
||||||
actConsumeRows0 = tdSql.getData(0 , 3)
|
|
||||||
#actConsumeRows1 = tdSql.getData(1 , 3)
|
|
||||||
|
|
||||||
tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0))
|
|
||||||
#tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1))
|
|
||||||
|
|
||||||
#totalConsumeRows = actConsumeRows0 + actConsumeRows1
|
|
||||||
if actConsumeRows0 != expectrowcnt + 1:
|
|
||||||
tdLog.exit("tmq consume rows error!")
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
tdSql.query("drop topic %s"%topicName1)
|
tdSql.query("drop topic %s"%topicName1)
|
||||||
|
|
||||||
tdLog.printNoPrefix("======== test case 3 end ...... ")
|
tdLog.printNoPrefix("======== test case 3 end ...... ")
|
||||||
|
|
||||||
|
def tmqCase4(self, cfgPath, buildPath):
|
||||||
|
tdLog.printNoPrefix("======== test case 4: Produce while two consumers to subscribe one db, include 2 stb")
|
||||||
|
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||||
|
# create and start thread
|
||||||
|
parameterDict = {'cfg': '', \
|
||||||
|
'dbName': 'db4', \
|
||||||
|
'vgroups': 4, \
|
||||||
|
'stbName': 'stb', \
|
||||||
|
'ctbNum': 10, \
|
||||||
|
'rowsPerTbl': 10000, \
|
||||||
|
'batchNum': 100, \
|
||||||
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
self.initConsumerTable()
|
||||||
|
|
||||||
|
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
||||||
|
|
||||||
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
|
prepareEnvThread.start()
|
||||||
|
|
||||||
|
parameterDict2 = {'cfg': '', \
|
||||||
|
'dbName': 'db4', \
|
||||||
|
'vgroups': 4, \
|
||||||
|
'stbName': 'stb2', \
|
||||||
|
'ctbNum': 10, \
|
||||||
|
'rowsPerTbl': 10000, \
|
||||||
|
'batchNum': 100, \
|
||||||
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2)
|
||||||
|
prepareEnvThread2.start()
|
||||||
|
|
||||||
|
tdLog.info("create topics from db")
|
||||||
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
|
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||||
|
topicList = topicName1
|
||||||
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
|
keyList = 'group.id:cgrp1,\
|
||||||
|
enable.auto.commit:false,\
|
||||||
|
auto.commit.interval.ms:6000,\
|
||||||
|
auto.offset.reset:earliest'
|
||||||
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
consumerId = 1
|
||||||
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
event.wait()
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
pollDelay = 5
|
||||||
|
showMsg = 1
|
||||||
|
showRow = 1
|
||||||
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
|
# wait for data ready
|
||||||
|
prepareEnvThread.join()
|
||||||
|
prepareEnvThread2.join()
|
||||||
|
|
||||||
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
|
expectRows = 2
|
||||||
|
resultList = self.selectConsumeResult(expectRows)
|
||||||
|
totalConsumeRows = 0
|
||||||
|
for i in range(expectRows):
|
||||||
|
totalConsumeRows += resultList[i]
|
||||||
|
|
||||||
|
if totalConsumeRows != expectrowcnt:
|
||||||
|
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||||
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
|
tdSql.query("drop topic %s"%topicName1)
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 4 end ...... ")
|
||||||
|
|
||||||
|
def tmqCase5(self, cfgPath, buildPath):
|
||||||
|
tdLog.printNoPrefix("======== test case 5: Produce while two consumers to subscribe one db, firstly create one stb, after start consume create other stb")
|
||||||
|
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||||
|
# create and start thread
|
||||||
|
parameterDict = {'cfg': '', \
|
||||||
|
'dbName': 'db5', \
|
||||||
|
'vgroups': 4, \
|
||||||
|
'stbName': 'stb', \
|
||||||
|
'ctbNum': 10, \
|
||||||
|
'rowsPerTbl': 10000, \
|
||||||
|
'batchNum': 100, \
|
||||||
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
self.initConsumerTable()
|
||||||
|
|
||||||
|
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
||||||
|
|
||||||
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
|
prepareEnvThread.start()
|
||||||
|
|
||||||
|
parameterDict2 = {'cfg': '', \
|
||||||
|
'dbName': 'db5', \
|
||||||
|
'vgroups': 4, \
|
||||||
|
'stbName': 'stb2', \
|
||||||
|
'ctbNum': 10, \
|
||||||
|
'rowsPerTbl': 10000, \
|
||||||
|
'batchNum': 100, \
|
||||||
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
tdLog.info("create topics from db")
|
||||||
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
|
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||||
|
topicList = topicName1
|
||||||
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
|
keyList = 'group.id:cgrp1,\
|
||||||
|
enable.auto.commit:false,\
|
||||||
|
auto.commit.interval.ms:6000,\
|
||||||
|
auto.offset.reset:earliest'
|
||||||
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
consumerId = 1
|
||||||
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
event.wait()
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
pollDelay = 5
|
||||||
|
showMsg = 1
|
||||||
|
showRow = 1
|
||||||
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
|
prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2)
|
||||||
|
prepareEnvThread2.start()
|
||||||
|
|
||||||
|
# wait for data ready
|
||||||
|
prepareEnvThread.join()
|
||||||
|
prepareEnvThread2.join()
|
||||||
|
|
||||||
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
|
expectRows = 2
|
||||||
|
resultList = self.selectConsumeResult(expectRows)
|
||||||
|
totalConsumeRows = 0
|
||||||
|
for i in range(expectRows):
|
||||||
|
totalConsumeRows += resultList[i]
|
||||||
|
|
||||||
|
if totalConsumeRows != expectrowcnt:
|
||||||
|
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||||
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
|
tdSql.query("drop topic %s"%topicName1)
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 5 end ...... ")
|
||||||
|
|
||||||
|
def tmqCase6(self, cfgPath, buildPath):
|
||||||
|
tdLog.printNoPrefix("======== test case 6: Produce while one consumers to subscribe tow topic, Each contains one db")
|
||||||
|
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||||
|
# create and start thread
|
||||||
|
parameterDict = {'cfg': '', \
|
||||||
|
'dbName': 'db60', \
|
||||||
|
'vgroups': 4, \
|
||||||
|
'stbName': 'stb', \
|
||||||
|
'ctbNum': 10, \
|
||||||
|
'rowsPerTbl': 10000, \
|
||||||
|
'batchNum': 100, \
|
||||||
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
self.initConsumerTable()
|
||||||
|
|
||||||
|
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
||||||
|
|
||||||
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
|
prepareEnvThread.start()
|
||||||
|
|
||||||
|
parameterDict2 = {'cfg': '', \
|
||||||
|
'dbName': 'db61', \
|
||||||
|
'vgroups': 4, \
|
||||||
|
'stbName': 'stb2', \
|
||||||
|
'ctbNum': 10, \
|
||||||
|
'rowsPerTbl': 10000, \
|
||||||
|
'batchNum': 100, \
|
||||||
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict2['dbName'], parameterDict2['vgroups']))
|
||||||
|
|
||||||
|
prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2)
|
||||||
|
prepareEnvThread2.start()
|
||||||
|
|
||||||
|
tdLog.info("create topics from db")
|
||||||
|
topicName1 = 'topic_db60'
|
||||||
|
topicName2 = 'topic_db61'
|
||||||
|
|
||||||
|
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
tdSql.execute("create topic %s as %s" %(topicName2, parameterDict2['dbName']))
|
||||||
|
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||||
|
topicList = topicName1 + ',' + topicName2
|
||||||
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
|
keyList = 'group.id:cgrp1,\
|
||||||
|
enable.auto.commit:false,\
|
||||||
|
auto.commit.interval.ms:6000,\
|
||||||
|
auto.offset.reset:earliest'
|
||||||
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
#consumerId = 1
|
||||||
|
#self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
event.wait()
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
pollDelay = 5
|
||||||
|
showMsg = 1
|
||||||
|
showRow = 1
|
||||||
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
|
# wait for data ready
|
||||||
|
prepareEnvThread.join()
|
||||||
|
prepareEnvThread2.join()
|
||||||
|
|
||||||
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
|
expectRows = 1
|
||||||
|
resultList = self.selectConsumeResult(expectRows)
|
||||||
|
totalConsumeRows = 0
|
||||||
|
for i in range(expectRows):
|
||||||
|
totalConsumeRows += resultList[i]
|
||||||
|
|
||||||
|
if totalConsumeRows != expectrowcnt:
|
||||||
|
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||||
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
|
tdSql.query("drop topic %s"%topicName1)
|
||||||
|
tdSql.query("drop topic %s"%topicName2)
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 6 end ...... ")
|
||||||
|
|
||||||
|
def tmqCase7(self, cfgPath, buildPath):
|
||||||
|
tdLog.printNoPrefix("======== test case 7: Produce while two consumers to subscribe tow topic, Each contains one db")
|
||||||
|
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||||
|
# create and start thread
|
||||||
|
parameterDict = {'cfg': '', \
|
||||||
|
'dbName': 'db60', \
|
||||||
|
'vgroups': 4, \
|
||||||
|
'stbName': 'stb', \
|
||||||
|
'ctbNum': 10, \
|
||||||
|
'rowsPerTbl': 10000, \
|
||||||
|
'batchNum': 100, \
|
||||||
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
self.initConsumerTable()
|
||||||
|
|
||||||
|
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
||||||
|
|
||||||
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
|
prepareEnvThread.start()
|
||||||
|
|
||||||
|
parameterDict2 = {'cfg': '', \
|
||||||
|
'dbName': 'db61', \
|
||||||
|
'vgroups': 4, \
|
||||||
|
'stbName': 'stb2', \
|
||||||
|
'ctbNum': 10, \
|
||||||
|
'rowsPerTbl': 10000, \
|
||||||
|
'batchNum': 100, \
|
||||||
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict2['dbName'], parameterDict2['vgroups']))
|
||||||
|
|
||||||
|
prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2)
|
||||||
|
prepareEnvThread2.start()
|
||||||
|
|
||||||
|
tdLog.info("create topics from db")
|
||||||
|
topicName1 = 'topic_db60'
|
||||||
|
topicName2 = 'topic_db61'
|
||||||
|
|
||||||
|
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
tdSql.execute("create topic %s as %s" %(topicName2, parameterDict2['dbName']))
|
||||||
|
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||||
|
topicList = topicName1 + ',' + topicName2
|
||||||
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
|
keyList = 'group.id:cgrp1,\
|
||||||
|
enable.auto.commit:false,\
|
||||||
|
auto.commit.interval.ms:6000,\
|
||||||
|
auto.offset.reset:earliest'
|
||||||
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
consumerId = 1
|
||||||
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
event.wait()
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
pollDelay = 5
|
||||||
|
showMsg = 1
|
||||||
|
showRow = 1
|
||||||
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
|
# wait for data ready
|
||||||
|
prepareEnvThread.join()
|
||||||
|
prepareEnvThread2.join()
|
||||||
|
|
||||||
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
|
expectRows = 2
|
||||||
|
resultList = self.selectConsumeResult(expectRows)
|
||||||
|
totalConsumeRows = 0
|
||||||
|
for i in range(expectRows):
|
||||||
|
totalConsumeRows += resultList[i]
|
||||||
|
|
||||||
|
if totalConsumeRows != expectrowcnt:
|
||||||
|
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||||
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
|
tdSql.query("drop topic %s"%topicName1)
|
||||||
|
tdSql.query("drop topic %s"%topicName2)
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 7 end ...... ")
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
tdSql.prepare()
|
tdSql.prepare()
|
||||||
|
|
||||||
|
@ -386,9 +708,14 @@ class TDTestCase:
|
||||||
cfgPath = buildPath + "/../sim/psim/cfg"
|
cfgPath = buildPath + "/../sim/psim/cfg"
|
||||||
tdLog.info("cfgPath: %s" % cfgPath)
|
tdLog.info("cfgPath: %s" % cfgPath)
|
||||||
|
|
||||||
#self.tmqCase1(cfgPath, buildPath)
|
self.tmqCase1(cfgPath, buildPath)
|
||||||
self.tmqCase2(cfgPath, buildPath)
|
self.tmqCase2(cfgPath, buildPath)
|
||||||
#self.tmqCase3(cfgPath, buildPath)
|
self.tmqCase3(cfgPath, buildPath)
|
||||||
|
self.tmqCase4(cfgPath, buildPath)
|
||||||
|
self.tmqCase5(cfgPath, buildPath)
|
||||||
|
self.tmqCase6(cfgPath, buildPath)
|
||||||
|
self.tmqCase7(cfgPath, buildPath)
|
||||||
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
Loading…
Reference in New Issue