fix autopep8 format.

This commit is contained in:
Shuduo Sang 2020-07-08 18:28:17 +08:00
parent 2d02ee85a8
commit 4efd476268
1 changed file with 160 additions and 104 deletions
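
For context, reformatting of this kind is typically produced by running autopep8 over the affected file. A minimal sketch of how such a pass could be reproduced (the file name and aggressiveness level below are assumptions, not taken from this commit):

# Hypothetical reproduction of an autopep8 formatting pass.
# "crash_test.py" is a placeholder name; the commit does not name the changed file here.
# Roughly equivalent to the CLI: autopep8 --in-place --aggressive crash_test.py
import autopep8

with open("crash_test.py") as f:
    source = f.read()

formatted = autopep8.fix_code(source, options={"aggressive": 1})

with open("crash_test.py", "w") as f:
    f.write(formatted)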


@@ -889,7 +889,7 @@ class StateEmpty(AnyState):
    def verifyTasksToState(self, tasks, newState):
        if (self.hasSuccess(tasks, TaskCreateDb)
                ):  # at EMPTY, if there's succes in creating DB
            if (not self.hasTask(tasks, TaskDropDb)):  # and no drop_db tasks
                # we must have at most one. TODO: compare numbers
                self.assertAtMostOneSuccess(tasks, TaskCreateDb)
@@ -944,7 +944,7 @@ class StateSuperTableOnly(AnyState):
    def verifyTasksToState(self, tasks, newState):
        if (self.hasSuccess(tasks, TaskDropSuperTable)
                ):  # we are able to drop the table
            #self.assertAtMostOneSuccess(tasks, TaskDropSuperTable)
            # we must have had recreted it
            self.hasSuccess(tasks, TaskCreateSuperTable)
@@ -978,7 +978,7 @@ class StateHasData(AnyState):
                self.assertAtMostOneSuccess(tasks, TaskDropDb)  # TODO: dicy
        elif (newState.equals(AnyState.STATE_DB_ONLY)):  # in DB only
            if (not self.hasTask(tasks, TaskCreateDb)
                    ):  # without a create_db task
                # we must have drop_db task
                self.assertNoTask(tasks, TaskDropDb)
            self.hasSuccess(tasks, TaskDropSuperTable)
@@ -990,11 +990,11 @@ class StateHasData(AnyState):
            # self.hasSuccess(tasks, DeleteDataTasks)
        else:  # should be STATE_HAS_DATA
            if (not self.hasTask(tasks, TaskCreateDb)
                    ):  # only if we didn't create one
                # we shouldn't have dropped it
                self.assertNoTask(tasks, TaskDropDb)
            if (not self.hasTask(tasks, TaskCreateSuperTable)
                    ):  # if we didn't create the table
                # we should not have a task that drops it
                self.assertNoTask(tasks, TaskDropSuperTable)
            # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
@@ -1385,15 +1385,18 @@ class Task():
        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            errno2 = err.errno if (
                err.errno > 0) else 0x80000000 + err.errno  # correct error scheme
            if (errno2 in [
                    0x05,  # TSDB_CODE_RPC_NOT_READY
                    0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503,
                    0x510,  # vnode not in ready state
                    0x600,
                    1000  # REST catch-all error
            ]):  # allowed errors
                self.logDebug(
                    "[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, self._lastSql))
                print("_", end="", flush=True)
                self._err = err
            else:
@@ -1862,7 +1865,8 @@ class MyLoggingAdapter(logging.LoggerAdapter):
        return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs
        # return '[%s] %s' % (self.extra['connid'], msg), kwargs


class SvcManager:
    MAX_QUEUE_SIZE = 10000

    def __init__(self):
@@ -1873,35 +1877,39 @@ class SvcManager:
        self.ioThread = None
        self.subProcess = None
        self.shouldStop = False
        # self.status = MainExec.STATUS_RUNNING # set inside
        # _startTaosService()

    def svcOutputReader(self, out: IO, queue):
        # Important Reference:
        # https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
        print("This is the svcOutput Reader...")
        # for line in out :
        for line in iter(out.readline, b''):
            # print("Finished reading a line: {}".format(line))
            # print("Adding item to queue...")
            line = line.decode("utf-8").rstrip()
            # This might block, and then causing "out" buffer to block
            queue.put(line)
            print("_i", end="", flush=True)

            # Trim the queue if necessary
            oneTenthQSize = self.MAX_QUEUE_SIZE // 10
            if (queue.qsize() >= (self.MAX_QUEUE_SIZE - oneTenthQSize)):  # 90% full?
                print("Triming IPC queue by: {}".format(oneTenthQSize))
                for i in range(0, oneTenthQSize):
                    try:
                        queue.get_nowait()
                    except Empty:
                        break  # break out of for loop, no more trimming

            if self.shouldStop:
                print("Stopping to read output from sub process")
                break
        # queue.put(line)
        # meaning sub process must have died
        print("\nNo more output (most likely) from IO thread managing TDengine service")
        out.close()

    def _doMenu(self):
@@ -1912,30 +1920,32 @@ class SvcManager:
            print("2: Terminate")
            print("3: Restart")
            # Remember to update the if range below
            # print("Enter Choice: ", end="", flush=True)
            while choice == "":
                choice = input("Enter Choice: ")
                if choice != "":
                    break  # done with reading repeated input
            if choice in ["1", "2", "3"]:
                break  # we are done with whole method
            print("Invalid choice, please try again.")
            choice = ""  # reset
        return choice

    def sigUsrHandler(self, signalNumber, frame):
        print("Interrupting main thread execution upon SIGUSR1")
        if self.status != MainExec.STATUS_RUNNING:
            print("Ignoring repeated SIG...")
            return  # do nothing if it's already not running
        self.status = MainExec.STATUS_STOPPING
        choice = self._doMenu()
        if choice == "1":
            # TODO: can the sub-process be blocked due to us not reading from
            # queue?
            self.sigHandlerResume()
        elif choice == "2":
            self.stopTaosService()
        elif choice == "3":
            self.stopTaosService()
            self.startTaosService()
        else:
@@ -1943,59 +1953,62 @@ class SvcManager:
    def sigIntHandler(self, signalNumber, frame):
        print("Sig INT Handler starting...")
        if self.status != MainExec.STATUS_RUNNING:
            print("Ignoring repeated SIG_INT...")
            return
        self.status = MainExec.STATUS_STOPPING  # immediately set our status
        self.stopTaosService()
        print("INT signal handler returning...")

    def sigHandlerResume(self):
        print("Resuming TDengine service manager thread (main thread)...\n\n")
        self.status = MainExec.STATUS_RUNNING

    def joinIoThread(self):
        if self.ioThread:
            self.ioThread.join()
            self.ioThread = None
        else:
            print("Joining empty thread, doing nothing")

    TD_READY_MSG = "TDengine is initialized successfully"

    def _procIpcBatch(self):
        # Process all the output generated by the underlying sub process,
        # managed by IO thread
        while True:
            try:
                line = self.ipcQueue.get_nowait()  # getting output at fast speed
                print("_o", end="", flush=True)
                if self.status == MainExec.STATUS_STARTING:  # we are starting, let's see if we have started
                    if line.find(self.TD_READY_MSG) != -1:  # found
                        self.status = MainExec.STATUS_RUNNING
            except Empty:
                # time.sleep(2.3) # wait only if there's no output
                # no more output
                return  # we are done with THIS BATCH
            else:  # got line
                print(line)

    def _procIpcAll(self):
        while True:
            print("<", end="", flush=True)
            self._procIpcBatch()  # process one batch

            # check if the ioThread is still running
            if (not self.ioThread) or (not self.ioThread.is_alive()):
                print(
                    "IO Thread (with subprocess) has ended, main thread now exiting...")
                self.stopTaosService()
                self._procIpcBatch()  # one more batch
                return  # TODO: maybe one last batch?

            # Maybe handler says we should exit now
            if self.shouldStop:
                print("Main thread ending all IPC processing with IOThread/SubProcess")
                self._procIpcBatch()  # one more batch
                return

            print(">", end="", flush=True)
@@ -2024,50 +2037,58 @@ class SvcManager:
        svcCmd = [taosdPath, '-c', cfgPath]
        # svcCmd = ['vmstat', '1']
        if self.subProcess:  # already there
            raise RuntimeError("Corrupt process state")
        self.subProcess = subprocess.Popen(
            svcCmd,
            stdout=subprocess.PIPE,
            # bufsize=1, # not supported in binary mode
            close_fds=ON_POSIX)  # had text=True, which interferred with reading EOF
        self.ipcQueue = Queue()
        if self.ioThread:
            raise RuntimeError("Corrupt thread state")
        self.ioThread = threading.Thread(
            target=self.svcOutputReader, args=(
                self.subProcess.stdout, self.ipcQueue))
        self.ioThread.daemon = True  # thread dies with the program
        self.ioThread.start()
        self.shouldStop = False  # don't let the main loop stop
        self.status = MainExec.STATUS_STARTING

        # wait for service to start
        for i in range(0, 10):
            time.sleep(1.0)
            self._procIpcBatch()  # pump messages
            print("_zz_", end="", flush=True)
            if self.status == MainExec.STATUS_RUNNING:
                print("TDengine service READY to process requests")
                return  # now we've started
        # TODO: handle this better?
        raise RuntimeError("TDengine service did not start successfully")

    def stopTaosService(self):
        # can be called from both main thread or signal handler
        print("Terminating TDengine service running as the sub process...")
        # Linux will send Control-C generated SIGINT to the TDengine process
        # already, ref:
        # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
        if not self.subProcess:
            print("Process already stopped")
            return
        retCode = self.subProcess.poll()
        if retCode:  # valid return code, process ended
            self.subProcess = None
        else:  # process still alive, let's interrupt it
            print(
                "Sub process still running, sending SIG_INT and waiting for it to stop...")
            # sub process should end, then IPC queue should end, causing IO
            # thread to end
            self.subProcess.send_signal(signal.SIGINT)
            try:
                self.subProcess.wait(10)
            except subprocess.TimeoutExpired as err:
                print("Time out waiting for TDengine service process to exit")
@@ -2076,15 +2097,17 @@ class SvcManager:
                self.subProcess = None
        if self.subProcess and (not self.subProcess.poll()):
            print(
                "Sub process is still running... pid = {}".format(
                    self.subProcess.pid))
        self.shouldStop = True
        self.joinIoThread()

    def run(self):
        self.startTaosService()
        # proc = subprocess.Popen(['echo', '"to stdout"'],
        #                 stdout=subprocess.PIPE,
        #                 )
        # stdout_value = proc.communicate()[0]
@@ -2093,7 +2116,7 @@ class SvcManager:
        self._procIpcAll()

        print("End of loop reading from IPC queue")
        self.joinIoThread()  # should have started already
        print("SvcManager Run Finished")
@@ -2148,7 +2171,7 @@ class ClientManager:
        self._printLastNumbers()

    def run(self):
        if gConfig.auto_start_service:
            svcMgr = SvcManager()
            svcMgr.startTaosService()
@@ -2163,7 +2186,7 @@ class ClientManager:
        # print("exec stats: {}".format(self.tc.getExecStats()))
        # print("TC failed = {}".format(self.tc.isFailed()))
        self.conclude()
        if gConfig.auto_start_service:
            svcMgr.stopTaosService()
        # print("TC failed (2) = {}".format(self.tc.isFailed()))
        # Linux return code: ref https://shapeshed.com/unix-exit-codes/
@@ -2248,24 +2271,57 @@ def main():
        '''))

    parser.add_argument(
        '-a',
        '--auto-start-service',
        action='store_true',
        help='Automatically start/stop the TDengine service (default: false)')
    parser.add_argument(
        '-c',
        '--connector-type',
        action='store',
        default='native',
        type=str,
        help='Connector type to use: native, rest, or mixed (default: 10)')
    parser.add_argument(
        '-d',
        '--debug',
        action='store_true',
        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument(
        '-e',
        '--run-tdengine',
        action='store_true',
        help='Run TDengine service in foreground (default: false)')
    parser.add_argument(
        '-l',
        '--larger-data',
        action='store_true',
        help='Write larger amount of data during write operations (default: false)')
    parser.add_argument(
        '-p',
        '--per-thread-db-connection',
        action='store_true',
        help='Use a single shared db connection (default: false)')
    parser.add_argument(
        '-r',
        '--record-ops',
        action='store_true',
        help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
    parser.add_argument(
        '-s',
        '--max-steps',
        action='store',
        default=1000,
        type=int,
        help='Maximum number of steps to run (default: 100)')
    parser.add_argument(
        '-t',
        '--num-threads',
        action='store',
        default=5,
        type=int,
        help='Number of threads to run (default: 10)')

    global gConfig
    gConfig = parser.parse_args()
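
One way to confirm that a formatting-only change like this leaves nothing further for autopep8 to fix is to re-run it in diff mode and check for empty output. A small sketch, using the same placeholder file name as in the earlier example:

# Hypothetical verification step: autopep8 --diff prints the changes it would
# still make; empty output means the file already conforms.
# "crash_test.py" is a placeholder, not necessarily the file touched by this commit.
import subprocess

result = subprocess.run(
    ["autopep8", "--diff", "crash_test.py"],
    capture_output=True, text=True, check=True)
if result.stdout.strip():
    print(result.stdout)  # remaining style fixes
else:
    print("No further autopep8 changes needed")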