Switched to actual detection of states, discovered major metadata handling flaw, and is now blocked
This commit is contained in:
parent
27c5c2acbd
commit
5bf9e79871
|
@ -14,6 +14,7 @@
|
|||
from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
|
||||
|
||||
import sys
|
||||
import traceback
|
||||
# Require Python 3
|
||||
if sys.version_info[0] < 3:
|
||||
raise Exception("Must be using Python 3")
|
||||
|
@ -105,10 +106,11 @@ class WorkerThread:
|
|||
while True:
|
||||
tc = self._tc # Thread Coordinator, the overall master
|
||||
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
|
||||
logger.debug("Thread task loop exited barrier...")
|
||||
# logger.debug("Thread task loop exited barrier...")
|
||||
self.crossStepGate() # then per-thread gate, after being tapped
|
||||
logger.debug("Thread task loop exited step gate...")
|
||||
# logger.debug("Thread task loop exited step gate...")
|
||||
if not self._tc.isRunning():
|
||||
logger.debug("Thread Coordinator not running any more, worker thread now stopping...")
|
||||
break
|
||||
|
||||
task = tc.fetchTask()
|
||||
|
@ -143,7 +145,7 @@ class WorkerThread:
|
|||
self.verifyThreadAlive()
|
||||
self.verifyThreadMain() # only allowed for main thread
|
||||
|
||||
logger.debug("Tapping worker thread {}".format(self._tid))
|
||||
# logger.debug("Tapping worker thread {}".format(self._tid))
|
||||
self._stepGate.set() # wake up!
|
||||
time.sleep(0) # let the released thread run a bit
|
||||
|
||||
|
@ -153,11 +155,11 @@ class WorkerThread:
|
|||
else:
|
||||
return self._tc.getDbState().getDbConn().execute(sql)
|
||||
|
||||
def querySql(self, sql): # not "execute", since we are out side the DB context
|
||||
if ( gConfig.per_thread_db_connection ):
|
||||
return self._dbConn.query(sql)
|
||||
else:
|
||||
return self._tc.getDbState().getDbConn().query(sql)
|
||||
# def querySql(self, sql): # not "execute", since we are out side the DB context
|
||||
# if ( gConfig.per_thread_db_connection ):
|
||||
# return self._dbConn.query(sql)
|
||||
# else:
|
||||
# return self._tc.getDbState().getDbConn().query(sql)
|
||||
|
||||
class ThreadCoordinator:
|
||||
def __init__(self, pool, dbState):
|
||||
|
@ -187,8 +189,9 @@ class ThreadCoordinator:
|
|||
# Coordinate all threads step by step
|
||||
self._curStep = -1 # not started yet
|
||||
maxSteps = gConfig.max_steps # type: ignore
|
||||
startTime = time.time()
|
||||
while(self._curStep < maxSteps-1): # maxStep==10, last curStep should be 9
|
||||
self._execStats.startExec() # start the stop watch
|
||||
failed = False
|
||||
while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9
|
||||
print(".", end="", flush=True)
|
||||
logger.debug("Main thread going to sleep")
|
||||
|
||||
|
@ -197,7 +200,21 @@ class ThreadCoordinator:
|
|||
self._stepBarrier.reset() # Other worker threads should now be at the "gate"
|
||||
|
||||
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
|
||||
self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state
|
||||
try:
|
||||
self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state
|
||||
except taos.error.ProgrammingError as err:
|
||||
if ( err.msg == 'network unavailable' ): # broken DB connection
|
||||
logger.info("DB connection broken, execution failed")
|
||||
traceback.print_stack()
|
||||
failed = True
|
||||
self._te = None # Not running any more
|
||||
self._execStats.registerFailure("Broken DB Connection")
|
||||
# continue # don't do that, need to tap all threads at end, and maybe signal them to stop
|
||||
else:
|
||||
raise
|
||||
finally:
|
||||
pass
|
||||
|
||||
self.resetExecutedTasks() # clear the tasks after we are done
|
||||
|
||||
# Get ready for next step
|
||||
|
@ -206,25 +223,28 @@ class ThreadCoordinator:
|
|||
logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep
|
||||
|
||||
# A new TE for the new step
|
||||
self._te = TaskExecutor(self._curStep)
|
||||
if not failed: # only if not failed
|
||||
self._te = TaskExecutor(self._curStep)
|
||||
|
||||
logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep
|
||||
self.tapAllThreads()
|
||||
|
||||
logger.debug("Main thread ready to finish up...")
|
||||
self.crossStepBarrier() # Cross it one last time, after all threads finish
|
||||
self._stepBarrier.reset()
|
||||
logger.debug("Main thread in exclusive zone...")
|
||||
self._te = None # No more executor, time to end
|
||||
logger.debug("Main thread tapping all threads one last time...")
|
||||
self.tapAllThreads() # Let the threads run one last time
|
||||
if not failed: # only in regular situations
|
||||
self.crossStepBarrier() # Cross it one last time, after all threads finish
|
||||
self._stepBarrier.reset()
|
||||
logger.debug("Main thread in exclusive zone...")
|
||||
self._te = None # No more executor, time to end
|
||||
logger.debug("Main thread tapping all threads one last time...")
|
||||
self.tapAllThreads() # Let the threads run one last time
|
||||
|
||||
logger.debug("Main thread joining all threads")
|
||||
self._pool.joinAll() # Get all threads to finish
|
||||
|
||||
logger.info("All threads finished")
|
||||
self._execStats.endExec()
|
||||
|
||||
def logStats(self):
|
||||
self._execStats.logStats()
|
||||
logger.info("Total Execution Time (task busy time, plus Python overhead): {:.2f} seconds".format(time.time() - startTime))
|
||||
print("\r\nFinished")
|
||||
|
||||
def tapAllThreads(self): # in a deterministic manner
|
||||
wakeSeq = []
|
||||
|
@ -380,7 +400,7 @@ class DbConn:
|
|||
|
||||
# Get the connection/cursor ready
|
||||
self._cursor.execute('reset query cache')
|
||||
# self._cursor.execute('use db')
|
||||
# self._cursor.execute('use db') # note we do this in _findCurrenState
|
||||
|
||||
# Open connection
|
||||
self._tdSql = TDSql()
|
||||
|
@ -410,216 +430,71 @@ class DbConn:
|
|||
raise RuntimeError("Cannot execute database commands until connection is open")
|
||||
return self._tdSql.execute(sql)
|
||||
|
||||
def query(self, sql) -> int : # return number of rows retrieved
|
||||
def query(self, sql) : # return rows affected
|
||||
if ( not self.isOpen ):
|
||||
raise RuntimeError("Cannot query database until connection is open")
|
||||
return self._tdSql.query(sql)
|
||||
# results are in: return self._tdSql.queryResult
|
||||
|
||||
def _queryAny(self, sql) : # actual query result as an int
|
||||
if ( not self.isOpen ):
|
||||
raise RuntimeError("Cannot query database until connection is open")
|
||||
tSql = self._tdSql
|
||||
nRows = tSql.query(sql)
|
||||
if nRows != 1 :
|
||||
raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
|
||||
if tSql.queryRows != 1 or tSql.queryCols != 1:
|
||||
raise RuntimeError("Unexpected result set for query: {}".format(sql))
|
||||
return tSql.queryResult[0][0]
|
||||
|
||||
# State of the database as we believe it to be
|
||||
class DbState():
|
||||
def queryScalar(self, sql) -> int :
|
||||
return self._queryAny(sql)
|
||||
|
||||
def queryString(self, sql) -> str :
|
||||
return self._queryAny(sql)
|
||||
|
||||
class AnyState:
|
||||
STATE_INVALID = -1
|
||||
STATE_EMPTY = 0 # nothing there, no even a DB
|
||||
STATE_DB_ONLY = 1 # we have a DB, but nothing else
|
||||
STATE_TABLE_ONLY = 2 # we have a table, but totally empty
|
||||
STATE_HAS_DATA = 3 # we have some data in the table
|
||||
_stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]
|
||||
|
||||
STATE_VAL_IDX = 0
|
||||
CAN_CREATE_DB = 1
|
||||
CAN_DROP_DB = 2
|
||||
CAN_CREATE_FIXED_TABLE = 3
|
||||
CAN_DROP_FIXED_TABLE = 4
|
||||
CAN_ADD_DATA = 5
|
||||
CAN_READ_DATA = 6
|
||||
|
||||
def __init__(self):
|
||||
self.tableNumQueue = LinearQueue()
|
||||
self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick
|
||||
self._lastInt = 0 # next one is initial integer
|
||||
self._lock = threading.RLock()
|
||||
self._info = self.getInfo()
|
||||
|
||||
self._state = self.STATE_INVALID
|
||||
self._stateWeights = [1,3,5,10]
|
||||
|
||||
# self.openDbServerConnection()
|
||||
self._dbConn = DbConn()
|
||||
try:
|
||||
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
|
||||
except taos.error.ProgrammingError as err:
|
||||
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
|
||||
if ( err.msg == 'disconnected' ): # cannot open DB connection
|
||||
print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
|
||||
sys.exit()
|
||||
else:
|
||||
raise
|
||||
except:
|
||||
print("[=]Unexpected exception")
|
||||
raise
|
||||
self._dbConn.resetDb() # drop and recreate DB
|
||||
self._state = self.STATE_EMPTY # initial state, the result of above
|
||||
def __str__(self):
|
||||
return self._stateNames[self._info[self.STATE_VAL_IDX] - 1] # -1 hack to accomodate the STATE_INVALID case
|
||||
|
||||
def getDbConn(self):
|
||||
return self._dbConn
|
||||
def getInfo(self):
|
||||
raise RuntimeError("Must be overriden by child classes")
|
||||
|
||||
def pickAndAllocateTable(self): # pick any table, and "use" it
|
||||
return self.tableNumQueue.pickAndAllocate()
|
||||
def verifyTasksToState(self, tasks, newState):
|
||||
raise RuntimeError("Must be overriden by child classes")
|
||||
|
||||
def addTable(self):
|
||||
with self._lock:
|
||||
tIndex = self.tableNumQueue.push()
|
||||
return tIndex
|
||||
|
||||
def getFixedTableName(self):
|
||||
return "fixed_table"
|
||||
|
||||
def releaseTable(self, i): # return the table back, so others can use it
|
||||
self.tableNumQueue.release(i)
|
||||
|
||||
def getNextTick(self):
|
||||
with self._lock: # prevent duplicate tick
|
||||
self._lastTick += datetime.timedelta(0, 1) # add one second to it
|
||||
return self._lastTick
|
||||
|
||||
def getNextInt(self):
|
||||
with self._lock:
|
||||
self._lastInt += 1
|
||||
return self._lastInt
|
||||
|
||||
def getTableNameToDelete(self):
|
||||
tblNum = self.tableNumQueue.pop() # TODO: race condition!
|
||||
if ( not tblNum ): # maybe false
|
||||
return False
|
||||
|
||||
return "table_{}".format(tblNum)
|
||||
|
||||
def execSql(self, sql): # using the main DB connection
|
||||
return self._dbConn.execute(sql)
|
||||
|
||||
def cleanUp(self):
|
||||
self._dbConn.close()
|
||||
|
||||
def getTaskTypesAtState(self):
|
||||
allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
|
||||
taskTypes = []
|
||||
for tc in allTaskClasses:
|
||||
# t = tc(self) # create task object
|
||||
if tc.canBeginFrom(self._state):
|
||||
taskTypes.append(tc)
|
||||
if len(taskTypes) <= 0:
|
||||
raise RuntimeError("No suitable task types found for state: {}".format(self._state))
|
||||
return taskTypes
|
||||
|
||||
# tasks.append(ReadFixedDataTask(self)) # always for everybody
|
||||
# if ( self._state == self.STATE_EMPTY ):
|
||||
# tasks.append(CreateDbTask(self))
|
||||
# tasks.append(CreateFixedTableTask(self))
|
||||
# elif ( self._state == self.STATE_DB_ONLY ):
|
||||
# tasks.append(DropDbTask(self))
|
||||
# tasks.append(CreateFixedTableTask(self))
|
||||
# tasks.append(AddFixedDataTask(self))
|
||||
# elif ( self._state == self.STATE_TABLE_ONLY ):
|
||||
# tasks.append(DropFixedTableTask(self))
|
||||
# tasks.append(AddFixedDataTask(self))
|
||||
# elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust
|
||||
# tasks.append(DropFixedTableTask(self))
|
||||
# tasks.append(AddFixedDataTask(self))
|
||||
# else:
|
||||
# raise RuntimeError("Unexpected DbState state: {}".format(self._state))
|
||||
# return tasks
|
||||
|
||||
def pickTaskType(self):
|
||||
taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state
|
||||
weights = []
|
||||
for tt in taskTypes:
|
||||
endState = tt.getEndState()
|
||||
if endState != None :
|
||||
weights.append(self._stateWeights[endState]) # TODO: change to a method
|
||||
else:
|
||||
weights.append(10) # read data task, default to 10: TODO: change to a constant
|
||||
i = self._weighted_choice_sub(weights)
|
||||
logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
|
||||
return taskTypes[i]
|
||||
|
||||
def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
|
||||
rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic?
|
||||
for i, w in enumerate(weights):
|
||||
rnd -= w
|
||||
if rnd < 0:
|
||||
return i
|
||||
|
||||
|
||||
|
||||
def transition(self, tasks):
|
||||
if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
|
||||
return # do nothing
|
||||
|
||||
self.execSql("show dnodes") # this should show up in the server log, separating steps
|
||||
|
||||
if ( self._state == self.STATE_EMPTY ):
|
||||
# self.assertNoSuccess(tasks, ReadFixedDataTask) # some read may be successful, since we might be creating a table
|
||||
if ( self.hasSuccess(tasks, CreateDbTask) ):
|
||||
self.assertAtMostOneSuccess(tasks, CreateDbTask) # param is class
|
||||
self._state = self.STATE_DB_ONLY
|
||||
if ( self.hasSuccess(tasks, CreateFixedTableTask )):
|
||||
self._state = self.STATE_TABLE_ONLY
|
||||
# else: # no successful table creation, not much we can say, as it is step 2
|
||||
else: # did not create db
|
||||
self.assertNoTask(tasks, CreateDbTask) # because we did not have such task
|
||||
# self.assertNoSuccess(tasks, CreateDbTask) # not necessary, since we just verified no such task
|
||||
self.assertNoSuccess(tasks, CreateFixedTableTask)
|
||||
|
||||
elif ( self._state == self.STATE_DB_ONLY ):
|
||||
self.assertAtMostOneSuccess(tasks, DropDbTask)
|
||||
self.assertIfExistThenSuccess(tasks, DropDbTask)
|
||||
self.assertAtMostOneSuccess(tasks, CreateFixedTableTask)
|
||||
# Nothing to be said about adding data task
|
||||
if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB
|
||||
# self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
|
||||
self.assertAtMostOneSuccess(tasks, DropDbTask)
|
||||
self._state = self.STATE_EMPTY
|
||||
elif ( self.hasSuccess(tasks, CreateFixedTableTask) ): # did not drop db, create table success
|
||||
# self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
|
||||
self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # at most 1 attempt is successful
|
||||
self.assertNoTask(tasks, DropDbTask) # should have have tried
|
||||
if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
|
||||
# can't say there's add-data attempts, since they may all fail
|
||||
self._state = self.STATE_TABLE_ONLY
|
||||
else:
|
||||
self._state = self.STATE_HAS_DATA
|
||||
# What about AddFixedData?
|
||||
elif ( self.hasSuccess(tasks, AddFixedDataTask) ):
|
||||
self._state = self.STATE_HAS_DATA
|
||||
else: # no success in dropping db tasks, no success in create fixed table? read data should also fail
|
||||
# raise RuntimeError("Unexpected no-success scenario") # We might just landed all failure tasks,
|
||||
self._state = self.STATE_DB_ONLY # no change
|
||||
|
||||
elif ( self._state == self.STATE_TABLE_ONLY ):
|
||||
if ( self.hasSuccess(tasks, DropFixedTableTask) ): # we are able to drop the table
|
||||
self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
|
||||
self._state = self.STATE_DB_ONLY
|
||||
elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
|
||||
self.assertNoTask(tasks, DropFixedTableTask)
|
||||
self._state = self.STATE_HAS_DATA
|
||||
elif ( self.hasSuccess(tasks, ReadFixedDataTask) ): # no success in prev cases, but was able to read data
|
||||
self.assertNoTask(tasks, DropFixedTableTask)
|
||||
self.assertNoTask(tasks, AddFixedDataTask)
|
||||
self._state = self.STATE_TABLE_ONLY # no change
|
||||
else: # did not drop table, did not insert data, did not read successfully, that is impossible
|
||||
raise RuntimeError("Unexpected no-success scenarios")
|
||||
|
||||
elif ( self._state == self.STATE_HAS_DATA ): # Same as above, TODO: adjust
|
||||
if ( self.hasSuccess(tasks, DropFixedTableTask) ):
|
||||
self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
|
||||
self._state = self.STATE_DB_ONLY
|
||||
else: # no success dropping the table, table remains intact in this step
|
||||
self.assertNoTask(tasks, DropFixedTableTask) # we should not have had such a task
|
||||
|
||||
if ( self.hasSuccess(tasks, AddFixedDataTask) ): # added data
|
||||
self._state = self.STATE_HAS_DATA
|
||||
else:
|
||||
self.assertNoTask(tasks, AddFixedDataTask)
|
||||
|
||||
if ( self.hasSuccess(tasks, ReadFixedDataTask) ): # simple able to read some data
|
||||
# which is ok, then no state change
|
||||
self._state = self.STATE_HAS_DATA # no change
|
||||
else: # did not drop table, did not insert data, that is impossible? yeah, we might only had ReadData task
|
||||
raise RuntimeError("Unexpected no-success scenarios")
|
||||
|
||||
else:
|
||||
raise RuntimeError("Unexpected DbState state: {}".format(self._state))
|
||||
logger.debug("New DB state is: {}".format(self._state))
|
||||
def getValue(self):
|
||||
return self._info[self.STATE_VAL_IDX]
|
||||
def canCreateDb(self):
|
||||
return self._info[self.CAN_CREATE_DB]
|
||||
def canDropDb(self):
|
||||
return self._info[self.CAN_DROP_DB]
|
||||
def canCreateFixedTable(self):
|
||||
return self._info[self.CAN_CREATE_FIXED_TABLE]
|
||||
def canDropFixedTable(self):
|
||||
return self._info[self.CAN_DROP_FIXED_TABLE]
|
||||
def canAddData(self):
|
||||
return self._info[self.CAN_ADD_DATA]
|
||||
def canReadData(self):
|
||||
return self._info[self.CAN_READ_DATA]
|
||||
|
||||
def assertAtMostOneSuccess(self, tasks, cls):
|
||||
sCnt = 0
|
||||
|
@ -663,7 +538,274 @@ class DbState():
|
|||
return True
|
||||
return False
|
||||
|
||||
class StateInvalid(AnyState):
|
||||
def getInfo(self):
|
||||
return [
|
||||
self.STATE_INVALID,
|
||||
False, False, # can create/drop Db
|
||||
False, False, # can create/drop fixed table
|
||||
False, False, # can insert/read data with fixed table
|
||||
]
|
||||
|
||||
# def verifyTasksToState(self, tasks, newState):
|
||||
|
||||
class StateEmpty(AnyState):
|
||||
def getInfo(self):
|
||||
return [
|
||||
self.STATE_EMPTY,
|
||||
True, False, # can create/drop Db
|
||||
False, False, # can create/drop fixed table
|
||||
False, False, # can insert/read data with fixed table
|
||||
]
|
||||
|
||||
def verifyTasksToState(self, tasks, newState):
|
||||
if ( self.hasSuccess(tasks, CreateDbTask) ):
|
||||
self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really valid for massively parrallel tasks
|
||||
|
||||
class StateDbOnly(AnyState):
|
||||
def getInfo(self):
|
||||
return [
|
||||
self.STATE_DB_ONLY,
|
||||
False, True,
|
||||
True, False,
|
||||
False, False,
|
||||
]
|
||||
|
||||
def verifyTasksToState(self, tasks, newState):
|
||||
self.assertAtMostOneSuccess(tasks, DropDbTask) # not true in massively parralel cases
|
||||
self.assertIfExistThenSuccess(tasks, DropDbTask)
|
||||
self.assertAtMostOneSuccess(tasks, CreateFixedTableTask)
|
||||
# Nothing to be said about adding data task
|
||||
if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB
|
||||
# self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
|
||||
self.assertAtMostOneSuccess(tasks, DropDbTask)
|
||||
# self._state = self.STATE_EMPTY
|
||||
elif ( self.hasSuccess(tasks, CreateFixedTableTask) ): # did not drop db, create table success
|
||||
# self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
|
||||
self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # at most 1 attempt is successful
|
||||
self.assertNoTask(tasks, DropDbTask) # should have have tried
|
||||
# if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
|
||||
# # can't say there's add-data attempts, since they may all fail
|
||||
# self._state = self.STATE_TABLE_ONLY
|
||||
# else:
|
||||
# self._state = self.STATE_HAS_DATA
|
||||
# What about AddFixedData?
|
||||
# elif ( self.hasSuccess(tasks, AddFixedDataTask) ):
|
||||
# self._state = self.STATE_HAS_DATA
|
||||
# else: # no success in dropping db tasks, no success in create fixed table? read data should also fail
|
||||
# # raise RuntimeError("Unexpected no-success scenario") # We might just landed all failure tasks,
|
||||
# self._state = self.STATE_DB_ONLY # no change
|
||||
|
||||
class StateTableOnly(AnyState):
|
||||
def getInfo(self):
|
||||
return [
|
||||
self.STATE_TABLE_ONLY,
|
||||
False, True,
|
||||
False, True,
|
||||
True, True,
|
||||
]
|
||||
|
||||
def verifyTasksToState(self, tasks, newState):
|
||||
if ( self.hasSuccess(tasks, DropFixedTableTask) ): # we are able to drop the table
|
||||
self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
|
||||
# self._state = self.STATE_DB_ONLY
|
||||
elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
|
||||
self.assertNoTask(tasks, DropFixedTableTask)
|
||||
# self._state = self.STATE_HAS_DATA
|
||||
elif ( self.hasSuccess(tasks, ReadFixedDataTask) ): # no success in prev cases, but was able to read data
|
||||
self.assertNoTask(tasks, DropFixedTableTask)
|
||||
self.assertNoTask(tasks, AddFixedDataTask)
|
||||
# self._state = self.STATE_TABLE_ONLY # no change
|
||||
else: # did not drop table, did not insert data, did not read successfully, that is impossible
|
||||
raise RuntimeError("Unexpected no-success scenarios")
|
||||
|
||||
class StateHasData(AnyState):
|
||||
def getInfo(self):
|
||||
return [
|
||||
self.STATE_HAS_DATA,
|
||||
False, True,
|
||||
False, True,
|
||||
True, True,
|
||||
]
|
||||
|
||||
def verifyTasksToState(self, tasks, newState):
|
||||
if ( self.hasSuccess(tasks, DropFixedTableTask) ):
|
||||
self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
|
||||
# self._state = self.STATE_DB_ONLY
|
||||
else: # no success dropping the table, table remains intact in this step
|
||||
self.assertNoTask(tasks, DropFixedTableTask) # we should not have had such a task
|
||||
|
||||
if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # added data
|
||||
# self._state = self.STATE_HAS_DATA
|
||||
# else:
|
||||
self.assertNoTask(tasks, AddFixedDataTask)
|
||||
if ( not self.hasSuccess(tasks, ReadFixedDataTask) ): # simple able to read some data
|
||||
# which is ok, then no state change
|
||||
# self._state = self.STATE_HAS_DATA # no change
|
||||
# else: # did not drop table, did not insert data, that is impossible? yeah, we might only had ReadData task
|
||||
raise RuntimeError("Unexpected no-success scenarios")
|
||||
|
||||
# State of the database as we believe it to be
|
||||
class DbState():
|
||||
|
||||
def __init__(self):
|
||||
self.tableNumQueue = LinearQueue()
|
||||
self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick
|
||||
self._lastInt = 0 # next one is initial integer
|
||||
self._lock = threading.RLock()
|
||||
|
||||
self._state = StateInvalid() # starting state
|
||||
self._stateWeights = [1,3,5,10]
|
||||
|
||||
# self.openDbServerConnection()
|
||||
self._dbConn = DbConn()
|
||||
try:
|
||||
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
|
||||
except taos.error.ProgrammingError as err:
|
||||
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
|
||||
if ( err.msg == 'disconnected' ): # cannot open DB connection
|
||||
print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
|
||||
sys.exit()
|
||||
else:
|
||||
raise
|
||||
except:
|
||||
print("[=]Unexpected exception")
|
||||
raise
|
||||
self._dbConn.resetDb() # drop and recreate DB
|
||||
self._state = StateEmpty() # initial state, the result of above
|
||||
|
||||
def getDbConn(self):
|
||||
return self._dbConn
|
||||
|
||||
def pickAndAllocateTable(self): # pick any table, and "use" it
|
||||
return self.tableNumQueue.pickAndAllocate()
|
||||
|
||||
def addTable(self):
|
||||
with self._lock:
|
||||
tIndex = self.tableNumQueue.push()
|
||||
return tIndex
|
||||
|
||||
def getFixedTableName(self):
|
||||
return "fixed_table"
|
||||
|
||||
def releaseTable(self, i): # return the table back, so others can use it
|
||||
self.tableNumQueue.release(i)
|
||||
|
||||
def getNextTick(self):
|
||||
with self._lock: # prevent duplicate tick
|
||||
self._lastTick += datetime.timedelta(0, 1) # add one second to it
|
||||
return self._lastTick
|
||||
|
||||
def getNextInt(self):
|
||||
with self._lock:
|
||||
self._lastInt += 1
|
||||
return self._lastInt
|
||||
|
||||
def getTableNameToDelete(self):
|
||||
tblNum = self.tableNumQueue.pop() # TODO: race condition!
|
||||
if ( not tblNum ): # maybe false
|
||||
return False
|
||||
|
||||
return "table_{}".format(tblNum)
|
||||
|
||||
def cleanUp(self):
|
||||
self._dbConn.close()
|
||||
|
||||
def getTaskTypesAtState(self):
|
||||
allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
|
||||
taskTypes = []
|
||||
for tc in allTaskClasses:
|
||||
# t = tc(self) # create task object
|
||||
if tc.canBeginFrom(self._state):
|
||||
taskTypes.append(tc)
|
||||
if len(taskTypes) <= 0:
|
||||
raise RuntimeError("No suitable task types found for state: {}".format(self._state))
|
||||
return taskTypes
|
||||
|
||||
# tasks.append(ReadFixedDataTask(self)) # always for everybody
|
||||
# if ( self._state == self.STATE_EMPTY ):
|
||||
# tasks.append(CreateDbTask(self))
|
||||
# tasks.append(CreateFixedTableTask(self))
|
||||
# elif ( self._state == self.STATE_DB_ONLY ):
|
||||
# tasks.append(DropDbTask(self))
|
||||
# tasks.append(CreateFixedTableTask(self))
|
||||
# tasks.append(AddFixedDataTask(self))
|
||||
# elif ( self._state == self.STATE_TABLE_ONLY ):
|
||||
# tasks.append(DropFixedTableTask(self))
|
||||
# tasks.append(AddFixedDataTask(self))
|
||||
# elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust
|
||||
# tasks.append(DropFixedTableTask(self))
|
||||
# tasks.append(AddFixedDataTask(self))
|
||||
# else:
|
||||
# raise RuntimeError("Unexpected DbState state: {}".format(self._state))
|
||||
# return tasks
|
||||
|
||||
def pickTaskType(self):
|
||||
taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state
|
||||
weights = []
|
||||
for tt in taskTypes:
|
||||
endState = tt.getEndState()
|
||||
if endState != None :
|
||||
weights.append(self._stateWeights[endState]) # TODO: change to a method
|
||||
else:
|
||||
weights.append(10) # read data task, default to 10: TODO: change to a constant
|
||||
i = self._weighted_choice_sub(weights)
|
||||
# logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
|
||||
return taskTypes[i]
|
||||
|
||||
def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
|
||||
rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic?
|
||||
for i, w in enumerate(weights):
|
||||
rnd -= w
|
||||
if rnd < 0:
|
||||
return i
|
||||
|
||||
def _findCurrentState(self):
|
||||
dbc = self._dbConn
|
||||
if dbc.query("show databases") == 0 : # no database?!
|
||||
return StateEmpty()
|
||||
dbc.execute("use db") # did not do this when openning connection
|
||||
if dbc.query("show tables") == 0 : # no tables
|
||||
return StateDbOnly()
|
||||
if dbc.query("SELECT * FROM {}".format(self.getFixedTableName()) ) == 0 : # no data
|
||||
return StateTableOnly()
|
||||
else:
|
||||
return StateHasData()
|
||||
|
||||
def transition(self, tasks):
|
||||
if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
|
||||
return # do nothing
|
||||
|
||||
self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps
|
||||
|
||||
# Generic Checks, first based on the start state
|
||||
if self._state.canCreateDb():
|
||||
self._state.assertIfExistThenSuccess(tasks, CreateDbTask)
|
||||
# self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops
|
||||
|
||||
if self._state.canDropDb():
|
||||
self._state.assertIfExistThenSuccess(tasks, DropDbTask)
|
||||
# self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop
|
||||
|
||||
# if self._state.canCreateFixedTable():
|
||||
# self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped
|
||||
# self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create
|
||||
|
||||
# if self._state.canDropFixedTable():
|
||||
# self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped
|
||||
# self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop
|
||||
|
||||
# if self._state.canAddData():
|
||||
# self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually
|
||||
|
||||
# if self._state.canReadData():
|
||||
# Nothing for sure
|
||||
|
||||
newState = self._findCurrentState()
|
||||
self._state.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks?
|
||||
|
||||
self._state = newState
|
||||
logger.debug("New DB state is: {}".format(self._state))
|
||||
|
||||
class TaskExecutor():
|
||||
def __init__(self, curStep):
|
||||
|
@ -750,8 +892,19 @@ class ExecutionStats:
|
|||
self._tasksInProgress = 0
|
||||
self._lock = threading.Lock()
|
||||
self._firstTaskStartTime = None
|
||||
self._execStartTime = None
|
||||
self._elapsedTime = 0.0 # total elapsed time
|
||||
self._accRunTime = 0.0 # accumulated run time
|
||||
|
||||
self._failed = False
|
||||
self._failureReason = None
|
||||
|
||||
def startExec(self):
|
||||
self._execStartTime = time.time()
|
||||
|
||||
def endExec(self):
|
||||
self._elapsedTime = time.time() - self._execStartTime
|
||||
|
||||
def incExecCount(self, klassName, isSuccess): # TODO: add a lock here
|
||||
if klassName not in self._execTimes:
|
||||
self._execTimes[klassName] = [0, 0]
|
||||
|
@ -773,16 +926,27 @@ class ExecutionStats:
|
|||
self._accRunTime += (time.time() - self._firstTaskStartTime)
|
||||
self._firstTaskStartTime = None
|
||||
|
||||
def registerFailure(self, reason):
|
||||
self._failed = True
|
||||
self._failureReason = reason
|
||||
|
||||
def logStats(self):
|
||||
logger.info("Logging task execution stats (success/total times)...")
|
||||
logger.info("----------------------------------------------------------------------")
|
||||
logger.info("| Crash_Gen test {}, with the following stats:".
|
||||
format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED"))
|
||||
logger.info("| Task Execution Times (success/total):")
|
||||
execTimesAny = 0
|
||||
for k, n in self._execTimes.items():
|
||||
execTimesAny += n[1]
|
||||
logger.info(" {0:<24}: {1}/{2}".format(k,n[1],n[0]))
|
||||
logger.info("| {0:<24}: {1}/{2}".format(k,n[1],n[0]))
|
||||
|
||||
logger.info("Total Tasks Executed (success or not): {} ".format(execTimesAny))
|
||||
logger.info("Total Tasks In Progress at End: {}".format(self._tasksInProgress))
|
||||
logger.info("Total Task Busy Time (elapsed time when any task is in progress): {:.2f} seconds".format(self._accRunTime))
|
||||
logger.info("| Total Tasks Executed (success or not): {} ".format(execTimesAny))
|
||||
logger.info("| Total Tasks In Progress at End: {}".format(self._tasksInProgress))
|
||||
logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime))
|
||||
logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny))
|
||||
logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime))
|
||||
logger.info("----------------------------------------------------------------------")
|
||||
|
||||
|
||||
|
||||
class StateTransitionTask(Task):
|
||||
|
@ -793,17 +957,18 @@ class StateTransitionTask(Task):
|
|||
def getInfo(cls): # each sub class should supply their own information
|
||||
raise RuntimeError("Overriding method expected")
|
||||
|
||||
@classmethod
|
||||
def getBeginStates(cls):
|
||||
return cls.getInfo()[0]
|
||||
# @classmethod
|
||||
# def getBeginStates(cls):
|
||||
# return cls.getInfo()[0]
|
||||
|
||||
@classmethod
|
||||
def getEndState(cls):
|
||||
return cls.getInfo()[1]
|
||||
return cls.getInfo()[0]
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state):
|
||||
return state in cls.getBeginStates()
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
# return state.getValue() in cls.getBeginStates()
|
||||
raise RuntimeError("must be overriden")
|
||||
|
||||
def execute(self, wt: WorkerThread):
|
||||
super().execute(wt)
|
||||
|
@ -814,10 +979,14 @@ class CreateDbTask(StateTransitionTask):
|
|||
@classmethod
|
||||
def getInfo(cls):
|
||||
return [
|
||||
[DbState.STATE_EMPTY], # can begin from
|
||||
DbState.STATE_DB_ONLY # end state
|
||||
# [AnyState.STATE_EMPTY], # can begin from
|
||||
AnyState.STATE_DB_ONLY # end state
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
return state.canCreateDb()
|
||||
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
wt.execSql("create database db")
|
||||
|
||||
|
@ -825,10 +994,14 @@ class DropDbTask(StateTransitionTask):
|
|||
@classmethod
|
||||
def getInfo(cls):
|
||||
return [
|
||||
[DbState.STATE_DB_ONLY, DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA],
|
||||
DbState.STATE_EMPTY
|
||||
# [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
|
||||
AnyState.STATE_EMPTY
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
return state.canDropDb()
|
||||
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
wt.execSql("drop database db")
|
||||
|
||||
|
@ -836,10 +1009,14 @@ class CreateFixedTableTask(StateTransitionTask):
|
|||
@classmethod
|
||||
def getInfo(cls):
|
||||
return [
|
||||
[DbState.STATE_DB_ONLY],
|
||||
DbState.STATE_TABLE_ONLY
|
||||
# [AnyState.STATE_DB_ONLY],
|
||||
AnyState.STATE_TABLE_ONLY
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
return state.canCreateFixedTable()
|
||||
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
tblName = self._dbState.getFixedTableName()
|
||||
wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName))
|
||||
|
@ -848,23 +1025,31 @@ class ReadFixedDataTask(StateTransitionTask):
|
|||
@classmethod
|
||||
def getInfo(cls):
|
||||
return [
|
||||
[DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA],
|
||||
# [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
|
||||
None # meaning doesn't affect state
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
return state.canReadData()
|
||||
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
tblName = self._dbState.getFixedTableName()
|
||||
self._numRows = wt.querySql("select * from db.{}".format(tblName)) # save the result for later
|
||||
wt.execSql("select * from db.{}".format(tblName)) # TODO: analyze result set later
|
||||
# tdSql.query(" cars where tbname in ('carzero', 'carone')")
|
||||
|
||||
class DropFixedTableTask(StateTransitionTask):
|
||||
@classmethod
|
||||
def getInfo(cls):
|
||||
return [
|
||||
[DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA],
|
||||
DbState.STATE_DB_ONLY # meaning doesn't affect state
|
||||
# [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
|
||||
AnyState.STATE_DB_ONLY # meaning doesn't affect state
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
return state.canDropFixedTable()
|
||||
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
tblName = self._dbState.getFixedTableName()
|
||||
wt.execSql("drop table db.{}".format(tblName))
|
||||
|
@ -873,9 +1058,13 @@ class AddFixedDataTask(StateTransitionTask):
|
|||
@classmethod
|
||||
def getInfo(cls):
|
||||
return [
|
||||
[DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA],
|
||||
DbState.STATE_HAS_DATA
|
||||
# [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
|
||||
AnyState.STATE_HAS_DATA
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
return state.canAddData()
|
||||
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
ds = self._dbState
|
||||
|
@ -1015,8 +1204,11 @@ def main():
|
|||
# WorkDispatcher(dbState), # Obsolete?
|
||||
dbState
|
||||
)
|
||||
|
||||
tc.run()
|
||||
dbState.cleanUp()
|
||||
tc.logStats()
|
||||
dbState.cleanUp()
|
||||
|
||||
logger.info("Finished running thread pool")
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
Loading…
Reference in New Issue