226 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			226 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Python
		
	
	
	
# install dependencies:
 | 
						|
# recommend python >= 3.8
 | 
						|
#
 | 
						|
 | 
						|
import logging
 | 
						|
import math
 | 
						|
import multiprocessing
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import os
 | 
						|
from multiprocessing import Process, Queue
 | 
						|
from mockdatasource import MockDataSource
 | 
						|
from queue import Empty
 | 
						|
from typing import List
 | 
						|
 | 
						|
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")
 | 
						|
 | 
						|
READ_TASK_COUNT = 1
 | 
						|
WRITE_TASK_COUNT = 1
 | 
						|
TABLE_COUNT = 1000
 | 
						|
QUEUE_SIZE = 1000000
 | 
						|
MAX_BATCH_SIZE = 3000
 | 
						|
 | 
						|
_DONE_MESSAGE = '__DONE__'
 | 
						|
 | 
						|
 | 
						|
def get_connection():
 | 
						|
    """
 | 
						|
    If variable TDENGINE_FIRST_EP is provided then it will be used. If not, firstEP in /etc/taos/taos.cfg will be used.
 | 
						|
    You can also override the default username and password by supply variable TDENGINE_USER and TDENGINE_PASSWORD
 | 
						|
    """
 | 
						|
    import taos
 | 
						|
    firstEP = os.environ.get("TDENGINE_FIRST_EP")
 | 
						|
    if firstEP:
 | 
						|
        host, port = firstEP.split(":")
 | 
						|
    else:
 | 
						|
        host, port = None, 0
 | 
						|
    user = os.environ.get("TDENGINE_USER", "root")
 | 
						|
    password = os.environ.get("TDENGINE_PASSWORD", "taosdata")
 | 
						|
    return taos.connect(host=host, port=int(port), user=user, password=password)
 | 
						|
 | 
						|
 | 
						|
# ANCHOR: read
 | 
						|
 | 
						|
def run_read_task(task_id: int, task_queues: List[Queue], infinity):
 | 
						|
    table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
 | 
						|
    data_source = MockDataSource(f"tb{task_id}", table_count_per_task, infinity)
 | 
						|
    try:
 | 
						|
        for batch in data_source:
 | 
						|
            if isinstance(batch, tuple):
 | 
						|
                batch = [batch]
 | 
						|
            for table_id, rows in batch:
 | 
						|
                # hash data to different queue
 | 
						|
                i = table_id % len(task_queues)
 | 
						|
                # block putting forever when the queue is full
 | 
						|
                for row in rows:
 | 
						|
                    task_queues[i].put(row)
 | 
						|
        if not infinity:
 | 
						|
            for queue in task_queues:
 | 
						|
                queue.put(_DONE_MESSAGE)
 | 
						|
    except KeyboardInterrupt:
 | 
						|
        pass
 | 
						|
    finally:
 | 
						|
        logging.info('read task over')
 | 
						|
 | 
						|
 | 
						|
# ANCHOR_END: read
 | 
						|
 | 
						|
 | 
						|
# ANCHOR: write
 | 
						|
def run_write_task(task_id: int, queue: Queue, done_queue: Queue):
 | 
						|
    from sql_writer import SQLWriter
 | 
						|
    log = logging.getLogger(f"WriteTask-{task_id}")
 | 
						|
    writer = SQLWriter(get_connection)
 | 
						|
    lines = None
 | 
						|
    try:
 | 
						|
        while True:
 | 
						|
            over = False
 | 
						|
            lines = []
 | 
						|
            for _ in range(MAX_BATCH_SIZE):
 | 
						|
                try:
 | 
						|
                    line = queue.get_nowait()
 | 
						|
                    if line == _DONE_MESSAGE:
 | 
						|
                        over = True
 | 
						|
                        break
 | 
						|
                    if line:
 | 
						|
                        lines.append(line)
 | 
						|
                except Empty:
 | 
						|
                    time.sleep(0.1)
 | 
						|
            if len(lines) > 0:
 | 
						|
                writer.process_lines(lines)
 | 
						|
            if over:
 | 
						|
                done_queue.put(_DONE_MESSAGE)
 | 
						|
                break
 | 
						|
    except KeyboardInterrupt:
 | 
						|
        pass
 | 
						|
    except BaseException as e:
 | 
						|
        log.debug(f"lines={lines}")
 | 
						|
        raise e
 | 
						|
    finally:
 | 
						|
        writer.close()
 | 
						|
        log.debug('write task over')
 | 
						|
 | 
						|
 | 
						|
# ANCHOR_END: write
 | 
						|
 | 
						|
