fix test cases
This commit is contained in:
parent
5c1eef7a43
commit
589c6d5f96
|
@ -49,7 +49,7 @@ class TDTestCase:
|
|||
print(cur)
|
||||
return cur
|
||||
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
|
@ -58,7 +58,7 @@ class TDTestCase:
|
|||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
sql = "insert into %s.consumeinfo values "%cdbName
|
||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||
tdLog.info("consume info sql: %s"%sql)
|
||||
|
@ -73,11 +73,11 @@ class TDTestCase:
|
|||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
for i in range(expectRows):
|
||||
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||
resultList.append(tdSql.getData(i , 3))
|
||||
|
||||
|
||||
return resultList
|
||||
|
||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||
|
@ -85,20 +85,20 @@ class TDTestCase:
|
|||
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||
shellCmd = 'nohup valgrind --log-file=' + logFile
|
||||
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
|
||||
|
||||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum):
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("use %s" %dbName)
|
||||
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
||||
pre_create = "create table"
|
||||
|
@ -111,8 +111,8 @@ class TDTestCase:
|
|||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
event.set()
|
||||
|
||||
event.set()
|
||||
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
|
||||
return
|
||||
|
||||
|
@ -141,7 +141,7 @@ class TDTestCase:
|
|||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
print ("input parameters:")
|
||||
print (parameterDict)
|
||||
|
@ -159,7 +159,7 @@ class TDTestCase:
|
|||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],\
|
||||
parameterDict["batchNum"],\
|
||||
parameterDict["startTs"])
|
||||
parameterDict["startTs"])
|
||||
return
|
||||
|
||||
def tmqCase1(self, cfgPath, buildPath):
|
||||
|
@ -182,10 +182,10 @@ class TDTestCase:
|
|||
|
||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
prepareEnvThread.start()
|
||||
|
||||
|
||||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -197,7 +197,7 @@ class TDTestCase:
|
|||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
event.wait()
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
|
@ -208,14 +208,14 @@ class TDTestCase:
|
|||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -226,7 +226,7 @@ class TDTestCase:
|
|||
self.initConsumerTable()
|
||||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -238,7 +238,7 @@ class TDTestCase:
|
|||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 20
|
||||
showMsg = 1
|
||||
|
@ -250,7 +250,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -279,12 +279,12 @@ class TDTestCase:
|
|||
|
||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
prepareEnvThread.start()
|
||||
|
||||
|
||||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
|
||||
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
topicList = topicName1
|
||||
|
@ -298,25 +298,25 @@ class TDTestCase:
|
|||
|
||||
consumerId = 1
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
event.wait()
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 20
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 2
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
if not (totalConsumeRows >= expectrowcnt):
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -343,12 +343,12 @@ class TDTestCase:
|
|||
|
||||
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
||||
tdSql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
|
||||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
|
||||
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
topicList = topicName1
|
||||
|
@ -366,11 +366,11 @@ class TDTestCase:
|
|||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 100
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
|
@ -378,14 +378,14 @@ class TDTestCase:
|
|||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 2
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt * 2:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -430,9 +430,9 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
|
||||
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||
topicList = topicName1
|
||||
|
@ -443,10 +443,10 @@ class TDTestCase:
|
|||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
# consumerId = 1
|
||||
# self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
event.wait()
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
|
@ -456,16 +456,16 @@ class TDTestCase:
|
|||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread2.join()
|
||||
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -486,10 +486,10 @@ class TDTestCase:
|
|||
tdLog.info("cfgPath: %s" % cfgPath)
|
||||
|
||||
self.tmqCase1(cfgPath, buildPath)
|
||||
self.tmqCase2(cfgPath, buildPath)
|
||||
self.tmqCase2a(cfgPath, buildPath)
|
||||
self.tmqCase2(cfgPath, buildPath)
|
||||
self.tmqCase2a(cfgPath, buildPath)
|
||||
self.tmqCase3(cfgPath, buildPath)
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
|
|
@ -50,7 +50,7 @@ class TDTestCase:
|
|||
print(cur)
|
||||
return cur
|
||||
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
|
@ -59,7 +59,7 @@ class TDTestCase:
|
|||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
sql = "insert into %s.consumeinfo values "%cdbName
|
||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||
tdLog.info("consume info sql: %s"%sql)
|
||||
|
@ -73,12 +73,12 @@ class TDTestCase:
|
|||
if tdSql.getRows() == expectRows:
|
||||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
time.sleep(5)
|
||||
|
||||
for i in range(expectRows):
|
||||
tdLog.info ("ts: %s, consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 0), tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||
resultList.append(tdSql.getData(i , 3))
|
||||
|
||||
|
||||
return resultList
|
||||
|
||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||
|
@ -89,17 +89,17 @@ class TDTestCase:
|
|||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("use %s" %dbName)
|
||||
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
||||
pre_create = "create table"
|
||||
|
@ -112,8 +112,8 @@ class TDTestCase:
|
|||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
event.set()
|
||||
|
||||
event.set()
|
||||
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
|
||||
return
|
||||
|
||||
|
@ -146,7 +146,7 @@ class TDTestCase:
|
|||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
print ("input parameters:")
|
||||
print (parameterDict)
|
||||
|
@ -165,7 +165,7 @@ class TDTestCase:
|
|||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],\
|
||||
parameterDict["batchNum"],\
|
||||
parameterDict["startTs"])
|
||||
parameterDict["startTs"])
|
||||
return
|
||||
|
||||
def tmqCase8(self, cfgPath, buildPath):
|
||||
|
@ -188,10 +188,10 @@ class TDTestCase:
|
|||
|
||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
prepareEnvThread.start()
|
||||
|
||||
|
||||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = math.ceil(parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2)
|
||||
|
@ -203,7 +203,7 @@ class TDTestCase:
|
|||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
event.wait()
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
|
@ -214,19 +214,19 @@ class TDTestCase:
|
|||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if not (totalConsumeRows >= expectrowcnt):
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
|
||||
|
||||
tdLog.info("again start consume processer")
|
||||
self.initConsumerTable()
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -237,7 +237,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -266,10 +266,10 @@ class TDTestCase:
|
|||
|
||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
prepareEnvThread.start()
|
||||
|
||||
|
||||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = math.ceil(parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] / 2)
|
||||
|
@ -281,7 +281,7 @@ class TDTestCase:
|
|||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
event.wait()
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
|
@ -292,14 +292,14 @@ class TDTestCase:
|
|||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
tdSql.query("select count(*) from %s.%s" %(parameterDict['dbName'], parameterDict['stbName']))
|
||||
countOfStb = tdSql.getData(0,0)
|
||||
print ("====total rows of stb: %d"%countOfStb)
|
||||
|
@ -307,7 +307,7 @@ class TDTestCase:
|
|||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
if totalConsumeRows < expectrowcnt:
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
|
||||
tdLog.info("again start consume processer")
|
||||
self.initConsumerTable()
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -340,7 +340,7 @@ class TDTestCase:
|
|||
tdLog.info("cfgPath: %s" % cfgPath)
|
||||
|
||||
self.tmqCase8(cfgPath, buildPath)
|
||||
self.tmqCase9(cfgPath, buildPath)
|
||||
self.tmqCase9(cfgPath, buildPath)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
|
@ -49,18 +49,18 @@ class TDTestCase:
|
|||
print(cur)
|
||||
return cur
|
||||
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
|
||||
tdSql.query("drop table if exists %s.notifyinfo "%(cdbName))
|
||||
tdSql.query("drop table if exists %s.notifyinfo "%(cdbName))
|
||||
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
tdSql.query("create table %s.notifyinfo (ts timestamp, cmdid int, consumerid int)"%cdbName)
|
||||
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
sql = "insert into %s.consumeinfo values "%cdbName
|
||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||
tdLog.info("consume info sql: %s"%sql)
|
||||
|
@ -75,7 +75,7 @@ class TDTestCase:
|
|||
else:
|
||||
time.sleep(0.1)
|
||||
return
|
||||
|
||||
|
||||
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
|
||||
while 1:
|
||||
tdSql.query("select * from %s.notifyinfo"%cdbName)
|
||||
|
@ -95,12 +95,12 @@ class TDTestCase:
|
|||
if tdSql.getRows() == expectRows:
|
||||
break
|
||||
else:
|
||||
time.sleep(1)
|
||||
time.sleep(1)
|
||||
|
||||
for i in range(expectRows):
|
||||
tdLog.info ("ts: %s, consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 0), tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||
resultList.append(tdSql.getData(i , 3))
|
||||
|
||||
|
||||
return resultList
|
||||
|
||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||
|
@ -111,17 +111,17 @@ class TDTestCase:
|
|||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("use %s" %dbName)
|
||||
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
||||
pre_create = "create table"
|
||||
|
@ -134,8 +134,8 @@ class TDTestCase:
|
|||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
event.set()
|
||||
|
||||
event.set()
|
||||
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
|
||||
return
|
||||
|
||||
|
@ -164,7 +164,7 @@ class TDTestCase:
|
|||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
print ("input parameters:")
|
||||
print (parameterDict)
|
||||
|
@ -183,7 +183,7 @@ class TDTestCase:
|
|||
parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],\
|
||||
parameterDict["batchNum"],\
|
||||
parameterDict["startTs"])
|
||||
parameterDict["startTs"])
|
||||
return
|
||||
|
||||
def tmqCase10(self, cfgPath, buildPath):
|
||||
|
@ -206,10 +206,10 @@ class TDTestCase:
|
|||
|
||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
prepareEnvThread.start()
|
||||
|
||||
|
||||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -221,7 +221,7 @@ class TDTestCase:
|
|||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
event.wait()
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
|
@ -242,18 +242,18 @@ class TDTestCase:
|
|||
resultList = self.selectConsumeResult(expectRows)
|
||||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread.join()
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
|
||||
tdLog.info("again start consume processer")
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
|
||||
expectRows = 1
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -283,10 +283,10 @@ class TDTestCase:
|
|||
|
||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||
prepareEnvThread.start()
|
||||
|
||||
|
||||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_db1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -298,7 +298,7 @@ class TDTestCase:
|
|||
auto.commit.interval.ms:1000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
|
||||
event.wait()
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
|
@ -307,7 +307,7 @@ class TDTestCase:
|
|||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
# time.sleep(6)
|
||||
# time.sleep(6)
|
||||
tdLog.info("start to wait commit notify")
|
||||
self.getStartCommitNotifyFromTmqsim()
|
||||
|
||||
|
@ -320,18 +320,18 @@ class TDTestCase:
|
|||
# resultList = self.selectConsumeResult(expectRows)
|
||||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread.join()
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
|
||||
tdLog.info("again start consume processer")
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
|
||||
expectRows = 1
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows >= expectrowcnt or totalConsumeRows <= 0:
|
||||
tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
|
|
@ -47,7 +47,7 @@ class TDTestCase:
|
|||
|
||||
pollDelay = 20
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
showRow = 1
|
||||
|
||||
hostname = socket.gethostname()
|
||||
|
||||
|
@ -59,7 +59,7 @@ class TDTestCase:
|
|||
def tmqCase12(self):
|
||||
tdLog.printNoPrefix("======== test case 12: ")
|
||||
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||
|
||||
|
||||
tmqCom.initConsumerTable(self.cdbName)
|
||||
|
||||
tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"])
|
||||
|
@ -76,20 +76,20 @@ class TDTestCase:
|
|||
tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"])
|
||||
|
||||
tdLog.info("create topics from db")
|
||||
topicName1 = 'topic_%s'%(self.paraDict['dbName'])
|
||||
topicName1 = 'topic_%s'%(self.paraDict['dbName'])
|
||||
tdSql.execute("create topic %s as database %s" %(topicName1, self.paraDict['dbName']))
|
||||
|
||||
|
||||
topicList = topicName1
|
||||
keyList = '%s,%s,%s,%s'%(self.groupId,self.autoCommit,self.autoCommitInterval,self.autoOffset)
|
||||
self.expectrowcnt = self.paraDict["rowsPerTbl"] * self.paraDict["ctbNum"] * 2
|
||||
tmqCom.insertConsumerInfo(self.consumerId, self.expectrowcnt,topicList,keyList,self.ifcheckdata,self.ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName)
|
||||
|
||||
tdLog.info("After waiting for a period of time, drop one stable")
|
||||
time.sleep(3)
|
||||
tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName']))
|
||||
time.sleep(3)
|
||||
tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName']))
|
||||
|
||||
tdLog.info("wait result from consumer, then check it")
|
||||
expectRows = 1
|
||||
|
@ -98,7 +98,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if not (totalConsumeRows >= self.expectrowcnt/2 and totalConsumeRows <= self.expectrowcnt):
|
||||
tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
|
|
@ -56,7 +56,7 @@ class TDTestCase:
|
|||
print(cur)
|
||||
return cur
|
||||
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
|
@ -65,12 +65,12 @@ class TDTestCase:
|
|||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
|
||||
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||
tdLog.info("drop consumeinfo table")
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
sql = "insert into %s.consumeinfo values "%cdbName
|
||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||
tdLog.info("consume info sql: %s"%sql)
|
||||
|
@ -85,11 +85,11 @@ class TDTestCase:
|
|||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
for i in range(expectRows):
|
||||
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||
resultList.append(tdSql.getData(i , 3))
|
||||
|
||||
|
||||
return resultList
|
||||
|
||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||
|
@ -97,14 +97,14 @@ class TDTestCase:
|
|||
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||
shellCmd = 'nohup valgrind --log-file=' + logFile
|
||||
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
|
||||
|
||||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
@ -134,7 +134,7 @@ class TDTestCase:
|
|||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
|
||||
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
|
||||
return
|
||||
|
||||
|
@ -149,7 +149,7 @@ class TDTestCase:
|
|||
startTs = int(round(t * 1000))
|
||||
|
||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||
rowsOfSql = 0
|
||||
rowsOfSql = 0
|
||||
for i in range(ctbNum):
|
||||
sql += " %s_%d values "%(stbName,i)
|
||||
for j in range(rowsPerTbl):
|
||||
|
@ -168,8 +168,8 @@ class TDTestCase:
|
|||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
# create new connector for my thread
|
||||
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
|
||||
|
||||
|
@ -188,8 +188,8 @@ class TDTestCase:
|
|||
return
|
||||
|
||||
def tmqCase3(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 3: ")
|
||||
|
||||
tdLog.printNoPrefix("======== test case 3: ")
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
|
@ -213,7 +213,7 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -231,7 +231,7 @@ class TDTestCase:
|
|||
showMsg = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
|
||||
time.sleep(1.5)
|
||||
tdLog.info("drop som child table of stb1")
|
||||
dropTblNum = 4
|
||||
|
@ -246,9 +246,9 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
remaindrowcnt = parameterDict["rowsPerTbl"] * (parameterDict["ctbNum"] - dropTblNum)
|
||||
|
||||
|
||||
tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, remaindrowcnt, expectrowcnt))
|
||||
if not (totalConsumeRows <= expectrowcnt and totalConsumeRows >= remaindrowcnt):
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -259,7 +259,7 @@ class TDTestCase:
|
|||
|
||||
def tmqCase4(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 4: ")
|
||||
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
|
@ -275,7 +275,7 @@ class TDTestCase:
|
|||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
|
@ -288,7 +288,7 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -313,7 +313,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt/4:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -331,7 +331,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -342,7 +342,7 @@ class TDTestCase:
|
|||
|
||||
def tmqCase5(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 5: ")
|
||||
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
|
@ -358,7 +358,7 @@ class TDTestCase:
|
|||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
|
@ -371,7 +371,7 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -396,7 +396,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt/4:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -414,14 +414,14 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != (expectrowcnt * (1 + 1/4)):
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
|
||||
tdLog.printNoPrefix("======== test case 5 end ...... ")
|
||||
tdLog.printNoPrefix("======== test case 5 end ...... ")
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
|
|
|
@ -56,7 +56,7 @@ class TDTestCase:
|
|||
print(cur)
|
||||
return cur
|
||||
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
|
@ -65,12 +65,12 @@ class TDTestCase:
|
|||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
|
||||
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||
tdLog.info("drop consumeinfo table")
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
sql = "insert into %s.consumeinfo values "%cdbName
|
||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||
tdLog.info("consume info sql: %s"%sql)
|
||||
|
@ -85,11 +85,11 @@ class TDTestCase:
|
|||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
for i in range(expectRows):
|
||||
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||
resultList.append(tdSql.getData(i , 3))
|
||||
|
||||
|
||||
return resultList
|
||||
|
||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||
|
@ -97,14 +97,14 @@ class TDTestCase:
|
|||
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||
shellCmd = 'nohup valgrind --log-file=' + logFile
|
||||
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
|
||||
|
||||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
@ -134,7 +134,7 @@ class TDTestCase:
|
|||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
|
||||
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
|
||||
return
|
||||
|
||||
|
@ -149,7 +149,7 @@ class TDTestCase:
|
|||
startTs = int(round(t * 1000))
|
||||
|
||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||
rowsOfSql = 0
|
||||
rowsOfSql = 0
|
||||
for i in range(ctbNum):
|
||||
sql += " %s_%d values "%(stbName,i)
|
||||
for j in range(rowsPerTbl):
|
||||
|
@ -168,8 +168,8 @@ class TDTestCase:
|
|||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
# create new connector for my thread
|
||||
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
|
||||
|
||||
|
@ -189,7 +189,7 @@ class TDTestCase:
|
|||
|
||||
def tmqCase6(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 6: ")
|
||||
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
|
@ -205,7 +205,7 @@ class TDTestCase:
|
|||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
|
@ -218,7 +218,7 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -243,7 +243,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt/4:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -265,7 +265,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -276,7 +276,7 @@ class TDTestCase:
|
|||
|
||||
def tmqCase7(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 7: ")
|
||||
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
|
@ -292,7 +292,7 @@ class TDTestCase:
|
|||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
|
@ -305,7 +305,7 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -330,7 +330,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != 0:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -348,7 +348,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != 0:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
|
|
@ -56,7 +56,7 @@ class TDTestCase:
|
|||
print(cur)
|
||||
return cur
|
||||
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
|
@ -65,12 +65,12 @@ class TDTestCase:
|
|||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
|
||||
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||
tdLog.info("drop consumeinfo table")
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
sql = "insert into %s.consumeinfo values "%cdbName
|
||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||
tdLog.info("consume info sql: %s"%sql)
|
||||
|
@ -85,11 +85,11 @@ class TDTestCase:
|
|||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
for i in range(expectRows):
|
||||
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||
resultList.append(tdSql.getData(i , 3))
|
||||
|
||||
|
||||
return resultList
|
||||
|
||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||
|
@ -97,14 +97,14 @@ class TDTestCase:
|
|||
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||
shellCmd = 'nohup valgrind --log-file=' + logFile
|
||||
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
|
||||
|
||||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
@ -134,7 +134,7 @@ class TDTestCase:
|
|||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
|
||||
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
|
||||
return
|
||||
|
||||
|
@ -149,7 +149,7 @@ class TDTestCase:
|
|||
startTs = int(round(t * 1000))
|
||||
|
||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||
rowsOfSql = 0
|
||||
rowsOfSql = 0
|
||||
for i in range(ctbNum):
|
||||
sql += " %s_%d values "%(stbName,i)
|
||||
for j in range(rowsPerTbl):
|
||||
|
@ -168,8 +168,8 @@ class TDTestCase:
|
|||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
# create new connector for my thread
|
||||
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
|
||||
|
||||
|
@ -188,8 +188,8 @@ class TDTestCase:
|
|||
return
|
||||
|
||||
def tmqCase8(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 8: ")
|
||||
|
||||
tdLog.printNoPrefix("======== test case 8: ")
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
|
@ -218,7 +218,7 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -243,7 +243,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != 0:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -263,7 +263,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -283,7 +283,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt*2:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -293,8 +293,8 @@ class TDTestCase:
|
|||
tdLog.printNoPrefix("======== test case 8 end ...... ")
|
||||
|
||||
def tmqCase9(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 9: ")
|
||||
|
||||
tdLog.printNoPrefix("======== test case 9: ")
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
|
@ -323,7 +323,7 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
|
@ -348,7 +348,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != 0:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -372,7 +372,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
@ -392,7 +392,7 @@ class TDTestCase:
|
|||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
|
||||
if totalConsumeRows != expectrowcnt*2:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
|
Loading…
Reference in New Issue