Now supporting CTRL-C abort in Crash_Gen, plus tracking top numbers
This commit is contained in:
parent
88ac7bb6df
commit
a519389d38
|
@ -15,6 +15,8 @@ from __future__ import annotations # For type hinting before definition, ref: h
|
|||
|
||||
import sys
|
||||
import os
|
||||
import io
|
||||
import signal
|
||||
import traceback
|
||||
# Require Python 3
|
||||
if sys.version_info[0] < 3:
|
||||
|
@ -36,6 +38,8 @@ from requests.auth import HTTPBasicAuth
|
|||
from typing import List
|
||||
from typing import Dict
|
||||
from typing import Set
|
||||
from typing import IO
|
||||
from queue import Queue, Empty
|
||||
|
||||
from util.log import *
|
||||
from util.dnodes import *
|
||||
|
@ -205,6 +209,7 @@ class WorkerThread:
|
|||
# else:
|
||||
# return self._tc.getDbState().getDbConn().query(sql)
|
||||
|
||||
# The coordinator of all worker threads, mostly running in main thread
|
||||
class ThreadCoordinator:
|
||||
def __init__(self, pool: ThreadPool, dbManager):
|
||||
self._curStep = -1 # first step is 0
|
||||
|
@ -217,6 +222,7 @@ class ThreadCoordinator:
|
|||
|
||||
self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads
|
||||
self._execStats = ExecutionStats()
|
||||
self._runStatus = MainExec.STATUS_RUNNING
|
||||
|
||||
def getTaskExecutor(self):
|
||||
return self._te
|
||||
|
@ -227,6 +233,10 @@ class ThreadCoordinator:
|
|||
def crossStepBarrier(self):
|
||||
self._stepBarrier.wait()
|
||||
|
||||
def requestToStop(self):
|
||||
self._runStatus = MainExec.STATUS_STOPPING
|
||||
self._execStats.registerFailure("User Interruption")
|
||||
|
||||
def run(self):
|
||||
self._pool.createAndStartThreads(self)
|
||||
|
||||
|
@ -234,41 +244,56 @@ class ThreadCoordinator:
|
|||
self._curStep = -1 # not started yet
|
||||
maxSteps = gConfig.max_steps # type: ignore
|
||||
self._execStats.startExec() # start the stop watch
|
||||
failed = False
|
||||
while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9
|
||||
transitionFailed = False
|
||||
hasAbortedTask = False
|
||||
while(self._curStep < maxSteps-1 and
|
||||
(not transitionFailed) and
|
||||
(self._runStatus==MainExec.STATUS_RUNNING) and
|
||||
(not hasAbortedTask)): # maxStep==10, last curStep should be 9
|
||||
|
||||
if not gConfig.debug:
|
||||
print(".", end="", flush=True) # print this only if we are not in debug mode
|
||||
logger.debug("[TRD] Main thread going to sleep")
|
||||
|
||||
# Now ready to enter a step
|
||||
# Now main thread (that's us) is ready to enter a step
|
||||
self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate
|
||||
self._stepBarrier.reset() # Other worker threads should now be at the "gate"
|
||||
|
||||
# At this point, all threads should be past the overall "barrier" and before the per-thread "gate"
|
||||
try:
|
||||
sm = self._dbManager.getStateMachine()
|
||||
logger.debug("[STT] starting transitions")
|
||||
sm.transition(self._executedTasks) # at end of step, transiton the DB state
|
||||
logger.debug("[STT] transition ended")
|
||||
# Due to limitation (or maybe not) of the Python library, we cannot share connections across threads
|
||||
if sm.hasDatabase() :
|
||||
for t in self._pool.threadList:
|
||||
logger.debug("[DB] use db for all worker threads")
|
||||
t.useDb()
|
||||
# t.execSql("use db") # main thread executing "use db" on behalf of every worker thread
|
||||
# We use this period to do house keeping work, when all worker threads are QUIET.
|
||||
hasAbortedTask = False
|
||||
for task in self._executedTasks :
|
||||
if task.isAborted() :
|
||||
print("Task aborted: {}".format(task))
|
||||
hasAbortedTask = True
|
||||
break
|
||||
|
||||
except taos.error.ProgrammingError as err:
|
||||
if ( err.msg == 'network unavailable' ): # broken DB connection
|
||||
logger.info("DB connection broken, execution failed")
|
||||
traceback.print_stack()
|
||||
failed = True
|
||||
self._te = None # Not running any more
|
||||
self._execStats.registerFailure("Broken DB Connection")
|
||||
# continue # don't do that, need to tap all threads at end, and maybe signal them to stop
|
||||
else:
|
||||
raise
|
||||
finally:
|
||||
pass
|
||||
if hasAbortedTask : # do transition only if tasks are error free
|
||||
self._execStats.registerFailure("Aborted Task Encountered")
|
||||
else:
|
||||
try:
|
||||
sm = self._dbManager.getStateMachine()
|
||||
logger.debug("[STT] starting transitions")
|
||||
sm.transition(self._executedTasks) # at end of step, transiton the DB state
|
||||
logger.debug("[STT] transition ended")
|
||||
# Due to limitation (or maybe not) of the Python library, we cannot share connections across threads
|
||||
if sm.hasDatabase() :
|
||||
for t in self._pool.threadList:
|
||||
logger.debug("[DB] use db for all worker threads")
|
||||
t.useDb()
|
||||
# t.execSql("use db") # main thread executing "use db" on behalf of every worker thread
|
||||
except taos.error.ProgrammingError as err:
|
||||
if ( err.msg == 'network unavailable' ): # broken DB connection
|
||||
logger.info("DB connection broken, execution failed")
|
||||
traceback.print_stack()
|
||||
transitionFailed = True
|
||||
self._te = None # Not running any more
|
||||
self._execStats.registerFailure("Broken DB Connection")
|
||||
# continue # don't do that, need to tap all threads at end, and maybe signal them to stop
|
||||
else:
|
||||
raise
|
||||
# finally:
|
||||
# pass
|
||||
|
||||
self.resetExecutedTasks() # clear the tasks after we are done
|
||||
|
||||
|
@ -278,14 +303,14 @@ class ThreadCoordinator:
|
|||
logger.debug("\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep
|
||||
|
||||
# A new TE for the new step
|
||||
if not failed: # only if not failed
|
||||
if not transitionFailed: # only if not failed
|
||||
self._te = TaskExecutor(self._curStep)
|
||||
|
||||
logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep
|
||||
self.tapAllThreads()
|
||||
self.tapAllThreads() # Worker threads will wake up at this point, and each execute it's own task
|
||||
|
||||
logger.debug("Main thread ready to finish up...")
|
||||
if not failed: # only in regular situations
|
||||
if not transitionFailed: # only in regular situations
|
||||
self.crossStepBarrier() # Cross it one last time, after all threads finish
|
||||
self._stepBarrier.reset()
|
||||
logger.debug("Main thread in exclusive zone...")
|
||||
|
@ -298,8 +323,8 @@ class ThreadCoordinator:
|
|||
logger.info("\nAll worker threads finished")
|
||||
self._execStats.endExec()
|
||||
|
||||
def logStats(self):
|
||||
self._execStats.logStats()
|
||||
def printStats(self):
|
||||
self._execStats.printStats()
|
||||
|
||||
def tapAllThreads(self): # in a deterministic manner
|
||||
wakeSeq = []
|
||||
|
@ -1061,15 +1086,60 @@ class DbManager():
|
|||
self._dbConn.close()
|
||||
|
||||
class TaskExecutor():
|
||||
class BoundedList:
    """Track the largest numbers seen so far, kept in ascending order.

    At most `size` values are retained. Once the list is full, adding a
    value larger than the current smallest one evicts that smallest value;
    a value that is not larger than the smallest one is ignored.
    Duplicates are allowed.

    NOTE(review): instances are not synchronized; a shared instance that is
    updated from several threads may need external locking - confirm.
    """

    def __init__(self, size = 10):
        self._size = size  # maximum number of values retained
        self._list = []    # retained values, always sorted ascending

    def add(self, n: int) :
        """Offer `n` to the list, keeping only the `size` largest values."""
        import bisect  # local import: keeps this block self-contained

        # Fast path: a full list cannot be improved by a value that is not
        # larger than its current minimum. While the list is still below
        # capacity every value is accepted, so small early values are no
        # longer dropped just because they sort before the first item.
        if self._list and len(self._list) >= self._size and n <= self._list[0]:
            return

        bisect.insort_left(self._list, n)  # insert, preserving ascending order

        if len(self._list) > self._size:
            del self._list[0]  # evict the smallest to restore the bound

    def __str__(self):
        """Return the retained values rendered as a Python list literal."""
        return repr(self._list)
|
||||
|
||||
_boundedList = BoundedList()
|
||||
|
||||
def __init__(self, curStep):
|
||||
self._curStep = curStep
|
||||
|
||||
@classmethod
|
||||
def getBoundedList(cls):
|
||||
return cls._boundedList
|
||||
|
||||
def getCurStep(self):
|
||||
return self._curStep
|
||||
|
||||
def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread
|
||||
task.execute(wt)
|
||||
|
||||
def recordDataMark(self, n: int):
|
||||
# print("[{}]".format(n), end="", flush=True)
|
||||
self._boundedList.add(n)
|
||||
|
||||
# def logInfo(self, msg):
|
||||
# logger.info(" T[{}.x]: ".format(self._curStep) + msg)
|
||||
|
||||
|
@ -1089,6 +1159,7 @@ class Task():
|
|||
self._dbManager = dbManager
|
||||
self._workerThread = None
|
||||
self._err = None
|
||||
self._aborted = False
|
||||
self._curStep = None
|
||||
self._numRows = None # Number of rows affected
|
||||
|
||||
|
@ -1102,6 +1173,9 @@ class Task():
|
|||
def isSuccess(self):
    """Return True when no error has been recorded on this task (`_err` is None)."""
    # Identity check: `== None` could be fooled by an error object that
    # overrides __eq__; `is None` cannot.
    return self._err is None
|
||||
|
||||
def isAborted(self):
|
||||
return self._aborted
|
||||
|
||||
def clone(self): # TODO: why do we need this again?
|
||||
newTask = self.__class__(self._dbManager, self._execStats)
|
||||
return newTask
|
||||
|
@ -1143,7 +1217,9 @@ class Task():
|
|||
else: # non-debug
|
||||
print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
|
||||
"----------------------------\n")
|
||||
sys.exit(-1)
|
||||
# sys.exit(-1)
|
||||
self._err = err
|
||||
self._aborted = True
|
||||
except:
|
||||
self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql))
|
||||
raise
|
||||
|
@ -1213,7 +1289,7 @@ class ExecutionStats:
|
|||
self._failed = True
|
||||
self._failureReason = reason
|
||||
|
||||
def logStats(self):
|
||||
def printStats(self):
|
||||
logger.info("----------------------------------------------------------------------")
|
||||
logger.info("| Crash_Gen test {}, with the following stats:".
|
||||
format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED"))
|
||||
|
@ -1228,6 +1304,7 @@ class ExecutionStats:
|
|||
logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime))
|
||||
logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny))
|
||||
logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime))
|
||||
logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
|
||||
logger.info("----------------------------------------------------------------------")
|
||||
|
||||
|
||||
|
@ -1449,6 +1526,8 @@ class TaskAddData(StateTransitionTask):
|
|||
ds.getNextBinary(), ds.getNextFloat(),
|
||||
ds.getNextTick(), nextInt)
|
||||
self.execWtSql(wt, sql)
|
||||
# Successfully wrote the data into the DB, let's record it somehow
|
||||
te.recordDataMark(nextInt)
|
||||
if gConfig.record_ops:
|
||||
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
|
||||
self.fAddLogDone.flush()
|
||||
|
@ -1528,23 +1607,152 @@ class MyLoggingAdapter(logging.LoggerAdapter):
|
|||
return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs
|
||||
# return '[%s] %s' % (self.extra['connid'], msg), kwargs
|
||||
|
||||
class MainExec:
|
||||
@classmethod
|
||||
def runClient(cls):
|
||||
# resetDb = False # DEBUG only
|
||||
# dbState = DbState(resetDb) # DBEUG only!
|
||||
class SvcManager:
    """Run the taosd service as a child process and echo its output.

    A background thread reads the child's stdout into a queue; the main
    thread drains that queue and prints each line until a SIGINT/SIGTERM
    asks it to stop.
    """

    def __init__(self):
        print("Starting service manager")
        # SIGTERM and SIGINT (CTRL-C) share one handler. From this point on
        # the handler may fire at any time, including before run() has
        # started the child process.
        signal.signal(signal.SIGTERM, self.sigIntHandler)
        signal.signal(signal.SIGINT, self.sigIntHandler)
        self.ioThread = None    # reader thread, created in run()
        self.subProcess = None  # child process, created in run()
        self.shouldStop = False
        self.status = MainExec.STATUS_RUNNING

    def svcOutputReader(self, out: IO, queue):
        """Forward every line read from `out` to `queue`, then close `out`."""
        for line in out:
            queue.put(line.rstrip())  # get rid of new lines
        print("No more output from incoming IO")  # meaning sub process must have died
        out.close()

    def sigIntHandler(self, signalNumber, frame):
        """Handle SIGINT/SIGTERM: stop the child process and the main loop.

        Only the first signal is acted upon; repeated ones are ignored.
        """
        if self.status != MainExec.STATUS_RUNNING:
            print("Ignoring repeated SIGINT...")
            return  # do nothing if it's already not running
        self.status = MainExec.STATUS_STOPPING  # immediately set our status

        print("Terminating program...")
        # The child may not exist yet if the signal arrives between
        # __init__() and the Popen() call in run(); guard against None.
        if self.subProcess is not None:
            self.subProcess.send_signal(signal.SIGINT)
        self.shouldStop = True
        self.joinIoThread()

    def joinIoThread(self):
        """Wait for the reader thread, if any, and forget it afterwards."""
        if self.ioThread:
            self.ioThread.join()
            self.ioThread = None

    def run(self):
        """Start the service and print its output until asked to stop."""
        if self.shouldStop:
            # A stop request arrived before the service was launched;
            # starting it now would leave a child nobody will interrupt.
            return

        ON_POSIX = 'posix' in sys.builtin_module_names
        svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg']
        self.subProcess = subprocess.Popen(svcCmd, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX, text=True)
        q = Queue()
        self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, q))
        self.ioThread.daemon = True  # thread dies with the program
        self.ioThread.start()

        while True:
            try:
                line = q.get_nowait()  # getting output at fast speed
            except Empty:
                time.sleep(2.3)  # wait only if there's no output
            else:  # got line
                print(line)
            if self.shouldStop:
                print("Ending main Svc thread")
                break

        print("end of loop")

        self.joinIoThread()
        print("Finished")
