From ba90514d9020a186668dbaf3746dd7ca75b26e64 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Wed, 13 Jul 2022 14:17:07 +0800 Subject: [PATCH] test: add test casse for tmq --- tests/system-test/7-tmq/tmqCommon.py | 4 +- tests/system-test/7-tmq/tmqUpdate-1ctb.py | 14 ++-- tests/system-test/7-tmq/tmqUpdate-multiCtb.py | 77 +++++++++++-------- tests/system-test/fulltest.sh | 4 +- 4 files changed, 59 insertions(+), 40 deletions(-) diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 2030563a9a..98c9e37132 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -371,7 +371,7 @@ class TMQCom: elif (i % 3 == 0): tagBinaryValue = 'changsha' - sql += " %s.%s_%d using %s.%s tags (%d, %d, %d, '%s', '%s') values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue) + sql += " %s.%s%d using %s.%s tags (%d, %d, %d, '%s', '%s') values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue) for j in range(rowsPerTbl): sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+j, j,j, j,i+ctbStartIdx,rowsBatched) rowsBatched += 1 @@ -379,7 +379,7 @@ class TMQCom: tsql.execute(sql) rowsBatched = 0 if j < rowsPerTbl - 1: - sql = "insert into %s.%s_%d using %s.%s tags (%d, %d, %d, '%s', '%s') values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue) + sql = "insert into %s.%s%d using %s.%s tags (%d, %d, %d, '%s', '%s') values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue) else: sql = "insert into " #end sql diff --git a/tests/system-test/7-tmq/tmqUpdate-1ctb.py b/tests/system-test/7-tmq/tmqUpdate-1ctb.py index 9e2d06d1bd..3cb364f91d 100644 --- a/tests/system-test/7-tmq/tmqUpdate-1ctb.py +++ b/tests/system-test/7-tmq/tmqUpdate-1ctb.py @@ -17,7 +17,7 @@ from tmqCommon import * class TDTestCase: def __init__(self): self.snapshot = 0 - self.vgroups = 2 + self.vgroups = 4 self.ctbNum = 1 self.rowsPerTbl = 100000 @@ -235,18 +235,18 @@ class TDTestCase: def run(self): tdSql.prepare() - self.prepareTestEnv() - tdLog.printNoPrefix("=============================================") - tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") - self.tmqCase1() - self.tmqCase2() + # self.prepareTestEnv() + # tdLog.printNoPrefix("=============================================") + # tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") + # self.tmqCase1() + # self.tmqCase2() self.prepareTestEnv() tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 self.tmqCase1() - self.tmqCase2() + # self.tmqCase2() def stop(self): diff --git a/tests/system-test/7-tmq/tmqUpdate-multiCtb.py b/tests/system-test/7-tmq/tmqUpdate-multiCtb.py index 9e2d06d1bd..491feb68bd 100644 --- a/tests/system-test/7-tmq/tmqUpdate-multiCtb.py +++ b/tests/system-test/7-tmq/tmqUpdate-multiCtb.py @@ -17,9 +17,10 @@ from tmqCommon import * class TDTestCase: def __init__(self): self.snapshot = 0 - self.vgroups = 2 - self.ctbNum = 1 - self.rowsPerTbl = 100000 + self.vgroups = 4 + self.ctbNum = 100 + self.rowsPerTbl = 1000 + self.autoCtbPrefix = 'aCtb' def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") @@ -38,9 +39,9 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1, - 'rowsPerTbl': 100000, - 'batchNum': 1200, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 10000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 3, 'showMsg': 1, @@ -62,9 +63,9 @@ class TDTestCase: tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", - # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # tdLog.info("restart taosd to ensure that the data falls into the disk") # tdSql.query("flush database %s"%(paraDict['dbName'])) @@ -84,9 +85,9 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1, - 'rowsPerTbl': 100000, - 'batchNum': 3000, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 10000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 5, 'showMsg': 1, @@ -98,10 +99,11 @@ class TDTestCase: paraDict['rowsPerTbl'] = self.rowsPerTbl # update to half tables + paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) - # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", - # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) @@ -113,10 +115,14 @@ class TDTestCase: tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - # paraDict['ctbNum'] = self.ctbNum + paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl consumerId = 0 - expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 3/2) + if self.snapshot == 0: + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2)) + elif self.snapshot == 1: + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2)) + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -129,7 +135,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("insert process end, and start to check consume result") + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) totalConsumeRows = 0 @@ -143,7 +149,7 @@ class TDTestCase: if totalConsumeRows != expectrowcnt: tdLog.exit("tmq consume rows error!") - tmqCom.checkFileContent(consumerId, queryString) + # tmqCom.checkFileContent(consumerId, queryString) tdSql.query("drop topic %s"%topicFromStb1) tdLog.printNoPrefix("======== test case 1 end ...... ") @@ -161,9 +167,9 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1, - 'rowsPerTbl': 10000, - 'batchNum': 5000, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 10000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 5, 'showMsg': 1, @@ -179,14 +185,20 @@ class TDTestCase: tdSql.query("flush database %s"%(paraDict['dbName'])) # update to half tables - paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) + paraDict['ctbNum'] = int(self.ctbNum/2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) - tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"], + paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix, ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], - # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) + + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="aCtby", + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) + + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2)) tmqCom.initConsumerTable() tdLog.info("create topics from stb1") @@ -197,9 +209,14 @@ class TDTestCase: tdSql.execute(sqlString) # paraDict['ctbNum'] = self.ctbNum + paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl consumerId = 1 - expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2) + if self.snapshot == 0: + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2 + 1/2*1/2)) + elif self.snapshot == 1: + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2)) + topicList = topicFromStb1 ifcheckdata = 1 ifManualCommit = 1 @@ -212,7 +229,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("insert process end, and start to check consume result") + tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) totalConsumeRows = 0 diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 269f83a139..5672571bb5 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -178,6 +178,8 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py #python3 ./test.py -f 7-tmq/tmqDnodeRestart.py +#python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py +python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb.py #------------querPolicy 2----------- @@ -353,4 +355,4 @@ python3 ./test.py -f 2-query/twa.py -Q 3 python3 ./test.py -f 2-query/irate.py -Q 3 python3 ./test.py -f 2-query/function_null.py -Q 3 python3 ./test.py -f 2-query/count_partition.py -Q 3 -python3 ./test.py -f 2-query/max_partition.py -Q 3 \ No newline at end of file +python3 ./test.py -f 2-query/max_partition.py -Q 3