Merge pull request #2667 from taosdata/feature/crash_gen
Feature/crash_gen: added REST/NATIVE mixed connector type
Commit: dbbc86b2ea
@@ -84,8 +84,17 @@ class WorkerThread:
         # Let us have a DB connection of our own
         if (gConfig.per_thread_db_connection): # type: ignore
             # print("connector_type = {}".format(gConfig.connector_type))
-            self._dbConn = DbConn.createNative() if (
-                gConfig.connector_type == 'native') else DbConn.createRest()
+            if gConfig.connector_type == 'native':
+                self._dbConn = DbConn.createNative()
+            elif gConfig.connector_type == 'rest':
+                self._dbConn = DbConn.createRest()
+            elif gConfig.connector_type == 'mixed':
+                if Dice.throw(2) == 0: # 1/2 chance
+                    self._dbConn = DbConn.createNative()
+                else:
+                    self._dbConn = DbConn.createRest()
+            else:
+                raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type))

         self._dbInUse = False # if "use db" was executed already

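For orientation, here is a minimal, self-contained sketch of the per-thread connection selection this hunk introduces. Dice.throw(2) is the test framework's own helper, so random.randrange(2) stands in for it here, and the two factory functions are placeholders rather than the real DbConn class methods.

import random

def create_native_conn():        # placeholder for DbConn.createNative()
    return "native-c connection"

def create_rest_conn():          # placeholder for DbConn.createRest()
    return "rest-api connection"

def make_connection(connector_type):
    # Same shape as the new logic above: explicit types, plus a 50/50 'mixed' mode.
    if connector_type == 'native':
        return create_native_conn()
    elif connector_type == 'rest':
        return create_rest_conn()
    elif connector_type == 'mixed':
        if random.randrange(2) == 0:    # 1/2 chance, like Dice.throw(2) == 0
            return create_native_conn()
        else:
            return create_rest_conn()
    else:
        raise RuntimeError("Unexpected connector type: {}".format(connector_type))

print(make_connection('mixed'))      # randomly prints one of the two placeholder connections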
@@ -130,22 +139,15 @@ class WorkerThread:
         while True:
             tc = self._tc # Thread Coordinator, the overall master
             tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
-            logger.debug(
-                "[TRD] Worker thread [{}] exited barrier...".format(
-                    self._tid))
+            logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
             self.crossStepGate() # then per-thread gate, after being tapped
-            logger.debug(
-                "[TRD] Worker thread [{}] exited step gate...".format(
-                    self._tid))
+            logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
             if not self._tc.isRunning():
-                logger.debug(
-                    "[TRD] Thread Coordinator not running any more, worker thread now stopping...")
+                logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                 break

             # Fetch a task from the Thread Coordinator
-            logger.debug(
-                "[TRD] Worker thread [{}] about to fetch task".format(
-                    self._tid))
+            logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid))
             task = tc.fetchTask()

             # Execute such a task
@@ -154,9 +156,7 @@ class WorkerThread:
                     self._tid, task.__class__.__name__))
             task.execute(self)
             tc.saveExecutedTask(task)
-            logger.debug(
-                "[TRD] Worker thread [{}] finished executing task".format(
-                    self._tid))
+            logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

             self._dbInUse = False # there may be changes between steps

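A brief note on the logging calls collapsed in the two hunks above: the messages are still built with str.format() even when DEBUG logging is off. Python's logging module can defer the formatting instead; a minimal sketch, with an assumed logger name (the real module configures its own logger):

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("CrashGen")   # assumed name, for illustration only

tid = 3
# %-style arguments are only interpolated when the DEBUG level is actually enabled.
logger.debug("[TRD] Worker thread [%s] about to fetch task", tid)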
@@ -255,101 +255,124 @@ class ThreadCoordinator:
             self._runStatus = MainExec.STATUS_STOPPING
             self._execStats.registerFailure("User Interruption")

+    def _runShouldEnd(self, transitionFailed, hasAbortedTask):
+        maxSteps = gConfig.max_steps # type: ignore
+        if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9
+            return True
+        if self._runStatus != MainExec.STATUS_RUNNING:
+            return True
+        if transitionFailed:
+            return True
+        if hasAbortedTask:
+            return True
+        return False
+
+    def _hasAbortedTask(self): # from execution of previous step
+        for task in self._executedTasks:
+            if task.isAborted():
+                # print("Task aborted: {}".format(task))
+                # hasAbortedTask = True
+                return True
+        return False
+
+    def _releaseAllWorkerThreads(self, transitionFailed):
+        self._curStep += 1 # we are about to get into next step. TODO: race condition here!
+        # Now not all threads had time to go to sleep
+        logger.debug(
+            "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))
+
+        # A new TE for the new step
+        self._te = None # set to empty first, to signal worker thread to stop
+        if not transitionFailed: # only if not failed
+            self._te = TaskExecutor(self._curStep)
+
+        logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
+            self._curStep)) # Now not all threads had time to go to sleep
+        # Worker threads will wake up at this point, and each execute it's own task
+        self.tapAllThreads() # release all worker thread from their "gate"
+
+    def _syncAtBarrier(self):
+        # Now main thread (that's us) is ready to enter a step
+        # let other threads go past the pool barrier, but wait at the
+        # thread gate
+        logger.debug("[TRD] Main thread about to cross the barrier")
+        self.crossStepBarrier()
+        self._stepBarrier.reset() # Other worker threads should now be at the "gate"
+        logger.debug("[TRD] Main thread finished crossing the barrier")
+
+    def _doTransition(self):
+        transitionFailed = False
+        try:
+            sm = self._dbManager.getStateMachine()
+            logger.debug("[STT] starting transitions")
+            # at end of step, transiton the DB state
+            sm.transition(self._executedTasks)
+            logger.debug("[STT] transition ended")
+            # Due to limitation (or maybe not) of the Python library,
+            # we cannot share connections across threads
+            if sm.hasDatabase():
+                for t in self._pool.threadList:
+                    logger.debug("[DB] use db for all worker threads")
+                    t.useDb()
+                    # t.execSql("use db") # main thread executing "use
+                    # db" on behalf of every worker thread
+        except taos.error.ProgrammingError as err:
+            if (err.msg == 'network unavailable'): # broken DB connection
+                logger.info("DB connection broken, execution failed")
+                traceback.print_stack()
+                transitionFailed = True
+                self._te = None # Not running any more
+                self._execStats.registerFailure("Broken DB Connection")
+                # continue # don't do that, need to tap all threads at
+                # end, and maybe signal them to stop
+            else:
+                raise
+
+        self.resetExecutedTasks() # clear the tasks after we are done
+        # Get ready for next step
+        logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
+        return transitionFailed
+
     def run(self):
         self._pool.createAndStartThreads(self)

         # Coordinate all threads step by step
         self._curStep = -1 # not started yet
-        maxSteps = gConfig.max_steps # type: ignore

         self._execStats.startExec() # start the stop watch
         transitionFailed = False
         hasAbortedTask = False
-        while(self._curStep < maxSteps - 1 and
-                (not transitionFailed) and
-                (self._runStatus == MainExec.STATUS_RUNNING) and
-                (not hasAbortedTask)): # maxStep==10, last curStep should be 9
-
-            if not gConfig.debug:
-                # print this only if we are not in debug mode
+        while not self._runShouldEnd(transitionFailed, hasAbortedTask):
+            if not gConfig.debug: # print this only if we are not in debug mode
                 print(".", end="", flush=True)
-            logger.debug("[TRD] Main thread going to sleep")
-
-            # Now main thread (that's us) is ready to enter a step
-            # let other threads go past the pool barrier, but wait at the
-            # thread gate
-            self.crossStepBarrier()
-            self._stepBarrier.reset() # Other worker threads should now be at the "gate"
-
+            self._syncAtBarrier() # For now just cross the barrier

             # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
             # We use this period to do house keeping work, when all worker
             # threads are QUIET.
-            hasAbortedTask = False
-            for task in self._executedTasks:
-                if task.isAborted():
-                    print("Task aborted: {}".format(task))
-                    hasAbortedTask = True
-                    break
-
-            if hasAbortedTask: # do transition only if tasks are error free
+            hasAbortedTask = self._hasAbortedTask() # from previous step
+            if hasAbortedTask:
+                logger.info("Aborted task encountered, exiting test program")
                 self._execStats.registerFailure("Aborted Task Encountered")
-            else:
-                try:
-                    sm = self._dbManager.getStateMachine()
-                    logger.debug("[STT] starting transitions")
-                    # at end of step, transiton the DB state
-                    sm.transition(self._executedTasks)
-                    logger.debug("[STT] transition ended")
-                    # Due to limitation (or maybe not) of the Python library,
-                    # we cannot share connections across threads
-                    if sm.hasDatabase():
-                        for t in self._pool.threadList:
-                            logger.debug("[DB] use db for all worker threads")
-                            t.useDb()
-                            # t.execSql("use db") # main thread executing "use
-                            # db" on behalf of every worker thread
-                except taos.error.ProgrammingError as err:
-                    if (err.msg == 'network unavailable'): # broken DB connection
-                        logger.info("DB connection broken, execution failed")
-                        traceback.print_stack()
-                        transitionFailed = True
-                        self._te = None # Not running any more
-                        self._execStats.registerFailure("Broken DB Connection")
-                        # continue # don't do that, need to tap all threads at
-                        # end, and maybe signal them to stop
-                    else:
-                        raise
-                # finally:
-                #     pass
+                break # do transition only if tasks are error free

-            self.resetExecutedTasks() # clear the tasks after we are done
+            # Ending previous step
+            transitionFailed = self._doTransition() # To start, we end step -1 first
+            # Then we move on to the next step
+            self._releaseAllWorkerThreads(transitionFailed)

-            # Get ready for next step
-            logger.debug("<-- Step {} finished".format(self._curStep))
-            self._curStep += 1 # we are about to get into next step. TODO: race condition here!
-            # Now not all threads had time to go to sleep
-            logger.debug(
-                "\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))
+        if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate"
+            logger.debug("Abnormal ending of main thraed")
+        else: # regular ending, workers waiting at "barrier"
+            logger.debug("Regular ending, main thread waiting for all worker threads to stop...")
+            self._syncAtBarrier()

-            # A new TE for the new step
-            if not transitionFailed: # only if not failed
-                self._te = TaskExecutor(self._curStep)
-
-            logger.debug(
-                "[TRD] Main thread waking up at step {}, tapping worker threads".format(
-                    self._curStep)) # Now not all threads had time to go to sleep
-            # Worker threads will wake up at this point, and each execute it's
-            # own task
-            self.tapAllThreads()

-        logger.debug("Main thread ready to finish up...")
-        if not transitionFailed: # only in regular situations
-            self.crossStepBarrier() # Cross it one last time, after all threads finish
-            self._stepBarrier.reset()
-            logger.debug("Main thread in exclusive zone...")
-            self._te = None # No more executor, time to end
-            logger.debug("Main thread tapping all threads one last time...")
-            self.tapAllThreads() # Let the threads run one last time
+        self._te = None # No more executor, time to end
+        logger.debug("Main thread tapping all threads one last time...")
+        self.tapAllThreads() # Let the threads run one last time

         logger.debug("\r\n\n--> Main thread ready to finish up...")
         logger.debug("Main thread joining all threads")
         self._pool.joinAll() # Get all threads to finish
         logger.info("\nAll worker threads finished")
@@ -514,7 +537,7 @@ class LinearQueue():

 class DbConn:
     TYPE_NATIVE = "native-c"
-    TYPE_REST = "rest-api"
+    TYPE_REST = "rest-api"
     TYPE_INVALID = "invalid"

     @classmethod
@@ -620,9 +643,13 @@ class DbConnRest(DbConn):
         self.isOpen = False

     def _doSql(self, sql):
-        r = requests.post(self._url,
-                          data=sql,
-                          auth=HTTPBasicAuth('root', 'taosdata'))
+        try:
+            r = requests.post(self._url,
+                data = sql,
+                auth = HTTPBasicAuth('root', 'taosdata'))
+        except:
+            print("REST API Failure (TODO: more info here)")
+            raise
         rj = r.json()
         # Sanity check for the "Json Result"
         if ('status' not in rj):
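For reference, a standalone sketch of what the guarded REST call above amounts to: POST the SQL text with basic auth, surface transport failures, and sanity-check that the JSON reply carries a 'status' field. The credentials come from this diff; the URL shown in the comment is the usual TDengine REST endpoint but should be treated as an assumption here.

import requests
from requests.auth import HTTPBasicAuth

def do_rest_sql(url, sql):
    # Same shape as DbConnRest._doSql(): post the raw SQL, catch transport errors early.
    try:
        r = requests.post(url, data=sql, auth=HTTPBasicAuth('root', 'taosdata'))
    except requests.exceptions.RequestException:
        print("REST API failure while posting: {}".format(sql))
        raise
    rj = r.json()
    if 'status' not in rj:               # sanity check on the JSON result
        raise RuntimeError("No status in REST response: {}".format(rj))
    return rj

# Example (assumes a local REST endpoint, typically http://localhost:6041/rest/sql):
# do_rest_sql("http://localhost:6041/rest/sql", "show databases")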
@@ -717,7 +744,7 @@ class MyTDSql:
 class DbConnNative(DbConn):
     def __init__(self):
         super().__init__()
-        self._type = self.TYPE_REST
+        self._type = self.TYPE_NATIVE
         self._conn = None
         self._cursor = None

@@ -2254,8 +2281,9 @@ class ClientManager:

     def sigIntHandler(self, signalNumber, frame):
         if self._status != MainExec.STATUS_RUNNING:
-            print("Ignoring repeated SIGINT...")
-            return # do nothing if it's already not running
+            print("Repeated SIGINT received, forced exit...")
+            # return # do nothing if it's already not running
+            sys.exit(-1)
         self._status = MainExec.STATUS_STOPPING # immediately set our status

         print("Terminating program...")
@@ -2394,6 +2422,27 @@ def main():

            '''))

+    # parser.add_argument('-a', '--auto-start-service', action='store_true',
+    #                     help='Automatically start/stop the TDengine service (default: false)')
+    # parser.add_argument('-c', '--connector-type', action='store', default='native', type=str,
+    #                     help='Connector type to use: native, rest, or mixed (default: 10)')
+    # parser.add_argument('-d', '--debug', action='store_true',
+    #                     help='Turn on DEBUG mode for more logging (default: false)')
+    # parser.add_argument('-e', '--run-tdengine', action='store_true',
+    #                     help='Run TDengine service in foreground (default: false)')
+    # parser.add_argument('-l', '--larger-data', action='store_true',
+    #                     help='Write larger amount of data during write operations (default: false)')
+    # parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
+    #                     help='Use a single shared db connection (default: false)')
+    # parser.add_argument('-r', '--record-ops', action='store_true',
+    #                     help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
+    # parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int,
+    #                     help='Maximum number of steps to run (default: 100)')
+    # parser.add_argument('-t', '--num-threads', action='store', default=5, type=int,
+    #                     help='Number of threads to run (default: 10)')
+    # parser.add_argument('-x', '--continue-on-exception', action='store_true',
+    #                     help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
+
     parser.add_argument(
         '-a',
         '--auto-start-service',
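The new 'mixed' value is what this PR adds to '-c/--connector-type', but its argparse declaration sits outside the lines shown in this hunk. The sketch below is a plausible shape inferred from the commented-out help text above, not the PR's actual definition.

import argparse

parser = argparse.ArgumentParser(description="crash_gen option sketch (illustrative only)")
parser.add_argument('-c', '--connector-type', action='store', default='native', type=str,
                    help='Connector type to use: native, rest, or mixed (default: native)')

args = parser.parse_args(['-c', 'mixed'])     # e.g. ./crash_gen.py -c mixed
print(args.connector_type)                    # -> mixed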