diff --git a/tests/examples/python/taosdemo/README.md b/tests/examples/python/taosdemo/README.md new file mode 100644 index 0000000000..d48fffe8ff --- /dev/null +++ b/tests/examples/python/taosdemo/README.md @@ -0,0 +1,38 @@ +install build environment +=== +/usr/bin/python3 -m pip install -r requirements.txt + +run python version taosdemo +=== +Usage: ./taosdemo.py [OPTION...] + +Author: Shuduo Sang + + -H, --help Show usage. + + -N, --native flag, Use native interface if set. Default is using RESTful interface. + -h, --host host, The host to connect to TDengine. Default is localhost. + -p, --port port, The TCP/IP port number to use for the connection. Default is 0. + -u, --user user, The user name to use when connecting to the server. Default is 'root'. + -P, --password password, The password to use when connecting to the server. Default is 'taosdata'. + -l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3. + -d, --dbname database, Destination database. Default is 'test'. + -a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5. + -m, --tbname table_prefix, Table prefix name. Default is 't'. + -M, --stable flag, Use super table. Default is no + -s, --stbname stable_prefix, STable prefix name. Default is 'st' + -Q, --query query, Execute query command. set 'DEFAULT' means select * from each table + -T, --threads num_of_threads, The number of threads. Default is 1. + -C, --processes num_of_processes, The number of threads. Default is 1. + -r, --batch num_of_records_per_req, The number of records per request. Default is 1000. + -t, --numOfTb num_of_tables, The number of tables. Default is 1. + -n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1. + -c, --config config_directory, Configuration directory. Default is '/etc/taos/'. + -x, --inserOnly flag, Insert only flag. + -O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order. + -R, --rateOOOO rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50. + -D, --deleteMethod Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database. + -v, --verbose Print verbose output + -g, --debug Print debug output + -y, --skipPrompt Skip read key for continous test, default is not skip + diff --git a/tests/examples/python/taosdemo/requirements.txt b/tests/examples/python/taosdemo/requirements.txt new file mode 100644 index 0000000000..977e8e3726 --- /dev/null +++ b/tests/examples/python/taosdemo/requirements.txt @@ -0,0 +1,28 @@ +## +######## example-requirements.txt ####### +## +####### Requirements without Version Specifiers ###### +requests +multipledispatch +#beautifulsoup4 +## +####### Requirements with Version Specifiers ###### +## See https://www.python.org/dev/peps/pep-0440/#version-specifiers +#docopt == 0.6.1 # Version Matching. Must be version 0.6.1 +#keyring >= 4.1.1 # Minimum version 4.1.1 +#coverage != 3.5 # Version Exclusion. Anything except version 3.5 +#Mopidy-Dirble ~= 1.1 # Compatible release. Same as >= 1.1, == 1.* +## +####### Refer to other requirements files ###### +#-r other-requirements.txt +## +## +####### A particular file ###### +#./downloads/numpy-1.9.2-cp34-none-win32.whl +#http://wxpython.org/Phoenix/snapshot-builds/wxPython_Phoenix-3.0.3.dev1820+49a8884-cp34-none-win_amd64.whl +## +####### Additional Requirements without Version Specifiers ###### +## Same as 1st section, just here to show that you can put things in any order. +#rejected +#green +## diff --git a/tests/examples/python/taosdemo/taosdemo.py b/tests/examples/python/taosdemo/taosdemo.py new file mode 100755 index 0000000000..d55023bdbf --- /dev/null +++ b/tests/examples/python/taosdemo/taosdemo.py @@ -0,0 +1,797 @@ +#!/usr/bin/python3 +# * Copyright (c) 2019 TAOS Data, Inc. +# * +# * This program is free software: you can use, redistribute, and/or modify +# * it under the terms of the GNU Affero General Public License, version 3 +# * or later ("AGPL"), as published by the Free Software Foundation. +# * +# * This program is distributed in the hope that it will be useful, but WITHOUT +# * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# * FITNESS FOR A PARTICULAR PURPOSE. +# * +# * You should have received a copy of the GNU Affero General Public License +# * along with this program. If not, see . + +# -*- coding: utf-8 -*- + +import sys +import getopt +import requests +import json +import random +import time +import datetime +from multiprocessing import Manager, Pool, Lock +from multipledispatch import dispatch +from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED + + +@dispatch(str, str) +def v_print(msg: str, arg: str): + if verbose: + print(msg % arg) + + +@dispatch(str, str, str) +def v_print(msg: str, arg1: str, arg2: str): + if verbose: + print(msg % (arg1, arg2)) + + +@dispatch(str, str, str, str) +def v_print(msg: str, arg1: str, arg2: str, arg3: str): + if verbose: + print(msg % (arg1, arg2, arg3)) + + +@dispatch(str, str, str, str, str) +def v_print(msg: str, arg1: str, arg2: str, arg3: str, arg4: str): + if verbose: + print(msg % (arg1, arg2, arg3, arg4)) + + +@dispatch(str, int) +def v_print(msg: str, arg: int): + if verbose: + print(msg % int(arg)) + + +@dispatch(str, int, str) +def v_print(msg: str, arg1: int, arg2: str): + if verbose: + print(msg % (int(arg1), str(arg2))) + + +@dispatch(str, str, int) +def v_print(msg: str, arg1: str, arg2: int): + if verbose: + print(msg % (arg1, int(arg2))) + + +@dispatch(str, int, int) +def v_print(msg: str, arg1: int, arg2: int): + if verbose: + print(msg % (int(arg1), int(arg2))) + + +@dispatch(str, int, int, str) +def v_print(msg: str, arg1: int, arg2: int, arg3: str): + if verbose: + print(msg % (int(arg1), int(arg2), str(arg3))) + + +@dispatch(str, int, int, int) +def v_print(msg: str, arg1: int, arg2: int, arg3: int): + if verbose: + print(msg % (int(arg1), int(arg2), int(arg3))) + + +@dispatch(str, int, int, int, int) +def v_print(msg: str, arg1: int, arg2: int, arg3: int, arg4: int): + if verbose: + print(msg % (int(arg1), int(arg2), int(arg3), int(arg4))) + + +def restful_execute(host: str, port: int, user: str, password: str, cmd: str): + url = "http://%s:%d/rest/sql" % (host, restPort) + + v_print("restful_execute - cmd: %s", cmd) + + resp = requests.post(url, cmd, auth=(user, password)) + + v_print("resp status: %d", resp.status_code) + + if debug: + v_print( + "resp text: %s", + json.dumps( + resp.json(), + sort_keys=True, + indent=2)) + else: + print("resp: %s" % json.dumps(resp.json())) + + +def query_func(process: int, thread: int, cmd: str): + v_print("%d process %d thread cmd: %s", process, thread, cmd) + + if oneMoreHost != "NotSupported" and random.randint( + 0, 1) == 1: + v_print("%s", "Send to second host") + if native: + cursor2.execute(cmd) + else: + restful_execute( + oneMoreHost, port, user, password, cmd) + else: + v_print("%s%s%s", "Send ", cmd, " to the host") + if native: + pass +# cursor.execute(cmd) + else: + restful_execute( + host, port, user, password, cmd) + + +def query_data_process(cmd: str): + # establish connection if native + if native: + v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir) + try: + conn = taos.connect( + host=host, + user=user, + password=password, + config=configDir) + v_print("conn: %s", str(conn.__class__)) + except Exception as e: + print("Error: %s" % e.args[0]) + sys.exit(1) + + try: + cursor = conn.cursor() + v_print("cursor:%d %s", id(cursor), str(cursor.__class__)) + except Exception as e: + print("Error: %s" % e.args[0]) + conn.close() + sys.exit(1) + + if native: + try: + cursor.execute(cmd) + cols = cursor.description + data = cursor.fetchall() + + for col in data: + print(col) + except Exception as e: + conn.close() + print("Error: %s" % e.args[0]) + sys.exit(1) + + else: + restful_execute( + host, + port, + user, + password, + cmd) + + if native: + cursor.close() + conn.close() + + +def create_stb(): + for i in range(0, numOfStb): + if native: + cursor.execute( + "CREATE TABLE IF NOT EXISTS %s%d (ts timestamp, value float) TAGS (uuid binary(50))" % + (stbName, i)) + else: + restful_execute( + host, + port, + user, + password, + "CREATE TABLE IF NOT EXISTS %s%d (ts timestamp, value float) TAGS (uuid binary(50))" % + (stbName, i) + ) + + +def use_database(): + + if native: + cursor.execute("USE %s" % current_db) + else: + restful_execute(host, port, user, password, "USE %s" % current_db) + + +def create_databases(): + for i in range(0, numOfDb): + v_print("will create database db%d", int(i)) + + if native: + cursor.execute( + "CREATE DATABASE IF NOT EXISTS %s%d" % (dbName, i)) + else: + restful_execute( + host, + port, + user, + password, + "CREATE DATABASE IF NOT EXISTS %s%d" % (dbName, i)) + + +def drop_tables(): + # TODO + v_print("TODO: drop tables total %d", numOfTb) + pass + + +def drop_stable(): + # TODO + v_print("TODO: drop stables total %d", numOfStb) + pass + + +def drop_databases(): + v_print("drop databases total %d", numOfDb) + + # drop exist databases first + for i in range(0, numOfDb): + v_print("will drop database db%d", int(i)) + + if native: + cursor.execute( + "DROP DATABASE IF EXISTS %s%d" % + (dbName, i)) + else: + restful_execute( + host, + port, + user, + password, + "DROP DATABASE IF EXISTS %s%d" % + (dbName, i)) + + +def insert_func(process: int, thread: int): + v_print("%d process %d thread, insert_func ", process, thread) + + # generate uuid + uuid_int = random.randint(0, numOfTb + 1) + uuid = "%s" % uuid_int + v_print("uuid is: %s", uuid) + + # establish connection if native + if native: + v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir) + try: + conn = taos.connect( + host=host, + user=user, + password=password, + config=configDir) + v_print("conn: %s", str(conn.__class__)) + except Exception as e: + print("Error: %s" % e.args[0]) + sys.exit(1) + + try: + cursor = conn.cursor() + v_print("cursor:%d %s", id(cursor), str(cursor.__class__)) + except Exception as e: + print("Error: %s" % e.args[0]) + conn.close() + sys.exit(1) + + v_print("numOfRec %d:", numOfRec) + + row = 0 + while row < numOfRec: + v_print("row: %d", row) + sqlCmd = ['INSERT INTO '] + try: + sqlCmd.append( + "%s.%s%d " % (current_db, tbName, thread)) + + if (numOfStb > 0 and autosubtable): + sqlCmd.append("USING %s.%s%d TAGS('%s') " % + (current_db, stbName, numOfStb - 1, uuid)) + + start_time = datetime.datetime( + 2021, 1, 25) + datetime.timedelta(seconds=row) + + sqlCmd.append("VALUES ") + for batchIter in range(0, batch): + sqlCmd.append("('%s', %f) " % + ( + start_time + + datetime.timedelta( + milliseconds=batchIter), + random.random())) + row = row + 1 + if row >= numOfRec: + v_print("BREAK, row: %d numOfRec:%d", row, numOfRec) + break + + except Exception as e: + print("Error: %s" % e.args[0]) + + cmd = ' '.join(sqlCmd) + + if measure: + exec_start_time = datetime.datetime.now() + + if native: + affectedRows = cursor.execute(cmd) + else: + restful_execute( + host, port, user, password, cmd) + + if measure: + exec_end_time = datetime.datetime.now() + exec_delta = exec_end_time - exec_start_time + v_print( + "consume %d microseconds", + exec_delta.microseconds) + + v_print("cmd: %s, length:%d", cmd, len(cmd)) + + if native: + cursor.close() + conn.close() + + +def create_tb_using_stb(): + # TODO: + pass + + +def create_tb(): + v_print("create_tb() numOfTb: %d", numOfTb) + for i in range(0, numOfDb): + if native: + cursor.execute("USE %s%d" % (dbName, i)) + else: + restful_execute( + host, port, user, password, "USE %s%d" % + (dbName, i)) + + for j in range(0, numOfTb): + if native: + cursor.execute( + "CREATE TABLE %s%d (ts timestamp, value float)" % + (tbName, j)) + else: + restful_execute( + host, + port, + user, + password, + "CREATE TABLE %s%d (ts timestamp, value float)" % + (tbName, j)) + + +def insert_data_process(lock, i: int, begin: int, end: int): + lock.acquire() + tasks = end - begin + v_print("insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks) + + if (threads < (end - begin)): + for j in range(begin, end, threads): + with ThreadPoolExecutor(max_workers=threads) as executor: + k = end if ((j + threads) > end) else (j + threads) + workers = [ + executor.submit( + insert_func, + i, + n) for n in range( + j, + k)] + wait(workers, return_when=ALL_COMPLETED) + else: + with ThreadPoolExecutor(max_workers=threads) as executor: + workers = [ + executor.submit( + insert_func, + i, + j) for j in range( + begin, + end)] + wait(workers, return_when=ALL_COMPLETED) + + lock.release() + + +def query_db(i): + if native: + cursor.execute("USE %s%d" % (dbName, i)) + else: + restful_execute( + host, port, user, password, "USE %s%d" % + (dbName, i)) + + for j in range(0, numOfTb): + if native: + cursor.execute( + "SELECT COUNT(*) FROM %s%d" % (tbName, j)) + else: + restful_execute( + host, port, user, password, "SELECT COUNT(*) FROM %s%d" % + (tbName, j)) + + +def printConfig(): + + print("###################################################################") + print("# Use native interface: %s" % native) + print("# Server IP: %s" % host) + if native: + print("# Server port: %s" % port) + else: + print("# Server port: %s" % restPort) + + print("# Configuration Dir: %s" % configDir) + print("# User: %s" % user) + print("# Password: %s" % password) + print("# Number of Columns per record: %s" % colsPerRecord) + print("# Number of Threads: %s" % threads) + print("# Number of Processes: %s" % processes) + print("# Number of Tables: %s" % numOfTb) + print("# Number of records per Table: %s" % numOfRec) + print("# Records/Request: %s" % batch) + print("# Database name: %s" % dbName) + print("# Replica: %s" % replica) + print("# Use STable: %s" % useStable) + print("# Table prefix: %s" % tbName) + if useStable: + print("# STable prefix: %s" % stbName) + + print("# Data order: %s" % outOfOrder) + print("# Data out of order rate: %s" % rateOOOO) + print("# Delete method: %s" % deleteMethod) + print("# Query command: %s" % queryCmd) + print("# Insert Only: %s" % insertOnly) + print("# Verbose output %s" % verbose) + print("# Test time: %s" % + datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")) + print("###################################################################") + + +if __name__ == "__main__": + + native = False + verbose = False + debug = False + measure = True + dropDbOnly = False + colsPerRecord = 3 + numOfDb = 1 + dbName = "test" + replica = 1 + batch = 1 + numOfTb = 1 + tbName = "tb" + useStable = False + numOfStb = 0 + stbName = "stb" + numOfRec = 10 + ieration = 1 + host = "127.0.0.1" + configDir = "/etc/taos" + oneMoreHost = "NotSupported" + port = 6030 + restPort = 6041 + user = "root" + defaultPass = "taosdata" + processes = 1 + threads = 1 + insertOnly = False + autosubtable = False + queryCmd = "NO" + outOfOrder = 0 + rateOOOO = 0 + deleteMethod = 0 + skipPrompt = False + + try: + opts, args = getopt.gnu_getopt(sys.argv[1:], + 'Nh:p:u:P:d:a:m:Ms:Q:T:C:r:l:t:n:c:xOR:D:vgyH', + [ + 'native', 'host', 'port', 'user', 'password', 'dbname', 'replica', 'tbname', + 'stable', 'stbname', 'query', 'threads', 'processes', + 'recPerReq', 'colsPerRecord', 'numOfTb', 'numOfRec', 'config', + 'insertOnly', 'outOfOrder', 'rateOOOO', 'deleteMethod', + 'verbose', 'debug', 'skipPrompt', 'help' + ]) + except getopt.GetoptError as err: + print('ERROR:', err) + print('Try `taosdemo.py --help` for more options.') + sys.exit(1) + + if bool(opts) is False: + print('Try `taosdemo.py --help` for more options.') + sys.exit(1) + + for key, value in opts: + if key in ['-H', '--help']: + print('') + print( + 'taosdemo.py for TDengine') + print('') + print('Author: Shuduo Sang ') + print('') + + print('\t-H, --help Show usage.') + print('') + + print('\t-N, --native flag, Use native interface if set. Default is using RESTful interface.') + print('\t-h, --host host, The host to connect to TDengine. Default is localhost.') + print('\t-p, --port port, The TCP/IP port number to use for the connection. Default is 0.') + print('\t-u, --user user, The user name to use when connecting to the server. Default is \'root\'.') + print('\t-P, --password password, The password to use when connecting to the server. Default is \'taosdata\'.') + print('\t-l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3.') + print( + '\t-d, --dbname database, Destination database. Default is \'test\'.') + print('\t-a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5.') + print( + '\t-m, --tbname
table_prefix, Table prefix name. Default is \'t\'.') + print( + '\t-M, --stable flag, Use super table. Default is no') + print( + '\t-s, --stbname stable_prefix, STable prefix name. Default is \'st\'') + print('\t-Q, --query [NO|EACHTB|command] query, Execute query command. set \'EACHTB\' means select * from each table') + print( + '\t-T, --threads num_of_threads, The number of threads. Default is 1.') + print( + '\t-C, --processes num_of_processes, The number of threads. Default is 1.') + print('\t-r, --batch num_of_records_per_req, The number of records per request. Default is 1000.') + print( + '\t-t, --numOfTb num_of_tables, The number of tables. Default is 1.') + print('\t-n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1.') + print('\t-c, --config config_directory, Configuration directory. Default is \'/etc/taos/\'.') + print('\t-x, --inserOnly flag, Insert only flag.') + print('\t-O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order.') + print('\t-R, --rateOOOO rate, Out of order data\'s rate--if order=1 Default 10, min: 0, max: 50.') + print('\t-D, --deleteMethod Delete data methods 0: don\'t delete, 1: delete by table, 2: delete by stable, 3: delete by database.') + print('\t-v, --verbose Print verbose output') + print('\t-g, --debug Print debug output') + print( + '\t-y, --skipPrompt Skip read key for continous test, default is not skip') + print('') + sys.exit(0) + + if key in ['-N', '--native']: + try: + import taos + except Exception as e: + print("Error: %s" % e.args[0]) + sys.exit(1) + native = True + + if key in ['-h', '--host']: + host = value + + if key in ['-p', '--port']: + port = int(value) + + if key in ['-u', '--user']: + user = value + + if key in ['-P', '--password']: + password = value + else: + password = defaultPass + + if key in ['-d', '--dbname']: + dbName = value + + if key in ['-a', '--replica']: + replica = int(value) + if replica < 1: + print("FATAL: number of replica need > 0") + sys.exit(1) + + if key in ['-m', '--tbname']: + tbName = value + + if key in ['-M', '--stable']: + useStable = True + numOfStb = 1 + + if key in ['-s', '--stbname']: + stbName = value + + if key in ['-Q', '--query']: + queryCmd = str(value) + + if key in ['-T', '--threads']: + threads = int(value) + if threads < 1: + print("FATAL: number of threads must be larger than 0") + sys.exit(1) + + if key in ['-C', '--processes']: + processes = int(value) + if processes < 1: + print("FATAL: number of processes must be larger than 0") + sys.exit(1) + + if key in ['-r', '--batch']: + batch = int(value) + + if key in ['-l', '--colsPerRec']: + colsPerRec = int(value) + + if key in ['-t', '--numOfTb']: + numOfTb = int(value) + v_print("numOfTb is %d", numOfTb) + + if key in ['-n', '--numOfRec']: + numOfRec = int(value) + v_print("numOfRec is %d", numOfRec) + if numOfRec < 1: + print("FATAL: number of records must be larger than 0") + sys.exit(1) + + + if key in ['-c', '--config']: + configDir = value + v_print("config dir: %s", configDir) + + if key in ['-x', '--insertOnly']: + insertOnly = True + v_print("insert only: %d", insertOnly) + + if key in ['-O', '--outOfOrder']: + outOfOrder = int(value) + v_print("out of order is %d", outOfOrder) + + if key in ['-R', '--rateOOOO']: + rateOOOO = int(value) + v_print("the rate of out of order is %d", rateOOOO) + + if key in ['-D', '--deleteMethod']: + deleteMethod = int(value) + if (deleteMethod < 0) or (deleteMethod > 3): + print( + "inputed delete method is %d, valid value is 0~3, set to default 0" % + deleteMethod) + deleteMethod = 0 + v_print("the delete method is %d", deleteMethod) + + if key in ['-v', '--verbose']: + verbose = True + + if key in ['-g', '--debug']: + debug = True + + if key in ['-y', '--skipPrompt']: + skipPrompt = True + + if verbose: + printConfig() + + if not skipPrompt: + input("Press any key to continue..") + + # establish connection first if native + if native: + v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir) + try: + conn = taos.connect( + host=host, + user=user, + password=password, + config=configDir) + v_print("conn: %s", str(conn.__class__)) + except Exception as e: + print("Error: %s" % e.args[0]) + sys.exit(1) + + try: + cursor = conn.cursor() + v_print("cursor:%d %s", id(cursor), str(cursor.__class__)) + except Exception as e: + print("Error: %s" % e.args[0]) + conn.close() + sys.exit(1) + + # drop data only if delete method be set + if deleteMethod > 0: + if deleteMethod == 1: + drop_tables() + print("Drop tables done.") + elif deleteMethod == 2: + drop_stables() + print("Drop super tables done.") + elif deleteMethod == 3: + drop_databases() + print("Drop Database done.") + sys.exit(0) + + # create databases + drop_databases() + create_databases() + + # use last database + current_db = "%s%d" % (dbName, (numOfDb - 1)) + use_database() + + if measure: + start_time_begin = time.time() + + if numOfStb > 0: + create_stb() + if (autosubtable == False): + create_tb_using_stb() + else: + create_tb() + + if measure: + end_time = time.time() + print( + "Total time consumed {} seconds for create table.".format( + (end_time - start_time_begin))) + + if native: + cursor.close() + conn.close() + + # start insert data + if measure: + start_time = time.time() + + manager = Manager() + lock = manager.Lock() + pool = Pool(processes) + + begin = 0 + end = 0 + + quotient = numOfTb // processes + if quotient < 1: + processes = numOfTb + quotient = 1 + + remainder = numOfTb % processes + v_print( + "num of tables: %d, quotient: %d, remainder: %d", + numOfTb, + quotient, + remainder) + + for i in range(processes): + begin = end + + if i < remainder: + end = begin + quotient + 1 + else: + end = begin + quotient + pool.apply_async(insert_data_process, args=(lock, i, begin, end,)) + + pool.close() + pool.join() + time.sleep(1) + + if measure: + end_time = time.time() + print( + "Total time consumed {} seconds for insert data.".format( + (end_time - start_time))) + + + # query data + if queryCmd != "NO": + print("queryCmd: %s" % queryCmd) + query_data_process(queryCmd) + + if measure: + end_time = time.time() + print( + "Total time consumed {} seconds.".format( + (end_time - start_time_begin))) + + print("done")