tmq add tmpBasic.py

This commit is contained in:
Alex Duan 2025-02-23 18:24:55 +08:00
parent 9b395d8b19
commit 0c2695ec6d
7 changed files with 153 additions and 47 deletions

View File

@ -331,11 +331,11 @@ class TBase:
for i in range(len(vlist)):
if vlist[i].find(s) != -1:
# found
tdLog.info(f"found {s} on index {i} , line={vlist[i]}")
tdLog.info(f'found "{s}" on index {i} , line={vlist[i]}')
return
# not found
tdLog.exit(f"faild, not found {s} on list:{vlist}")
tdLog.exit(f'faild, not found "{s}" on list:{vlist}')
#
# str util
@ -353,7 +353,7 @@ class TBase:
# taosBenchmark
#
# run taosBenchmark and check insert Result
# insert
def insertBenchJson(self, jsonFile, options="", checkStep=False):
# exe insert
cmd = f"{options} -f {jsonFile}"
@ -421,3 +421,17 @@ class TBase:
tdSql.checkData(0, 0, vgroups)
return db, stb,child_count, insert_rows
# tmq
def tmqBenchJson(self, jsonFile, options="", checkStep=False):
# exe insert
command = f"{options} -f {jsonFile}"
rlist = frame.etool.runBinFile("taosBenchmark", command, checkRun = True)
#
# check insert result
#
print(rlist)
return rlist

View File

@ -52,8 +52,11 @@ def isArm64Cpu():
#
# wait util execute file finished
def exe(file):
return os.system(file)
def exe(command, show = False):
code = os.system(command)
if show:
print(f"eos.exe retcode={code} command:{command}")
return code
# execute file and return immediately
def exeNoWait(file):
@ -70,7 +73,7 @@ def run(command, show=True):
out = f"out_{id}.txt"
err = f"err_{id}.txt"
ret = exe(command + f" 1>{out} 2>{err}")
code = exe(command + f" 1>{out} 2>{err}", show)
# read from file
output = readFileContext(out)
@ -82,12 +85,15 @@ def run(command, show=True):
if os.path.exists(err):
os.remove(err)
return output, error
return output, error, code
# return list after run
def runRetList(command, timeout=10):
output,error = run(command, timeout)
def runRetList(command, show = True, checkRun = False):
output, error, code = run(command, show)
if checkRun and code != 0:
print(f"eos.runRetList checkRun return code failed. code={code} error={error}")
assert code == 0
return output.splitlines()
#

View File

@ -51,7 +51,7 @@ def benchMark(command = "", json = "") :
# run
if command != "":
frame.eos.exe(bmFile + " " + command)
status = frame.eos.run(bmFile + " " + command)
if json != "":
cmd = f"{bmFile} -f {json}"
print(cmd)
@ -66,7 +66,7 @@ def curFile(fullPath, filename):
# run build/bin file
def runBinFile(fname, command, show=True):
def runBinFile(fname, command, show = True, checkRun = False):
binFile = frame.epath.binFile(fname)
if frame.eos.isWin():
binFile += ".exe"
@ -74,7 +74,7 @@ def runBinFile(fname, command, show=True):
cmd = f"{binFile} {command}"
if show:
tdLog.info(cmd)
return frame.eos.runRetList(cmd)
return frame.eos.runRetList(cmd, show, checkRun)
# exe build/bin file
def exeBinFile(fname, command, wait=True, show=True):

View File

