feat: merge taos-tools 3.0 latest code

This commit is contained in:
Alex Duan 2025-02-02 17:57:38 +08:00
parent 1a8149cb07
commit 772202b84e
44 changed files with 2241 additions and 1099 deletions

View File

@ -12,7 +12,6 @@
# -*- coding: utf-8 -*-
import os
import json
import frame
import frame.etool
from frame.log import *
@ -31,12 +30,28 @@ def removeQuotation(origin):
return value
class TDTestCase(TBase):
class TDTestCase:
def caseDescription(self):
"""
[TD-11510] taosBenchmark test cases
"""
def benchmarkQuery(self, benchmark, jsonFile, keys, options=""):
    """Run a taosBenchmark query json and verify expected strings in its output.

    benchmark: path to the taosBenchmark binary.
    jsonFile:  query-mode json configuration file.
    keys:      substrings that must all appear in the captured output.
    options:   extra command-line options (default "").
    Aborts the case via tdLog.exit on command failure or any missing key.
    """
    # capture benchmark output into a fresh log file
    result = "query.log"
    os.system(f"rm -f {result}")
    cmd = f"{benchmark} {options} -f {jsonFile} >> {result}"
    # log the command before running it, so a hang is attributable to it
    tdLog.info(cmd)
    # fail fast on a non-zero exit (same pattern as run())
    if os.system(cmd) != 0:
        tdLog.exit(f"exec failed. command={cmd}")
    with open(result) as file:
        content = file.read()
    for key in keys:
        if content.find(key) == -1:
            tdLog.exit(f"not found key: {key} in content={content}")
        else:
            tdLog.info(f"found key:{key} successful.")
def testBenchmarkJson(self, benchmark, jsonFile, options="", checkStep=False):
# exe insert
cmd = f"{benchmark} {options} -f {jsonFile}"
@ -111,6 +126,11 @@ class TDTestCase(TBase):
self.testBenchmarkJson(benchmark, "./tools/benchmark/basic/json/TS-5234-1.json")
self.testBenchmarkJson(benchmark, "./tools/benchmark/basic/json/TS-5234-2.json")
self.testBenchmarkJson(benchmark, "./tools/benchmark/basic/json/TS-5234-3.json")
# TS-5846
keys = ["completed total queries: 40"]
self.benchmarkQuery(benchmark, "./tools/benchmark/basic/json/TS-5846-Query.json", keys)
keys = ["completed total queries: 20"]
self.benchmarkQuery(benchmark, "./tools/benchmark/basic/json/TS-5846-Mixed-Query.json", keys)
# bugs td
def bugsTD(self, benchmark):
@ -133,7 +153,6 @@ class TDTestCase(TBase):
def run(self):
benchmark = etool.benchMarkFile()
# ts
self.bugsTS(benchmark)

View File

@ -27,8 +27,6 @@ class TDTestCase(TBase):
[TD-20424] taosBenchmark insert child table from and to test cases
"""
def run(self):
binPath = etool.benchMarkFile()
cmd = "%s -t 6 -n 1 -y" % binPath
@ -41,7 +39,6 @@ class TDTestCase(TBase):
tdSql.query("select count(*) from test.meters")
tdSql.checkData(0, 0, 6 + 3)
binPath = etool.benchMarkFile()
cmd = "%s -t 5 -n 1 -y" % binPath
tdLog.info("%s" % cmd)
os.system("%s" % cmd)
@ -54,7 +51,6 @@ class TDTestCase(TBase):
tdSql.query("select count(*) from test.meters")
tdSql.checkData(0, 0, 5 + 3)
binPath = etool.benchMarkFile()
cmd = "%s -t 4 -n 1 -y" % binPath
tdLog.info("%s" % cmd)
os.system("%s" % cmd)

View File

@ -14,10 +14,13 @@ import os
import subprocess
import time
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
import frame
import frame.etool
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
class TDTestCase:
@ -27,35 +30,6 @@ class TDTestCase:
"""
return
def init(self, conn, logSql, replicaVar=1):
# commented out by Shuduo for CI: self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def getPath(self, tool="taosBenchmark"):
if (platform.system().lower() == 'windows'):
tool = tool + ".exe"
selfPath = os.path.dirname(os.path.realpath(__file__))
if "community" in selfPath:
projPath = selfPath[: selfPath.find("community")]
else:
projPath = selfPath[: selfPath.find("tests")]
paths = []
for root, dirs, files in os.walk(projPath):
if (tool) in files:
rootRealPath = os.path.dirname(os.path.realpath(root))
if "packaging" not in rootRealPath:
paths.append(os.path.join(root, tool))
break
if len(paths) == 0:
tdLog.exit("taosBenchmark not found!")
return
else:
tdLog.info("taosBenchmark found in %s" % paths[0])
return paths[0]
def checkDataCorrect(self):
sql = "select count(*) from meters"
tdSql.query(sql)
@ -86,8 +60,8 @@ class TDTestCase:
def run(self):
binPath = self.getPath()
cmd = "%s -f ./5-taos-tools/taosbenchmark/json/insertMix.json" % binPath
binPath = etool.benchMarkFile()
cmd = "%s -f ./tools/benchmark/basic//json/insertMix.json" % binPath
tdLog.info("%s" % cmd)
errcode = os.system("%s" % cmd)
if errcode != 0:

View File

@ -29,7 +29,7 @@
"super_tables": [{
"name": "product",
"child_table_exists": "no",
"childtable_count": 9,
"childtable_count": 5,
"childtable_prefix": "d",
"auto_create_table": "yes",
"batch_create_tbl_num": 10,
@ -37,7 +37,7 @@
"insert_mode": "stmt2",
"non_stop_mode": "no",
"line_protocol": "line",
"insert_rows": 10340,
"insert_rows": 1034,
"childtable_limit": 0,
"childtable_offset": 0,
"interlace_rows": 0,

View File

@ -0,0 +1,35 @@
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"databases": "test",
"query_times": 100,
"query_mode": "taosc",
"specified_table_query": {
"concurrent": 3,
"sqls": [
{
"sql": "select last_row(*) from meters"
},
{
"sql": "select count(*) from d0",
"result": "./query_res1.txt"
}
]
},
"super_table_query": {
"stblname": "meters",
"concurrent": 3,
"query_interval": 1,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": "./query_res2.txt"
}
]
}
}

View File

@ -0,0 +1,27 @@
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"continue_if_fail": "yes",
"databases": "test",
"query_times": 100,
"query_mode": "taosc",
"super_table_query":
"stblname": "meters",
"concurrent": 3,
"query_interval": 1,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": "./query_res2.txt"
},
{
"sql": "select count(ts) from xxxx"
}
]
}
}

View File

@ -0,0 +1,12 @@
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"databases": "test",
"query_times": 100,
"query_mode": "taosc"
}

View File

@ -0,0 +1,24 @@
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"databases": "test",
"query_times": 100,
"query_mode": "taosc",
"specified_table_query": {
"concurrent": 3,
"sqls": [
{
"sql": "select last_row(*) from meters"
},
{
"sql": "select count(*) from d0",
"result": "./query_res1.txt"
}
]
}
}

View File

@ -0,0 +1,25 @@
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"databases": "test",
"query_times": 100,
"query_mode": "taosc",
"specified_table_query": {
"concurrent": 4,
"mixed_query": "yes",
"sqls": [
{
"sql": "select last_row(*) from meters"
},
{
"sql": "select count(*) from d0",
"result": "./query_res1.txt"
}
]
}
}

View File

@ -0,0 +1,28 @@
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"databases": "test",
"query_times": 100,
"query_mode": "rest",
"specified_table_query": {
"concurrent": 3,
"mixed_query": "yes",
"sqls": [
{
"sql": "select last_row(*) from meters"
},
{
"sql": "select count(*) from d0",
"result": "./query_res1.txt"
},
{
"sql": "select count(*) from meters"
}
]
}
}

View File

@ -0,0 +1,27 @@
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"databases": "test",
"query_times": 100,
"query_mode": "rest",
"specified_table_query": {
"concurrent": 3,
"sqls": [
{
"sql": "select last_row(*) from meters"
},
{
"sql": "select count(*) from d0",
"result": "./query_res1.txt"
},
{
"sql": "select count(*) from meters"
}
]
}
}

View File

@ -0,0 +1,27 @@
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"continue_if_fail": "yes",
"databases": "test",
"query_times": 100,
"query_mode": "taosc",
"super_table_query": {
"stblname": "meters",
"concurrent": 3,
"query_interval": 0,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": "./query_res2.txt"
},
{
"sql": "select count(ts) from xxxx"
}
]
}
}

View File

@ -0,0 +1,24 @@
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"continue_if_fail": "yes",
"databases": "test",
"query_times": 100,
"query_mode": "rest",
"super_table_query": {
"stblname": "meters",
"concurrent": 3,
"query_interval": 0,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": "./query_res2.txt"
}
]
}
}

View File

@ -20,16 +20,5 @@
"sql": "select last_row(*) from db.stb00_9 ",
"result": "./query_res1.txt"
}]
},
"super_table_query": {
"stblname": "stb1",
"query_interval":20,
"threads": 4,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": "./query_res2.txt"
}
]
}
}

View File

@ -0,0 +1,22 @@
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"databases": "db",
"query_times": 1,
"super_table_query": {
"stblname": "stb1",
"query_interval":20,
"threads": 4,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": "./query_res2.txt"
}
]
}
}

View File

@ -22,17 +22,6 @@
"result": "./query_res1.txt"
}
]
},
"super_table_query": {
"stblname": "stb1",
"query_interval": 1,
"threads": 3,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": "./query_res2.txt"
}
]
}
}

View File

@ -0,0 +1,24 @@
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"databases": "db",
"query_times": 2,
"query_mode": "rest",
"super_table_query": {
"stblname": "stb1",
"query_interval": 1,
"threads": 3,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": "./query_res2.txt"
}
]
}
}

View File

@ -413,17 +413,6 @@
"result": "./query_res0.txt"
}]
},
"super_table_query": {
"stblname": "stb1",
"query_interval": 1,
"threads": 3,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": "./query_res2.txt"
}
]
}
}

View File

@ -23,16 +23,5 @@
"result": "./query_res1.txt"
}
]
},
"super_table_query": {
"stblname": "stb1",
"query_interval": 1,
"threads": 3,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": "./query_res2.txt"
}
]
}
}

View File

@ -0,0 +1,23 @@
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"databases": "db",
"query_times": 2,
"query_mode": "taosc",
"super_table_query": {
"stblname": "stb1",
"query_interval": 1,
"threads": 3,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": "./query_res2.txt"
}
]
}
}

