824 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
			
		
		
	
	
			824 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
| #!/usr/bin/python3
 | |
| #  * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 | |
| #  *
 | |
| #  * This program is free software: you can use, redistribute, and/or modify
 | |
| #  * it under the terms of the GNU Affero General Public License, version 3
 | |
| #  * or later ("AGPL"), as published by the Free Software Foundation.
 | |
| #  *
 | |
| #  * This program is distributed in the hope that it will be useful, but WITHOUT
 | |
| #  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 | |
| #  * FITNESS FOR A PARTICULAR PURPOSE.
 | |
| #  *
 | |
| #  * You should have received a copy of the GNU Affero General Public License
 | |
| #  * along with this program. If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| import sys
 | |
| import getopt
 | |
| import requests
 | |
| import json
 | |
| import random
 | |
| import time
 | |
| import datetime
 | |
| from multiprocessing import Manager, Pool
 | |
| from multipledispatch import dispatch
 | |
| from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
 | |
| 
 | |
| 
 | |
def v_print(msg: str, *args):
    """Print a %-formatted message only when verbose output is enabled.

    This single variadic function replaces the former set of
    ``multipledispatch`` overloads, which only covered a fixed list of
    ``str``/``int`` combinations with one to four arguments and raised
    ``NotImplementedError`` for anything else (e.g. a float argument).
    Every positional call shape the overloads accepted is still accepted.

    Args:
        msg: A %-style format string.
        *args: Values substituted into ``msg``. They are handed to the ``%``
            operator unchanged. When no values are given, ``msg`` is printed
            verbatim instead of being formatted.

    The module-level ``verbose`` flag is read at call time; when it is falsy
    nothing is printed and ``msg`` is not formatted.
    """
    if not verbose:
        return

    # Skip formatting when there is nothing to substitute, so a literal '%'
    # in a plain message does not raise.
    print(msg % args if args else msg)
 | |
| 
 | |
| 
 | |
def restful_execute(host: str, port: int, user: str, password: str, cmd: str):
    """POST one SQL command to the server's REST endpoint and print the reply.

    Args:
        host: Host name or address used to build the request URL.
        port: Not read by this function; the URL is built from the
            module-level ``restPort`` instead.
        user: First element of the ``auth`` tuple passed to ``requests.post``.
        password: Second element of the ``auth`` tuple.
        cmd: SQL text sent as the request body.

    The response body is decoded with ``resp.json()``. When the module-level
    ``debug`` flag is set, the pretty-printed JSON goes through ``v_print``
    (so it is shown only in verbose mode); otherwise the compact JSON is
    always printed.
    """
    endpoint = "http://%s:%d/rest/sql" % (host, restPort)
    v_print("restful_execute - cmd: %s", cmd)

    response = requests.post(endpoint, cmd, auth=(user, password))
    v_print("resp status: %d", response.status_code)

    if not debug:
        print("resp: %s" % json.dumps(response.json()))
        return

    pretty = json.dumps(response.json(), sort_keys=True, indent=2)
    v_print("resp text: %s", pretty)
 | |
| 
 | |
| 
 | |
def query_func(process: int, thread: int, cmd: str):
    """Send ``cmd`` to the primary host or, at random, to the second host.

    Args:
        process: Process index, used only in the verbose log line.
        thread: Thread index, used only in the verbose log line.
        cmd: SQL text to execute.

    The second host is considered only when the module-level ``oneMoreHost``
    differs from ``"NotSupported"``; in that case a coin flip decides which
    host receives the command.
    """
    v_print("%d process %d thread cmd: %s", process, thread, cmd)

    to_second_host = oneMoreHost != "NotSupported" and random.randint(0, 1) == 1

    if to_second_host:
        v_print("%s", "Send to second host")
        if not native:
            restful_execute(oneMoreHost, port, user, password, cmd)
        else:
            # Uses the module-level cursor.
            cursor.execute(cmd)
        return

    v_print("%s%s%s", "Send ", cmd, " to the host")
    # In native mode nothing is executed on the primary host here.
    if not native:
        restful_execute(host, port, user, password, cmd)
 | |
