From 772202b84e79b191f9d87d6d0ad50d507aee64e0 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sun, 2 Feb 2025 17:57:38 +0800 Subject: [PATCH] feat: merge taos-tools 3.0 latest code --- tests/army/tools/benchmark/basic/bugs.py | 25 +- .../tools/benchmark/basic/from-to-continue.py | 4 - tests/army/tools/benchmark/basic/insertMix.py | 44 +- .../tools/benchmark/basic/json/TD-32846.json | 4 +- .../basic/json/queryErrorBothSpecSuper.json | 35 + .../basic/json/queryErrorFormat.json | 27 + .../basic/json/queryErrorNoSpecSuper.json | 12 + .../benchmark/basic/json/queryModeSpec.json | 24 + .../basic/json/queryModeSpecMix.json | 25 + .../basic/json/queryModeSpecMixRest.json | 28 + .../basic/json/queryModeSpecRest.json | 27 + .../benchmark/basic/json/queryModeSuper.json | 27 + .../basic/json/queryModeSuperRest.json | 24 + .../tools/benchmark/basic/json/queryQps.json | 13 +- .../tools/benchmark/basic/json/queryQps1.json | 22 + .../benchmark/basic/json/queryRestful.json | 11 - .../benchmark/basic/json/queryRestful1.json | 24 + .../basic/json/querySpeciMutisql100.json | 13 +- .../basic/json/queryTaosc-mixed-query.json | 11 - .../basic/json/queryTaosc-mixed-query1.json | 23 + .../benchmark/basic/json/queryTaosc.json | 11 - .../benchmark/basic/json/queryTaosc1.json | 23 + .../benchmark/basic/json/rest_query.json | 9 - .../benchmark/basic/json/rest_query1.json | 18 + .../basic/json/taosc_query-error-sqlfile.json | 2 +- .../json/taosc_query-kill-slow-query.json | 2 +- .../basic/json/taosc_query-sqlfile.json | 2 +- .../benchmark/basic/json/taosc_query.json | 11 - .../benchmark/basic/json/taosc_query1.json | 24 + tests/army/tools/benchmark/basic/queryMain.py | 260 ++++ .../basic/query_json-with-sqlfile.py | 2 - .../army/tools/benchmark/basic/query_json.py | 12 +- tests/army/tools/benchmark/basic/stt.py | 44 +- .../taosdemoTestQueryWithJson-mixed-query.py | 39 +- .../basic/taosdemoTestQueryWithJson.py | 78 +- .../army/tools/benchmark/basic/telnet_tcp.py | 3 - tests/army/tools/gencase.sh | 33 + tools/taos-tools/inc/bench.h | 52 +- tools/taos-tools/src/benchInsert.c | 347 +++-- tools/taos-tools/src/benchJsonOpt.c | 309 ++-- tools/taos-tools/src/benchMain.c | 13 +- tools/taos-tools/src/benchQuery.c | 1357 +++++++++-------- tools/taos-tools/src/benchSubscribe.c | 8 +- tools/taos-tools/src/benchUtil.c | 258 +++- 44 files changed, 2241 insertions(+), 1099 deletions(-) create mode 100644 tests/army/tools/benchmark/basic/json/queryErrorBothSpecSuper.json create mode 100644 tests/army/tools/benchmark/basic/json/queryErrorFormat.json create mode 100644 tests/army/tools/benchmark/basic/json/queryErrorNoSpecSuper.json create mode 100644 tests/army/tools/benchmark/basic/json/queryModeSpec.json create mode 100644 tests/army/tools/benchmark/basic/json/queryModeSpecMix.json create mode 100644 tests/army/tools/benchmark/basic/json/queryModeSpecMixRest.json create mode 100644 tests/army/tools/benchmark/basic/json/queryModeSpecRest.json create mode 100644 tests/army/tools/benchmark/basic/json/queryModeSuper.json create mode 100644 tests/army/tools/benchmark/basic/json/queryModeSuperRest.json create mode 100644 tests/army/tools/benchmark/basic/json/queryQps1.json create mode 100644 tests/army/tools/benchmark/basic/json/queryRestful1.json create mode 100644 tests/army/tools/benchmark/basic/json/queryTaosc-mixed-query1.json create mode 100644 tests/army/tools/benchmark/basic/json/queryTaosc1.json create mode 100644 tests/army/tools/benchmark/basic/json/rest_query1.json create mode 100644 tests/army/tools/benchmark/basic/json/taosc_query1.json create mode 100644 tests/army/tools/benchmark/basic/queryMain.py create mode 100644 tests/army/tools/gencase.sh diff --git a/tests/army/tools/benchmark/basic/bugs.py b/tests/army/tools/benchmark/basic/bugs.py index 2d5b0c46fc..aaf3312715 100644 --- a/tests/army/tools/benchmark/basic/bugs.py +++ b/tests/army/tools/benchmark/basic/bugs.py @@ -12,7 +12,6 @@ # -*- coding: utf-8 -*- import os import json - import frame import frame.etool from frame.log import * @@ -31,12 +30,28 @@ def removeQuotation(origin): return value -class TDTestCase(TBase): +class TDTestCase: def caseDescription(self): """ [TD-11510] taosBenchmark test cases """ + def benchmarkQuery(self, benchmark, jsonFile, keys, options=""): + # exe insert + result = "query.log" + os.system(f"rm -f {result}") + cmd = f"{benchmark} {options} -f {jsonFile} >> {result}" + os.system(cmd) + tdLog.info(cmd) + with open(result) as file: + content = file.read() + for key in keys: + if content.find(key) == -1: + tdLog.exit(f"not found key: {key} in content={content}") + else: + tdLog.info(f"found key:{key} successful.") + + def testBenchmarkJson(self, benchmark, jsonFile, options="", checkStep=False): # exe insert cmd = f"{benchmark} {options} -f {jsonFile}" @@ -111,6 +126,11 @@ class TDTestCase(TBase): self.testBenchmarkJson(benchmark, "./tools/benchmark/basic/json/TS-5234-1.json") self.testBenchmarkJson(benchmark, "./tools/benchmark/basic/json/TS-5234-2.json") self.testBenchmarkJson(benchmark, "./tools/benchmark/basic/json/TS-5234-3.json") + # TS-5846 + keys = ["completed total queries: 40"] + self.benchmarkQuery(benchmark, "./tools/benchmark/basic/json/TS-5846-Query.json", keys) + keys = ["completed total queries: 20"] + self.benchmarkQuery(benchmark, "./tools/benchmark/basic/json/TS-5846-Mixed-Query.json", keys) # bugs td def bugsTD(self, benchmark): @@ -133,7 +153,6 @@ class TDTestCase(TBase): def run(self): benchmark = etool.benchMarkFile() - # ts self.bugsTS(benchmark) diff --git a/tests/army/tools/benchmark/basic/from-to-continue.py b/tests/army/tools/benchmark/basic/from-to-continue.py index 6402525215..281fb511e3 100644 --- a/tests/army/tools/benchmark/basic/from-to-continue.py +++ b/tests/army/tools/benchmark/basic/from-to-continue.py @@ -27,8 +27,6 @@ class TDTestCase(TBase): [TD-20424] taosBenchmark insert child table from and to test cases """ - - def run(self): binPath = etool.benchMarkFile() cmd = "%s -t 6 -n 1 -y" % binPath @@ -41,7 +39,6 @@ class TDTestCase(TBase): tdSql.query("select count(*) from test.meters") tdSql.checkData(0, 0, 6 + 3) - binPath = etool.benchMarkFile() cmd = "%s -t 5 -n 1 -y" % binPath tdLog.info("%s" % cmd) os.system("%s" % cmd) @@ -54,7 +51,6 @@ class TDTestCase(TBase): tdSql.query("select count(*) from test.meters") tdSql.checkData(0, 0, 5 + 3) - binPath = etool.benchMarkFile() cmd = "%s -t 4 -n 1 -y" % binPath tdLog.info("%s" % cmd) os.system("%s" % cmd) diff --git a/tests/army/tools/benchmark/basic/insertMix.py b/tests/army/tools/benchmark/basic/insertMix.py index b4046b8c98..f9c08edf06 100644 --- a/tests/army/tools/benchmark/basic/insertMix.py +++ b/tests/army/tools/benchmark/basic/insertMix.py @@ -14,10 +14,13 @@ import os import subprocess import time -from util.log import * -from util.cases import * -from util.sql import * -from util.dnodes import * +import frame +import frame.etool +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame import * class TDTestCase: @@ -26,35 +29,6 @@ class TDTestCase: [TD-13823] taosBenchmark test cases """ return - - def init(self, conn, logSql, replicaVar=1): - # comment off by Shuduo for CI self.replicaVar = int(replicaVar) - tdLog.debug("start to execute %s" % __file__) - tdSql.init(conn.cursor(), logSql) - - def getPath(self, tool="taosBenchmark"): - if (platform.system().lower() == 'windows'): - tool = tool + ".exe" - selfPath = os.path.dirname(os.path.realpath(__file__)) - - if "community" in selfPath: - projPath = selfPath[: selfPath.find("community")] - else: - projPath = selfPath[: selfPath.find("tests")] - - paths = [] - for root, dirs, files in os.walk(projPath): - if (tool) in files: - rootRealPath = os.path.dirname(os.path.realpath(root)) - if "packaging" not in rootRealPath: - paths.append(os.path.join(root, tool)) - break - if len(paths) == 0: - tdLog.exit("taosBenchmark not found!") - return - else: - tdLog.info("taosBenchmark found in %s" % paths[0]) - return paths[0] def checkDataCorrect(self): sql = "select count(*) from meters" @@ -86,8 +60,8 @@ class TDTestCase: def run(self): - binPath = self.getPath() - cmd = "%s -f ./5-taos-tools/taosbenchmark/json/insertMix.json" % binPath + binPath = etool.benchMarkFile() + cmd = "%s -f ./tools/benchmark/basic//json/insertMix.json" % binPath tdLog.info("%s" % cmd) errcode = os.system("%s" % cmd) if errcode != 0: diff --git a/tests/army/tools/benchmark/basic/json/TD-32846.json b/tests/army/tools/benchmark/basic/json/TD-32846.json index cd8b87e480..309ba40fa0 100644 --- a/tests/army/tools/benchmark/basic/json/TD-32846.json +++ b/tests/army/tools/benchmark/basic/json/TD-32846.json @@ -29,7 +29,7 @@ "super_tables": [{ "name": "product", "child_table_exists": "no", - "childtable_count": 9, + "childtable_count": 5, "childtable_prefix": "d", "auto_create_table": "yes", "batch_create_tbl_num": 10, @@ -37,7 +37,7 @@ "insert_mode": "stmt2", "non_stop_mode": "no", "line_protocol": "line", - "insert_rows": 10340, + "insert_rows": 1034, "childtable_limit": 0, "childtable_offset": 0, "interlace_rows": 0, diff --git a/tests/army/tools/benchmark/basic/json/queryErrorBothSpecSuper.json b/tests/army/tools/benchmark/basic/json/queryErrorBothSpecSuper.json new file mode 100644 index 0000000000..48b9de4911 --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/queryErrorBothSpecSuper.json @@ -0,0 +1,35 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "test", + "query_times": 100, + "query_mode": "taosc", + "specified_table_query": { + "concurrent": 3, + "sqls": [ + { + "sql": "select last_row(*) from meters" + }, + { + "sql": "select count(*) from d0", + "result": "./query_res1.txt" + } + ] + }, + "super_table_query": { + "stblname": "meters", + "concurrent": 3, + "query_interval": 1, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + } + ] + } +} diff --git a/tests/army/tools/benchmark/basic/json/queryErrorFormat.json b/tests/army/tools/benchmark/basic/json/queryErrorFormat.json new file mode 100644 index 0000000000..07bfd6abf9 --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/queryErrorFormat.json @@ -0,0 +1,27 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "continue_if_fail": "yes", + "databases": "test", + "query_times": 100, + "query_mode": "taosc", + "super_table_query": + "stblname": "meters", + "concurrent": 3, + "query_interval": 1, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + }, + { + "sql": "select count(ts) from xxxx" + } + ] + } +} diff --git a/tests/army/tools/benchmark/basic/json/queryErrorNoSpecSuper.json b/tests/army/tools/benchmark/basic/json/queryErrorNoSpecSuper.json new file mode 100644 index 0000000000..71dc16d5d7 --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/queryErrorNoSpecSuper.json @@ -0,0 +1,12 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "test", + "query_times": 100, + "query_mode": "taosc" +} diff --git a/tests/army/tools/benchmark/basic/json/queryModeSpec.json b/tests/army/tools/benchmark/basic/json/queryModeSpec.json new file mode 100644 index 0000000000..533cb05c2b --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/queryModeSpec.json @@ -0,0 +1,24 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "test", + "query_times": 100, + "query_mode": "taosc", + "specified_table_query": { + "concurrent": 3, + "sqls": [ + { + "sql": "select last_row(*) from meters" + }, + { + "sql": "select count(*) from d0", + "result": "./query_res1.txt" + } + ] + } +} diff --git a/tests/army/tools/benchmark/basic/json/queryModeSpecMix.json b/tests/army/tools/benchmark/basic/json/queryModeSpecMix.json new file mode 100644 index 0000000000..39a9593d2e --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/queryModeSpecMix.json @@ -0,0 +1,25 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "test", + "query_times": 100, + "query_mode": "taosc", + "specified_table_query": { + "concurrent": 4, + "mixed_query": "yes", + "sqls": [ + { + "sql": "select last_row(*) from meters" + }, + { + "sql": "select count(*) from d0", + "result": "./query_res1.txt" + } + ] + } +} diff --git a/tests/army/tools/benchmark/basic/json/queryModeSpecMixRest.json b/tests/army/tools/benchmark/basic/json/queryModeSpecMixRest.json new file mode 100644 index 0000000000..5d962f1354 --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/queryModeSpecMixRest.json @@ -0,0 +1,28 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "test", + "query_times": 100, + "query_mode": "rest", + "specified_table_query": { + "concurrent": 3, + "mixed_query": "yes", + "sqls": [ + { + "sql": "select last_row(*) from meters" + }, + { + "sql": "select count(*) from d0", + "result": "./query_res1.txt" + }, + { + "sql": "select count(*) from meters" + } + ] + } +} diff --git a/tests/army/tools/benchmark/basic/json/queryModeSpecRest.json b/tests/army/tools/benchmark/basic/json/queryModeSpecRest.json new file mode 100644 index 0000000000..5b00ee6479 --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/queryModeSpecRest.json @@ -0,0 +1,27 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "test", + "query_times": 100, + "query_mode": "rest", + "specified_table_query": { + "concurrent": 3, + "sqls": [ + { + "sql": "select last_row(*) from meters" + }, + { + "sql": "select count(*) from d0", + "result": "./query_res1.txt" + }, + { + "sql": "select count(*) from meters" + } + ] + } +} diff --git a/tests/army/tools/benchmark/basic/json/queryModeSuper.json b/tests/army/tools/benchmark/basic/json/queryModeSuper.json new file mode 100644 index 0000000000..9d20154d49 --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/queryModeSuper.json @@ -0,0 +1,27 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "continue_if_fail": "yes", + "databases": "test", + "query_times": 100, + "query_mode": "taosc", + "super_table_query": { + "stblname": "meters", + "concurrent": 3, + "query_interval": 0, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + }, + { + "sql": "select count(ts) from xxxx" + } + ] + } +} diff --git a/tests/army/tools/benchmark/basic/json/queryModeSuperRest.json b/tests/army/tools/benchmark/basic/json/queryModeSuperRest.json new file mode 100644 index 0000000000..4ae43062a6 --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/queryModeSuperRest.json @@ -0,0 +1,24 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "continue_if_fail": "yes", + "databases": "test", + "query_times": 100, + "query_mode": "rest", + "super_table_query": { + "stblname": "meters", + "concurrent": 3, + "query_interval": 0, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + } + ] + } +} diff --git a/tests/army/tools/benchmark/basic/json/queryQps.json b/tests/army/tools/benchmark/basic/json/queryQps.json index 02661fe9d2..21b4cf6f90 100644 --- a/tests/army/tools/benchmark/basic/json/queryQps.json +++ b/tests/army/tools/benchmark/basic/json/queryQps.json @@ -20,16 +20,5 @@ "sql": "select last_row(*) from db.stb00_9 ", "result": "./query_res1.txt" }] - }, - "super_table_query": { - "stblname": "stb1", - "query_interval":20, - "threads": 4, - "sqls": [ - { - "sql": "select last_row(ts) from xxxx", - "result": "./query_res2.txt" - } - ] - } + } } diff --git a/tests/army/tools/benchmark/basic/json/queryQps1.json b/tests/army/tools/benchmark/basic/json/queryQps1.json new file mode 100644 index 0000000000..c2c2381777 --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/queryQps1.json @@ -0,0 +1,22 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "db", + "query_times": 1, + "super_table_query": { + "stblname": "stb1", + "query_interval":20, + "threads": 4, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + } + ] + } + } diff --git a/tests/army/tools/benchmark/basic/json/queryRestful.json b/tests/army/tools/benchmark/basic/json/queryRestful.json index 5de560fd21..6cb83bc2e1 100644 --- a/tests/army/tools/benchmark/basic/json/queryRestful.json +++ b/tests/army/tools/benchmark/basic/json/queryRestful.json @@ -22,17 +22,6 @@ "result": "./query_res1.txt" } ] - }, - "super_table_query": { - "stblname": "stb1", - "query_interval": 1, - "threads": 3, - "sqls": [ - { - "sql": "select last_row(ts) from xxxx", - "result": "./query_res2.txt" - } - ] } } diff --git a/tests/army/tools/benchmark/basic/json/queryRestful1.json b/tests/army/tools/benchmark/basic/json/queryRestful1.json new file mode 100644 index 0000000000..54d2589ce9 --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/queryRestful1.json @@ -0,0 +1,24 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "db", + "query_times": 2, + "query_mode": "rest", + "super_table_query": { + "stblname": "stb1", + "query_interval": 1, + "threads": 3, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + } + ] + } + } + diff --git a/tests/army/tools/benchmark/basic/json/querySpeciMutisql100.json b/tests/army/tools/benchmark/basic/json/querySpeciMutisql100.json index 5b523ad3dd..13010efdba 100644 --- a/tests/army/tools/benchmark/basic/json/querySpeciMutisql100.json +++ b/tests/army/tools/benchmark/basic/json/querySpeciMutisql100.json @@ -413,17 +413,6 @@ "result": "./query_res0.txt" }] - }, - "super_table_query": { - "stblname": "stb1", - "query_interval": 1, - "threads": 3, - "sqls": [ - { - "sql": "select last_row(ts) from xxxx", - "result": "./query_res2.txt" - } - ] - } + } } \ No newline at end of file diff --git a/tests/army/tools/benchmark/basic/json/queryTaosc-mixed-query.json b/tests/army/tools/benchmark/basic/json/queryTaosc-mixed-query.json index 9b2f1b4675..9cdc191d66 100644 --- a/tests/army/tools/benchmark/basic/json/queryTaosc-mixed-query.json +++ b/tests/army/tools/benchmark/basic/json/queryTaosc-mixed-query.json @@ -23,16 +23,5 @@ "result": "./query_res1.txt" } ] - }, - "super_table_query": { - "stblname": "stb1", - "query_interval": 1, - "threads": 3, - "sqls": [ - { - "sql": "select last_row(ts) from xxxx", - "result": "./query_res2.txt" - } - ] } } diff --git a/tests/army/tools/benchmark/basic/json/queryTaosc-mixed-query1.json b/tests/army/tools/benchmark/basic/json/queryTaosc-mixed-query1.json new file mode 100644 index 0000000000..a3caa1c5e8 --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/queryTaosc-mixed-query1.json @@ -0,0 +1,23 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "db", + "query_times": 2, + "query_mode": "taosc", + "super_table_query": { + "stblname": "stb1", + "query_interval": 1, + "threads": 3, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + } + ] + } +} diff --git a/tests/army/tools/benchmark/basic/json/queryTaosc.json b/tests/army/tools/benchmark/basic/json/queryTaosc.json index d1e64e645f..bcc2852b31 100644 --- a/tests/army/tools/benchmark/basic/json/queryTaosc.json +++ b/tests/army/tools/benchmark/basic/json/queryTaosc.json @@ -22,16 +22,5 @@ "result": "./query_res1.txt" } ] - }, - "super_table_query": { - "stblname": "stb1", - "query_interval": 1, - "threads": 3, - "sqls": [ - { - "sql": "select last_row(ts) from xxxx", - "result": "./query_res2.txt" - } - ] } } diff --git a/tests/army/tools/benchmark/basic/json/queryTaosc1.json b/tests/army/tools/benchmark/basic/json/queryTaosc1.json new file mode 100644 index 0000000000..a3caa1c5e8 --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/queryTaosc1.json @@ -0,0 +1,23 @@ +{ + "filetype": "query", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "db", + "query_times": 2, + "query_mode": "taosc", + "super_table_query": { + "stblname": "stb1", + "query_interval": 1, + "threads": 3, + "sqls": [ + { + "sql": "select last_row(ts) from xxxx", + "result": "./query_res2.txt" + } + ] + } +} diff --git a/tests/army/tools/benchmark/basic/json/rest_query.json b/tests/army/tools/benchmark/basic/json/rest_query.json index 459e496f0b..817d733202 100644 --- a/tests/army/tools/benchmark/basic/json/rest_query.json +++ b/tests/army/tools/benchmark/basic/json/rest_query.json @@ -14,14 +14,5 @@ "sql": "select count(*) from db.stb", "result": "rest_query_specified" }] - }, - "super_table_query": { - "stblname": "stb", - "sqls": [ - { - "sql": "select count(*) from xxxx", - "result": "rest_query_super" - } - ] } } \ No newline at end of file diff --git a/tests/army/tools/benchmark/basic/json/rest_query1.json b/tests/army/tools/benchmark/basic/json/rest_query1.json new file mode 100644 index 0000000000..e09112737e --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/rest_query1.json @@ -0,0 +1,18 @@ +{ + "filetype":"query", + "cfgdir": "/etc/taos", + "confirm_parameter_prompt": "no", + "databases": "db", + "query_mode": "rest", + "connection_pool_size": 10, + "response_buffer": 10000, + "super_table_query": { + "stblname": "stb", + "sqls": [ + { + "sql": "select count(*) from xxxx", + "result": "rest_query_super" + } + ] + } +} \ No newline at end of file diff --git a/tests/army/tools/benchmark/basic/json/taosc_query-error-sqlfile.json b/tests/army/tools/benchmark/basic/json/taosc_query-error-sqlfile.json index 814ae3fc46..fe87790d18 100644 --- a/tests/army/tools/benchmark/basic/json/taosc_query-error-sqlfile.json +++ b/tests/army/tools/benchmark/basic/json/taosc_query-error-sqlfile.json @@ -14,7 +14,7 @@ { "query_interval": 1, "concurrent":1, - "sql_file": "./tools/benchmark/basic/json/query-error-sqls.txt", + "sql_file": "./taosbenchmark/json/query-error-sqls.txt", "result": "taosc_query_specified-sqlfile" } } diff --git a/tests/army/tools/benchmark/basic/json/taosc_query-kill-slow-query.json b/tests/army/tools/benchmark/basic/json/taosc_query-kill-slow-query.json index 0fd9b15277..6fc8e01605 100644 --- a/tests/army/tools/benchmark/basic/json/taosc_query-kill-slow-query.json +++ b/tests/army/tools/benchmark/basic/json/taosc_query-kill-slow-query.json @@ -14,6 +14,6 @@ "specified_table_query": { "query_interval": 3, "concurrent": 3, - "sql_file": "./tools/benchmark/basic/json/query-sqls-slow-query.txt" + "sql_file": "./taosbenchmark/json/query-sqls-slow-query.txt" } } diff --git a/tests/army/tools/benchmark/basic/json/taosc_query-sqlfile.json b/tests/army/tools/benchmark/basic/json/taosc_query-sqlfile.json index 1dc94af6eb..236e87a36d 100644 --- a/tests/army/tools/benchmark/basic/json/taosc_query-sqlfile.json +++ b/tests/army/tools/benchmark/basic/json/taosc_query-sqlfile.json @@ -13,7 +13,7 @@ { "query_interval": 1, "concurrent":1, - "sql_file": "./tools/benchmark/basic/json/query-sqls.txt", + "sql_file": "./taosbenchmark/json/query-sqls.txt", "result": "taosc_query_specified-sqlfile" } } diff --git a/tests/army/tools/benchmark/basic/json/taosc_query.json b/tests/army/tools/benchmark/basic/json/taosc_query.json index c8ff2e9275..2cf8f648a6 100644 --- a/tests/army/tools/benchmark/basic/json/taosc_query.json +++ b/tests/army/tools/benchmark/basic/json/taosc_query.json @@ -18,16 +18,5 @@ "sql": "select count(*) from db.stb", "result": "taosc_query_specified" }] - }, - "super_table_query": { - "stblname": "stb", - "query_interval": 1, - "concurrent": 1, - "sqls": [ - { - "sql": "select count(*) from xxxx", - "result": "taosc_query_super" - } - ] } } \ No newline at end of file diff --git a/tests/army/tools/benchmark/basic/json/taosc_query1.json b/tests/army/tools/benchmark/basic/json/taosc_query1.json new file mode 100644 index 0000000000..fd650f90e5 --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/taosc_query1.json @@ -0,0 +1,24 @@ +{ + "filetype":"query", + "cfgdir": "/etc/taos", + "host": "localhost", + "port": 6030, + "user": "root", + "password": "taosdata", + "confirm_parameter_prompt": "no", + "databases": "db", + "query_times": 1, + "reset_query_cache": "yes", + "super_table_query": + { + "stblname": "stb", + "query_interval": 1, + "concurrent": 1, + "sqls": [ + { + "sql": "select count(*) from xxxx", + "result": "taosc_query_super" + } + ] + } +} \ No newline at end of file diff --git a/tests/army/tools/benchmark/basic/queryMain.py b/tests/army/tools/benchmark/basic/queryMain.py new file mode 100644 index 0000000000..87c70c8df2 --- /dev/null +++ b/tests/army/tools/benchmark/basic/queryMain.py @@ -0,0 +1,260 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- +import os +import json +import sys +import os +import time +import datetime +import platform +import subprocess + +import frame +import frame.etool +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame import * + + +# reomve single and double quotation +def removeQuotation(origin): + value = "" + for c in origin: + if c != '\'' and c != '"': + value += c + + return value + +class TDTestCase: + def caseDescription(self): + """ + [TD-11510] taosBenchmark test cases + """ + + def runSeconds(self, command, timeout = 180): + tdLog.info(f"runSeconds {command} ...") + process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + process.wait(timeout) + + # get output + output = process.stdout.read().decode(encoding="gbk") + error = process.stderr.read().decode(encoding="gbk") + return output, error + + def getKeyValue(self, content, key, end): + # find key + s = content.find(key) + if s == -1: + return False,"" + + # skip self + s += len(key) + # skip blank + while s < len(content): + if content[s] != " ": + break + s += 1 + + # end check + if s + 1 == len(content): + return False, "" + + # find end + if len(end) == 0: + e = -1 + else: + e = content.find(end, s) + + # get value + if e == -1: + value = content[s : ] + else: + value = content[s : e] + + return True, value + + def getDbRows(self, times): + sql = f"select count(*) from test.meters" + tdSql.waitedQuery(sql, 1, times) + dbRows = tdSql.getData(0, 0) + return dbRows + + def checkItem(self, output, key, end, expect, equal): + ret, value = self.getKeyValue(output, key, end) + if ret == False: + tdLog.exit(f"not found key:{key}. end:{end} output:\n{output}") + + fval = float(value) + # compare + if equal and fval != expect: + tdLog.exit(f"check not expect. expect:{expect} real:{fval}, key:{key} end:{end} output:\n{output}") + elif equal == False and fval <= expect: + tdLog.exit(f"failed because {fval} <= {expect}, key:{key} end:{end} output:\n{output}") + else: + # succ + if equal: + tdLog.info(f"check successfully. key:{key} expect:{expect} real:{fval}") + else: + tdLog.info(f"check successfully. key:{key} {fval} > {expect}") + + + def checkAfterRun(self, benchmark, jsonFile, specMode, tbCnt): + # run + cmd = f"{benchmark} -f {jsonFile}" + output, error = self.runSeconds(cmd) + + if specMode : + label = "specified_table_query" + else: + label = "super_table_query" + + # + # check insert result + # + with open(jsonFile, "r") as file: + data = json.load(file) + + queryTimes = data["query_times"] + # contineIfFail + try: + continueIfFail = data["continue_if_fail"] + except: + continueIfFail = "no" + + concurrent = data[label]["concurrent"] + sqls = data[label]["sqls"] + + + # mix + try: + mixedQuery = data[label]["mixed_query"] + except: + mixedQuery = "no" + + tdLog.info(f"queryTimes={queryTimes} concurrent={concurrent} mixedQuery={mixedQuery} len(sqls)={len(sqls)} label={label}\n") + + totalQueries = 0 + threadQueries = 0 + + if continueIfFail.lower() == "yes": + allEnd = " " + else: + allEnd = "\n" + + if specMode and mixedQuery.lower() != "yes": + # spec + threadQueries = queryTimes * concurrent + totalQueries = queryTimes * concurrent * len(sqls) + threadKey = f"complete query with {concurrent} threads and " + qpsKey = "QPS: " + avgKey = "query delay avg: " + minKey = "min:" + else: + # spec mixed or super + if specMode: + # spec + totalQueries = queryTimes * len(sqls) + else: + # super + totalQueries = queryTimes * len(sqls) * tbCnt + threadQueries = totalQueries + + nSql = len(sqls) + if specMode and nSql < concurrent : + tdLog.info(f"set concurrent = {nSql} because len(sqls) < concurrent") + concurrent = nSql + threadKey = f"using {concurrent} threads complete query " + qpsKey = "" + avgKey = "avg delay:" + minKey = "min delay:" + + items = [ + [threadKey, " ", threadQueries, True], + [qpsKey, " ", 5, False], # qps need > 1 + [avgKey, "s", 0, False], + [minKey, "s", 0, False], + ["max: ", "s", 0, False], + ["p90: ", "s", 0, False], + ["p95: ", "s", 0, False], + ["p99: ", "s", 0, False], + ["INFO: Spend ", " ", 0, False], + ["completed total queries: ", ",", totalQueries, True], + ["the QPS of all threads:", allEnd, 10 , False] # all qps need > 5 + ] + + # check + for item in items: + if len(item[0]) > 0: + self.checkItem(output, item[0], item[1], item[2], item[3]) + + # native + def threeQueryMode(self, benchmark, tbCnt, tbRow): + # json + args = [ + ["./tools/benchmark/basic/json/queryModeSpec", True], + ["./tools/benchmark/basic/json/queryModeSpecMix", True], + ["./tools/benchmark/basic/json/queryModeSuper", False] + ] + + # native + for arg in args: + self.checkAfterRun(benchmark, arg[0] + ".json", arg[1], tbCnt) + + # rest + for arg in args: + self.checkAfterRun(benchmark, arg[0] + "Rest.json", arg[1], tbCnt) + + def expectFailed(self, command): + ret = os.system(command) + if ret == 0: + tdLog.exit(f" expect failed but success. command={command}") + else: + tdLog.info(f" expect failed is ok. command={command}") + + + # check excption + def exceptTest(self, benchmark, tbCnt, tbRow): + # 'specified_table_query' and 'super_table_query' error + self.expectFailed(f"{benchmark} -f ./tools/benchmark/basic/json/queryErrorNoSpecSuper.json") + self.expectFailed(f"{benchmark} -f ./tools/benchmark/basic/json/queryErrorBothSpecSuper.json") + # json format error + self.expectFailed(f"{benchmark} -f ./tools/benchmark/basic/json/queryErrorFormat.json") + + + + def run(self): + tbCnt = 10 + tbRow = 1000 + benchmark = etool.benchMarkFile() + + # insert + command = f"{benchmark} -d test -t {tbCnt} -n {tbRow} -I stmt2 -r 100 -y" + ret = os.system(command) + if ret !=0 : + tdLog.exit(f"exec failed. command={command}") + + # query mode test + self.threeQueryMode(benchmark, tbCnt, tbRow) + + # exception test + self.exceptTest(benchmark, tbCnt, tbRow); + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/army/tools/benchmark/basic/query_json-with-sqlfile.py b/tests/army/tools/benchmark/basic/query_json-with-sqlfile.py index e5c7445d80..110651a52d 100644 --- a/tests/army/tools/benchmark/basic/query_json-with-sqlfile.py +++ b/tests/army/tools/benchmark/basic/query_json-with-sqlfile.py @@ -27,8 +27,6 @@ class TDTestCase(TBase): [TD-11510] taosBenchmark test cases """ - - def run(self): binPath = etool.benchMarkFile() os.system( diff --git a/tests/army/tools/benchmark/basic/query_json.py b/tests/army/tools/benchmark/basic/query_json.py index 8a06fa45bb..62741a3401 100644 --- a/tests/army/tools/benchmark/basic/query_json.py +++ b/tests/army/tools/benchmark/basic/query_json.py @@ -23,14 +23,12 @@ from frame.caseBase import * from frame import * -class TDTestCase(TBase): +class TDTestCase: def caseDescription(self): """ [TD-11510] taosBenchmark test cases """ - - def run(self): binPath = etool.benchMarkFile() os.system( @@ -46,6 +44,10 @@ class TDTestCase(TBase): cmd = "%s -f ./tools/benchmark/basic/json/taosc_query.json" % binPath tdLog.info("%s" % cmd) os.system("%s" % cmd) + cmd = "%s -f ./tools/benchmark/basic/json/taosc_query1.json" % binPath + tdLog.info("%s" % cmd) + os.system("%s" % cmd) + with open("%s" % "taosc_query_specified-0", "r+") as f1: for line in f1.readlines(): queryTaosc = line.strip().split()[0] @@ -56,9 +58,13 @@ class TDTestCase(TBase): queryTaosc = line.strip().split()[0] assert queryTaosc == "1", "result is %s != expect: 1" % queryTaosc + # split two cmd = "%s -f ./tools/benchmark/basic/json/rest_query.json" % binPath tdLog.info("%s" % cmd) os.system("%s" % cmd) + cmd = "%s -f ./tools/benchmark/basic/json/rest_query1.json" % binPath + tdLog.info("%s" % cmd) + os.system("%s" % cmd) times = 0 with open("rest_query_super-0", "r+") as f1: diff --git a/tests/army/tools/benchmark/basic/stt.py b/tests/army/tools/benchmark/basic/stt.py index 9b86bd8e40..b9288fcfa6 100644 --- a/tests/army/tools/benchmark/basic/stt.py +++ b/tests/army/tools/benchmark/basic/stt.py @@ -14,10 +14,13 @@ import os import subprocess import time -from util.log import * -from util.cases import * -from util.sql import * -from util.dnodes import * +import frame +import frame.etool +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame import * class TDTestCase: @@ -26,35 +29,6 @@ class TDTestCase: [TD-13823] taosBenchmark test cases """ return - - def init(self, conn, logSql, replicaVar=1): - # comment off by Shuduo for CI self.replicaVar = int(replicaVar) - tdLog.debug("start to execute %s" % __file__) - tdSql.init(conn.cursor(), logSql) - - def getPath(self, tool="taosBenchmark"): - if (platform.system().lower() == 'windows'): - tool = tool + ".exe" - selfPath = os.path.dirname(os.path.realpath(__file__)) - - if "community" in selfPath: - projPath = selfPath[: selfPath.find("community")] - else: - projPath = selfPath[: selfPath.find("tests")] - - paths = [] - for root, dirs, files in os.walk(projPath): - if (tool) in files: - rootRealPath = os.path.dirname(os.path.realpath(root)) - if "packaging" not in rootRealPath: - paths.append(os.path.join(root, tool)) - break - if len(paths) == 0: - tdLog.exit("taosBenchmark not found!") - return - else: - tdLog.info("taosBenchmark found in %s" % paths[0]) - return paths[0] def checkDataCorrect(self): sql = "select count(*) from meters" @@ -81,8 +55,8 @@ class TDTestCase: def run(self): - binPath = self.getPath() - cmd = "%s -f ./5-taos-tools/taosbenchmark/json/stt.json" % binPath + binPath = etool.benchMarkFile() + cmd = "%s -f ./tools/benchmark/basic/json/stt.json" % binPath tdLog.info("%s" % cmd) errcode = os.system("%s" % cmd) if errcode != 0: diff --git a/tests/army/tools/benchmark/basic/taosdemoTestQueryWithJson-mixed-query.py b/tests/army/tools/benchmark/basic/taosdemoTestQueryWithJson-mixed-query.py index c716c119a6..f10f43c7fc 100644 --- a/tests/army/tools/benchmark/basic/taosdemoTestQueryWithJson-mixed-query.py +++ b/tests/army/tools/benchmark/basic/taosdemoTestQueryWithJson-mixed-query.py @@ -12,6 +12,9 @@ # -*- coding: utf-8 -*- import os +import ast +import re + import frame import frame.etool from frame.log import * @@ -19,17 +22,43 @@ from frame.cases import * from frame.sql import * from frame.caseBase import * from frame import * -import ast -import re # from assertpy import assert_that import subprocess -class TDTestCase(TBase): +class TDTestCase: # pylint: disable=R0201 + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) # pylint: disable=R0201 + def getPath(self, tool="taosBenchmark"): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if "community" in selfPath: + projPath = selfPath[: selfPath.find("community")] + elif "src" in selfPath: + projPath = selfPath[: selfPath.find("src")] + elif "/tools/" in selfPath: + projPath = selfPath[: selfPath.find("/tools/")] + elif "/tests/" in selfPath: + projPath = selfPath[: selfPath.find("/tests/")] + else: + tdLog.info("cannot found %s in path: %s, use system's" % (tool, selfPath)) + projPath = "/usr/local/taos/bin/" + + paths = [] + for root, dummy, files in os.walk(projPath): + if (tool) in files: + rootRealPath = os.path.dirname(os.path.realpath(root)) + if "packaging" not in rootRealPath: + paths.append(os.path.join(root, tool)) + break + if len(paths) == 0: + return "" + return paths[0] # 获取taosc接口查询的结果文件中的内容,返回每行数据,并断言数据的第一列内容。 def assertfileDataTaosc(self, filename, expectResult): @@ -81,7 +110,7 @@ class TDTestCase(TBase): ) def run(self): - binPath = etool.benchMarkFile() + binPath = self.getPath() if binPath == "": tdLog.exit("taosBenchmark not found!") else: @@ -94,6 +123,7 @@ class TDTestCase(TBase): # taosc query: query specified table and query super table os.system("%s -f ./tools/benchmark/basic/json/queryInsertdata.json" % binPath) os.system("%s -f ./tools/benchmark/basic/json/queryTaosc-mixed-query.json" % binPath) + os.system("%s -f ./tools/benchmark/basic/json/queryTaosc-mixed-query1.json" % binPath) os.system("cat query_res2.txt* > all_query_res2_taosc.txt") # correct Times testcases @@ -111,6 +141,7 @@ class TDTestCase(TBase): # use restful api to query os.system("%s -f ./tools/benchmark/basic/json/queryInsertrestdata.json" % binPath) os.system("%s -f ./tools/benchmark/basic/json/queryRestful.json" % binPath) + os.system("%s -f ./tools/benchmark/basic/json/queryRestful1.json" % binPath) os.system("cat query_res2.txt* > all_query_res2_rest.txt") # correct Times testcases diff --git a/tests/army/tools/benchmark/basic/taosdemoTestQueryWithJson.py b/tests/army/tools/benchmark/basic/taosdemoTestQueryWithJson.py index ac572718e3..aec43073f3 100644 --- a/tests/army/tools/benchmark/basic/taosdemoTestQueryWithJson.py +++ b/tests/army/tools/benchmark/basic/taosdemoTestQueryWithJson.py @@ -12,6 +12,11 @@ # -*- coding: utf-8 -*- import os +import ast +import re + +# from assertpy import assert_that +import subprocess import frame import frame.etool from frame.log import * @@ -19,15 +24,38 @@ from frame.cases import * from frame.sql import * from frame.caseBase import * from frame import * -import ast -import re - -# from assertpy import assert_that -import subprocess -class TDTestCase(TBase): +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + def getPath(self, tool="taosBenchmark"): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if "community" in selfPath: + projPath = selfPath[: selfPath.find("community")] + elif "src" in selfPath: + projPath = selfPath[: selfPath.find("src")] + elif "/tools/" in selfPath: + projPath = selfPath[: selfPath.find("/tools/")] + elif "/tests/" in selfPath: + projPath = selfPath[: selfPath.find("/tests/")] + else: + tdLog.info("cannot found %s in path: %s, use system's" % (tool, selfPath)) + projPath = "/usr/local/taos/bin/" + + paths = [] + for root, dummy, files in os.walk(projPath): + if (tool) in files: + rootRealPath = os.path.dirname(os.path.realpath(root)) + if "packaging" not in rootRealPath: + paths.append(os.path.join(root, tool)) + break + if len(paths) == 0: + return "" + return paths[0] # 获取taosc接口查询的结果文件中的内容,返回每行数据,并断言数据的第一列内容。 def assertfileDataTaosc(self, filename, expectResult): @@ -79,7 +107,7 @@ class TDTestCase(TBase): ) def run(self): - binPath = etool.benchMarkFile() + binPath = self.getPath() if binPath == "": tdLog.exit("taosBenchmark not found!") else: @@ -90,8 +118,11 @@ class TDTestCase(TBase): os.system("rm -rf ./all_query*") # taosc query: query specified table and query super table - os.system("%s -f ./tools/benchmark/basic/json/queryInsertdata.json" % binPath) - os.system("%s -f ./tools/benchmark/basic/json/queryTaosc.json" % binPath) + os.system("%s -f ./taosbenchmark/json/queryInsertdata.json" % binPath) + os.system("%s -f ./taosbenchmark/json/queryTaosc.json" % binPath) + # forbid parallel spec query with super query + os.system("%s -f ./taosbenchmark/json/queryTaosc1.json" % binPath) + os.system("cat query_res0.txt* > all_query_res0_taosc.txt") os.system("cat query_res1.txt* > all_query_res1_taosc.txt") os.system("cat query_res2.txt* > all_query_res2_taosc.txt") @@ -116,8 +147,9 @@ class TDTestCase(TBase): os.system("rm -rf ./all_query*") # use restful api to query - os.system("%s -f ./tools/benchmark/basic/json/queryInsertrestdata.json" % binPath) - os.system("%s -f ./tools/benchmark/basic/json/queryRestful.json" % binPath) + os.system("%s -f ./taosbenchmark/json/queryInsertrestdata.json" % binPath) + os.system("%s -f ./taosbenchmark/json/queryRestful.json" % binPath) + os.system("%s -f ./taosbenchmark/json/queryRestful1.json" % binPath) os.system("cat query_res0.txt* > all_query_res0_rest.txt") os.system("cat query_res1.txt* > all_query_res1_rest.txt") os.system("cat query_res2.txt* > all_query_res2_rest.txt") @@ -151,49 +183,51 @@ class TDTestCase(TBase): # query times less than or equal to 100 assert ( - os.system("%s -f ./tools/benchmark/basic/json/queryInsertdata.json" % binPath) == 0 + os.system("%s -f ./taosbenchmark/json/queryInsertdata.json" % binPath) == 0 ) assert ( - os.system("%s -f ./tools/benchmark/basic/json/querySpeciMutisql100.json" % binPath) + os.system("%s -f ./taosbenchmark/json/querySpeciMutisql100.json" % binPath) != 0 ) assert ( - os.system("%s -f ./tools/benchmark/basic/json/querySuperMutisql100.json" % binPath) + os.system("%s -f ./taosbenchmark/json/querySuperMutisql100.json" % binPath) == 0 ) # query result print QPS - os.system("%s -f ./tools/benchmark/basic/json/queryInsertdata.json" % binPath) - exceptcode = os.system("%s -f ./tools/benchmark/basic/json/queryQps.json" % binPath) + os.system("%s -f ./taosbenchmark/json/queryInsertdata.json" % binPath) + exceptcode = os.system("%s -f ./taosbenchmark/json/queryQps.json" % binPath) + assert exceptcode == 0 + exceptcode = os.system("%s -f ./taosbenchmark/json/queryQps1.json" % binPath) assert exceptcode == 0 # 2021.02.09 need modify taosBenchmakr code # use illegal or out of range parameters query json file - os.system("%s -f ./tools/benchmark/basic/json/queryInsertdata.json" % binPath) + os.system("%s -f ./taosbenchmark/json/queryInsertdata.json" % binPath) # 2021.02.09 need modify taosBenchmakr code # exceptcode = os.system( - # "%s -f ./tools/benchmark/basic/json/queryTimes0.json" % + # "%s -f ./taosbenchmark/json/queryTimes0.json" % # binPath) # assert exceptcode != 0 # 2021.02.09 need modify taosBenchmakr code # exceptcode0 = os.system( - # "%s -f ./tools/benchmark/basic/json/queryTimesless0.json" % + # "%s -f ./taosbenchmark/json/queryTimesless0.json" % # binPath) # assert exceptcode0 != 0 # exceptcode1 = os.system( - # "%s -f ./tools/benchmark/basic/json/queryConcurrent0.json" % + # "%s -f ./taosbenchmark/json/queryConcurrent0.json" % # binPath) # assert exceptcode2 != 0 # exceptcode3 = os.system( - # "%s -f ./tools/benchmark/basic/json/querrThreadsless0.json" % + # "%s -f ./taosbenchmark/json/querrThreadsless0.json" % # binPath) # assert exceptcode3 != 0 # exceptcode4 = os.system( - # "%s -f ./tools/benchmark/basic/json/querrThreads0.json" % + # "%s -f ./taosbenchmark/json/querrThreads0.json" % # binPath) # assert exceptcode4 != 0 diff --git a/tests/army/tools/benchmark/basic/telnet_tcp.py b/tests/army/tools/benchmark/basic/telnet_tcp.py index bd471f8469..b69d9d7469 100644 --- a/tests/army/tools/benchmark/basic/telnet_tcp.py +++ b/tests/army/tools/benchmark/basic/telnet_tcp.py @@ -27,9 +27,6 @@ class TDTestCase(TBase): [TD-11510] taosBenchmark test cases """ - - - def run(self): tdSql.query("select client_version()") client_ver = "".join(tdSql.res[0]) diff --git a/tests/army/tools/gencase.sh b/tests/army/tools/gencase.sh new file mode 100644 index 0000000000..d0ef74c387 --- /dev/null +++ b/tests/army/tools/gencase.sh @@ -0,0 +1,33 @@ +set -e +count=0 +for i in `find benchmark/basic/ -name "*.py"` + do printf ",,y,army,./pytest.sh python3 ./test.py -f tools/$i\n" + ((count=count+1)) +done + +for i in `find benchmark/cloud/ -name "*.py"` + do printf ",,y,army,./pytest.sh python3 ./test.py -f tools/$i\n" + ((count=count+1)) +done + +for i in `find benchmark/ws/ -name "*.py"` + do printf ",,y,army,./pytest.sh python3 ./test.py -f tools/$i\n" + ((count=count+1)) +done + + +printf "\nbenchmark count=$count \n" + + +for i in `find taosdump/native/ -name "*.py"` + do printf ",,y,army,./pytest.sh python3 ./test.py -f tools/$i\n" + ((count=count+1)) +done + +for i in `find taosdump/ws/ -name "*.py"` + do printf ",,y,army,./pytest.sh python3 ./test.py -f tools/$i\n" + ((count=count+1)) +done + + +printf "\nall count=$count \n" \ No newline at end of file diff --git a/tools/taos-tools/inc/bench.h b/tools/taos-tools/inc/bench.h index 2fb0885935..d47bafbaf0 100644 --- a/tools/taos-tools/inc/bench.h +++ b/tools/taos-tools/inc/bench.h @@ -648,6 +648,8 @@ typedef struct SpecifiedQueryInfo_S { TAOS_RES *res[MAX_QUERY_SQL_COUNT]; uint64_t totalQueried; bool mixed_query; + // error rate + uint64_t totalFail; } SpecifiedQueryInfo; typedef struct SuperQueryInfo_S { @@ -669,6 +671,8 @@ typedef struct SuperQueryInfo_S { TAOS_SUB *tsub[MAX_QUERY_SQL_COUNT]; char ** childTblName; uint64_t totalQueried; + // error rate + uint64_t totalFail; } SuperQueryInfo; typedef struct SQueryMetaInfo_S { @@ -856,14 +860,24 @@ typedef struct SThreadInfo_S { } threadInfo; typedef struct SQueryThreadInfo_S { - int start_sql; - int end_sql; - int threadId; - BArray* query_delay_list; - int sockfd; SBenchConn* conn; - int64_t total_delay; -} queryThreadInfo; + int32_t start_sql; + int32_t end_sql; + int32_t threadID; + BArray* query_delay_list; + int32_t sockfd; + double total_delay; + + char filePath[MAX_PATH_LEN]; + uint64_t start_table_from; + uint64_t end_table_to; + uint64_t ntables; + uint64_t querySeq; + + // error rate + uint64_t nSucc; + uint64_t nFail; +} qThreadInfo; typedef struct STSmaThreadInfo_S { char* dbName; @@ -883,6 +897,7 @@ extern bool g_fail; extern char configDir[]; extern tools_cJSON * root; extern uint64_t g_memoryUsage; +extern int32_t g_majorVersionOfClient; #define min(a, b) (((a) < (b)) ? (a) : (b)) #define BARRAY_GET_ELEM(array, index) \ @@ -895,11 +910,11 @@ void initArgument(); void queryAggrFunc(); void parseFieldDatatype(char *dataType, BArray *fields, bool isTag); /* demoJsonOpt.c */ -int getInfoFromJsonFile(); +int readJsonConfig(char * file); /* demoUtil.c */ int compare(const void *a, const void *b); void encodeAuthBase64(); -void replaceChildTblName(char *inSql, char *outSql, int tblIndex); +int32_t replaceChildTblName(char *inSql, char *outSql, int tblIndex); void setupForAnsiEscape(void); void resetAfterAnsiEscape(void); char * convertDatatypeToString(int type); @@ -907,7 +922,7 @@ int convertStringToDatatype(char *type, int length); unsigned int taosRandom(); void tmfree(void *buf); void tmfclose(FILE *fp); -int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo); +int64_t fetchResult(TAOS_RES *res, char *filePath); void prompt(bool NonStopMode); void ERROR_EXIT(const char *msg); int getServerVersionRest(int16_t rest_port); @@ -927,7 +942,8 @@ int getAllChildNameOfSuperTable(TAOS *taos, char *dbName, char *stbName, int64_t childTblCountOfSuperTbl); void* benchCalloc(size_t nmemb, size_t size, bool record); BArray* benchArrayInit(size_t size, size_t elemSize); -void* benchArrayPush(BArray* pArray, void* pData); +void* benchArrayPush(BArray* pArray, void* pData); // free pData for auto +void* benchArrayPushNoFree(BArray* pArray, void* pData); // not free pData void* benchArrayDestroy(BArray* pArray); void benchArrayClear(BArray* pArray); void* benchArrayGet(const BArray* pArray, size_t index); @@ -1015,4 +1031,18 @@ bool isRest(int32_t iface); // get group index about dbname.tbname int32_t calcGroupIndex(char* dbName, char* tbName, int32_t groupCnt); + +// ------------ benchQuery util ------------- +void freeSpecialQueryInfo(); +// init conn +int32_t initQueryConn(qThreadInfo * pThreadInfo, int iface); +// close conn +void closeQueryConn(qThreadInfo * pThreadInfo, int iface); + +void *queryKiller(void *arg); +// kill show +int killSlowQuery(); +// fetch super table child name from server +int fetchChildTableName(char *dbName, char *stbName); + #endif // INC_BENCH_H_ diff --git a/tools/taos-tools/src/benchInsert.c b/tools/taos-tools/src/benchInsert.c index a55197f418..4c86d8a24d 100644 --- a/tools/taos-tools/src/benchInsert.c +++ b/tools/taos-tools/src/benchInsert.c @@ -16,11 +16,12 @@ #include #include -static int32_t stmt2BindVProgressive( +static int32_t stmt2BindAndSubmit( threadInfo *pThreadInfo, SChildTable *childTbl, - int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1); - + int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1, + int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w); +TAOS_STMT2* initStmt2(TAOS* taos, bool single); #define FREE_PIDS_INFOS_RETURN_MINUS_1() \ do { \ @@ -1710,6 +1711,159 @@ void loadChildTableInfo(threadInfo* pThreadInfo) { tmfree(buf); } +// create conn again +int32_t reCreateConn(threadInfo * pThreadInfo) { + // single + bool single = true; + if (pThreadInfo->dbInfo->superTbls->size > 1) { + single = false; + } + + // + // retry stmt2 init + // + + // stmt2 close + if (pThreadInfo->conn->stmt2) { + taos_stmt2_close(pThreadInfo->conn->stmt2); + pThreadInfo->conn->stmt2 = NULL; + } + + // retry stmt2 init , maybe success + pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single); + if (pThreadInfo->conn->stmt2) { + succPrint("%s", "reCreateConn first taos_stmt2_init() success and return.\n"); + return 0; + } + + // + // close old + // + closeBenchConn(pThreadInfo->conn); + pThreadInfo->conn = NULL; + + // + // create new + // + + // conn + pThreadInfo->conn = initBenchConn(); + if (pThreadInfo->conn == NULL) { + errorPrint("%s", "reCreateConn initBenchConn failed."); + return -1; + } + // stmt2 + pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single); + if (NULL == pThreadInfo->conn->stmt2) { + errorPrint("reCreateConn taos_stmt2_init() failed, reason: %s\n", taos_errstr(NULL)); + return -1; + } + + succPrint("%s", "reCreateConn second taos_stmt2_init() success.\n"); + // select db + if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) { + errorPrint("second taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName); + return -1; + } + + return 0; +} + +// reinit +int32_t reConnectStmt2(threadInfo * pThreadInfo, int32_t w) { + // re-create connection + int32_t code = reCreateConn(pThreadInfo); + if (code != 0) { + return code; + } + + // prepare + code = prepareStmt2(pThreadInfo->conn->stmt2, pThreadInfo->stbInfo, NULL, w); + if (code != 0) { + return code; + } + + return code; +} + +int32_t submitStmt2Impl(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3, + int64_t* startTs, int64_t* endTs, uint32_t* generated) { + // call bind + int64_t start = toolsGetTimestampUs(); + int32_t code = taos_stmt2_bind_param(pThreadInfo->conn->stmt2, bindv, -1); + if (code != 0) { + errorPrint("taos_stmt2_bind_param failed, reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2)); + return code; + } + debugPrint("interlace taos_stmt2_bind_param() ok. bindv->count=%d \n", bindv->count); + *delay1 += toolsGetTimestampUs() - start; + + // execute + *startTs = toolsGetTimestampUs(); + code = execInsert(pThreadInfo, *generated, delay3); + *endTs = toolsGetTimestampUs(); + return code; +} + +int32_t submitStmt2(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3, + int64_t* startTs, int64_t* endTs, uint32_t* generated, int32_t w) { + // calc loop + int32_t loop = 1; + SSuperTable* stbInfo = pThreadInfo->stbInfo; + if(stbInfo->continueIfFail == YES_IF_FAILED) { + if(stbInfo->keep_trying > 1) { + loop = stbInfo->keep_trying; + } else { + loop = 3; // default + } + } + + // submit stmt2 + int32_t i = 0; + bool connected = true; + while (1) { + int32_t code = -1; + if(connected) { + // reinit success to do submit + code = submitStmt2Impl(pThreadInfo, bindv, delay1, delay3, startTs, endTs, generated); + } + + // check code + if ( code == 0) { + // success + break; + } else { + // failed to try + if (--loop == 0) { + // failed finally + char tip[64] = ""; + if (i > 0) { + snprintf(tip, sizeof(tip), " after retry %d", i); + } + errorPrint("finally faild execute submitStmt2()%s\n", tip); + return -1; + } + + // wait a memont for trying + toolsMsleep(stbInfo->trying_interval); + // reinit + infoPrint("stmt2 start retry submit i=%d after sleep %d ms...\n", i++, stbInfo->trying_interval); + code = reConnectStmt2(pThreadInfo, w); + if (code != 0) { + // faild and try again + errorPrint("faild reConnectStmt2 and retry again for next i=%d \n", i); + connected = false; + } else { + // succ + connected = true; + } + } + } + + // success + return 0; +} + static void *syncWriteInterlace(void *sarg) { threadInfo * pThreadInfo = (threadInfo *)sarg; SDataBase * database = pThreadInfo->dbInfo; @@ -2138,28 +2292,26 @@ static void *syncWriteInterlace(void *sarg) { } } - // stmt2 bind param + // exec if(stbInfo->iface == STMT2_IFACE) { + // exec stmt2 if(g_arguments->debug_print) showBindV(bindv, stbInfo->tags, stbInfo->cols); - // call bind - int64_t start = toolsGetTimestampUs(); - if (taos_stmt2_bind_param(pThreadInfo->conn->stmt2, bindv, -1)) { - errorPrint("taos_stmt2_bind_param failed, reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2)); + // bind & exec stmt2 + if (submitStmt2(pThreadInfo, bindv, &delay1, &delay3, &startTs, &endTs, &generated, w) != 0) { g_fail = true; goto free_of_interlace; } - debugPrint("succ to call taos_stmt2_bind_param() with interlace mode. interlaceRows=%d bindv->count=%d \n", interlaceRows, bindv->count); - delay1 += toolsGetTimestampUs() - start; + } else { + // exec other + startTs = toolsGetTimestampUs(); + if (execInsert(pThreadInfo, generated, &delay3)) { + g_fail = true; + goto free_of_interlace; + } + endTs = toolsGetTimestampUs(); } - // execute - startTs = toolsGetTimestampUs(); - if (execInsert(pThreadInfo, generated, &delay3)) { - g_fail = true; - goto free_of_interlace; - } - endTs = toolsGetTimestampUs(); debugPrint("execInsert tableIndex=%d left insert rows=%"PRId64" generated=%d\n", i, insertRows, generated); // reset count @@ -2839,9 +2991,10 @@ void *syncWriteProgressive(void *sarg) { break; } case STMT2_IFACE: { - generated = stmt2BindVProgressive( + generated = stmt2BindAndSubmit( pThreadInfo, - childTbl, ×tamp, i, ttl, &pkCur, &pkCnt, &delay1); + childTbl, ×tamp, i, ttl, &pkCur, &pkCnt, &delay1, + &delay3, &startTs, &endTs, w); break; } case SML_REST_IFACE: @@ -2861,63 +3014,67 @@ void *syncWriteProgressive(void *sarg) { if (!stbInfo->non_stop) { i += generated; } - // only measure insert - startTs = toolsGetTimestampUs(); - int code = execInsert(pThreadInfo, generated, &delay3); - if (code) { - if (NO_IF_FAILED == stbInfo->continueIfFail) { - warnPrint("The super table parameter " - "continueIfFail: %d, STOP insertion!\n", - stbInfo->continueIfFail); - g_fail = true; - goto free_of_progressive; - } else if (YES_IF_FAILED == stbInfo->continueIfFail) { - infoPrint("The super table parameter " - "continueIfFail: %d, " - "will continue to insert ..\n", - stbInfo->continueIfFail); - } else if (smart) { - warnPrint("The super table parameter " - "continueIfFail: %d, will create table " - "then insert ..\n", - stbInfo->continueIfFail); - // generator - if (w == 0) { - if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL)) { + // stmt2 execInsert already execute on stmt2BindAndSubmit + if (stbInfo->iface != STMT2_IFACE) { + // no stmt2 exec + startTs = toolsGetTimestampUs(); + int code = execInsert(pThreadInfo, generated, &delay3); + if (code) { + if (NO_IF_FAILED == stbInfo->continueIfFail) { + warnPrint("The super table parameter " + "continueIfFail: %d, STOP insertion!\n", + stbInfo->continueIfFail); + g_fail = true; + goto free_of_progressive; + } else if (YES_IF_FAILED == stbInfo->continueIfFail) { + infoPrint("The super table parameter " + "continueIfFail: %d, " + "will continue to insert ..\n", + stbInfo->continueIfFail); + } else if (smart) { + warnPrint("The super table parameter " + "continueIfFail: %d, will create table " + "then insert ..\n", + stbInfo->continueIfFail); + + // generator + if (w == 0) { + if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL)) { + g_fail = true; + goto free_of_progressive; + } + } + + code = smartContinueIfFail( + pThreadInfo, + childTbl, tagData, w, ttl); + if (0 != code) { g_fail = true; goto free_of_progressive; } - } - int ret = smartContinueIfFail( - pThreadInfo, - childTbl, tagData, w, ttl); - if (0 != ret) { + // move next + if (++w >= TAG_BATCH_COUNT) { + // reset for gen again + w = 0; + } + + code = execInsert(pThreadInfo, generated, &delay3); + if (code) { + g_fail = true; + goto free_of_progressive; + } + } else { + warnPrint("Unknown super table parameter " + "continueIfFail: %d\n", + stbInfo->continueIfFail); g_fail = true; goto free_of_progressive; } - - // move next - if (++w >= TAG_BATCH_COUNT) { - // reset for gen again - w = 0; - } - - code = execInsert(pThreadInfo, generated, &delay3); - if (code) { - g_fail = true; - goto free_of_progressive; - } - } else { - warnPrint("Unknown super table parameter " - "continueIfFail: %d\n", - stbInfo->continueIfFail); - g_fail = true; - goto free_of_progressive; } + endTs = toolsGetTimestampUs() + 1; } - endTs = toolsGetTimestampUs()+1; if (stbInfo->insert_interval > 0) { debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n", @@ -2931,7 +3088,7 @@ void *syncWriteProgressive(void *sarg) { if (database->flush) { char sql[260] = ""; sprintf(sql, "flush database %s", database->dbName); - code = executeSql(pThreadInfo->conn->taos,sql); + int32_t code = executeSql(pThreadInfo->conn->taos,sql); if (code != 0) { perfPrint(" %s failed. error code = 0x%x\n", sql, code); } else { @@ -3602,6 +3759,9 @@ int32_t assignTableToThread(SDataBase* database, SSuperTable* stbInfo) { // calc table count per vgroup for (int64_t i = 0; i < stbInfo->childTblCount; i++) { int32_t vgIdx = calcGroupIndex(database->dbName, stbInfo->childTblArray[i]->name, database->vgroups); + if (vgIdx == -1) { + continue; + } SVGroup *vg = benchArrayGet(database->vgArray, vgIdx); vg->tbCountPerVgId ++; } @@ -3609,7 +3769,7 @@ int32_t assignTableToThread(SDataBase* database, SSuperTable* stbInfo) { // malloc vg->childTblArray memory with table count for (int v = 0; v < database->vgroups; v++) { SVGroup *vg = benchArrayGet(database->vgArray, v); - infoPrint("Total %"PRId64" tables on %s's vgroup %d (id: %d)\n", + infoPrint("Local hash calc %"PRId64" tables on %s's vgroup %d (id: %d)\n", vg->tbCountPerVgId, database->dbName, v, vg->vgId); if (vg->tbCountPerVgId) { threads++; @@ -3623,6 +3783,9 @@ int32_t assignTableToThread(SDataBase* database, SSuperTable* stbInfo) { // set vg->childTblArray data for (int64_t i = 0; i < stbInfo->childTblCount; i++) { int32_t vgIdx = calcGroupIndex(database->dbName, stbInfo->childTblArray[i]->name, database->vgroups); + if (vgIdx == -1) { + continue; + } SVGroup *vg = benchArrayGet(database->vgArray, vgIdx); debugPrint("calc table hash to vgroup %s.%s vgIdx=%d\n", database->dbName, @@ -3655,8 +3818,13 @@ TAOS_STMT2* initStmt2(TAOS* taos, bool single) { memset(&op2, 0, sizeof(op2)); op2.singleStbInsert = single; op2.singleTableBindOnce = single; - infoPrint("initStmt2 call taos_stmt2_init single=%d\n", single); - return taos_stmt2_init(taos, &op2); + + TAOS_STMT2* stmt2 = taos_stmt2_init(taos, &op2); + if (stmt2) + succPrint("succ taos_stmt2_init single=%d\n", single); + else + errorPrint("failed taos_stmt2_init single=%d\n", single); + return stmt2; } // init insert thread @@ -4613,10 +4781,11 @@ int insertTestProcess() { // ------- STMT 2 ----------- // -static int32_t stmt2BindVProgressive( +static int32_t stmt2BindAndSubmit( threadInfo *pThreadInfo, SChildTable *childTbl, - int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1) { + int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1, + int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w) { // create bindV int32_t count = 1; @@ -4647,8 +4816,13 @@ static int32_t stmt2BindVProgressive( batch = g_arguments->prepared_rand - pos; } - int32_t generated = bindVColsProgressive(bindv, 0, pThreadInfo, batch, *timestamp, pos, childTbl, pkCur, pkCnt, &n); - if(generated <= 0) { + if (batch == 0) { + infoPrint("batch size is zero. pos = %"PRId64"\n", pos); + return 0; + } + + uint32_t generated = bindVColsProgressive(bindv, 0, pThreadInfo, batch, *timestamp, pos, childTbl, pkCur, pkCnt, &n); + if(generated == 0) { errorPrint( "get cols data bind information failed. table: %s\n", childTbl->name); freeBindV(bindv); return -1; @@ -4659,18 +4833,17 @@ static int32_t stmt2BindVProgressive( showBindV(bindv, stbInfo->tags, stbInfo->cols); } - // do bindv - int64_t start = toolsGetTimestampUs(); - int32_t ret = taos_stmt2_bind_param(stmt2, bindv, -1); - if(ret != 0) { - errorPrint( "taos_stmt2_bind_param failed, table: %s . engine error: %s\n", childTbl->name, taos_stmt2_error(stmt2)); - freeBindV(bindv); - return -1; - } - debugPrint("succ to call taos_stmt2_bind_param() progressive mode. table=%s batch=%d pos=%" PRId64 " ts=%" PRId64 " generated=%d\n", - childTbl->name, batch, pos, *timestamp, generated); - *delay1 = toolsGetTimestampUs() - start; + // bind and submit + int32_t code = submitStmt2(pThreadInfo, bindv, delay1, delay3, startTs, endTs, &generated, w); // free - freeBindV(bindv); - return generated; + freeBindV(bindv); + + if(code != 0) { + errorPrint( "failed submitStmt2() progressive mode, table: %s . engine error: %s\n", childTbl->name, taos_stmt2_error(stmt2)); + return code; + } else { + debugPrint("succ submitStmt2 progressive mode. table=%s batch=%d pos=%" PRId64 " ts=%" PRId64 " generated=%d\n", + childTbl->name, batch, pos, *timestamp, generated); + return generated; + } } diff --git a/tools/taos-tools/src/benchJsonOpt.c b/tools/taos-tools/src/benchJsonOpt.c index bcac6ff0dd..3e41908668 100644 --- a/tools/taos-tools/src/benchJsonOpt.c +++ b/tools/taos-tools/src/benchJsonOpt.c @@ -260,23 +260,25 @@ static int getColumnAndTagTypeFromInsertJsonFile( minInDbl = min; } - double valueRange = maxInDbl - minInDbl; - tools_cJSON *dataScalingFactor = tools_cJSON_GetObjectItem(column, "scalingFactor"); - if (tools_cJSON_IsNumber(dataScalingFactor)) { - scalingFactor = dataScalingFactor->valueint; - if (scalingFactor > 1) { - max = maxInDbl * scalingFactor; - min = minInDbl * scalingFactor; + if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { + double valueRange = maxInDbl - minInDbl; + tools_cJSON *dataScalingFactor = tools_cJSON_GetObjectItem(column, "scalingFactor"); + if (tools_cJSON_IsNumber(dataScalingFactor)) { + scalingFactor = dataScalingFactor->valueint; + if (1< scalingFactor && scalingFactor <= 1000000) { + max = maxInDbl * scalingFactor; + min = minInDbl * scalingFactor; + } else { + scalingFactor = 1; + } } else { - scalingFactor = 1; - } - } else { - if (0 < valueRange && valueRange <= 1) { - scalingFactor = 1000; - max = maxInDbl * scalingFactor; - min = minInDbl * scalingFactor; - } else { - scalingFactor = 1; + if (0 < valueRange && valueRange <= 1) { + scalingFactor = 1000; + max = maxInDbl * scalingFactor; + min = minInDbl * scalingFactor; + } else { + scalingFactor = 1; + } } } @@ -495,23 +497,26 @@ static int getColumnAndTagTypeFromInsertJsonFile( minInDbl = min; } - double valueRange = maxInDbl - minInDbl; - tools_cJSON *dataScalingFactor = tools_cJSON_GetObjectItem(tagObj, "scalingFactor"); - if (tools_cJSON_IsNumber(dataScalingFactor)) { - scalingFactor = dataScalingFactor->valueint; - if (scalingFactor > 1) { - max = maxInDbl * scalingFactor; - min = minInDbl * scalingFactor; + + if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { + double valueRange = maxInDbl - minInDbl; + tools_cJSON *dataScalingFactor = tools_cJSON_GetObjectItem(tagObj, "scalingFactor"); + if (tools_cJSON_IsNumber(dataScalingFactor)) { + scalingFactor = dataScalingFactor->valueint; + if (1< scalingFactor && scalingFactor <= 1000000) { + max = maxInDbl * scalingFactor; + min = minInDbl * scalingFactor; + } else { + scalingFactor = 1; + } } else { - scalingFactor = 1; - } - } else { - if (0 < valueRange && valueRange <= 1) { - scalingFactor = 1000; - max = maxInDbl * scalingFactor; - min = minInDbl * scalingFactor; - } else { - scalingFactor = 1; + if (0 < valueRange && valueRange <= 1) { + scalingFactor = 1000; + max = maxInDbl * scalingFactor; + min = minInDbl * scalingFactor; + } else { + scalingFactor = 1; + } } } @@ -1026,11 +1031,8 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) { superTable->childTblLimit = superTable->childTblCount; } } else { - warnPrint("child table limit %"PRId64" is invalid, " - "set to %"PRId64"\n", - childTbl_limit->valueint, - superTable->childTblCount); - superTable->childTblLimit = superTable->childTblCount; + warnPrint("child table limit %"PRId64" is invalid, set to zero. \n",childTbl_limit->valueint); + superTable->childTblLimit = 0; } } tools_cJSON *childTbl_offset = @@ -1038,6 +1040,14 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) { if (tools_cJSON_IsNumber(childTbl_offset)) { superTable->childTblOffset = childTbl_offset->valueint; } + + // check limit offset + if( superTable->childTblOffset + superTable->childTblLimit > superTable->childTblCount ) { + errorPrint("json config invalid. childtable_offset(%"PRId64") + childtable_limit(%"PRId64") > childtable_count(%"PRId64")", + superTable->childTblOffset, superTable->childTblLimit, superTable->childTblCount); + return -1; + } + tools_cJSON *childTbl_from = tools_cJSON_GetObjectItem(stbInfo, "childtable_from"); if (tools_cJSON_IsNumber(childTbl_from)) { @@ -1071,6 +1081,7 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) { return -1; } + // read from super table tools_cJSON *continueIfFail = tools_cJSON_GetObjectItem(stbInfo, "continue_if_fail"); // yes, no, if (tools_cJSON_IsString(continueIfFail)) { @@ -1085,6 +1096,9 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) { continueIfFail->valuestring); return -1; } + } else { + // default value is common specialed + superTable->continueIfFail = g_arguments->continueIfFail; } // start_fillback_time @@ -1555,6 +1569,7 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) { } } + // read from common tools_cJSON *continueIfFail = tools_cJSON_GetObjectItem(json, "continue_if_fail"); // yes, no, if (tools_cJSON_IsString(continueIfFail)) { @@ -1760,79 +1775,8 @@ PARSE_OVER: return code; } -static int getMetaFromQueryJsonFile(tools_cJSON *json) { - int32_t code = -1; - - tools_cJSON *telnet_tcp_port = - tools_cJSON_GetObjectItem(json, "telnet_tcp_port"); - if (tools_cJSON_IsNumber(telnet_tcp_port)) { - g_arguments->telnet_tcp_port = (uint16_t)telnet_tcp_port->valueint; - } - - tools_cJSON *gQueryTimes = tools_cJSON_GetObjectItem(json, "query_times"); - if (tools_cJSON_IsNumber(gQueryTimes)) { - g_queryInfo.query_times = gQueryTimes->valueint; - } else { - g_queryInfo.query_times = 1; - } - - tools_cJSON *gKillSlowQueryThreshold = - tools_cJSON_GetObjectItem(json, "kill_slow_query_threshold"); - if (tools_cJSON_IsNumber(gKillSlowQueryThreshold)) { - g_queryInfo.killQueryThreshold = gKillSlowQueryThreshold->valueint; - } else { - g_queryInfo.killQueryThreshold = 0; - } - - tools_cJSON *gKillSlowQueryInterval = - tools_cJSON_GetObjectItem(json, "kill_slow_query_interval"); - if (tools_cJSON_IsNumber(gKillSlowQueryInterval)) { - g_queryInfo.killQueryInterval = gKillSlowQueryInterval ->valueint; - } else { - g_queryInfo.killQueryInterval = 1; /* by default, interval 1s */ - } - - tools_cJSON *resetCache = - tools_cJSON_GetObjectItem(json, "reset_query_cache"); - if (tools_cJSON_IsString(resetCache)) { - if (0 == strcasecmp(resetCache->valuestring, "yes")) { - g_queryInfo.reset_query_cache = true; - } - } else { - g_queryInfo.reset_query_cache = false; - } - - tools_cJSON *respBuffer = - tools_cJSON_GetObjectItem(json, "response_buffer"); - if (tools_cJSON_IsNumber(respBuffer)) { - g_queryInfo.response_buffer = respBuffer->valueint; - } else { - g_queryInfo.response_buffer = RESP_BUF_LEN; - } - - tools_cJSON *dbs = tools_cJSON_GetObjectItem(json, "databases"); - if (tools_cJSON_IsString(dbs)) { - g_queryInfo.dbName = dbs->valuestring; - } - - tools_cJSON *queryMode = tools_cJSON_GetObjectItem(json, "query_mode"); - if (tools_cJSON_IsString(queryMode)) { - if (0 == strcasecmp(queryMode->valuestring, "rest")) { - g_queryInfo.iface = REST_IFACE; - } else if (0 == strcasecmp(queryMode->valuestring, "taosc")) { - g_queryInfo.iface = TAOSC_IFACE; - } else { - errorPrint("Invalid query_mode value: %s\n", - queryMode->valuestring); - goto PARSE_OVER; - } - } - // init sqls - g_queryInfo.specifiedQueryInfo.sqls = benchArrayInit(1, sizeof(SSQL)); - - // specified_table_query - tools_cJSON *specifiedQuery = - tools_cJSON_GetObjectItem(json, "specified_table_query"); +// Spec Query +int32_t readSpecQueryJson(tools_cJSON * specifiedQuery) { g_queryInfo.specifiedQueryInfo.concurrent = 1; if (tools_cJSON_IsObject(specifiedQuery)) { tools_cJSON *queryInterval = @@ -1858,12 +1802,14 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) { if (tools_cJSON_IsString(mixedQueryObj)) { if (0 == strcasecmp(mixedQueryObj->valuestring, "yes")) { g_queryInfo.specifiedQueryInfo.mixed_query = true; + infoPrint("%s\n","mixed_query is True"); } else if (0 == strcasecmp(mixedQueryObj->valuestring, "no")) { g_queryInfo.specifiedQueryInfo.mixed_query = false; + infoPrint("%s\n","mixed_query is False"); } else { errorPrint("Invalid mixed_query value: %s\n", mixedQueryObj->valuestring); - goto PARSE_OVER; + return -1; } } @@ -1945,7 +1891,7 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) { if (fp == NULL) { errorPrint("failed to open file: %s\n", sqlFileObj->valuestring); - goto PARSE_OVER; + return -1; } char *buf = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true); while (fgets(buf, TSDB_MAX_ALLOWED_SQL_LEN, fp)) { @@ -2030,16 +1976,19 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) { } } else { errorPrint("%s", "Invalid sql in json\n"); - goto PARSE_OVER; + return -1; } } } } } - // super_table_query - tools_cJSON *superQuery = - tools_cJSON_GetObjectItem(json, "super_table_query"); + // succ + return 0; +} + +// Super Query +int32_t readSuperQueryJson(tools_cJSON * superQuery) { g_queryInfo.superQueryInfo.threadCnt = 1; if (!superQuery || superQuery->type != tools_cJSON_Object) { g_queryInfo.superQueryInfo.sqlCount = 0; @@ -2165,7 +2114,7 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) { errorPrint( "failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT); - goto PARSE_OVER; + return -1; } g_queryInfo.superQueryInfo.sqlCount = superSqlSize; @@ -2191,11 +2140,118 @@ static int getMetaFromQueryJsonFile(tools_cJSON *json) { } } } + + // succ + return 0; +} - code = 0; +// read query json +static int getMetaFromQueryJsonFile(tools_cJSON *json) { + int32_t code = -1; -PARSE_OVER: - return code; + // read common + tools_cJSON *telnet_tcp_port = + tools_cJSON_GetObjectItem(json, "telnet_tcp_port"); + if (tools_cJSON_IsNumber(telnet_tcp_port)) { + g_arguments->telnet_tcp_port = (uint16_t)telnet_tcp_port->valueint; + } + + tools_cJSON *gQueryTimes = tools_cJSON_GetObjectItem(json, "query_times"); + if (tools_cJSON_IsNumber(gQueryTimes)) { + g_queryInfo.query_times = gQueryTimes->valueint; + } else { + g_queryInfo.query_times = 1; + } + + tools_cJSON *gKillSlowQueryThreshold = + tools_cJSON_GetObjectItem(json, "kill_slow_query_threshold"); + if (tools_cJSON_IsNumber(gKillSlowQueryThreshold)) { + g_queryInfo.killQueryThreshold = gKillSlowQueryThreshold->valueint; + } else { + g_queryInfo.killQueryThreshold = 0; + } + + tools_cJSON *gKillSlowQueryInterval = + tools_cJSON_GetObjectItem(json, "kill_slow_query_interval"); + if (tools_cJSON_IsNumber(gKillSlowQueryInterval)) { + g_queryInfo.killQueryInterval = gKillSlowQueryInterval->valueint; + } else { + g_queryInfo.killQueryInterval = 1; /* by default, interval 1s */ + } + + tools_cJSON *resetCache = + tools_cJSON_GetObjectItem(json, "reset_query_cache"); + if (tools_cJSON_IsString(resetCache)) { + if (0 == strcasecmp(resetCache->valuestring, "yes")) { + g_queryInfo.reset_query_cache = true; + } + } else { + g_queryInfo.reset_query_cache = false; + } + + tools_cJSON *respBuffer = + tools_cJSON_GetObjectItem(json, "response_buffer"); + if (tools_cJSON_IsNumber(respBuffer)) { + g_queryInfo.response_buffer = respBuffer->valueint; + } else { + g_queryInfo.response_buffer = RESP_BUF_LEN; + } + + tools_cJSON *dbs = tools_cJSON_GetObjectItem(json, "databases"); + if (tools_cJSON_IsString(dbs)) { + g_queryInfo.dbName = dbs->valuestring; + } + + tools_cJSON *queryMode = tools_cJSON_GetObjectItem(json, "query_mode"); + if (tools_cJSON_IsString(queryMode)) { + if (0 == strcasecmp(queryMode->valuestring, "rest")) { + g_queryInfo.iface = REST_IFACE; + } else if (0 == strcasecmp(queryMode->valuestring, "taosc")) { + g_queryInfo.iface = TAOSC_IFACE; + } else { + errorPrint("Invalid query_mode value: %s\n", + queryMode->valuestring); + return -1; + } + } + // init sqls + g_queryInfo.specifiedQueryInfo.sqls = benchArrayInit(1, sizeof(SSQL)); + + // specified_table_query + tools_cJSON *specifiedQuery = tools_cJSON_GetObjectItem(json, "specified_table_query"); + if (specifiedQuery) { + code = readSpecQueryJson(specifiedQuery); + if(code) { + errorPrint("failed to readSpecQueryJson code=%d \n", code); + return code; + } + } + + // super_table_query + tools_cJSON *superQuery = tools_cJSON_GetObjectItem(json, "super_table_query"); + if (superQuery) { + code = readSuperQueryJson(superQuery); + if(code) { + errorPrint("failed to readSuperQueryJson code=%d \n", code); + return code; + } + } + + // only have one + const char* errType = "json config invalid:"; + if (specifiedQuery && superQuery) { + errorPrint("%s only appear one for 'specified_table_query' and 'super_table_query' \n", errType); + return -1; + } + + // must have one + if (specifiedQuery == NULL && superQuery == NULL ) { + errorPrint("%s must have one for 'specified_table_query' or 'super_table_query' \n", errType); + return -1; + } + + // succ + return 0; } #ifdef TD_VER_COMPATIBLE_3_0_0_0 @@ -2368,8 +2424,7 @@ TMQ_PARSE_OVER: } #endif -int getInfoFromJsonFile() { - char * file = g_arguments->metaFile; +int readJsonConfig(char * file) { int32_t code = -1; FILE * fp = fopen(file, "r"); if (!fp) { @@ -2420,20 +2475,18 @@ int getInfoFromJsonFile() { // read common item code = getMetaFromCommonJsonFile(root); if (INSERT_TEST == g_arguments->test_mode || CSVFILE_TEST == g_arguments->test_mode) { + // insert code = getMetaFromInsertJsonFile(root); -#ifdef TD_VER_COMPATIBLE_3_0_0_0 } else if (QUERY_TEST == g_arguments->test_mode) { -#else - } else { -#endif + // query memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo)); code = getMetaFromQueryJsonFile(root); -#ifdef TD_VER_COMPATIBLE_3_0_0_0 } else if (SUBSCRIBE_TEST == g_arguments->test_mode) { + // subscribe memset(&g_tmqInfo, 0, sizeof(STmqMetaInfo)); code = getMetaFromTmqJsonFile(root); -#endif } + PARSE_OVER: free(content); fclose(fp); diff --git a/tools/taos-tools/src/benchMain.c b/tools/taos-tools/src/benchMain.c index 7f590df9e7..86ad795d05 100644 --- a/tools/taos-tools/src/benchMain.c +++ b/tools/taos-tools/src/benchMain.c @@ -25,7 +25,7 @@ tools_cJSON* root; #define CLIENT_INFO_LEN 20 static char g_client_info[CLIENT_INFO_LEN] = {0}; -int g_majorVersionOfClient = 0; +int32_t g_majorVersionOfClient = 0; // set flag if command passed, see ARG_OPT_ ??? uint64_t g_argFlag = 0; @@ -43,11 +43,6 @@ void* benchCancelHandler(void* arg) { g_arguments->terminate = true; toolsMsleep(10); - if (g_arguments->in_prompt || INSERT_TEST != g_arguments->test_mode) { - toolsMsleep(100); - postFreeResource(); - exit(EXIT_SUCCESS); - } return NULL; } #endif @@ -128,7 +123,11 @@ int main(int argc, char* argv[]) { #endif if (g_arguments->metaFile) { g_arguments->totalChildTables = 0; - if (getInfoFromJsonFile()) exit(EXIT_FAILURE); + if (readJsonConfig(g_arguments->metaFile)) { + errorPrint("failed to readJsonConfig %s\n", g_arguments->metaFile); + exitLog(); + return -1; + } } else { modifyArgument(); } diff --git a/tools/taos-tools/src/benchQuery.c b/tools/taos-tools/src/benchQuery.c index 2074539f32..0de71d8b18 100644 --- a/tools/taos-tools/src/benchQuery.c +++ b/tools/taos-tools/src/benchQuery.c @@ -13,14 +13,16 @@ #include #include "benchLog.h" -extern int g_majorVersionOfClient; - -int selectAndGetResult(threadInfo *pThreadInfo, char *command) { +// query and get result record is true to total request +int selectAndGetResult(qThreadInfo *pThreadInfo, char *command, bool record) { int ret = 0; + // user cancel if (g_arguments->terminate) { return -1; } + + // execute sql uint32_t threadID = pThreadInfo->threadID; char dbName[TSDB_DB_NAME_LEN] = {0}; tstrncpy(dbName, g_queryInfo.dbName, TSDB_DB_NAME_LEN); @@ -30,142 +32,179 @@ int selectAndGetResult(threadInfo *pThreadInfo, char *command) { 0, g_arguments->port, false, pThreadInfo->sockfd, pThreadInfo->filePath); if (0 != retCode) { - errorPrint("====restful return fail, threadID[%u]\n", - threadID); + errorPrint("====restful return fail, threadID[%u]\n", threadID); ret = -1; } } else { + // query TAOS *taos = pThreadInfo->conn->taos; int64_t rows = 0; TAOS_RES *res = taos_query(taos, command); int code = taos_errno(res); if (res == NULL || code) { - if (YES_IF_FAILED == g_arguments->continueIfFail) { - warnPrint("failed to execute sql:%s, " - "code: 0x%08x, reason:%s\n", - command, code, taos_errstr(res)); - } else { - errorPrint("failed to execute sql:%s, " - "code: 0x%08x, reason:%s\n", - command, code, taos_errstr(res)); - ret = -1; - } + // failed query + errorPrint("failed to execute sql:%s, " + "code: 0x%08x, reason:%s\n", + command, code, taos_errstr(res)); + ret = -1; } else { - //if (strlen(pThreadInfo->filePath) > 0) { - rows = fetchResult(res, pThreadInfo); - //} + // succ query + if (record) + rows = fetchResult(res, pThreadInfo->filePath); + } + + // free result + if (res) { + taos_free_result(res); } - taos_free_result(res); debugPrint("query sql:%s rows:%"PRId64"\n", command, rows); } + + // record count + if (ret ==0) { + // succ + if (record) + pThreadInfo->nSucc ++; + } else { + // fail + if (record) + pThreadInfo->nFail ++; + + // continue option + if (YES_IF_FAILED == g_arguments->continueIfFail) { + ret = 0; // force continue + } + } + return ret; } -static void *mixedQuery(void *sarg) { - queryThreadInfo *pThreadInfo = (queryThreadInfo*)sarg; +// interlligent sleep +void autoSleep(uint64_t interval, uint64_t delay ) { + if (delay < interval * 1000) { + toolsMsleep((int32_t)(interval * 1000 - delay)); // ms + } +} + +// reset +int32_t resetQueryCache(qThreadInfo* pThreadInfo) { + // execute sql + if (selectAndGetResult(pThreadInfo, "RESET QUERY CACHE", false)) { + errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__); + return -1; + } + // succ + return 0; +} + + + +// +// --------------------------------- second levle funtion for Thread ----------------------------------- +// + +// show rela qps +int64_t showRealQPS(qThreadInfo* pThreadInfo, int64_t lastPrintTime, int64_t startTs) { + int64_t now = toolsGetTimestampMs(); + if (now - lastPrintTime > 10 * 1000) { + // real total + uint64_t totalQueried = pThreadInfo->nSucc; + if(g_arguments->continueIfFail == YES_IF_FAILED) { + totalQueried += pThreadInfo->nFail; + } + infoPrint( + "thread[%d] has currently completed queries: %" PRIu64 ", QPS: %10.3f\n", + pThreadInfo->threadID, totalQueried, + (double)(totalQueried / ((now - startTs) / 1000.0))); + return now; + } else { + return lastPrintTime; + } +} + +// spec query mixed thread +static void *specQueryMixThread(void *sarg) { + qThreadInfo *pThreadInfo = (qThreadInfo*)sarg; #ifdef LINUX - prctl(PR_SET_NAME, "mixedQuery"); + prctl(PR_SET_NAME, "specQueryMixThread"); #endif // use db if (g_queryInfo.dbName) { if (pThreadInfo->conn && pThreadInfo->conn->taos && taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) { - errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadId, g_queryInfo.dbName); + errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName); return NULL; } } - int64_t lastPrintTs = toolsGetTimestampMs(); - int64_t st; - int64_t et; + int64_t st = 0; + int64_t et = 0; + int64_t startTs = toolsGetTimestampMs(); + int64_t lastPrintTime = startTs; uint64_t queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes; + uint64_t interval = g_queryInfo.specifiedQueryInfo.queryInterval; + pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t)); for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) { SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i); for (int j = 0; j < queryTimes; ++j) { - if (g_arguments->terminate) { - return NULL; + // use cancel + if(g_arguments->terminate) { + infoPrint("%s\n", "user cancel , so exit testing."); + break; } + + // reset cache if (g_queryInfo.reset_query_cache) { - if (queryDbExecCall(pThreadInfo->conn, - "RESET QUERY CACHE")) { - errorPrint("%s() LN%d, reset query cache failed\n", - __func__, __LINE__); + if (resetQueryCache(pThreadInfo)) { + errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__); return NULL; } } + + // execute sql st = toolsGetTimestampUs(); - if (g_queryInfo.iface == REST_IFACE) { - int retCode = postProceSql(sql->command, g_queryInfo.dbName, - 0, g_queryInfo.iface, 0, - g_arguments->port, - false, pThreadInfo->sockfd, ""); - if (retCode) { - errorPrint("thread[%d]: restful query <%s> failed\n", - pThreadInfo->threadId, sql->command); - continue; - } - } else { - TAOS_RES *res = taos_query(pThreadInfo->conn->taos, - sql->command); - if (res == NULL || taos_errno(res) != 0) { - if (YES_IF_FAILED == g_arguments->continueIfFail) { - warnPrint( - "thread[%d]: failed to execute sql :%s, " - "code: 0x%x, reason: %s\n", - pThreadInfo->threadId, - sql->command, - taos_errno(res), taos_errstr(res)); - } else { - errorPrint( - "thread[%d]: failed to execute sql :%s, " - "code: 0x%x, reason: %s\n", - pThreadInfo->threadId, - sql->command, - taos_errno(res), taos_errstr(res)); - if (TSDB_CODE_RPC_NETWORK_UNAVAIL == - taos_errno(res)) { - taos_free_result(res); - return NULL; - } - } - taos_free_result(res); - continue; - } - taos_free_result(res); + int ret = selectAndGetResult(pThreadInfo, sql->command, true); + if (ret) { + g_fail = true; + errorPrint("failed call mix selectAndGetResult, i=%d j=%d", i, j); + return NULL; } et = toolsGetTimestampUs(); - int64_t* delay = benchCalloc(1, sizeof(int64_t), false); - *delay = et - st; - debugPrint("%s() LN%d, delay: %"PRId64"\n", - __func__, __LINE__, *delay); - pThreadInfo->total_delay += (et - st); - if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){ - tmfree(delay); + // sleep + if (interval > 0) { + autoSleep(interval, et - st); } - int64_t currentPrintTs = toolsGetTimestampMs(); - if (currentPrintTs - lastPrintTs > 10 * 1000) { - infoPrint("thread[%d] has currently complete query %d times\n", - pThreadInfo->threadId, - (int)pThreadInfo->query_delay_list->size); - lastPrintTs = currentPrintTs; + + // delay + if (ret == 0) { + int64_t* delay = benchCalloc(1, sizeof(int64_t), false); + *delay = et - st; + debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, *delay); + + pThreadInfo->total_delay += *delay; + if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){ + tmfree(delay); + } } + + // real show + lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs); } } + return NULL; } -static void *specifiedTableQuery(void *sarg) { - threadInfo *pThreadInfo = (threadInfo *)sarg; +// spec query thread +static void *specQueryThread(void *sarg) { + qThreadInfo *pThreadInfo = (qThreadInfo *)sarg; #ifdef LINUX - prctl(PR_SET_NAME, "specTableQuery"); + prctl(PR_SET_NAME, "specQueryThread"); #endif uint64_t st = 0; uint64_t et = 0; - uint64_t minDelay = UINT64_MAX; - uint64_t maxDelay = 0; - uint64_t totalDelay = 0; int32_t index = 0; // use db @@ -179,13 +218,13 @@ static void *specifiedTableQuery(void *sarg) { } uint64_t queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes; - pThreadInfo->query_delay_list = benchCalloc(queryTimes, - sizeof(uint64_t), false); - uint64_t lastPrintTime = toolsGetTimestampMs(); - uint64_t startTs = toolsGetTimestampMs(); + uint64_t interval = g_queryInfo.specifiedQueryInfo.queryInterval; + pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t)); - SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, - pThreadInfo->querySeq); + uint64_t startTs = toolsGetTimestampMs(); + uint64_t lastPrintTime = startTs; + + SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, pThreadInfo->querySeq); if (sql->result[0] != '\0') { snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d", @@ -193,485 +232,210 @@ static void *specifiedTableQuery(void *sarg) { } while (index < queryTimes) { - // check cancel - if (g_arguments->terminate) { - return NULL; + // use cancel + if(g_arguments->terminate) { + infoPrint("thread[%d] user cancel , so exit testing.\n", pThreadInfo->threadID); + break; } - if (g_queryInfo.specifiedQueryInfo.queryInterval && - (et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval * 1000) { - toolsMsleep((int32_t)( - g_queryInfo.specifiedQueryInfo.queryInterval*1000 - - (et - st))); // ms - } + // reset cache if (g_queryInfo.reset_query_cache) { - // execute sql - if (selectAndGetResult(pThreadInfo, "RESET QUERY CACHE")) { - errorPrint("%s() LN%d, reset query cache failed\n", - __func__, __LINE__); + if (resetQueryCache(pThreadInfo)) { + errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__); return NULL; } } + // execute sql st = toolsGetTimestampUs(); - int ret = selectAndGetResult(pThreadInfo, sql->command); + int ret = selectAndGetResult(pThreadInfo, sql->command, true); if (ret) { g_fail = true; + errorPrint("failed call spec selectAndGetResult, index=%d\n", index); + break; + } + et = toolsGetTimestampUs(); + + // sleep + if (interval > 0) { + autoSleep(interval, et - st); } - et = toolsGetTimestampUs(); - int64_t delay = et - st; - debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, delay); + + uint64_t delay = et - st; + debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay); if (ret == 0) { - pThreadInfo->query_delay_list[index] = delay; - pThreadInfo->totalQueried++; + // only succ add delay list + benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay); + pThreadInfo->total_delay += delay; } index++; - totalDelay += delay; - if (delay > maxDelay) { - maxDelay = delay; - } - if (delay < minDelay) { - minDelay = delay; - } - uint64_t currentPrintTime = toolsGetTimestampMs(); - uint64_t endTs = toolsGetTimestampMs(); - - if ((ret == 0) && (currentPrintTime - lastPrintTime > 30 * 1000)) { - infoPrint( - "thread[%d] has currently completed queries: %" PRIu64 - ", QPS: %10.6f\n", - pThreadInfo->threadID, pThreadInfo->totalQueried, - (double)(pThreadInfo->totalQueried / - ((endTs - startTs) / 1000.0))); - lastPrintTime = currentPrintTime; - } - - if (-2 == ret) { - toolsMsleep(1000); - return NULL; - } + // real show + lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs); } - qsort(pThreadInfo->query_delay_list, queryTimes, - sizeof(uint64_t), compare); - pThreadInfo->avg_delay = (double)totalDelay / queryTimes; + return NULL; } -static void *superTableQuery(void *sarg) { +// super table query thread +static void *stbQueryThread(void *sarg) { char *sqlstr = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false); - threadInfo *pThreadInfo = (threadInfo *)sarg; + qThreadInfo *pThreadInfo = (qThreadInfo *)sarg; #ifdef LINUX - prctl(PR_SET_NAME, "superTableQuery"); + prctl(PR_SET_NAME, "stbQueryThread"); #endif uint64_t st = 0; - uint64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000; + uint64_t et = 0; uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes; + uint64_t interval = g_queryInfo.superQueryInfo.queryInterval; + pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(uint64_t)); + uint64_t startTs = toolsGetTimestampMs(); - - uint64_t lastPrintTime = toolsGetTimestampMs(); + uint64_t lastPrintTime = startTs; while (queryTimes--) { - if (g_queryInfo.superQueryInfo.queryInterval - && ((et - st) < - (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000)) { - toolsMsleep((int32_t) - (g_queryInfo.superQueryInfo.queryInterval*1000 - - (et - st))); + // use cancel + if(g_arguments->terminate) { + infoPrint("%s\n", "user cancel , so exit testing."); + break; } + // reset cache + if (g_queryInfo.reset_query_cache) { + if (resetQueryCache(pThreadInfo)) { + errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__); + return NULL; + } + } + + // execute st = toolsGetTimestampMs(); - for (int i = (int)pThreadInfo->start_table_from; - i <= pThreadInfo->end_table_to; i++) { + // for each table + for (int i = (int)pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { + // use cancel + if(g_arguments->terminate) { + infoPrint("%s\n", "user cancel , so exit testing."); + break; + } + + // for each sql for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { memset(sqlstr, 0, TSDB_MAX_ALLOWED_SQL_LEN); - replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, - i); + // use cancel + if(g_arguments->terminate) { + infoPrint("%s\n", "user cancel , so exit testing."); + break; + } + + // get real child name sql + if (replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i)) { + // fault + tmfree(sqlstr); + return NULL; + } + if (g_queryInfo.superQueryInfo.result[j][0] != '\0') { snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d", g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID); } - if (selectAndGetResult(pThreadInfo, sqlstr)) { + + // execute sql + uint64_t s = toolsGetTimestampUs(); + int ret = selectAndGetResult(pThreadInfo, sqlstr, true); + if (ret) { + // found error + errorPrint("failed call stb selectAndGetResult, i=%d j=%d\n", i, j); g_fail = true; + tmfree(sqlstr); + return NULL; + } + uint64_t delay = toolsGetTimestampUs() - s; + debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay); + if (ret == 0) { + // only succ add delay list + benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay); + pThreadInfo->total_delay += delay; } - pThreadInfo->totalQueried++; - - int64_t currentPrintTime = toolsGetTimestampMs(); - int64_t endTs = toolsGetTimestampMs(); - if (currentPrintTime - lastPrintTime > 30 * 1000) { - infoPrint( - "thread[%d] has currently completed queries: %" PRIu64 - ", QPS: %10.3f\n", - pThreadInfo->threadID, pThreadInfo->totalQueried, - (double)(pThreadInfo->totalQueried / - ((endTs - startTs) / 1000.0))); - lastPrintTime = currentPrintTime; - } + // show real QPS + lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs); } } et = toolsGetTimestampMs(); + + // sleep + if (interval > 0) { + autoSleep(interval, et - st); + } + } tmfree(sqlstr); + return NULL; } -static int multi_thread_super_table_query(uint16_t iface, char* dbName) { - int ret = -1; - pthread_t * pidsOfSub = NULL; - threadInfo *infosOfSub = NULL; - //==== create sub threads for query from all sub table of the super table - if ((g_queryInfo.superQueryInfo.sqlCount > 0) - && (g_queryInfo.superQueryInfo.threadCnt > 0)) { - pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt - *sizeof(pthread_t), - false); - infosOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt - *sizeof(threadInfo), false); +// +// --------------------------------- firse level function ------------------------------ +// - int64_t ntables = g_queryInfo.superQueryInfo.childTblCount; - int threads = g_queryInfo.superQueryInfo.threadCnt; - - int64_t a = ntables / threads; - if (a < 1) { - threads = (int)ntables; - a = 1; - } - - int64_t b = 0; - if (threads != 0) { - b = ntables % threads; - } - - uint64_t tableFrom = 0; - for (int i = 0; i < threads; i++) { - threadInfo *pThreadInfo = infosOfSub + i; - pThreadInfo->threadID = i; - pThreadInfo->start_table_from = tableFrom; - pThreadInfo->ntables = i < b ? a + 1 : a; - pThreadInfo->end_table_to = - i < b ? tableFrom + a : tableFrom + a - 1; - tableFrom = pThreadInfo->end_table_to + 1; - if (iface == REST_IFACE) { - int sockfd = createSockFd(); - if (sockfd < 0) { - goto OVER; - } - pThreadInfo->sockfd = sockfd; - } else { - pThreadInfo->conn = initBenchConn(); - if (pThreadInfo->conn == NULL) { - goto OVER; - } - } - pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo); - } - g_queryInfo.superQueryInfo.threadCnt = threads; - - for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { - if (!g_arguments->terminate) - pthread_join(pidsOfSub[i], NULL); - threadInfo *pThreadInfo = infosOfSub + i; - if (iface == REST_IFACE) { - destroySockFd(pThreadInfo->sockfd); - } else { - closeBenchConn(pThreadInfo->conn); - } - if (g_fail) { - goto OVER; - } - } - for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; ++i) { - g_queryInfo.superQueryInfo.totalQueried - += infosOfSub[i].totalQueried; - } - } else { - return 0; +void totalChildQuery(qThreadInfo* infos, int threadCnt, int64_t spend) { + // valid check + if (infos == NULL || threadCnt == 0) { + return ; } - - ret = 0; -OVER: - tmfree((char *)pidsOfSub); - tmfree((char *)infosOfSub); - - for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) { - tmfree(g_queryInfo.superQueryInfo.childTblName[i]); - } - tmfree(g_queryInfo.superQueryInfo.childTblName); - return ret; -} - -// free g_queryInfo.specailQueryInfo memory , can re-call -void freeSpecialQueryInfo() { - // can re-call - if (g_queryInfo.specifiedQueryInfo.sqls == NULL) { - return; - } - - // loop free each item memory - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) { - SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i); - tmfree(sql->command); - tmfree(sql->delay_list); - } - - // free Array - benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls); - g_queryInfo.specifiedQueryInfo.sqls = NULL; -} - - -static int multi_thread_specified_table_query(uint16_t iface, char* dbName) { - pthread_t * pids = NULL; - threadInfo *infos = NULL; - //==== create sub threads for query from specify table - int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; - uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size; - - // check invaid - if(nSqlCount == 0 || nConcurrent == 0 ) { - if(nSqlCount == 0) - warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount); - if(nConcurrent == 0) - warnPrint("concurrent is %d , specified_table_query->concurrent is zero. \n", nConcurrent); - return 0; - } - - // malloc funciton global memory - pids = benchCalloc(1, nConcurrent * sizeof(pthread_t), false); - infos = benchCalloc(1, nConcurrent * sizeof(threadInfo), false); - - bool exeError = false; - for (uint64_t i = 0; i < nSqlCount; i++) { - // reset - memset(pids, 0, nConcurrent * sizeof(pthread_t)); - memset(infos, 0, nConcurrent * sizeof(threadInfo)); - - // get execute sql - SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i); - - // create threads - int threadCnt = 0; - for (int j = 0; j < nConcurrent; j++) { - threadInfo *pThreadInfo = infos + j; - pThreadInfo->threadID = i * nConcurrent + j; - pThreadInfo->querySeq = i; - if (iface == REST_IFACE) { - int sockfd = createSockFd(); - // int iMode = 1; - // ioctl(sockfd, FIONBIO, &iMode); - if (sockfd < 0) { - exeError = true; - - break; - } - pThreadInfo->sockfd = sockfd; - } else { - pThreadInfo->conn = initBenchConn(); - if (pThreadInfo->conn == NULL) { - destroySockFd(pThreadInfo->sockfd); - exeError = true; - break; - } - } - - pthread_create(pids + j, NULL, specifiedTableQuery, pThreadInfo); - threadCnt++; - } - - // if failed, set termainte flag true like ctrl+c exit - if (exeError) { - errorPrint(" i=%" PRIu64 " create thread occur error, so wait exit ...\n", i); - g_arguments->terminate = true; - } - - // wait threads execute finished one by one - for (int j = 0; j < threadCnt ; j++) { - pthread_join(pids[j], NULL); - threadInfo *pThreadInfo = infos + j; - if (iface == REST_IFACE) { -#ifdef WINDOWS - closesocket(pThreadInfo->sockfd); - WSACleanup(); -#else - close(pThreadInfo->sockfd); -#endif - } else { - closeBenchConn(pThreadInfo->conn); - pThreadInfo->conn = NULL; - } - - // need exit in loop - if (g_fail || g_arguments->terminate) { - // free BArray - tmfree(pThreadInfo->query_delay_list); - pThreadInfo->query_delay_list = NULL; - } - } - - // cancel or need exit check - if (g_fail || g_arguments->terminate) { - // free current funciton malloc memory - tmfree((char *)pids); - tmfree((char *)infos); - // free global - freeSpecialQueryInfo(); - return -1; - } - - // execute successfully - uint64_t query_times = g_queryInfo.specifiedQueryInfo.queryTimes; - uint64_t totalQueryTimes = query_times * nConcurrent; - double avg_delay = 0.0; - for (int j = 0; j < nConcurrent; j++) { - threadInfo *pThreadInfo = infos + j; - avg_delay += pThreadInfo->avg_delay; - for (uint64_t k = 0; k < g_queryInfo.specifiedQueryInfo.queryTimes; k++) { - sql->delay_list[j * query_times + k] = pThreadInfo->query_delay_list[k]; - } - - // free BArray - tmfree(pThreadInfo->query_delay_list); - pThreadInfo->query_delay_list = NULL; - } - avg_delay /= nConcurrent; - qsort(sql->delay_list, g_queryInfo.specifiedQueryInfo.queryTimes, sizeof(uint64_t), compare); - infoPrintNoTimestamp("complete query with %d threads and %" PRIu64 - " query delay " - "avg: \t%.6fs " - "min: \t%.6fs " - "max: \t%.6fs " - "p90: \t%.6fs " - "p95: \t%.6fs " - "p99: \t%.6fs " - "SQL command: %s" - "\n", - nConcurrent, query_times, avg_delay / 1E6, /* avg */ - sql->delay_list[0] / 1E6, /* min */ - sql->delay_list[totalQueryTimes - 1] / 1E6, /* max */ - /* p90 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6, - /* p95 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6, - /* p99 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command); - infoPrintNoTimestampToFile("complete query with %d threads and %" PRIu64 - " query delay " - "avg: \t%.6fs " - "min: \t%.6fs " - "max: \t%.6fs " - "p90: \t%.6fs " - "p95: \t%.6fs " - "p99: \t%.6fs " - "SQL command: %s" - "\n", - nConcurrent, query_times, avg_delay / 1E6, /* avg */ - sql->delay_list[0] / 1E6, /* min */ - sql->delay_list[totalQueryTimes - 1] / 1E6, /* max */ - /* p90 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6, - /* p95 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6, - /* p99 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command); - } - - g_queryInfo.specifiedQueryInfo.totalQueried = - g_queryInfo.specifiedQueryInfo.queryTimes * nConcurrent; - tmfree((char *)pids); - tmfree((char *)infos); - - // free specialQueryInfo - freeSpecialQueryInfo(); - return 0; -} - -static int multi_thread_specified_mixed_query(uint16_t iface, char* dbName) { - int code = -1; - int thread = g_queryInfo.specifiedQueryInfo.concurrent; - pthread_t * pids = benchCalloc(thread, sizeof(pthread_t), true); - queryThreadInfo *infos = benchCalloc(thread, sizeof(queryThreadInfo), true); - int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size; - int start_sql = 0; - int a = total_sql_num / thread; - if (a < 1) { - thread = total_sql_num; - a = 1; - } - int b = 0; - if (thread != 0) { - b = total_sql_num % thread; - } - for (int i = 0; i < thread; ++i) { - queryThreadInfo *pQueryThreadInfo = infos + i; - pQueryThreadInfo->threadId = i; - pQueryThreadInfo->start_sql = start_sql; - pQueryThreadInfo->end_sql = i < b ? start_sql + a : start_sql + a - 1; - start_sql = pQueryThreadInfo->end_sql + 1; - pQueryThreadInfo->total_delay = 0; - pQueryThreadInfo->query_delay_list = benchArrayInit(1, sizeof(int64_t)); - if (iface == REST_IFACE) { - int sockfd = createSockFd(); - if (sockfd < 0) { - goto OVER; - } - pQueryThreadInfo->sockfd = sockfd; - } else { - pQueryThreadInfo->conn = initBenchConn(); - if (pQueryThreadInfo->conn == NULL) { - goto OVER; - } - } - pthread_create(pids + i, NULL, mixedQuery, pQueryThreadInfo); - } - - int64_t start = toolsGetTimestampUs(); - for (int i = 0; i < thread; ++i) { - pthread_join(pids[i], NULL); - } - int64_t end = toolsGetTimestampUs(); - + // statistic BArray * delay_list = benchArrayInit(1, sizeof(int64_t)); - int64_t total_delay = 0; - for (int i = 0; i < thread; ++i) { - queryThreadInfo * pThreadInfo = infos + i; + double total_delays = 0; + + // clear + for (int i = 0; i < threadCnt; ++i) { + qThreadInfo * pThreadInfo = infos + i; + if(pThreadInfo->query_delay_list == NULL) { + continue;; + } + + // append delay benchArrayAddBatch(delay_list, pThreadInfo->query_delay_list->pData, - pThreadInfo->query_delay_list->size, true); - total_delay += pThreadInfo->total_delay; - tmfree(pThreadInfo->query_delay_list); + pThreadInfo->query_delay_list->size, false); + total_delays += pThreadInfo->total_delay; + + // free delay + benchArrayDestroy(pThreadInfo->query_delay_list); pThreadInfo->query_delay_list = NULL; - if (iface == REST_IFACE) { -#ifdef WINDOWS - closesocket(pThreadInfo->sockfd); - WSACleanup(); -#else - close(pThreadInfo->sockfd); -#endif - } else { - closeBenchConn(pThreadInfo->conn); - } } + + // succ is zero + if (delay_list->size == 0) { + errorPrint("%s", "succ queries count is zero.\n"); + benchArrayDestroy(delay_list); + return ; + } + + + // sort qsort(delay_list->pData, delay_list->size, delay_list->elemSize, compare); + + // show delay min max if (delay_list->size) { infoPrint( "spend %.6fs using " - "%d threads complete query %d times,cd " + "%d threads complete query %d times, " "min delay: %.6fs, " "avg delay: %.6fs, " "p90: %.6fs, " "p95: %.6fs, " "p99: %.6fs, " "max: %.6fs\n", - (end - start)/1E6, - thread, (int)delay_list->size, + spend/1E6, + threadCnt, (int)delay_list->size, *(int64_t *)(benchArrayGet(delay_list, 0))/1E6, - (double)total_delay/delay_list->size/1E6, + (double)total_delays/delay_list->size/1E6, *(int64_t *)(benchArrayGet(delay_list, (int32_t)(delay_list->size * 0.9)))/1E6, *(int64_t *)(benchArrayGet(delay_list, @@ -685,76 +449,419 @@ static int multi_thread_specified_mixed_query(uint16_t iface, char* dbName) { __func__, __LINE__, (int64_t)delay_list->size); } benchArrayDestroy(delay_list); - code = 0; +} + +// +// super table query +// +static int stbQuery(uint16_t iface, char* dbName) { + int ret = -1; + pthread_t * pidsOfSub = NULL; + qThreadInfo *threadInfos = NULL; + g_queryInfo.superQueryInfo.totalQueried = 0; + g_queryInfo.superQueryInfo.totalFail = 0; + + // check + if ((g_queryInfo.superQueryInfo.sqlCount == 0) + || (g_queryInfo.superQueryInfo.threadCnt == 0)) { + return 0; + } + + // malloc + pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt + *sizeof(pthread_t), + false); + threadInfos = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt + *sizeof(qThreadInfo), false); + + int64_t ntables = g_queryInfo.superQueryInfo.childTblCount; + int nConcurrent = g_queryInfo.superQueryInfo.threadCnt; + + int64_t a = ntables / nConcurrent; + if (a < 1) { + nConcurrent = (int)ntables; + a = 1; + } + + int64_t b = 0; + if (nConcurrent != 0) { + b = ntables % nConcurrent; + } + + uint64_t tableFrom = 0; + int threadCnt = 0; + for (int i = 0; i < nConcurrent; i++) { + qThreadInfo *pThreadInfo = threadInfos + i; + pThreadInfo->threadID = i; + pThreadInfo->start_table_from = tableFrom; + pThreadInfo->ntables = i < b ? a + 1 : a; + pThreadInfo->end_table_to = + i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pThreadInfo->end_table_to + 1; + // create conn + if (initQueryConn(pThreadInfo, iface)){ + break; + } + int code = pthread_create(pidsOfSub + i, NULL, stbQueryThread, pThreadInfo); + if (code != 0) { + errorPrint("failed stbQueryThread create. error code =%d \n", code); + break; + } + threadCnt ++; + } + + bool needExit = false; + // if failed, set termainte flag true like ctrl+c exit + if (threadCnt != nConcurrent ) { + needExit = true; + g_arguments->terminate = true; + goto OVER; + } + + // reset total + g_queryInfo.superQueryInfo.totalQueried = 0; + g_queryInfo.superQueryInfo.totalFail = 0; + + // real thread count + g_queryInfo.superQueryInfo.threadCnt = threadCnt; + int64_t start = toolsGetTimestampUs(); + + for (int i = 0; i < threadCnt; i++) { + pthread_join(pidsOfSub[i], NULL); + qThreadInfo *pThreadInfo = threadInfos + i; + // add succ + g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nSucc; + if (g_arguments->continueIfFail == YES_IF_FAILED) { + // "yes" need add fail cnt + g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nFail; + g_queryInfo.superQueryInfo.totalFail += pThreadInfo->nFail; + } + + // close conn + closeQueryConn(pThreadInfo, iface); + } + int64_t end = toolsGetTimestampUs(); + + if (needExit) { + goto OVER; + } + + // total show + totalChildQuery(threadInfos, threadCnt, end - start); + + ret = 0; + +OVER: + tmfree((char *)pidsOfSub); + tmfree((char *)threadInfos); + + for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) { + tmfree(g_queryInfo.superQueryInfo.childTblName[i]); + } + tmfree(g_queryInfo.superQueryInfo.childTblName); + return ret; +} + +// +// specQuery +// +static int specQuery(uint16_t iface, char* dbName) { + int ret = -1; + pthread_t *pids = NULL; + qThreadInfo *infos = NULL; + int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; + uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size; + g_queryInfo.specifiedQueryInfo.totalQueried = 0; + g_queryInfo.specifiedQueryInfo.totalFail = 0; + + // check invaid + if(nSqlCount == 0 || nConcurrent == 0 ) { + if(nSqlCount == 0) + warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount); + if(nConcurrent == 0) + warnPrint("nConcurrent is %d , specified_table_query->nConcurrent is zero. \n", nConcurrent); + return 0; + } + + // malloc threads memory + pids = benchCalloc(1, nConcurrent * sizeof(pthread_t), false); + infos = benchCalloc(1, nConcurrent * sizeof(qThreadInfo), false); + + for (uint64_t i = 0; i < nSqlCount; i++) { + if( g_arguments->terminate ) { + break; + } + + // reset + memset(pids, 0, nConcurrent * sizeof(pthread_t)); + memset(infos, 0, nConcurrent * sizeof(qThreadInfo)); + + // get execute sql + SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i); + + // create threads + int threadCnt = 0; + for (int j = 0; j < nConcurrent; j++) { + qThreadInfo *pThreadInfo = infos + j; + pThreadInfo->threadID = i * nConcurrent + j; + pThreadInfo->querySeq = i; + + // create conn + if (initQueryConn(pThreadInfo, iface)) { + break; + } + + int code = pthread_create(pids + j, NULL, specQueryThread, pThreadInfo); + if (code != 0) { + errorPrint("failed specQueryThread create. error code =%d \n", code); + break; + } + threadCnt++; + } + + bool needExit = false; + // if failed, set termainte flag true like ctrl+c exit + if (threadCnt != nConcurrent ) { + needExit = true; + g_arguments->terminate = true; + } + + int64_t start = toolsGetTimestampUs(); + // wait threads execute finished one by one + for (int j = 0; j < threadCnt ; j++) { + pthread_join(pids[j], NULL); + qThreadInfo *pThreadInfo = infos + j; + closeQueryConn(pThreadInfo, iface); + + // need exit in loop + if (needExit) { + // free BArray + benchArrayDestroy(pThreadInfo->query_delay_list); + pThreadInfo->query_delay_list = NULL; + } + } + int64_t spend = toolsGetTimestampUs() - start; + if(spend == 0) { + // avoid xx/spend expr throw error + spend = 1; + } + + // create + if (needExit) { + errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d, exit testing.\n", nConcurrent, threadCnt); + goto OVER; + } + + // + // show QPS and P90 ... + // + uint64_t n = 0; + double total_delays = 0.0; + uint64_t totalQueried = 0; + uint64_t totalFail = 0; + for (int j = 0; j < threadCnt; j++) { + qThreadInfo *pThreadInfo = infos + j; + if(pThreadInfo->query_delay_list == NULL) { + continue;; + } + + // total one sql + for (uint64_t k = 0; k < pThreadInfo->query_delay_list->size; k++) { + int64_t * delay = benchArrayGet(pThreadInfo->query_delay_list, k); + sql->delay_list[n++] = *delay; + total_delays += *delay; + } + + // total queries + totalQueried += pThreadInfo->nSucc; + if (g_arguments->continueIfFail == YES_IF_FAILED) { + totalQueried += pThreadInfo->nFail; + totalFail += pThreadInfo->nFail; + } + + // free BArray query_delay_list + benchArrayDestroy(pThreadInfo->query_delay_list); + pThreadInfo->query_delay_list = NULL; + } + + // appand current sql + g_queryInfo.specifiedQueryInfo.totalQueried += totalQueried; + g_queryInfo.specifiedQueryInfo.totalFail += totalFail; + + // succ is zero + if(totalQueried == 0 || n == 0) { + errorPrint("%s", "succ queries count is zero.\n"); + goto OVER; + } + + qsort(sql->delay_list, n, sizeof(uint64_t), compare); + int32_t bufLen = strlen(sql->command) + 512; + char * buf = benchCalloc(bufLen, sizeof(char), false); + snprintf(buf , bufLen, "complete query with %d threads and %" PRIu64 " " + "sql %"PRIu64" spend %.6fs QPS: %.3f " + "query delay " + "avg: %.6fs " + "min: %.6fs " + "max: %.6fs " + "p90: %.6fs " + "p95: %.6fs " + "p99: %.6fs " + "SQL command: %s \n", + threadCnt, totalQueried, + i + 1, spend/1E6, totalQueried / (spend/1E6), + total_delays/n/1E6, /* avg */ + sql->delay_list[0] / 1E6, /* min */ + sql->delay_list[n - 1] / 1E6, /* max */ + /* p90 */ + sql->delay_list[(uint64_t)(n * 0.90)] / 1E6, + /* p95 */ + sql->delay_list[(uint64_t)(n * 0.95)] / 1E6, + /* p99 */ + sql->delay_list[(uint64_t)(n * 0.99)] / 1E6, + sql->command); + + infoPrintNoTimestamp("%s", buf); + infoPrintNoTimestampToFile("%s", buf); + tmfree(buf); + } + ret = 0; + +OVER: + tmfree((char *)pids); + tmfree((char *)infos); + + // free specialQueryInfo + freeSpecialQueryInfo(); + return ret; +} + +// +// specQueryMix +// +static int specQueryMix(uint16_t iface, char* dbName) { + // init + int ret = -1; + int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; + pthread_t * pids = benchCalloc(nConcurrent, sizeof(pthread_t), true); + qThreadInfo *infos = benchCalloc(nConcurrent, sizeof(qThreadInfo), true); + + // concurent calc + int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size; + int start_sql = 0; + int a = total_sql_num / nConcurrent; + if (a < 1) { + warnPrint("sqls num:%d < concurent:%d, so set concurrent to %d\n", total_sql_num, nConcurrent, nConcurrent); + nConcurrent = total_sql_num; + a = 1; + } + int b = 0; + if (nConcurrent != 0) { + b = total_sql_num % nConcurrent; + } + + // + // running + // + int threadCnt = 0; + for (int i = 0; i < nConcurrent; ++i) { + qThreadInfo *pThreadInfo = infos + i; + pThreadInfo->threadID = i; + pThreadInfo->start_sql = start_sql; + pThreadInfo->end_sql = i < b ? start_sql + a : start_sql + a - 1; + start_sql = pThreadInfo->end_sql + 1; + pThreadInfo->total_delay = 0; + + // create conn + if (initQueryConn(pThreadInfo, iface)){ + break; + } + // main run + int code = pthread_create(pids + i, NULL, specQueryMixThread, pThreadInfo); + if (code != 0) { + errorPrint("failed specQueryMixThread create. error code =%d \n", code); + break; + } + + threadCnt ++; + } + + bool needExit = false; + // if failed, set termainte flag true like ctrl+c exit + if (threadCnt != nConcurrent) { + needExit = true; + g_arguments->terminate = true; + } + + // reset total + g_queryInfo.specifiedQueryInfo.totalQueried = 0; + g_queryInfo.specifiedQueryInfo.totalFail = 0; + + int64_t start = toolsGetTimestampUs(); + for (int i = 0; i < threadCnt; ++i) { + pthread_join(pids[i], NULL); + qThreadInfo *pThreadInfo = infos + i; + closeQueryConn(pThreadInfo, iface); + + // total queries + g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nSucc; + if (g_arguments->continueIfFail == YES_IF_FAILED) { + // yes need add failed count + g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nFail; + g_queryInfo.specifiedQueryInfo.totalFail += pThreadInfo->nFail; + } + + // destory + if (needExit) { + benchArrayDestroy(pThreadInfo->query_delay_list); + pThreadInfo->query_delay_list = NULL; + } + } + int64_t end = toolsGetTimestampUs(); + + // create + if (needExit) { + errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d, exit testing.\n", nConcurrent, threadCnt); + goto OVER; + } + + // statistic + totalChildQuery(infos, threadCnt, end - start); + ret = 0; + OVER: tmfree(pids); tmfree(infos); - return code; + + // free sqls + freeSpecialQueryInfo(); + + return ret; } -#define KILLID_LEN 20 +// total query for end +void totalQuery(int64_t spends) { + // total QPS + uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried + + g_queryInfo.superQueryInfo.totalQueried; -void *queryKiller(void *arg) { - char host[MAX_HOSTNAME_LEN] = {0}; - tstrncpy(host, g_arguments->host, MAX_HOSTNAME_LEN); - - while (true) { - TAOS *taos = taos_connect(g_arguments->host, g_arguments->user, - g_arguments->password, NULL, g_arguments->port); - if (NULL == taos) { - errorPrint("Slow query killer thread " - "failed to connect to the server %s\n", - g_arguments->host); - return NULL; + // error rate + char errRate[128] = ""; + if(g_arguments->continueIfFail == YES_IF_FAILED) { + uint64_t totalFail = g_queryInfo.specifiedQueryInfo.totalFail + g_queryInfo.superQueryInfo.totalFail; + if (totalQueried > 0) { + snprintf(errRate, sizeof(errRate), " ,error %" PRIu64 " (rate:%.3f%%)", totalFail, ((float)totalFail * 100)/totalQueried); } - - char command[TSDB_MAX_ALLOWED_SQL_LEN] = - "SELECT kill_id,exec_usec,sql FROM performance_schema.perf_queries"; - TAOS_RES *res = taos_query(taos, command); - int32_t code = taos_errno(res); - if (code) { - printErrCmdCodeStr(command, code, res); - } - - TAOS_ROW row = NULL; - while ((row = taos_fetch_row(res)) != NULL) { - int32_t *lengths = taos_fetch_lengths(res); - if (lengths[0] <= 0) { - infoPrint("No valid query found by %s\n", command); - } else { - int64_t execUSec = *(int64_t*)row[1]; - - if (execUSec > g_queryInfo.killQueryThreshold * 1000000) { - char sql[SHORT_1K_SQL_BUFF_LEN] = {0}; - tstrncpy(sql, (char*)row[2], - min(strlen((char*)row[2])+1, - SHORT_1K_SQL_BUFF_LEN)); - - char killId[KILLID_LEN] = {0}; - tstrncpy(killId, (char*)row[0], - min(strlen((char*)row[0])+1, KILLID_LEN)); - char killCommand[KILLID_LEN + 15] = {0}; - snprintf(killCommand, KILLID_LEN + 15, - "KILL QUERY '%s'", killId); - TAOS_RES *resKill = taos_query(taos, killCommand); - int32_t codeKill = taos_errno(resKill); - if (codeKill) { - printErrCmdCodeStr(killCommand, codeKill, resKill); - } else { - infoPrint("%s succeed, sql: %s killed!\n", - killCommand, sql); - taos_free_result(resKill); - } - } - } - } - - taos_free_result(res); - taos_close(taos); - toolsMsleep(g_queryInfo.killQueryInterval*1000); } - return NULL; + // show + double tInS = (double)spends / 1000; + char buf[512] = ""; + snprintf(buf, sizeof(buf), + "Spend %.4f second completed total queries: %" PRIu64 + ", the QPS of all threads: %10.3f%s\n\n", + tInS, totalQueried, (double)totalQueried / tInS, errRate); + infoPrint("%s", buf); + infoPrintToFile("%s", buf); } int queryTestProcess() { @@ -764,13 +871,15 @@ int queryTestProcess() { encodeAuthBase64(); } - pthread_t pidKiller = {0}; + // kill sql for executing seconds over "kill_slow_query_threshold" if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) { - pthread_create(&pidKiller, NULL, queryKiller, NULL); - pthread_join(pidKiller, NULL); - toolsMsleep(1000); + int32_t ret = killSlowQuery(); + if (ret != 0) { + return ret; + } } + // covert addr if (g_queryInfo.iface == REST_IFACE) { if (convertHostToServAddr(g_arguments->host, g_arguments->port + TSDB_PORT_HTTP, @@ -780,97 +889,47 @@ int queryTestProcess() { } } + // fetch child name if super table if ((g_queryInfo.superQueryInfo.sqlCount > 0) && (g_queryInfo.superQueryInfo.threadCnt > 0)) { - SBenchConn* conn = initBenchConn(); - if (conn == NULL) { + int32_t ret = fetchChildTableName(g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName); + if (ret != 0) { + errorPrint("fetchChildTableName dbName=%s stb=%s failed.", g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName); return -1; } - char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0"; - if (3 == g_majorVersionOfClient) { - snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, - "SELECT COUNT(*) FROM( SELECT DISTINCT(TBNAME) FROM %s.%s)", - g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName); - } else { - snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, - "SELECT COUNT(TBNAME) FROM %s.%s", - g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName); - } - TAOS_RES *res = taos_query(conn->taos, cmd); - int32_t code = taos_errno(res); - if (code) { - printErrCmdCodeStr(cmd, code, res); - closeBenchConn(conn); - return -1; - } - TAOS_ROW row = NULL; - int num_fields = taos_num_fields(res); - TAOS_FIELD *fields = taos_fetch_fields(res); - while ((row = taos_fetch_row(res)) != NULL) { - if (0 == strlen((char *)(row[0]))) { - errorPrint("stable %s have no child table\n", - g_queryInfo.superQueryInfo.stbName); - taos_free_result(res); - closeBenchConn(conn); + } + + // + // start running + // + + + uint64_t startTs = toolsGetTimestampMs(); + if(g_queryInfo.specifiedQueryInfo.sqls && g_queryInfo.specifiedQueryInfo.sqls->size > 0) { + // specified table + if (g_queryInfo.specifiedQueryInfo.mixed_query) { + // mixed + if (specQueryMix(g_queryInfo.iface, g_queryInfo.dbName)) { + return -1; + } + } else { + // no mixied + if (specQuery(g_queryInfo.iface, g_queryInfo.dbName)) { return -1; } - char temp[256] = {0}; - taos_print_row(temp, row, fields, num_fields); - g_queryInfo.superQueryInfo.childTblCount = (int64_t)atol(temp); } - infoPrint("%s's childTblCount: %" PRId64 "\n", - g_queryInfo.superQueryInfo.stbName, - g_queryInfo.superQueryInfo.childTblCount); - taos_free_result(res); - g_queryInfo.superQueryInfo.childTblName = - benchCalloc(g_queryInfo.superQueryInfo.childTblCount, - sizeof(char *), false); - if (getAllChildNameOfSuperTable( - conn->taos, g_queryInfo.dbName, - g_queryInfo.superQueryInfo.stbName, - g_queryInfo.superQueryInfo.childTblName, - g_queryInfo.superQueryInfo.childTblCount)) { - tmfree(g_queryInfo.superQueryInfo.childTblName); - closeBenchConn(conn); - return -1; - } - closeBenchConn(conn); - } - uint64_t startTs = toolsGetTimestampMs(); - if (g_queryInfo.specifiedQueryInfo.mixed_query) { - if (multi_thread_specified_mixed_query(g_queryInfo.iface, - g_queryInfo.dbName)) { + } else if(g_queryInfo.superQueryInfo.sqlCount > 0) { + // super table + if (stbQuery(g_queryInfo.iface, g_queryInfo.dbName)) { return -1; } } else { - if (multi_thread_specified_table_query(g_queryInfo.iface, - g_queryInfo.dbName)) { - return -1; - } - } - if (multi_thread_super_table_query(g_queryInfo.iface, - g_queryInfo.dbName)) { + // nothing + errorPrint("%s\n", "Both 'specified_table_query' and 'super_table_query' sqls is empty."); return -1; } - // workaround to use separate taos connection; - uint64_t endTs = toolsGetTimestampMs(); - int64_t t = endTs - startTs; - double tInS = (double)t / 1000.0; - if (g_queryInfo.specifiedQueryInfo.totalQueried) - infoPrint("Total specified queries: %" PRIu64 "\n", - g_queryInfo.specifiedQueryInfo.totalQueried); - if (g_queryInfo.superQueryInfo.totalQueried) - infoPrint("Total super queries: %" PRIu64 "\n", - g_queryInfo.superQueryInfo.totalQueried); - uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried - + g_queryInfo.superQueryInfo.totalQueried; - infoPrint( - "Spend %.4f second completed total queries: %" PRIu64 - ", the QPS of all threads: %10.3f\n\n", - tInS, totalQueried, (double)totalQueried / tInS); - infoPrintToFile( - "Spend %.4f second completed total queries: %" PRIu64 - ", the QPS of all threads: %10.3f\n\n", - tInS, totalQueried, (double)totalQueried / tInS); + + // total + totalQuery(toolsGetTimestampMs() - startTs); return 0; } diff --git a/tools/taos-tools/src/benchSubscribe.c b/tools/taos-tools/src/benchSubscribe.c index 011d545841..7f0d7e1579 100644 --- a/tools/taos-tools/src/benchSubscribe.c +++ b/tools/taos-tools/src/benchSubscribe.c @@ -23,7 +23,7 @@ static void stable_sub_callback(TAOS_SUB *tsub, TAOS_RES *res, void *param, return; } - if (param) fetchResult(res, (threadInfo *)param); + if (param) fetchResult(res, ((threadInfo *)param)->filePath); // tao_unsubscribe() will free result. } @@ -35,7 +35,7 @@ static void specified_sub_callback(TAOS_SUB *tsub, TAOS_RES *res, void *param, return; } - if (param) fetchResult(res, (threadInfo *)param); + if (param) fetchResult(res, ((threadInfo *)param)->filePath); // tao_unsubscribe() will free result. } @@ -127,7 +127,7 @@ static void *specifiedSubscribe(void *sarg) { } fetchResult( g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], - pThreadInfo); + pThreadInfo->filePath); g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID]++; if ((g_queryInfo.specifiedQueryInfo @@ -247,7 +247,7 @@ static void *superSubscribe(void *sarg) { .result[pThreadInfo->querySeq], pThreadInfo->threadID); } - fetchResult(res, pThreadInfo); + fetchResult(res, pThreadInfo->filePath); consumed[tsubSeq]++; if ((g_queryInfo.superQueryInfo.resubAfterConsume != -1) && diff --git a/tools/taos-tools/src/benchUtil.c b/tools/taos-tools/src/benchUtil.c index 7efcc51c16..900f0ded7a 100644 --- a/tools/taos-tools/src/benchUtil.c +++ b/tools/taos-tools/src/benchUtil.c @@ -240,19 +240,24 @@ static void appendResultBufToFile(char *resultBuf, char * filePath) { tmfclose(fp); } -void replaceChildTblName(char *inSql, char *outSql, int tblIndex) { - char sourceString[32] = "xxxx"; - char *pos = strstr(inSql, sourceString); - if (0 == pos) return; +int32_t replaceChildTblName(char *inSql, char *outSql, int tblIndex) { + // child table mark + char mark[32] = "xxxx"; + char *pos = strstr(inSql, mark); + if (0 == pos) { + errorPrint("sql format error, sql not found mark string '%s'", mark); + return -1; + } char subTblName[TSDB_TABLE_NAME_LEN]; snprintf(subTblName, TSDB_TABLE_NAME_LEN, - "%s.%s", g_queryInfo.dbName, + "`%s`.%s", g_queryInfo.dbName, g_queryInfo.superQueryInfo.childTblName[tblIndex]); tstrncpy(outSql, inSql, pos - inSql + 1); - snprintf(outSql + strlen(outSql), TSDB_MAX_ALLOWED_SQL_LEN -1, - "%s%s", subTblName, pos + strlen(sourceString)); + snprintf(outSql + (pos - inSql), TSDB_MAX_ALLOWED_SQL_LEN - 1, + "%s%s", subTblName, pos + strlen(mark)); + return 0; } int64_t toolsGetTimestamp(int32_t precision) { @@ -536,13 +541,20 @@ int postProceSqlImpl(char *sqlstr, char* dbName, int precision, int iface, do { bytes = recv(sockfd, responseBuf + received, resp_len - received, 0); + if (bytes <= 0) { + errorPrint("%s", "reading no response from socket\n"); + goto free_of_postImpl; + } responseBuf[resp_len] = 0; - debugPrint("response buffer: %s\n", responseBuf); + debugPrint("response buffer: %s bytes=%d\n", responseBuf, bytes); if (NULL != strstr(responseBuf, resEncodingChunk)) { chunked = true; } int64_t index = strlen(responseBuf) - 1; while (responseBuf[index] == '\n' || responseBuf[index] == '\r') { + if (index == 0) { + break; + } index--; } debugPrint("index: %" PRId64 "\n", index); @@ -555,11 +567,6 @@ int postProceSqlImpl(char *sqlstr, char* dbName, int precision, int iface, break; } - if (bytes <= 0) { - errorPrint("%s", "reading no response from socket\n"); - goto free_of_postImpl; - } - received += bytes; if (g_arguments->test_mode == INSERT_TEST) { @@ -820,14 +827,14 @@ free_of_post: } // fetch result fo file or nothing -int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) { +int64_t fetchResult(TAOS_RES *res, char * filePath) { TAOS_ROW row = NULL; int num_fields = 0; int64_t totalLen = 0; TAOS_FIELD *fields = 0; int64_t rows = 0; char *databuf = NULL; - bool toFile = strlen(pThreadInfo->filePath) > 0; + bool toFile = strlen(filePath) > 0; if(toFile) { @@ -841,7 +848,7 @@ int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) { if (toFile) { if (totalLen >= (FETCH_BUFFER_SIZE - HEAD_BUFF_LEN * 2)) { // buff is full - appendResultBufToFile(databuf, pThreadInfo->filePath); + appendResultBufToFile(databuf, filePath); totalLen = 0; memset(databuf, 0, FETCH_BUFFER_SIZE); } @@ -860,7 +867,7 @@ int64_t fetchResult(TAOS_RES *res, threadInfo *pThreadInfo) { // end if (toFile) { - appendResultBufToFile(databuf, pThreadInfo->filePath); + appendResultBufToFile(databuf, filePath); free(databuf); } return rows; @@ -1102,7 +1109,7 @@ static int32_t benchArrayEnsureCap(BArray* pArray, size_t newCap) { } void* benchArrayAddBatch(BArray* pArray, void* pData, int32_t elems, bool free) { - if (pData == NULL) { + if (pData == NULL || elems <=0) { return NULL; } @@ -1123,6 +1130,11 @@ FORCE_INLINE void* benchArrayPush(BArray* pArray, void* pData) { return benchArrayAddBatch(pArray, pData, 1, true); } +FORCE_INLINE void* benchArrayPushNoFree(BArray* pArray, void* pData) { + return benchArrayAddBatch(pArray, pData, 1, false); +} + + void* benchArrayDestroy(BArray* pArray) { if (pArray) { tmfree(pArray->pData); @@ -1302,7 +1314,7 @@ void destroySockFd(int sockfd) { FORCE_INLINE void printErrCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res) { char buff[512]; char *msg = cmd; - if (strlen(cmd) > sizeof(msg)) { + if (strlen(cmd) >= sizeof(buff)) { memcpy(buff, cmd, 500); buff[500] = 0; strcat(buff, "..."); @@ -1470,6 +1482,10 @@ void showBindV(TAOS_STMT2_BINDV *bindv, BArray *tags, BArray *cols) { uint32_t MurmurHash3_32(const char *key, uint32_t len); // get group index about dbname.tbname int32_t calcGroupIndex(char* dbName, char* tbName, int32_t groupCnt) { + // check valid + if (dbName == NULL || tbName == NULL) { + return -1; + } char key[1024]; snprintf(key, sizeof(key), "1.%s.%s", dbName, tbName); uint32_t hash = MurmurHash3_32(key, strlen(key)); @@ -1543,4 +1559,206 @@ uint32_t MurmurHash3_32(const char *key, uint32_t len) { return h1; } -#endif \ No newline at end of file +#endif + + +// +// ---------------- benchQuery util ---------------------- +// + +// init conn +int32_t initQueryConn(qThreadInfo * pThreadInfo, int iface) { + // create conn + if (iface == REST_IFACE) { + int sockfd = createSockFd(); + if (sockfd < 0) { + return -1; + } + pThreadInfo->sockfd = sockfd; + } else { + pThreadInfo->conn = initBenchConn(); + if (pThreadInfo->conn == NULL) { + return -1; + } + } + + return 0; +} + +// close conn +void closeQueryConn(qThreadInfo * pThreadInfo, int iface) { + if (iface == REST_IFACE) { +#ifdef WINDOWS + closesocket(pThreadInfo->sockfd); + WSACleanup(); +#else + close(pThreadInfo->sockfd); +#endif + } else { + closeBenchConn(pThreadInfo->conn); + pThreadInfo->conn = NULL; + } +} + + +// free g_queryInfo.specailQueryInfo memory , can re-call +void freeSpecialQueryInfo() { + // can re-call + if (g_queryInfo.specifiedQueryInfo.sqls == NULL) { + return; + } + + // loop free each item memory + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) { + SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i); + tmfree(sql->command); + tmfree(sql->delay_list); + } + + // free Array + benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls); + g_queryInfo.specifiedQueryInfo.sqls = NULL; +} + + +#define KILLID_LEN 64 + +void *queryKiller(void *arg) { + char host[MAX_HOSTNAME_LEN] = {0}; + tstrncpy(host, g_arguments->host, MAX_HOSTNAME_LEN); + + while (true) { + TAOS *taos = taos_connect(g_arguments->host, g_arguments->user, + g_arguments->password, NULL, g_arguments->port); + if (NULL == taos) { + errorPrint("Slow query killer thread " + "failed to connect to the server %s\n", + g_arguments->host); + return NULL; + } + + char command[TSDB_MAX_ALLOWED_SQL_LEN] = + "SELECT kill_id,exec_usec,sql FROM performance_schema.perf_queries"; + TAOS_RES *res = taos_query(taos, command); + int32_t code = taos_errno(res); + if (code) { + printErrCmdCodeStr(command, code, res); + } + + TAOS_ROW row = NULL; + while ((row = taos_fetch_row(res)) != NULL) { + int32_t *lengths = taos_fetch_lengths(res); + if (lengths[0] <= 0) { + infoPrint("No valid query found by %s\n", command); + } else { + int64_t execUSec = *(int64_t*)row[1]; + + if (execUSec > g_queryInfo.killQueryThreshold * 1000000) { + char sql[SHORT_1K_SQL_BUFF_LEN] = {0}; + tstrncpy(sql, (char*)row[2], + min(strlen((char*)row[2])+1, + SHORT_1K_SQL_BUFF_LEN)); + + char killId[KILLID_LEN] = {0}; + tstrncpy(killId, (char*)row[0], + min(strlen((char*)row[0])+1, KILLID_LEN)); + char killCommand[KILLID_LEN + 32] = {0}; + snprintf(killCommand, sizeof(killCommand), "KILL QUERY '%s'", killId); + TAOS_RES *resKill = taos_query(taos, killCommand); + int32_t codeKill = taos_errno(resKill); + if (codeKill) { + printErrCmdCodeStr(killCommand, codeKill, resKill); + } else { + infoPrint("%s succeed, sql: %s killed!\n", + killCommand, sql); + taos_free_result(resKill); + } + } + } + } + + taos_free_result(res); + taos_close(taos); + toolsMsleep(g_queryInfo.killQueryInterval*1000); + } + + return NULL; +} + +// kill show +int killSlowQuery() { + pthread_t pidKiller = {0}; + int32_t ret = pthread_create(&pidKiller, NULL, queryKiller, NULL); + if (ret != 0) { + errorPrint("pthread_create failed create queryKiller thread. error code =%d \n", ret); + return -1; + } + pthread_join(pidKiller, NULL); + toolsMsleep(1000); + return 0; +} + +// fetch super table child name from server +int fetchChildTableName(char *dbName, char *stbName) { + SBenchConn* conn = initBenchConn(); + if (conn == NULL) { + return -1; + } + + // get child count + char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0"; + if (3 == g_majorVersionOfClient) { + snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, + "SELECT COUNT(*) FROM( SELECT DISTINCT(TBNAME) FROM `%s`.`%s`)", + dbName, stbName); + } else { + snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, + "SELECT COUNT(TBNAME) FROM `%s`.`%s`", + dbName, stbName); + } + TAOS_RES *res = taos_query(conn->taos, cmd); + int32_t code = taos_errno(res); + if (code) { + printErrCmdCodeStr(cmd, code, res); + closeBenchConn(conn); + return -1; + } + + TAOS_ROW row = NULL; + int num_fields = taos_num_fields(res); + TAOS_FIELD *fields = taos_fetch_fields(res); + while ((row = taos_fetch_row(res)) != NULL) { + if (0 == strlen((char *)(row[0]))) { + errorPrint("stable %s have no child table\n", stbName); + taos_free_result(res); + closeBenchConn(conn); + return -1; + } + char temp[256] = {0}; + taos_print_row(temp, row, fields, num_fields); + + // set child table count + g_queryInfo.superQueryInfo.childTblCount = (int64_t)atol(temp); + } + infoPrint("%s's childTblCount: %" PRId64 "\n", stbName, g_queryInfo.superQueryInfo.childTblCount); + taos_free_result(res); + + // malloc memory with child table count + g_queryInfo.superQueryInfo.childTblName = + benchCalloc(g_queryInfo.superQueryInfo.childTblCount, + sizeof(char *), false); + // fetch child table name + if (getAllChildNameOfSuperTable( + conn->taos, dbName, stbName, + g_queryInfo.superQueryInfo.childTblName, + g_queryInfo.superQueryInfo.childTblCount)) { + // faild + tmfree(g_queryInfo.superQueryInfo.childTblName); + closeBenchConn(conn); + return -1; + } + closeBenchConn(conn); + + // succ + return 0; +}