def set_global_config():
 | 
						|
    argc = len(sys.argv)
 | 
						|
    if argc > 1:
 | 
						|
        global READ_TASK_COUNT
 | 
						|
        READ_TASK_COUNT = int(sys.argv[1])
 | 
						|
    if argc > 2:
 | 
						|
        global WRITE_TASK_COUNT
 | 
						|
        WRITE_TASK_COUNT = int(sys.argv[2])
 | 
						|
    if argc > 3:
 | 
						|
        global TABLE_COUNT
 | 
						|
        TABLE_COUNT = int(sys.argv[3])
 | 
						|
    if argc > 4:
 | 
						|
        global QUEUE_SIZE
 | 
						|
        QUEUE_SIZE = int(sys.argv[4])
 | 
						|
    if argc > 5:
 | 
						|
        global MAX_BATCH_SIZE
 | 
						|
        MAX_BATCH_SIZE = int(sys.argv[5])
 | 
						|
 | 
						|
 | 
						|
# ANCHOR: monitor
 | 
						|
def run_monitor_process(done_queue: Queue):
 | 
						|
    log = logging.getLogger("DataBaseMonitor")
 | 
						|
    conn = None
 | 
						|
    try:
 | 
						|
        conn = get_connection()
 | 
						|
 | 
						|
        def get_count():
 | 
						|
            res = conn.query("SELECT count(*) FROM test.meters")
 | 
						|
            rows = res.fetch_all()
 | 
						|
            return rows[0][0] if rows else 0
 | 
						|
 | 
						|
        last_count = 0
 | 
						|
        while True:
 | 
						|
            try:
 | 
						|
                done = done_queue.get_nowait()
 | 
						|
                if done == _DONE_MESSAGE:
 | 
						|
                    break
 | 
						|
            except Empty:
 | 
						|
                pass
 | 
						|
            time.sleep(10)
 | 
						|
            count = get_count()
 | 
						|
            log.info(f"count={count} speed={(count - last_count) / 10}")
 | 
						|
            last_count = count
 | 
						|
    finally:
 | 
						|
        conn.close()
 | 
						|
 | 
						|
 | 
						|
# ANCHOR_END: monitor
 | 
						|
# ANCHOR: main
 | 
						|
def main(infinity):
 | 
						|
    set_global_config()
 | 
						|
    logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
 | 
						|
                 f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
 | 
						|
 | 
						|
    conn = get_connection()
 | 
						|
    conn.execute("DROP DATABASE IF EXISTS test")
 | 
						|
    conn.execute("CREATE DATABASE IF NOT EXISTS test")
 | 
						|
    conn.execute("CREATE STABLE IF NOT EXISTS test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
 | 
						|
                 "TAGS (location BINARY(64), groupId INT)")
 | 
						|
    conn.close()
 | 
						|
 | 
						|
    done_queue = Queue()
 | 
						|
    monitor_process = Process(target=run_monitor_process, args=(done_queue,))
 | 
						|
    monitor_process.start()
 | 
						|
    logging.debug(f"monitor task started with pid {monitor_process.pid}")
 | 
						|
 | 
						|
    task_queues: List[Queue] = []
 | 
						|
    write_processes = []
 | 
						|
    read_processes = []
 | 
						|
 | 
						|
    # create task queues
 | 
						|
    for i in range(WRITE_TASK_COUNT):
 | 
						|
        queue = Queue()
 | 
						|
        task_queues.append(queue)
 | 
						|
 | 
						|
    # create write processes
 | 
						|
    for i in range(WRITE_TASK_COUNT):
 | 
						|
        p = Process(target=run_write_task, args=(i, task_queues[i], done_queue))
 | 
						|
        p.start()
 | 
						|
        logging.debug(f"WriteTask-{i} started with pid {p.pid}")
 | 
						|
        write_processes.append(p)
 | 
						|
 | 
						|
    # create read processes
 | 
						|
    for i in range(READ_TASK_COUNT):
 | 
						|
        queues = assign_queues(i, task_queues)
 | 
						|
        p = Process(target=run_read_task, args=(i, queues, infinity))
 | 
						|
        p.start()
 | 
						|
        logging.debug(f"ReadTask-{i} started with pid {p.pid}")
 | 
						|
        read_processes.append(p)
 | 
						|
 | 
						|
    try:
 | 
						|
        monitor_process.join()
 | 
						|
        for p in read_processes:
 | 
						|
            p.join()
 | 
						|
        for p in write_processes:
 | 
						|
            p.join()
 | 
						|
        time.sleep(1)
 | 
						|
        return
 | 
						|
    except KeyboardInterrupt:
 | 
						|
        monitor_process.terminate()
 | 
						|
        [p.terminate() for p in read_processes]
 | 
						|
        [p.terminate() for p in write_processes]
 | 
						|
        [q.close() for q in task_queues]
 | 
						|
 | 
						|
 | 
						|
def assign_queues(read_task_id, task_queues):
 | 
						|
    """
 | 
						|
    Compute target queues for a specific read task.
 | 
						|
    """
 | 
						|
    ratio = WRITE_TASK_COUNT / READ_TASK_COUNT
 | 
						|
    from_index = math.floor(read_task_id * ratio)
 | 
						|
    end_index = math.ceil((read_task_id + 1) * ratio)
 | 
						|
    return task_queues[from_index:end_index]
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    multiprocessing.set_start_method('spawn')
 | 
						|
    main(False)
 | 
						|
# ANCHOR_END: main
 |