From 13a6421698f28d747b6451b4610d3669d33d0006 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 5 Feb 2021 10:48:25 +0000 Subject: [PATCH] [TD-2771] feature: python taosdemo. add lock. --- tests/examples/python/taosdemo/README.md | 52 ++++---- tests/examples/python/taosdemo/taosdemo.py | 137 +++++++++++---------- 2 files changed, 101 insertions(+), 88 deletions(-) diff --git a/tests/examples/python/taosdemo/README.md b/tests/examples/python/taosdemo/README.md index 5ac0e18aff..a7920b8541 100644 --- a/tests/examples/python/taosdemo/README.md +++ b/tests/examples/python/taosdemo/README.md @@ -8,31 +8,31 @@ Usage: ./taosdemo.py [OPTION...] Author: Shuduo Sang - -H, --help Show usage. + -H, --help Show usage. - -N, --native flag, Use native interface if set. Default is using RESTful interface. - -h, --host host, The host to connect to TDengine. Default is localhost. - -p, --port port, The TCP/IP port number to use for the connection. Default is 0. - -u, --user user, The user name to use when connecting to the server. Default is 'root'. - -P, --password password, The password to use when connecting to the server. Default is 'taosdata'. - -l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3. - -d, --dbname database, Destination database. Default is 'test'. - -a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5. - -m, --tbname table_prefix, Table prefix name. Default is 't'. - -M, --stable flag, Use super table. Default is no - -s, --stbname stable_prefix, STable prefix name. Default is 'st' - -Q, --query query, Execute query command. set 'DEFAULT' means select * from each table - -T, --numOfThreads num_of_threads, The number of threads. Default is 1. - -P, --numOfProcesses num_of_processes, The number of threads. Default is 1. - -r, --batch num_of_records_per_req, The number of records per request. Default is 1000. - -t, --numOfTb num_of_tables, The number of tables. Default is 1. - -n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1. - -c, --config config_directory, Configuration directory. Default is '/etc/taos/'. - -x, --inserOnly flag, Insert only flag. - -O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order. - -R, --rateOOOO rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50. - -D, --deleteMethod Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database. - -v, --verbose Print verbose output - -g, --debug Print debug output - -y, --skipPrompt Skip read key for continous test, default is not skip + -N, --native flag, Use native interface if set. Default is using RESTful interface. + -h, --host host, The host to connect to TDengine. Default is localhost. + -p, --port port, The TCP/IP port number to use for the connection. Default is 0. + -u, --user user, The user name to use when connecting to the server. Default is 'root'. + -P, --password password, The password to use when connecting to the server. Default is 'taosdata'. + -l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3. + -d, --dbname database, Destination database. Default is 'test'. + -a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5. + -m, --tbname
table_prefix, Table prefix name. Default is 't'. + -M, --stable flag, Use super table. Default is no + -s, --stbname stable_prefix, STable prefix name. Default is 'st' + -Q, --query query, Execute query command. set 'DEFAULT' means select * from each table + -T, --threads num_of_threads, The number of threads. Default is 1. + -C, --processes num_of_processes, The number of threads. Default is 1. + -r, --batch num_of_records_per_req, The number of records per request. Default is 1000. + -t, --numOfTb num_of_tables, The number of tables. Default is 1. + -n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1. + -c, --config config_directory, Configuration directory. Default is '/etc/taos/'. + -x, --inserOnly flag, Insert only flag. + -O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order. + -R, --rateOOOO rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50. + -D, --deleteMethod Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database. + -v, --verbose Print verbose output + -g, --debug Print debug output + -y, --skipPrompt Skip read key for continous test, default is not skip diff --git a/tests/examples/python/taosdemo/taosdemo.py b/tests/examples/python/taosdemo/taosdemo.py index fc2195e7ab..320273ba98 100755 --- a/tests/examples/python/taosdemo/taosdemo.py +++ b/tests/examples/python/taosdemo/taosdemo.py @@ -21,7 +21,7 @@ import json import random import time import datetime -from multiprocessing import Process, Pool +from multiprocessing import Process, Pool, Lock from multipledispatch import dispatch from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED @@ -124,28 +124,22 @@ def query_func(process: int, thread: int, cmd: str): restful_execute( oneMoreHost, port, user, password, cmd) else: - v_print("%s", "Send to first host") + v_print("%s%s%s", "Send ", cmd, " to the host") if native: - cursor.execute(cmd) + pass +# cursor.execute(cmd) else: restful_execute( host, port, user, password, cmd) -def query_data_process(i: int, cmd: str): +def query_data_process(q_lock, i: int, cmd: str): + time.sleep(0.01) v_print("Process:%d threads: %d cmd: %s", i, threads, cmd) - with ThreadPoolExecutor(max_workers=threads) as executor: - workers = [ - executor.submit( - query_func, - i, - j, - cmd) for j in range( - 0, - threads)] - - wait(workers, return_when=ALL_COMPLETED) + q_lock.aquire() + cursor_p.execute(cmd) + q_lock.release() return i @@ -153,15 +147,18 @@ def query_data_process(i: int, cmd: str): def query_data(cmd: str): v_print("query_data processes: %d, cmd: %s", processes, cmd) + q_lock = Lock() + pool = Pool(processes) for i in range(processes): - pool.apply_async(query_data_process, args=(i, cmd)) - time.sleep(1) + pool.apply_async(query_data_process, args=(q_lock, i, cmd)) +# time.sleep(1) pool.close() pool.join() def insert_data(processes: int): + i_lock = Lock() pool = Pool(processes) begin = 0 @@ -179,6 +176,7 @@ def insert_data(processes: int): quotient, remainder) + print("CBD LN210 processes:%d" % processes) for i in range(processes): begin = end @@ -187,8 +185,8 @@ def insert_data(processes: int): else: end = begin + quotient - v_print("Process %d from %d to %d", i, begin, end) - pool.apply_async(insert_data_process, args=(i, begin, end)) + v_print("insert_data Process %d from %d to %d", i, begin, end) + pool.apply_async(insert_data_process, args=(i_lock, i, begin, end)) pool.close() pool.join() @@ -295,10 +293,11 @@ def insert_func(process: int, thread: int): sqlCmd.append("VALUES ") for batchIter in range(0, batch): - sqlCmd.append("('%s', %f) " % - (start_time + - datetime.timedelta( - milliseconds=batchIter), + sqlCmd.append("(now, %f) " % + ( +# start_time + +# datetime.timedelta( +# milliseconds=batchIter), random.random())) row = row + 1 if row >= numOfRec: @@ -310,18 +309,23 @@ def insert_func(process: int, thread: int): cmd = ' '.join(sqlCmd) + print("CBD: LN313") if measure: exec_start_time = datetime.datetime.now() + print("CBD: LN316 native: %d" % native) if native: - v_print("insert_func - cursor:%x cmd:%s", hex(id(cursor)), cmd) - cursor.execute("SHOW DATABASES" ) -# cursor.execute("%s" % cmd) - v_print("insert_func - cursor:%x cmd:%s done", hex(id(cursor)), cmd) + print("CBD: LN319: %s" % cmd) + print("conn: %s" % str(conn.__class__)) + print("CBD: LN320 cursor:%d %s" % (id(cursor), str(cursor.__class__))) +# cursor.execute("SHOW DATABASES" ) + affectedRows = cursor.execute(cmd) + print("CBD: LN323 affectedRows:%d" % affectedRows) else: restful_execute( host, port, user, password, cmd) + print("CBD: LN327") if measure: exec_end_time = datetime.datetime.now() exec_delta = exec_end_time - exec_start_time @@ -363,9 +367,11 @@ def create_tb(): (tbName, j)) -def insert_data_process(i: int, begin: int, end: int): +def insert_data_process(lock, i: int, begin: int, end: int): + print("CBD insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks) + time.sleep(1) tasks = end - begin - v_print("Process:%d table from %d to %d, tasks %d", i, begin, end, tasks) + i_lock.aquire() if (threads < (end - begin)): for j in range(begin, end, threads): @@ -389,6 +395,7 @@ def insert_data_process(i: int, begin: int, end: int): begin, end)] wait(workers, return_when=ALL_COMPLETED) + i_lock.release() def printConfig(): @@ -457,7 +464,7 @@ if __name__ == "__main__": threads = 1 insertOnly = False autosubtable = False - queryCmd = "select * from " + queryCmd = "DEFAULT" outOfOrder = 0 rateOOOO = 0 deleteMethod = 0 @@ -465,10 +472,10 @@ if __name__ == "__main__": try: opts, args = getopt.gnu_getopt(sys.argv[1:], - 'Nh:p:u:P:d:a:m:Ms:Q:T:P:r:l:t:n:c:xOR:D:vgyH', + 'Nh:p:u:P:d:a:m:Ms:Q:T:C:r:l:t:n:c:xOR:D:vgyH', [ 'native', 'host', 'port', 'user', 'password', 'dbname', 'replica', 'tbname', - 'stable', 'stbname', 'query', 'numOfThreads', 'numOfProcesses', + 'stable', 'stbname', 'query', 'threads', 'processes', 'recPerReq', 'colsPerRecord', 'numOfTb', 'numOfRec', 'config', 'insertOnly', 'outOfOrder', 'rateOOOO', 'deleteMethod', 'verbose', 'debug', 'skipPrompt', 'help' @@ -491,42 +498,42 @@ if __name__ == "__main__": print('Author: Shuduo Sang ') print('') - print('\t-H, --help Show usage.') + print('\t-H, --help Show usage.') print('') - print('\t-N, --native flag, Use native interface if set. Default is using RESTful interface.') - print('\t-h, --host host, The host to connect to TDengine. Default is localhost.') - print('\t-p, --port port, The TCP/IP port number to use for the connection. Default is 0.') - print('\t-u, --user user, The user name to use when connecting to the server. Default is \'root\'.') - print('\t-P, --password password, The password to use when connecting to the server. Default is \'taosdata\'.') - print('\t-l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3.') + print('\t-N, --native flag, Use native interface if set. Default is using RESTful interface.') + print('\t-h, --host host, The host to connect to TDengine. Default is localhost.') + print('\t-p, --port port, The TCP/IP port number to use for the connection. Default is 0.') + print('\t-u, --user user, The user name to use when connecting to the server. Default is \'root\'.') + print('\t-P, --password password, The password to use when connecting to the server. Default is \'taosdata\'.') + print('\t-l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3.') print( - '\t-d, --dbname database, Destination database. Default is \'test\'.') - print('\t-a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5.') + '\t-d, --dbname database, Destination database. Default is \'test\'.') + print('\t-a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5.') print( - '\t-m, --tbname
table_prefix, Table prefix name. Default is \'t\'.') + '\t-m, --tbname
table_prefix, Table prefix name. Default is \'t\'.') print( - '\t-M, --stable flag, Use super table. Default is no') + '\t-M, --stable flag, Use super table. Default is no') print( - '\t-s, --stbname stable_prefix, STable prefix name. Default is \'st\'') - print('\t-Q, --query query, Execute query command. set \'DEFAULT\' means select * from each table') + '\t-s, --stbname stable_prefix, STable prefix name. Default is \'st\'') + print('\t-Q, --query query, Execute query command. set \'DEFAULT\' means select * from each table') print( - '\t-T, --numOfThreads num_of_threads, The number of threads. Default is 1.') + '\t-T, --threads num_of_threads, The number of threads. Default is 1.') print( - '\t-P, --numOfProcesses num_of_processes, The number of threads. Default is 1.') - print('\t-r, --batch num_of_records_per_req, The number of records per request. Default is 1000.') + '\t-C, --processes num_of_processes, The number of threads. Default is 1.') + print('\t-r, --batch num_of_records_per_req, The number of records per request. Default is 1000.') print( - '\t-t, --numOfTb num_of_tables, The number of tables. Default is 1.') - print('\t-n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1.') - print('\t-c, --config config_directory, Configuration directory. Default is \'/etc/taos/\'.') - print('\t-x, --inserOnly flag, Insert only flag.') - print('\t-O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order.') - print('\t-R, --rateOOOO rate, Out of order data\'s rate--if order=1 Default 10, min: 0, max: 50.') - print('\t-D, --deleteMethod Delete data methods 0: don\'t delete, 1: delete by table, 2: delete by stable, 3: delete by database.') - print('\t-v, --verbose Print verbose output') - print('\t-g, --debug Print debug output') + '\t-t, --numOfTb num_of_tables, The number of tables. Default is 1.') + print('\t-n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1.') + print('\t-c, --config config_directory, Configuration directory. Default is \'/etc/taos/\'.') + print('\t-x, --inserOnly flag, Insert only flag.') + print('\t-O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order.') + print('\t-R, --rateOOOO rate, Out of order data\'s rate--if order=1 Default 10, min: 0, max: 50.') + print('\t-D, --deleteMethod Delete data methods 0: don\'t delete, 1: delete by table, 2: delete by stable, 3: delete by database.') + print('\t-v, --verbose Print verbose output') + print('\t-g, --debug Print debug output') print( - '\t-y, --skipPrompt Skip read key for continous test, default is not skip') + '\t-y, --skipPrompt Skip read key for continous test, default is not skip') print('') sys.exit(0) @@ -574,13 +581,13 @@ if __name__ == "__main__": if key in ['-Q', '--query']: queryCmd = str(value) - if key in ['-T', '--numOfThreads']: + if key in ['-T', '--threads']: threads = int(value) if threads < 1: print("FATAL: number of threads must be larger than 0") sys.exit(1) - if key in ['-P', '--numOfProcesses']: + if key in ['-C', '--processes']: processes = int(value) if processes < 1: print("FATAL: number of processes must be larger than 0") @@ -653,7 +660,6 @@ if __name__ == "__main__": print("Error: %s" % e.args[0]) sys.exit(1) - if native: try: cursor = conn.cursor() print("cursor:%d %s" % (id(cursor), str(cursor.__class__))) @@ -717,6 +723,7 @@ if __name__ == "__main__": sys.exit(0) + print("CBD LN755 %d" % numOfTb) if numOfTb > 0: create_tb() insert_data(processes) @@ -739,11 +746,17 @@ if __name__ == "__main__": host, port, user, password, "SELECT COUNT(*) FROM %s%d" % (tbName, j)) - if queryCmd != "": + if queryCmd != "DEFAULT": print("queryCmd: %s" % queryCmd) +# cursor.close() ## +# conn.close() ## CBD query_data(queryCmd) sys.exit(0) + if native: + cursor.close() + conn.close() + print("done") if measure: end_time = time.time()