diff --git a/tests/system-test/7-tmq/dataFromTsdbNWal.py b/tests/system-test/7-tmq/dataFromTsdbNWal.py index a55fbbfd18..227ce9d5a5 100644 --- a/tests/system-test/7-tmq/dataFromTsdbNWal.py +++ b/tests/system-test/7-tmq/dataFromTsdbNWal.py @@ -38,9 +38,9 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 10, + 'ctbNum': 100, 'rowsPerTbl': 10000, - 'batchNum': 1000, + 'batchNum': 3000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 10, 'showMsg': 1, @@ -64,9 +64,7 @@ class TDTestCase: ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - tdLog.info("restart taosd to ensure that the data falls into the disk") - # tdDnodes.stop(1) - # tdDnodes.start(1) + tdLog.info("flush db to let data falls into the disk") tdSql.query("flush database %s"%(paraDict['dbName'])) return @@ -85,7 +83,7 @@ class TDTestCase: 'ctbStartIdx': 0, 'ctbNum': 10, 'rowsPerTbl': 10000, - 'batchNum': 10, + 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 3, 'showMsg': 1, @@ -95,8 +93,6 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - paraDict['batchNum'] = 100 - paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl topicNameList = ['topic1'] expectRowsList = [] @@ -125,6 +121,8 @@ class TDTestCase: tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) # after start consume, continue insert some data + paraDict['batchNum'] = 100 + paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) @@ -137,11 +135,12 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) + + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) if expectRowsList[0] != resultList[0]: - tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) - tmqCom.checkFileContent(consumerId, queryString) + tmqCom.checkFileContent(consumerId, queryString) time.sleep(10) for i in range(len(topicNameList)): diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 98c9e37132..060e17c11f 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -444,6 +444,7 @@ class TMQCom: # skip first line for it is schema queryFile.readline() + lines = 0 while True: dst = queryFile.readline()