View File

@ -22,16 +22,5 @@
"result": "./query_res1.txt"
}
]
},
"super_table_query": {
"stblname": "stb1",
"query_interval": 1,
"threads": 3,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": "./query_res2.txt"
}
]
}
}

View File

@ -0,0 +1,23 @@
{
"filetype": "query",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"databases": "db",
"query_times": 2,
"query_mode": "taosc",
"super_table_query": {
"stblname": "stb1",
"query_interval": 1,
"threads": 3,
"sqls": [
{
"sql": "select last_row(ts) from xxxx",
"result": "./query_res2.txt"
}
]
}
}

View File

@ -14,14 +14,5 @@
"sql": "select count(*) from db.stb",
"result": "rest_query_specified"
}]
},
"super_table_query": {
"stblname": "stb",
"sqls": [
{
"sql": "select count(*) from xxxx",
"result": "rest_query_super"
}
]
}
}

View File

@ -0,0 +1,18 @@
{
"filetype":"query",
"cfgdir": "/etc/taos",
"confirm_parameter_prompt": "no",
"databases": "db",
"query_mode": "rest",
"connection_pool_size": 10,
"response_buffer": 10000,
"super_table_query": {
"stblname": "stb",
"sqls": [
{
"sql": "select count(*) from xxxx",
"result": "rest_query_super"
}
]
}
}

View File

@ -14,7 +14,7 @@
{
"query_interval": 1,
"concurrent":1,
"sql_file": "./tools/benchmark/basic/json/query-error-sqls.txt",
"sql_file": "./taosbenchmark/json/query-error-sqls.txt",
"result": "taosc_query_specified-sqlfile"
}
}

View File

@ -14,6 +14,6 @@
"specified_table_query": {
"query_interval": 3,
"concurrent": 3,
"sql_file": "./tools/benchmark/basic/json/query-sqls-slow-query.txt"
"sql_file": "./taosbenchmark/json/query-sqls-slow-query.txt"
}
}

View File

@ -13,7 +13,7 @@
{
"query_interval": 1,
"concurrent":1,
"sql_file": "./tools/benchmark/basic/json/query-sqls.txt",
"sql_file": "./taosbenchmark/json/query-sqls.txt",
"result": "taosc_query_specified-sqlfile"
}
}

View File

@ -18,16 +18,5 @@
"sql": "select count(*) from db.stb",
"result": "taosc_query_specified"
}]
},
"super_table_query": {
"stblname": "stb",
"query_interval": 1,
"concurrent": 1,
"sqls": [
{
"sql": "select count(*) from xxxx",
"result": "taosc_query_super"
}
]
}
}

View File

@ -0,0 +1,24 @@
{
"filetype":"query",
"cfgdir": "/etc/taos",
"host": "localhost",
"port": 6030,
"user": "root",
"password": "taosdata",
"confirm_parameter_prompt": "no",
"databases": "db",
"query_times": 1,
"reset_query_cache": "yes",
"super_table_query":
{
"stblname": "stb",
"query_interval": 1,
"concurrent": 1,
"sqls": [
{
"sql": "select count(*) from xxxx",
"result": "taosc_query_super"
}
]
}
}

View File

@ -0,0 +1,260 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import os
import json
import sys
import os
import time
import datetime
import platform
import subprocess
import frame
import frame.etool
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
# remove single and double quotation marks
def removeQuotation(origin):
    """Return *origin* with every single and double quote character removed."""
    return "".join(ch for ch in origin if ch not in "'\"")
# Query-mode test case for taosBenchmark; entry point is run() below.
class TDTestCase:
def caseDescription(self):
"""
[TD-11510] taosBenchmark test cases
"""
def runSeconds(self, command, timeout = 180):
    """Run *command* in a shell and return (stdout, stderr) decoded as gbk.

    Uses communicate() so both pipes are drained while waiting; the original
    wait()+read() sequence can deadlock once a pipe buffer fills up.
    Raises subprocess.TimeoutExpired if the command outlives *timeout* seconds.
    """
    tdLog.info(f"runSeconds {command} ...")
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() waits for exit and reads stdout/stderr concurrently
    stdout, stderr = process.communicate(timeout=timeout)
    # NOTE(review): gbk matches the original decoding choice — presumably for
    # Windows console output; confirm utf-8 is not expected on Linux CI.
    output = stdout.decode(encoding="gbk")
    error = stderr.decode(encoding="gbk")
    return output, error
def getKeyValue(self, content, key, end):
    """Locate *key* in *content* and return (found, value).

    The value is the text after *key* (leading spaces skipped) up to the
    first occurrence of *end*; an empty *end* (or *end* not found) takes
    the rest of the string. Returns (False, "") when *key* is absent or
    only a single character remains after the skipped spaces.
    """
    pos = content.find(key)
    if pos == -1:
        return False, ""
    # step past the key itself, then past any padding spaces
    pos += len(key)
    while pos < len(content) and content[pos] == " ":
        pos += 1
    # boundary: exactly one trailing character left -> treated as not found
    if pos + 1 == len(content):
        return False, ""
    # empty delimiter means "take everything to the end"
    stop = content.find(end, pos) if end else -1
    return True, (content[pos:] if stop == -1 else content[pos:stop])
def getDbRows(self, times):
    """Return the row count of test.meters, polling up to *times* attempts."""
    countSql = f"select count(*) from test.meters"
    tdSql.waitedQuery(countSql, 1, times)
    return tdSql.getData(0, 0)
# Verify one numeric metric parsed from benchmark output.
# output: full benchmark stdout; key/end: delimiters handed to getKeyValue;
# expect: reference value; equal: True -> parsed value must match exactly,
# False -> parsed value must be strictly greater than expect.
# Aborts the whole case via tdLog.exit on any mismatch or missing key.
def checkItem(self, output, key, end, expect, equal):
ret, value = self.getKeyValue(output, key, end)
if ret == False:
tdLog.exit(f"not found key:{key}. end:{end} output:\n{output}")
fval = float(value)
# compare
if equal and fval != expect:
tdLog.exit(f"check not expect. expect:{expect} real:{fval}, key:{key} end:{end} output:\n{output}")
elif equal == False and fval <= expect:
tdLog.exit(f"failed because {fval} <= {expect}, key:{key} end:{end} output:\n{output}")
else:
# succ
if equal:
tdLog.info(f"check successfully. key:{key} expect:{expect} real:{fval}")
else:
tdLog.info(f"check successfully. key:{key} {fval} > {expect}")
# Run one query json with taosBenchmark and validate the summary metrics it
# prints against values derived from the json config itself.
# benchmark: binary path; jsonFile: query config; specMode: True when the
# config uses specified_table_query, False for super_table_query;
# tbCnt: child-table count (multiplies expected queries in super mode).
def checkAfterRun(self, benchmark, jsonFile, specMode, tbCnt):
# run
cmd = f"{benchmark} -f {jsonFile}"
output, error = self.runSeconds(cmd)
if specMode :
label = "specified_table_query"
else:
label = "super_table_query"
#
# check query output against the json configuration
#
with open(jsonFile, "r") as file:
data = json.load(file)
queryTimes = data["query_times"]
# continue_if_fail is optional; default "no"
try:
continueIfFail = data["continue_if_fail"]
except:
continueIfFail = "no"
concurrent = data[label]["concurrent"]
sqls = data[label]["sqls"]
# mixed_query is optional; default "no"
try:
mixedQuery = data[label]["mixed_query"]
except:
mixedQuery = "no"
tdLog.info(f"queryTimes={queryTimes} concurrent={concurrent} mixedQuery={mixedQuery} len(sqls)={len(sqls)} label={label}\n")
totalQueries = 0
threadQueries = 0
# continue_if_fail changes the terminator after the all-threads QPS figure
if continueIfFail.lower() == "yes":
allEnd = " "
else:
allEnd = "\n"
if specMode and mixedQuery.lower() != "yes":
# specified-table, non-mixed: every thread runs every sql queryTimes times
threadQueries = queryTimes * concurrent
totalQueries = queryTimes * concurrent * len(sqls)
threadKey = f"complete query with {concurrent} threads and "
qpsKey = "QPS: "
avgKey = "query delay avg: "
minKey = "min:"
else:
# spec mixed or super
if specMode:
# mixed: sqls are shared among threads, so no concurrent multiplier
totalQueries = queryTimes * len(sqls)
else:
# super: every sql is expanded once per child table
totalQueries = queryTimes * len(sqls) * tbCnt
threadQueries = totalQueries
nSql = len(sqls)
if specMode and nSql < concurrent :
tdLog.info(f"set concurrent = {nSql} because len(sqls) < concurrent")
concurrent = nSql
threadKey = f"using {concurrent} threads complete query "
qpsKey = ""
avgKey = "avg delay:"
minKey = "min delay:"
# each item: [key, end-delimiter, expected value, equal?]
# equal=False means the parsed value must strictly exceed the expectation
items = [
[threadKey, " ", threadQueries, True],
[qpsKey, " ", 5, False], # per-thread qps must exceed 5
[avgKey, "s", 0, False],
[minKey, "s", 0, False],
["max: ", "s", 0, False],
["p90: ", "s", 0, False],
["p95: ", "s", 0, False],
["p99: ", "s", 0, False],
["INFO: Spend ", " ", 0, False],
["completed total queries: ", ",", totalQueries, True],
["the QPS of all threads:", allEnd, 10 , False] # aggregate qps must exceed 10
]
# check each expected metric in the captured output
for item in items:
if len(item[0]) > 0:
self.checkItem(output, item[0], item[1], item[2], item[3])
# run the three query-mode json configs over native and REST connections
def threeQueryMode(self, benchmark, tbCnt, tbRow):
    """Check spec, spec-mixed and super query configs, native then REST."""
    cases = [
        ("./tools/benchmark/basic/json/queryModeSpec", True),
        ("./tools/benchmark/basic/json/queryModeSpecMix", True),
        ("./tools/benchmark/basic/json/queryModeSuper", False),
    ]
    # native connection
    for base, spec in cases:
        self.checkAfterRun(benchmark, base + ".json", spec, tbCnt)
    # rest connection
    for base, spec in cases:
        self.checkAfterRun(benchmark, base + "Rest.json", spec, tbCnt)
def expectFailed(self, command):
    """Run *command* expecting a non-zero exit; abort the case if it succeeds."""
    status = os.system(command)
    if status != 0:
        tdLog.info(f" expect failed is ok. command={command}")
    else:
        tdLog.exit(f" expect failed but success. command={command}")
