test: add test case into CI
This commit is contained in:
parent
33525deb80
commit
bca2428cdf
|
@ -20,18 +20,54 @@ class TDTestCase:
|
||||||
tdSql.init(conn.cursor())
|
tdSql.init(conn.cursor())
|
||||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||||
|
|
||||||
def tmqCase1(self):
|
def prepareTestEnv(self):
|
||||||
tdLog.printNoPrefix("======== test case 1: ")
|
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||||
paraDict = {'dbName': 'db1',
|
paraDict = {'dbName': 'dbt',
|
||||||
'dropFlag': 1,
|
'dropFlag': 1,
|
||||||
'event': '',
|
'event': '',
|
||||||
'vgroups': 4,
|
'vgroups': 4,
|
||||||
|
'replica': 1,
|
||||||
'stbName': 'stb',
|
'stbName': 'stb',
|
||||||
'colPrefix': 'c',
|
'colPrefix': 'c',
|
||||||
'tagPrefix': 't',
|
'tagPrefix': 't',
|
||||||
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
|
||||||
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 10,
|
||||||
|
'rowsPerTbl': 10000,
|
||||||
|
'batchNum': 100,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 10,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1}
|
||||||
|
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
tmqCom.create_database(tsql=tdSql, dbName=paraDict["dbName"],dropFlag=paraDict["dropFlag"], vgroups=paraDict['vgroups'],replica=paraDict['replica'])
|
||||||
|
tdLog.info("create stb")
|
||||||
|
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||||
|
tdLog.info("create ctb")
|
||||||
|
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'])
|
||||||
|
tdLog.info("insert data")
|
||||||
|
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||||
|
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||||
|
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
return
|
||||||
|
|
||||||
|
def tmqCase1(self):
|
||||||
|
tdLog.printNoPrefix("======== test case 1: ")
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 4,
|
||||||
|
'replica': 1,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 10000,
|
||||||
'batchNum': 100,
|
'batchNum': 100,
|
||||||
|
@ -43,13 +79,6 @@ class TDTestCase:
|
||||||
topicNameList = ['topic1', 'topic2', 'topic3']
|
topicNameList = ['topic1', 'topic2', 'topic3']
|
||||||
expectRowsList = []
|
expectRowsList = []
|
||||||
tmqCom.initConsumerTable()
|
tmqCom.initConsumerTable()
|
||||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
|
|
||||||
tdLog.info("create stb")
|
|
||||||
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
|
|
||||||
tdLog.info("create ctb")
|
|
||||||
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
|
|
||||||
tdLog.info("insert data")
|
|
||||||
tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
|
|
||||||
|
|
||||||
tdLog.info("create topics from stb with filter")
|
tdLog.info("create topics from stb with filter")
|
||||||
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 4 == 0" %(paraDict['dbName'], paraDict['stbName'])
|
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 4 == 0" %(paraDict['dbName'], paraDict['stbName'])
|
||||||
|
@ -134,16 +163,18 @@ class TDTestCase:
|
||||||
|
|
||||||
def tmqCase2(self):
|
def tmqCase2(self):
|
||||||
tdLog.printNoPrefix("======== test case 2: ")
|
tdLog.printNoPrefix("======== test case 2: ")
|
||||||
paraDict = {'dbName': 'db2',
|
paraDict = {'dbName': 'dbt',
|
||||||
'dropFlag': 1,
|
'dropFlag': 1,
|
||||||
'event': '',
|
'event': '',
|
||||||
'vgroups': 4,
|
'vgroups': 4,
|
||||||
|
'replica': 1,
|
||||||
'stbName': 'stb',
|
'stbName': 'stb',
|
||||||
'colPrefix': 'c',
|
'colPrefix': 'c',
|
||||||
'tagPrefix': 't',
|
'tagPrefix': 't',
|
||||||
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
|
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
|
||||||
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 10000,
|
||||||
'batchNum': 100,
|
'batchNum': 100,
|
||||||
|
@ -155,13 +186,6 @@ class TDTestCase:
|
||||||
topicNameList = ['topic1', 'topic2', 'topic3']
|
topicNameList = ['topic1', 'topic2', 'topic3']
|
||||||
expectRowsList = []
|
expectRowsList = []
|
||||||
tmqCom.initConsumerTable()
|
tmqCom.initConsumerTable()
|
||||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
|
|
||||||
tdLog.info("create stb")
|
|
||||||
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
|
|
||||||
tdLog.info("create ctb")
|
|
||||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'])
|
|
||||||
tdLog.info("insert data")
|
|
||||||
tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
|
|
||||||
|
|
||||||
tdLog.info("create topics from stb with filter")
|
tdLog.info("create topics from stb with filter")
|
||||||
# sqlString = "create topic %s as select ts, sin(c1), pow(c2,3) from %s.%s where c2 >= 0" %(topicNameList[0], paraDict['dbName'], paraDict['stbName'])
|
# sqlString = "create topic %s as select ts, sin(c1), pow(c2,3) from %s.%s where c2 >= 0" %(topicNameList[0], paraDict['dbName'], paraDict['stbName'])
|
||||||
|
@ -247,6 +271,7 @@ class TDTestCase:
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
tdSql.prepare()
|
tdSql.prepare()
|
||||||
|
self.prepareTestEnv()
|
||||||
self.tmqCase1()
|
self.tmqCase1()
|
||||||
self.tmqCase2()
|
self.tmqCase2()
|
||||||
|
|
||||||
|
|
|
@ -170,33 +170,42 @@ class TMQCom:
|
||||||
tdLog.debug("complete to create database %s"%(dbName))
|
tdLog.debug("complete to create database %s"%(dbName))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# self.create_stable() and self.create_ctable() and self.insert_data_interlaceByMultiTbl() : The three functions are matched
|
||||||
|
# schema: (ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32))
|
||||||
def create_stable(self,tsql, dbName,stbName):
|
def create_stable(self,tsql, dbName,stbName):
|
||||||
tsql.execute("create table if not exists %s.%s (ts timestamp, c1 int, c2 int, c3 binary(16)) tags(t1 int, t2 binary(32))"%(dbName, stbName))
|
schemaString = "(ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32))"
|
||||||
|
tsql.execute("create table if not exists %s.%s %s"%(dbName, stbName, schemaString))
|
||||||
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
|
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
|
||||||
return
|
return
|
||||||
|
|
||||||
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
|
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
|
||||||
tsql.execute("use %s" %dbName)
|
# tsql.execute("use %s" %dbName)
|
||||||
pre_create = "create table"
|
pre_create = "create table"
|
||||||
sql = pre_create
|
sql = pre_create
|
||||||
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
|
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
|
||||||
|
batchNum = 10
|
||||||
|
tblBatched = 0
|
||||||
for i in range(ctbNum):
|
for i in range(ctbNum):
|
||||||
tagValue = 'beijing'
|
tagBinaryValue = 'beijing'
|
||||||
if (i % 2 == 0):
|
if (i % 2 == 0):
|
||||||
tagValue = 'shanghai'
|
tagBinaryValue = 'shanghai'
|
||||||
elif (i % 3 == 0):
|
elif (i % 3 == 0):
|
||||||
tagValue = 'changsha'
|
tagBinaryValue = 'changsha'
|
||||||
|
|
||||||
sql += " %s%d using %s tags(%d, '%s')"%(ctbPrefix,i+ctbStartIdx,stbName,i+ctbStartIdx+1, tagValue)
|
sql += " %s.%s%d using %s.%s tags(%d, %d, %d, '%s', '%s')"%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
|
||||||
if (i > 0) and (i%100 == 0):
|
tblBatched += 1
|
||||||
|
if (i == ctbNum-1 ) or (tblBatched == batchNum):
|
||||||
tsql.execute(sql)
|
tsql.execute(sql)
|
||||||
|
tblBatched = 0
|
||||||
sql = pre_create
|
sql = pre_create
|
||||||
|
|
||||||
if sql != pre_create:
|
if sql != pre_create:
|
||||||
tsql.execute(sql)
|
tsql.execute(sql)
|
||||||
|
|
||||||
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
|
tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# schema: (ts timestamp, c1 int, c2 binary(16))
|
||||||
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=None):
|
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=None):
|
||||||
tdLog.debug("start to insert data ............")
|
tdLog.debug("start to insert data ............")
|
||||||
tsql.execute("use %s" %dbName)
|
tsql.execute("use %s" %dbName)
|
||||||
|
@ -208,11 +217,14 @@ class TMQCom:
|
||||||
startTs = int(round(t * 1000))
|
startTs = int(round(t * 1000))
|
||||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||||
for i in range(ctbNum):
|
for i in range(ctbNum):
|
||||||
|
rowsBatched = 0
|
||||||
sql += " %s%d values "%(stbName,i)
|
sql += " %s%d values "%(stbName,i)
|
||||||
for j in range(rowsPerTbl):
|
for j in range(rowsPerTbl):
|
||||||
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
|
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
|
||||||
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
|
rowsBatched += 1
|
||||||
|
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
|
||||||
tsql.execute(sql)
|
tsql.execute(sql)
|
||||||
|
rowsBatched = 0
|
||||||
if j < rowsPerTbl - 1:
|
if j < rowsPerTbl - 1:
|
||||||
sql = "insert into %s%d values " %(stbName,i)
|
sql = "insert into %s%d values " %(stbName,i)
|
||||||
else:
|
else:
|
||||||
|
@ -224,6 +236,7 @@ class TMQCom:
|
||||||
tdLog.debug("insert data ............ [OK]")
|
tdLog.debug("insert data ............ [OK]")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# schema: (ts timestamp, c1 int, c2 int, c3 binary(16))
|
||||||
def insert_data_1(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs):
|
def insert_data_1(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs):
|
||||||
tdLog.debug("start to insert data ............")
|
tdLog.debug("start to insert data ............")
|
||||||
tsql.execute("use %s" %dbName)
|
tsql.execute("use %s" %dbName)
|
||||||
|
@ -234,14 +247,17 @@ class TMQCom:
|
||||||
startTs = int(round(t * 1000))
|
startTs = int(round(t * 1000))
|
||||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||||
for i in range(ctbNum):
|
for i in range(ctbNum):
|
||||||
|
rowsBatched = 0
|
||||||
sql += " %s%d values "%(ctbPrefix,i)
|
sql += " %s%d values "%(ctbPrefix,i)
|
||||||
for j in range(rowsPerTbl):
|
for j in range(rowsPerTbl):
|
||||||
if (j % 2 == 0):
|
if (j % 2 == 0):
|
||||||
sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, j, j)
|
sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, j, j)
|
||||||
else:
|
else:
|
||||||
sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, -j, j)
|
sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, -j, j)
|
||||||
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
|
rowsBatched += 1
|
||||||
|
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
|
||||||
tsql.execute(sql)
|
tsql.execute(sql)
|
||||||
|
rowsBatched = 0
|
||||||
if j < rowsPerTbl - 1:
|
if j < rowsPerTbl - 1:
|
||||||
sql = "insert into %s%d values " %(ctbPrefix,i)
|
sql = "insert into %s%d values " %(ctbPrefix,i)
|
||||||
else:
|
else:
|
||||||
|
@ -253,6 +269,7 @@ class TMQCom:
|
||||||
tdLog.debug("insert data ............ [OK]")
|
tdLog.debug("insert data ............ [OK]")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# schema: (ts timestamp, c1 int, c2 int, c3 binary(16), c4 timestamp)
|
||||||
def insert_data_2(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,ctbStartIdx=0):
|
def insert_data_2(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,ctbStartIdx=0):
|
||||||
tdLog.debug("start to insert data ............")
|
tdLog.debug("start to insert data ............")
|
||||||
tsql.execute("use %s" %dbName)
|
tsql.execute("use %s" %dbName)
|
||||||
|
@ -263,14 +280,17 @@ class TMQCom:
|
||||||
startTs = int(round(t * 1000))
|
startTs = int(round(t * 1000))
|
||||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||||
for i in range(ctbNum):
|
for i in range(ctbNum):
|
||||||
|
rowsBatched = 0
|
||||||
sql += " %s%d values "%(ctbPrefix,i+ctbStartIdx)
|
sql += " %s%d values "%(ctbPrefix,i+ctbStartIdx)
|
||||||
for j in range(rowsPerTbl):
|
for j in range(rowsPerTbl):
|
||||||
if (j % 2 == 0):
|
if (j % 2 == 0):
|
||||||
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j)
|
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j)
|
||||||
else:
|
else:
|
||||||
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, -j, j)
|
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, -j, j)
|
||||||
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
|
rowsBatched += 1
|
||||||
|
if (rowsBatched == batchNum) or (j == rowsPerTbl - 1):
|
||||||
tsql.execute(sql)
|
tsql.execute(sql)
|
||||||
|
rowsBatched = 0
|
||||||
if j < rowsPerTbl - 1:
|
if j < rowsPerTbl - 1:
|
||||||
sql = "insert into %s%d values " %(ctbPrefix,i+ctbStartIdx)
|
sql = "insert into %s%d values " %(ctbPrefix,i+ctbStartIdx)
|
||||||
else:
|
else:
|
||||||
|
@ -282,7 +302,8 @@ class TMQCom:
|
||||||
tdLog.debug("insert data ............ [OK]")
|
tdLog.debug("insert data ............ [OK]")
|
||||||
return
|
return
|
||||||
|
|
||||||
def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0):
|
# schema: (ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32))
|
||||||
|
def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0,ctbStartIdx=0):
|
||||||
tdLog.debug("start to insert data ............")
|
tdLog.debug("start to insert data ............")
|
||||||
tsql.execute("use %s" %dbName)
|
tsql.execute("use %s" %dbName)
|
||||||
pre_insert = "insert into "
|
pre_insert = "insert into "
|
||||||
|
@ -297,15 +318,22 @@ class TMQCom:
|
||||||
ctbDict[i] = 0
|
ctbDict[i] = 0
|
||||||
|
|
||||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||||
rowsOfCtb = 0
|
rowsOfCtb = 0
|
||||||
while rowsOfCtb < rowsPerTbl:
|
while rowsOfCtb < rowsPerTbl:
|
||||||
for i in range(ctbNum):
|
for i in range(ctbNum):
|
||||||
sql += " %s.%s_%d values "%(dbName,ctbPrefix,i)
|
sql += " %s.%s%d values "%(dbName,ctbPrefix,i+ctbStartIdx)
|
||||||
|
rowsBatched = 0
|
||||||
for k in range(batchNum):
|
for k in range(batchNum):
|
||||||
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + ctbDict[i], ctbDict[i], ctbDict[i])
|
if (k % 2 == 0):
|
||||||
|
sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+ctbDict[i], ctbDict[i],ctbDict[i], ctbDict[i],ctbDict[i],ctbDict[i])
|
||||||
|
else:
|
||||||
|
sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+ctbDict[i],-ctbDict[i],ctbDict[i],-ctbDict[i],ctbDict[i],ctbDict[i])
|
||||||
|
|
||||||
|
rowsBatched += 1
|
||||||
ctbDict[i] += 1
|
ctbDict[i] += 1
|
||||||
if (0 == ctbDict[i]%batchNum) or (ctbDict[i] == rowsPerTbl):
|
if (rowsBatched == batchNum) or (ctbDict[i] == rowsPerTbl):
|
||||||
tsql.execute(sql)
|
tsql.execute(sql)
|
||||||
|
rowsBatched = 0
|
||||||
sql = "insert into "
|
sql = "insert into "
|
||||||
break
|
break
|
||||||
rowsOfCtb = ctbDict[0]
|
rowsOfCtb = ctbDict[0]
|
||||||
|
@ -313,7 +341,18 @@ class TMQCom:
|
||||||
tdLog.debug("insert data ............ [OK]")
|
tdLog.debug("insert data ............ [OK]")
|
||||||
return
|
return
|
||||||
|
|
||||||
def insert_data_with_autoCreateTbl(self,tsql,dbName,stbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0):
|
def threadFunctionForInsertByInterlace(self, **paraDict):
|
||||||
|
# create new connector for new tdSql instance in my thread
|
||||||
|
newTdSql = tdCom.newTdSql()
|
||||||
|
self.insert_data_interlaceByMultiTbl(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"],paraDict["ctbStartIdx"])
|
||||||
|
return
|
||||||
|
|
||||||
|
def asyncInsertDataByInterlace(self, paraDict):
|
||||||
|
pThread = threading.Thread(target=self.threadFunctionForInsertByInterlace, kwargs=paraDict)
|
||||||
|
pThread.start()
|
||||||
|
return pThread
|
||||||
|
|
||||||
|
def insert_data_with_autoCreateTbl(self,tsql,dbName,stbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0,ctbStartIdx=0):
|
||||||
tdLog.debug("start to insert data wiht auto create child table ............")
|
tdLog.debug("start to insert data wiht auto create child table ............")
|
||||||
tsql.execute("use %s" %dbName)
|
tsql.execute("use %s" %dbName)
|
||||||
pre_insert = "insert into "
|
pre_insert = "insert into "
|
||||||
|
@ -324,17 +363,17 @@ class TMQCom:
|
||||||
startTs = int(round(t * 1000))
|
startTs = int(round(t * 1000))
|
||||||
|
|
||||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||||
rowsOfSql = 0
|
rowsBatched = 0
|
||||||
for i in range(ctbNum):
|
for i in range(ctbNum):
|
||||||
sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i)
|
sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i)
|
||||||
for j in range(rowsPerTbl):
|
for j in range(rowsPerTbl):
|
||||||
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
|
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
|
||||||
rowsOfSql += 1
|
rowsBatched += 1
|
||||||
if (j > 0) and ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)):
|
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
|
||||||
tsql.execute(sql)
|
tsql.execute(sql)
|
||||||
rowsOfSql = 0
|
rowsBatched = 0
|
||||||
if j < rowsPerTbl - 1:
|
if j < rowsPerTbl - 1:
|
||||||
sql = "insert into %s.%s_%d using %s.%s tags (%d) values " %(dbName,ctbPrefix,i,dbName,stbName,i)
|
sql = "insert into %s.%s_%d using %s.%s tags (%d) values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i)
|
||||||
else:
|
else:
|
||||||
sql = "insert into "
|
sql = "insert into "
|
||||||
#end sql
|
#end sql
|
||||||
|
|
|
@ -26,7 +26,7 @@ class TDTestCase:
|
||||||
tdSql.init(conn.cursor(), False)
|
tdSql.init(conn.cursor(), False)
|
||||||
|
|
||||||
def prepareTestEnv(self):
|
def prepareTestEnv(self):
|
||||||
tdLog.printNoPrefix("======== test case 1: ")
|
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||||
paraDict = {'dbName': 'dbt',
|
paraDict = {'dbName': 'dbt',
|
||||||
'dropFlag': 1,
|
'dropFlag': 1,
|
||||||
'event': '',
|
'event': '',
|
||||||
|
|
|
@ -26,7 +26,7 @@ class TDTestCase:
|
||||||
tdSql.init(conn.cursor(), False)
|
tdSql.init(conn.cursor(), False)
|
||||||
|
|
||||||
def prepareTestEnv(self):
|
def prepareTestEnv(self):
|
||||||
tdLog.printNoPrefix("======== test case 1: ")
|
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||||
paraDict = {'dbName': 'dbt',
|
paraDict = {'dbName': 'dbt',
|
||||||
'dropFlag': 1,
|
'dropFlag': 1,
|
||||||
'event': '',
|
'event': '',
|
||||||
|
|
|
@ -154,3 +154,5 @@ python3 ./test.py -f 7-tmq/tmqUdf.py
|
||||||
python3 ./test.py -f 7-tmq/tmqConsumerGroup.py
|
python3 ./test.py -f 7-tmq/tmqConsumerGroup.py
|
||||||
python3 ./test.py -f 7-tmq/tmqShow.py
|
python3 ./test.py -f 7-tmq/tmqShow.py
|
||||||
python3 ./test.py -f 7-tmq/tmqAlterSchema.py
|
python3 ./test.py -f 7-tmq/tmqAlterSchema.py
|
||||||
|
python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py
|
||||||
|
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py
|
||||||
|
|
Loading…
Reference in New Issue