|
||||
|
||||
class ClientManager:
|
||||
def __init__(self):
|
||||
print("Starting service manager")
|
||||
signal.signal(signal.SIGTERM, self.sigIntHandler)
|
||||
signal.signal(signal.SIGINT, self.sigIntHandler)
|
||||
|
||||
self.status = MainExec.STATUS_RUNNING
|
||||
self.tc = None
|
||||
|
||||
def sigIntHandler(self, signalNumber, frame):
    """Handle SIGINT/SIGTERM by asking the thread coordinator to stop.

    Only the first signal is acted upon; repeated ones are ignored.
    """
    if self.status != MainExec.STATUS_RUNNING :
        print("Ignoring repeated SIGINT...")
        return # do nothing if it's already not running
    self.status = MainExec.STATUS_STOPPING # immediately set our status

    print("Terminating program...")
    if self.tc is None:
        # The coordinator is only created inside run(); a signal that arrives
        # earlier has nothing to stop, so exit instead of dereferencing None.
        sys.exit(-1)
    self.tc.requestToStop()
|
||||
|
||||
def _printLastNumbers(self): # to verify data durability
|
||||
dbManager = DbManager(resetDb=False)
|
||||
dbc = dbManager.getDbConn()
|
||||
if dbc.query("show databases") == 0 : # no databae
|
||||
return
|
||||
|
||||
dbc.execute("use db")
|
||||
sTbName = dbManager.getFixedSuperTableName()
|
||||
|
||||
# get all regular tables
|
||||
dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
|
||||
rTables = dbc.getQueryResult()
|
||||
|
||||
bList = TaskExecutor.BoundedList()
|
||||
for rTbName in rTables : # regular tables
|
||||
dbc.query("select speed from db.{}".format(rTbName[0]))
|
||||
numbers = dbc.getQueryResult()
|
||||
for row in numbers :
|
||||
# print("<{}>".format(n), end="", flush=True)
|
||||
bList.add(row[0])
|
||||
|
||||
print("Top numbers in DB right now: {}".format(bList))
|
||||
print("TDengine client execution is about to start in 2 seconds...")
|
||||
time.sleep(2.0)
|
||||
dbManager = None # release?
|
||||
|
||||
def prepare(self):
|
||||
self._printLastNumbers()
|
||||
|
||||
def run(self):
|
||||
self._printLastNumbers()
|
||||
|
||||
dbManager = DbManager() # Regular function
|
||||
Dice.seed(0) # initial seeding of dice
|
||||
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
|
||||
tc = ThreadCoordinator(thPool, dbManager)
|
||||
self.tc = ThreadCoordinator(thPool, dbManager)
|
||||
|
||||
tc.run()
|
||||
tc.logStats()
|
||||
dbManager.cleanUp()
|
||||
self.tc.run()
|
||||
self.conclude()
|
||||
|
||||
def conclude(self):
|
||||
self.tc.printStats()
|
||||
self.tc.getDbManager().cleanUp()
|
||||
|
||||
|
||||
class MainExec:
|
||||
STATUS_RUNNING = 1
|
||||
STATUS_STOPPING = 2
|
||||
# STATUS_STOPPED = 3 # Not used yet
|
||||
|
||||
@classmethod
|
||||
def runClient(cls):
|
||||
clientManager = ClientManager()
|
||||
clientManager.run()
|
||||
|
||||
@classmethod
|
||||
def runService(cls):
|
||||
print("Running service...")
|
||||
svcManager = SvcManager()
|
||||
svcManager.run()
|
||||
|
||||
@classmethod
|
||||
def runTemp(cls): # for debugging purposes
|
||||
|
|
Loading…
Reference in New Issue