# check exception cases
def exceptTest(self, benchmark, tbCnt, tbRow):
    """Run deliberately invalid query json files; each must make taosBenchmark fail."""
    badConfigs = [
        # missing / duplicated 'specified_table_query' and 'super_table_query'
        "./tools/benchmark/basic/json/queryErrorNoSpecSuper.json",
        "./tools/benchmark/basic/json/queryErrorBothSpecSuper.json",
        # malformed json
        "./tools/benchmark/basic/json/queryErrorFormat.json",
    ]
    for jsonFile in badConfigs:
        self.expectFailed(f"{benchmark} -f {jsonFile}")
def run(self):
    """Entry point: insert a small dataset, then run query-mode and error tests."""
    tbCnt = 10
    tbRow = 1000
    benchmark = etool.benchMarkFile()
    # insert fixture data: tbCnt child tables x tbRow rows via stmt2
    command = f"{benchmark} -d test -t {tbCnt} -n {tbRow} -I stmt2 -r 100 -y"
    if os.system(command) != 0:
        tdLog.exit(f"exec failed. command={command}")
    # query mode test
    self.threeQueryMode(benchmark, tbCnt, tbRow)
    # exception test
    self.exceptTest(benchmark, tbCnt, tbRow)
# tear down: close the sql connection and report the case as successful
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
# register this case with the framework for both Windows and Linux runners
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -27,8 +27,6 @@ class TDTestCase(TBase):
[TD-11510] taosBenchmark test cases
"""
def run(self):
binPath = etool.benchMarkFile()
os.system(

View File

@ -23,14 +23,12 @@ from frame.caseBase import *
from frame import *
class TDTestCase(TBase):
class TDTestCase:
def caseDescription(self):
"""
[TD-11510] taosBenchmark test cases
"""
def run(self):
binPath = etool.benchMarkFile()
os.system(
@ -46,6 +44,10 @@ class TDTestCase(TBase):
cmd = "%s -f ./tools/benchmark/basic/json/taosc_query.json" % binPath
tdLog.info("%s" % cmd)
os.system("%s" % cmd)
cmd = "%s -f ./tools/benchmark/basic/json/taosc_query1.json" % binPath
tdLog.info("%s" % cmd)
os.system("%s" % cmd)
with open("%s" % "taosc_query_specified-0", "r+") as f1:
for line in f1.readlines():
queryTaosc = line.strip().split()[0]
@ -56,9 +58,13 @@ class TDTestCase(TBase):
queryTaosc = line.strip().split()[0]
assert queryTaosc == "1", "result is %s != expect: 1" % queryTaosc
# split two
cmd = "%s -f ./tools/benchmark/basic/json/rest_query.json" % binPath
tdLog.info("%s" % cmd)
os.system("%s" % cmd)
cmd = "%s -f ./tools/benchmark/basic/json/rest_query1.json" % binPath
tdLog.info("%s" % cmd)
os.system("%s" % cmd)
times = 0
with open("rest_query_super-0", "r+") as f1:

View File

@ -14,10 +14,13 @@ import os
import subprocess
import time
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
import frame
import frame.etool
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
class TDTestCase:
@ -27,35 +30,6 @@ class TDTestCase:
"""
return
def init(self, conn, logSql, replicaVar=1):
# commented out by Shuduo for CI: self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def getPath(self, tool="taosBenchmark"):
if (platform.system().lower() == 'windows'):
tool = tool + ".exe"
selfPath = os.path.dirname(os.path.realpath(__file__))
if "community" in selfPath:
projPath = selfPath[: selfPath.find("community")]
else:
projPath = selfPath[: selfPath.find("tests")]
paths = []
for root, dirs, files in os.walk(projPath):
if (tool) in files:
rootRealPath = os.path.dirname(os.path.realpath(root))
if "packaging" not in rootRealPath:
paths.append(os.path.join(root, tool))
break
if len(paths) == 0:
tdLog.exit("taosBenchmark not found!")
return
else:
tdLog.info("taosBenchmark found in %s" % paths[0])
return paths[0]
def checkDataCorrect(self):
sql = "select count(*) from meters"
tdSql.query(sql)
@ -81,8 +55,8 @@ class TDTestCase:
def run(self):
binPath = self.getPath()
cmd = "%s -f ./5-taos-tools/taosbenchmark/json/stt.json" % binPath
binPath = etool.benchMarkFile()
cmd = "%s -f ./tools/benchmark/basic/json/stt.json" % binPath
tdLog.info("%s" % cmd)
errcode = os.system("%s" % cmd)
if errcode != 0:

View File

@ -12,6 +12,9 @@
# -*- coding: utf-8 -*-
import os
import ast
import re
import frame
import frame.etool
from frame.log import *
@ -19,17 +22,43 @@ from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
import ast
import re
# from assertpy import assert_that
import subprocess
class TDTestCase(TBase):
class TDTestCase:
# pylint: disable=R0201
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
# pylint: disable=R0201
def getPath(self, tool="taosBenchmark"):
selfPath = os.path.dirname(os.path.realpath(__file__))
if "community" in selfPath:
projPath = selfPath[: selfPath.find("community")]
elif "src" in selfPath:
projPath = selfPath[: selfPath.find("src")]
elif "/tools/" in selfPath:
projPath = selfPath[: selfPath.find("/tools/")]
elif "/tests/" in selfPath:
projPath = selfPath[: selfPath.find("/tests/")]
else:
tdLog.info("cannot found %s in path: %s, use system's" % (tool, selfPath))
projPath = "/usr/local/taos/bin/"
paths = []
for root, dummy, files in os.walk(projPath):
if (tool) in files:
rootRealPath = os.path.dirname(os.path.realpath(root))
if "packaging" not in rootRealPath:
paths.append(os.path.join(root, tool))
break
if len(paths) == 0:
return ""
return paths[0]
# Read the content of the taosc query result file, return each line of data, and assert the first column's value.
def assertfileDataTaosc(self, filename, expectResult):
@ -81,7 +110,7 @@ class TDTestCase(TBase):
)
def run(self):
binPath = etool.benchMarkFile()
binPath = self.getPath()
if binPath == "":
tdLog.exit("taosBenchmark not found!")
else:
@ -94,6 +123,7 @@ class TDTestCase(TBase):
# taosc query: query specified table and query super table
os.system("%s -f ./tools/benchmark/basic/json/queryInsertdata.json" % binPath)
os.system("%s -f ./tools/benchmark/basic/json/queryTaosc-mixed-query.json" % binPath)
os.system("%s -f ./tools/benchmark/basic/json/queryTaosc-mixed-query1.json" % binPath)
os.system("cat query_res2.txt* > all_query_res2_taosc.txt")
# correct Times testcases
@ -111,6 +141,7 @@ class TDTestCase(TBase):
# use restful api to query
os.system("%s -f ./tools/benchmark/basic/json/queryInsertrestdata.json" % binPath)
os.system("%s -f ./tools/benchmark/basic/json/queryRestful.json" % binPath)
os.system("%s -f ./tools/benchmark/basic/json/queryRestful1.json" % binPath)
os.system("cat query_res2.txt* > all_query_res2_rest.txt")
# correct Times testcases

View File

@ -12,6 +12,11 @@
# -*- coding: utf-8 -*-
import os
import ast
import re
# from assertpy import assert_that
import subprocess
import frame
import frame.etool
from frame.log import *
@ -19,15 +24,38 @@ from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
import ast
import re
# from assertpy import assert_that
import subprocess
class TDTestCase(TBase):
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def getPath(self, tool="taosBenchmark"):
selfPath = os.path.dirname(os.path.realpath(__file__))
if "community" in selfPath:
projPath = selfPath[: selfPath.find("community")]
elif "src" in selfPath:
projPath = selfPath[: selfPath.find("src")]
elif "/tools/" in selfPath:
projPath = selfPath[: selfPath.find("/tools/")]
elif "/tests/" in selfPath:
projPath = selfPath[: selfPath.find("/tests/")]
else:
tdLog.info("cannot found %s in path: %s, use system's" % (tool, selfPath))
projPath = "/usr/local/taos/bin/"
paths = []
for root, dummy, files in os.walk(projPath):
if (tool) in files:
rootRealPath = os.path.dirname(os.path.realpath(root))
if "packaging" not in rootRealPath:
paths.append(os.path.join(root, tool))
break
if len(paths) == 0:
return ""
return paths[0]
# Read the content of the taosc query result file, return each line of data, and assert the first column's value.
def assertfileDataTaosc(self, filename, expectResult):
@ -79,7 +107,7 @@ class TDTestCase(TBase):
)
def run(self):
binPath = etool.benchMarkFile()
binPath = self.getPath()
if binPath == "":
tdLog.exit("taosBenchmark not found!")
else:
@ -90,8 +118,11 @@ class TDTestCase(TBase):
os.system("rm -rf ./all_query*")
# taosc query: query specified table and query super table
os.system("%s -f ./tools/benchmark/basic/json/queryInsertdata.json" % binPath)
os.system("%s -f ./tools/benchmark/basic/json/queryTaosc.json" % binPath)
os.system("%s -f ./taosbenchmark/json/queryInsertdata.json" % binPath)
os.system("%s -f ./taosbenchmark/json/queryTaosc.json" % binPath)
# forbid parallel spec query with super query
os.system("%s -f ./taosbenchmark/json/queryTaosc1.json" % binPath)
os.system("cat query_res0.txt* > all_query_res0_taosc.txt")
os.system("cat query_res1.txt* > all_query_res1_taosc.txt")
os.system("cat query_res2.txt* > all_query_res2_taosc.txt")
@ -116,8 +147,9 @@ class TDTestCase(TBase):
os.system("rm -rf ./all_query*")
# use restful api to query
os.system("%s -f ./tools/benchmark/basic/json/queryInsertrestdata.json" % binPath)
os.system("%s -f ./tools/benchmark/basic/json/queryRestful.json" % binPath)
os.system("%s -f ./taosbenchmark/json/queryInsertrestdata.json" % binPath)
os.system("%s -f ./taosbenchmark/json/queryRestful.json" % binPath)
os.system("%s -f ./taosbenchmark/json/queryRestful1.json" % binPath)
os.system("cat query_res0.txt* > all_query_res0_rest.txt")
os.system("cat query_res1.txt* > all_query_res1_rest.txt")
os.system("cat query_res2.txt* > all_query_res2_rest.txt")
@ -151,49 +183,51 @@ class TDTestCase(TBase):
# query times less than or equal to 100
assert (
os.system("%s -f ./tools/benchmark/basic/json/queryInsertdata.json" % binPath) == 0
os.system("%s -f ./taosbenchmark/json/queryInsertdata.json" % binPath) == 0
)
assert (
os.system("%s -f ./tools/benchmark/basic/json/querySpeciMutisql100.json" % binPath)
os.system("%s -f ./taosbenchmark/json/querySpeciMutisql100.json" % binPath)
!= 0
)
assert (
os.system("%s -f ./tools/benchmark/basic/json/querySuperMutisql100.json" % binPath)
os.system("%s -f ./taosbenchmark/json/querySuperMutisql100.json" % binPath)
== 0
)
# query result print QPS
os.system("%s -f ./tools/benchmark/basic/json/queryInsertdata.json" % binPath)
exceptcode = os.system("%s -f ./tools/benchmark/basic/json/queryQps.json" % binPath)
os.system("%s -f ./taosbenchmark/json/queryInsertdata.json" % binPath)
exceptcode = os.system("%s -f ./taosbenchmark/json/queryQps.json" % binPath)
assert exceptcode == 0
exceptcode = os.system("%s -f ./taosbenchmark/json/queryQps1.json" % binPath)
assert exceptcode == 0
# 2021.02.09 need modify taosBenchmark code
# use illegal or out of range parameters query json file
os.system("%s -f ./tools/benchmark/basic/json/queryInsertdata.json" % binPath)
os.system("%s -f ./taosbenchmark/json/queryInsertdata.json" % binPath)
# 2021.02.09 need modify taosBenchmark code
# exceptcode = os.system(
# "%s -f ./tools/benchmark/basic/json/queryTimes0.json" %
# "%s -f ./taosbenchmark/json/queryTimes0.json" %
# binPath)
# assert exceptcode != 0
# 2021.02.09 need modify taosBenchmark code
# exceptcode0 = os.system(
# "%s -f ./tools/benchmark/basic/json/queryTimesless0.json" %
# "%s -f ./taosbenchmark/json/queryTimesless0.json" %
# binPath)
# assert exceptcode0 != 0
# exceptcode1 = os.system(
# "%s -f ./tools/benchmark/basic/json/queryConcurrent0.json" %
# "%s -f ./taosbenchmark/json/queryConcurrent0.json" %
# binPath)
# assert exceptcode2 != 0
# exceptcode3 = os.system(
# "%s -f ./tools/benchmark/basic/json/querrThreadsless0.json" %
# "%s -f ./taosbenchmark/json/querrThreadsless0.json" %
# binPath)
# assert exceptcode3 != 0
# exceptcode4 = os.system(
# "%s -f ./tools/benchmark/basic/json/querrThreads0.json" %
# "%s -f ./taosbenchmark/json/querrThreads0.json" %
# binPath)
# assert exceptcode4 != 0

