From eba47b9ef4d34da14b64360d5449c20cf317cc42 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Fri, 13 May 2022 18:54:29 +0800 Subject: [PATCH] [test: add valgrind run tmq] --- tests/system-test/99-TDcase/TD-15557.py | 206 +++++++++++++++++++++++- 1 file changed, 198 insertions(+), 8 deletions(-) diff --git a/tests/system-test/99-TDcase/TD-15557.py b/tests/system-test/99-TDcase/TD-15557.py index 4798bb7c8d..e005985fe0 100644 --- a/tests/system-test/99-TDcase/TD-15557.py +++ b/tests/system-test/99-TDcase/TD-15557.py @@ -49,6 +49,19 @@ class TDTestCase: print(cur) return cur + def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg,showRow,cdbName,valgrind): + shellCmd = 'nohup ' + if valgrind == 1: + logFile = cfgPath + '/../log/valgrind-tmq.log' + shellCmd = 'nohup valgrind --log-file=' + logFile + shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' + + shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) @@ -113,9 +126,8 @@ class TDTestCase: parameterDict["startTs"]) return - def tmqCase1(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 1: Produce while consume to subscribe one db") + tdLog.printNoPrefix("======== test case 1: Produce while one consume to subscribe one db") tdLog.info("step 1: create database, stb, ctb and insert data") # create and start thread parameterDict = {'cfg': '', \ @@ -153,7 +165,7 @@ class TDTestCase: auto.offset.reset:earliest' sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) - tdSql.query(sql) + tdSql.query(sql) event.wait() @@ -162,11 +174,8 @@ class TDTestCase: showMsg = 1 showRow = 1 - shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" - tdLog.info(shellCmd) - os.system(shellCmd) + valgrind = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) # wait for data ready prepareEnvThread.join() @@ -190,6 +199,187 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") + def tmqCase2(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 2: Produce while two consumers to subscribe one db") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db2', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 100000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + consumerId = 1 + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + valgrind = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) + + # wait for data ready + prepareEnvThread.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 2: + break + else: + time.sleep(5) + + consumerId0 = tdSql.getData(0 , 1) + consumerId1 = tdSql.getData(1 , 1) + actConsumeRows0 = tdSql.getData(0 , 3) + actConsumeRows1 = tdSql.getData(1 , 3) + + tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0)) + tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1)) + + totalConsumeRows = actConsumeRows0 + actConsumeRows1 + if totalConsumeRows != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 2 end ...... ") + + def tmqCase3(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 3: Produce while one consumers to subscribe one db, include 2 stb") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db3', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 100000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + parameterDict2 = {'cfg': '', \ + 'dbName': 'db3', \ + 'vgroups': 4, \ + 'stbName': 'stb2', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 100000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) + prepareEnvThread2.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + # consumerId = 1 + # sql = "insert into %s.consumeinfo values "%cdbName + # sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + # tdSql.query(sql) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + valgrind = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) + + # wait for data ready + prepareEnvThread.join() + prepareEnvThread2.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 1: + break + else: + time.sleep(5) + + consumerId0 = tdSql.getData(0 , 1) + #consumerId1 = tdSql.getData(1 , 1) + actConsumeRows0 = tdSql.getData(0 , 3) + #actConsumeRows1 = tdSql.getData(1 , 3) + + tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0)) + #tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1)) + + #totalConsumeRows = actConsumeRows0 + actConsumeRows1 + if actConsumeRows0 != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 3 end ...... ") + def run(self): tdSql.prepare()