# Install dependencies:
# Python >= 3.8 is recommended.
#

import logging
import math
import multiprocessing
import os
import sys
import time
from multiprocessing import Process, Queue
from queue import Empty
from typing import List

from mockdatasource import MockDataSource

# Send all records (DEBUG and above) to stdout, tagged with timestamp and logger name.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")

# Default settings; set_global_config() overrides them from the command line, in this order.
READ_TASK_COUNT = 1  # number of reader processes started by main()
WRITE_TASK_COUNT = 1  # number of writer processes; main() creates one queue per writer
TABLE_COUNT = 1000  # total table count, split evenly between the read tasks
# NOTE(review): QUEUE_SIZE is configurable and logged, but main() creates its queues with Queue()
# (no maxsize), so this value currently has no effect.
QUEUE_SIZE = 1000000
MAX_BATCH_SIZE = 3000  # upper bound on the rows a writer hands to process_lines() at once

# Sentinel put on the task queues by a finished reader, and on the done queue by a finished writer.
_DONE_MESSAGE = '__DONE__'


def get_connection():
    """
    Open a TDengine connection configured through environment variables.

    If variable TDENGINE_FIRST_EP is provided then it will be used, either as "host:port" or as a bare "host".
    If not, host=None and port=0 are passed to taos.connect, in which case firstEP in /etc/taos/taos.cfg
    will be used.
    You can also override the default username ("root") and password ("taosdata") by supplying the variables
    TDENGINE_USER and TDENGINE_PASSWORD.

    Raises ValueError when the port part of TDENGINE_FIRST_EP is not an integer.
    """
    import taos  # imported at call time rather than at module import time

    host, port = None, 0
    first_ep = os.environ.get("TDENGINE_FIRST_EP")
    if first_ep:
        # Split on the last colon only, so an endpoint without a port (or with extra colons in the host part)
        # no longer fails tuple unpacking.
        name, separator, port_text = first_ep.rpartition(":")
        if separator:
            host = name
            # An empty port ("host:") falls back to 0, the same value used when no endpoint is configured.
            port = int(port_text) if port_text else 0
        else:
            host = first_ep

    user = os.environ.get("TDENGINE_USER", "root")
    password = os.environ.get("TDENGINE_PASSWORD", "taosdata")
    return taos.connect(host=host, port=port, user=user, password=password)


# ANCHOR: read

def run_read_task(task_id: int, task_queues: List[Queue], infinity):
    """
    Pull rows from a MockDataSource and distribute them over the given writer queues.

    Every item produced by the data source is either one (table_id, rows) tuple or an iterable of such tuples.
    All rows of a table go to the queue at index ``table_id % len(task_queues)``, so one table always maps
    to the same queue. When ``infinity`` is falsy and the source is exhausted, _DONE_MESSAGE is put on every
    queue. KeyboardInterrupt is swallowed so the process can exit quietly.

    :param task_id: index of this read task; used to build the "tb{task_id}" string given to MockDataSource
                    (presumably a table-name prefix; confirm against MockDataSource)
    :param task_queues: the queues this task may write to; must not be empty
    :param infinity: forwarded to MockDataSource; when truthy no _DONE_MESSAGE is ever sent
    """
    table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
    data_source = MockDataSource(f"tb{task_id}", table_count_per_task, infinity)
    queue_count = len(task_queues)
    try:
        for batch in data_source:
            # Normalise a single (table_id, rows) tuple to a one-element batch.
            if isinstance(batch, tuple):
                batch = [batch]
            for table_id, rows in batch:
                # Hash the table to a queue once, not once per row.
                target_queue = task_queues[table_id % queue_count]
                for row in rows:
                    # put() blocks for as long as a bounded queue is full.
                    target_queue.put(row)
        if not infinity:
            for task_queue in task_queues:
                task_queue.put(_DONE_MESSAGE)
    except KeyboardInterrupt:
        pass
    finally:
        logging.info('read task over')


# ANCHOR_END: read


