From e5908c5f02f31033b815bca6e4d0ec4457247a7a Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sat, 6 Feb 2021 08:40:38 +0000 Subject: [PATCH] [TD-2771] feature: python taosdemo. multi process/thread working. --- tests/examples/python/taosdemo/taosdemo.py | 244 ++++++++++++--------- 1 file changed, 141 insertions(+), 103 deletions(-) diff --git a/tests/examples/python/taosdemo/taosdemo.py b/tests/examples/python/taosdemo/taosdemo.py index 9a6613e685..049ea05558 100755 --- a/tests/examples/python/taosdemo/taosdemo.py +++ b/tests/examples/python/taosdemo/taosdemo.py @@ -21,7 +21,7 @@ import json import random import time import datetime -from multiprocessing import Process, Pool, Lock +from multiprocessing import Manager, Pool, Lock from multipledispatch import dispatch from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED @@ -176,7 +176,6 @@ def insert_data(processes: int): quotient, remainder) - print("CBD LN210 processes:%d" % processes) for i in range(processes): begin = end @@ -274,67 +273,84 @@ def insert_func(process: int, thread: int): uuid = "%s" % uuid_int v_print("uuid is: %s", uuid) + # establish connection if native + if native: + v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir) + try: + conn = taos.connect( + host=host, + user=user, + password=password, + config=configDir) + print("conn: %s" % str(conn.__class__)) + except Exception as e: + print("Error: %s" % e.args[0]) + sys.exit(1) + + try: + cursor = conn.cursor() + print("cursor:%d %s" % (id(cursor), str(cursor.__class__))) + except Exception as e: + print("Error: %s" % e.args[0]) + sys.exit(1) + v_print("numOfRec %d:", numOfRec) - if numOfRec > 0: - row = 0 - while row < numOfRec: - v_print("row: %d", row) - sqlCmd = ['INSERT INTO '] - try: - sqlCmd.append( - "%s.%s%d " % (current_db, tbName, thread)) - if (numOfStb > 0 and autosubtable): - sqlCmd.append("USING %s.%s%d TAGS('%s') " % - (current_db, stbName, numOfStb - 1, uuid)) + row = 0 + while row < numOfRec: + v_print("row: %d", row) + sqlCmd = ['INSERT INTO '] + try: + sqlCmd.append( + "%s.%s%d " % (current_db, tbName, thread)) - start_time = datetime.datetime( - 2021, 1, 25) + datetime.timedelta(seconds=row) + if (numOfStb > 0 and autosubtable): + sqlCmd.append("USING %s.%s%d TAGS('%s') " % + (current_db, stbName, numOfStb - 1, uuid)) - sqlCmd.append("VALUES ") - for batchIter in range(0, batch): - sqlCmd.append("(now, %f) " % - ( -# start_time + -# datetime.timedelta( -# milliseconds=batchIter), - random.random())) - row = row + 1 - if row >= numOfRec: - v_print("BREAK, row: %d numOfRec:%d", row, numOfRec) - break + start_time = datetime.datetime( + 2021, 1, 25) + datetime.timedelta(seconds=row) - except Exception as e: - print("Error: %s" % e.args[0]) + sqlCmd.append("VALUES ") + for batchIter in range(0, batch): + sqlCmd.append("('%s', %f) " % + ( + start_time + + datetime.timedelta( + milliseconds=batchIter), + random.random())) + row = row + 1 + if row >= numOfRec: + v_print("BREAK, row: %d numOfRec:%d", row, numOfRec) + break - cmd = ' '.join(sqlCmd) + except Exception as e: + print("Error: %s" % e.args[0]) - print("CBD: LN313") - if measure: - exec_start_time = datetime.datetime.now() + cmd = ' '.join(sqlCmd) - print("CBD: LN316 native: %d" % native) - if native: - print("CBD: LN319: %s" % cmd) - print("conn: %s" % str(conn.__class__)) - print("CBD: LN320 cursor:%d %s" % (id(cursor), str(cursor.__class__))) -# cursor.execute("SHOW DATABASES" ) - affectedRows = cursor.execute(cmd) - print("CBD: LN323 affectedRows:%d" % affectedRows) - else: - restful_execute( - host, port, user, password, cmd) + if measure: + exec_start_time = datetime.datetime.now() - print("CBD: LN327") - if measure: - exec_end_time = datetime.datetime.now() - exec_delta = exec_end_time - exec_start_time - print( - "%s, %d" % - (time.strftime('%X'), - exec_delta.microseconds)) + if native: + affectedRows = cursor.execute(cmd) + else: + restful_execute( + host, port, user, password, cmd) - v_print("cmd: %s, length:%d", cmd, len(cmd)) + if measure: + exec_end_time = datetime.datetime.now() + exec_delta = exec_end_time - exec_start_time + print( + "%s, %d" % + (time.strftime('%X'), + exec_delta.microseconds)) + + v_print("cmd: %s, length:%d", cmd, len(cmd)) + + if native: + cursor.close() + conn.close() def create_tb_using_stb(): @@ -367,11 +383,10 @@ def create_tb(): (tbName, j)) -def insert_data_process(i_lock, i: int, begin: int, end: int): - print("CBD LN371 insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks) - time.sleep(0.01) +def insert_data_process(lock, i: int, begin: int, end: int): + lock.acquire() tasks = end - begin - i_lock.aquire() + v_print("insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks) if (threads < (end - begin)): for j in range(begin, end, threads): @@ -395,7 +410,9 @@ def insert_data_process(i_lock, i: int, begin: int, end: int): begin, end)] wait(workers, return_when=ALL_COMPLETED) - i_lock.release() + + lock.release() + def query_db(i): if native: @@ -624,6 +641,10 @@ if __name__ == "__main__": if key in ['-n', '--numOfRec']: numOfRec = int(value) v_print("numOfRec is %d", numOfRec) + if numOfRec < 1: + print("FATAL: number of records must be larger than 0") + sys.exit(1) + if key in ['-c', '--config']: configDir = value @@ -665,6 +686,7 @@ if __name__ == "__main__": if not skipPrompt: input("Press any key to continue..") + # establish connection first if native if native: v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir) try: @@ -685,8 +707,7 @@ if __name__ == "__main__": print("Error: %s" % e.args[0]) sys.exit(1) - - + # drop data only if delete method be set if deleteMethod > 0: if deleteMethod == 1: drop_tables() @@ -700,69 +721,86 @@ if __name__ == "__main__": sys.exit(0) # create databases - if (insertOnly == False): - drop_databases() - + drop_databases() create_databases() - if measure: - start_time = time.time() - # use last database current_db = "%s%d" % (dbName, (numOfDb - 1)) use_database() + if measure: + start_time_begin = time.time() + if numOfStb > 0: create_stb() if (autosubtable == False): create_tb_using_stb() - - insert_data(processes) - - if verbose: - for i in range(0, numOfDb): - for j in range(0, numOfStb): - if native: - cursor.execute( - "SELECT COUNT(*) FROM %s%d.%s%d" % - (dbName, i, stbName, j,)) - else: - restful_execute( - host, port, user, password, "SELECT COUNT(*) FROM %s%d.%s%d" % - (dbName, i, stbName, j,)) - - print("done") - - if measure: - end_time = time.time() - print( - "Total time consumed {} seconds.".format( - (end_time - start_time))) - - sys.exit(0) - - print("CBD LN755 %d" % numOfTb) - if numOfTb > 0: + else: create_tb() - insert_data(processes) - if debug: - for i in range(0, numOfDb): - query_db(i) - - if queryCmd != "NO": - print("queryCmd: %s" % queryCmd) - query_data(queryCmd) - sys.exit(0) + if measure: + end_time = time.time() + print( + "Total time consumed {} seconds for create table.".format( + (end_time - start_time_begin))) if native: cursor.close() conn.close() + # start insert data + if measure: + start_time = time.time() + + manager = Manager() + lock = manager.Lock() + pool = Pool(processes) + + begin = 0 + end = 0 + + quotient = numOfTb // processes + if quotient < 1: + processes = numOfTb + quotient = 1 + + remainder = numOfTb % processes + v_print( + "num of tables: %d, quotient: %d, remainder: %d", + numOfTb, + quotient, + remainder) + + for i in range(processes): + begin = end + + if i < remainder: + end = begin + quotient + 1 + else: + end = begin + quotient + pool.apply_async(insert_data_process, args=(lock, i, begin, end,)) +# pool.apply_async(text, args=(lock, i, begin, end,)) + + pool.close() + pool.join() + time.sleep(1) + + if measure: + end_time = time.time() + print( + "Total time consumed {} seconds for insert data.".format( + (end_time - start_time))) + + + # query data + if queryCmd != "NO": + print("queryCmd: %s" % queryCmd) + query_data(queryCmd) + if measure: end_time = time.time() print( "Total time consumed {} seconds.".format( - (end_time - start_time))) + (end_time - start_time_begin))) print("done")