test: add test case for tmq
This commit is contained in:
parent
61d4f76ddf
commit
7f56e6f16d
|
@ -535,6 +535,18 @@ class TMQCom:
|
||||||
column_value_str = column_value_str.rstrip()[:-1]
|
column_value_str = column_value_str.rstrip()[:-1]
|
||||||
insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});'
|
insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});'
|
||||||
tsql.execute(insert_sql)
|
tsql.execute(insert_sql)
|
||||||
|
|
||||||
|
def waitSubscriptionExit(self, tsql, max_wait_count=20):
|
||||||
|
wait_cnt = 0
|
||||||
|
while (wait_cnt < max_wait_count):
|
||||||
|
tsql.query("show subscriptions")
|
||||||
|
if tsql.getRows() == 0:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
time.sleep(2)
|
||||||
|
wait_cnt += 1
|
||||||
|
|
||||||
|
tdLog.info("wait subscriptions exit for %d s"%wait_cnt)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.cursor.close()
|
self.cursor.close()
|
||||||
|
|
|
@ -18,8 +18,8 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.snapshot = 0
|
self.snapshot = 0
|
||||||
self.vgroups = 2
|
self.vgroups = 2
|
||||||
self.ctbNum = 100
|
self.ctbNum = 1000
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql):
|
def init(self, conn, logSql):
|
||||||
tdLog.debug(f"start to excute {__file__}")
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
@ -38,8 +38,8 @@ class TDTestCase:
|
||||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 100,
|
'ctbNum': 1000,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 100,
|
'batchNum': 100,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 3,
|
'pollDelay': 3,
|
||||||
|
@ -83,8 +83,8 @@ class TDTestCase:
|
||||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 100,
|
'ctbNum': 1000,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 100,
|
'batchNum': 100,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 5,
|
'pollDelay': 5,
|
||||||
|
@ -117,13 +117,13 @@ class TDTestCase:
|
||||||
tdSql.execute(sqlString)
|
tdSql.execute(sqlString)
|
||||||
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
|
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
|
||||||
topicList = topicFromStb1
|
topicList = topicFromStb1
|
||||||
ifcheckdata = 0
|
ifcheckdata = 0
|
||||||
ifManualCommit = 0
|
ifManualCommit = 0
|
||||||
keyList = 'group.id:cgrp1,\
|
keyList = 'group.id:cgrp1,\
|
||||||
enable.auto.commit:true,\
|
enable.auto.commit:true,\
|
||||||
auto.commit.interval.ms:500,\
|
auto.commit.interval.ms:3000,\
|
||||||
auto.offset.reset:earliest'
|
auto.offset.reset:earliest'
|
||||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
@ -147,10 +147,46 @@ class TDTestCase:
|
||||||
tdSql.query(queryString)
|
tdSql.query(queryString)
|
||||||
totalRowsFromQury = tdSql.getRows()
|
totalRowsFromQury = tdSql.getRows()
|
||||||
|
|
||||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQury))
|
tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
|
||||||
if totalConsumeRows != totalRowsFromQury:
|
if not (totalConsumeRows == totalRowsFromQury):
|
||||||
tdLog.exit("tmq consume rows error!")
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# tdLog.info("****************************************************************************")
|
||||||
|
# tmqCom.initConsumerTable()
|
||||||
|
# consumerId = 1
|
||||||
|
# expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
|
||||||
|
# topicList = topicFromStb1
|
||||||
|
# ifcheckdata = 0
|
||||||
|
# ifManualCommit = 0
|
||||||
|
# keyList = 'group.id:cgrp2,\
|
||||||
|
# enable.auto.commit:true,\
|
||||||
|
# auto.commit.interval.ms:3000,\
|
||||||
|
# auto.offset.reset:earliest'
|
||||||
|
# tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
# tdLog.info("start consume processor")
|
||||||
|
# tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||||
|
|
||||||
|
# expectRows = 1
|
||||||
|
# resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
# totalConsumeRows = 0
|
||||||
|
# for i in range(expectRows):
|
||||||
|
# totalConsumeRows += resultList[i]
|
||||||
|
|
||||||
|
# tdSql.query(queryString)
|
||||||
|
# totalRowsFromQury = tdSql.getRows()
|
||||||
|
|
||||||
|
# tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
|
||||||
|
# if not (totalConsumeRows == totalRowsFromQury):
|
||||||
|
# tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
|
|
||||||
|
# tdLog.info("****************************************************************************")
|
||||||
|
|
||||||
|
tmqCom.waitSubscriptionExit(tdSql)
|
||||||
tdSql.query("drop topic %s"%topicFromStb1)
|
tdSql.query("drop topic %s"%topicFromStb1)
|
||||||
|
|
||||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||||
|
@ -168,8 +204,8 @@ class TDTestCase:
|
||||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 100,
|
'ctbNum': 1000,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 3000,
|
'batchNum': 3000,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 5,
|
'pollDelay': 5,
|
||||||
|
@ -201,7 +237,7 @@ class TDTestCase:
|
||||||
tdSql.execute(sqlString)
|
tdSql.execute(sqlString)
|
||||||
|
|
||||||
consumerId = 1
|
consumerId = 1
|
||||||
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + 100000
|
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
|
||||||
topicList = topicFromStb1
|
topicList = topicFromStb1
|
||||||
ifcheckdata = 0
|
ifcheckdata = 0
|
||||||
ifManualCommit = 0
|
ifManualCommit = 0
|
||||||
|
@ -220,8 +256,10 @@ class TDTestCase:
|
||||||
tdDnodes.start(1)
|
tdDnodes.start(1)
|
||||||
time.sleep(3)
|
time.sleep(3)
|
||||||
|
|
||||||
tdLog.info("create some new child table and insert data ")
|
# tdLog.info("create some new child table and insert data ")
|
||||||
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
|
# paraDict["batchNum"] = 1000
|
||||||
|
# paraDict["ctbPrefix"] = 'newCtb'
|
||||||
|
# tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
|
||||||
|
|
||||||
tdLog.info("insert process end, and start to check consume result")
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
|
@ -242,9 +280,9 @@ class TDTestCase:
|
||||||
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
tdSql.prepare()
|
# tdSql.prepare()
|
||||||
self.prepareTestEnv()
|
self.prepareTestEnv()
|
||||||
# self.tmqCase1()
|
self.tmqCase1()
|
||||||
self.tmqCase2()
|
self.tmqCase2()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|
|
@ -25,18 +25,6 @@ class TDTestCase:
|
||||||
tdLog.debug(f"start to excute {__file__}")
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
tdSql.init(conn.cursor(), False)
|
tdSql.init(conn.cursor(), False)
|
||||||
|
|
||||||
def waitSubscriptionExit(self, max_wait_count=20):
|
|
||||||
wait_cnt = 0
|
|
||||||
while (wait_cnt < max_wait_count):
|
|
||||||
tdSql.query("show subscriptions")
|
|
||||||
if tdSql.getRows() == 0:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
time.sleep(1)
|
|
||||||
wait_cnt += 1
|
|
||||||
|
|
||||||
tdLog.info("wait subscriptions exit for %d s"%wait_cnt)
|
|
||||||
|
|
||||||
# drop some ntbs
|
# drop some ntbs
|
||||||
def tmqCase1(self):
|
def tmqCase1(self):
|
||||||
tdLog.printNoPrefix("======== test case 1: ")
|
tdLog.printNoPrefix("======== test case 1: ")
|
||||||
|
@ -115,7 +103,7 @@ class TDTestCase:
|
||||||
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
||||||
|
|
||||||
tdLog.info("wait subscriptions exit ....")
|
tdLog.info("wait subscriptions exit ....")
|
||||||
self.waitSubscriptionExit()
|
tmqCom.waitSubscriptionExit(tdSql)
|
||||||
|
|
||||||
tdSql.query("drop topic %s"%topicFromDb)
|
tdSql.query("drop topic %s"%topicFromDb)
|
||||||
tdLog.info("success dorp topic: %s"%topicFromDb)
|
tdLog.info("success dorp topic: %s"%topicFromDb)
|
||||||
|
@ -208,7 +196,7 @@ class TDTestCase:
|
||||||
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
||||||
|
|
||||||
tdLog.info("wait subscriptions exit ....")
|
tdLog.info("wait subscriptions exit ....")
|
||||||
self.waitSubscriptionExit()
|
tmqCom.waitSubscriptionExit(tdSql)
|
||||||
|
|
||||||
tdSql.query("drop topic %s"%topicFromDb)
|
tdSql.query("drop topic %s"%topicFromDb)
|
||||||
tdLog.info("success dorp topic: %s"%topicFromDb)
|
tdLog.info("success dorp topic: %s"%topicFromDb)
|
||||||
|
|
|
@ -24,19 +24,7 @@ class TDTestCase:
|
||||||
def init(self, conn, logSql):
|
def init(self, conn, logSql):
|
||||||
tdLog.debug(f"start to excute {__file__}")
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
tdSql.init(conn.cursor(), False)
|
tdSql.init(conn.cursor(), False)
|
||||||
|
|
||||||
def waitSubscriptionExit(self, max_wait_count=20):
|
|
||||||
wait_cnt = 0
|
|
||||||
while (wait_cnt < max_wait_count):
|
|
||||||
tdSql.query("show subscriptions")
|
|
||||||
if tdSql.getRows() == 0:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
time.sleep(2)
|
|
||||||
wait_cnt += 1
|
|
||||||
|
|
||||||
tdLog.info("wait subscriptions exit for %d s"%wait_cnt)
|
|
||||||
|
|
||||||
def prepareTestEnv(self):
|
def prepareTestEnv(self):
|
||||||
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||||
paraDict = {'dbName': 'dbt',
|
paraDict = {'dbName': 'dbt',
|
||||||
|
@ -169,7 +157,7 @@ class TDTestCase:
|
||||||
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
||||||
|
|
||||||
tdLog.info("wait subscriptions exit ....")
|
tdLog.info("wait subscriptions exit ....")
|
||||||
self.waitSubscriptionExit()
|
tmqCom.waitSubscriptionExit(tdSql)
|
||||||
|
|
||||||
tdSql.query("drop topic %s"%topicFromDb)
|
tdSql.query("drop topic %s"%topicFromDb)
|
||||||
tdLog.info("success dorp topic: %s"%topicFromDb)
|
tdLog.info("success dorp topic: %s"%topicFromDb)
|
||||||
|
@ -258,7 +246,7 @@ class TDTestCase:
|
||||||
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
||||||
|
|
||||||
tdLog.info("wait subscriptions exit ....")
|
tdLog.info("wait subscriptions exit ....")
|
||||||
self.waitSubscriptionExit()
|
tmqCom.waitSubscriptionExit(tdSql)
|
||||||
|
|
||||||
tdSql.query("drop topic %s"%topicFromDb)
|
tdSql.query("drop topic %s"%topicFromDb)
|
||||||
tdLog.info("success dorp topic: %s"%topicFromDb)
|
tdLog.info("success dorp topic: %s"%topicFromDb)
|
||||||
|
|
Loading…
Reference in New Issue