| 
 | |
| 
 | |
def query_data_process(cmd: str):
    """Run one query and print its result.

    In REST mode the command is forwarded to ``restful_execute``. In native
    mode a dedicated connection and cursor are opened, the column description
    and every fetched row are printed, and both handles are closed again.

    Args:
        cmd: SQL text to execute.

    Raises:
        SystemExit: With status 1 when connecting, creating the cursor or
            executing the query fails in native mode (after printing the
            error).
    """
    if not native:
        restful_execute(host, port, user, password, cmd)
        return

    # establish connection if native
    v_print("host:%s, user:%s passwd:xxxxxx configDir:%s ", host, user, configDir)
    try:
        conn = taos.connect(
            host=host, user=user, password=password, config=configDir
        )
        v_print("conn: %s", str(conn.__class__))
    except Exception as e:
        # Format the exception itself: e.args may be empty, in which case
        # e.args[0] would raise IndexError and hide the real failure.
        print("Error: %s" % e)
        sys.exit(1)

    cursor = None
    try:
        cursor = conn.cursor()
        v_print("cursor:%d %s", id(cursor), str(cursor.__class__))

        cursor.execute(cmd)
        cols = cursor.description
        print(cols)

        for row in cursor.fetchall():
            print(row)
    except Exception as e:
        print("Error: %s" % e)
        sys.exit(1)
    finally:
        # Runs on success and on the sys.exit() path alike, so neither the
        # cursor nor the connection is leaked.
        if cursor is not None:
            cursor.close()
        conn.close()
 | |
| 
 | |
| 
 | |
def create_stb():
    """Create ``numOfStb`` super tables named ``<stbName><index>``.

    Each statement is run on the module-level cursor in native mode and
    through ``restful_execute`` otherwise.
    """
    for stb_index in range(numOfStb):
        statement = (
            "CREATE TABLE IF NOT EXISTS %s%d (ts timestamp, value float) TAGS (uuid binary(50))"
            % (stbName, stb_index)
        )
        if native:
            cursor.execute(statement)
        else:
            restful_execute(host, port, user, password, statement)
 | |
| 
 | |
| 
 | |
def use_database():
    """Issue ``USE <current_db>`` via the native cursor or the REST endpoint."""
    statement = "USE %s" % current_db
    if not native:
        restful_execute(host, port, user, password, statement)
        return
    cursor.execute(statement)
 | |
| 
 | |
| 
 | |
def create_databases():
    """Create ``numOfDb`` databases named ``<dbName><index>`` if missing."""
    for db_index in range(numOfDb):
        v_print("will create database db%d", int(db_index))

        statement = "CREATE DATABASE IF NOT EXISTS %s%d" % (dbName, db_index)
        if native:
            cursor.execute(statement)
        else:
            restful_execute(host, port, user, password, statement)
 | |
| 
 | |
| 
 | |
def drop_tables():
    """Placeholder for dropping tables; currently it only logs the count."""
    # TODO: dropping is not implemented yet
    v_print("TODO: drop tables total %d", numOfTb)
 | |
| 
 | |
| 
 | |
def drop_stable():
    """Placeholder for dropping super tables; currently it only logs the count."""
    # TODO: dropping is not implemented yet
    v_print("TODO: drop stables total %d", numOfStb)
 | |
| 
 | |
| 
 | |
def drop_databases():
    """Drop every database named ``<dbName><index>`` for ``numOfDb`` indexes."""
    v_print("drop databases total %d", numOfDb)

    # drop exist databases first
    for db_index in range(numOfDb):
        v_print("will drop database db%d", int(db_index))

        statement = "DROP DATABASE IF EXISTS %s%d" % (dbName, db_index)
        if native:
            cursor.execute(statement)
        else:
            restful_execute(host, port, user, password, statement)
 | |
| 
 | |
| 
 | |
