From 8ad95ad76f4e8d007b642ae05d4f965ff65b4d5c Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 27 Oct 2022 13:59:57 +0800 Subject: [PATCH 1/4] test:modify test cases of tmq for poll delay --- tests/system-test/7-tmq/tmqDnodeRestart.py | 4 ++-- tests/system-test/7-tmq/tmqDnodeRestart1.py | 6 +++--- tests/system-test/7-tmq/tmqError.py | 7 ++++--- tests/system-test/7-tmq/tmqShow.py | 4 ++-- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/tests/system-test/7-tmq/tmqDnodeRestart.py b/tests/system-test/7-tmq/tmqDnodeRestart.py index bcc6725848..925692dbb8 100644 --- a/tests/system-test/7-tmq/tmqDnodeRestart.py +++ b/tests/system-test/7-tmq/tmqDnodeRestart.py @@ -87,7 +87,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 10, + 'pollDelay': 30, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -173,7 +173,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 20, + 'pollDelay': 30, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} diff --git a/tests/system-test/7-tmq/tmqDnodeRestart1.py b/tests/system-test/7-tmq/tmqDnodeRestart1.py index bb3cee616a..7e21c43039 100644 --- a/tests/system-test/7-tmq/tmqDnodeRestart1.py +++ b/tests/system-test/7-tmq/tmqDnodeRestart1.py @@ -42,7 +42,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 3, + 'pollDelay': 30, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -87,7 +87,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 10, + 'pollDelay': 30, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -161,7 +161,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 10, + 'pollDelay': 30, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} diff --git a/tests/system-test/7-tmq/tmqError.py b/tests/system-test/7-tmq/tmqError.py index 2d7b464025..45731f4bb1 100644 --- a/tests/system-test/7-tmq/tmqError.py +++ b/tests/system-test/7-tmq/tmqError.py @@ -237,18 +237,19 @@ class TDTestCase: showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - time.sleep(3) + #time.sleep(3) + tmqCom.getStartConsumeNotifyFromTmqsim() tdLog.info("================= stop dnode, and remove data file, then start dnode ===========================") tdDnodes.stop(1) - # time.sleep(5) + time.sleep(5) dataPath = buildPath + "/../sim/dnode1/data/*" shellCmd = 'rm -rf ' + dataPath tdLog.info(shellCmd) os.system(shellCmd) #tdDnodes.start(1) tdDnodes.starttaosd(1) - time.sleep(2) + time.sleep(5) ######### redo to consume self.initConsumerTable() diff --git a/tests/system-test/7-tmq/tmqShow.py b/tests/system-test/7-tmq/tmqShow.py index 0691da6786..f348173d6d 100644 --- a/tests/system-test/7-tmq/tmqShow.py +++ b/tests/system-test/7-tmq/tmqShow.py @@ -41,7 +41,7 @@ class TDTestCase: 'rowsPerTbl': 4000, 'batchNum': 15, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 20, + 'pollDelay': 30, 'showMsg': 1, 'showRow': 1} @@ -124,7 +124,7 @@ class TDTestCase: tdLog.info("async insert data") pThread = tmqCom.asyncInsertData(paraDict) - tmqCom.getStartConsumeNotifyFromTmqsim(); + tmqCom.getStartConsumeNotifyFromTmqsim() #time.sleep(5) tdLog.info("check show consumers") tdSql.query("show consumers") From 7c34c705beaf7f8a29bbca9c8dda260ceef75147 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 27 Oct 2022 14:52:50 +0800 Subject: [PATCH 2/4] test: add test case for tmq --- tests/system-test/7-tmq/tmqDnodeRestart1.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/tests/system-test/7-tmq/tmqDnodeRestart1.py b/tests/system-test/7-tmq/tmqDnodeRestart1.py index 7e21c43039..8a826cc273 100644 --- a/tests/system-test/7-tmq/tmqDnodeRestart1.py +++ b/tests/system-test/7-tmq/tmqDnodeRestart1.py @@ -40,7 +40,7 @@ class TDTestCase: 'ctbStartIdx': 0, 'ctbNum': 100, 'rowsPerTbl': 1000, - 'batchNum': 10, + 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 30, 'showMsg': 1, @@ -101,6 +101,7 @@ class TDTestCase: topicFromStb = 'topic_stb' queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as stable %s.%s" %(topicFromStb, paraDict['dbName'], paraDict['stbName']) + #sqlString = "create topic %s as %s" %(topicFromStb, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) @@ -111,7 +112,7 @@ class TDTestCase: ifManualCommit = 0 keyList = 'group.id:cgrp1,\ enable.auto.commit:true,\ - auto.commit.interval.ms:1000,\ + auto.commit.interval.ms:200,\ auto.offset.reset:latest' tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) @@ -119,7 +120,15 @@ class TDTestCase: tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) # time.sleep(3) - tmqCom.getStartCommitNotifyFromTmqsim() + tmqCom.getStartCommitNotifyFromTmqsim('cdb',1) + + tdLog.info("create some new child table and insert data for latest mode") + paraDict["batchNum"] = 100 + paraDict["ctbPrefix"] = 'newCtb' + paraDict["ctbNum"] = 10 + paraDict["rowsPerTbl"] = 10 + tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) + tdLog.info("================= restart dnode ===========================") tdDnodes.stoptaosd(1) tdDnodes.starttaosd(1) @@ -132,8 +141,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - tdSql.query(queryString) - totalRowsFromQury = tdSql.getRows() + totalRowsFromQury = paraDict["ctbNum"] * paraDict["rowsPerTbl"] tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury)) if (totalConsumeRows < totalRowsFromQury): @@ -185,7 +193,7 @@ class TDTestCase: ifManualCommit = 0 keyList = 'group.id:cgrp1,\ enable.auto.commit:true,\ - auto.commit.interval.ms:1000,\ + auto.commit.interval.ms:200,\ auto.offset.reset:latest' tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) From 7af4b2c58437cac6bf4f98274767534ec7b3d862 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 27 Oct 2022 14:55:51 +0800 Subject: [PATCH 3/4] test: add test case for tmq --- tests/system-test/7-tmq/tmqDnodeRestart1.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/system-test/7-tmq/tmqDnodeRestart1.py b/tests/system-test/7-tmq/tmqDnodeRestart1.py index 8a826cc273..054afe4fef 100644 --- a/tests/system-test/7-tmq/tmqDnodeRestart1.py +++ b/tests/system-test/7-tmq/tmqDnodeRestart1.py @@ -200,12 +200,18 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - # time.sleep(3) - tmqCom.getStartCommitNotifyFromTmqsim() + tmqCom.getStartCommitNotifyFromTmqsim('cdb',1) + + tdLog.info("create some new child table and insert data for latest mode") + paraDict["batchNum"] = 100 + paraDict["ctbPrefix"] = 'newCtb' + paraDict["ctbNum"] = 10 + paraDict["rowsPerTbl"] = 10 + tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) + tdLog.info("================= restart dnode ===========================") tdDnodes.stoptaosd(1) tdDnodes.starttaosd(1) - # time.sleep(3) tdLog.info(" restart taosd end and wait to check consume result") expectRows = 1 @@ -214,8 +220,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - tdSql.query(queryString) - totalRowsFromQury = tdSql.getRows() + totalRowsFromQury = paraDict["ctbNum"] * paraDict["rowsPerTbl"] tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury)) if (totalConsumeRows < totalRowsFromQury): From 7797027da2fc5dac4ff495bcd1cc4de70622428e Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 27 Oct 2022 17:34:51 +0800 Subject: [PATCH 4/4] test:modify test case --- tests/system-test/7-tmq/tmqError.py | 76 ++++++++++++++++++----------- 1 file changed, 47 insertions(+), 29 deletions(-) diff --git a/tests/system-test/7-tmq/tmqError.py b/tests/system-test/7-tmq/tmqError.py index 45731f4bb1..27f1e39ddd 100644 --- a/tests/system-test/7-tmq/tmqError.py +++ b/tests/system-test/7-tmq/tmqError.py @@ -11,6 +11,8 @@ from util.log import * from util.sql import * from util.cases import * from util.dnodes import * +sys.path.append("./7-tmq") +from tmqCommon import * class actionType(Enum): CREATE_DATABASE = 0 @@ -193,32 +195,41 @@ class TDTestCase: and restart a consumption process to complete a consumption ''' tdLog.printNoPrefix("======== test case 1: ") + tmqCom.initConsumerTable() - self.initConsumerTable() + #self.initConsumerTable() - # create and start thread - parameterDict = {'cfg': '', \ - 'actionType': 0, \ - 'dbName': 'db3', \ - 'dropFlag': 1, \ - 'vgroups': 4, \ - 'replica': 1, \ - 'stbName': 'stb1', \ - 'ctbNum': 10, \ - 'rowsPerTbl': 20000, \ - 'batchNum': 100, \ - 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 - parameterDict['cfg'] = cfgPath + # create and start thread + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 20000, + 'batchNum': 1000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 30, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + paraDict['cfg'] = cfgPath - self.create_database(tdSql, parameterDict["dbName"]) - self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) - self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) - self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"]) + self.create_database(tdSql, paraDict["dbName"]) + self.create_stable(tdSql, paraDict["dbName"], paraDict["stbName"]) + self.create_ctables(tdSql, paraDict["dbName"], paraDict["stbName"], paraDict["ctbNum"]) + self.insert_data(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, paraDict['dbName'], paraDict['stbName'])) consumerId = 0 # expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] expectrowcnt = 90000000000 @@ -229,13 +240,16 @@ class TDTestCase: enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' - self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + #self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") pollDelay = 9000000 # Forever loop showMsg = 1 showRow = 1 - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + #self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) #time.sleep(3) tmqCom.getStartConsumeNotifyFromTmqsim() @@ -254,17 +268,17 @@ class TDTestCase: ######### redo to consume self.initConsumerTable() - self.create_database(tdSql, parameterDict["dbName"]) - self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) - self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) - self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"]) + self.create_database(tdSql, paraDict["dbName"]) + self.create_stable(tdSql, paraDict["dbName"], paraDict["stbName"]) + self.create_ctables(tdSql, paraDict["dbName"], paraDict["stbName"], paraDict["ctbNum"]) + self.insert_data(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, paraDict['dbName'], paraDict['stbName'])) consumerId = 0 - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] topicList = topicFromStb1 ifcheckdata = 0 ifManualCommit = 0 @@ -272,13 +286,17 @@ class TDTestCase: enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' - self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + #self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") pollDelay = 20 showMsg = 1 showRow = 1 - self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + paraDict['pollDelay'] = 20 + #self.startTmqSimProcess(buildPath,cfgPath,pollDelay,paraDict["dbName"],showMsg, showRow) + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) expectRows = 1 resultList = self.selectConsumeResult(expectRows)