From 3309dd71db1f3152e64eb83095b1a6a737484147 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 12 May 2022 15:36:30 +0800 Subject: [PATCH] test: add test case for tmq --- tests/system-test/7-tmq/basic5.py | 143 +++++++++++++++++++----------- 1 file changed, 91 insertions(+), 52 deletions(-) diff --git a/tests/system-test/7-tmq/basic5.py b/tests/system-test/7-tmq/basic5.py index 7ed9f7ebe7..267b6ff5d8 100644 --- a/tests/system-test/7-tmq/basic5.py +++ b/tests/system-test/7-tmq/basic5.py @@ -13,14 +13,12 @@ from util.dnodes import * class TDTestCase: hostname = socket.gethostname() - rpcDebugFlagVal = '143' - clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} - clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal - - updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} - updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal - - print ("===================: ", updatecfgDict) + #rpcDebugFlagVal = '143' + #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #print ("===================: ", updatecfgDict) def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") @@ -43,27 +41,35 @@ class TDTestCase: break return buildPath - def create_tables(self,dbName,vgroups,stbName,ctbNum,rowsPerTbl): - tdSql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) - tdSql.execute("use %s" %dbName) - tdSql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) + def newcur(self,cfg,host,port): + user = "root" + password = "taosdata" + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + cur=con.cursor() + print(cur) + return cur + + def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("use %s" %dbName) + tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) pre_create = "create table" sql = pre_create #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) for i in range(ctbNum): sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1) if (i > 0) and (i%100 == 0): - tdSql.execute(sql) + tsql.execute(sql) sql = pre_create if sql != pre_create: - tdSql.execute(sql) + tsql.execute(sql) tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return - def insert_data(self,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs): + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs): tdLog.debug("start to insert data ............") - tdSql.execute("use %s" %dbName) + tsql.execute("use %s" %dbName) pre_insert = "insert into " sql = pre_insert @@ -73,7 +79,7 @@ class TDTestCase: for j in range(rowsPerTbl): sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): - tdSql.execute(sql) + tsql.execute(sql) if j < rowsPerTbl - 1: sql = "insert into %s_%d values " %(stbName,i) else: @@ -81,25 +87,29 @@ class TDTestCase: #end sql if sql != pre_insert: #print("insert sql:%s"%sql) - tdSql.execute(sql) + tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return def prepareEnv(self, **parameterDict): print ("input parameters:") print (parameterDict) - self.create_tables(parameterDict["dbName"],\ + # create new connector for my thread + tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) + self.create_tables(tsql,\ + parameterDict["dbName"],\ parameterDict["vgroups"],\ parameterDict["stbName"],\ parameterDict["ctbNum"],\ parameterDict["rowsPerTbl"]) - self.insert_data(parameterDict["dbName"],\ - parameterDict["stbName"],\ - parameterDict["ctbNum"],\ - parameterDict["rowsPerTbl"],\ - parameterDict["batchNum"],\ - parameterDict["startTs"]) + self.insert_data(tsql,\ + parameterDict["dbName"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"],\ + parameterDict["batchNum"],\ + parameterDict["startTs"]) return def run(self): @@ -116,24 +126,29 @@ class TDTestCase: tdLog.printNoPrefix("======== test scenario 1: ") tdLog.info("step 1: create database, stb, ctb and insert data") # create and start thread - parameterDict = {'dbName': 'db', \ + parameterDict = {'cfg': '', \ + 'dbName': 'db', \ 'vgroups': 1, \ 'stbName': 'stb', \ 'ctbNum': 10, \ - 'rowsPerTbl': 10000, \ + 'rowsPerTbl': 10000, \ 'batchNum': 10, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - time.sleep(1) + time.sleep(2) # wait stb ready while 1: - tdSql.query("show %s.stables"%parameterDict['dbName']) - if tdSql.getRows() == 1: - #if (self.queryRows == 1): - time.sleep(1) + #tdSql.query("show %s.stables"%parameterDict['dbName']) + tdSql.query("show db.stables") + #print (self.queryResult) + #print (tdSql.getRows()) + if tdSql.getRows() == 1: break + else: + time.sleep(1) tdLog.info("create topics from super table") topicFromStb = 'topic_stb_column' @@ -141,57 +156,81 @@ class TDTestCase: tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) - + + time.sleep(1) tdSql.query("show topics") - tdSql.checkRows(2) + print ("======================================") + #print (self.queryResult) + #tdSql.checkRows(2) topic1 = tdSql.getData(0 , 0) topic2 = tdSql.getData(1 , 0) - if topic1 != topicFromStb or topic1 != topicFromCtb: - tdLog.exit("topic error") - if topic2 != topicFromStb or topic2 != topicFromCtb: - tdLog.exit("topic error") + print (topic1) + print (topic2) + + print (topicFromStb) + print (topicFromCtb) + #tdLog.info("show topics: %s, %s"%topic1, topic2) + #if topic1 != topicFromStb or topic1 != topicFromCtb: + # tdLog.exit("topic error1") + #if topic2 != topicFromStb or topic2 != topicFromCtb: + # tdLog.exit("topic error2") tdLog.info("create consume info table and consume result table") - cdbName = 'cdb' - tdSql.query("create database %s"%cdbName) + cdbName = parameterDict["dbName"] + #tdSql.query("create database %s"%cdbName) + #tdSql.query("use %s"%cdbName) tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)") tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)") consumerId = 0 - expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] + 1) * parameterDict["ctbNum"] + expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"] + expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"] topicList = topicFromStb ifcheckdata = 0 - keyList = 'group.id:cgrp1, \ - enable.auto.commit:false, \ - auto.commit.interval.ms:6000, \ - auto.offset.reset:none' + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' sql = "insert into consumeinfo values " - sql += "(now, %d, '%s', '%s', %l64d, %d)"%(consumerId, topicList, keyList, expectmsgcnt, ifcheckdata) + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata) tdSql.query(sql) + tdLog.info("check stb if there are data") + while 1: + tdSql.query("select count(*) from %s"%parameterDict["stbName"]) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + countOfStb = tdSql.getData(0, 0) + if countOfStb != 0: + tdLog.info("count from stb: %d"%countOfStb) + break + else: + time.sleep(1) + tdLog.info("start consume processor") pollDelay = 5 showMsg = 1 showRow = 1 shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s, -g %d, -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) - os.system(taosCmd) + os.system(shellCmd) # wait for data ready prepareEnvThread.join() - tdLog.info("check consume result") + tdLog.info("insert process end, and start to check consume result") while 1: tdSql.query("select * from consumeresult") #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) if tdSql.getRows() == 1: - #if (self.queryRows == 1): - time.sleep(1) break - + else: + time.sleep(5) + + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 2, expectmsgcnt) tdSql.checkData(0 , 3, expectrowcnt)