test(stream): update test cases.
This commit is contained in:
parent
74aead7be3
commit
760dd48d7d
|
@ -1,6 +1,6 @@
|
|||
import json
|
||||
import subprocess
|
||||
|
||||
import threading
|
||||
import psutil
|
||||
import time
|
||||
import taos
|
||||
|
@ -31,13 +31,14 @@ class MonitorSystemLoad:
|
|||
io_counters = process.io_counters()
|
||||
sys_load = psutil.getloadavg()
|
||||
|
||||
s = "load: %.2f, CPU:%s, Mem:%.2f MiB(%.2f%%), Read: %.2fMiB(%d), Write: %.2fMib (%d)" % (
|
||||
s = "load: %.2f, CPU:%s, Mem:%.2fMiB, %.2f%%, Read: %.2fMiB, %d, Write: %.2fMib, %d" % (
|
||||
sys_load[0], cpu_percent, memory_info.rss / 1048576.0,
|
||||
memory_percent, io_counters.read_bytes / 1048576.0, io_counters.read_count,
|
||||
io_counters.write_bytes / 1048576.0, io_counters.write_count)
|
||||
|
||||
print(s)
|
||||
f.write(s + '\n')
|
||||
f.flush()
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
|
@ -46,10 +47,60 @@ class MonitorSystemLoad:
|
|||
break
|
||||
|
||||
|
||||
def do_monitor():
|
||||
print("start monitor threads")
|
||||
loader = MonitorSystemLoad('taosd', 80000)
|
||||
loader.get_proc_status()
|
||||
|
||||
def get_table_list(cursor):
|
||||
cursor.execute('use stream_test')
|
||||
|
||||
sql = "select table_name from information_schema.ins_tables where db_name = 'stream_test' and stable_name='stb' order by table_name"
|
||||
cursor.execute(sql)
|
||||
|
||||
res = cursor.fetchall()
|
||||
return res
|
||||
|
||||
def do_multi_insert(index, total, host, user, passwd, conf, tz):
|
||||
conn = taos.connect(
|
||||
host=host, user=user, password=passwd, config=conf, timezone=tz
|
||||
)
|
||||
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('use stream_test')
|
||||
|
||||
start_ts = 1609430400000
|
||||
step = 5
|
||||
|
||||
cursor.execute("create stable if not exists stb_result(wstart timestamp, minx float, maxx float, countx bigint) tags(gid bigint unsigned)")
|
||||
|
||||
list = get_table_list(cursor)
|
||||
|
||||
list = list[index*total: (index+1)*total]
|
||||
|
||||
print("there are %d tables" % len(list))
|
||||
|
||||
for index, n in enumerate(list):
|
||||
cursor.execute(f"create table if not exists {n[0]}_1 using stb_result tags(1)")
|
||||
count = 1
|
||||
while True:
|
||||
sql = (f"select cast({start_ts + step * 1000 * (count - 1)} as timestamp), min(c1), max(c2), count(c3) from stream_test.{n[0]} "
|
||||
f"where ts >= {start_ts + step * 1000 * (count - 1)} and ts < {start_ts + step * 1000 * count}")
|
||||
cursor.execute(sql)
|
||||
|
||||
res = cursor.fetchall()
|
||||
if res[0][3] == 0:
|
||||
break
|
||||
|
||||
insert = f"insert into {n[0]}_1 values ({start_ts + step * 1000 * (count - 1)}, {res[0][1]}, {res[0][2]}, {res[0][3]})"
|
||||
cursor.execute(insert)
|
||||
count += 1
|
||||
conn.close()
|
||||
|
||||
class StreamStarter:
|
||||
def __init__(self) -> None:
|
||||
self.sql = None
|
||||
self.host='127.0.0.1'
|
||||
self.host='ubuntu'
|
||||
self.user = 'root'
|
||||
self.passwd = 'taosdata'
|
||||
self.conf = '/etc/taos/taos.cfg'
|
||||
|
@ -59,18 +110,18 @@ class StreamStarter:
|
|||
json_data = {
|
||||
"filetype": "insert",
|
||||
"cfgdir": "/etc/taos/cfg",
|
||||
"host": "127.0.0.1",
|
||||
"host": "ubuntu",
|
||||
"port": 6030,
|
||||
"rest_port": 6041,
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"thread_count": 5,
|
||||
"create_table_thread_count": 10,
|
||||
"create_table_thread_count": 5,
|
||||
"result_file": "/tmp/taosBenchmark_result.log",
|
||||
"confirm_parameter_prompt": "no",
|
||||
"insert_interval": 1000,
|
||||
"num_of_records_per_req": 10000,
|
||||
"max_sql_len": 1024000,
|
||||
"num_of_records_per_req": 1000,
|
||||
"max_sql_len": 102400,
|
||||
"databases": [
|
||||
{
|
||||
"dbinfo": {
|
||||
|
@ -91,7 +142,7 @@ class StreamStarter:
|
|||
{
|
||||
"name": "stb",
|
||||
"child_table_exists": "yes",
|
||||
"childtable_count": 50000,
|
||||
"childtable_count": 500,
|
||||
"childtable_prefix": "ctb0_",
|
||||
"escape_character": "no",
|
||||
"auto_create_table": "yes",
|
||||
|
@ -100,7 +151,7 @@ class StreamStarter:
|
|||
"insert_mode": "taosc",
|
||||
"interlace_rows": 400,
|
||||
"tcp_transfer": "no",
|
||||
"insert_rows": 10000,
|
||||
"insert_rows": 1000,
|
||||
"partial_col_num": 0,
|
||||
"childtable_limit": 0,
|
||||
"childtable_offset": 0,
|
||||
|
@ -285,6 +336,78 @@ class StreamStarter:
|
|||
loader = MonitorSystemLoad('taosd', 80)
|
||||
loader.get_proc_status()
|
||||
|
||||
def do_query_then_insert(self):
|
||||
self.prepare_data()
|
||||
|
||||
try:
|
||||
subprocess.Popen('taosBenchmark --f /tmp/stream.json', stdout=subprocess.PIPE, shell=True, text=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Error running Bash command: {e}")
|
||||
|
||||
time.sleep(50)
|
||||
|
||||
conn = taos.connect(
|
||||
host=self.host, user=self.user, password=self.passwd, config=self.conf, timezone=self.tz
|
||||
)
|
||||
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('use stream_test')
|
||||
|
||||
start_ts = 1609430400000
|
||||
step = 5
|
||||
|
||||
cursor.execute("create stable if not exists stb_result(wstart timestamp, minx float, maxx float, countx bigint) tags(gid bigint unsigned)")
|
||||
|
||||
try:
|
||||
t = threading.Thread(target=do_monitor)
|
||||
t.start()
|
||||
except Exception as e:
|
||||
print("Error: unable to start thread, %s" % e)
|
||||
|
||||
print("start to query")
|
||||
|
||||
list = get_table_list(cursor)
|
||||
print("there are %d tables" % len(list))
|
||||
|
||||
for index, n in enumerate(list):
|
||||
cursor.execute(f"create table if not exists {n[0]}_1 using stb_result tags(1)")
|
||||
count = 1
|
||||
while True:
|
||||
sql = (f"select cast({start_ts + step * 1000 * (count - 1)} as timestamp), min(c1), max(c2), count(c3) from stream_test.{n[0]} "
|
||||
f"where ts >= {start_ts + step * 1000 * (count - 1)} and ts < {start_ts + step * 1000 * count}")
|
||||
cursor.execute(sql)
|
||||
|
||||
res = cursor.fetchall()
|
||||
if res[0][3] == 0:
|
||||
break
|
||||
|
||||
insert = f"insert into {n[0]}_1 values ({start_ts + step * 1000 * (count - 1)}, {res[0][1]}, {res[0][2]}, {res[0][3]})"
|
||||
cursor.execute(insert)
|
||||
count += 1
|
||||
conn.close()
|
||||
|
||||
def multi_insert(self):
|
||||
self.prepare_data()
|
||||
|
||||
try:
|
||||
subprocess.Popen('taosBenchmark --f /tmp/stream.json', stdout=subprocess.PIPE, shell=True, text=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Error running Bash command: {e}")
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
for n in range(5):
|
||||
try:
|
||||
print(f"start query_insert thread {n}")
|
||||
t = threading.Thread(target=do_multi_insert, args=(n, 100, self.host, self.user, self.passwd, self.conf, self.tz))
|
||||
t.start()
|
||||
except Exception as e:
|
||||
print("Error: unable to start thread, %s" % e)
|
||||
|
||||
loader = MonitorSystemLoad('taosd', 80)
|
||||
loader.get_proc_status()
|
||||
|
||||
if __name__ == "__main__":
|
||||
StreamStarter().do_start()
|
||||
# StreamStarter().do_start()
|
||||
# StreamStarter().do_query_then_insert()
|
||||
StreamStarter().multi_insert()
|
Loading…
Reference in New Issue