From 907f8bbaca0b813efd92091b92eca9d686234a76 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Mon, 1 Feb 2021 18:31:15 +0800 Subject: [PATCH 01/10] [TD-2771] : python version taosdemo. --- tests/examples/python/taosdemo/README.md | 32 + .../examples/python/taosdemo/requirements.txt | 28 + tests/examples/python/taosdemo/taosdemo.py | 545 ++++++++++++++++++ 3 files changed, 605 insertions(+) create mode 100644 tests/examples/python/taosdemo/README.md create mode 100644 tests/examples/python/taosdemo/requirements.txt create mode 100755 tests/examples/python/taosdemo/taosdemo.py diff --git a/tests/examples/python/taosdemo/README.md b/tests/examples/python/taosdemo/README.md new file mode 100644 index 0000000000..8714bfab2e --- /dev/null +++ b/tests/examples/python/taosdemo/README.md @@ -0,0 +1,32 @@ +install build environment +=== +/usr/bin/python3 -m pip install -r requirements.txt + +run python version taosdemo +=== +Usage: ./taosdemo.py [OPTION...] + + --help Show usage. + + -h host, The host to connect to TDengine. Default is localhost. + -p port, The TCP/IP port number to use for the connection. Default is 0. + -u user, The user name to use when connecting to the server. Default is 'root'. + -P password, The password to use when connecting to the server. Default is 'taosdata'. + -d database, Destination database. Default is 'test'. + -a replica, Set the replica parameters of the database, Default 1, min: 1, max: 5. + -m table_prefix, Table prefix name. Default is 't'. + -M stable, Use super table. + -s stable_prefix, STable prefix name. Default is 'st' + -Q query, Execute query command. set 'DEFAULT' means select * from each table + -T num_of_threads, The number of threads. Default is 10. + -r num_of_records_per_req, The number of records per request. Default is 1000. + -t num_of_tables, The number of tables. Default is 1. + -n num_of_records_per_table, The number of records per table. Default is 1. + -c config_directory, Configuration directory. Default is '/etc/taos/'. + -x flag, Insert only flag. + -O order, Insert mode--0: In order, 1: Out of order. Default is in order. + -R rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50. + -D Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database. + -v Print verbose output + -g Print debug output + -y Skip read key for continous test, default is not skip diff --git a/tests/examples/python/taosdemo/requirements.txt b/tests/examples/python/taosdemo/requirements.txt new file mode 100644 index 0000000000..977e8e3726 --- /dev/null +++ b/tests/examples/python/taosdemo/requirements.txt @@ -0,0 +1,28 @@ +## +######## example-requirements.txt ####### +## +####### Requirements without Version Specifiers ###### +requests +multipledispatch +#beautifulsoup4 +## +####### Requirements with Version Specifiers ###### +## See https://www.python.org/dev/peps/pep-0440/#version-specifiers +#docopt == 0.6.1 # Version Matching. Must be version 0.6.1 +#keyring >= 4.1.1 # Minimum version 4.1.1 +#coverage != 3.5 # Version Exclusion. Anything except version 3.5 +#Mopidy-Dirble ~= 1.1 # Compatible release. Same as >= 1.1, == 1.* +## +####### Refer to other requirements files ###### +#-r other-requirements.txt +## +## +####### A particular file ###### +#./downloads/numpy-1.9.2-cp34-none-win32.whl +#http://wxpython.org/Phoenix/snapshot-builds/wxPython_Phoenix-3.0.3.dev1820+49a8884-cp34-none-win_amd64.whl +## +####### Additional Requirements without Version Specifiers ###### +## Same as 1st section, just here to show that you can put things in any order. +#rejected +#green +## diff --git a/tests/examples/python/taosdemo/taosdemo.py b/tests/examples/python/taosdemo/taosdemo.py new file mode 100755 index 0000000000..3a4254d0ae --- /dev/null +++ b/tests/examples/python/taosdemo/taosdemo.py @@ -0,0 +1,545 @@ +#!/usr/bin/python3 +# * Copyright (c) 2019 TAOS Data, Inc. +# * +# * This program is free software: you can use, redistribute, and/or modify +# * it under the terms of the GNU Affero General Public License, version 3 +# * or later ("AGPL"), as published by the Free Software Foundation. +# * +# * This program is distributed in the hope that it will be useful, but WITHOUT +# * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# * FITNESS FOR A PARTICULAR PURPOSE. +# * +# * You should have received a copy of the GNU Affero General Public License +# * along with this program. If not, see . + +# -*- coding: utf-8 -*- + +import sys +import getopt +import requests +import json +import random +import time +import datetime +from multiprocessing import Process, Pool +from multipledispatch import dispatch +from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED + + +@dispatch(str, str) +def v_print(msg: str, arg: str): + if verbose: + print(msg % arg) + + +@dispatch(str, int) +def v_print(msg: str, arg: int): + if verbose: + print(msg % int(arg)) + + +@dispatch(str, int, str) +def v_print(msg: str, arg1: int, arg2: str): + if verbose: + print(msg % (int(arg1), str(arg2))) + + +@dispatch(str, str, int) +def v_print(msg: str, arg1: str, arg2: int): + if verbose: + print(msg % (arg1, int(arg2))) + + +@dispatch(str, int, int) +def v_print(msg: str, arg1: int, arg2: int): + if verbose: + print(msg % (int(arg1), int(arg2))) + + +@dispatch(str, int, int, str) +def v_print(msg: str, arg1: int, arg2: int, arg3: str): + if verbose: + print(msg % (int(arg1), int(arg2), str(arg3))) + + +@dispatch(str, int, int, int) +def v_print(msg: str, arg1: int, arg2: int, arg3: int): + if verbose: + print(msg % (int(arg1), int(arg2), int(arg3))) + + +@dispatch(str, int, int, int, int) +def v_print(msg: str, arg1: int, arg2: int, arg3: int, arg4: int): + if verbose: + print(msg % (int(arg1), int(arg2), int(arg3), int(arg4))) + + +def restful_execute(host: str, port: int, user: str, password: str, cmd: str): + url = "http://%s:%d/rest/sql" % (host, port) + + if verbose: + v_print("cmd: %s", cmd) + + resp = requests.post(url, cmd, auth=(user, password)) + + v_print("resp status: %d", resp.status_code) + + if verbose: + v_print( + "resp text: %s", + json.dumps( + resp.json(), + sort_keys=True, + indent=2)) + else: + print("resp: %s" % json.dumps(resp.json())) + + +def query_func(process: int, thread: int, cmd: str): + v_print("%d process %d thread cmd: %s", process, thread, cmd) + if oneMoreHost != "NotSupported" and random.randint( + 0, 1) == 1: + v_print("%s", "Send to second host") + restful_execute( + oneMoreHost, port, user, password, cmd) + else: + v_print("%s", "Send to first host") + restful_execute( + host, port, user, password, cmd) + + +def query_data_process(i: int, cmd: str): + v_print("Process:%d threads: %d cmd: %s", i, threads, cmd) + + with ThreadPoolExecutor(max_workers=threads) as executor: + workers = [ + executor.submit( + query_func, + i, + j, + cmd) for j in range( + 0, + threads)] + + wait(workers, return_when=ALL_COMPLETED) + + return i + + +def query_data(cmd: str): + v_print("query_data processes: %d, cmd: %s", processes, cmd) + pool = Pool(processes) + for i in range(processes): + pool.apply_async(query_data_process, args=(i, cmd)) + time.sleep(1) + pool.close() + pool.join() + + +def insert_data(processes: int): + pool = Pool(processes) + + begin = 0 + end = 0 + + quotient = numOfTb // processes + if quotient < 1: + processes = numOfTb + quotient = 1 + + remainder = numOfTb % processes + v_print( + "num of tables: %d, quotient: %d, remainder: %d", + numOfTb, + quotient, + remainder) + + for i in range(processes): + begin = end + + if i < remainder: + end = begin + quotient + 1 + else: + end = begin + quotient + + v_print("Process %d from %d to %d", i, begin, end) + pool.apply_async(insert_data_process, args=(i, begin, end)) + + pool.close() + pool.join() + + +def create_stb(): + for i in range(0, numOfStb): + restful_execute( + host, + port, + user, + password, + "CREATE TABLE IF NOT EXISTS st%d (ts timestamp, value float) TAGS (uuid binary(50))" % + i) + + +def create_databases(): + for i in range(0, numOfDb): + v_print("will create database db%d", int(i)) + restful_execute( + host, + port, + user, + password, + "CREATE DATABASE IF NOT EXISTS db%d" % + i) + + +def drop_databases(): + v_print("drop databases total %d", numOfDb) + + # drop exist databases first + for i in range(0, numOfDb): + v_print("will drop database db%d", int(i)) + restful_execute( + host, + port, + user, + password, + "DROP DATABASE IF EXISTS db%d" % + i) + + +def insert_func(process: int, thread: int): + v_print("%d process %d thread, insert_func ", process, thread) + + # generate uuid + uuid_int = random.randint(0, numOfTb + 1) + uuid = "%s" % uuid_int + v_print("uuid is: %s", uuid) + + v_print("numOfRec %d:", numOfRec) + if numOfRec > 0: + row = 0 + while row < numOfRec: + v_print("row: %d", row) + sqlCmd = ['INSERT INTO '] + try: + sqlCmd.append( + "%s.tb%s " % (current_db, thread)) + + if (numOfStb > 0 and autosubtable): + sqlCmd.append("USING %s.st%d TAGS('%s') " % + (current_db, numOfStb - 1, uuid)) + + start_time = datetime.datetime( + 2020, 9, 25) + datetime.timedelta(seconds=row) + + sqlCmd.append("VALUES ") + for batchIter in range(0, batch): + sqlCmd.append("('%s', %f) " % + (start_time + + datetime.timedelta( + milliseconds=batchIter), + random.random())) + row = row + 1 + if row >= numOfRec: + v_print("BREAK, row: %d numOfRec:%d", row, numOfRec) + break + + except Exception as e: + print("Error: %s" % e.args[0]) + + cmd = ' '.join(sqlCmd) + + if measure: + exec_start_time = datetime.datetime.now() + + if oneMoreHost != "NotSupported" and random.randint( + 0, 1) == 1: + v_print("%s", "Send to second host") + restful_execute( + oneMoreHost, port, user, password, cmd) + else: + v_print("%s", "Send to first host") + restful_execute( + host, port, user, password, cmd) + + if measure: + exec_end_time = datetime.datetime.now() + exec_delta = exec_end_time - exec_start_time + print( + "%s, %d" % + (time.strftime('%X'), + exec_delta.microseconds)) + + v_print("cmd: %s, length:%d", cmd, len(cmd)) + + +def create_tb_using_stb(): + # TODO: + pass + + +def create_tb(): + v_print("create_tb() numOfTb: %d", numOfTb) + for i in range(0, numOfDb): + restful_execute(host, port, user, password, "USE db%d" % i) + for j in range(0, numOfTb): + restful_execute( + host, + port, + user, + password, + "CREATE TABLE tb%d (ts timestamp, value float)" % + j) + + +def insert_data_process(i: int, begin: int, end: int): + tasks = end - begin + v_print("Process:%d table from %d to %d, tasks %d", i, begin, end, tasks) + + if (threads < (end - begin)): + for j in range(begin, end, threads): + with ThreadPoolExecutor(max_workers=threads) as executor: + k = end if ((j + threads) > end) else (j + threads) + workers = [ + executor.submit( + insert_func, + i, + n) for n in range( + j, + k)] + wait(workers, return_when=ALL_COMPLETED) + else: + with ThreadPoolExecutor(max_workers=threads) as executor: + workers = [ + executor.submit( + insert_func, + i, + j) for j in range( + begin, + end)] + wait(workers, return_when=ALL_COMPLETED) + + +if __name__ == "__main__": + + verbose = False + measure = False + dropDbOnly = False + numOfDb = 1 + batch = 1 + numOfTb = 1 + numOfStb = 0 + numOfRec = 10 + ieration = 1 + host = "127.0.0.1" + oneMoreHost = "NotSupported" + port = 6041 + user = "root" + defaultPass = "taosdata" + processes = 1 + threads = 1 + insertonly = False + autosubtable = False + queryCmd = "" + + try: + opts, args = getopt.gnu_getopt(sys.argv[1:], + 'Nh:p:u:P:d:a:m:Ms:Q:T:P:r:t:n:c:xOR:D:vgyH', + [ + 'native', 'host', 'port', 'user', 'password', 'dbname', 'replica', 'tbname', + 'supertable', 'stbname', 'query', 'numOfThreads', 'numOfProcesses', + 'numOfRecPerReq', 'numbOfTb', 'numOfRec', 'config', + 'insertOnly', 'outOfOrder', 'rateOOOO','deleteMethod', + 'verbose', 'debug', 'skipprompt', '--help' + ]) + except getopt.GetoptError as err: + print('ERROR:', err) + print('Try `taosdemo.py --help` for more options.') + sys.exit(1) + + if bool(opts) is False: + print('Try `taosdemo.py --help` for more options.') + sys.exit(1) + + for key, value in opts: + if key in ['-H', '--help']: + print('') + print( + 'taosdemo.py for TDengine') + print('') + print('Author: Shuduo Sang ') + print('') + + print('\t-H, --help Show usage.') + + print('\t-N, --native flag, Use native interface if set. Default is using RESTful interface.') + print('\t-h, --host host, The host to connect to TDengine. Default is localhost.') + print('\t-p, --port port, The TCP/IP port number to use for the connection. Default is 0.') + print('\t-u, --user user, The user name to use when connecting to the server. Default is \'root\'.') + print('\t-P, --password password, The password to use when connecting to the server. Default is \'taosdata\'.') + print('\t-d, --dbname database, Destination database. Default is \'test\'.') + print('\t-a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5.') + print('\t-m, --tbname
table_prefix, Table prefix name. Default is \'t\'.') + print('\t-M, --supertable flag, Use super table. Default is no') + print('\t-s, --stbname stable_prefix, STable prefix name. Default is \'st\'') + print('\t-Q, --query query, Execute query command. set \'DEFAULT\' means select * from each table') + print('\t-T, --numOfThreads num_of_threads, The number of threads. Default is 1.') + print('\t-P, --numOfProcesses num_of_processes, The number of threads. Default is 1.') + print('\t-r, --numOfRecPerReq num_of_records_per_req, The number of records per request. Default is 1000.') + print('\t-t, --numOfTb num_of_tables, The number of tables. Default is 1.') + print('\t-n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1.') + print('\t-c, --config config_directory, Configuration directory. Default is \'/etc/taos/\'.') + print('\t-x, --inserOnly flag, Insert only flag.') + print('\t-O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order.') + print('\t-R, --rateOOOO rate, Out of order data\'s rate--if order=1 Default 10, min: 0, max: 50.') + print('\t-D, --deleteMethod Delete data methods 0: don\'t delete, 1: delete by table, 2: delete by stable, 3: delete by database.') + print('\t-v, --verbose Print verbose output') + print('\t-g, --debug Print debug output') + print('\t-y, --skipprompt Skip read key for continous test, default is not skip') + print('') + sys.exit(0) + + if key in ['-s', '--hoSt']: + host = value + + if key in ['-m', '--one-More-host']: + oneMoreHost = value + + if key in ['-o', '--pOrt']: + port = int(value) + + if key in ['-u', '--User']: + user = value + + if key in ['-w', '--passWord']: + password = value + else: + password = defaultPass + + if key in ['-v', '--Verbose']: + verbose = True + + if key in ['-A', '--Autosubtable']: + autosubtable = True + + if key in ['-M', '--Measure']: + measure = True + + if key in ['-P', '--Processes']: + processes = int(value) + if processes < 1: + print("FATAL: number of processes must be larger than 0") + sys.exit(1) + + if key in ['-T', '--Threads']: + threads = int(value) + if threads < 1: + print("FATAL: number of threads must be larger than 0") + sys.exit(1) + + if key in ['-q', '--Query']: + queryCmd = str(value) + + if key in ['-p', '--droPdbonly']: + dropDbOnly = True + + if key in ['-d', '--numofDb']: + numOfDb = int(value) + v_print("numOfDb is %d", numOfDb) + if (numOfdb <= 0): + print("ERROR: wrong number of database given!") + sys.exit(1) + + if key in ['-c', '--batCh']: + batch = int(value) + + if key in ['-t', '--numofTb']: + numOfTb = int(value) + v_print("numOfTb is %d", numOfTb) + + if key in ['-b', '--numofstB']: + numOfStb = int(value) + v_print("numOfStb is %d", numOfStb) + + if key in ['-r', '--numofRec']: + numOfRec = int(value) + v_print("numOfRec is %d", numOfRec) + + if key in ['-f', '--File']: + fileOut = value + v_print("file is %s", fileOut) + + if key in ['-x', '--insertonLy']: + insertonly = True + v_print("insert only: %d", insertonly) + +# if verbose: +# restful_execute( +# host, +# port, +# user, +# password, +# "SHOW DATABASES") + + if dropDbOnly: + drop_databases() + print("Drop Database done.") + sys.exit(0) + + if queryCmd != "": + print("queryCmd: %s" % queryCmd) + query_data(queryCmd) + sys.exit(0) + + # create databases + if (insertonly == False): + drop_databases() + create_databases() + + if measure: + start_time = time.time() + + # use last database + current_db = "db%d" % (numOfDb - 1) + restful_execute(host, port, user, password, "USE %s" % current_db) + + if numOfStb > 0: + create_stb() + if (autosubtable == False): + create_tb_using_stb() + + insert_data(processes) + + if verbose: + for i in range(0, numOfDb): + for j in range(0, numOfStb): + restful_execute(host, port, user, password, + "SELECT COUNT(*) FROM db%d.st%d" % (i, j,)) + + print("done") + + if measure: + end_time = time.time() + print( + "Total time consumed {} seconds.".format( + (end_time - start_time))) + + sys.exit(0) + + if numOfTb > 0: + create_tb() + insert_data(processes) + + if verbose: + for i in range(0, numOfDb): + restful_execute(host, port, user, password, "USE db%d" % i) + for j in range(0, numOfTb): + restful_execute(host, port, user, password, + "SELECT COUNT(*) FROM tb%d" % (j,)) + + print("done") + if measure: + end_time = time.time() + print( + "Total time consumed {} seconds.".format( + (end_time - start_time))) From 26f7e973a9a2d1829ad8b1118a9801b421d08865 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Mon, 1 Feb 2021 18:36:16 +0800 Subject: [PATCH 02/10] [TD-2771] : python version taosdemo. fix --help. --- tests/examples/python/taosdemo/README.md | 51 ++++++++++++---------- tests/examples/python/taosdemo/taosdemo.py | 3 +- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/tests/examples/python/taosdemo/README.md b/tests/examples/python/taosdemo/README.md index 8714bfab2e..f70b4d4618 100644 --- a/tests/examples/python/taosdemo/README.md +++ b/tests/examples/python/taosdemo/README.md @@ -6,27 +6,32 @@ run python version taosdemo === Usage: ./taosdemo.py [OPTION...] - --help Show usage. +Author: Shuduo Sang + + -H, --help Show usage. + + -N, --native flag, Use native interface if set. Default is using RESTful interface. + -h, --host host, The host to connect to TDengine. Default is localhost. + -p, --port port, The TCP/IP port number to use for the connection. Default is 0. + -u, --user user, The user name to use when connecting to the server. Default is 'root'. + -P, --password password, The password to use when connecting to the server. Default is 'taosdata'. + -d, --dbname database, Destination database. Default is 'test'. + -a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5. + -m, --tbname
table_prefix, Table prefix name. Default is 't'. + -M, --supertable flag, Use super table. Default is no + -s, --stbname stable_prefix, STable prefix name. Default is 'st' + -Q, --query query, Execute query command. set 'DEFAULT' means select * from each table + -T, --numOfThreads num_of_threads, The number of threads. Default is 1. + -P, --numOfProcesses num_of_processes, The number of threads. Default is 1. + -r, --numOfRecPerReq num_of_records_per_req, The number of records per request. Default is 1000. + -t, --numOfTb num_of_tables, The number of tables. Default is 1. + -n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1. + -c, --config config_directory, Configuration directory. Default is '/etc/taos/'. + -x, --inserOnly flag, Insert only flag. + -O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order. + -R, --rateOOOO rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50. + -D, --deleteMethod Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database. + -v, --verbose Print verbose output + -g, --debug Print debug output + -y, --skipprompt Skip read key for continous test, default is not skip - -h host, The host to connect to TDengine. Default is localhost. - -p port, The TCP/IP port number to use for the connection. Default is 0. - -u user, The user name to use when connecting to the server. Default is 'root'. - -P password, The password to use when connecting to the server. Default is 'taosdata'. - -d database, Destination database. Default is 'test'. - -a replica, Set the replica parameters of the database, Default 1, min: 1, max: 5. - -m
table_prefix, Table prefix name. Default is 't'. - -M stable, Use super table. - -s stable_prefix, STable prefix name. Default is 'st' - -Q query, Execute query command. set 'DEFAULT' means select * from each table - -T num_of_threads, The number of threads. Default is 10. - -r num_of_records_per_req, The number of records per request. Default is 1000. - -t num_of_tables, The number of tables. Default is 1. - -n num_of_records_per_table, The number of records per table. Default is 1. - -c config_directory, Configuration directory. Default is '/etc/taos/'. - -x flag, Insert only flag. - -O order, Insert mode--0: In order, 1: Out of order. Default is in order. - -R rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50. - -D Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database. - -v Print verbose output - -g Print debug output - -y Skip read key for continous test, default is not skip diff --git a/tests/examples/python/taosdemo/taosdemo.py b/tests/examples/python/taosdemo/taosdemo.py index 3a4254d0ae..7ba1157759 100755 --- a/tests/examples/python/taosdemo/taosdemo.py +++ b/tests/examples/python/taosdemo/taosdemo.py @@ -350,7 +350,7 @@ if __name__ == "__main__": 'supertable', 'stbname', 'query', 'numOfThreads', 'numOfProcesses', 'numOfRecPerReq', 'numbOfTb', 'numOfRec', 'config', 'insertOnly', 'outOfOrder', 'rateOOOO','deleteMethod', - 'verbose', 'debug', 'skipprompt', '--help' + 'verbose', 'debug', 'skipprompt', 'help' ]) except getopt.GetoptError as err: print('ERROR:', err) @@ -371,6 +371,7 @@ if __name__ == "__main__": print('') print('\t-H, --help Show usage.') + print('') print('\t-N, --native flag, Use native interface if set. Default is using RESTful interface.') print('\t-h, --host host, The host to connect to TDengine. Default is localhost.') From b2b995e255e39277826b8b3f18fcf8cd42befa16 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Mon, 1 Feb 2021 20:14:37 +0800 Subject: [PATCH 03/10] [TD-2771] : python version taosdemo. RESTful works. --- tests/examples/python/taosdemo/README.md | 7 +- tests/examples/python/taosdemo/taosdemo.py | 215 +++++++++++++-------- 2 files changed, 141 insertions(+), 81 deletions(-) diff --git a/tests/examples/python/taosdemo/README.md b/tests/examples/python/taosdemo/README.md index f70b4d4618..5ac0e18aff 100644 --- a/tests/examples/python/taosdemo/README.md +++ b/tests/examples/python/taosdemo/README.md @@ -15,15 +15,16 @@ Author: Shuduo Sang -p, --port port, The TCP/IP port number to use for the connection. Default is 0. -u, --user user, The user name to use when connecting to the server. Default is 'root'. -P, --password password, The password to use when connecting to the server. Default is 'taosdata'. + -l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3. -d, --dbname database, Destination database. Default is 'test'. -a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5. -m, --tbname
table_prefix, Table prefix name. Default is 't'. - -M, --supertable flag, Use super table. Default is no + -M, --stable flag, Use super table. Default is no -s, --stbname stable_prefix, STable prefix name. Default is 'st' -Q, --query query, Execute query command. set 'DEFAULT' means select * from each table -T, --numOfThreads num_of_threads, The number of threads. Default is 1. -P, --numOfProcesses num_of_processes, The number of threads. Default is 1. - -r, --numOfRecPerReq num_of_records_per_req, The number of records per request. Default is 1000. + -r, --batch num_of_records_per_req, The number of records per request. Default is 1000. -t, --numOfTb num_of_tables, The number of tables. Default is 1. -n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1. -c, --config config_directory, Configuration directory. Default is '/etc/taos/'. @@ -33,5 +34,5 @@ Author: Shuduo Sang -D, --deleteMethod Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database. -v, --verbose Print verbose output -g, --debug Print debug output - -y, --skipprompt Skip read key for continous test, default is not skip + -y, --skipPrompt Skip read key for continous test, default is not skip diff --git a/tests/examples/python/taosdemo/taosdemo.py b/tests/examples/python/taosdemo/taosdemo.py index 7ba1157759..c54076cb15 100755 --- a/tests/examples/python/taosdemo/taosdemo.py +++ b/tests/examples/python/taosdemo/taosdemo.py @@ -75,7 +75,7 @@ def v_print(msg: str, arg1: int, arg2: int, arg3: int, arg4: int): def restful_execute(host: str, port: int, user: str, password: str, cmd: str): - url = "http://%s:%d/rest/sql" % (host, port) + url = "http://%s:%d/rest/sql" % (host, restPort) if verbose: v_print("cmd: %s", cmd) @@ -252,15 +252,8 @@ def insert_func(process: int, thread: int): if measure: exec_start_time = datetime.datetime.now() - if oneMoreHost != "NotSupported" and random.randint( - 0, 1) == 1: - v_print("%s", "Send to second host") - restful_execute( - oneMoreHost, port, user, password, cmd) - else: - v_print("%s", "Send to first host") - restful_execute( - host, port, user, password, cmd) + restful_execute( + host, port, user, password, cmd) if measure: exec_end_time = datetime.datetime.now() @@ -319,38 +312,84 @@ def insert_data_process(i: int, begin: int, end: int): end)] wait(workers, return_when=ALL_COMPLETED) +def printConfig(): + + print("###################################################################"); + print("# Use native interface: %s" % native); + print("# Server IP: %s" % host); + if native: + print("# Server port: %s" % port); + else: + print("# Server port: %s" % restPort); + + print("# User: %s" % user); + print("# Password: %s" % password); + print("# Number of Columns per record: %s" % colsPerRecord); + print("# Number of Threads: %s" % threads); + print("# Number of Processes: %s" % processes); + print("# Number of Tables: %s" % numOfTb); + print("# Number of records per Table: %s" % numOfRec); + print("# Records/Request: %s" % batch); + print("# Database name: %s" % dbName); + print("# Replica: %s" % replica); + print("# Use STable: %s" % useStable); + print("# Table prefix: %s" % tbNamePrefix); + if useStable: + print("# STable prefix: %s" % stbNamePrefix); + print("# Data order: %s" % outOfOrder); + print("# Data out of order rate: %s" % rateOOOO); + print("# Delete method: %s" % deleteMethod); + print("# Query command: %s" % queryCmd); + print("# Insert Only: %s" % insertOnly); + print("# Verbose output %s" % verbose); + print("# Test time: %s" % datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")) + print("###################################################################"); + if __name__ == "__main__": + native = False verbose = False - measure = False + measure = True dropDbOnly = False + colsPerRecord = 3 numOfDb = 1 + dbName = "test" + replica = 1 batch = 1 numOfTb = 1 + tbNamePrefix = "tb" + useStable = False numOfStb = 0 + stbNamePrefix = "stb" numOfRec = 10 ieration = 1 host = "127.0.0.1" + configDir = "/etc/taos" oneMoreHost = "NotSupported" - port = 6041 + port = 6030 + restPort = 6041 user = "root" defaultPass = "taosdata" processes = 1 threads = 1 - insertonly = False + insertOnly = False autosubtable = False - queryCmd = "" + queryCmd = "select * from " + outOfOrder = 0 + rateOOOO = 0 + deleteMethod = 0 + skipPrompt = False try: opts, args = getopt.gnu_getopt(sys.argv[1:], - 'Nh:p:u:P:d:a:m:Ms:Q:T:P:r:t:n:c:xOR:D:vgyH', + 'Nh:p:u:P:d:a:m:Ms:Q:T:P:r:l:t:n:c:xOR:D:vgyH', [ 'native', 'host', 'port', 'user', 'password', 'dbname', 'replica', 'tbname', - 'supertable', 'stbname', 'query', 'numOfThreads', 'numOfProcesses', - 'numOfRecPerReq', 'numbOfTb', 'numOfRec', 'config', + 'stable', 'stbname', 'query', 'numOfThreads', 'numOfProcesses', + 'recPerReq', 'colsPerRecord', 'numOfTb', 'numOfRec', 'config', 'insertOnly', 'outOfOrder', 'rateOOOO','deleteMethod', - 'verbose', 'debug', 'skipprompt', 'help' + 'verbose', 'debug', 'skipPrompt', 'help' ]) except getopt.GetoptError as err: print('ERROR:', err) @@ -378,15 +417,16 @@ if __name__ == "__main__": print('\t-p, --port port, The TCP/IP port number to use for the connection. Default is 0.') print('\t-u, --user user, The user name to use when connecting to the server. Default is \'root\'.') print('\t-P, --password password, The password to use when connecting to the server. Default is \'taosdata\'.') + print('\t-l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3.') print('\t-d, --dbname database, Destination database. Default is \'test\'.') print('\t-a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5.') print('\t-m, --tbname
table_prefix, Table prefix name. Default is \'t\'.') - print('\t-M, --supertable flag, Use super table. Default is no') + print('\t-M, --stable flag, Use super table. Default is no') print('\t-s, --stbname stable_prefix, STable prefix name. Default is \'st\'') print('\t-Q, --query query, Execute query command. set \'DEFAULT\' means select * from each table') print('\t-T, --numOfThreads num_of_threads, The number of threads. Default is 1.') print('\t-P, --numOfProcesses num_of_processes, The number of threads. Default is 1.') - print('\t-r, --numOfRecPerReq num_of_records_per_req, The number of records per request. Default is 1000.') + print('\t-r, --batch num_of_records_per_req, The number of records per request. Default is 1000.') print('\t-t, --numOfTb num_of_tables, The number of tables. Default is 1.') print('\t-n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1.') print('\t-c, --config config_directory, Configuration directory. Default is \'/etc/taos/\'.') @@ -396,104 +436,118 @@ if __name__ == "__main__": print('\t-D, --deleteMethod Delete data methods 0: don\'t delete, 1: delete by table, 2: delete by stable, 3: delete by database.') print('\t-v, --verbose Print verbose output') print('\t-g, --debug Print debug output') - print('\t-y, --skipprompt Skip read key for continous test, default is not skip') + print('\t-y, --skipPrompt Skip read key for continous test, default is not skip') print('') sys.exit(0) - if key in ['-s', '--hoSt']: + if key in ['-N', '--native']: + try: + import taos + except Exception as e: + print("Error: %s" % e.args[0]) + sys.exit(1) + native = True + + if key in ['-h', '--host']: host = value - if key in ['-m', '--one-More-host']: - oneMoreHost = value - - if key in ['-o', '--pOrt']: + if key in ['-p', '--port']: port = int(value) - if key in ['-u', '--User']: + if key in ['-u', '--user']: user = value - if key in ['-w', '--passWord']: + if key in ['-P', '--password']: password = value else: password = defaultPass - if key in ['-v', '--Verbose']: - verbose = True + if key in ['-d', '--dbname']: + dbName = value - if key in ['-A', '--Autosubtable']: - autosubtable = True - - if key in ['-M', '--Measure']: - measure = True - - if key in ['-P', '--Processes']: - processes = int(value) - if processes < 1: - print("FATAL: number of processes must be larger than 0") + if key in ['-a', '--replica']: + replica = int(value) + if replica < 1: + print("FATAL: number of replica need > 0") sys.exit(1) - if key in ['-T', '--Threads']: + if key in ['-m', '--tbname']: + tbNamePrefix = value + + if key in ['-M', '--stable']: + useStable = True + numOfStb = 1 + + if key in ['-s', '--stbname']: + stbNamePrefix = value + + if key in ['-Q', '--query']: + queryCmd = str(value) + + if key in ['-T', '--numOfThreads']: threads = int(value) if threads < 1: print("FATAL: number of threads must be larger than 0") sys.exit(1) - if key in ['-q', '--Query']: - queryCmd = str(value) - - if key in ['-p', '--droPdbonly']: - dropDbOnly = True - - if key in ['-d', '--numofDb']: - numOfDb = int(value) - v_print("numOfDb is %d", numOfDb) - if (numOfdb <= 0): - print("ERROR: wrong number of database given!") + if key in ['-P', '--numOfProcesses']: + processes = int(value) + if processes < 1: + print("FATAL: number of processes must be larger than 0") sys.exit(1) - if key in ['-c', '--batCh']: + if key in ['-r', '--batch']: batch = int(value) - if key in ['-t', '--numofTb']: + if key in ['-l', '--colsPerRec']: + colsPerRec = int(value) + + if key in ['-t', '--numOfTb']: numOfTb = int(value) v_print("numOfTb is %d", numOfTb) - if key in ['-b', '--numofstB']: - numOfStb = int(value) - v_print("numOfStb is %d", numOfStb) - - if key in ['-r', '--numofRec']: + if key in ['-n', '--numOfRec']: numOfRec = int(value) v_print("numOfRec is %d", numOfRec) - if key in ['-f', '--File']: - fileOut = value - v_print("file is %s", fileOut) - if key in ['-x', '--insertonLy']: - insertonly = True - v_print("insert only: %d", insertonly) + insertOnly = True + v_print("insert only: %d", insertOnly) -# if verbose: -# restful_execute( -# host, -# port, -# user, -# password, -# "SHOW DATABASES") + if key in ['-O', '--outOfOrder']: + outOfOrder = int(value) + v_print("out of order is %d", outOfOrder) + + if key in ['-R', '--rateOOOO']: + rateOOOO = int(value) + v_print("the rate of out of order is %d", rateOOOO) + + if key in ['-D', '--deleteMethod']: + deleteMethod = int(value) + v_print("the delete method is %d", deleteMethod) + + if key in ['-v', '--verbose']: + verbose = True + + if key in ['-g', '--debug']: + debug = True + + if key in ['-y', '--skipPrompt']: + skipPrompt = True + + if verbose: + printConfig() + + if skipPrompt == False: + input("Press any key to continue..") if dropDbOnly: drop_databases() print("Drop Database done.") sys.exit(0) - if queryCmd != "": - print("queryCmd: %s" % queryCmd) - query_data(queryCmd) - sys.exit(0) - # create databases - if (insertonly == False): + if (insertOnly == False): drop_databases() create_databases() @@ -538,6 +592,11 @@ if __name__ == "__main__": restful_execute(host, port, user, password, "SELECT COUNT(*) FROM tb%d" % (j,)) + if queryCmd != "": + print("queryCmd: %s" % queryCmd) + query_data(queryCmd) + sys.exit(0) + print("done") if measure: end_time = time.time() From e66f2dbde5653f6e2cf6a497996ff0e33d3e66c7 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 2 Feb 2021 19:23:43 +0800 Subject: [PATCH 04/10] [TD-2771] : python version taosdemo. add native interface --- tests/examples/python/taosdemo/taosdemo.py | 296 +++++++++++++++------ 1 file changed, 213 insertions(+), 83 deletions(-) diff --git a/tests/examples/python/taosdemo/taosdemo.py b/tests/examples/python/taosdemo/taosdemo.py index c54076cb15..aac19d3f1d 100755 --- a/tests/examples/python/taosdemo/taosdemo.py +++ b/tests/examples/python/taosdemo/taosdemo.py @@ -32,6 +32,12 @@ def v_print(msg: str, arg: str): print(msg % arg) +@dispatch(str, str, str, str, str) +def v_print(msg: str, arg1: str, arg2: str, arg3: str, arg4: str): + if verbose: + print(msg % (arg1, arg2, arg3, arg4)) + + @dispatch(str, int) def v_print(msg: str, arg: int): if verbose: @@ -100,12 +106,18 @@ def query_func(process: int, thread: int, cmd: str): if oneMoreHost != "NotSupported" and random.randint( 0, 1) == 1: v_print("%s", "Send to second host") - restful_execute( - oneMoreHost, port, user, password, cmd) + if native: + cursor2.execute(cmd) + else: + restful_execute( + oneMoreHost, port, user, password, cmd) else: v_print("%s", "Send to first host") - restful_execute( - host, port, user, password, cmd) + if native: + cursor.execute(cmd) + else: + restful_execute( + host, port, user, password, cmd) def query_data_process(i: int, cmd: str): @@ -171,25 +183,56 @@ def insert_data(processes: int): def create_stb(): for i in range(0, numOfStb): - restful_execute( - host, - port, - user, - password, - "CREATE TABLE IF NOT EXISTS st%d (ts timestamp, value float) TAGS (uuid binary(50))" % - i) + if native: + cursor.execute( + "CREATE TABLE IF NOT EXISTS %s%d (ts timestamp, value float) TAGS (uuid binary(50))" % + (stbName, i)) + else: + restful_execute( + host, + port, + user, + password, + "CREATE TABLE IF NOT EXISTS %s%d (ts timestamp, value float) TAGS (uuid binary(50))" % + (stbName, i) + ) + + +def use_database(): + current_db = "%s%d" % (dbName, (numOfDb - 1)) + + if native: + cursor.execute("USE %s" % current_db) + else: + restful_execute(host, port, user, password, "USE %s" % current_db) def create_databases(): for i in range(0, numOfDb): v_print("will create database db%d", int(i)) - restful_execute( - host, - port, - user, - password, - "CREATE DATABASE IF NOT EXISTS db%d" % - i) + + if native: + cursor.execute( + "CREATE DATABASE IF NOT EXISTS %s%d" % (dbName, i)) + else: + restful_execute( + host, + port, + user, + password, + "CREATE DATABASE IF NOT EXISTS %s%d" % (dbName, i)) + + +def drop_tables(): + # TODO + v_print("TODO: drop tables total %d", numOfTb) + pass + + +def drop_stable(): + # TODO + v_print("TODO: drop stables total %d", numOfStb) + pass def drop_databases(): @@ -198,13 +241,19 @@ def drop_databases(): # drop exist databases first for i in range(0, numOfDb): v_print("will drop database db%d", int(i)) - restful_execute( - host, - port, - user, - password, - "DROP DATABASE IF EXISTS db%d" % - i) + + if native: + cursor.execute( + "DROP DATABASE IF EXISTS %s%d" % + (dbName, i)) + else: + restful_execute( + host, + port, + user, + password, + "DROP DATABASE IF EXISTS %s%d" % + (dbName, i)) def insert_func(process: int, thread: int): @@ -252,8 +301,11 @@ def insert_func(process: int, thread: int): if measure: exec_start_time = datetime.datetime.now() - restful_execute( - host, port, user, password, cmd) + if native: + cursor.execute(cmd) + else: + restful_execute( + host, port, user, password, cmd) if measure: exec_end_time = datetime.datetime.now() @@ -274,15 +326,26 @@ def create_tb_using_stb(): def create_tb(): v_print("create_tb() numOfTb: %d", numOfTb) for i in range(0, numOfDb): - restful_execute(host, port, user, password, "USE db%d" % i) - for j in range(0, numOfTb): + if native: + cursor.execute("USE %s%d" % (dbName, i)) + else: restful_execute( - host, - port, - user, - password, - "CREATE TABLE tb%d (ts timestamp, value float)" % - j) + host, port, user, password, "USE %s%d" % + (dbName, i)) + + for j in range(0, numOfTb): + if native: + cursor.execute( + "CREATE TABLE %s%d (ts timestamp, value float)" % + (tbName, j)) + else: + restful_execute( + host, + port, + user, + password, + "CREATE TABLE %s%d (ts timestamp, value float)" % + (tbName, j)) def insert_data_process(i: int, begin: int, end: int): @@ -312,38 +375,42 @@ def insert_data_process(i: int, begin: int, end: int): end)] wait(workers, return_when=ALL_COMPLETED) + def printConfig(): - print("###################################################################"); - print("# Use native interface: %s" % native); - print("# Server IP: %s" % host); + print("###################################################################") + print("# Use native interface: %s" % native) + print("# Server IP: %s" % host) if native: - print("# Server port: %s" % port); + print("# Server port: %s" % port) else: - print("# Server port: %s" % restPort); + print("# Server port: %s" % restPort) - print("# User: %s" % user); - print("# Password: %s" % password); - print("# Number of Columns per record: %s" % colsPerRecord); - print("# Number of Threads: %s" % threads); - print("# Number of Processes: %s" % processes); - print("# Number of Tables: %s" % numOfTb); - print("# Number of records per Table: %s" % numOfRec); - print("# Records/Request: %s" % batch); - print("# Database name: %s" % dbName); - print("# Replica: %s" % replica); - print("# Use STable: %s" % useStable); - print("# Table prefix: %s" % tbNamePrefix); + print("# Configuration Dir: %s" % configDir) + print("# User: %s" % user) + print("# Password: %s" % password) + print("# Number of Columns per record: %s" % colsPerRecord) + print("# Number of Threads: %s" % threads) + print("# Number of Processes: %s" % processes) + print("# Number of Tables: %s" % numOfTb) + print("# Number of records per Table: %s" % numOfRec) + print("# Records/Request: %s" % batch) + print("# Database name: %s" % dbName) + print("# Replica: %s" % replica) + print("# Use STable: %s" % useStable) + print("# Table prefix: %s" % tbNamePrefix) if useStable: - print("# STable prefix: %s" % stbNamePrefix); - print("# Data order: %s" % outOfOrder); - print("# Data out of order rate: %s" % rateOOOO); - print("# Delete method: %s" % deleteMethod); - print("# Query command: %s" % queryCmd); - print("# Insert Only: %s" % insertOnly); - print("# Verbose output %s" % verbose); - print("# Test time: %s" % datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")) - print("###################################################################"); + print("# STable prefix: %s" % stbNamePrefix) + + print("# Data order: %s" % outOfOrder) + print("# Data out of order rate: %s" % rateOOOO) + print("# Delete method: %s" % deleteMethod) + print("# Query command: %s" % queryCmd) + print("# Insert Only: %s" % insertOnly) + print("# Verbose output %s" % verbose) + print("# Test time: %s" % + datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")) + print("###################################################################") if __name__ == "__main__": @@ -388,7 +455,7 @@ if __name__ == "__main__": 'native', 'host', 'port', 'user', 'password', 'dbname', 'replica', 'tbname', 'stable', 'stbname', 'query', 'numOfThreads', 'numOfProcesses', 'recPerReq', 'colsPerRecord', 'numOfTb', 'numOfRec', 'config', - 'insertOnly', 'outOfOrder', 'rateOOOO','deleteMethod', + 'insertOnly', 'outOfOrder', 'rateOOOO', 'deleteMethod', 'verbose', 'debug', 'skipPrompt', 'help' ]) except getopt.GetoptError as err: @@ -418,16 +485,23 @@ if __name__ == "__main__": print('\t-u, --user user, The user name to use when connecting to the server. Default is \'root\'.') print('\t-P, --password password, The password to use when connecting to the server. Default is \'taosdata\'.') print('\t-l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3.') - print('\t-d, --dbname database, Destination database. Default is \'test\'.') + print( + '\t-d, --dbname database, Destination database. Default is \'test\'.') print('\t-a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5.') - print('\t-m, --tbname
table_prefix, Table prefix name. Default is \'t\'.') - print('\t-M, --stable flag, Use super table. Default is no') - print('\t-s, --stbname stable_prefix, STable prefix name. Default is \'st\'') + print( + '\t-m, --tbname
table_prefix, Table prefix name. Default is \'t\'.') + print( + '\t-M, --stable flag, Use super table. Default is no') + print( + '\t-s, --stbname stable_prefix, STable prefix name. Default is \'st\'') print('\t-Q, --query query, Execute query command. set \'DEFAULT\' means select * from each table') - print('\t-T, --numOfThreads num_of_threads, The number of threads. Default is 1.') - print('\t-P, --numOfProcesses num_of_processes, The number of threads. Default is 1.') + print( + '\t-T, --numOfThreads num_of_threads, The number of threads. Default is 1.') + print( + '\t-P, --numOfProcesses num_of_processes, The number of threads. Default is 1.') print('\t-r, --batch num_of_records_per_req, The number of records per request. Default is 1000.') - print('\t-t, --numOfTb num_of_tables, The number of tables. Default is 1.') + print( + '\t-t, --numOfTb num_of_tables, The number of tables. Default is 1.') print('\t-n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1.') print('\t-c, --config config_directory, Configuration directory. Default is \'/etc/taos/\'.') print('\t-x, --inserOnly flag, Insert only flag.') @@ -436,7 +510,8 @@ if __name__ == "__main__": print('\t-D, --deleteMethod Delete data methods 0: don\'t delete, 1: delete by table, 2: delete by stable, 3: delete by database.') print('\t-v, --verbose Print verbose output') print('\t-g, --debug Print debug output') - print('\t-y, --skipPrompt Skip read key for continous test, default is not skip') + print( + '\t-y, --skipPrompt Skip read key for continous test, default is not skip') print('') sys.exit(0) @@ -510,7 +585,11 @@ if __name__ == "__main__": numOfRec = int(value) v_print("numOfRec is %d", numOfRec) - if key in ['-x', '--insertonLy']: + if key in ['-c', '--config']: + configDir = value + v_print("config dir: %s", configDir) + + if key in ['-x', '--insertOnly']: insertOnly = True v_print("insert only: %d", insertOnly) @@ -523,7 +602,12 @@ if __name__ == "__main__": v_print("the rate of out of order is %d", rateOOOO) if key in ['-D', '--deleteMethod']: - deleteMethod = int(value) + deleteMethod = int(value) + if (deleteMethod < 0) or (deleteMethod > 3): + print( + "inputed delete method is %d, valid value is 0~3, set to default 0" % + deleteMethod) + deleteMethod = 0 v_print("the delete method is %d", deleteMethod) if key in ['-v', '--verbose']: @@ -538,25 +622,54 @@ if __name__ == "__main__": if verbose: printConfig() - if skipPrompt == False: + if not skipPrompt: input("Press any key to continue..") - if dropDbOnly: - drop_databases() - print("Drop Database done.") + if native: + v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir) + try: + conn = taos.connect( + host=host, + user=user, + password=password, + config=configDir) + print("conn: %p" % conn) + except Exception as e: + print("Error: %s" % e.args[0]) + sys.exit(1) + + if native: + try: + cursor = conn.cursor() + except Exception as e: + print("Error: %s" % e.args[0]) + sys.exit(1) + + + + if deleteMethod > 0: + if deleteMethod == 1: + drop_tables() + print("Drop tables done.") + elif deleteMethod == 2: + drop_stables() + print("Drop super tables done.") + elif deleteMethod == 3: + drop_databases() + print("Drop Database done.") sys.exit(0) # create databases if (insertOnly == False): drop_databases() + create_databases() if measure: start_time = time.time() # use last database - current_db = "db%d" % (numOfDb - 1) - restful_execute(host, port, user, password, "USE %s" % current_db) + use_database() if numOfStb > 0: create_stb() @@ -568,8 +681,14 @@ if __name__ == "__main__": if verbose: for i in range(0, numOfDb): for j in range(0, numOfStb): - restful_execute(host, port, user, password, - "SELECT COUNT(*) FROM db%d.st%d" % (i, j,)) + if native: + cursor.execute( + "SELECT COUNT(*) FROM %s%d.%s%d" % + (dbName, i, stbName, j,)) + else: + restful_execute( + host, port, user, password, "SELECT COUNT(*) FROM %s%d.%s%d" % + (dbName, i, stbName, j,)) print("done") @@ -587,10 +706,21 @@ if __name__ == "__main__": if verbose: for i in range(0, numOfDb): - restful_execute(host, port, user, password, "USE db%d" % i) + if native: + cursor.execute("USE %s%d" % (dbName, i)) + else: + restful_execute( + host, port, user, password, "USE %s%d" % + (dbName, i)) + for j in range(0, numOfTb): - restful_execute(host, port, user, password, - "SELECT COUNT(*) FROM tb%d" % (j,)) + if native: + cursor.execute( + "SELECT COUNT(*) FROM %s%d" % (tbName, j)) + else: + restful_execute( + host, port, user, password, "SELECT COUNT(*) FROM %s%d" % + (tbName, j)) if queryCmd != "": print("queryCmd: %s" % queryCmd) From cbb534c24b5f665348fb4f3fa21c36e0c74fbb62 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 2 Feb 2021 12:25:32 +0000 Subject: [PATCH 05/10] [TD-2771] : python taosdemo, natvie interface. --- tests/examples/python/taosdemo/taosdemo.py | 47 +++++++++++++++------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/tests/examples/python/taosdemo/taosdemo.py b/tests/examples/python/taosdemo/taosdemo.py index aac19d3f1d..fc2195e7ab 100755 --- a/tests/examples/python/taosdemo/taosdemo.py +++ b/tests/examples/python/taosdemo/taosdemo.py @@ -32,6 +32,18 @@ def v_print(msg: str, arg: str): print(msg % arg) +@dispatch(str, str, str) +def v_print(msg: str, arg1: str, arg2: str): + if verbose: + print(msg % (arg1, arg2)) + + +@dispatch(str, str, str, str) +def v_print(msg: str, arg1: str, arg2: str, arg3: str): + if verbose: + print(msg % (arg1, arg2, arg3)) + + @dispatch(str, str, str, str, str) def v_print(msg: str, arg1: str, arg2: str, arg3: str, arg4: str): if verbose: @@ -83,8 +95,7 @@ def v_print(msg: str, arg1: int, arg2: int, arg3: int, arg4: int): def restful_execute(host: str, port: int, user: str, password: str, cmd: str): url = "http://%s:%d/rest/sql" % (host, restPort) - if verbose: - v_print("cmd: %s", cmd) + v_print("restful_execute - cmd: %s", cmd) resp = requests.post(url, cmd, auth=(user, password)) @@ -103,6 +114,7 @@ def restful_execute(host: str, port: int, user: str, password: str, cmd: str): def query_func(process: int, thread: int, cmd: str): v_print("%d process %d thread cmd: %s", process, thread, cmd) + if oneMoreHost != "NotSupported" and random.randint( 0, 1) == 1: v_print("%s", "Send to second host") @@ -140,6 +152,7 @@ def query_data_process(i: int, cmd: str): def query_data(cmd: str): v_print("query_data processes: %d, cmd: %s", processes, cmd) + pool = Pool(processes) for i in range(processes): pool.apply_async(query_data_process, args=(i, cmd)) @@ -199,7 +212,6 @@ def create_stb(): def use_database(): - current_db = "%s%d" % (dbName, (numOfDb - 1)) if native: cursor.execute("USE %s" % current_db) @@ -272,14 +284,14 @@ def insert_func(process: int, thread: int): sqlCmd = ['INSERT INTO '] try: sqlCmd.append( - "%s.tb%s " % (current_db, thread)) + "%s.%s%d " % (current_db, tbName, thread)) if (numOfStb > 0 and autosubtable): - sqlCmd.append("USING %s.st%d TAGS('%s') " % - (current_db, numOfStb - 1, uuid)) + sqlCmd.append("USING %s.%s%d TAGS('%s') " % + (current_db, stbName, numOfStb - 1, uuid)) start_time = datetime.datetime( - 2020, 9, 25) + datetime.timedelta(seconds=row) + 2021, 1, 25) + datetime.timedelta(seconds=row) sqlCmd.append("VALUES ") for batchIter in range(0, batch): @@ -302,7 +314,10 @@ def insert_func(process: int, thread: int): exec_start_time = datetime.datetime.now() if native: - cursor.execute(cmd) + v_print("insert_func - cursor:%x cmd:%s", hex(id(cursor)), cmd) + cursor.execute("SHOW DATABASES" ) +# cursor.execute("%s" % cmd) + v_print("insert_func - cursor:%x cmd:%s done", hex(id(cursor)), cmd) else: restful_execute( host, port, user, password, cmd) @@ -398,9 +413,9 @@ def printConfig(): print("# Database name: %s" % dbName) print("# Replica: %s" % replica) print("# Use STable: %s" % useStable) - print("# Table prefix: %s" % tbNamePrefix) + print("# Table prefix: %s" % tbName) if useStable: - print("# STable prefix: %s" % stbNamePrefix) + print("# STable prefix: %s" % stbName) print("# Data order: %s" % outOfOrder) print("# Data out of order rate: %s" % rateOOOO) @@ -425,10 +440,10 @@ if __name__ == "__main__": replica = 1 batch = 1 numOfTb = 1 - tbNamePrefix = "tb" + tbName = "tb" useStable = False numOfStb = 0 - stbNamePrefix = "stb" + stbName = "stb" numOfRec = 10 ieration = 1 host = "127.0.0.1" @@ -547,14 +562,14 @@ if __name__ == "__main__": sys.exit(1) if key in ['-m', '--tbname']: - tbNamePrefix = value + tbName = value if key in ['-M', '--stable']: useStable = True numOfStb = 1 if key in ['-s', '--stbname']: - stbNamePrefix = value + stbName = value if key in ['-Q', '--query']: queryCmd = str(value) @@ -633,7 +648,7 @@ if __name__ == "__main__": user=user, password=password, config=configDir) - print("conn: %p" % conn) + print("conn: %s" % str(conn.__class__)) except Exception as e: print("Error: %s" % e.args[0]) sys.exit(1) @@ -641,6 +656,7 @@ if __name__ == "__main__": if native: try: cursor = conn.cursor() + print("cursor:%d %s" % (id(cursor), str(cursor.__class__))) except Exception as e: print("Error: %s" % e.args[0]) sys.exit(1) @@ -669,6 +685,7 @@ if __name__ == "__main__": start_time = time.time() # use last database + current_db = "%s%d" % (dbName, (numOfDb - 1)) use_database() if numOfStb > 0: From 13a6421698f28d747b6451b4610d3669d33d0006 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 5 Feb 2021 10:48:25 +0000 Subject: [PATCH 06/10] [TD-2771] feature: python taosdemo. add lock. --- tests/examples/python/taosdemo/README.md | 52 ++++---- tests/examples/python/taosdemo/taosdemo.py | 137 +++++++++++---------- 2 files changed, 101 insertions(+), 88 deletions(-) diff --git a/tests/examples/python/taosdemo/README.md b/tests/examples/python/taosdemo/README.md index 5ac0e18aff..a7920b8541 100644 --- a/tests/examples/python/taosdemo/README.md +++ b/tests/examples/python/taosdemo/README.md @@ -8,31 +8,31 @@ Usage: ./taosdemo.py [OPTION...] Author: Shuduo Sang - -H, --help Show usage. + -H, --help Show usage. - -N, --native flag, Use native interface if set. Default is using RESTful interface. - -h, --host host, The host to connect to TDengine. Default is localhost. - -p, --port port, The TCP/IP port number to use for the connection. Default is 0. - -u, --user user, The user name to use when connecting to the server. Default is 'root'. - -P, --password password, The password to use when connecting to the server. Default is 'taosdata'. - -l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3. - -d, --dbname database, Destination database. Default is 'test'. - -a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5. - -m, --tbname
table_prefix, Table prefix name. Default is 't'. - -M, --stable flag, Use super table. Default is no - -s, --stbname stable_prefix, STable prefix name. Default is 'st' - -Q, --query query, Execute query command. set 'DEFAULT' means select * from each table - -T, --numOfThreads num_of_threads, The number of threads. Default is 1. - -P, --numOfProcesses num_of_processes, The number of threads. Default is 1. - -r, --batch num_of_records_per_req, The number of records per request. Default is 1000. - -t, --numOfTb num_of_tables, The number of tables. Default is 1. - -n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1. - -c, --config config_directory, Configuration directory. Default is '/etc/taos/'. - -x, --inserOnly flag, Insert only flag. - -O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order. - -R, --rateOOOO rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50. - -D, --deleteMethod Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database. - -v, --verbose Print verbose output - -g, --debug Print debug output - -y, --skipPrompt Skip read key for continous test, default is not skip + -N, --native flag, Use native interface if set. Default is using RESTful interface. + -h, --host host, The host to connect to TDengine. Default is localhost. + -p, --port port, The TCP/IP port number to use for the connection. Default is 0. + -u, --user user, The user name to use when connecting to the server. Default is 'root'. + -P, --password password, The password to use when connecting to the server. Default is 'taosdata'. + -l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3. + -d, --dbname database, Destination database. Default is 'test'. + -a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5. + -m, --tbname
table_prefix, Table prefix name. Default is 't'. + -M, --stable flag, Use super table. Default is no + -s, --stbname stable_prefix, STable prefix name. Default is 'st' + -Q, --query query, Execute query command. set 'DEFAULT' means select * from each table + -T, --threads num_of_threads, The number of threads. Default is 1. + -C, --processes num_of_processes, The number of threads. Default is 1. + -r, --batch num_of_records_per_req, The number of records per request. Default is 1000. + -t, --numOfTb num_of_tables, The number of tables. Default is 1. + -n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1. + -c, --config config_directory, Configuration directory. Default is '/etc/taos/'. + -x, --inserOnly flag, Insert only flag. + -O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order. + -R, --rateOOOO rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50. + -D, --deleteMethod Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database. + -v, --verbose Print verbose output + -g, --debug Print debug output + -y, --skipPrompt Skip read key for continous test, default is not skip diff --git a/tests/examples/python/taosdemo/taosdemo.py b/tests/examples/python/taosdemo/taosdemo.py index fc2195e7ab..320273ba98 100755 --- a/tests/examples/python/taosdemo/taosdemo.py +++ b/tests/examples/python/taosdemo/taosdemo.py @@ -21,7 +21,7 @@ import json import random import time import datetime -from multiprocessing import Process, Pool +from multiprocessing import Process, Pool, Lock from multipledispatch import dispatch from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED @@ -124,28 +124,22 @@ def query_func(process: int, thread: int, cmd: str): restful_execute( oneMoreHost, port, user, password, cmd) else: - v_print("%s", "Send to first host") + v_print("%s%s%s", "Send ", cmd, " to the host") if native: - cursor.execute(cmd) + pass +# cursor.execute(cmd) else: restful_execute( host, port, user, password, cmd) -def query_data_process(i: int, cmd: str): +def query_data_process(q_lock, i: int, cmd: str): + time.sleep(0.01) v_print("Process:%d threads: %d cmd: %s", i, threads, cmd) - with ThreadPoolExecutor(max_workers=threads) as executor: - workers = [ - executor.submit( - query_func, - i, - j, - cmd) for j in range( - 0, - threads)] - - wait(workers, return_when=ALL_COMPLETED) + q_lock.aquire() + cursor_p.execute(cmd) + q_lock.release() return i @@ -153,15 +147,18 @@ def query_data_process(i: int, cmd: str): def query_data(cmd: str): v_print("query_data processes: %d, cmd: %s", processes, cmd) + q_lock = Lock() + pool = Pool(processes) for i in range(processes): - pool.apply_async(query_data_process, args=(i, cmd)) - time.sleep(1) + pool.apply_async(query_data_process, args=(q_lock, i, cmd)) +# time.sleep(1) pool.close() pool.join() def insert_data(processes: int): + i_lock = Lock() pool = Pool(processes) begin = 0 @@ -179,6 +176,7 @@ def insert_data(processes: int): quotient, remainder) + print("CBD LN210 processes:%d" % processes) for i in range(processes): begin = end @@ -187,8 +185,8 @@ def insert_data(processes: int): else: end = begin + quotient - v_print("Process %d from %d to %d", i, begin, end) - pool.apply_async(insert_data_process, args=(i, begin, end)) + v_print("insert_data Process %d from %d to %d", i, begin, end) + pool.apply_async(insert_data_process, args=(i_lock, i, begin, end)) pool.close() pool.join() @@ -295,10 +293,11 @@ def insert_func(process: int, thread: int): sqlCmd.append("VALUES ") for batchIter in range(0, batch): - sqlCmd.append("('%s', %f) " % - (start_time + - datetime.timedelta( - milliseconds=batchIter), + sqlCmd.append("(now, %f) " % + ( +# start_time + +# datetime.timedelta( +# milliseconds=batchIter), random.random())) row = row + 1 if row >= numOfRec: @@ -310,18 +309,23 @@ def insert_func(process: int, thread: int): cmd = ' '.join(sqlCmd) + print("CBD: LN313") if measure: exec_start_time = datetime.datetime.now() + print("CBD: LN316 native: %d" % native) if native: - v_print("insert_func - cursor:%x cmd:%s", hex(id(cursor)), cmd) - cursor.execute("SHOW DATABASES" ) -# cursor.execute("%s" % cmd) - v_print("insert_func - cursor:%x cmd:%s done", hex(id(cursor)), cmd) + print("CBD: LN319: %s" % cmd) + print("conn: %s" % str(conn.__class__)) + print("CBD: LN320 cursor:%d %s" % (id(cursor), str(cursor.__class__))) +# cursor.execute("SHOW DATABASES" ) + affectedRows = cursor.execute(cmd) + print("CBD: LN323 affectedRows:%d" % affectedRows) else: restful_execute( host, port, user, password, cmd) + print("CBD: LN327") if measure: exec_end_time = datetime.datetime.now() exec_delta = exec_end_time - exec_start_time @@ -363,9 +367,11 @@ def create_tb(): (tbName, j)) -def insert_data_process(i: int, begin: int, end: int): +def insert_data_process(lock, i: int, begin: int, end: int): + print("CBD insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks) + time.sleep(1) tasks = end - begin - v_print("Process:%d table from %d to %d, tasks %d", i, begin, end, tasks) + i_lock.aquire() if (threads < (end - begin)): for j in range(begin, end, threads): @@ -389,6 +395,7 @@ def insert_data_process(i: int, begin: int, end: int): begin, end)] wait(workers, return_when=ALL_COMPLETED) + i_lock.release() def printConfig(): @@ -457,7 +464,7 @@ if __name__ == "__main__": threads = 1 insertOnly = False autosubtable = False - queryCmd = "select * from " + queryCmd = "DEFAULT" outOfOrder = 0 rateOOOO = 0 deleteMethod = 0 @@ -465,10 +472,10 @@ if __name__ == "__main__": try: opts, args = getopt.gnu_getopt(sys.argv[1:], - 'Nh:p:u:P:d:a:m:Ms:Q:T:P:r:l:t:n:c:xOR:D:vgyH', + 'Nh:p:u:P:d:a:m:Ms:Q:T:C:r:l:t:n:c:xOR:D:vgyH', [ 'native', 'host', 'port', 'user', 'password', 'dbname', 'replica', 'tbname', - 'stable', 'stbname', 'query', 'numOfThreads', 'numOfProcesses', + 'stable', 'stbname', 'query', 'threads', 'processes', 'recPerReq', 'colsPerRecord', 'numOfTb', 'numOfRec', 'config', 'insertOnly', 'outOfOrder', 'rateOOOO', 'deleteMethod', 'verbose', 'debug', 'skipPrompt', 'help' @@ -491,42 +498,42 @@ if __name__ == "__main__": print('Author: Shuduo Sang ') print('') - print('\t-H, --help Show usage.') + print('\t-H, --help Show usage.') print('') - print('\t-N, --native flag, Use native interface if set. Default is using RESTful interface.') - print('\t-h, --host host, The host to connect to TDengine. Default is localhost.') - print('\t-p, --port port, The TCP/IP port number to use for the connection. Default is 0.') - print('\t-u, --user user, The user name to use when connecting to the server. Default is \'root\'.') - print('\t-P, --password password, The password to use when connecting to the server. Default is \'taosdata\'.') - print('\t-l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3.') + print('\t-N, --native flag, Use native interface if set. Default is using RESTful interface.') + print('\t-h, --host host, The host to connect to TDengine. Default is localhost.') + print('\t-p, --port port, The TCP/IP port number to use for the connection. Default is 0.') + print('\t-u, --user user, The user name to use when connecting to the server. Default is \'root\'.') + print('\t-P, --password password, The password to use when connecting to the server. Default is \'taosdata\'.') + print('\t-l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3.') print( - '\t-d, --dbname database, Destination database. Default is \'test\'.') - print('\t-a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5.') + '\t-d, --dbname database, Destination database. Default is \'test\'.') + print('\t-a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5.') print( - '\t-m, --tbname
table_prefix, Table prefix name. Default is \'t\'.') + '\t-m, --tbname
table_prefix, Table prefix name. Default is \'t\'.') print( - '\t-M, --stable flag, Use super table. Default is no') + '\t-M, --stable flag, Use super table. Default is no') print( - '\t-s, --stbname stable_prefix, STable prefix name. Default is \'st\'') - print('\t-Q, --query query, Execute query command. set \'DEFAULT\' means select * from each table') + '\t-s, --stbname stable_prefix, STable prefix name. Default is \'st\'') + print('\t-Q, --query query, Execute query command. set \'DEFAULT\' means select * from each table') print( - '\t-T, --numOfThreads num_of_threads, The number of threads. Default is 1.') + '\t-T, --threads num_of_threads, The number of threads. Default is 1.') print( - '\t-P, --numOfProcesses num_of_processes, The number of threads. Default is 1.') - print('\t-r, --batch num_of_records_per_req, The number of records per request. Default is 1000.') + '\t-C, --processes num_of_processes, The number of threads. Default is 1.') + print('\t-r, --batch num_of_records_per_req, The number of records per request. Default is 1000.') print( - '\t-t, --numOfTb num_of_tables, The number of tables. Default is 1.') - print('\t-n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1.') - print('\t-c, --config config_directory, Configuration directory. Default is \'/etc/taos/\'.') - print('\t-x, --inserOnly flag, Insert only flag.') - print('\t-O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order.') - print('\t-R, --rateOOOO rate, Out of order data\'s rate--if order=1 Default 10, min: 0, max: 50.') - print('\t-D, --deleteMethod Delete data methods 0: don\'t delete, 1: delete by table, 2: delete by stable, 3: delete by database.') - print('\t-v, --verbose Print verbose output') - print('\t-g, --debug Print debug output') + '\t-t, --numOfTb num_of_tables, The number of tables. Default is 1.') + print('\t-n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1.') + print('\t-c, --config config_directory, Configuration directory. Default is \'/etc/taos/\'.') + print('\t-x, --inserOnly flag, Insert only flag.') + print('\t-O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order.') + print('\t-R, --rateOOOO rate, Out of order data\'s rate--if order=1 Default 10, min: 0, max: 50.') + print('\t-D, --deleteMethod Delete data methods 0: don\'t delete, 1: delete by table, 2: delete by stable, 3: delete by database.') + print('\t-v, --verbose Print verbose output') + print('\t-g, --debug Print debug output') print( - '\t-y, --skipPrompt Skip read key for continous test, default is not skip') + '\t-y, --skipPrompt Skip read key for continous test, default is not skip') print('') sys.exit(0) @@ -574,13 +581,13 @@ if __name__ == "__main__": if key in ['-Q', '--query']: queryCmd = str(value) - if key in ['-T', '--numOfThreads']: + if key in ['-T', '--threads']: threads = int(value) if threads < 1: print("FATAL: number of threads must be larger than 0") sys.exit(1) - if key in ['-P', '--numOfProcesses']: + if key in ['-C', '--processes']: processes = int(value) if processes < 1: print("FATAL: number of processes must be larger than 0") @@ -653,7 +660,6 @@ if __name__ == "__main__": print("Error: %s" % e.args[0]) sys.exit(1) - if native: try: cursor = conn.cursor() print("cursor:%d %s" % (id(cursor), str(cursor.__class__))) @@ -717,6 +723,7 @@ if __name__ == "__main__": sys.exit(0) + print("CBD LN755 %d" % numOfTb) if numOfTb > 0: create_tb() insert_data(processes) @@ -739,11 +746,17 @@ if __name__ == "__main__": host, port, user, password, "SELECT COUNT(*) FROM %s%d" % (tbName, j)) - if queryCmd != "": + if queryCmd != "DEFAULT": print("queryCmd: %s" % queryCmd) +# cursor.close() ## +# conn.close() ## CBD query_data(queryCmd) sys.exit(0) + if native: + cursor.close() + conn.close() + print("done") if measure: end_time = time.time() From 11b1467d5cd181997174b8d9169027846aa90d7e Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 5 Feb 2021 11:08:31 +0000 Subject: [PATCH 07/10] [TD-2771] feature: python taosdemo. adjust debug output. --- tests/examples/python/taosdemo/README.md | 2 +- tests/examples/python/taosdemo/taosdemo.py | 57 ++++++++++++---------- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/tests/examples/python/taosdemo/README.md b/tests/examples/python/taosdemo/README.md index a7920b8541..d48fffe8ff 100644 --- a/tests/examples/python/taosdemo/README.md +++ b/tests/examples/python/taosdemo/README.md @@ -21,7 +21,7 @@ Author: Shuduo Sang -m, --tbname
table_prefix, Table prefix name. Default is 't'. -M, --stable flag, Use super table. Default is no -s, --stbname stable_prefix, STable prefix name. Default is 'st' - -Q, --query query, Execute query command. set 'DEFAULT' means select * from each table + -Q, --query query, Execute query command. set 'DEFAULT' means select * from each table -T, --threads num_of_threads, The number of threads. Default is 1. -C, --processes num_of_processes, The number of threads. Default is 1. -r, --batch num_of_records_per_req, The number of records per request. Default is 1000. diff --git a/tests/examples/python/taosdemo/taosdemo.py b/tests/examples/python/taosdemo/taosdemo.py index 320273ba98..9a6613e685 100755 --- a/tests/examples/python/taosdemo/taosdemo.py +++ b/tests/examples/python/taosdemo/taosdemo.py @@ -101,7 +101,7 @@ def restful_execute(host: str, port: int, user: str, password: str, cmd: str): v_print("resp status: %d", resp.status_code) - if verbose: + if debug: v_print( "resp text: %s", json.dumps( @@ -138,7 +138,7 @@ def query_data_process(q_lock, i: int, cmd: str): v_print("Process:%d threads: %d cmd: %s", i, threads, cmd) q_lock.aquire() - cursor_p.execute(cmd) + cursor.execute(cmd) q_lock.release() return i @@ -171,7 +171,7 @@ def insert_data(processes: int): remainder = numOfTb % processes v_print( - "num of tables: %d, quotient: %d, remainder: %d", + "insert_data num of tables: %d, quotient: %d, remainder: %d", numOfTb, quotient, remainder) @@ -367,9 +367,9 @@ def create_tb(): (tbName, j)) -def insert_data_process(lock, i: int, begin: int, end: int): - print("CBD insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks) - time.sleep(1) +def insert_data_process(i_lock, i: int, begin: int, end: int): + print("CBD LN371 insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks) + time.sleep(0.01) tasks = end - begin i_lock.aquire() @@ -397,6 +397,23 @@ def insert_data_process(lock, i: int, begin: int, end: int): wait(workers, return_when=ALL_COMPLETED) i_lock.release() +def query_db(i): + if native: + cursor.execute("USE %s%d" % (dbName, i)) + else: + restful_execute( + host, port, user, password, "USE %s%d" % + (dbName, i)) + + for j in range(0, numOfTb): + if native: + cursor.execute( + "SELECT COUNT(*) FROM %s%d" % (tbName, j)) + else: + restful_execute( + host, port, user, password, "SELECT COUNT(*) FROM %s%d" % + (tbName, j)) + def printConfig(): @@ -439,6 +456,7 @@ if __name__ == "__main__": native = False verbose = False + debug = False measure = True dropDbOnly = False colsPerRecord = 3 @@ -516,7 +534,7 @@ if __name__ == "__main__": '\t-M, --stable flag, Use super table. Default is no') print( '\t-s, --stbname stable_prefix, STable prefix name. Default is \'st\'') - print('\t-Q, --query query, Execute query command. set \'DEFAULT\' means select * from each table') + print('\t-Q, --query query, Execute query command. set \'DEFAULT\' means select * from each table') print( '\t-T, --threads num_of_threads, The number of threads. Default is 1.') print( @@ -728,28 +746,12 @@ if __name__ == "__main__": create_tb() insert_data(processes) - if verbose: + if debug: for i in range(0, numOfDb): - if native: - cursor.execute("USE %s%d" % (dbName, i)) - else: - restful_execute( - host, port, user, password, "USE %s%d" % - (dbName, i)) + query_db(i) - for j in range(0, numOfTb): - if native: - cursor.execute( - "SELECT COUNT(*) FROM %s%d" % (tbName, j)) - else: - restful_execute( - host, port, user, password, "SELECT COUNT(*) FROM %s%d" % - (tbName, j)) - - if queryCmd != "DEFAULT": + if queryCmd != "NO": print("queryCmd: %s" % queryCmd) -# cursor.close() ## -# conn.close() ## CBD query_data(queryCmd) sys.exit(0) @@ -757,9 +759,10 @@ if __name__ == "__main__": cursor.close() conn.close() - print("done") if measure: end_time = time.time() print( "Total time consumed {} seconds.".format( (end_time - start_time))) + + print("done") From e5908c5f02f31033b815bca6e4d0ec4457247a7a Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sat, 6 Feb 2021 08:40:38 +0000 Subject: [PATCH 08/10] [TD-2771] feature: python taosdemo. multi process/thread working. --- tests/examples/python/taosdemo/taosdemo.py | 244 ++++++++++++--------- 1 file changed, 141 insertions(+), 103 deletions(-) diff --git a/tests/examples/python/taosdemo/taosdemo.py b/tests/examples/python/taosdemo/taosdemo.py index 9a6613e685..049ea05558 100755 --- a/tests/examples/python/taosdemo/taosdemo.py +++ b/tests/examples/python/taosdemo/taosdemo.py @@ -21,7 +21,7 @@ import json import random import time import datetime -from multiprocessing import Process, Pool, Lock +from multiprocessing import Manager, Pool, Lock from multipledispatch import dispatch from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED @@ -176,7 +176,6 @@ def insert_data(processes: int): quotient, remainder) - print("CBD LN210 processes:%d" % processes) for i in range(processes): begin = end @@ -274,67 +273,84 @@ def insert_func(process: int, thread: int): uuid = "%s" % uuid_int v_print("uuid is: %s", uuid) + # establish connection if native + if native: + v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir) + try: + conn = taos.connect( + host=host, + user=user, + password=password, + config=configDir) + print("conn: %s" % str(conn.__class__)) + except Exception as e: + print("Error: %s" % e.args[0]) + sys.exit(1) + + try: + cursor = conn.cursor() + print("cursor:%d %s" % (id(cursor), str(cursor.__class__))) + except Exception as e: + print("Error: %s" % e.args[0]) + sys.exit(1) + v_print("numOfRec %d:", numOfRec) - if numOfRec > 0: - row = 0 - while row < numOfRec: - v_print("row: %d", row) - sqlCmd = ['INSERT INTO '] - try: - sqlCmd.append( - "%s.%s%d " % (current_db, tbName, thread)) - if (numOfStb > 0 and autosubtable): - sqlCmd.append("USING %s.%s%d TAGS('%s') " % - (current_db, stbName, numOfStb - 1, uuid)) + row = 0 + while row < numOfRec: + v_print("row: %d", row) + sqlCmd = ['INSERT INTO '] + try: + sqlCmd.append( + "%s.%s%d " % (current_db, tbName, thread)) - start_time = datetime.datetime( - 2021, 1, 25) + datetime.timedelta(seconds=row) + if (numOfStb > 0 and autosubtable): + sqlCmd.append("USING %s.%s%d TAGS('%s') " % + (current_db, stbName, numOfStb - 1, uuid)) - sqlCmd.append("VALUES ") - for batchIter in range(0, batch): - sqlCmd.append("(now, %f) " % - ( -# start_time + -# datetime.timedelta( -# milliseconds=batchIter), - random.random())) - row = row + 1 - if row >= numOfRec: - v_print("BREAK, row: %d numOfRec:%d", row, numOfRec) - break + start_time = datetime.datetime( + 2021, 1, 25) + datetime.timedelta(seconds=row) - except Exception as e: - print("Error: %s" % e.args[0]) + sqlCmd.append("VALUES ") + for batchIter in range(0, batch): + sqlCmd.append("('%s', %f) " % + ( + start_time + + datetime.timedelta( + milliseconds=batchIter), + random.random())) + row = row + 1 + if row >= numOfRec: + v_print("BREAK, row: %d numOfRec:%d", row, numOfRec) + break - cmd = ' '.join(sqlCmd) + except Exception as e: + print("Error: %s" % e.args[0]) - print("CBD: LN313") - if measure: - exec_start_time = datetime.datetime.now() + cmd = ' '.join(sqlCmd) - print("CBD: LN316 native: %d" % native) - if native: - print("CBD: LN319: %s" % cmd) - print("conn: %s" % str(conn.__class__)) - print("CBD: LN320 cursor:%d %s" % (id(cursor), str(cursor.__class__))) -# cursor.execute("SHOW DATABASES" ) - affectedRows = cursor.execute(cmd) - print("CBD: LN323 affectedRows:%d" % affectedRows) - else: - restful_execute( - host, port, user, password, cmd) + if measure: + exec_start_time = datetime.datetime.now() - print("CBD: LN327") - if measure: - exec_end_time = datetime.datetime.now() - exec_delta = exec_end_time - exec_start_time - print( - "%s, %d" % - (time.strftime('%X'), - exec_delta.microseconds)) + if native: + affectedRows = cursor.execute(cmd) + else: + restful_execute( + host, port, user, password, cmd) - v_print("cmd: %s, length:%d", cmd, len(cmd)) + if measure: + exec_end_time = datetime.datetime.now() + exec_delta = exec_end_time - exec_start_time + print( + "%s, %d" % + (time.strftime('%X'), + exec_delta.microseconds)) + + v_print("cmd: %s, length:%d", cmd, len(cmd)) + + if native: + cursor.close() + conn.close() def create_tb_using_stb(): @@ -367,11 +383,10 @@ def create_tb(): (tbName, j)) -def insert_data_process(i_lock, i: int, begin: int, end: int): - print("CBD LN371 insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks) - time.sleep(0.01) +def insert_data_process(lock, i: int, begin: int, end: int): + lock.acquire() tasks = end - begin - i_lock.aquire() + v_print("insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks) if (threads < (end - begin)): for j in range(begin, end, threads): @@ -395,7 +410,9 @@ def insert_data_process(i_lock, i: int, begin: int, end: int): begin, end)] wait(workers, return_when=ALL_COMPLETED) - i_lock.release() + + lock.release() + def query_db(i): if native: @@ -624,6 +641,10 @@ if __name__ == "__main__": if key in ['-n', '--numOfRec']: numOfRec = int(value) v_print("numOfRec is %d", numOfRec) + if numOfRec < 1: + print("FATAL: number of records must be larger than 0") + sys.exit(1) + if key in ['-c', '--config']: configDir = value @@ -665,6 +686,7 @@ if __name__ == "__main__": if not skipPrompt: input("Press any key to continue..") + # establish connection first if native if native: v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir) try: @@ -685,8 +707,7 @@ if __name__ == "__main__": print("Error: %s" % e.args[0]) sys.exit(1) - - + # drop data only if delete method be set if deleteMethod > 0: if deleteMethod == 1: drop_tables() @@ -700,69 +721,86 @@ if __name__ == "__main__": sys.exit(0) # create databases - if (insertOnly == False): - drop_databases() - + drop_databases() create_databases() - if measure: - start_time = time.time() - # use last database current_db = "%s%d" % (dbName, (numOfDb - 1)) use_database() + if measure: + start_time_begin = time.time() + if numOfStb > 0: create_stb() if (autosubtable == False): create_tb_using_stb() - - insert_data(processes) - - if verbose: - for i in range(0, numOfDb): - for j in range(0, numOfStb): - if native: - cursor.execute( - "SELECT COUNT(*) FROM %s%d.%s%d" % - (dbName, i, stbName, j,)) - else: - restful_execute( - host, port, user, password, "SELECT COUNT(*) FROM %s%d.%s%d" % - (dbName, i, stbName, j,)) - - print("done") - - if measure: - end_time = time.time() - print( - "Total time consumed {} seconds.".format( - (end_time - start_time))) - - sys.exit(0) - - print("CBD LN755 %d" % numOfTb) - if numOfTb > 0: + else: create_tb() - insert_data(processes) - if debug: - for i in range(0, numOfDb): - query_db(i) - - if queryCmd != "NO": - print("queryCmd: %s" % queryCmd) - query_data(queryCmd) - sys.exit(0) + if measure: + end_time = time.time() + print( + "Total time consumed {} seconds for create table.".format( + (end_time - start_time_begin))) if native: cursor.close() conn.close() + # start insert data + if measure: + start_time = time.time() + + manager = Manager() + lock = manager.Lock() + pool = Pool(processes) + + begin = 0 + end = 0 + + quotient = numOfTb // processes + if quotient < 1: + processes = numOfTb + quotient = 1 + + remainder = numOfTb % processes + v_print( + "num of tables: %d, quotient: %d, remainder: %d", + numOfTb, + quotient, + remainder) + + for i in range(processes): + begin = end + + if i < remainder: + end = begin + quotient + 1 + else: + end = begin + quotient + pool.apply_async(insert_data_process, args=(lock, i, begin, end,)) +# pool.apply_async(text, args=(lock, i, begin, end,)) + + pool.close() + pool.join() + time.sleep(1) + + if measure: + end_time = time.time() + print( + "Total time consumed {} seconds for insert data.".format( + (end_time - start_time))) + + + # query data + if queryCmd != "NO": + print("queryCmd: %s" % queryCmd) + query_data(queryCmd) + if measure: end_time = time.time() print( "Total time consumed {} seconds.".format( - (end_time - start_time))) + (end_time - start_time_begin))) print("done") From b1c6a7d49f5d27037fdee1533d652c13ceb31ff8 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sat, 6 Feb 2021 09:19:30 +0000 Subject: [PATCH 09/10] [TD-2771] : python taosdemo. add query func. --- tests/examples/python/taosdemo/taosdemo.py | 94 ++++++++++------------ 1 file changed, 42 insertions(+), 52 deletions(-) diff --git a/tests/examples/python/taosdemo/taosdemo.py b/tests/examples/python/taosdemo/taosdemo.py index 049ea05558..b15fc19a38 100755 --- a/tests/examples/python/taosdemo/taosdemo.py +++ b/tests/examples/python/taosdemo/taosdemo.py @@ -133,62 +133,52 @@ def query_func(process: int, thread: int, cmd: str): host, port, user, password, cmd) -def query_data_process(q_lock, i: int, cmd: str): - time.sleep(0.01) - v_print("Process:%d threads: %d cmd: %s", i, threads, cmd) +def query_data_process(cmd: str): + # establish connection if native + if native: + v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir) + try: + conn = taos.connect( + host=host, + user=user, + password=password, + config=configDir) + print("conn: %s" % str(conn.__class__)) + except Exception as e: + print("Error: %s" % e.args[0]) + sys.exit(1) - q_lock.aquire() - cursor.execute(cmd) - q_lock.release() + try: + cursor = conn.cursor() + print("cursor:%d %s" % (id(cursor), str(cursor.__class__))) + except Exception as e: + print("Error: %s" % e.args[0]) + sys.exit(1) - return i + if native: + try: + cursor.execute(cmd) + cols = cursor.description + data = cursor.fetchall() + for col in data: + print(col) + except Exception as e: + conn.close() + print("Error: %s" % e.args[0]) + sys.exit(1) -def query_data(cmd: str): - v_print("query_data processes: %d, cmd: %s", processes, cmd) + else: + restful_execute( + host, + port, + user, + password, + cmd) - q_lock = Lock() - - pool = Pool(processes) - for i in range(processes): - pool.apply_async(query_data_process, args=(q_lock, i, cmd)) -# time.sleep(1) - pool.close() - pool.join() - - -def insert_data(processes: int): - i_lock = Lock() - pool = Pool(processes) - - begin = 0 - end = 0 - - quotient = numOfTb // processes - if quotient < 1: - processes = numOfTb - quotient = 1 - - remainder = numOfTb % processes - v_print( - "insert_data num of tables: %d, quotient: %d, remainder: %d", - numOfTb, - quotient, - remainder) - - for i in range(processes): - begin = end - - if i < remainder: - end = begin + quotient + 1 - else: - end = begin + quotient - - v_print("insert_data Process %d from %d to %d", i, begin, end) - pool.apply_async(insert_data_process, args=(i_lock, i, begin, end)) - - pool.close() - pool.join() + if native: + cursor.close() + conn.close() def create_stb(): @@ -795,7 +785,7 @@ if __name__ == "__main__": # query data if queryCmd != "NO": print("queryCmd: %s" % queryCmd) - query_data(queryCmd) + query_data_process(queryCmd) if measure: end_time = time.time() From 30ea2efd028fad059ddc035780e43f13eab9e668 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sat, 6 Feb 2021 09:39:46 +0000 Subject: [PATCH 10/10] [TD-2771] : python taosdemo. refine query. --- tests/examples/python/taosdemo/taosdemo.py | 27 +++++++++++----------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/tests/examples/python/taosdemo/taosdemo.py b/tests/examples/python/taosdemo/taosdemo.py index b15fc19a38..d55023bdbf 100755 --- a/tests/examples/python/taosdemo/taosdemo.py +++ b/tests/examples/python/taosdemo/taosdemo.py @@ -143,16 +143,17 @@ def query_data_process(cmd: str): user=user, password=password, config=configDir) - print("conn: %s" % str(conn.__class__)) + v_print("conn: %s", str(conn.__class__)) except Exception as e: print("Error: %s" % e.args[0]) sys.exit(1) try: cursor = conn.cursor() - print("cursor:%d %s" % (id(cursor), str(cursor.__class__))) + v_print("cursor:%d %s", id(cursor), str(cursor.__class__)) except Exception as e: print("Error: %s" % e.args[0]) + conn.close() sys.exit(1) if native: @@ -272,16 +273,17 @@ def insert_func(process: int, thread: int): user=user, password=password, config=configDir) - print("conn: %s" % str(conn.__class__)) + v_print("conn: %s", str(conn.__class__)) except Exception as e: print("Error: %s" % e.args[0]) sys.exit(1) try: cursor = conn.cursor() - print("cursor:%d %s" % (id(cursor), str(cursor.__class__))) + v_print("cursor:%d %s", id(cursor), str(cursor.__class__)) except Exception as e: print("Error: %s" % e.args[0]) + conn.close() sys.exit(1) v_print("numOfRec %d:", numOfRec) @@ -331,10 +333,9 @@ def insert_func(process: int, thread: int): if measure: exec_end_time = datetime.datetime.now() exec_delta = exec_end_time - exec_start_time - print( - "%s, %d" % - (time.strftime('%X'), - exec_delta.microseconds)) + v_print( + "consume %d microseconds", + exec_delta.microseconds) v_print("cmd: %s, length:%d", cmd, len(cmd)) @@ -489,7 +490,7 @@ if __name__ == "__main__": threads = 1 insertOnly = False autosubtable = False - queryCmd = "DEFAULT" + queryCmd = "NO" outOfOrder = 0 rateOOOO = 0 deleteMethod = 0 @@ -541,7 +542,7 @@ if __name__ == "__main__": '\t-M, --stable flag, Use super table. Default is no') print( '\t-s, --stbname stable_prefix, STable prefix name. Default is \'st\'') - print('\t-Q, --query query, Execute query command. set \'DEFAULT\' means select * from each table') + print('\t-Q, --query [NO|EACHTB|command] query, Execute query command. set \'EACHTB\' means select * from each table') print( '\t-T, --threads num_of_threads, The number of threads. Default is 1.') print( @@ -685,16 +686,17 @@ if __name__ == "__main__": user=user, password=password, config=configDir) - print("conn: %s" % str(conn.__class__)) + v_print("conn: %s", str(conn.__class__)) except Exception as e: print("Error: %s" % e.args[0]) sys.exit(1) try: cursor = conn.cursor() - print("cursor:%d %s" % (id(cursor), str(cursor.__class__))) + v_print("cursor:%d %s", id(cursor), str(cursor.__class__)) except Exception as e: print("Error: %s" % e.args[0]) + conn.close() sys.exit(1) # drop data only if delete method be set @@ -769,7 +771,6 @@ if __name__ == "__main__": else: end = begin + quotient pool.apply_async(insert_data_process, args=(lock, i, begin, end,)) -# pool.apply_async(text, args=(lock, i, begin, end,)) pool.close() pool.join()