diff --git a/tests/system-test/7-tmq/basic5.py b/tests/system-test/7-tmq/basic5.py index 267b6ff5d8..8a1932f05c 100644 --- a/tests/system-test/7-tmq/basic5.py +++ b/tests/system-test/7-tmq/basic5.py @@ -110,19 +110,10 @@ class TDTestCase: parameterDict["rowsPerTbl"],\ parameterDict["batchNum"],\ parameterDict["startTs"]) - return - - def run(self): - tdSql.prepare() + return - buildPath = self.getBuildPath() - if (buildPath == ""): - tdLog.exit("taosd not found!") - else: - tdLog.info("taosd found in %s" % buildPath) - cfgPath = buildPath + "/../sim/psim/cfg" - tdLog.info("cfgPath: %s" % cfgPath) + def tmqCase1(self, cfgPath, buildPath): tdLog.printNoPrefix("======== test scenario 1: ") tdLog.info("step 1: create database, stb, ctb and insert data") # create and start thread @@ -131,7 +122,7 @@ class TDTestCase: 'vgroups': 1, \ 'stbName': 'stb', \ 'ctbNum': 10, \ - 'rowsPerTbl': 10000, \ + 'rowsPerTbl': 100, \ 'batchNum': 10, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath @@ -141,10 +132,7 @@ class TDTestCase: # wait stb ready while 1: - #tdSql.query("show %s.stables"%parameterDict['dbName']) - tdSql.query("show db.stables") - #print (self.queryResult) - #print (tdSql.getRows()) + tdSql.query("show %s.stables"%parameterDict['dbName']) if tdSql.getRows() == 1: break else: @@ -159,26 +147,18 @@ class TDTestCase: time.sleep(1) tdSql.query("show topics") - print ("======================================") - #print (self.queryResult) #tdSql.checkRows(2) topic1 = tdSql.getData(0 , 0) topic2 = tdSql.getData(1 , 0) - print (topic1) - print (topic2) - print (topicFromStb) - print (topicFromCtb) - #tdLog.info("show topics: %s, %s"%topic1, topic2) - #if topic1 != topicFromStb or topic1 != topicFromCtb: - # tdLog.exit("topic error1") - #if topic2 != topicFromStb or topic2 != topicFromCtb: - # tdLog.exit("topic error2") + tdLog.info("show topics: %s, %s"%(topic1, topic2)) + if topic1 != topicFromStb and topic1 != topicFromCtb: + tdLog.exit("topic error1") + if topic2 != topicFromStb and topic2 != topicFromCtb: + tdLog.exit("topic error2") tdLog.info("create consume info table and consume result table") cdbName = parameterDict["dbName"] - #tdSql.query("create database %s"%cdbName) - #tdSql.query("use %s"%cdbName) tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)") tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)") @@ -234,14 +214,147 @@ class TDTestCase: tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 2, expectmsgcnt) tdSql.checkData(0 , 3, expectrowcnt) + + tdSql.query("drop topic %s"%topicFromStb) + tdSql.query("drop topic %s"%topicFromCtb) - tdLog.printNoPrefix("======== test scenario 2: ") + + def tmqCase2(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test scenario 2: add child table with consuming ") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db2', \ + 'vgroups': 1, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + # wait db ready + while 1: + tdSql.query("show databases") + if tdSql.getRows() == 4: + print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) + break + else: + time.sleep(1) + + tdSql.query("use %s"%parameterDict['dbName']) + # wait stb ready + while 1: + tdSql.query("show %s.stables"%parameterDict['dbName']) + if tdSql.getRows() == 1: + break + else: + time.sleep(1) - tdLog.printNoPrefix("======== test scenario 3: ") + tdLog.info("create topics from super table") + topicFromStb = 'topic_stb_column2' + topicFromCtb = 'topic_ctb_column2' + + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) + + time.sleep(1) + tdSql.query("show topics") + topic1 = tdSql.getData(0 , 0) + topic2 = tdSql.getData(1 , 0) + tdLog.info("show topics: %s, %s"%(topic1, topic2)) + if topic1 != topicFromStb and topic1 != topicFromCtb: + tdLog.exit("topic error1") + if topic2 != topicFromStb and topic2 != topicFromCtb: + tdLog.exit("topic error2") + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - #os.system('pkill tmq_sim') + consumerId = 0 + expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"] + expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"] + topicList = topicFromStb + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into consumeinfo values " + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata) + tdSql.query(sql) + + tdLog.info("check stb if there are data") + while 1: + tdSql.query("select count(*) from %s"%parameterDict["stbName"]) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + countOfStb = tdSql.getData(0, 0) + if countOfStb != 0: + tdLog.info("count from stb: %d"%countOfStb) + break + else: + time.sleep(1) + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + # create new child table and insert data + newCtbName = 'newctb' + rowsOfNewCtb = 1000 + tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"])) + startTs = parameterDict["startTs"] + for j in range(rowsOfNewCtb): + sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j) + tdSql.execute(sql) + tdLog.debug("insert data into new child table ............ [OK]") + + # wait for data ready + prepareEnvThread.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from consumeresult") + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 1: + break + else: + time.sleep(5) + + expectmsgcnt += rowsOfNewCtb + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb + + tdSql.checkData(0 , 1, consumerId) + tdSql.checkData(0 , 2, expectmsgcnt) + tdSql.checkData(0 , 3, expectrowcnt) + + tdLog.printNoPrefix("======== test scenario 2 end ...... ") + + def run(self): + tdSql.prepare() + + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + + self.tmqCase1(cfgPath, buildPath) + #self.tmqCase2(cfgPath, buildPath) + #self.tmqCase3(cfgPath, buildPath) def stop(self): tdSql.close()