From 572267470b3c59f5f8bcc128d9ee7d877cd41d3c Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Fri, 22 Jul 2022 16:01:28 +0800 Subject: [PATCH 1/3] test: add case into ci --- tests/system-test/fulltest.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index ec70d9ddbf..76374b4cb5 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -204,7 +204,7 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py -#python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py +python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py #python3 ./test.py -f 7-tmq/tmqDnodeRestart.py python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py From cff70e6253832ee773ad2e27b84ed70a91b4e988 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Fri, 22 Jul 2022 16:04:05 +0800 Subject: [PATCH 2/3] test: add case into ci --- tests/system-test/7-tmq/tmqAutoCreateTbl.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/system-test/7-tmq/tmqAutoCreateTbl.py b/tests/system-test/7-tmq/tmqAutoCreateTbl.py index ea100ae0d3..277fdf7afb 100644 --- a/tests/system-test/7-tmq/tmqAutoCreateTbl.py +++ b/tests/system-test/7-tmq/tmqAutoCreateTbl.py @@ -18,7 +18,7 @@ class TDTestCase: def __init__(self): self.snapshot = 0 self.vgroups = 4 - self.ctbNum = 1000 + self.ctbNum = 500 self.rowsPerTbl = 1000 def init(self, conn, logSql): @@ -38,9 +38,9 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1000, + 'ctbNum': 500, 'rowsPerTbl': 1000, - 'batchNum': 400, + 'batchNum': 500, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 3, 'showMsg': 1, @@ -83,9 +83,9 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1000, + 'ctbNum': 500, 'rowsPerTbl': 1000, - 'batchNum': 1000, + 'batchNum': 500, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 5, 'showMsg': 1, @@ -156,7 +156,7 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1000, + 'ctbNum': 500, 'rowsPerTbl': 1000, 'batchNum': 1000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 From e4b73870611cc589f29bc1b1b77835bffe8c2b45 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Fri, 22 Jul 2022 21:04:32 +0800 Subject: [PATCH 3/3] test: add test cases into ci --- tests/system-test/7-tmq/tmqCommon.py | 28 ++- tests/system-test/7-tmq/tmqDropNtb.py | 4 +- tests/system-test/7-tmq/tmqDropStbCtb.py | 14 +- .../system-test/7-tmq/tmqUpdateWithConsume.py | 190 ++++++++++++++++++ tests/system-test/fulltest.sh | 3 + 5 files changed, 225 insertions(+), 14 deletions(-) create mode 100644 tests/system-test/7-tmq/tmqUpdateWithConsume.py diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 81c2becbde..a2d08b07d0 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -536,16 +536,30 @@ class TMQCom: insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});' tsql.execute(insert_sql) - def waitSubscriptionExit(self, tsql, max_wait_count=20): + def waitSubscriptionExit(self, tsql, topicName): wait_cnt = 0 - while (wait_cnt < max_wait_count): + while True: + exit_flag = 1 tsql.query("show subscriptions") - if tsql.getRows() == 0: - break - else: - time.sleep(2) - wait_cnt += 1 + rows = tsql.getRows() + for idx in range (rows): + if tsql.getData(idx, 0) != topicName: + continue + if tsql.getData(idx, 3) == None: + continue + else: + time.sleep(0.5) + wait_cnt += 1 + exit_flag = 0 + break + + if exit_flag == 1: + break + + tsql.query("show subscriptions") + tdLog.info("show subscriptions:") + tdLog.info(tsql.queryResult) tdLog.info("wait subscriptions exit for %d s"%wait_cnt) def close(self): diff --git a/tests/system-test/7-tmq/tmqDropNtb.py b/tests/system-test/7-tmq/tmqDropNtb.py index 5d58c38690..e1f5794ce2 100644 --- a/tests/system-test/7-tmq/tmqDropNtb.py +++ b/tests/system-test/7-tmq/tmqDropNtb.py @@ -103,7 +103,7 @@ class TDTestCase: tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") - tmqCom.waitSubscriptionExit(tdSql) + tmqCom.waitSubscriptionExit(tdSql, topicFromDb) tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) @@ -196,7 +196,7 @@ class TDTestCase: tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") - tmqCom.waitSubscriptionExit(tdSql) + tmqCom.waitSubscriptionExit(tdSql, topicFromDb) tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) diff --git a/tests/system-test/7-tmq/tmqDropStbCtb.py b/tests/system-test/7-tmq/tmqDropStbCtb.py index e6783a2815..f86d1295f4 100644 --- a/tests/system-test/7-tmq/tmqDropStbCtb.py +++ b/tests/system-test/7-tmq/tmqDropStbCtb.py @@ -51,7 +51,7 @@ class TDTestCase: paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - tmqCom.initConsumerTable() + # tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) @@ -98,6 +98,8 @@ class TDTestCase: paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl + tmqCom.initConsumerTable() + # again create one new stb1 paraDict["stbName"] = 'stb1' paraDict['ctbPrefix'] = 'ctb1n_' @@ -157,7 +159,7 @@ class TDTestCase: tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") - tmqCom.waitSubscriptionExit(tdSql) + tmqCom.waitSubscriptionExit(tdSql, topicFromDb) tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) @@ -191,6 +193,8 @@ class TDTestCase: paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl + tmqCom.initConsumerTable() + # again create one new stb1 paraDict["stbName"] = 'stb2' paraDict['ctbPrefix'] = 'ctb2n_' @@ -209,9 +213,9 @@ class TDTestCase: tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) if self.snapshot == 0: - consumerId = 0 + consumerId = 2 elif self.snapshot == 1: - consumerId = 1 + consumerId = 3 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2) topicList = topicFromDb @@ -246,7 +250,7 @@ class TDTestCase: tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") - tmqCom.waitSubscriptionExit(tdSql) + tmqCom.waitSubscriptionExit(tdSql, topicFromDb) tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) diff --git a/tests/system-test/7-tmq/tmqUpdateWithConsume.py b/tests/system-test/7-tmq/tmqUpdateWithConsume.py new file mode 100644 index 0000000000..4f21beffc4 --- /dev/null +++ b/tests/system-test/7-tmq/tmqUpdateWithConsume.py @@ -0,0 +1,190 @@ + +import taos +import sys +import time +import socket +import os +import threading +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.snapshot = 0 + self.vgroups = 4 + self.ctbNum = 100 + self.rowsPerTbl = 1000 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 1000, + 'batchNum': 100, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + # tdLog.info("restart taosd to ensure that the data falls into the disk") + tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 1000, + 'batchNum': 100, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + paraDict['snapshot'] = self.snapshot + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + # update to half tables + paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + consumerId = 0 + + if self.snapshot == 0: + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/2 + 1)) + elif self.snapshot == 1: + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2)) + + topicList = topicFromStb1 + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + # update to half data + paraDict["startTs"] = paraDict["startTs"] + int(self.rowsPerTbl / 2) + tmqCom.asyncInsertDataByInterlace(paraDict) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() + + tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) + if self.snapshot == 0: + if totalConsumeRows != expectrowcnt: + tdLog.exit("tmq consume rows error!") + elif self.snapshot == 1: + if not (totalConsumeRows < expectrowcnt and totalConsumeRows >= totalRowsInserted): + tdLog.exit("tmq consume rows error!") + + # tmqCom.checkFileContent(consumerId, queryString) + + tdSql.query("drop topic %s"%topicFromStb1) + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") + self.ctbNum = 1 + self.snapshot = 0 + self.prepareTestEnv() + self.tmqCase1() + + tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") + self.prepareTestEnv() + self.snapshot = 1 + self.tmqCase1() + + tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") + self.ctbNum = 100 + self.snapshot = 0 + self.prepareTestEnv() + self.tmqCase1() + tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") + self.prepareTestEnv() + self.snapshot = 1 + self.tmqCase1() + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 6c3a22d8aa..57a3d0136a 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -209,11 +209,14 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py #python3 ./test.py -f 7-tmq/tmqDnodeRestart.py python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py +python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot1.py python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py python3 ./test.py -f 7-tmq/tmqDelete-multiCtb.py python3 ./test.py -f 7-tmq/tmqDropStb.py +python3 ./test.py -f 7-tmq/tmqDropStbCtb.py +python3 ./test.py -f 7-tmq/tmqDropNtb.py python3 ./test.py -f 7-tmq/tmqUdf.py # python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py # python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py