# ANCHOR: write
def run_write_task(task_id: int, queue: Queue, done_queue: Queue):
    """
    Drain one task queue and write its rows through an SQLWriter in batches.

    Rows are collected until MAX_BATCH_SIZE rows have been taken, the queue runs dry, or _DONE_MESSAGE is
    received; the collected rows are then passed to ``writer.process_lines``. After _DONE_MESSAGE the last
    batch is written, _DONE_MESSAGE is put on ``done_queue`` and the task ends. Falsy queue items are skipped.
    KeyboardInterrupt is swallowed; any other exception is re-raised after logging the batch in flight.
    The writer is always closed.

    :param task_id: index of this write task; only used for the logger name
    :param queue: queue of rows fed by the read tasks
    :param done_queue: queue on which completion is signalled
    """
    from sql_writer import SQLWriter
    log = logging.getLogger(f"WriteTask-{task_id}")
    writer = SQLWriter(get_connection)
    lines = None
    try:
        while True:
            over = False
            lines = []
            for _ in range(MAX_BATCH_SIZE):
                try:
                    line = queue.get_nowait()
                except Empty:
                    if lines:
                        # Queue ran dry with rows pending: flush them now instead of sleeping through the
                        # rest of the batch window (previously up to MAX_BATCH_SIZE * 0.1 s).
                        break
                    # Nothing collected yet: back off briefly before polling again.
                    time.sleep(0.1)
                    continue
                if line == _DONE_MESSAGE:
                    over = True
                    break
                if line:
                    lines.append(line)
            if lines:
                writer.process_lines(lines)
            if over:
                done_queue.put(_DONE_MESSAGE)
                break
    except KeyboardInterrupt:
        pass
    except BaseException:
        log.debug(f"lines={lines}")
        raise  # bare raise keeps the original traceback
    finally:
        writer.close()
        log.debug('write task over')


# ANCHOR_END: write

def set_global_config():
    """
    Override the module-level settings from positional command line arguments.

    sys.argv[1] to sys.argv[5] map, in this order, to READ_TASK_COUNT, WRITE_TASK_COUNT, TABLE_COUNT,
    QUEUE_SIZE and MAX_BATCH_SIZE. Settings without a matching argument keep their current value and
    arguments beyond the fifth are ignored. All supplied values are parsed and checked before any global
    is modified, so a bad argument leaves the configuration untouched.

    Raises ValueError when an argument is not an integer or is below the minimum for its setting.
    """
    # (setting name, smallest accepted value). The task counts and the batch size must be at least 1:
    # with zero writers the readers have no queue to hash to, and a writer with MAX_BATCH_SIZE == 0
    # never reads its queue.
    settings = (
        ("READ_TASK_COUNT", 1),
        ("WRITE_TASK_COUNT", 1),
        ("TABLE_COUNT", 1),
        ("QUEUE_SIZE", 0),
        ("MAX_BATCH_SIZE", 1),
    )
    parsed = {}
    for (name, minimum), raw in zip(settings, sys.argv[1:]):
        try:
            value = int(raw)
        except ValueError:
            raise ValueError(f"{name} must be an integer, got {raw!r}") from None
        if value < minimum:
            raise ValueError(f"{name} must be >= {minimum}, got {value}")
        parsed[name] = value
    globals().update(parsed)


# ANCHOR: monitor
def run_monitor_process(done_queue: Queue):
    """
    Periodically log the row count of test.meters and the write speed until a writer reports completion.

    Each cycle polls ``done_queue`` without blocking, sleeps 10 seconds, then logs the current
    ``count(*)`` together with the increase since the previous cycle divided by 10 (rows per second over
    the sleep interval). The loop ends as soon as one _DONE_MESSAGE is received, i.e. when the first
    write task finishes.

    :param done_queue: queue on which write tasks put _DONE_MESSAGE when they finish
    """
    log = logging.getLogger("DataBaseMonitor")
    interval = 10  # seconds between two measurements
    conn = None
    try:
        conn = get_connection()

        def get_count():
            result = conn.query("SELECT count(*) FROM test.meters")
            rows = result.fetch_all()
            return rows[0][0] if rows else 0

        last_count = 0
        while True:
            try:
                if done_queue.get_nowait() == _DONE_MESSAGE:
                    break
            except Empty:
                pass
            time.sleep(interval)
            count = get_count()
            log.info(f"count={count} speed={(count - last_count) / interval}")
            last_count = count
    finally:
        # conn is still None when get_connection() itself failed; closing it unconditionally would
        # replace the real error with an AttributeError.
        if conn is not None:
            conn.close()


