test: add fast_write_example for 3.0
parent 246ced231e
commit 54b079e006

@@ -0,0 +1,180 @@ fast_write_example.py
# install dependencies:
# Python >= 3.8 is recommended
# pip3 install faster-fifo

import logging
import math
import os
import sys
import time
from multiprocessing import Process
from queue import Empty
from typing import List

from faster_fifo import Queue
from mockdatasource import MockDataSource

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")

READ_TASK_COUNT = 1
WRITE_TASK_COUNT = 1
TABLE_COUNT = 1000
QUEUE_SIZE = 1000000  # queue capacity in bytes (faster-fifo sizes queues by bytes)
MAX_BATCH_SIZE = 3000

read_processes = []
write_processes = []


def get_connection():
    """
    If the environment variable TDENGINE_FIRST_EP is set, it is used. Otherwise, firstEP in /etc/taos/taos.cfg is used.
    The default username and password can also be overridden with the environment variables TDENGINE_USER and TDENGINE_PASSWORD.
    """
    import taos
    firstEP = os.environ.get("TDENGINE_FIRST_EP")
    if firstEP:
        host, port = firstEP.split(":")
    else:
        host, port = None, 0
    user = os.environ.get("TDENGINE_USER", "root")
    password = os.environ.get("TDENGINE_PASSWORD", "taosdata")
    return taos.connect(host=host, port=int(port), user=user, password=password)
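

# A usage sketch (the endpoint value is illustrative, and the script name is
# assumed from the commit title):
#   TDENGINE_FIRST_EP="localhost:6030" python3 fast_write_example.py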


# ANCHOR: read

def run_read_task(task_id: int, task_queues: List[Queue]):
    table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
    data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
    try:
        for batch in data_source:
            for table_id, rows in batch:
                # hash each table's rows to a fixed queue
                i = table_id % len(task_queues)
                # block forever when the queue is full
                task_queues[i].put_many(rows, block=True, timeout=-1)
    except KeyboardInterrupt:
        pass


# ANCHOR_END: read
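
# For illustration (hypothetical numbers): with 4 write queues, rows of
# table_id 10 go to queue 10 % 4 == 2, so all rows of a given table are
# always handled by the same write task.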


# ANCHOR: write
def run_write_task(task_id: int, queue: Queue):
    from sql_writer import SQLWriter
    log = logging.getLogger(f"WriteTask-{task_id}")
    writer = SQLWriter(get_connection)
    lines = None
    try:
        while True:
            try:
                # get as many rows as possible without blocking
                lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
                writer.process_lines(lines)
            except Empty:
                time.sleep(0.01)
    except KeyboardInterrupt:
        pass
    except BaseException as e:
        log.debug(f"lines={lines}")
        raise e


# ANCHOR_END: write


def set_global_config():
    argc = len(sys.argv)
    if argc > 1:
        global READ_TASK_COUNT
        READ_TASK_COUNT = int(sys.argv[1])
    if argc > 2:
        global WRITE_TASK_COUNT
        WRITE_TASK_COUNT = int(sys.argv[2])
    if argc > 3:
        global TABLE_COUNT
        TABLE_COUNT = int(sys.argv[3])
    if argc > 4:
        global QUEUE_SIZE
        QUEUE_SIZE = int(sys.argv[4])
    if argc > 5:
        global MAX_BATCH_SIZE
        MAX_BATCH_SIZE = int(sys.argv[5])
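

# A usage sketch (script name assumed from the commit title); the positional
# arguments override the defaults above, in order:
#   python3 fast_write_example.py <READ_TASK_COUNT> <WRITE_TASK_COUNT> <TABLE_COUNT> <QUEUE_SIZE> <MAX_BATCH_SIZE>
#   python3 fast_write_example.py 1 3    # 1 read task, 3 write tasks, rest default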


# ANCHOR: monitor
def run_monitor_process():
    log = logging.getLogger("DataBaseMonitor")
    conn = get_connection()
    conn.execute("DROP DATABASE IF EXISTS test")
    conn.execute("CREATE DATABASE test")
    conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
                 "TAGS (location BINARY(64), groupId INT)")

    def get_count():
        res = conn.query("SELECT count(*) FROM test.meters")
        rows = res.fetch_all()
        return rows[0][0] if rows else 0

    last_count = 0
    while True:
        time.sleep(10)
        count = get_count()
        # report the ingestion speed in rows per second over the last 10-second window
        log.info(f"count={count} speed={(count - last_count) / 10}")
        last_count = count


# ANCHOR_END: monitor

# ANCHOR: main
def main():
    set_global_config()
    logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
                 f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")

    monitor_process = Process(target=run_monitor_process)
    monitor_process.start()
    time.sleep(3)  # wait for the database to be ready

    # create one task queue per write task
    task_queues: List[Queue] = []
    for i in range(WRITE_TASK_COUNT):
        queue = Queue(max_size_bytes=QUEUE_SIZE)
        task_queues.append(queue)

    # create write processes
    for i in range(WRITE_TASK_COUNT):
        p = Process(target=run_write_task, args=(i, task_queues[i]))
        p.start()
        logging.debug(f"WriteTask-{i} started with pid {p.pid}")
        write_processes.append(p)

    # create read processes
    for i in range(READ_TASK_COUNT):
        queues = assign_queues(i, task_queues)
        p = Process(target=run_read_task, args=(i, queues))
        p.start()
        logging.debug(f"ReadTask-{i} started with pid {p.pid}")
        read_processes.append(p)

    try:
        monitor_process.join()
    except KeyboardInterrupt:
        monitor_process.terminate()
        [p.terminate() for p in read_processes]
        [p.terminate() for p in write_processes]
        [q.close() for q in task_queues]


def assign_queues(read_task_id, task_queues):
    """
    Compute target queues for a specific read task.
    """
    ratio = WRITE_TASK_COUNT / READ_TASK_COUNT
    from_index = math.floor(read_task_id * ratio)
    end_index = math.ceil((read_task_id + 1) * ratio)
    return task_queues[from_index:end_index]
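
# Worked example (illustrative counts): with READ_TASK_COUNT=2 and
# WRITE_TASK_COUNT=3 the ratio is 1.5, so read task 0 feeds queues[0:2] and
# read task 1 feeds queues[1:3]; a queue may be shared when the counts do
# not divide evenly.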


if __name__ == '__main__':
    main()
# ANCHOR_END: main

@@ -0,0 +1,49 @@ mockdatasource.py
import time


class MockDataSource:
    # each sample holds "current,voltage,phase,location,groupId"
    samples = [
        "8.8,119,0.32,LosAngeles,0",
        "10.7,116,0.34,SanDiego,1",
        "9.9,111,0.33,Hollywood,2",
        "8.9,113,0.329,Compton,3",
        "9.4,118,0.141,San Francisco,4"
    ]

    def __init__(self, tb_name_prefix, table_count):
        self.table_name_prefix = tb_name_prefix + "_"
        self.table_count = table_count
        self.max_rows = 10000000
        # start far enough in the past that max_rows rows fit at 100 ms intervals
        self.current_ts = round(time.time() * 1000) - self.max_rows * 100
        # [(tableId, tableName, values), ...]
        self.data = self._init_data()

    def _init_data(self):
        lines = self.samples * (self.table_count // 5 + 1)
        data = []
        for i in range(self.table_count):
            table_name = self.table_name_prefix + str(i)
            data.append((i, table_name, lines[i]))  # (tableId, tableName, values)
        return data

    def __iter__(self):
        self.row = 0
        return self

    def __next__(self):
        """
        Return the next 1000 rows of each table as [(tableId, [row, ...]), ...].
        """
        # generate 1000 timestamps, 100 ms apart
        ts = []
        for _ in range(1000):
            self.current_ts += 100
            ts.append(str(self.current_ts))
        # prepend table name and timestamp to each row
        # [(tableId, ["tableName,ts,current,voltage,phase,location,groupId"])]
        result = []
        for table_id, table_name, values in self.data:
            rows = [table_name + ',' + t + ',' + values for t in ts]
            result.append((table_id, rows))
        return result
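

# A minimal usage sketch (illustrative arguments): iterating a MockDataSource
# yields batches of 1000 new rows per table, endlessly.
#   ds = MockDataSource("tb", 10)
#   for batch in ds:          # batch: [(table_id, [row, ...]), ...]
#       pass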

@@ -0,0 +1,90 @@ sql_writer.py
import logging
from typing import List

import taos


class SQLWriter:
    log = logging.getLogger("SQLWriter")

    def __init__(self, get_connection_func):
        self._tb_values = {}
        self._tb_tags = {}
        self._conn = get_connection_func()
        self._max_sql_length = self.get_max_sql_length()
        self._conn.execute("USE test")

    def get_max_sql_length(self):
        rows = self._conn.query("SHOW variables").fetch_all()
        for r in rows:
            name = r[0]
            if name == "maxSQLLength":
                return int(r[1])
        # fall back to 1 MB if the server does not report maxSQLLength
        return 1024 * 1024

    def process_lines(self, lines: List[str]):
        """
        :param lines: ["tbName,ts,current,voltage,phase,location,groupId", ...]
        """
        for line in lines:
            ps = line.split(",")
            table_name = ps[0]
            value = '(' + ",".join(ps[1:-2]) + ') '
            if table_name in self._tb_values:
                self._tb_values[table_name] += value
            else:
                self._tb_values[table_name] = value

            if table_name not in self._tb_tags:
                location = ps[-2]
                group_id = ps[-1]
                tag_value = f"('{location}',{group_id})"
                self._tb_tags[table_name] = tag_value
        self.flush()
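
    # For illustration (hypothetical row), the line
    #   "tb_1,1648432611249,10.3,219,0.31,LosAngeles,2"
    # contributes the value "(1648432611249,10.3,219,0.31) " to _tb_values["tb_1"]
    # and, on first sight of tb_1, the tag value "('LosAngeles',2)" to _tb_tags["tb_1"].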

    def flush(self):
        """
        Assemble an INSERT statement and execute it.
        When the length of the statement grows close to MAX_SQL_LENGTH, it is executed immediately and a new INSERT statement is started.
        In case of a "Table does not exist" error, the tables in the statement are created and the statement is re-executed.
        """
        sql = "INSERT INTO "
        sql_len = len(sql)
        buf = []
        for tb_name, values in self._tb_values.items():
            q = tb_name + " VALUES " + values
            if sql_len + len(q) >= self._max_sql_length:
                sql += " ".join(buf)
                self.execute_sql(sql)
                sql = "INSERT INTO "
                sql_len = len(sql)
                buf = []
            buf.append(q)
            sql_len += len(q)
        sql += " ".join(buf)
        self.execute_sql(sql)
        self._tb_values.clear()
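
    # The assembled statement has the shape (hypothetical table names):
    #   INSERT INTO tb_1 VALUES (...) (...) tb_2 VALUES (...) ...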

    def execute_sql(self, sql):
        try:
            self._conn.execute(sql)
        except taos.Error as e:
            error_code = e.errno & 0xffff
            # "Table does not exist": create the missing tables, then retry
            # (matches the behavior described in the flush() docstring)
            if error_code == 9731:
                self.create_tables()
                self.execute_sql(sql)
            else:
                self.log.error("Execute SQL: %s", sql)
                raise e
        except BaseException as baseException:
            self.log.error("Execute SQL: %s", sql)
            raise baseException

    def create_tables(self):
        sql = "CREATE TABLE "
        for tb in self._tb_values.keys():
            tag_values = self._tb_tags[tb]
            sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
        try:
            self._conn.execute(sql)
        except BaseException as e:
            self.log.error("Execute SQL: %s", sql)
            raise e
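
    # The generated statement has the shape (hypothetical names; tags come
    # from process_lines):
    #   CREATE TABLE IF NOT EXISTS tb_1 USING meters TAGS ('LosAngeles',2) IF NOT EXISTS tb_2 USING meters TAGS ('SanDiego',1) ...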