test:add cases for tmq
This commit is contained in:
parent
7b1833a6f8
commit
33ce0f2fb6
|
@ -1079,6 +1079,291 @@ class TDTestCase:
|
|||
|
||||
tdLog.printNoPrefix("======== test case 10 end ...... ")
|
||||
|
||||
def tmqCase11(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 11: ")
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
parameterDict = {'cfg': '', \
|
||||
'actionType': 0, \
|
||||
'dbName': 'db11', \
|
||||
'dropFlag': 1, \
|
||||
'vgroups': 4, \
|
||||
'replica': 1, \
|
||||
'stbName': 'stb1', \
|
||||
'ctbNum': 10, \
|
||||
'rowsPerTbl': 10000, \
|
||||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
self.insert_data(tdSql,\
|
||||
parameterDict["dbName"],\
|
||||
parameterDict["stbName"],\
|
||||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],\
|
||||
parameterDict["batchNum"])
|
||||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:none'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt/4,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 5
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
tdLog.info("start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
if totalConsumeRows != 0:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
self.initConsumerInfoTable()
|
||||
consumerId = 1
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:none'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("again start consume processor")
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
tdLog.info("again check consume result")
|
||||
expectRows = 2
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
if totalConsumeRows != 0:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
|
||||
tdLog.printNoPrefix("======== test case 11 end ...... ")
|
||||
|
||||
def tmqCase12(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 12: ")
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
parameterDict = {'cfg': '', \
|
||||
'actionType': 0, \
|
||||
'dbName': 'db12', \
|
||||
'dropFlag': 1, \
|
||||
'vgroups': 4, \
|
||||
'replica': 1, \
|
||||
'stbName': 'stb1', \
|
||||
'ctbNum': 10, \
|
||||
'rowsPerTbl': 10000, \
|
||||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
self.insert_data(tdSql,\
|
||||
parameterDict["dbName"],\
|
||||
parameterDict["stbName"],\
|
||||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],\
|
||||
parameterDict["batchNum"])
|
||||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 0
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt/4,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 5
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
tdLog.info("start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
if totalConsumeRows != expectrowcnt/4:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
self.initConsumerInfoTable()
|
||||
consumerId = 1
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:none'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("again start consume processor")
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
tdLog.info("again check consume result")
|
||||
expectRows = 2
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
if totalConsumeRows != expectrowcnt/4:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
|
||||
tdLog.printNoPrefix("======== test case 12 end ...... ")
|
||||
|
||||
def tmqCase13(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 13: ")
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
parameterDict = {'cfg': '', \
|
||||
'actionType': 0, \
|
||||
'dbName': 'db13', \
|
||||
'dropFlag': 1, \
|
||||
'vgroups': 4, \
|
||||
'replica': 1, \
|
||||
'stbName': 'stb1', \
|
||||
'ctbNum': 10, \
|
||||
'rowsPerTbl': 10000, \
|
||||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
self.insert_data(tdSql,\
|
||||
parameterDict["dbName"],\
|
||||
parameterDict["stbName"],\
|
||||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],\
|
||||
parameterDict["batchNum"])
|
||||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt/4,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 5
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
tdLog.info("start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
if totalConsumeRows != expectrowcnt/4:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
self.initConsumerInfoTable()
|
||||
consumerId = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:none'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt/2,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("again start consume processor")
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
tdLog.info("again check consume result")
|
||||
expectRows = 2
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
if totalConsumeRows != expectrowcnt*(1/2+1/4):
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*(1/2+1/4)))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
self.initConsumerInfoTable()
|
||||
consumerId = 2
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:none'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("again start consume processor")
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
tdLog.info("again check consume result")
|
||||
expectRows = 3
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
|
||||
tdLog.printNoPrefix("======== test case 13 end ...... ")
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
|
||||
|
@ -1099,8 +1384,10 @@ class TDTestCase:
|
|||
self.tmqCase7(cfgPath, buildPath)
|
||||
self.tmqCase8(cfgPath, buildPath)
|
||||
self.tmqCase9(cfgPath, buildPath)
|
||||
self.tmqCase10(cfgPath, buildPath)
|
||||
|
||||
self.tmqCase10(cfgPath, buildPath)
|
||||
self.tmqCase11(cfgPath, buildPath)
|
||||
self.tmqCase12(cfgPath, buildPath)
|
||||
self.tmqCase13(cfgPath, buildPath)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
Loading…
Reference in New Issue