def insert_func(process: int, thread: int):
    """Insert ``numOfRec`` generated rows into table ``<tbName><thread>``.

    Rows are sent in requests of up to ``batch`` values each. Timestamps
    start at 2021-01-25 plus ``row`` seconds for each request and advance by
    one millisecond per value inside the request; the value column is a
    random float.

    Args:
        process: Index of the calling process, used only for logging.
        thread: Table index; the target table is ``<current_db>.<tbName><thread>``.

    Raises:
        SystemExit: With status 1 when the native connection or cursor cannot
            be created (after printing the error).
    """
    v_print("%d process %d thread, insert_func ", process, thread)

    # generate uuid
    uuid_int = random.randint(0, numOfTb + 1)
    uuid = "%s" % uuid_int
    v_print("uuid is: %s", uuid)

    conn = None
    cursor = None

    # establish connection if native
    if native:
        v_print("host:%s, user:%s passwd:xxxxxx configDir:%s ", host, user, configDir)
        try:
            conn = taos.connect(
                host=host, user=user, password=password, config=configDir
            )
            v_print("conn: %s", str(conn.__class__))
        except Exception as e:
            # Format the exception itself: e.args may be empty.
            print("Error: %s" % e)
            sys.exit(1)

        try:
            cursor = conn.cursor()
            v_print("cursor:%d %s", id(cursor), str(cursor.__class__))
        except Exception as e:
            print("Error: %s" % e)
            conn.close()
            sys.exit(1)

    try:
        v_print("numOfRec %d:", numOfRec)

        row = 0
        while row < numOfRec:
            v_print("row: %d", row)
            sqlCmd = ["INSERT INTO "]
            try:
                sqlCmd.append("%s.%s%d " % (current_db, tbName, thread))

                if numOfStb > 0 and autosubtable:
                    sqlCmd.append(
                        "USING %s.%s%d TAGS('%s') "
                        % (current_db, stbName, numOfStb - 1, uuid)
                    )

                start_time = datetime.datetime(2021, 1, 25) + datetime.timedelta(
                    seconds=row
                )

                sqlCmd.append("VALUES ")
                for batchIter in range(0, batch):
                    sqlCmd.append(
                        "('%s', %f) "
                        % (
                            start_time + datetime.timedelta(milliseconds=batchIter),
                            random.random(),
                        )
                    )
                    row = row + 1
                    if row >= numOfRec:
                        v_print("BREAK, row: %d numOfRec:%d", row, numOfRec)
                        break

            except Exception as e:
                # NOTE(review): after a failure here the partially built
                # statement is still sent below, as in the original code.
                print("Error: %s" % e)

            cmd = " ".join(sqlCmd)

            if measure:
                exec_start_time = datetime.datetime.now()

            if native:
                affectedRows = cursor.execute(cmd)
                print("affectedRows: %d" % affectedRows)
            else:
                restful_execute(host, port, user, password, cmd)

            if measure:
                exec_delta = datetime.datetime.now() - exec_start_time
                # timedelta.microseconds is only the sub-second component;
                # integer division by one microsecond yields the full
                # elapsed time as an int.
                elapsed_us = exec_delta // datetime.timedelta(microseconds=1)
                v_print("consume %d microseconds", elapsed_us)

            v_print("cmd: %s, length:%d", cmd, len(cmd))
    finally:
        # Release native handles even when an insert raises.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
 | |
| 
 | |
| 
 | |
def create_tb_using_stb():
    """Placeholder for creating tables from a super table; does nothing yet."""
    # TODO: not implemented
    return None
 | |
| 
 | |
| 
 | |
