[TD-2771] feature: python taosdemo. add lock.

This commit is contained in:
Shuduo Sang 2021-02-05 10:48:25 +00:00
parent cbb534c24b
commit 13a6421698
2 changed files with 101 additions and 88 deletions

View File

@ -8,31 +8,31 @@ Usage: ./taosdemo.py [OPTION...]
Author: Shuduo Sang <sangshuduo@gmail.com> Author: Shuduo Sang <sangshuduo@gmail.com>
-H, --help Show usage. -H, --help Show usage.
-N, --native flag, Use native interface if set. Default is using RESTful interface. -N, --native flag, Use native interface if set. Default is using RESTful interface.
-h, --host <hostname> host, The host to connect to TDengine. Default is localhost. -h, --host <hostname> host, The host to connect to TDengine. Default is localhost.
-p, --port <port> port, The TCP/IP port number to use for the connection. Default is 0. -p, --port <port> port, The TCP/IP port number to use for the connection. Default is 0.
-u, --user <username> user, The user name to use when connecting to the server. Default is 'root'. -u, --user <username> user, The user name to use when connecting to the server. Default is 'root'.
-P, --password <password> password, The password to use when connecting to the server. Default is 'taosdata'. -P, --password <password> password, The password to use when connecting to the server. Default is 'taosdata'.
-l, --colsPerRec <number> num_of_columns_per_record, The number of columns per record. Default is 3. -l, --colsPerRec <number> num_of_columns_per_record, The number of columns per record. Default is 3.
-d, --dbname <dbname> database, Destination database. Default is 'test'. -d, --dbname <dbname> database, Destination database. Default is 'test'.
-a, --replica <replications> replica, Set the replica parameters of the database, Default 1, min: 1, max: 5. -a, --replica <replications> replica, Set the replica parameters of the database, Default 1, min: 1, max: 5.
-m, --tbname <table prefix> table_prefix, Table prefix name. Default is 't'. -m, --tbname <table prefix> table_prefix, Table prefix name. Default is 't'.
-M, --stable flag, Use super table. Default is no -M, --stable flag, Use super table. Default is no
-s, --stbname <stable prefix> stable_prefix, STable prefix name. Default is 'st' -s, --stbname <stable prefix> stable_prefix, STable prefix name. Default is 'st'
-Q, --query <DEFAULT | command> query, Execute query command. set 'DEFAULT' means select * from each table -Q, --query <DEFAULT | command> query, Execute query command. set 'DEFAULT' means select * from each table
-T, --numOfThreads <number> num_of_threads, The number of threads. Default is 1. -T, --threads <number> num_of_threads, The number of threads. Default is 1.
-P, --numOfProcesses <number> num_of_processes, The number of threads. Default is 1. -C, --processes <number> num_of_processes, The number of threads. Default is 1.
-r, --batch <number> num_of_records_per_req, The number of records per request. Default is 1000. -r, --batch <number> num_of_records_per_req, The number of records per request. Default is 1000.
-t, --numOfTb <number> num_of_tables, The number of tables. Default is 1. -t, --numOfTb <number> num_of_tables, The number of tables. Default is 1.
-n, --numOfRec <number> num_of_records_per_table, The number of records per table. Default is 1. -n, --numOfRec <number> num_of_records_per_table, The number of records per table. Default is 1.
-c, --config <path> config_directory, Configuration directory. Default is '/etc/taos/'. -c, --config <path> config_directory, Configuration directory. Default is '/etc/taos/'.
-x, --inserOnly flag, Insert only flag. -x, --inserOnly flag, Insert only flag.
-O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order. -O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order.
-R, --rateOOOO <number> rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50. -R, --rateOOOO <number> rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50.
-D, --deleteMethod <number> Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database. -D, --deleteMethod <number> Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database.
-v, --verbose Print verbose output -v, --verbose Print verbose output
-g, --debug Print debug output -g, --debug Print debug output
-y, --skipPrompt Skip read key for continous test, default is not skip -y, --skipPrompt Skip read key for continous test, default is not skip

View File

