diff --git a/tests/system-test/7-tmq/tmqError.py b/tests/system-test/7-tmq/tmqError.py index 45731f4bb1..27f1e39ddd 100644 --- a/tests/system-test/7-tmq/tmqError.py +++ b/tests/system-test/7-tmq/tmqError.py @@ -11,6 +11,8 @@ from util.log import * from util.sql import * from util.cases import * from util.dnodes import * +sys.path.append("./7-tmq") +from tmqCommon import * class actionType(Enum): CREATE_DATABASE = 0 @@ -193,32 +195,41 @@ class TDTestCase: and restart a consumption process to complete a consumption ''' tdLog.printNoPrefix("======== test case 1: ") + tmqCom.initConsumerTable() - self.initConsumerTable() + #self.initConsumerTable() - # create and start thread - parameterDict = {'cfg': '', \ - 'actionType': 0, \ - 'dbName': 'db3', \ - 'dropFlag': 1, \ - 'vgroups': 4, \ - 'replica': 1, \ - 'stbName': 'stb1', \ - 'ctbNum': 10, \ - 'rowsPerTbl': 20000, \ - 'batchNum': 100, \ - 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 - parameterDict['cfg'] = cfgPath + # create and start thread + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 20000, + 'batchNum': 1000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 30, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + paraDict['cfg'] = cfgPath - self.create_database(tdSql, parameterDict["dbName"]) - self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) - self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) - self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"]) + self.create_database(tdSql, paraDict["dbName"]) + self.create_stable(tdSql, paraDict["dbName"], paraDict["stbName"]) + self.create_ctables(tdSql, paraDict["dbName"], paraDict["stbName"], paraDict["ctbNum"]) + self.insert_data(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, paraDict['dbName'], paraDict['stbName'])) consumerId = 0 # expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] expectrowcnt = 90000000000 @@ -229,13 +240,16 @@ class TDTestCase: enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' - self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + #self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") pollDelay = 9000000 # Forever loop showMsg = 1 showRow = 1 - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + #self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) #time.sleep(3) tmqCom.getStartConsumeNotifyFromTmqsim() @@ -254,17 +268,17 @@ class TDTestCase: ######### redo to consume self.initConsumerTable() - self.create_database(tdSql, parameterDict["dbName"]) - self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) - self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) - self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"]) + self.create_database(tdSql, paraDict["dbName"]) + self.create_stable(tdSql, paraDict["dbName"], paraDict["stbName"]) + self.create_ctables(tdSql, paraDict["dbName"], paraDict["stbName"], paraDict["ctbNum"]) + self.insert_data(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, paraDict['dbName'], paraDict['stbName'])) consumerId = 0 - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] topicList = topicFromStb1 ifcheckdata = 0 ifManualCommit = 0 @@ -272,13 +286,17 @@ class TDTestCase: enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' - self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + #self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") pollDelay = 20 showMsg = 1 showRow = 1 - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + paraDict['pollDelay'] = 20 + #self.startTmqSimProcess(buildPath,cfgPath,pollDelay,paraDict["dbName"],showMsg, showRow) + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) expectRows = 1 resultList = self.selectConsumeResult(expectRows)