def create_tb():
    """Create ``numOfTb`` plain tables in each of the ``numOfDb`` databases.

    For every database index a ``USE`` statement is issued first, followed by
    one ``CREATE TABLE <tbName><index>`` per table. Statements run on the
    module-level cursor in native mode and via ``restful_execute`` otherwise.
    """
    v_print("create_tb() numOfTb: %d", numOfTb)

    for db_index in range(numOfDb):
        use_stmt = "USE %s%d" % (dbName, db_index)
        if native:
            cursor.execute(use_stmt)
        else:
            restful_execute(host, port, user, password, use_stmt)

        for tb_index in range(numOfTb):
            create_stmt = "CREATE TABLE %s%d (ts timestamp, value float)" % (
                tbName,
                tb_index,
            )
            if native:
                cursor.execute(create_stmt)
            else:
                restful_execute(host, port, user, password, create_stmt)
 | |
| 
 | |
| 
 | |
def insert_data_process(lock, i: int, begin: int, end: int):
    """Run ``insert_func`` for every table index in ``[begin, end)``.

    The table range is processed in chunks of at most ``threads`` tables;
    each chunk runs on its own thread pool and is waited for before the next
    chunk starts.

    Args:
        lock: Object with ``acquire()``/``release()``; held for the whole
            call.
        i: Index of this worker process, passed through to ``insert_func``.
        begin: First table index (inclusive).
        end: Last table index (exclusive).
    """
    lock.acquire()
    try:
        tasks = end - begin
        v_print(
            "insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks
        )

        # One loop covers both the "more tables than threads" case and the
        # "fits in a single pool" case: the latter is simply one chunk.
        for chunk_start in range(begin, end, threads):
            chunk_end = min(chunk_start + threads, end)
            with ThreadPoolExecutor(max_workers=threads) as executor:
                workers = [
                    executor.submit(insert_func, i, n)
                    for n in range(chunk_start, chunk_end)
                ]
                wait(workers, return_when=ALL_COMPLETED)
    finally:
        # Always release, otherwise a failure here would block every other
        # process waiting on the same lock.
        lock.release()
 | |
| 
 | |
| 
 | |
def query_db(i):
    """Select database ``<dbName><i>`` and count the rows of each table.

    Args:
        i: Database index appended to ``dbName``.

    One ``SELECT COUNT(*)`` is issued per table index below ``numOfTb``.
    """
    use_stmt = "USE %s%d" % (dbName, i)
    if native:
        cursor.execute(use_stmt)
    else:
        restful_execute(host, port, user, password, use_stmt)

    for tb_index in range(numOfTb):
        count_stmt = "SELECT COUNT(*) FROM %s%d" % (tbName, tb_index)
        if native:
            cursor.execute(count_stmt)
        else:
            restful_execute(host, port, user, password, count_stmt)
 | |
| 
 | |
| 
 | |