@ -21,7 +21,7 @@ import json
import random import random
import time import time
import datetime import datetime
from multiprocessing import Process, Pool from multiprocessing import Process, Pool, Lock
from multipledispatch import dispatch from multipledispatch import dispatch
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
@ -124,28 +124,22 @@ def query_func(process: int, thread: int, cmd: str):
restful_execute( restful_execute(
oneMoreHost, port, user, password, cmd) oneMoreHost, port, user, password, cmd)
else: else:
v_print("%s", "Send to first host") v_print("%s%s%s", "Send ", cmd, " to the host")
if native: if native:
cursor.execute(cmd) pass
# cursor.execute(cmd)
else: else:
restful_execute( restful_execute(
host, port, user, password, cmd) host, port, user, password, cmd)
def query_data_process(i: int, cmd: str): def query_data_process(q_lock, i: int, cmd: str):
time.sleep(0.01)
v_print("Process:%d threads: %d cmd: %s", i, threads, cmd) v_print("Process:%d threads: %d cmd: %s", i, threads, cmd)
with ThreadPoolExecutor(max_workers=threads) as executor: q_lock.aquire()
workers = [ cursor_p.execute(cmd)
executor.submit( q_lock.release()
query_func,
i,
j,
cmd) for j in range(
0,
threads)]
wait(workers, return_when=ALL_COMPLETED)
return i return i
@ -153,15 +147,18 @@ def query_data_process(i: int, cmd: str):
def query_data(cmd: str): def query_data(cmd: str):
v_print("query_data processes: %d, cmd: %s", processes, cmd) v_print("query_data processes: %d, cmd: %s", processes, cmd)
q_lock = Lock()
pool = Pool(processes) pool = Pool(processes)
for i in range(processes): for i in range(processes):
pool.apply_async(query_data_process, args=(i, cmd)) pool.apply_async(query_data_process, args=(q_lock, i, cmd))
time.sleep(1) # time.sleep(1)
pool.close() pool.close()
pool.join() pool.join()
def insert_data(processes: int): def insert_data(processes: int):
i_lock = Lock()
pool = Pool(processes) pool = Pool(processes)
begin = 0 begin = 0
@ -179,6 +176,7 @@ def insert_data(processes: int):
quotient, quotient,
remainder) remainder)
print("CBD LN210 processes:%d" % processes)
for i in range(processes): for i in range(processes):
begin = end begin = end
@ -187,8 +185,8 @@ def insert_data(processes: int):
else: else:
end = begin + quotient end = begin + quotient
v_print("Process %d from %d to %d", i, begin, end) v_print("insert_data Process %d from %d to %d", i, begin, end)
pool.apply_async(insert_data_process, args=(i, begin, end)) pool.apply_async(insert_data_process, args=(i_lock, i, begin, end))
pool.close() pool.close()
pool.join() pool.join()
@ -295,10 +293,11 @@ def insert_func(process: int, thread: int):
sqlCmd.append("VALUES ") sqlCmd.append("VALUES ")
for batchIter in range(0, batch): for batchIter in range(0, batch):
sqlCmd.append("('%s', %f) " % sqlCmd.append("(now, %f) " %
(start_time + (
datetime.timedelta( # start_time +
milliseconds=batchIter), # datetime.timedelta(
# milliseconds=batchIter),
random.random())) random.random()))
row = row + 1 row = row + 1
if row >= numOfRec: if row >= numOfRec:
@ -310,18 +309,23 @@ def insert_func(process: int, thread: int):
cmd = ' '.join(sqlCmd) cmd = ' '.join(sqlCmd)
print("CBD: LN313")
if measure: if measure:
exec_start_time = datetime.datetime.now() exec_start_time = datetime.datetime.now()
print("CBD: LN316 native: %d" % native)
if native: if native:
v_print("insert_func - cursor:%x cmd:%s", hex(id(cursor)), cmd) print("CBD: LN319: %s" % cmd)
cursor.execute("SHOW DATABASES" ) print("conn: %s" % str(conn.__class__))
# cursor.execute("%s" % cmd) print("CBD: LN320 cursor:%d %s" % (id(cursor), str(cursor.__class__)))
v_print("insert_func - cursor:%x cmd:%s done", hex(id(cursor)), cmd) # cursor.execute("SHOW DATABASES" )
affectedRows = cursor.execute(cmd)
print("CBD: LN323 affectedRows:%d" % affectedRows)
else: else:
restful_execute( restful_execute(
host, port, user, password, cmd) host, port, user, password, cmd)
print("CBD: LN327")
if measure: if measure:
exec_end_time = datetime.datetime.now() exec_end_time = datetime.datetime.now()
exec_delta = exec_end_time - exec_start_time exec_delta = exec_end_time - exec_start_time
@ -363,9 +367,11 @@ def create_tb():
(tbName, j)) (tbName, j))
def insert_data_process(i: int, begin: int, end: int): def insert_data_process(lock, i: int, begin: int, end: int):
print("CBD insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks)
time.sleep(1)
tasks = end - begin tasks = end - begin
v_print("Process:%d table from %d to %d, tasks %d", i, begin, end, tasks) i_lock.aquire()
if (threads < (end - begin)): if (threads < (end - begin)):
for j in range(begin, end, threads): for j in range(begin, end, threads):
@ -389,6 +395,7 @@ def insert_data_process(i: int, begin: int, end: int):
begin, begin,
end)] end)]
wait(workers, return_when=ALL_COMPLETED) wait(workers, return_when=ALL_COMPLETED)
i_lock.release()
def printConfig(): def printConfig():
@ -457,7 +464,7 @@ if __name__ == "__main__":
threads = 1 threads = 1
insertOnly = False insertOnly = False
autosubtable = False autosubtable = False
queryCmd = "select * from " queryCmd = "DEFAULT"
outOfOrder = 0 outOfOrder = 0
rateOOOO = 0 rateOOOO = 0
deleteMethod = 0 deleteMethod = 0
@ -465,10 +472,10 @@ if __name__ == "__main__":
try: try:
opts, args = getopt.gnu_getopt(sys.argv[1:], opts, args = getopt.gnu_getopt(sys.argv[1:],
'Nh:p:u:P:d:a:m:Ms:Q:T:P:r:l:t:n:c:xOR:D:vgyH', 'Nh:p:u:P:d:a:m:Ms:Q:T:C:r:l:t:n:c:xOR:D:vgyH',
[ [
'native', 'host', 'port', 'user', 'password', 'dbname', 'replica', 'tbname', 'native', 'host', 'port', 'user', 'password', 'dbname', 'replica', 'tbname',
'stable', 'stbname', 'query', 'numOfThreads', 'numOfProcesses', 'stable', 'stbname', 'query', 'threads', 'processes',
'recPerReq', 'colsPerRecord', 'numOfTb', 'numOfRec', 'config', 'recPerReq', 'colsPerRecord', 'numOfTb', 'numOfRec', 'config',
'insertOnly', 'outOfOrder', 'rateOOOO', 'deleteMethod', 'insertOnly', 'outOfOrder', 'rateOOOO', 'deleteMethod',
'verbose', 'debug', 'skipPrompt', 'help' 'verbose', 'debug', 'skipPrompt', 'help'
@ -491,42 +498,42 @@ if __name__ == "__main__":
print('Author: Shuduo Sang <sangshuduo@gmail.com>') print('Author: Shuduo Sang <sangshuduo@gmail.com>')
print('') print('')
print('\t-H, --help Show usage.') print('\t-H, --help Show usage.')
print('') print('')
print('\t-N, --native flag, Use native interface if set. Default is using RESTful interface.') print('\t-N, --native flag, Use native interface if set. Default is using RESTful interface.')
print('\t-h, --host <hostname> host, The host to connect to TDengine. Default is localhost.') print('\t-h, --host <hostname> host, The host to connect to TDengine. Default is localhost.')
print('\t-p, --port <port> port, The TCP/IP port number to use for the connection. Default is 0.') print('\t-p, --port <port> port, The TCP/IP port number to use for the connection. Default is 0.')
print('\t-u, --user <username> user, The user name to use when connecting to the server. Default is \'root\'.') print('\t-u, --user <username> user, The user name to use when connecting to the server. Default is \'root\'.')
print('\t-P, --password <password> password, The password to use when connecting to the server. Default is \'taosdata\'.') print('\t-P, --password <password> password, The password to use when connecting to the server. Default is \'taosdata\'.')
print('\t-l, --colsPerRec <number> num_of_columns_per_record, The number of columns per record. Default is 3.') print('\t-l, --colsPerRec <number> num_of_columns_per_record, The number of columns per record. Default is 3.')
print( print(
'\t-d, --dbname <dbname> database, Destination database. Default is \'test\'.') '\t-d, --dbname <dbname> database, Destination database. Default is \'test\'.')
print('\t-a, --replica <replications> replica, Set the replica parameters of the database, Default 1, min: 1, max: 5.') print('\t-a, --replica <replications> replica, Set the replica parameters of the database, Default 1, min: 1, max: 5.')
print( print(
'\t-m, --tbname <table prefix> table_prefix, Table prefix name. Default is \'t\'.') '\t-m, --tbname <table prefix> table_prefix, Table prefix name. Default is \'t\'.')
print( print(
'\t-M, --stable flag, Use super table. Default is no') '\t-M, --stable flag, Use super table. Default is no')
print( print(
'\t-s, --stbname <stable prefix> stable_prefix, STable prefix name. Default is \'st\'') '\t-s, --stbname <stable prefix> stable_prefix, STable prefix name. Default is \'st\'')
print('\t-Q, --query <DEFAULT | command> query, Execute query command. set \'DEFAULT\' means select * from each table') print('\t-Q, --query <DEFAULT | command> query, Execute query command. set \'DEFAULT\' means select * from each table')
print( print(
'\t-T, --numOfThreads <number> num_of_threads, The number of threads. Default is 1.') '\t-T, --threads <number> num_of_threads, The number of threads. Default is 1.')
print( print(
'\t-P, --numOfProcesses <number> num_of_processes, The number of threads. Default is 1.') '\t-C, --processes <number> num_of_processes, The number of threads. Default is 1.')
print('\t-r, --batch <number> num_of_records_per_req, The number of records per request. Default is 1000.') print('\t-r, --batch <number> num_of_records_per_req, The number of records per request. Default is 1000.')
print( print(
'\t-t, --numOfTb <number> num_of_tables, The number of tables. Default is 1.') '\t-t, --numOfTb <number> num_of_tables, The number of tables. Default is 1.')
print('\t-n, --numOfRec <number> num_of_records_per_table, The number of records per table. Default is 1.') print('\t-n, --numOfRec <number> num_of_records_per_table, The number of records per table. Default is 1.')
print('\t-c, --config <path> config_directory, Configuration directory. Default is \'/etc/taos/\'.') print('\t-c, --config <path> config_directory, Configuration directory. Default is \'/etc/taos/\'.')
print('\t-x, --inserOnly flag, Insert only flag.') print('\t-x, --inserOnly flag, Insert only flag.')
print('\t-O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order.') print('\t-O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order.')
print('\t-R, --rateOOOO <number> rate, Out of order data\'s rate--if order=1 Default 10, min: 0, max: 50.') print('\t-R, --rateOOOO <number> rate, Out of order data\'s rate--if order=1 Default 10, min: 0, max: 50.')
print('\t-D, --deleteMethod <number> Delete data methods 0: don\'t delete, 1: delete by table, 2: delete by stable, 3: delete by database.') print('\t-D, --deleteMethod <number> Delete data methods 0: don\'t delete, 1: delete by table, 2: delete by stable, 3: delete by database.')
print('\t-v, --verbose Print verbose output') print('\t-v, --verbose Print verbose output')
print('\t-g, --debug Print debug output') print('\t-g, --debug Print debug output')
print( print(
'\t-y, --skipPrompt Skip read key for continous test, default is not skip') '\t-y, --skipPrompt Skip read key for continous test, default is not skip')
print('') print('')
sys.exit(0) sys.exit(0)
@ -574,13 +581,13 @@ if __name__ == "__main__":
if key in ['-Q', '--query']: if key in ['-Q', '--query']:
queryCmd = str(value) queryCmd = str(value)
if key in ['-T', '--numOfThreads']: if key in ['-T', '--threads']:
threads = int(value) threads = int(value)
if threads < 1: if threads < 1:
print("FATAL: number of threads must be larger than 0") print("FATAL: number of threads must be larger than 0")
sys.exit(1) sys.exit(1)
if key in ['-P', '--numOfProcesses']: if key in ['-C', '--processes']:
processes = int(value) processes = int(value)
if processes < 1: if processes < 1:
print("FATAL: number of processes must be larger than 0") print("FATAL: number of processes must be larger than 0")
@ -653,7 +660,6 @@ if __name__ == "__main__":
print("Error: %s" % e.args[0]) print("Error: %s" % e.args[0])
sys.exit(1) sys.exit(1)
if native:
try: try:
cursor = conn.cursor() cursor = conn.cursor()
print("cursor:%d %s" % (id(cursor), str(cursor.__class__))) print("cursor:%d %s" % (id(cursor), str(cursor.__class__)))
@ -717,6 +723,7 @@ if __name__ == "__main__":
sys.exit(0) sys.exit(0)
print("CBD LN755 %d" % numOfTb)
if numOfTb > 0: if numOfTb > 0:
create_tb() create_tb()
insert_data(processes) insert_data(processes)
@ -739,11 +746,17 @@ if __name__ == "__main__":
host, port, user, password, "SELECT COUNT(*) FROM %s%d" % host, port, user, password, "SELECT COUNT(*) FROM %s%d" %
(tbName, j)) (tbName, j))
if queryCmd != "": if queryCmd != "DEFAULT":
print("queryCmd: %s" % queryCmd) print("queryCmd: %s" % queryCmd)
# cursor.close() ##
# conn.close() ## CBD
query_data(queryCmd) query_data(queryCmd)
sys.exit(0) sys.exit(0)
if native:
cursor.close()
conn.close()
print("done") print("done")
if measure: if measure:
end_time = time.time() end_time = time.time()