Crash_gen tool minor tweaks, hoping to run 10x5000 with -a option

This commit is contained in:
Steven Li 2020-08-27 03:58:53 +00:00
parent 5da3bbeb42
commit 43bbbfd866
1 changed files with 34 additions and 21 deletions

View File

@ -161,6 +161,18 @@ class WorkerThread:
logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break break
# Before we fetch the task and run it, let's ensure we properly "use" the database
try:
self.useDb() # might encounter exceptions. TODO: catch
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno == 0x383 : # invalid database
# ignore
dummy = 0
else:
print("\nCaught programming error. errno=0x{:X}, msg={} ".format(errno, err.msg))
raise
# Fetch a task from the Thread Coordinator # Fetch a task from the Thread Coordinator
logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid)) logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid))
task = tc.fetchTask() task = tc.fetchTask()
@ -324,10 +336,12 @@ class ThreadCoordinator:
logger.debug("[STT] transition ended") logger.debug("[STT] transition ended")
# Due to limitation (or maybe not) of the Python library, # Due to limitation (or maybe not) of the Python library,
# we cannot share connections across threads # we cannot share connections across threads
if sm.hasDatabase(): # Here we are in main thread, we cannot operate the connections created in workers
for t in self._pool.threadList: # Moving below to task loop
logger.debug("[DB] use db for all worker threads") # if sm.hasDatabase():
t.useDb() # for t in self._pool.threadList:
# logger.debug("[DB] use db for all worker threads")
# t.useDb()
# t.execSql("use db") # main thread executing "use # t.execSql("use db") # main thread executing "use
# db" on behalf of every worker thread # db" on behalf of every worker thread
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
@ -387,7 +401,7 @@ class ThreadCoordinator:
transitionFailed = self._doTransition() # To start, we end step -1 first transitionFailed = self._doTransition() # To start, we end step -1 first
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
transitionFailed = True transitionFailed = True
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme errno2 = Helper.convertErrno(err.errno) # correct error scheme
errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err) errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
logger.info(errMsg) logger.info(errMsg)
self._execStats.registerFailure(errMsg) self._execStats.registerFailure(errMsg)
@ -468,6 +482,10 @@ class ThreadCoordinator:
# We define a class to run a number of threads in locking steps. # We define a class to run a number of threads in locking steps.
class Helper:
@classmethod
def convertErrno(cls, errno):
return errno if (errno > 0) else 0x80000000 + errno
class ThreadPool: class ThreadPool:
def __init__(self, numThreads, maxSteps): def __init__(self, numThreads, maxSteps):
@ -613,8 +631,7 @@ class DbConn:
def resetDb(self): # reset the whole database, etc. def resetDb(self): # reset the whole database, etc.
if (not self.isOpen): if (not self.isOpen):
raise RuntimeError( raise RuntimeError("Cannot reset database until connection is open")
"Cannot reset database until connection is open")
# self._tdSql.prepare() # Recreate database, etc. # self._tdSql.prepare() # Recreate database, etc.
self.execute('drop database if exists db') self.execute('drop database if exists db')
@ -681,8 +698,7 @@ class DbConnRest(DbConn):
def close(self): def close(self):
if (not self.isOpen): if (not self.isOpen):
raise RuntimeError( raise RuntimeError("Cannot clean up database until connection is open")
"Cannot clean up database until connection is open")
# Do nothing for REST # Do nothing for REST
logger.debug("[DB] REST Database connection closed") logger.debug("[DB] REST Database connection closed")
self.isOpen = False self.isOpen = False
@ -798,7 +814,6 @@ class DbConnNative(DbConn):
self._conn = None self._conn = None
self._cursor = None self._cursor = None
def getBuildPath(self): def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__)) selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath): if ("community" in selfPath):
@ -814,7 +829,7 @@ class DbConnNative(DbConn):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
break break
if buildPath == None: if buildPath == None:
raise RuntimeError("Failed to determine buildPath, selfPath={}".format(selfPath)) raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}".format(selfPath, projPath))
return buildPath return buildPath
@ -839,8 +854,7 @@ class DbConnNative(DbConn):
def close(self): def close(self):
if (not self.isOpen): if (not self.isOpen):
raise RuntimeError( raise RuntimeError("Cannot clean up database until connection is open")
"Cannot clean up database until connection is open")
self._tdSql.close() self._tdSql.close()
logger.debug("[DB] Database connection closed") logger.debug("[DB] Database connection closed")
self.isOpen = False self.isOpen = False
@ -1528,7 +1542,7 @@ class Task():
try: try:
self._executeInternal(te, wt) # TODO: no return value? self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme errno2 = Helper.convertErrno(err.errno)
if (gConfig.continue_on_exception): # user choose to continue if (gConfig.continue_on_exception): # user choose to continue
self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, wt.getDbConn().getLastSql())) errno2, err, wt.getDbConn().getLastSql()))
@ -1789,7 +1803,7 @@ class TdSuperTable:
try: try:
dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno errno2 = Helper.convertErrno(err.errno)
logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err)) logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
raise raise
@ -1891,7 +1905,7 @@ class TaskReadData(StateTransitionTask):
if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?! if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?!
dbc.execute("select {} from db.{}".format(aggExpr, sTable.getName())) dbc.execute("select {} from db.{}".format(aggExpr, sTable.getName()))
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno errno2 = Helper.convertErrno(err.errno)
logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql())) logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
raise raise
@ -1921,8 +1935,7 @@ class TaskDropSuperTable(StateTransitionTask):
regTableName)) # nRows always 0, like MySQL regTableName)) # nRows always 0, like MySQL
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
# correcting for strange error number scheme # correcting for strange error number scheme
errno2 = err.errno if ( errno2 = Helper.convertErrno(err.errno)
err.errno > 0) else 0x80000000 + err.errno
if (errno2 in [0x362]): # mnode invalid table name if (errno2 in [0x362]): # mnode invalid table name
isSuccess = False isSuccess = False
logger.debug( logger.debug(
@ -2437,7 +2450,7 @@ class ServiceManagerThread:
if self._status == MainExec.STATUS_STARTING: # we are starting, let's see if we have started if self._status == MainExec.STATUS_STARTING: # we are starting, let's see if we have started
if line.find(self.TD_READY_MSG) != -1: # found if line.find(self.TD_READY_MSG) != -1: # found
logger.info("Waiting for the service to become FULLY READY") logger.info("Waiting for the service to become FULLY READY")
time.sleep(1.0) # wait for the server to truly start. TODO: remove this time.sleep(5.0) # wait for the server to truly start. TODO: remove this
logger.info("Service is now FULLY READY") logger.info("Service is now FULLY READY")
self._status = MainExec.STATUS_RUNNING self._status = MainExec.STATUS_RUNNING