def printConfig():
    """Print the effective run configuration as a banner on stdout.

    The port line shows ``port`` in native mode and ``restPort`` otherwise;
    the super-table prefix line is included only when ``useStable`` is set.
    """
    banner = "###################################################################"
    shown_port = port if native else restPort

    lines = [
        banner,
        "# Use native interface:              %s" % native,
        "# Server IP:                         %s" % host,
        "# Server port:                       %s" % shown_port,
        "# Configuration Dir:                 %s" % configDir,
        "# User:                              %s" % user,
        "# Number of Columns per record:      %s" % colsPerRecord,
        "# Number of Threads:                 %s" % threads,
        "# Number of Processes:               %s" % processes,
        "# Number of Tables:                  %s" % numOfTb,
        "# Number of records per Table:       %s" % numOfRec,
        "# Records/Request:                   %s" % batch,
        "# Database name:                     %s" % dbName,
        "# Replica:                           %s" % replica,
        "# Use STable:                        %s" % useStable,
        "# Table prefix:                      %s" % tbName,
    ]

    if useStable:
        lines.append("# STable prefix:                     %s" % stbName)

    timestamp = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    lines.extend(
        [
            "# Data order:                        %s" % outOfOrder,
            "# Data out of order rate:            %s" % rateOOOO,
            "# Delete method:                     %s" % deleteMethod,
            "# Query command:                     %s" % queryCmd,
            "# Insert Only:                       %s" % insertOnly,
            "# Verbose output                     %s" % verbose,
            "# Test time:                         %s" % timestamp,
            banner,
        ]
    )

    for line in lines:
        print(line)
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Script entry point: parse the command line, (re)create the schema,
    # run the parallel insert workload and optionally one final query.

    # ---- defaults (module-level globals read by the functions above) ----
    native = False
    verbose = False
    debug = False
    measure = True
    dropDbOnly = False
    colsPerRecord = 3
    numOfDb = 1
    dbName = "test"
    replica = 1
    batch = 1
    numOfTb = 1
    tbName = "tb"
    useStable = False
    numOfStb = 0
    stbName = "stb"
    numOfRec = 10
    ieration = 1
    host = "127.0.0.1"
    configDir = "/etc/taos"
    oneMoreHost = "NotSupported"
    port = 6030
    restPort = 6041
    user = "root"
    defaultPass = "taosdata"
    # Start from the default once. Previously the password was reset to the
    # default while handling every option other than -P, so -P only took
    # effect when it was the last option on the command line.
    password = defaultPass
    processes = 1
    threads = 1
    insertOnly = False
    autosubtable = False
    queryCmd = "NO"
    outOfOrder = 0
    rateOOOO = 0
    deleteMethod = 0
    skipPrompt = False

    try:
        # Long options that take a value need a trailing "=" for getopt.
        # "batch" and "colsPerRec" are the spellings shown in --help;
        # "recPerReq" and "colsPerRecord" are kept for compatibility.
        opts, args = getopt.gnu_getopt(
            sys.argv[1:],
            "Nh:p:u:P:d:a:m:Ms:Q:T:C:r:l:t:n:c:xOR:D:vgyH",
            [
                "native",
                "host=",
                "port=",
                "user=",
                "password=",
                "dbname=",
                "replica=",
                "tbname=",
                "stable",
                "stbname=",
                "query=",
                "threads=",
                "processes=",
                "recPerReq=",
                "batch=",
                "colsPerRecord=",
                "colsPerRec=",
                "numOfTb=",
                "numOfRec=",
                "config=",
                "insertOnly",
                "outOfOrder",
                "rateOOOO=",
                "deleteMethod=",
                "verbose",
                "debug",
                "skipPrompt",
                "help",
            ],
        )
    except getopt.GetoptError as err:
        print("ERROR:", err)
        print("Try `taosdemo.py --help` for more options.")
        sys.exit(1)

    if not opts:
        print("Try `taosdemo.py --help` for more options.")
        sys.exit(1)

    for key, value in opts:
        if key in ["-H", "--help"]:
            print("")
            print("taosdemo.py for TDengine")
            print("")
            print("Author: Shuduo Sang <sangshuduo@gmail.com>")
            print("")

            print("\t-H, --help                        Show usage.")
            print("")

            print(
                "\t-N, --native                      flag, Use native interface if set. Default is using RESTful interface."
            )
            print(
                "\t-h, --host <hostname>             host, The host to connect to TDengine. Default is localhost."
            )
            print(
                "\t-p, --port <port>                 port, The TCP/IP port number to use for the connection. Default is 6030."
            )
            print(
                "\t-u, --user <username>             user, The user name to use when connecting to the server. Default is 'root'."
            )
            print(
                "\t-P, --password <password>         password, The password to use when connecting to the server. Default is 'taosdata'."
            )
            print(
                "\t-l, --colsPerRec <number>         num_of_columns_per_record, The number of columns per record. Default is 3."
            )
            print(
                "\t-d, --dbname <dbname>             database, Destination database. Default is 'test'."
            )
            print(
                "\t-a, --replica <replications>      replica, Set the replica parameters of the database, Default 1, min: 1, max: 5."
            )
            print(
                "\t-m, --tbname <table prefix>       table_prefix, Table prefix name. Default is 'tb'."
            )
            print(
                "\t-M, --stable                      flag, Use super table. Default is no"
            )
            print(
                "\t-s, --stbname <stable prefix>     stable_prefix, STable prefix name. Default is 'stb'"
            )
            print(
                "\t-Q, --query [NO|EACHTB|command]   query, Execute query command. set 'EACHTB' means select * from each table"
            )
            print(
                "\t-T, --threads <number>            num_of_threads, The number of threads. Default is 1."
            )
            print(
                "\t-C, --processes <number>          num_of_processes, The number of processes. Default is 1."
            )
            print(
                "\t-r, --batch <number>              num_of_records_per_req, The number of records per request. Default is 1."
            )
            print(
                "\t-t, --numOfTb <number>            num_of_tables, The number of tables. Default is 1."
            )
            print(
                "\t-n, --numOfRec <number>           num_of_records_per_table, The number of records per table. Default is 10."
            )
            print(
                "\t-c, --config <path>               config_directory, Configuration directory. Default is '/etc/taos/'."
            )
            print("\t-x, --insertOnly                  flag, Insert only flag.")
            print(
                "\t-O, --outOfOrder                  out of order data insert, 0: In order, 1: Out of order. Default is in order."
            )
            print(
                "\t-R, --rateOOOO <number>           rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50."
            )
            print(
                "\t-D, --deleteMethod <number>       Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database."
            )
            print("\t-v, --verbose                     Print verbose output")
            print("\t-g, --debug                       Print debug output")
            print(
                "\t-y, --skipPrompt                  Skip read key for continuous test, default is not skip"
            )
            print("")
            sys.exit(0)

        if key in ["-N", "--native"]:
            # The native client is imported only on demand so that REST-only
            # runs do not require it to be installed.
            try:
                import taos
            except Exception as e:
                print("Error: %s" % e)
                sys.exit(1)
            native = True

        if key in ["-h", "--host"]:
            host = value

        if key in ["-p", "--port"]:
            port = int(value)

        if key in ["-u", "--user"]:
            user = value

        if key in ["-P", "--password"]:
            password = value

        if key in ["-d", "--dbname"]:
            dbName = value

        if key in ["-a", "--replica"]:
            replica = int(value)
            if replica < 1:
                print("FATAL: number of replica need > 0")
                sys.exit(1)

        if key in ["-m", "--tbname"]:
            tbName = value

        if key in ["-M", "--stable"]:
            useStable = True
            numOfStb = 1

        if key in ["-s", "--stbname"]:
            stbName = value

        if key in ["-Q", "--query"]:
            queryCmd = str(value)

        if key in ["-T", "--threads"]:
            threads = int(value)
            if threads < 1:
                print("FATAL: number of threads must be larger than 0")
                sys.exit(1)

        if key in ["-C", "--processes"]:
            processes = int(value)
            if processes < 1:
                print("FATAL: number of processes must be larger than 0")
                sys.exit(1)

        if key in ["-r", "--batch", "--recPerReq"]:
            batch = int(value)

        if key in ["-l", "--colsPerRec", "--colsPerRecord"]:
            # Previously assigned to a misspelled name (colsPerRec), so the
            # option never changed the reported configuration.
            colsPerRecord = int(value)

        if key in ["-t", "--numOfTb"]:
            numOfTb = int(value)
            v_print("numOfTb is %d", numOfTb)
            if numOfTb < 1:
                # A table count below 1 would later lead to a division by
                # zero when splitting tables across processes.
                print("FATAL: number of tables must be larger than 0")
                sys.exit(1)

        if key in ["-n", "--numOfRec"]:
            numOfRec = int(value)
            v_print("numOfRec is %d", numOfRec)
            if numOfRec < 1:
                print("FATAL: number of records must be larger than 0")
                sys.exit(1)

        if key in ["-c", "--config"]:
            configDir = value
            v_print("config dir: %s", configDir)

        if key in ["-x", "--insertOnly"]:
            insertOnly = True
            v_print("insert only: %d", insertOnly)

        if key in ["-O", "--outOfOrder"]:
            # -O is declared as a flag, so getopt delivers an empty value;
            # int("") used to crash here. Treat the bare flag as "enabled".
            outOfOrder = int(value) if value else 1
            v_print("out of order is %d", outOfOrder)

        if key in ["-R", "--rateOOOO"]:
            rateOOOO = int(value)
            v_print("the rate of out of order is %d", rateOOOO)

        if key in ["-D", "--deleteMethod"]:
            deleteMethod = int(value)
            if (deleteMethod < 0) or (deleteMethod > 3):
                print(
                    "inputed delete method is %d, valid value is 0~3, set to default 0"
                    % deleteMethod
                )
                deleteMethod = 0
            v_print("the delete method is %d", deleteMethod)

        if key in ["-v", "--verbose"]:
            verbose = True

        if key in ["-g", "--debug"]:
            debug = True

        if key in ["-y", "--skipPrompt"]:
            skipPrompt = True

    if verbose:
        printConfig()

    if not skipPrompt:
        input("Press any key to continue..")

    # establish connection first if native
    if native:
        v_print("host:%s, user:%s passwd:xxxxxx configDir:%s ", host, user, configDir)
        try:
            conn = taos.connect(
                host=host, user=user, password=password, config=configDir
            )
            v_print("conn: %s", str(conn.__class__))
        except Exception as e:
            print("Error: %s" % e)
            sys.exit(1)

        try:
            cursor = conn.cursor()
            v_print("cursor:%d %s", id(cursor), str(cursor.__class__))
        except Exception as e:
            print("Error: %s" % e)
            conn.close()
            sys.exit(1)

    # drop data only if delete method be set
    if deleteMethod > 0:
        if deleteMethod == 1:
            drop_tables()
            print("Drop tables done.")
        elif deleteMethod == 2:
            drop_stable()
            print("Drop super tables done.")
        elif deleteMethod == 3:
            drop_databases()
            print("Drop Database done.")
        sys.exit(0)

    # create databases
    drop_databases()
    create_databases()

    # use last database
    current_db = "%s%d" % (dbName, (numOfDb - 1))
    use_database()

    if measure:
        start_time_begin = time.time()

    if numOfStb > 0:
        create_stb()
        if autosubtable is False:
            create_tb_using_stb()
    else:
        create_tb()

    if measure:
        end_time = time.time()
        print(
            "Total time consumed {} seconds for create table.".format(
                (end_time - start_time_begin)
            )
        )

    # The setup connection is closed before the workers start; each native
    # worker opens its own connection in insert_func().
    if native:
        cursor.close()
        conn.close()

    # start insert data
    if measure:
        start_time = time.time()

    begin = 0
    end = 0

    # Never use more processes than there are tables.
    quotient = numOfTb // processes
    if quotient < 1:
        processes = numOfTb
        quotient = 1

    remainder = numOfTb % processes
    v_print(
        "num of tables: %d, quotient: %d, remainder: %d", numOfTb, quotient, remainder
    )

    # Create the pool only after the process count has been clamped.
    manager = Manager()
    lock = manager.Lock()
    pool = Pool(processes)

    results = []
    for i in range(processes):
        begin = end

        # The first `remainder` processes take one extra table each.
        if i < remainder:
            end = begin + quotient + 1
        else:
            end = begin + quotient
        results.append(
            pool.apply_async(
                insert_data_process,
                args=(
                    lock,
                    i,
                    begin,
                    end,
                ),
            )
        )

    pool.close()
    pool.join()

    # Report exceptions raised inside worker processes; apply_async would
    # otherwise discard them silently.
    for i, result in enumerate(results):
        try:
            result.get()
        except Exception as e:
            print("Error: insert process %d failed: %s" % (i, e))

    time.sleep(1)

    if measure:
        end_time = time.time()
        print(
            "Total time consumed {} seconds for insert data.".format(
                (end_time - start_time)
            )
        )

    # query data
    if queryCmd != "NO":
        print("queryCmd: %s" % queryCmd)
        query_data_process(queryCmd)

    if measure:
        end_time = time.time()
        print("Total time consumed {} seconds.".format((end_time - start_time_begin)))

    print("done")
 |