@ -0,0 +1,71 @@
{
"filetype":"insert",
"cfgdir":"/etc/taos",
"host":"127.0.0.1",
"port":6030,
"user":"root",
"password":"taosdata",
"thread_count": 2,
"create_table_thread_count":1,
"confirm_parameter_prompt":"no",
"prepare_rand":100,
"num_of_records_per_req":100,
"databases": [
{
"dbinfo":{
"name":"test",
"drop":"yes",
"vgroups": 4
},
"super_tables":[
{
"name":"meters",
"child_table_exists":"no",
"childtable_prefix":"d",
"data_source":"rand",
"insert_mode":"taosc",
"childtable_count": 10,
"insert_rows": 100,
"timestamp_step": 1,
"start_timestamp":"2022-10-01 00:00:00.000",
"columns":[
{ "type": "bool", "name": "bc"},
{ "type": "float", "name": "fc", "max": 1, "min": 0 },
{ "type": "double", "name": "dc", "max": 10, "min": 0 },
{ "type": "tinyint", "name": "ti", "max": 100, "min": -100 },
{ "type": "smallint", "name": "si", "max": 100, "min": -50 },
{ "type": "int", "name": "ic", "max": 1000, "min": -1000 },
{ "type": "bigint", "name": "bi", "max": 100, "min": -1000 },
{ "type": "utinyint", "name": "uti", "max": 100, "min": 0 },
{ "type": "usmallint", "name": "usi", "max": 100, "min": 0 },
{ "type": "uint", "name": "ui", "max": 1000, "min": 0 },
{ "type": "ubigint", "name": "ubi", "max": 10000, "min": 0 },
{ "type": "binary", "name": "bin", "len": 4},
{ "type": "nchar", "name": "nch", "len": 8},
{ "type": "varbinary", "name": "vab", "len": 8},
{ "type": "varchar", "name": "vac", "len": 8},
{ "type": "geometry", "name": "geo", "len": 32}
],
"tags":[
{ "type": "bool", "name": "tbc"},
{ "type": "float", "name": "tfc", "max": 1, "min": 0 },
{ "type": "double", "name": "tdc", "max": 10, "min": 0 },
{ "type": "tinyint", "name": "tti", "max": 100, "min": -100 },
{ "type": "smallint", "name": "tsi", "max": 100, "min": -50 },
{ "type": "int", "name": "tic", "max": 1000, "min": -1000 },
{ "type": "bigint", "name": "tbi", "max": 100, "min": -1000 },
{ "type": "utinyint", "name": "tuti", "max": 100, "min": 0 },
{ "type": "usmallint", "name": "tusi", "max": 100, "min": 0 },
{ "type": "uint", "name": "tui", "max": 1000, "min": 0 },
{ "type": "ubigint", "name": "tubi", "max": 10000, "min": 0 },
{ "type": "binary", "name": "tbin", "len": 4},
{ "type": "nchar", "name": "tnch", "len": 8},
{ "type": "varbinary", "name": "tvab", "len": 8},
{ "type": "varchar", "name": "tvac", "len": 8},
{ "type": "geometry", "name": "tgeo", "len": 32}
]
}
]
}
]
}

View File

@ -0,0 +1,29 @@
{
"filetype": "subscribe",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"databases": "test",
"confirm_parameter_prompt": "no",
"result_file": "./tmq_result_para.txt",
"tmq_info": {
"concurrent": 2,
"poll_delay": 3000,
"create_mode": "parallel",
"group_mode": "independent",
"client.id": "clientId",
"auto.offset.reset": "earliest",
"enable.auto.commit": "true",
"auto.commit.interval.ms": 1000,
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "false",
"rows_file": "./rows_file_para",
"expect_rows": 100,
"topic_list": [
{"name": "topic_benchmark_d0", "sql": "select * from test.d0;"}
]
}
}

View File

@ -5,12 +5,13 @@
"port": 6030,
"user": "root",
"password": "taosdata",
"databases": "db",
"databases": "test",
"confirm_parameter_prompt": "no",
"result_file": "./tmq_result1.txt",
"result_file": "./tmq_result_sequ.txt",
"tmq_info": {
"concurrent": 2,
"concurrent": 4,
"poll_delay": 3000,
"group_mode": "independent",
"group.id": "grpId_0",
"client.id": "clientId",
"auto.offset.reset": "earliest",
@ -19,10 +20,10 @@
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "false",
"rows_file": "./consumed_rows1",
"expect_rows": 50,
"rows_file": "./rows_file_sequ",
"expect_rows": 1000,
"topic_list": [
{"name": "tmq_topic_0", "sql": "select c0 from db.stb;"}
{"name": "topic_benchmark_meters", "sql": "select * from test.meters;"}
]
}
}

View File

@ -25,37 +25,22 @@ from frame import *
class TDTestCase(TBase):
def caseDescription(self):
"""
[TD-11510] taosBenchmark test cases
taosBenchmark tmp->Basic test cases
"""
def run(self):
tdSql.execute("drop topic if exists topic_0")
binPath = etool.benchMarkFile()
cmd = "%s -f ./tools/benchmark/basic/json/tmqBasic.json" % binPath
tdLog.info("%s" % cmd)
os.system("%s" % cmd)
tdSql.execute("reset query cache")
tdSql.execute("alter database db WAL_RETENTION_PERIOD 3600000")
# insert data
json = "tools/benchmark/basic/json/tmqBasicInsert.json"
db, stb, child_count, insert_rows = self.insertBenchJson(json, checkStep = True)
cmd = "%s -f ./tools/benchmark/basic/json/tmq.json " % binPath
tdLog.info("%s" % cmd)
os.system("%s" % cmd)
sleep(15)
# tmq Sequ
json = "tools/benchmark/basic/json/tmqBasicSequ.json"
self.tmqBenchJson(json)
# try:
# for line in os.popen("ps ax | grep taosBenchmark | grep -v grep"):
# fields = line.split()
# pid = fields[0]
# os.kill(int(pid), signal.SIGINT)
# time.sleep(3)
# print("taosBenchmark be killed on purpose")
# except:
# tdLog.exit("failed to kill taosBenchmark")
# tmq Parallel
json = "tools/benchmark/basic/json/tmqBasicPara.json"
self.tmqBenchJson(json)
def stop(self):
tdSql.close()