test: add cases for tmq
This commit is contained in:
parent
bfa5caa2d2
commit
c680901163
|
@ -93,7 +93,7 @@ class TDTestCase:
|
|||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
|
||||
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum):
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("use %s" %dbName)
|
||||
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
||||
|
@ -147,8 +147,7 @@ class TDTestCase:
|
|||
parameterDict["dbName"],\
|
||||
parameterDict["vgroups"],\
|
||||
parameterDict["stbName"],\
|
||||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"])
|
||||
parameterDict["ctbNum"])
|
||||
|
||||
self.insert_data(tsql,\
|
||||
parameterDict["dbName"],\
|
||||
|
@ -322,6 +321,75 @@ class TDTestCase:
|
|||
|
||||
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
||||
|
||||
def tmqCase2a(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 2a: Produce while two consumers to subscribe one db, inclue 1 stb")
|
||||
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||
# create and start thread
|
||||
parameterDict = {'cfg': '', \
|
||||
'dbName': 'db2a', \
|
||||
'vgroups': 4, \
|
||||
'stbName': 'stb1', \
|
||||
'ctbNum': 10, \
|
||||
'rowsPerTbl': 10000, \
|
||||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
||||
tdSql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db1'
|
||||
|
||||
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
||||
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
topicList = topicName1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
consumerId = 1
|
||||
keyList = 'group.id:cgrp2,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 10
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
prepareEnvThread.start()
|
||||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 2
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
if totalConsumeRows != expectrowcnt * 2:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tdSql.query("drop topic %s"%topicName1)
|
||||
|
||||
tdLog.printNoPrefix("======== test case 2a end ...... ")
|
||||
|
||||
def tmqCase3(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 3: Produce while one consumers to subscribe one db, include 2 stb")
|
||||
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||
|
@ -745,6 +813,7 @@ class TDTestCase:
|
|||
|
||||
self.tmqCase1(cfgPath, buildPath)
|
||||
self.tmqCase2(cfgPath, buildPath)
|
||||
# self.tmqCase2a(cfgPath, buildPath)
|
||||
self.tmqCase3(cfgPath, buildPath)
|
||||
self.tmqCase4(cfgPath, buildPath)
|
||||
self.tmqCase5(cfgPath, buildPath)
|
||||
|
|
Loading…
Reference in New Issue