From 760dd48d7d4c1e50fe1f75b803371c5f964b05f6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Jan 2025 16:36:41 +0800 Subject: [PATCH] test(stream): update test cases. --- tests/perf-test/stream.py | 143 +++++++++++++++++++++++++++++++++++--- 1 file changed, 133 insertions(+), 10 deletions(-) diff --git a/tests/perf-test/stream.py b/tests/perf-test/stream.py index f596c86532..90ca773184 100644 --- a/tests/perf-test/stream.py +++ b/tests/perf-test/stream.py @@ -1,6 +1,6 @@ import json import subprocess - +import threading import psutil import time import taos @@ -31,13 +31,14 @@ class MonitorSystemLoad: io_counters = process.io_counters() sys_load = psutil.getloadavg() - s = "load: %.2f, CPU:%s, Mem:%.2f MiB(%.2f%%), Read: %.2fMiB(%d), Write: %.2fMib (%d)" % ( + s = "load: %.2f, CPU:%s, Mem:%.2fMiB, %.2f%%, Read: %.2fMiB, %d, Write: %.2fMib, %d" % ( sys_load[0], cpu_percent, memory_info.rss / 1048576.0, memory_percent, io_counters.read_bytes / 1048576.0, io_counters.read_count, io_counters.write_bytes / 1048576.0, io_counters.write_count) print(s) f.write(s + '\n') + f.flush() time.sleep(1) @@ -46,10 +47,60 @@ class MonitorSystemLoad: break +def do_monitor(): + print("start monitor threads") + loader = MonitorSystemLoad('taosd', 80000) + loader.get_proc_status() + +def get_table_list(cursor): + cursor.execute('use stream_test') + + sql = "select table_name from information_schema.ins_tables where db_name = 'stream_test' and stable_name='stb' order by table_name" + cursor.execute(sql) + + res = cursor.fetchall() + return res + +def do_multi_insert(index, total, host, user, passwd, conf, tz): + conn = taos.connect( + host=host, user=user, password=passwd, config=conf, timezone=tz + ) + + cursor = conn.cursor() + cursor.execute('use stream_test') + + start_ts = 1609430400000 + step = 5 + + cursor.execute("create stable if not exists stb_result(wstart timestamp, minx float, maxx float, countx bigint) tags(gid bigint unsigned)") + + list = get_table_list(cursor) + + list = list[index*total: (index+1)*total] + + print("there are %d tables" % len(list)) + + for index, n in enumerate(list): + cursor.execute(f"create table if not exists {n[0]}_1 using stb_result tags(1)") + count = 1 + while True: + sql = (f"select cast({start_ts + step * 1000 * (count - 1)} as timestamp), min(c1), max(c2), count(c3) from stream_test.{n[0]} " + f"where ts >= {start_ts + step * 1000 * (count - 1)} and ts < {start_ts + step * 1000 * count}") + cursor.execute(sql) + + res = cursor.fetchall() + if res[0][3] == 0: + break + + insert = f"insert into {n[0]}_1 values ({start_ts + step * 1000 * (count - 1)}, {res[0][1]}, {res[0][2]}, {res[0][3]})" + cursor.execute(insert) + count += 1 + conn.close() + class StreamStarter: def __init__(self) -> None: self.sql = None - self.host='127.0.0.1' + self.host='ubuntu' self.user = 'root' self.passwd = 'taosdata' self.conf = '/etc/taos/taos.cfg' @@ -59,18 +110,18 @@ class StreamStarter: json_data = { "filetype": "insert", "cfgdir": "/etc/taos/cfg", - "host": "127.0.0.1", + "host": "ubuntu", "port": 6030, "rest_port": 6041, "user": "root", "password": "taosdata", "thread_count": 5, - "create_table_thread_count": 10, + "create_table_thread_count": 5, "result_file": "/tmp/taosBenchmark_result.log", "confirm_parameter_prompt": "no", "insert_interval": 1000, - "num_of_records_per_req": 10000, - "max_sql_len": 1024000, + "num_of_records_per_req": 1000, + "max_sql_len": 102400, "databases": [ { "dbinfo": { @@ -91,7 +142,7 @@ class StreamStarter: { "name": "stb", "child_table_exists": "yes", - "childtable_count": 50000, + "childtable_count": 500, "childtable_prefix": "ctb0_", "escape_character": "no", "auto_create_table": "yes", @@ -100,7 +151,7 @@ class StreamStarter: "insert_mode": "taosc", "interlace_rows": 400, "tcp_transfer": "no", - "insert_rows": 10000, + "insert_rows": 1000, "partial_col_num": 0, "childtable_limit": 0, "childtable_offset": 0, @@ -285,6 +336,78 @@ class StreamStarter: loader = MonitorSystemLoad('taosd', 80) loader.get_proc_status() + def do_query_then_insert(self): + self.prepare_data() + + try: + subprocess.Popen('taosBenchmark --f /tmp/stream.json', stdout=subprocess.PIPE, shell=True, text=True) + except subprocess.CalledProcessError as e: + print(f"Error running Bash command: {e}") + + time.sleep(50) + + conn = taos.connect( + host=self.host, user=self.user, password=self.passwd, config=self.conf, timezone=self.tz + ) + + cursor = conn.cursor() + cursor.execute('use stream_test') + + start_ts = 1609430400000 + step = 5 + + cursor.execute("create stable if not exists stb_result(wstart timestamp, minx float, maxx float, countx bigint) tags(gid bigint unsigned)") + + try: + t = threading.Thread(target=do_monitor) + t.start() + except Exception as e: + print("Error: unable to start thread, %s" % e) + + print("start to query") + + list = get_table_list(cursor) + print("there are %d tables" % len(list)) + + for index, n in enumerate(list): + cursor.execute(f"create table if not exists {n[0]}_1 using stb_result tags(1)") + count = 1 + while True: + sql = (f"select cast({start_ts + step * 1000 * (count - 1)} as timestamp), min(c1), max(c2), count(c3) from stream_test.{n[0]} " + f"where ts >= {start_ts + step * 1000 * (count - 1)} and ts < {start_ts + step * 1000 * count}") + cursor.execute(sql) + + res = cursor.fetchall() + if res[0][3] == 0: + break + + insert = f"insert into {n[0]}_1 values ({start_ts + step * 1000 * (count - 1)}, {res[0][1]}, {res[0][2]}, {res[0][3]})" + cursor.execute(insert) + count += 1 + conn.close() + + def multi_insert(self): + self.prepare_data() + + try: + subprocess.Popen('taosBenchmark --f /tmp/stream.json', stdout=subprocess.PIPE, shell=True, text=True) + except subprocess.CalledProcessError as e: + print(f"Error running Bash command: {e}") + + time.sleep(10) + + for n in range(5): + try: + print(f"start query_insert thread {n}") + t = threading.Thread(target=do_multi_insert, args=(n, 100, self.host, self.user, self.passwd, self.conf, self.tz)) + t.start() + except Exception as e: + print("Error: unable to start thread, %s" % e) + + loader = MonitorSystemLoad('taosd', 80) + loader.get_proc_status() if __name__ == "__main__": - StreamStarter().do_start() + # StreamStarter().do_start() + # StreamStarter().do_query_then_insert() + StreamStarter().multi_insert() \ No newline at end of file