[TD-2771] feature: python taosdemo. multi process/thread working.

This commit is contained in:
Shuduo Sang 2021-02-06 08:40:38 +00:00
parent 11b1467d5c
commit e5908c5f02
1 changed files with 141 additions and 103 deletions

View File

@ -21,7 +21,7 @@ import json
import random import random
import time import time
import datetime import datetime
from multiprocessing import Process, Pool, Lock from multiprocessing import Manager, Pool, Lock
from multipledispatch import dispatch from multipledispatch import dispatch
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
@ -176,7 +176,6 @@ def insert_data(processes: int):
quotient, quotient,
remainder) remainder)
print("CBD LN210 processes:%d" % processes)
for i in range(processes): for i in range(processes):
begin = end begin = end
@ -274,8 +273,29 @@ def insert_func(process: int, thread: int):
uuid = "%s" % uuid_int uuid = "%s" % uuid_int
v_print("uuid is: %s", uuid) v_print("uuid is: %s", uuid)
# establish connection if native
if native:
v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir)
try:
conn = taos.connect(
host=host,
user=user,
password=password,
config=configDir)
print("conn: %s" % str(conn.__class__))
except Exception as e:
print("Error: %s" % e.args[0])
sys.exit(1)
try:
cursor = conn.cursor()
print("cursor:%d %s" % (id(cursor), str(cursor.__class__)))
except Exception as e:
print("Error: %s" % e.args[0])
sys.exit(1)
v_print("numOfRec %d:", numOfRec) v_print("numOfRec %d:", numOfRec)
if numOfRec > 0:
row = 0 row = 0
while row < numOfRec: while row < numOfRec:
v_print("row: %d", row) v_print("row: %d", row)
@ -293,11 +313,11 @@ def insert_func(process: int, thread: int):
sqlCmd.append("VALUES ") sqlCmd.append("VALUES ")
for batchIter in range(0, batch): for batchIter in range(0, batch):
sqlCmd.append("(now, %f) " % sqlCmd.append("('%s', %f) " %
( (
# start_time + start_time +
# datetime.timedelta( datetime.timedelta(
# milliseconds=batchIter), milliseconds=batchIter),
random.random())) random.random()))
row = row + 1 row = row + 1
if row >= numOfRec: if row >= numOfRec:
@ -309,23 +329,15 @@ def insert_func(process: int, thread: int):
cmd = ' '.join(sqlCmd) cmd = ' '.join(sqlCmd)
print("CBD: LN313")
if measure: if measure:
exec_start_time = datetime.datetime.now() exec_start_time = datetime.datetime.now()
print("CBD: LN316 native: %d" % native)
if native: if native:
print("CBD: LN319: %s" % cmd)
print("conn: %s" % str(conn.__class__))
print("CBD: LN320 cursor:%d %s" % (id(cursor), str(cursor.__class__)))
# cursor.execute("SHOW DATABASES" )
affectedRows = cursor.execute(cmd) affectedRows = cursor.execute(cmd)
print("CBD: LN323 affectedRows:%d" % affectedRows)
else: else:
restful_execute( restful_execute(
host, port, user, password, cmd) host, port, user, password, cmd)
print("CBD: LN327")
if measure: if measure:
exec_end_time = datetime.datetime.now() exec_end_time = datetime.datetime.now()
exec_delta = exec_end_time - exec_start_time exec_delta = exec_end_time - exec_start_time
@ -336,6 +348,10 @@ def insert_func(process: int, thread: int):
v_print("cmd: %s, length:%d", cmd, len(cmd)) v_print("cmd: %s, length:%d", cmd, len(cmd))
if native:
cursor.close()
conn.close()
def create_tb_using_stb(): def create_tb_using_stb():
# TODO: # TODO:
@ -367,11 +383,10 @@ def create_tb():
(tbName, j)) (tbName, j))
def insert_data_process(i_lock, i: int, begin: int, end: int): def insert_data_process(lock, i: int, begin: int, end: int):
print("CBD LN371 insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks) lock.acquire()
time.sleep(0.01)
tasks = end - begin tasks = end - begin
i_lock.aquire() v_print("insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks)
if (threads < (end - begin)): if (threads < (end - begin)):
for j in range(begin, end, threads): for j in range(begin, end, threads):
@ -395,7 +410,9 @@ def insert_data_process(i_lock, i: int, begin: int, end: int):
begin, begin,
end)] end)]
wait(workers, return_when=ALL_COMPLETED) wait(workers, return_when=ALL_COMPLETED)
i_lock.release()
lock.release()
def query_db(i): def query_db(i):
if native: if native:
@ -624,6 +641,10 @@ if __name__ == "__main__":
if key in ['-n', '--numOfRec']: if key in ['-n', '--numOfRec']:
numOfRec = int(value) numOfRec = int(value)
v_print("numOfRec is %d", numOfRec) v_print("numOfRec is %d", numOfRec)
if numOfRec < 1:
print("FATAL: number of records must be larger than 0")
sys.exit(1)
if key in ['-c', '--config']: if key in ['-c', '--config']:
configDir = value configDir = value
@ -665,6 +686,7 @@ if __name__ == "__main__":
if not skipPrompt: if not skipPrompt:
input("Press any key to continue..") input("Press any key to continue..")
# establish connection first if native
if native: if native:
v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir) v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir)
try: try:
@ -685,8 +707,7 @@ if __name__ == "__main__":
print("Error: %s" % e.args[0]) print("Error: %s" % e.args[0])
sys.exit(1) sys.exit(1)
# drop data only if delete method be set
if deleteMethod > 0: if deleteMethod > 0:
if deleteMethod == 1: if deleteMethod == 1:
drop_tables() drop_tables()
@ -700,69 +721,86 @@ if __name__ == "__main__":
sys.exit(0) sys.exit(0)
# create databases # create databases
if (insertOnly == False):
drop_databases() drop_databases()
create_databases() create_databases()
if measure:
start_time = time.time()
# use last database # use last database
current_db = "%s%d" % (dbName, (numOfDb - 1)) current_db = "%s%d" % (dbName, (numOfDb - 1))
use_database() use_database()
if measure:
start_time_begin = time.time()
if numOfStb > 0: if numOfStb > 0:
create_stb() create_stb()
if (autosubtable == False): if (autosubtable == False):
create_tb_using_stb() create_tb_using_stb()
insert_data(processes)
if verbose:
for i in range(0, numOfDb):
for j in range(0, numOfStb):
if native:
cursor.execute(
"SELECT COUNT(*) FROM %s%d.%s%d" %
(dbName, i, stbName, j,))
else: else:
restful_execute( create_tb()
host, port, user, password, "SELECT COUNT(*) FROM %s%d.%s%d" %
(dbName, i, stbName, j,))
print("done")
if measure: if measure:
end_time = time.time() end_time = time.time()
print( print(
"Total time consumed {} seconds.".format( "Total time consumed {} seconds for create table.".format(
(end_time - start_time))) (end_time - start_time_begin)))
sys.exit(0)
print("CBD LN755 %d" % numOfTb)
if numOfTb > 0:
create_tb()
insert_data(processes)
if debug:
for i in range(0, numOfDb):
query_db(i)
if queryCmd != "NO":
print("queryCmd: %s" % queryCmd)
query_data(queryCmd)
sys.exit(0)
if native: if native:
cursor.close() cursor.close()
conn.close() conn.close()
# start insert data
if measure:
start_time = time.time()
manager = Manager()
lock = manager.Lock()
pool = Pool(processes)
begin = 0
end = 0
quotient = numOfTb // processes
if quotient < 1:
processes = numOfTb
quotient = 1
remainder = numOfTb % processes
v_print(
"num of tables: %d, quotient: %d, remainder: %d",
numOfTb,
quotient,
remainder)
for i in range(processes):
begin = end
if i < remainder:
end = begin + quotient + 1
else:
end = begin + quotient
pool.apply_async(insert_data_process, args=(lock, i, begin, end,))
# pool.apply_async(text, args=(lock, i, begin, end,))
pool.close()
pool.join()
time.sleep(1)
if measure:
end_time = time.time()
print(
"Total time consumed {} seconds for insert data.".format(
(end_time - start_time)))
# query data
if queryCmd != "NO":
print("queryCmd: %s" % queryCmd)
query_data(queryCmd)
if measure: if measure:
end_time = time.time() end_time = time.time()
print( print(
"Total time consumed {} seconds.".format( "Total time consumed {} seconds.".format(
(end_time - start_time))) (end_time - start_time_begin)))
print("done") print("done")