Merge pull request #12402 from taosdata/test-v3.0/lihui

[test: modify test process for tmq]
This commit is contained in:
Hui Li 2022-05-13 08:34:42 +08:00 committed by GitHub
commit d4de00f54f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 21 deletions

View File

@ -114,7 +114,7 @@ class TDTestCase:
def tmqCase1(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test scenario 1: ")
tdLog.printNoPrefix("======== test case 1: ")
tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread
parameterDict = {'cfg': '', \
@ -122,8 +122,8 @@ class TDTestCase:
'vgroups': 1, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 100, \
'batchNum': 10, \
'rowsPerTbl': 10, \
'batchNum': 200, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
@ -163,8 +163,7 @@ class TDTestCase:
tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)")
consumerId = 0
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
@ -172,7 +171,7 @@ class TDTestCase:
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
tdLog.info("check stb if there are data")
@ -209,18 +208,20 @@ class TDTestCase:
else:
time.sleep(5)
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
tdLog.info("consumer result: %d, %d"%(tdSql.getData(0 , 2), tdSql.getData(0 , 3)))
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt)
# mulit rows and mulit tables in one sql, this num of msg is not sure
#tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromCtb)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test scenario 2: add child table with consuming ")
tdLog.printNoPrefix("======== test case 2: add child table with consuming ")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db2', \
@ -275,9 +276,9 @@ class TDTestCase:
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
rowsOfNewCtb = 1000
consumerId = 0
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
@ -285,7 +286,7 @@ class TDTestCase:
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
tdLog.info("check stb if there are data")
@ -312,7 +313,6 @@ class TDTestCase:
# create new child table and insert data
newCtbName = 'newctb'
rowsOfNewCtb = 1000
tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"]))
startTs = parameterDict["startTs"]
for j in range(rowsOfNewCtb):
@ -332,14 +332,13 @@ class TDTestCase:
else:
time.sleep(5)
expectmsgcnt += rowsOfNewCtb
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt)
tdLog.printNoPrefix("======== test scenario 2 end ...... ")
tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromCtb)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()

View File

@ -333,8 +333,8 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++;
if (totalMsgs >= pInfo->expectMsgCnt) {
taosFprintfFile(g_fp, "==== totalMsgs >= pInfo->expectMsgCnt, so break\n");
if (totalRows >= pInfo->expectMsgCnt) {
taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n");
break;
}
} else {