View File

@ -27,9 +27,6 @@ class TDTestCase(TBase):
[TD-11510] taosBenchmark test cases
"""
def run(self):
tdSql.query("select client_version()")
client_ver = "".join(tdSql.res[0])

View File

@ -0,0 +1,33 @@
set -e
count=0
for i in `find benchmark/basic/ -name "*.py"`
do printf ",,y,army,./pytest.sh python3 ./test.py -f tools/$i\n"
((count=count+1))
done
for i in `find benchmark/cloud/ -name "*.py"`
do printf ",,y,army,./pytest.sh python3 ./test.py -f tools/$i\n"
((count=count+1))
done
for i in `find benchmark/ws/ -name "*.py"`
do printf ",,y,army,./pytest.sh python3 ./test.py -f tools/$i\n"
((count=count+1))
done
printf "\nbenchmark count=$count \n"
for i in `find taosdump/native/ -name "*.py"`
do printf ",,y,army,./pytest.sh python3 ./test.py -f tools/$i\n"
((count=count+1))
done
for i in `find taosdump/ws/ -name "*.py"`
do printf ",,y,army,./pytest.sh python3 ./test.py -f tools/$i\n"
((count=count+1))
done
printf "\nall count=$count \n"

View File

@ -648,6 +648,8 @@ typedef struct SpecifiedQueryInfo_S {
TAOS_RES *res[MAX_QUERY_SQL_COUNT];
uint64_t totalQueried;
bool mixed_query;
// error rate
uint64_t totalFail;
} SpecifiedQueryInfo;
typedef struct SuperQueryInfo_S {
@ -669,6 +671,8 @@ typedef struct SuperQueryInfo_S {
TAOS_SUB *tsub[MAX_QUERY_SQL_COUNT];
char ** childTblName;
uint64_t totalQueried;
// error rate
uint64_t totalFail;
} SuperQueryInfo;
typedef struct SQueryMetaInfo_S {
@ -856,14 +860,24 @@ typedef struct SThreadInfo_S {
} threadInfo;
typedef struct SQueryThreadInfo_S {
int start_sql;
int end_sql;
int threadId;
BArray* query_delay_list;
int sockfd;
SBenchConn* conn;
int64_t total_delay;
} queryThreadInfo;
int32_t start_sql;
int32_t end_sql;
int32_t threadID;
BArray* query_delay_list;
int32_t sockfd;
double total_delay;
char filePath[MAX_PATH_LEN];
uint64_t start_table_from;
uint64_t end_table_to;
uint64_t ntables;
uint64_t querySeq;
// error rate
uint64_t nSucc;
uint64_t nFail;
} qThreadInfo;
typedef struct STSmaThreadInfo_S {
char* dbName;
@ -883,6 +897,7 @@ extern bool g_fail;
extern char configDir[];
extern tools_cJSON * root;
extern uint64_t g_memoryUsage;
extern int32_t g_majorVersionOfClient;
#define min(a, b) (((a) < (b)) ? (a) : (b))
#define BARRAY_GET_ELEM(array, index) \
@ -895,11 +910,11 @@ void initArgument();
void queryAggrFunc();
void parseFieldDatatype(char *dataType, BArray *fields, bool isTag);
/* demoJsonOpt.c */
int getInfoFromJsonFile();
int readJsonConfig(char * file);
/* demoUtil.c */
int compare(const void *a, const void *b);
void encodeAuthBase64();
void replaceChildTblName(char *inSql, char *outSql, int tblIndex);
int32_t replaceChildTblName(char *inSql, char *outSql, int tblIndex);
void setupForAnsiEscape(void);
void resetAfterAnsiEscape(void);
char * convertDatatypeToString(int type);
@ -907,7 +922,7 @@ int convertStringToDatatype(char *type, int length);
unsigned int taosRandom();
void tmfree(void *buf);
void tmfclose(FILE *fp);
int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo);
int64_t fetchResult(TAOS_RES *res, char *filePath);
void prompt(bool NonStopMode);
void ERROR_EXIT(const char *msg);
int getServerVersionRest(int16_t rest_port);
@ -927,7 +942,8 @@ int getAllChildNameOfSuperTable(TAOS *taos, char *dbName, char *stbName,
int64_t childTblCountOfSuperTbl);
void* benchCalloc(size_t nmemb, size_t size, bool record);
BArray* benchArrayInit(size_t size, size_t elemSize);
void* benchArrayPush(BArray* pArray, void* pData);
void* benchArrayPush(BArray* pArray, void* pData); // free pData for auto
void* benchArrayPushNoFree(BArray* pArray, void* pData); // not free pData
void* benchArrayDestroy(BArray* pArray);
void benchArrayClear(BArray* pArray);
void* benchArrayGet(const BArray* pArray, size_t index);
@ -1015,4 +1031,18 @@ bool isRest(int32_t iface);
// get group index about dbname.tbname
int32_t calcGroupIndex(char* dbName, char* tbName, int32_t groupCnt);
// ------------ benchQuery util -------------
void freeSpecialQueryInfo();
// init conn
int32_t initQueryConn(qThreadInfo * pThreadInfo, int iface);
// close conn
void closeQueryConn(qThreadInfo * pThreadInfo, int iface);
void *queryKiller(void *arg);
// kill show
int killSlowQuery();
// fetch super table child name from server
int fetchChildTableName(char *dbName, char *stbName);
#endif // INC_BENCH_H_

View File

@ -16,11 +16,12 @@
#include <benchData.h>
#include <benchInsertMix.h>
static int32_t stmt2BindVProgressive(
static int32_t stmt2BindAndSubmit(
threadInfo *pThreadInfo,
SChildTable *childTbl,
int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1);
int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1,
int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w);
TAOS_STMT2* initStmt2(TAOS* taos, bool single);
#define FREE_PIDS_INFOS_RETURN_MINUS_1() \
do { \
@ -1710,6 +1711,159 @@ void loadChildTableInfo(threadInfo* pThreadInfo) {
tmfree(buf);
}
// Recover a usable stmt2 handle for this insert thread after a failure.
// Strategy: first try re-initializing stmt2 on the existing connection;
// only if that fails, tear the connection down and build a fresh one
// (conn -> stmt2 -> select db). Returns 0 on success, -1 on failure.
int32_t reCreateConn(threadInfo * pThreadInfo) {
    // "single" mode is used when the database has exactly one super table
    bool single = true;
    if (pThreadInfo->dbInfo->superTbls->size > 1) {
        single = false;
    }

    //
    // retry stmt2 init
    //

    // close the stale stmt2 handle before re-initializing
    if (pThreadInfo->conn->stmt2) {
        taos_stmt2_close(pThreadInfo->conn->stmt2);
        pThreadInfo->conn->stmt2 = NULL;
    }

    // cheap path: re-init stmt2 on the existing connection, maybe success
    pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
    if (pThreadInfo->conn->stmt2) {
        succPrint("%s", "reCreateConn first taos_stmt2_init() success and return.\n");
        return 0;
    }

    //
    // close old
    //
    closeBenchConn(pThreadInfo->conn);
    pThreadInfo->conn = NULL;

    //
    // create new
    //

    // fresh connection
    pThreadInfo->conn = initBenchConn();
    if (pThreadInfo->conn == NULL) {
        errorPrint("%s", "reCreateConn initBenchConn failed.");
        return -1;
    }
    // fresh stmt2 handle on the new connection
    pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
    if (NULL == pThreadInfo->conn->stmt2) {
        errorPrint("reCreateConn taos_stmt2_init() failed, reason: %s\n", taos_errstr(NULL));
        return -1;
    }
    succPrint("%s", "reCreateConn second taos_stmt2_init() success.\n");

    // the new connection starts without a current database; select it again
    if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
        errorPrint("second taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
        return -1;
    }

    return 0;
}
// Rebuild the connection and re-prepare the stmt2 statement for column
// window `w`. Returns 0 on success, otherwise the failing step's code.
int32_t reConnectStmt2(threadInfo * pThreadInfo, int32_t w) {
    // step 1: recover the connection / stmt2 handle
    int32_t ret = reCreateConn(pThreadInfo);
    if (ret != 0) {
        return ret;
    }
    // step 2: the fresh handle must be prepared again before use
    return prepareStmt2(pThreadInfo->conn->stmt2, pThreadInfo->stbInfo, NULL, w);
}
// One bind + execute round for stmt2.
//   delay1          accumulates the bind-call time (us)
//   delay3          passed through to execInsert for its own timing
//   startTs/endTs   set to the execute window (us) for throughput stats
//   generated       row count handed to execInsert
// Returns 0 on success, else the failing call's error code.
int32_t submitStmt2Impl(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3,
        int64_t* startTs, int64_t* endTs, uint32_t* generated) {
    // call bind (timed into delay1)
    int64_t start = toolsGetTimestampUs();
    int32_t code = taos_stmt2_bind_param(pThreadInfo->conn->stmt2, bindv, -1);
    if (code != 0) {
        errorPrint("taos_stmt2_bind_param failed, reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2));
        return code;
    }
    debugPrint("interlace taos_stmt2_bind_param() ok. bindv->count=%d \n", bindv->count);
    *delay1 += toolsGetTimestampUs() - start;

    // execute (window reported via startTs/endTs)
    *startTs = toolsGetTimestampUs();
    code = execInsert(pThreadInfo, *generated, delay3);
    *endTs = toolsGetTimestampUs();
    return code;
}
// Bind and execute a stmt2 batch with retry-on-failure support.
// When continue_if_fail is enabled, a failed submit triggers up to
// `keep_trying` (default 3) reconnect+resubmit attempts, sleeping
// trying_interval ms between attempts. Returns 0 on success, -1 when all
// retries are exhausted. Timing out-params mirror submitStmt2Impl().
int32_t submitStmt2(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3,
        int64_t* startTs, int64_t* endTs, uint32_t* generated, int32_t w) {
    // calc loop: number of attempts allowed (1 = no retry)
    int32_t loop = 1;
    SSuperTable* stbInfo = pThreadInfo->stbInfo;
    if(stbInfo->continueIfFail == YES_IF_FAILED) {
        if(stbInfo->keep_trying > 1) {
            loop = stbInfo->keep_trying;
        } else {
            loop = 3; // default
        }
    }

    // submit stmt2
    int32_t i = 0;              // retry counter (for log messages)
    bool connected = true;      // false while the last reconnect failed
    while (1) {
        int32_t code = -1;
        if(connected) {
            // connection is (believed) healthy, attempt the submit
            code = submitStmt2Impl(pThreadInfo, bindv, delay1, delay3, startTs, endTs, generated);
        }
        // check code
        if ( code == 0) {
            // success
            break;
        } else {
            // failed; consume one attempt
            if (--loop == 0) {
                // failed finally
                char tip[64] = "";
                if (i > 0) {
                    snprintf(tip, sizeof(tip), " after retry %d", i);
                }
                errorPrint("finally faild execute submitStmt2()%s\n", tip);
                return -1;
            }

            // wait a moment before retrying
            toolsMsleep(stbInfo->trying_interval);

            // reconnect + re-prepare; a failure here skips the submit on the
            // next iteration (connected=false) and burns another attempt
            infoPrint("stmt2 start retry submit i=%d after sleep %d ms...\n", i++, stbInfo->trying_interval);
            code = reConnectStmt2(pThreadInfo, w);
            if (code != 0) {
                // failed, try again on the next iteration
                errorPrint("faild reConnectStmt2 and retry again for next i=%d \n", i);
                connected = false;
            } else {
                // reconnected successfully
                connected = true;
            }
        }
    }

    // success
    return 0;
}
static void *syncWriteInterlace(void *sarg) {
threadInfo * pThreadInfo = (threadInfo *)sarg;
SDataBase * database = pThreadInfo->dbInfo;
@ -2138,28 +2292,26 @@ static void *syncWriteInterlace(void *sarg) {
}
}
// stmt2 bind param
// exec
if(stbInfo->iface == STMT2_IFACE) {
// exec stmt2
if(g_arguments->debug_print)
showBindV(bindv, stbInfo->tags, stbInfo->cols);
// call bind
int64_t start = toolsGetTimestampUs();
if (taos_stmt2_bind_param(pThreadInfo->conn->stmt2, bindv, -1)) {
errorPrint("taos_stmt2_bind_param failed, reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2));
// bind & exec stmt2
if (submitStmt2(pThreadInfo, bindv, &delay1, &delay3, &startTs, &endTs, &generated, w) != 0) {
g_fail = true;
goto free_of_interlace;
}
debugPrint("succ to call taos_stmt2_bind_param() with interlace mode. interlaceRows=%d bindv->count=%d \n", interlaceRows, bindv->count);
delay1 += toolsGetTimestampUs() - start;
}
// execute
} else {
// exec other
startTs = toolsGetTimestampUs();
if (execInsert(pThreadInfo, generated, &delay3)) {
g_fail = true;
goto free_of_interlace;
}
endTs = toolsGetTimestampUs();
}
debugPrint("execInsert tableIndex=%d left insert rows=%"PRId64" generated=%d\n", i, insertRows, generated);
// reset count
@ -2839,9 +2991,10 @@ void *syncWriteProgressive(void *sarg) {
break;
}
case STMT2_IFACE: {
generated = stmt2BindVProgressive(
generated = stmt2BindAndSubmit(
pThreadInfo,
childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1);
childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1,
&delay3, &startTs, &endTs, w);
break;
}
case SML_REST_IFACE:
@ -2861,7 +3014,10 @@ void *syncWriteProgressive(void *sarg) {
if (!stbInfo->non_stop) {
i += generated;
}
// only measure insert
// stmt2 execInsert already execute on stmt2BindAndSubmit
if (stbInfo->iface != STMT2_IFACE) {
// no stmt2 exec
startTs = toolsGetTimestampUs();
int code = execInsert(pThreadInfo, generated, &delay3);
if (code) {
@ -2890,10 +3046,10 @@ void *syncWriteProgressive(void *sarg) {
}
}
int ret = smartContinueIfFail(
code = smartContinueIfFail(
pThreadInfo,
childTbl, tagData, w, ttl);
if (0 != ret) {
if (0 != code) {
g_fail = true;
goto free_of_progressive;
}
@ -2918,6 +3074,7 @@ void *syncWriteProgressive(void *sarg) {
}
}
endTs = toolsGetTimestampUs() + 1;
}
if (stbInfo->insert_interval > 0) {
debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n",
@ -2931,7 +3088,7 @@ void *syncWriteProgressive(void *sarg) {
if (database->flush) {
char sql[260] = "";
sprintf(sql, "flush database %s", database->dbName);
code = executeSql(pThreadInfo->conn->taos,sql);
int32_t code = executeSql(pThreadInfo->conn->taos,sql);
if (code != 0) {
perfPrint(" %s failed. error code = 0x%x\n", sql, code);
} else {
@ -3602,6 +3759,9 @@ int32_t assignTableToThread(SDataBase* database, SSuperTable* stbInfo) {
// calc table count per vgroup
for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
int32_t vgIdx = calcGroupIndex(database->dbName, stbInfo->childTblArray[i]->name, database->vgroups);
if (vgIdx == -1) {
continue;
}
SVGroup *vg = benchArrayGet(database->vgArray, vgIdx);
vg->tbCountPerVgId ++;
}
@ -3609,7 +3769,7 @@ int32_t assignTableToThread(SDataBase* database, SSuperTable* stbInfo) {
// malloc vg->childTblArray memory with table count
for (int v = 0; v < database->vgroups; v++) {
SVGroup *vg = benchArrayGet(database->vgArray, v);
infoPrint("Total %"PRId64" tables on %s's vgroup %d (id: %d)\n",
infoPrint("Local hash calc %"PRId64" tables on %s's vgroup %d (id: %d)\n",
vg->tbCountPerVgId, database->dbName, v, vg->vgId);
if (vg->tbCountPerVgId) {
threads++;
@ -3623,6 +3783,9 @@ int32_t assignTableToThread(SDataBase* database, SSuperTable* stbInfo) {
// set vg->childTblArray data
for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
int32_t vgIdx = calcGroupIndex(database->dbName, stbInfo->childTblArray[i]->name, database->vgroups);
if (vgIdx == -1) {
continue;
}
SVGroup *vg = benchArrayGet(database->vgArray, vgIdx);
debugPrint("calc table hash to vgroup %s.%s vgIdx=%d\n",
database->dbName,
@ -3655,8 +3818,13 @@ TAOS_STMT2* initStmt2(TAOS* taos, bool single) {
memset(&op2, 0, sizeof(op2));
op2.singleStbInsert = single;
op2.singleTableBindOnce = single;
infoPrint("initStmt2 call taos_stmt2_init single=%d\n", single);
return taos_stmt2_init(taos, &op2);
TAOS_STMT2* stmt2 = taos_stmt2_init(taos, &op2);
if (stmt2)
succPrint("succ taos_stmt2_init single=%d\n", single);
else
errorPrint("failed taos_stmt2_init single=%d\n", single);
return stmt2;
}
// init insert thread
@ -4613,10 +4781,11 @@ int insertTestProcess() {
// ------- STMT 2 -----------
//
static int32_t stmt2BindVProgressive(
static int32_t stmt2BindAndSubmit(
threadInfo *pThreadInfo,
SChildTable *childTbl,
int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1) {
int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1,
int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w) {
// create bindV
int32_t count = 1;
@ -4647,8 +4816,13 @@ static int32_t stmt2BindVProgressive(
batch = g_arguments->prepared_rand - pos;
}
int32_t generated = bindVColsProgressive(bindv, 0, pThreadInfo, batch, *timestamp, pos, childTbl, pkCur, pkCnt, &n);
if(generated <= 0) {
if (batch == 0) {
infoPrint("batch size is zero. pos = %"PRId64"\n", pos);
return 0;
}
uint32_t generated = bindVColsProgressive(bindv, 0, pThreadInfo, batch, *timestamp, pos, childTbl, pkCur, pkCnt, &n);
if(generated == 0) {
errorPrint( "get cols data bind information failed. table: %s\n", childTbl->name);
freeBindV(bindv);
return -1;
@ -4659,18 +4833,17 @@ static int32_t stmt2BindVProgressive(
showBindV(bindv, stbInfo->tags, stbInfo->cols);
}
// do bindv
int64_t start = toolsGetTimestampUs();
int32_t ret = taos_stmt2_bind_param(stmt2, bindv, -1);
if(ret != 0) {
errorPrint( "taos_stmt2_bind_param failed, table: %s . engine error: %s\n", childTbl->name, taos_stmt2_error(stmt2));
freeBindV(bindv);
return -1;
}
debugPrint("succ to call taos_stmt2_bind_param() progressive mode. table=%s batch=%d pos=%" PRId64 " ts=%" PRId64 " generated=%d\n",
childTbl->name, batch, pos, *timestamp, generated);
*delay1 = toolsGetTimestampUs() - start;
// bind and submit
int32_t code = submitStmt2(pThreadInfo, bindv, delay1, delay3, startTs, endTs, &generated, w);
// free
freeBindV(bindv);
if(code != 0) {
errorPrint( "failed submitStmt2() progressive mode, table: %s . engine error: %s\n", childTbl->name, taos_stmt2_error(stmt2));
return code;
} else {
debugPrint("succ submitStmt2 progressive mode. table=%s batch=%d pos=%" PRId64 " ts=%" PRId64 " generated=%d\n",
childTbl->name, batch, pos, *timestamp, generated);
return generated;
}
}

View File

@ -260,11 +260,12 @@ static int getColumnAndTagTypeFromInsertJsonFile(
minInDbl = min;
}
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
double valueRange = maxInDbl - minInDbl;
tools_cJSON *dataScalingFactor = tools_cJSON_GetObjectItem(column, "scalingFactor");
if (tools_cJSON_IsNumber(dataScalingFactor)) {
scalingFactor = dataScalingFactor->valueint;
if (scalingFactor > 1) {
if (1< scalingFactor && scalingFactor <= 1000000) {
max = maxInDbl * scalingFactor;
min = minInDbl * scalingFactor;
} else {
@ -279,6 +280,7 @@ static int getColumnAndTagTypeFromInsertJsonFile(
scalingFactor = 1;
}
}
}
// gen
tools_cJSON *dataGen = tools_cJSON_GetObjectItem(column, "gen");
@ -495,11 +497,13 @@ static int getColumnAndTagTypeFromInsertJsonFile(
minInDbl = min;
}
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
double valueRange = maxInDbl - minInDbl;
tools_cJSON *dataScalingFactor = tools_cJSON_GetObjectItem(tagObj, "scalingFactor");
if (tools_cJSON_IsNumber(dataScalingFactor)) {
scalingFactor = dataScalingFactor->valueint;
if (scalingFactor > 1) {
if (1< scalingFactor && scalingFactor <= 1000000) {
max = maxInDbl * scalingFactor;
min = minInDbl * scalingFactor;
} else {
@ -514,6 +518,7 @@ static int getColumnAndTagTypeFromInsertJsonFile(
scalingFactor = 1;
}
}
}
tools_cJSON *dataValues = tools_cJSON_GetObjectItem(tagObj, "values");
@ -1026,11 +1031,8 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) {
superTable->childTblLimit = superTable->childTblCount;
}
} else {
warnPrint("child table limit %"PRId64" is invalid, "
"set to %"PRId64"\n",
childTbl_limit->valueint,
superTable->childTblCount);
superTable->childTblLimit = superTable->childTblCount;
warnPrint("child table limit %"PRId64" is invalid, set to zero. \n",childTbl_limit->valueint);
superTable->childTblLimit = 0;
}
}
tools_cJSON *childTbl_offset =
@ -1038,6 +1040,14 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) {
if (tools_cJSON_IsNumber(childTbl_offset)) {
superTable->childTblOffset = childTbl_offset->valueint;
}
// check limit offset
if( superTable->childTblOffset + superTable->childTblLimit > superTable->childTblCount ) {
errorPrint("json config invalid. childtable_offset(%"PRId64") + childtable_limit(%"PRId64") > childtable_count(%"PRId64")",
superTable->childTblOffset, superTable->childTblLimit, superTable->childTblCount);
return -1;
}
tools_cJSON *childTbl_from =
tools_cJSON_GetObjectItem(stbInfo, "childtable_from");
if (tools_cJSON_IsNumber(childTbl_from)) {
@ -1071,6 +1081,7 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) {
return -1;
}
// read from super table
tools_cJSON *continueIfFail =
tools_cJSON_GetObjectItem(stbInfo, "continue_if_fail"); // yes, no,
if (tools_cJSON_IsString(continueIfFail)) {
@ -1085,6 +1096,9 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) {
continueIfFail->valuestring);
return -1;
}
} else {
// default value is common specialed
superTable->continueIfFail = g_arguments->continueIfFail;
}
// start_fillback_time
@ -1555,6 +1569,7 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) {
}
}
// read from common
tools_cJSON *continueIfFail =
tools_cJSON_GetObjectItem(json, "continue_if_fail"); // yes, no,
if (tools_cJSON_IsString(continueIfFail)) {
@ -1760,79 +1775,8 @@ PARSE_OVER:
return code;
}
static int getMetaFromQueryJsonFile(tools_cJSON *json) {
int32_t code = -1;
tools_cJSON *telnet_tcp_port =
tools_cJSON_GetObjectItem(json, "telnet_tcp_port");
if (tools_cJSON_IsNumber(telnet_tcp_port)) {
g_arguments->telnet_tcp_port = (uint16_t)telnet_tcp_port->valueint;
}
tools_cJSON *gQueryTimes = tools_cJSON_GetObjectItem(json, "query_times");
if (tools_cJSON_IsNumber(gQueryTimes)) {
g_queryInfo.query_times = gQueryTimes->valueint;
} else {
g_queryInfo.query_times = 1;
}
tools_cJSON *gKillSlowQueryThreshold =
tools_cJSON_GetObjectItem(json, "kill_slow_query_threshold");
if (tools_cJSON_IsNumber(gKillSlowQueryThreshold)) {
g_queryInfo.killQueryThreshold = gKillSlowQueryThreshold->valueint;
} else {
g_queryInfo.killQueryThreshold = 0;
}
tools_cJSON *gKillSlowQueryInterval =
tools_cJSON_GetObjectItem(json, "kill_slow_query_interval");
if (tools_cJSON_IsNumber(gKillSlowQueryInterval)) {
g_queryInfo.killQueryInterval = gKillSlowQueryInterval ->valueint;
} else {
g_queryInfo.killQueryInterval = 1; /* by default, interval 1s */
}
tools_cJSON *resetCache =
tools_cJSON_GetObjectItem(json, "reset_query_cache");
if (tools_cJSON_IsString(resetCache)) {
if (0 == strcasecmp(resetCache->valuestring, "yes")) {
g_queryInfo.reset_query_cache = true;
}
} else {
g_queryInfo.reset_query_cache = false;
}
tools_cJSON *respBuffer =
tools_cJSON_GetObjectItem(json, "response_buffer");
if (tools_cJSON_IsNumber(respBuffer)) {
g_queryInfo.response_buffer = respBuffer->valueint;
} else {
g_queryInfo.response_buffer = RESP_BUF_LEN;
}
tools_cJSON *dbs = tools_cJSON_GetObjectItem(json, "databases");
if (tools_cJSON_IsString(dbs)) {
g_queryInfo.dbName = dbs->valuestring;
}
tools_cJSON *queryMode = tools_cJSON_GetObjectItem(json, "query_mode");
if (tools_cJSON_IsString(queryMode)) {
if (0 == strcasecmp(queryMode->valuestring, "rest")) {
g_queryInfo.iface = REST_IFACE;
} else if (0 == strcasecmp(queryMode->valuestring, "taosc")) {
g_queryInfo.iface = TAOSC_IFACE;
} else {
errorPrint("Invalid query_mode value: %s\n",
queryMode->valuestring);
goto PARSE_OVER;
}
}
// init sqls
g_queryInfo.specifiedQueryInfo.sqls = benchArrayInit(1, sizeof(SSQL));
// specified_table_query
tools_cJSON *specifiedQuery =
tools_cJSON_GetObjectItem(json, "specified_table_query");
// Spec Query
int32_t readSpecQueryJson(tools_cJSON * specifiedQuery) {
g_queryInfo.specifiedQueryInfo.concurrent = 1;
if (tools_cJSON_IsObject(specifiedQuery)) {
tools_cJSON *queryInterval =
@ -1858,12 +1802,14 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) {
if (tools_cJSON_IsString(mixedQueryObj)) {
if (0 == strcasecmp(mixedQueryObj->valuestring, "yes")) {
g_queryInfo.specifiedQueryInfo.mixed_query = true;
infoPrint("%s\n","mixed_query is True");
} else if (0 == strcasecmp(mixedQueryObj->valuestring, "no")) {
g_queryInfo.specifiedQueryInfo.mixed_query = false;
infoPrint("%s\n","mixed_query is False");
} else {
errorPrint("Invalid mixed_query value: %s\n",
mixedQueryObj->valuestring);
goto PARSE_OVER;
return -1;
}
}
@ -1945,7 +1891,7 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) {
if (fp == NULL) {
errorPrint("failed to open file: %s\n",
sqlFileObj->valuestring);
goto PARSE_OVER;
return -1;
}
char *buf = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
while (fgets(buf, TSDB_MAX_ALLOWED_SQL_LEN, fp)) {
@ -2030,16 +1976,19 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) {
}
} else {
errorPrint("%s", "Invalid sql in json\n");
goto PARSE_OVER;
return -1;
}
}
}
}
}
// super_table_query
tools_cJSON *superQuery =
tools_cJSON_GetObjectItem(json, "super_table_query");
// succ
return 0;
}
// Super Query
int32_t readSuperQueryJson(tools_cJSON * superQuery) {
g_queryInfo.superQueryInfo.threadCnt = 1;
if (!superQuery || superQuery->type != tools_cJSON_Object) {
g_queryInfo.superQueryInfo.sqlCount = 0;
@ -2165,7 +2114,7 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) {
errorPrint(
"failed to read json, query sql size overflow, max is %d\n",
MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
return -1;
}
g_queryInfo.superQueryInfo.sqlCount = superSqlSize;
@ -2192,11 +2141,118 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) {
}
}
code = 0;
// succ
return 0;
}
PARSE_OVER:
// Parse the query-test JSON config into g_queryInfo / g_arguments.
// Reads the common options (telnet port, query_times, slow-query killer
// threshold/interval, cache reset, response buffer, database, query_mode),
// validates that exactly one of 'specified_table_query' /
// 'super_table_query' is present, then dispatches to the section reader.
// Returns 0 on success, non-zero on invalid config.
static int getMetaFromQueryJsonFile(tools_cJSON *json) {
    int32_t code = -1;

    // read common
    tools_cJSON *telnet_tcp_port =
        tools_cJSON_GetObjectItem(json, "telnet_tcp_port");
    if (tools_cJSON_IsNumber(telnet_tcp_port)) {
        g_arguments->telnet_tcp_port = (uint16_t)telnet_tcp_port->valueint;
    }

    tools_cJSON *gQueryTimes = tools_cJSON_GetObjectItem(json, "query_times");
    if (tools_cJSON_IsNumber(gQueryTimes)) {
        g_queryInfo.query_times = gQueryTimes->valueint;
    } else {
        g_queryInfo.query_times = 1;
    }

    // threshold in seconds; 0 disables the slow-query killer
    tools_cJSON *gKillSlowQueryThreshold =
        tools_cJSON_GetObjectItem(json, "kill_slow_query_threshold");
    if (tools_cJSON_IsNumber(gKillSlowQueryThreshold)) {
        g_queryInfo.killQueryThreshold = gKillSlowQueryThreshold->valueint;
    } else {
        g_queryInfo.killQueryThreshold = 0;
    }

    tools_cJSON *gKillSlowQueryInterval =
        tools_cJSON_GetObjectItem(json, "kill_slow_query_interval");
    if (tools_cJSON_IsNumber(gKillSlowQueryInterval)) {
        g_queryInfo.killQueryInterval = gKillSlowQueryInterval->valueint;
    } else {
        g_queryInfo.killQueryInterval = 1; /* by default, interval 1s */
    }

    tools_cJSON *resetCache =
        tools_cJSON_GetObjectItem(json, "reset_query_cache");
    if (tools_cJSON_IsString(resetCache)) {
        if (0 == strcasecmp(resetCache->valuestring, "yes")) {
            g_queryInfo.reset_query_cache = true;
        }
    } else {
        g_queryInfo.reset_query_cache = false;
    }

    tools_cJSON *respBuffer =
        tools_cJSON_GetObjectItem(json, "response_buffer");
    if (tools_cJSON_IsNumber(respBuffer)) {
        g_queryInfo.response_buffer = respBuffer->valueint;
    } else {
        g_queryInfo.response_buffer = RESP_BUF_LEN;
    }

    // NOTE: keeps a pointer into the parsed JSON tree; valid only while
    // the tree is alive (matches previous behavior)
    tools_cJSON *dbs = tools_cJSON_GetObjectItem(json, "databases");
    if (tools_cJSON_IsString(dbs)) {
        g_queryInfo.dbName = dbs->valuestring;
    }

    tools_cJSON *queryMode = tools_cJSON_GetObjectItem(json, "query_mode");
    if (tools_cJSON_IsString(queryMode)) {
        if (0 == strcasecmp(queryMode->valuestring, "rest")) {
            g_queryInfo.iface = REST_IFACE;
        } else if (0 == strcasecmp(queryMode->valuestring, "taosc")) {
            g_queryInfo.iface = TAOSC_IFACE;
        } else {
            errorPrint("Invalid query_mode value: %s\n",
                       queryMode->valuestring);
            return -1;
        }
    }

    // locate both query sections up front so shape errors are reported
    // before any parsing work is done (previously validated only after
    // both sections had already been parsed)
    tools_cJSON *specifiedQuery = tools_cJSON_GetObjectItem(json, "specified_table_query");
    tools_cJSON *superQuery = tools_cJSON_GetObjectItem(json, "super_table_query");

    // only have one
    const char* errType = "json config invalid:";
    if (specifiedQuery && superQuery) {
        errorPrint("%s only appear one for 'specified_table_query' and 'super_table_query' \n", errType);
        return -1;
    }
    // must have one
    if (specifiedQuery == NULL && superQuery == NULL ) {
        errorPrint("%s must have one for 'specified_table_query' or 'super_table_query' \n", errType);
        return -1;
    }

    // init sqls (only after the config shape is known to be valid,
    // so an invalid config does not leak the array)
    g_queryInfo.specifiedQueryInfo.sqls = benchArrayInit(1, sizeof(SSQL));

    // specified_table_query
    if (specifiedQuery) {
        code = readSpecQueryJson(specifiedQuery);
        if(code) {
            errorPrint("failed to readSpecQueryJson code=%d \n", code);
            return code;
        }
    }

    // super_table_query
    if (superQuery) {
        code = readSuperQueryJson(superQuery);
        if(code) {
            errorPrint("failed to readSuperQueryJson code=%d \n", code);
            return code;
        }
    }

    // succ
    return 0;
}
#ifdef TD_VER_COMPATIBLE_3_0_0_0
static int getMetaFromTmqJsonFile(tools_cJSON *json) {
@ -2368,8 +2424,7 @@ TMQ_PARSE_OVER:
}
#endif
int getInfoFromJsonFile() {
char * file = g_arguments->metaFile;
int readJsonConfig(char * file) {
int32_t code = -1;
FILE * fp = fopen(file, "r");
if (!fp) {
@ -2420,20 +2475,18 @@ int getInfoFromJsonFile() {
// read common item
code = getMetaFromCommonJsonFile(root);
if (INSERT_TEST == g_arguments->test_mode || CSVFILE_TEST == g_arguments->test_mode) {
// insert
code = getMetaFromInsertJsonFile(root);
#ifdef TD_VER_COMPATIBLE_3_0_0_0
} else if (QUERY_TEST == g_arguments->test_mode) {
#else
} else {
#endif
// query
memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo));
code = getMetaFromQueryJsonFile(root);
#ifdef TD_VER_COMPATIBLE_3_0_0_0
} else if (SUBSCRIBE_TEST == g_arguments->test_mode) {
// subscribe
memset(&g_tmqInfo, 0, sizeof(STmqMetaInfo));
code = getMetaFromTmqJsonFile(root);
#endif
}
PARSE_OVER:
free(content);
fclose(fp);

View File

@ -25,7 +25,7 @@ tools_cJSON* root;
#define CLIENT_INFO_LEN 20
static char g_client_info[CLIENT_INFO_LEN] = {0};
int g_majorVersionOfClient = 0;
int32_t g_majorVersionOfClient = 0;
// set flag if command passed, see ARG_OPT_ ???
uint64_t g_argFlag = 0;
@ -43,11 +43,6 @@ void* benchCancelHandler(void* arg) {
g_arguments->terminate = true;
toolsMsleep(10);
if (g_arguments->in_prompt || INSERT_TEST != g_arguments->test_mode) {
toolsMsleep(100);
postFreeResource();
exit(EXIT_SUCCESS);
}
return NULL;
}
#endif
@ -128,7 +123,11 @@ int main(int argc, char* argv[]) {
#endif
if (g_arguments->metaFile) {
g_arguments->totalChildTables = 0;
if (getInfoFromJsonFile()) exit(EXIT_FAILURE);
if (readJsonConfig(g_arguments->metaFile)) {
errorPrint("failed to readJsonConfig %s\n", g_arguments->metaFile);
exitLog();
return -1;
}
} else {
modifyArgument();
}

File diff suppressed because it is too large Load Diff

View File

@ -23,7 +23,7 @@ static void stable_sub_callback(TAOS_SUB *tsub, TAOS_RES *res, void *param,
return;
}
if (param) fetchResult(res, (threadInfo *)param);
if (param) fetchResult(res, ((threadInfo *)param)->filePath);
// tao_unsubscribe() will free result.
}
@ -35,7 +35,7 @@ static void specified_sub_callback(TAOS_SUB *tsub, TAOS_RES *res, void *param,
return;
}
if (param) fetchResult(res, (threadInfo *)param);
if (param) fetchResult(res, ((threadInfo *)param)->filePath);
// tao_unsubscribe() will free result.
}
@ -127,7 +127,7 @@ static void *specifiedSubscribe(void *sarg) {
}
fetchResult(
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID],
pThreadInfo);
pThreadInfo->filePath);
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID]++;
if ((g_queryInfo.specifiedQueryInfo
@ -247,7 +247,7 @@ static void *superSubscribe(void *sarg) {
.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
fetchResult(res, pThreadInfo);
fetchResult(res, pThreadInfo->filePath);
consumed[tsubSeq]++;
if ((g_queryInfo.superQueryInfo.resubAfterConsume != -1) &&

View File

@ -240,19 +240,24 @@ static void appendResultBufToFile(char *resultBuf, char * filePath) {
tmfclose(fp);
}
void replaceChildTblName(char *inSql, char *outSql, int tblIndex) {
char sourceString[32] = "xxxx";
char *pos = strstr(inSql, sourceString);
if (0 == pos) return;
int32_t replaceChildTblName(char *inSql, char *outSql, int tblIndex) {
// child table mark
char mark[32] = "xxxx";
char *pos = strstr(inSql, mark);
if (0 == pos) {
errorPrint("sql format error, sql not found mark string '%s'", mark);
return -1;
}
char subTblName[TSDB_TABLE_NAME_LEN];
snprintf(subTblName, TSDB_TABLE_NAME_LEN,
"%s.%s", g_queryInfo.dbName,
"`%s`.%s", g_queryInfo.dbName,
g_queryInfo.superQueryInfo.childTblName[tblIndex]);
tstrncpy(outSql, inSql, pos - inSql + 1);
snprintf(outSql + strlen(outSql), TSDB_MAX_ALLOWED_SQL_LEN -1,
"%s%s", subTblName, pos + strlen(sourceString));
snprintf(outSql + (pos - inSql), TSDB_MAX_ALLOWED_SQL_LEN - 1,
"%s%s", subTblName, pos + strlen(mark));
return 0;
}
int64_t toolsGetTimestamp(int32_t precision) {
@ -536,13 +541,20 @@ int postProceSqlImpl(char *sqlstr, char* dbName, int precision, int iface,
do {
bytes = recv(sockfd, responseBuf + received,
resp_len - received, 0);
if (bytes <= 0) {
errorPrint("%s", "reading no response from socket\n");
goto free_of_postImpl;
}
responseBuf[resp_len] = 0;
debugPrint("response buffer: %s\n", responseBuf);
debugPrint("response buffer: %s bytes=%d\n", responseBuf, bytes);
if (NULL != strstr(responseBuf, resEncodingChunk)) {
chunked = true;
}
int64_t index = strlen(responseBuf) - 1;
while (responseBuf[index] == '\n' || responseBuf[index] == '\r') {
if (index == 0) {
break;
}
index--;
}
debugPrint("index: %" PRId64 "\n", index);
@ -555,11 +567,6 @@ int postProceSqlImpl(char *sqlstr, char* dbName, int precision, int iface,
break;
}
if (bytes <= 0) {
errorPrint("%s", "reading no response from socket\n");
goto free_of_postImpl;
}
received += bytes;
if (g_arguments->test_mode == INSERT_TEST) {
@ -820,14 +827,14 @@ free_of_post:
}
// fetch result fo file or nothing
int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) {
int64_t fetchResult(TAOS_RES *res, char * filePath) {
TAOS_ROW row = NULL;
int num_fields = 0;
int64_t totalLen = 0;
TAOS_FIELD *fields = 0;
int64_t rows = 0;
char *databuf = NULL;
bool toFile = strlen(pThreadInfo->filePath) > 0;
bool toFile = strlen(filePath) > 0;
if(toFile) {
@ -841,7 +848,7 @@ int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) {
if (toFile) {
if (totalLen >= (FETCH_BUFFER_SIZE - HEAD_BUFF_LEN * 2)) {
// buff is full
appendResultBufToFile(databuf, pThreadInfo->filePath);
appendResultBufToFile(databuf, filePath);
totalLen = 0;
memset(databuf, 0, FETCH_BUFFER_SIZE);
}
@ -860,7 +867,7 @@ int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) {
// end
if (toFile) {
appendResultBufToFile(databuf, pThreadInfo->filePath);
appendResultBufToFile(databuf, filePath);
free(databuf);
}
return rows;
@ -1102,7 +1109,7 @@ static int32_t benchArrayEnsureCap(BArray* pArray, size_t newCap) {
}
void* benchArrayAddBatch(BArray* pArray, void* pData, int32_t elems, bool free) {
if (pData == NULL) {
if (pData == NULL || elems <=0) {
return NULL;
}
@ -1123,6 +1130,11 @@ FORCE_INLINE void* benchArrayPush(BArray* pArray, void* pData) {
return benchArrayAddBatch(pArray, pData, 1, true);
}
// Append one element by copy; unlike benchArrayPush(), the caller keeps
// ownership of pData (it is NOT freed by the array).
FORCE_INLINE void* benchArrayPushNoFree(BArray* pArray, void* pData) {
    return benchArrayAddBatch(pArray, pData, 1, false);
}
void* benchArrayDestroy(BArray* pArray) {
if (pArray) {
tmfree(pArray->pData);
@ -1302,7 +1314,7 @@ void destroySockFd(int sockfd) {
FORCE_INLINE void printErrCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res) {
char buff[512];
char *msg = cmd;
if (strlen(cmd) > sizeof(msg)) {
if (strlen(cmd) >= sizeof(buff)) {
memcpy(buff, cmd, 500);
buff[500] = 0;
strcat(buff, "...");
@ -1470,6 +1482,10 @@ void showBindV(TAOS_STMT2_BINDV *bindv, BArray *tags, BArray *cols) {
uint32_t MurmurHash3_32(const char *key, uint32_t len);
// get group index about dbname.tbname
int32_t calcGroupIndex(char* dbName, char* tbName, int32_t groupCnt) {
// check valid
if (dbName == NULL || tbName == NULL) {
return -1;
}
char key[1024];
snprintf(key, sizeof(key), "1.%s.%s", dbName, tbName);
uint32_t hash = MurmurHash3_32(key, strlen(key));
@ -1544,3 +1560,205 @@ uint32_t MurmurHash3_32(const char *key, uint32_t len) {
return h1;
}
#endif
//
// ---------------- benchQuery util ----------------------
//
// Open the per-thread query connection: a raw socket for the REST
// interface, otherwise a taosc bench connection. Returns 0 on success,
// -1 on failure.
int32_t initQueryConn(qThreadInfo * pThreadInfo, int iface) {
    // non-REST interfaces use a regular bench connection
    if (iface != REST_IFACE) {
        pThreadInfo->conn = initBenchConn();
        return (pThreadInfo->conn == NULL) ? -1 : 0;
    }

    // REST talks over a plain socket
    int fd = createSockFd();
    if (fd < 0) {
        return -1;
    }
    pThreadInfo->sockfd = fd;
    return 0;
}
// Tear down the per-thread query connection opened by initQueryConn():
// the socket for REST, the bench connection otherwise.
void closeQueryConn(qThreadInfo * pThreadInfo, int iface) {
    if (iface != REST_IFACE) {
        closeBenchConn(pThreadInfo->conn);
        pThreadInfo->conn = NULL;
        return;
    }
    // REST: release the raw socket (platform-specific teardown)
#ifdef WINDOWS
    closesocket(pThreadInfo->sockfd);
    WSACleanup();
#else
    close(pThreadInfo->sockfd);
#endif
}
// free g_queryInfo.specailQueryInfo memory , can re-call
void freeSpecialQueryInfo() {
// can re-call
if (g_queryInfo.specifiedQueryInfo.sqls == NULL) {
return;
}
// loop free each item memory
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) {
SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
tmfree(sql->command);
tmfree(sql->delay_list);
}
// free Array
benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls);
g_queryInfo.specifiedQueryInfo.sqls = NULL;
}
#define KILLID_LEN 64
// Slow-query killer thread body. Polls performance_schema.perf_queries
// every killQueryInterval seconds and issues "KILL QUERY" for any query
// whose exec_usec exceeds killQueryThreshold (seconds). Runs until a
// connection attempt fails; returns NULL.
void *queryKiller(void *arg) {
    (void)arg;  // unused
    while (true) {
        TAOS *taos = taos_connect(g_arguments->host, g_arguments->user,
                g_arguments->password, NULL, g_arguments->port);
        if (NULL == taos) {
            errorPrint("Slow query killer thread "
                    "failed to connect to the server %s\n",
                    g_arguments->host);
            return NULL;
        }
        char command[TSDB_MAX_ALLOWED_SQL_LEN] =
            "SELECT kill_id,exec_usec,sql FROM performance_schema.perf_queries";
        TAOS_RES *res = taos_query(taos, command);
        int32_t code = taos_errno(res);
        if (code) {
            // printErrCmdCodeStr() frees `res` (see its other call sites,
            // which never free after it), so fetching from or freeing `res`
            // here would be a use-after-free / double-free. Skip this round.
            printErrCmdCodeStr(command, code, res);
            taos_close(taos);
            toolsMsleep(g_queryInfo.killQueryInterval*1000);
            continue;
        }
        TAOS_ROW row = NULL;
        while ((row = taos_fetch_row(res)) != NULL) {
            int32_t *lengths = taos_fetch_lengths(res);
            if (lengths[0] <= 0) {
                infoPrint("No valid query found by %s\n", command);
            } else {
                // exec_usec is microseconds; threshold is seconds
                int64_t execUSec = *(int64_t*)row[1];
                if (execUSec > g_queryInfo.killQueryThreshold * 1000000) {
                    char sql[SHORT_1K_SQL_BUFF_LEN] = {0};
                    tstrncpy(sql, (char*)row[2],
                             min(strlen((char*)row[2])+1,
                                 SHORT_1K_SQL_BUFF_LEN));
                    char killId[KILLID_LEN] = {0};
                    tstrncpy(killId, (char*)row[0],
                             min(strlen((char*)row[0])+1, KILLID_LEN));
                    char killCommand[KILLID_LEN + 32] = {0};
                    snprintf(killCommand, sizeof(killCommand), "KILL QUERY '%s'", killId);
                    TAOS_RES *resKill = taos_query(taos, killCommand);
                    int32_t codeKill = taos_errno(resKill);
                    if (codeKill) {
                        // printErrCmdCodeStr frees resKill
                        printErrCmdCodeStr(killCommand, codeKill, resKill);
                    } else {
                        infoPrint("%s succeed, sql: %s killed!\n",
                                  killCommand, sql);
                        taos_free_result(resKill);
                    }
                }
            }
        }
        taos_free_result(res);
        taos_close(taos);
        toolsMsleep(g_queryInfo.killQueryInterval*1000);
    }
    return NULL;
}
// kill show
int killSlowQuery() {
pthread_t pidKiller = {0};
int32_t ret = pthread_create(&pidKiller, NULL, queryKiller, NULL);
if (ret != 0) {
errorPrint("pthread_create failed create queryKiller thread. error code =%d \n", ret);
return -1;
}
pthread_join(pidKiller, NULL);
toolsMsleep(1000);
return 0;
}
// Fetch the child-table names of super table `dbName`.`stbName` from the
// server. On success fills g_queryInfo.superQueryInfo.childTblCount and
// childTblName and returns 0; returns -1 on any failure.
int fetchChildTableName(char *dbName, char *stbName) {
    SBenchConn *conn = initBenchConn();
    if (conn == NULL) {
        return -1;
    }

    // first count the child tables; the SQL differs between client majors
    char countSql[SHORT_1K_SQL_BUFF_LEN] = "\0";
    if (3 == g_majorVersionOfClient) {
        snprintf(countSql, SHORT_1K_SQL_BUFF_LEN,
                 "SELECT COUNT(*) FROM( SELECT DISTINCT(TBNAME) FROM `%s`.`%s`)",
                 dbName, stbName);
    } else {
        snprintf(countSql, SHORT_1K_SQL_BUFF_LEN,
                 "SELECT COUNT(TBNAME) FROM `%s`.`%s`",
                 dbName, stbName);
    }

    TAOS_RES *countRes = taos_query(conn->taos, countSql);
    int32_t errCode = taos_errno(countRes);
    if (errCode) {
        // printErrCmdCodeStr frees countRes
        printErrCmdCodeStr(countSql, errCode, countRes);
        closeBenchConn(conn);
        return -1;
    }

    int fieldCount = taos_num_fields(countRes);
    TAOS_FIELD *fieldDescs = taos_fetch_fields(countRes);
    TAOS_ROW row = NULL;
    while ((row = taos_fetch_row(countRes)) != NULL) {
        if (0 == strlen((char *)(row[0]))) {
            errorPrint("stable %s have no child table\n", stbName);
            taos_free_result(countRes);
            closeBenchConn(conn);
            return -1;
        }
        // render the row as text, then parse the count out of it
        char rowText[256] = {0};
        taos_print_row(rowText, row, fieldDescs, fieldCount);
        g_queryInfo.superQueryInfo.childTblCount = (int64_t)atol(rowText);
    }
    infoPrint("%s's childTblCount: %" PRId64 "\n",
              stbName, g_queryInfo.superQueryInfo.childTblCount);
    taos_free_result(countRes);

    // one slot per child table, then fetch the actual names
    g_queryInfo.superQueryInfo.childTblName =
            benchCalloc(g_queryInfo.superQueryInfo.childTblCount,
                        sizeof(char *), false);
    if (getAllChildNameOfSuperTable(
            conn->taos, dbName, stbName,
            g_queryInfo.superQueryInfo.childTblName,
            g_queryInfo.superQueryInfo.childTblCount)) {
        // failed
        tmfree(g_queryInfo.superQueryInfo.childTblName);
        closeBenchConn(conn);
        return -1;
    }
    closeBenchConn(conn);
    return 0;
}