From ecd39d33f5480f86c8276e4ec53e04b37da0e536 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Sat, 18 Mar 2023 18:07:55 +0800 Subject: [PATCH] test: add tmq case --- tests/system-test/7-tmq/tmqMultiConsumer.py | 29 +++++++++++---------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/tests/system-test/7-tmq/tmqMultiConsumer.py b/tests/system-test/7-tmq/tmqMultiConsumer.py index 10fb4d819e..cc217e0c4c 100644 --- a/tests/system-test/7-tmq/tmqMultiConsumer.py +++ b/tests/system-test/7-tmq/tmqMultiConsumer.py @@ -20,7 +20,7 @@ from tmqCommon import * class TDTestCase: def __init__(self): self.vgroups = 32 - self.ctbNum = 15000 + self.ctbNum = 100 self.rowsPerTbl = 1000 self.snapshot = 1 self.replicaVar = 3 @@ -91,7 +91,7 @@ class TDTestCase: 'rowsPerTbl': 10000, 'batchNum': 1000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 10, + 'pollDelay': 20, 'showMsg': 1, 'showRow': 1, 'snapshot': 1} @@ -101,8 +101,7 @@ class TDTestCase: paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['snapshot'] = self.snapshot - topicNameList = ['topic1'] - expectRowsList = [] + topicNameList = ['topic1', 'topic2'] tmqCom.initConsumerTable() tdLog.info("create topics from stb with filter") @@ -112,14 +111,18 @@ class TDTestCase: sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - tdSql.query(queryString) - expectRowsList.append(tdSql.getRows()) + + sqlString = "create topic %s as %s" %(topicNameList[1], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + # tdSql.query(queryString) + # expectRowsList.append(tdSql.getRows()) # init consume info, and start tmq_sim, then check consume result tdLog.info("insert consume info to consume processor") consumerId = 0 - expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] - topicList = topicNameList[0] + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 4 + topicList = topicNameList[0] + ',' + topicNameList[0] + ',' + topicNameList[1] ifcheckdata = 0 ifManualCommit = 1 keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest' @@ -135,18 +138,16 @@ class TDTestCase: # continue to insert new rows paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl) pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) - pInsertThread.join() - - expectRowsList[0] = expectRowsList[0] * 2 + pInsertThread.join() expectRows = 2 resultList = tmqCom.selectConsumeResult(expectRows) actConsumeTotalRows = resultList[0] + resultList[1] - tdLog.info("act consume rows: %d, expect consume rows: %d"%(actConsumeTotalRows, expectRowsList[0])) + tdLog.info("act consume rows: %d, expect consume rows: %d"%(actConsumeTotalRows, expectrowcnt)) - if (expectRowsList[0] <= actConsumeTotalRows) or ((resultList[0] == 0) and (resultList[1] >= expectRowsList[0])) or ((resultList[1] == 0) and (resultList[0] >= expectRowsList[0])): - tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) + if not ((expectrowcnt <= actConsumeTotalRows) or ((resultList[0] == 0) and (resultList[1] >= expectrowcnt)) or ((resultList[1] == 0) and (resultList[0] >= expectrowcnt))): + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt, actConsumeTotalRows)) tdLog.exit("%d tmq consume rows error!"%consumerId) # tmqCom.checkFileContent(consumerId, queryString)