# ANCHOR_END: monitor
# ANCHOR: main
def main(infinity):
    """
    Recreate the test database, start the monitor, writer and reader processes, and wait for them.

    Steps:
    1. apply command line overrides and log the effective settings;
    2. drop and recreate database ``test`` with the super table ``test.meters``;
    3. start one monitor process, WRITE_TASK_COUNT writers (one queue each) and READ_TASK_COUNT readers;
    4. join the monitor, then the readers, then the writers.
    On KeyboardInterrupt all child processes are terminated and the task queues are closed.

    :param infinity: forwarded to every read task; when truthy the readers never send _DONE_MESSAGE
    """
    set_global_config()
    logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
                 f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")

    conn = get_connection()
    try:
        conn.execute("DROP DATABASE IF EXISTS test")
        conn.execute("CREATE DATABASE IF NOT EXISTS test")
        conn.execute("CREATE STABLE IF NOT EXISTS test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
                     "TAGS (location BINARY(64), groupId INT)")
    finally:
        # Release the setup connection even when one of the statements fails.
        conn.close()

    done_queue = Queue()
    monitor_process = Process(target=run_monitor_process, args=(done_queue,))
    monitor_process.start()
    logging.debug(f"monitor task started with pid {monitor_process.pid}")

    # One queue per write task.
    # NOTE(review): QUEUE_SIZE is logged above but not passed to Queue(), so these queues are unbounded.
    task_queues: List[Queue] = [Queue() for _ in range(WRITE_TASK_COUNT)]

    write_processes = []
    for i, task_queue in enumerate(task_queues):
        p = Process(target=run_write_task, args=(i, task_queue, done_queue))
        p.start()
        logging.debug(f"WriteTask-{i} started with pid {p.pid}")
        write_processes.append(p)

    read_processes = []
    for i in range(READ_TASK_COUNT):
        queues = assign_queues(i, task_queues)
        p = Process(target=run_read_task, args=(i, queues, infinity))
        p.start()
        logging.debug(f"ReadTask-{i} started with pid {p.pid}")
        read_processes.append(p)

    try:
        monitor_process.join()
        for p in read_processes:
            p.join()
        for p in write_processes:
            p.join()
        time.sleep(1)
    except KeyboardInterrupt:
        monitor_process.terminate()
        for p in read_processes:
            p.terminate()
        for p in write_processes:
            p.terminate()
        for q in task_queues:
            q.close()


def assign_queues(read_task_id, task_queues, read_task_count=None, write_task_count=None):
    """
    Compute target queues for a specific read task.

    The queue list is divided proportionally between the read tasks: read task ``i`` receives
    ``task_queues[floor(i * W / R):ceil((i + 1) * W / R)]``, where W is the number of write tasks and R the
    number of read tasks. With more readers than writers, several readers share a queue; with more writers
    than readers, one reader feeds several queues (neighbouring ranges may overlap by one queue when W is not
    a multiple of R).

    The bounds are computed with integer arithmetic, so they are exact; a float ratio can round the wrong
    way (for example ``100 * (114 / 200)`` is slightly below 57 and floors to 56).

    :param read_task_id: zero-based index of the read task
    :param task_queues: all writer queues, one per write task
    :param read_task_count: number of read tasks; defaults to READ_TASK_COUNT
    :param write_task_count: number of write tasks; defaults to WRITE_TASK_COUNT
    :return: the slice of ``task_queues`` assigned to this read task
    """
    if read_task_count is None:
        read_task_count = READ_TASK_COUNT
    if write_task_count is None:
        write_task_count = WRITE_TASK_COUNT
    from_index = read_task_id * write_task_count // read_task_count
    # Ceiling division: negate, floor-divide, negate again.
    end_numerator = (read_task_id + 1) * write_task_count
    end_index = -(-end_numerator // read_task_count)
    return task_queues[from_index:end_index]


if __name__ == '__main__':
    # 'spawn' starts every child in a fresh interpreter that re-imports this module.
    multiprocessing.set_start_method('spawn')
    main(False)
elif __name__ == '__mp_main__':
    # A spawned child re-imports this script under the name '__mp_main__', so the module constants are
    # back at their defaults there. multiprocessing passes the parent's sys.argv on to the child, so
    # re-applying the overrides here lets run_read_task / run_write_task see the same TABLE_COUNT,
    # READ_TASK_COUNT and MAX_BATCH_SIZE as the parent.
    set_global_config()
# ANCHOR_END: main