Refactored crash_gen so that TdeSubProcess owns the SvcMgrThread object; also switched to Pylance

This commit is contained in:
Steven Li 2021-04-28 08:36:56 +00:00
parent bcbb6017c0
commit 5d1d5cadc2
8 changed files with 287 additions and 212 deletions

View File

@ -1,6 +1,7 @@
from .connection import TDengineConnection from .connection import TDengineConnection
from .cursor import TDengineCursor from .cursor import TDengineCursor
from .error import Error
# Globals # Globals
threadsafety = 0 threadsafety = 0

View File

@ -3,3 +3,4 @@ from crash_gen.service_manager import ServiceManager, TdeInstance, TdeSubProcess
from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
from crash_gen.settings import Settings from crash_gen.settings import Settings
from crash_gen.types import DirPath

View File

@ -15,7 +15,7 @@
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel # https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations from __future__ import annotations
from typing import Set from typing import Any, Set, Tuple
from typing import Dict from typing import Dict
from typing import List from typing import List
from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
@ -57,8 +57,8 @@ if sys.version_info[0] < 3:
# Command-line/Environment Configurations, will set a bit later # Command-line/Environment Configurations, will set a bit later
# ConfigNameSpace = argparse.Namespace # ConfigNameSpace = argparse.Namespace
gConfig: argparse.Namespace # gConfig: argparse.Namespace
gSvcMgr: ServiceManager # TODO: refactor this hack, use dep injection gSvcMgr: Optional[ServiceManager] # TODO: refactor this hack, use dep injection
# logger: logging.Logger # logger: logging.Logger
gContainer: Container gContainer: Container
@ -81,20 +81,20 @@ class WorkerThread:
self._stepGate = threading.Event() self._stepGate = threading.Event()
# Let us have a DB connection of our own # Let us have a DB connection of our own
if (gConfig.per_thread_db_connection): # type: ignore if (Settings.getConfig().per_thread_db_connection): # type: ignore
# print("connector_type = {}".format(gConfig.connector_type)) # print("connector_type = {}".format(gConfig.connector_type))
tInst = gContainer.defTdeInstance tInst = gContainer.defTdeInstance
if gConfig.connector_type == 'native': if Settings.getConfig().connector_type == 'native':
self._dbConn = DbConn.createNative(tInst.getDbTarget()) self._dbConn = DbConn.createNative(tInst.getDbTarget())
elif gConfig.connector_type == 'rest': elif Settings.getConfig().connector_type == 'rest':
self._dbConn = DbConn.createRest(tInst.getDbTarget()) self._dbConn = DbConn.createRest(tInst.getDbTarget())
elif gConfig.connector_type == 'mixed': elif Settings.getConfig().connector_type == 'mixed':
if Dice.throw(2) == 0: # 1/2 chance if Dice.throw(2) == 0: # 1/2 chance
self._dbConn = DbConn.createNative(tInst.getDbTarget()) self._dbConn = DbConn.createNative(tInst.getDbTarget())
else: else:
self._dbConn = DbConn.createRest(tInst.getDbTarget()) self._dbConn = DbConn.createRest(tInst.getDbTarget())
else: else:
raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type)) raise RuntimeError("Unexpected connector type: {}".format(Settings.getConfig().connector_type))
# self._dbInUse = False # if "use db" was executed already # self._dbInUse = False # if "use db" was executed already
@ -123,14 +123,14 @@ class WorkerThread:
# self.isSleeping = False # self.isSleeping = False
Logging.info("Starting to run thread: {}".format(self._tid)) Logging.info("Starting to run thread: {}".format(self._tid))
if (gConfig.per_thread_db_connection): # type: ignore if (Settings.getConfig().per_thread_db_connection): # type: ignore
Logging.debug("Worker thread openning database connection") Logging.debug("Worker thread openning database connection")
self._dbConn.open() self._dbConn.open()
self._doTaskLoop() self._doTaskLoop()
# clean up # clean up
if (gConfig.per_thread_db_connection): # type: ignore if (Settings.getConfig().per_thread_db_connection): # type: ignore
if self._dbConn.isOpen: #sometimes it is not open if self._dbConn.isOpen: #sometimes it is not open
self._dbConn.close() self._dbConn.close()
else: else:
@ -158,7 +158,7 @@ class WorkerThread:
# Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more) # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more)
try: try:
if (gConfig.per_thread_db_connection): # most likely TRUE if (Settings.getConfig().per_thread_db_connection): # most likely TRUE
if not self._dbConn.isOpen: # might have been closed during server auto-restart if not self._dbConn.isOpen: # might have been closed during server auto-restart
self._dbConn.open() self._dbConn.open()
# self.useDb() # might encounter exceptions. TODO: catch # self.useDb() # might encounter exceptions. TODO: catch
@ -232,7 +232,7 @@ class WorkerThread:
return self.getDbConn().getQueryResult() return self.getDbConn().getQueryResult()
def getDbConn(self) -> DbConn : def getDbConn(self) -> DbConn :
if (gConfig.per_thread_db_connection): if (Settings.getConfig().per_thread_db_connection):
return self._dbConn return self._dbConn
else: else:
return self._tc.getDbManager().getDbConn() return self._tc.getDbManager().getDbConn()
@ -254,7 +254,7 @@ class ThreadCoordinator:
self._pool = pool self._pool = pool
# self._wd = wd # self._wd = wd
self._te = None # prepare for every new step self._te = None # prepare for every new step
self._dbManager = dbManager self._dbManager = dbManager # type: Optional[DbManager] # may be freed
self._executedTasks: List[Task] = [] # in a given step self._executedTasks: List[Task] = [] # in a given step
self._lock = threading.RLock() # sync access for a few things self._lock = threading.RLock() # sync access for a few things
@ -266,9 +266,13 @@ class ThreadCoordinator:
self._stepStartTime = None # Track how long it takes to execute each step self._stepStartTime = None # Track how long it takes to execute each step
def getTaskExecutor(self): def getTaskExecutor(self):
if self._te is None:
raise CrashGenError("Unexpected empty TE")
return self._te return self._te
def getDbManager(self) -> DbManager: def getDbManager(self) -> DbManager:
if self._dbManager is None:
raise ChildProcessError("Unexpected empty _dbManager")
return self._dbManager return self._dbManager
def crossStepBarrier(self, timeout=None): def crossStepBarrier(self, timeout=None):
@ -279,7 +283,7 @@ class ThreadCoordinator:
self._execStats.registerFailure("User Interruption") self._execStats.registerFailure("User Interruption")
def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout): def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
maxSteps = gConfig.max_steps # type: ignore maxSteps = Settings.getConfig().max_steps # type: ignore
if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9 if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9
return True return True
if self._runStatus != Status.STATUS_RUNNING: if self._runStatus != Status.STATUS_RUNNING:
@ -384,7 +388,7 @@ class ThreadCoordinator:
hasAbortedTask = False hasAbortedTask = False
workerTimeout = False workerTimeout = False
while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout): while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
if not gConfig.debug: # print this only if we are not in debug mode if not Settings.getConfig().debug: # print this only if we are not in debug mode
Progress.emit(Progress.STEP_BOUNDARY) Progress.emit(Progress.STEP_BOUNDARY)
# print(".", end="", flush=True) # print(".", end="", flush=True)
# if (self._curStep % 2) == 0: # print memory usage once every 10 steps # if (self._curStep % 2) == 0: # print memory usage once every 10 steps
@ -469,7 +473,7 @@ class ThreadCoordinator:
self._pool = None self._pool = None
self._te = None self._te = None
self._dbManager = None self._dbManager = None
self._executedTasks = None self._executedTasks = []
self._lock = None self._lock = None
self._stepBarrier = None self._stepBarrier = None
self._execStats = None self._execStats = None
@ -508,18 +512,18 @@ class ThreadCoordinator:
''' Initialize multiple databases, invoked at __ini__() time ''' ''' Initialize multiple databases, invoked at __ini__() time '''
self._dbs = [] # type: List[Database] self._dbs = [] # type: List[Database]
dbc = self.getDbManager().getDbConn() dbc = self.getDbManager().getDbConn()
if gConfig.max_dbs == 0: if Settings.getConfig().max_dbs == 0:
self._dbs.append(Database(0, dbc)) self._dbs.append(Database(0, dbc))
else: else:
baseDbNumber = int(datetime.datetime.now().timestamp( # Don't use Dice/random, as they are deterministic baseDbNumber = int(datetime.datetime.now().timestamp( # Don't use Dice/random, as they are deterministic
)*333) % 888 if gConfig.dynamic_db_table_names else 0 )*333) % 888 if Settings.getConfig().dynamic_db_table_names else 0
for i in range(gConfig.max_dbs): for i in range(Settings.getConfig().max_dbs):
self._dbs.append(Database(baseDbNumber + i, dbc)) self._dbs.append(Database(baseDbNumber + i, dbc))
def pickDatabase(self): def pickDatabase(self):
idxDb = 0 idxDb = 0
if gConfig.max_dbs != 0 : if Settings.getConfig().max_dbs != 0 :
idxDb = Dice.throw(gConfig.max_dbs) # 0 to N-1 idxDb = Dice.throw(Settings.getConfig().max_dbs) # 0 to N-1
db = self._dbs[idxDb] # type: Database db = self._dbs[idxDb] # type: Database
return db return db
@ -563,7 +567,7 @@ class ThreadPool:
workerThread._thread.join() workerThread._thread.join()
def cleanup(self): def cleanup(self):
self.threadList = None # maybe clean up each? self.threadList = [] # maybe clean up each?
# A queue of contiguous POSITIVE integers, used by DbManager to generate continuous numbers # A queue of contiguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names # for new table names
@ -673,7 +677,7 @@ class AnyState:
# Each sub state tells us the "info", about itself, so we can determine # Each sub state tells us the "info", about itself, so we can determine
# on things like canDropDB() # on things like canDropDB()
def getInfo(self): def getInfo(self) -> List[Any]:
raise RuntimeError("Must be overriden by child classes") raise RuntimeError("Must be overriden by child classes")
def equals(self, other): def equals(self, other):
@ -701,7 +705,7 @@ class AnyState:
def canDropDb(self): def canDropDb(self):
# If user requests to run up to a number of DBs, # If user requests to run up to a number of DBs,
# we'd then not do drop_db operations any more # we'd then not do drop_db operations any more
if gConfig.max_dbs > 0 or gConfig.use_shadow_db : if Settings.getConfig().max_dbs > 0 or Settings.getConfig().use_shadow_db :
return False return False
return self._info[self.CAN_DROP_DB] return self._info[self.CAN_DROP_DB]
@ -709,7 +713,7 @@ class AnyState:
return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE] return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]
def canDropFixedSuperTable(self): def canDropFixedSuperTable(self):
if gConfig.use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table if Settings.getConfig().use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table
return False return False
return self._info[self.CAN_DROP_FIXED_SUPER_TABLE] return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
@ -911,7 +915,7 @@ class StateMechine:
# May be slow, use cautiously... # May be slow, use cautiously...
def getTaskTypes(self): # those that can run (directly/indirectly) from the current state def getTaskTypes(self): # those that can run (directly/indirectly) from the current state
def typesToStrings(types): def typesToStrings(types) -> List:
ss = [] ss = []
for t in types: for t in types:
ss.append(t.__name__) ss.append(t.__name__)
@ -1030,13 +1034,14 @@ class StateMechine:
# ref: # ref:
# https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
def _weighted_choice_sub(self, weights): def _weighted_choice_sub(self, weights) -> int:
# TODO: use our dice to ensure it is deterministic? # TODO: use our dice to ensure it is deterministic?
rnd = random.random() * sum(weights) rnd = random.random() * sum(weights)
for i, w in enumerate(weights): for i, w in enumerate(weights):
rnd -= w rnd -= w
if rnd < 0: if rnd < 0:
return i return i
raise CrashGenError("Unexpected no choice")
class Database: class Database:
''' We use this to represent an actual TDengine database inside a service instance, ''' We use this to represent an actual TDengine database inside a service instance,
@ -1048,8 +1053,8 @@ class Database:
''' '''
_clsLock = threading.Lock() # class wide lock _clsLock = threading.Lock() # class wide lock
_lastInt = 101 # next one is initial integer _lastInt = 101 # next one is initial integer
_lastTick = 0 _lastTick = None # Optional[datetime]
_lastLaggingTick = 0 # lagging tick, for out-of-sequence (oos) data insertions _lastLaggingTick = None # Optional[datetime] # lagging tick, for out-of-sequence (oos) data insertions
def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc
self._dbNum = dbNum # we assign a number to databases, for our testing purpose self._dbNum = dbNum # we assign a number to databases, for our testing purpose
@ -1114,14 +1119,14 @@ class Database:
Fetch a timestamp tick, with some random factor, may not be unique. Fetch a timestamp tick, with some random factor, may not be unique.
''' '''
with cls._clsLock: # prevent duplicate tick with cls._clsLock: # prevent duplicate tick
if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized if cls._lastLaggingTick is None or cls._lastTick is None : # not initialized
# 10k at 1/20 chance, should be enough to avoid overlaps # 10k at 1/20 chance, should be enough to avoid overlaps
tick = cls.setupLastTick() tick = cls.setupLastTick()
cls._lastTick = tick cls._lastTick = tick
cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast
# if : # should be quite a bit into the future # if : # should be quite a bit into the future
if gConfig.mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick if Settings.getConfig().mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence
return cls._lastLaggingTick return cls._lastLaggingTick
else: # regular else: # regular
@ -1303,10 +1308,10 @@ class Task():
]: ]:
return True # These are the ALWAYS-ACCEPTABLE ones return True # These are the ALWAYS-ACCEPTABLE ones
# This case handled below already. # This case handled below already.
# elif (errno in [ 0x0B ]) and gConfig.auto_start_service: # elif (errno in [ 0x0B ]) and Settings.getConfig().auto_start_service:
# return True # We may get "network unavilable" when restarting service # return True # We may get "network unavilable" when restarting service
elif gConfig.ignore_errors: # something is specified on command line elif Settings.getConfig().ignore_errors: # something is specified on command line
moreErrnos = [int(v, 0) for v in gConfig.ignore_errors.split(',')] moreErrnos = [int(v, 0) for v in Settings.getConfig().ignore_errors.split(',')]
if errno in moreErrnos: if errno in moreErrnos:
return True return True
elif errno == 0x200 : # invalid SQL, we need to div in a bit more elif errno == 0x200 : # invalid SQL, we need to div in a bit more
@ -1342,7 +1347,7 @@ class Task():
self._executeInternal(te, wt) # TODO: no return value? self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno) errno2 = Helper.convertErrno(err.errno)
if (gConfig.continue_on_exception): # user choose to continue if (Settings.getConfig().continue_on_exception): # user choose to continue
self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, wt.getDbConn().getLastSql())) errno2, err, wt.getDbConn().getLastSql()))
self._err = err self._err = err
@ -1357,7 +1362,7 @@ class Task():
self.__class__.__name__, self.__class__.__name__,
errno2, err, wt.getDbConn().getLastSql()) errno2, err, wt.getDbConn().getLastSql())
self.logDebug(errMsg) self.logDebug(errMsg)
if gConfig.debug: if Settings.getConfig().debug:
# raise # so that we see full stack # raise # so that we see full stack
traceback.print_exc() traceback.print_exc()
print( print(
@ -1422,11 +1427,11 @@ class Task():
class ExecutionStats: class ExecutionStats:
def __init__(self): def __init__(self):
# total/success times for a task # total/success times for a task
self._execTimes: Dict[str, [int, int]] = {} self._execTimes: Dict[str, List[int]] = {}
self._tasksInProgress = 0 self._tasksInProgress = 0
self._lock = threading.Lock() self._lock = threading.Lock()
self._firstTaskStartTime = None self._firstTaskStartTime = 0.0
self._execStartTime = None self._execStartTime = 0.0
self._errors = {} self._errors = {}
self._elapsedTime = 0.0 # total elapsed time self._elapsedTime = 0.0 # total elapsed time
self._accRunTime = 0.0 # accumulated run time self._accRunTime = 0.0 # accumulated run time
@ -1471,7 +1476,7 @@ class ExecutionStats:
self._tasksInProgress -= 1 self._tasksInProgress -= 1
if self._tasksInProgress == 0: # all tasks have stopped if self._tasksInProgress == 0: # all tasks have stopped
self._accRunTime += (time.time() - self._firstTaskStartTime) self._accRunTime += (time.time() - self._firstTaskStartTime)
self._firstTaskStartTime = None self._firstTaskStartTime = 0.0
def registerFailure(self, reason): def registerFailure(self, reason):
self._failed = True self._failed = True
@ -1555,7 +1560,7 @@ class StateTransitionTask(Task):
def getRegTableName(cls, i): def getRegTableName(cls, i):
if ( StateTransitionTask._baseTableNumber is None): # Set it one time if ( StateTransitionTask._baseTableNumber is None): # Set it one time
StateTransitionTask._baseTableNumber = Dice.throw( StateTransitionTask._baseTableNumber = Dice.throw(
999) if gConfig.dynamic_db_table_names else 0 999) if Settings.getConfig().dynamic_db_table_names else 0
return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i) return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i)
def execute(self, wt: WorkerThread): def execute(self, wt: WorkerThread):
@ -1575,14 +1580,14 @@ class TaskCreateDb(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# was: self.execWtSql(wt, "create database db") # was: self.execWtSql(wt, "create database db")
repStr = "" repStr = ""
if gConfig.num_replicas != 1: if Settings.getConfig().num_replicas != 1:
# numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N # numReplica = Dice.throw(Settings.getConfig().max_replicas) + 1 # 1,2 ... N
numReplica = gConfig.num_replicas # fixed, always numReplica = Settings.getConfig().num_replicas # fixed, always
repStr = "replica {}".format(numReplica) repStr = "replica {}".format(numReplica)
updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active updatePostfix = "update 1" if Settings.getConfig().verify_data else "" # allow update only when "verify data" is active
dbName = self._db.getName() dbName = self._db.getName()
self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) ) self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) )
if dbName == "db_0" and gConfig.use_shadow_db: if dbName == "db_0" and Settings.getConfig().use_shadow_db:
self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) ) self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) )
class TaskDropDb(StateTransitionTask): class TaskDropDb(StateTransitionTask):
@ -1887,7 +1892,7 @@ class TaskDropSuperTable(StateTransitionTask):
if Dice.throw(2) == 0: if Dice.throw(2) == 0:
# print("_7_", end="", flush=True) # print("_7_", end="", flush=True)
tblSeq = list(range( tblSeq = list(range(
2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))) 2 + (self.LARGE_NUMBER_OF_TABLES if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES)))
random.shuffle(tblSeq) random.shuffle(tblSeq)
tickOutput = False # if we have spitted out a "d" character for "drop regular table" tickOutput = False # if we have spitted out a "d" character for "drop regular table"
isSuccess = True isSuccess = True
@ -1953,13 +1958,13 @@ class TaskRestartService(StateTransitionTask):
@classmethod @classmethod
def canBeginFrom(cls, state: AnyState): def canBeginFrom(cls, state: AnyState):
if gConfig.auto_start_service: if Settings.getConfig().auto_start_service:
return state.canDropFixedSuperTable() # Basicallly when we have the super table return state.canDropFixedSuperTable() # Basicallly when we have the super table
return False # don't run this otherwise return False # don't run this otherwise
CHANCE_TO_RESTART_SERVICE = 200 CHANCE_TO_RESTART_SERVICE = 200
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
if not gConfig.auto_start_service: # only execute when we are in -a mode if not Settings.getConfig().auto_start_service: # only execute when we are in -a mode
print("_a", end="", flush=True) print("_a", end="", flush=True)
return return
@ -1981,12 +1986,12 @@ class TaskAddData(StateTransitionTask):
activeTable: Set[int] = set() activeTable: Set[int] = set()
# We use these two files to record operations to DB, useful for power-off tests # We use these two files to record operations to DB, useful for power-off tests
fAddLogReady = None # type: io.TextIOWrapper fAddLogReady = None # type: Optional[io.TextIOWrapper]
fAddLogDone = None # type: io.TextIOWrapper fAddLogDone = None # type: Optional[io.TextIOWrapper]
@classmethod @classmethod
def prepToRecordOps(cls): def prepToRecordOps(cls):
if gConfig.record_ops: if Settings.getConfig().record_ops:
if (cls.fAddLogReady is None): if (cls.fAddLogReady is None):
Logging.info( Logging.info(
"Recording in a file operations to be performed...") "Recording in a file operations to be performed...")
@ -2004,7 +2009,7 @@ class TaskAddData(StateTransitionTask):
return state.canAddData() return state.canAddData()
def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor): def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS numRecords = self.LARGE_NUMBER_OF_RECORDS if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
fullTableName = db.getName() + '.' + regTableName fullTableName = db.getName() + '.' + regTableName
sql = "INSERT INTO {} VALUES ".format(fullTableName) sql = "INSERT INTO {} VALUES ".format(fullTableName)
@ -2016,21 +2021,23 @@ class TaskAddData(StateTransitionTask):
dbc.execute(sql) dbc.execute(sql)
def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS numRecords = self.LARGE_NUMBER_OF_RECORDS if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
for j in range(numRecords): # number of records per table for j in range(numRecords): # number of records per table
nextInt = db.getNextInt() nextInt = db.getNextInt()
nextTick = db.getNextTick() nextTick = db.getNextTick()
nextColor = db.getNextColor() nextColor = db.getNextColor()
if gConfig.record_ops: if Settings.getConfig().record_ops:
self.prepToRecordOps() self.prepToRecordOps()
if self.fAddLogReady is None:
raise CrashGenError("Unexpected empty fAddLogReady")
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
self.fAddLogReady.flush() self.fAddLogReady.flush()
os.fsync(self.fAddLogReady.fileno()) os.fsync(self.fAddLogReady.fileno())
# TODO: too ugly trying to lock the table reliably, refactor... # TODO: too ugly trying to lock the table reliably, refactor...
fullTableName = db.getName() + '.' + regTableName fullTableName = db.getName() + '.' + regTableName
if gConfig.verify_data: if Settings.getConfig().verify_data:
self.lockTable(fullTableName) self.lockTable(fullTableName)
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
@ -2043,7 +2050,7 @@ class TaskAddData(StateTransitionTask):
dbc.execute(sql) dbc.execute(sql)
# Quick hack, attach an update statement here. TODO: create an "update" task # Quick hack, attach an update statement here. TODO: create an "update" task
if (not gConfig.use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB if (not Settings.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
nextInt = db.getNextInt() nextInt = db.getNextInt()
nextColor = db.getNextColor() nextColor = db.getNextColor()
sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
@ -2054,12 +2061,12 @@ class TaskAddData(StateTransitionTask):
dbc.execute(sql) dbc.execute(sql)
except: # Any exception at all except: # Any exception at all
if gConfig.verify_data: if Settings.getConfig().verify_data:
self.unlockTable(fullTableName) self.unlockTable(fullTableName)
raise raise
# Now read it back and verify, we might encounter an error if table is dropped # Now read it back and verify, we might encounter an error if table is dropped
if gConfig.verify_data: # only if command line asks for it if Settings.getConfig().verify_data: # only if command line asks for it
try: try:
readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'". readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'".
format(db.getName(), regTableName, nextTick)) format(db.getName(), regTableName, nextTick))
@ -2086,7 +2093,9 @@ class TaskAddData(StateTransitionTask):
# Successfully wrote the data into the DB, let's record it somehow # Successfully wrote the data into the DB, let's record it somehow
te.recordDataMark(nextInt) te.recordDataMark(nextInt)
if gConfig.record_ops: if Settings.getConfig().record_ops:
if self.fAddLogDone is None:
raise CrashGenError("Unexpected empty fAddLogDone")
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
self.fAddLogDone.flush() self.fAddLogDone.flush()
os.fsync(self.fAddLogDone.fileno()) os.fsync(self.fAddLogDone.fileno())
@ -2095,8 +2104,8 @@ class TaskAddData(StateTransitionTask):
# ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
db = self._db db = self._db
dbc = wt.getDbConn() dbc = wt.getDbConn()
numTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES numTables = self.LARGE_NUMBER_OF_TABLES if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES
numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS numRecords = self.LARGE_NUMBER_OF_RECORDS if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
tblSeq = list(range(numTables )) tblSeq = list(range(numTables ))
random.shuffle(tblSeq) # now we have random sequence random.shuffle(tblSeq) # now we have random sequence
for i in tblSeq: for i in tblSeq:
@ -2127,6 +2136,8 @@ class ThreadStacks: # stack info for all threads
self._allStacks = {} self._allStacks = {}
allFrames = sys._current_frames() allFrames = sys._current_frames()
for th in threading.enumerate(): for th in threading.enumerate():
if th.ident is None:
continue
stack = traceback.extract_stack(allFrames[th.ident]) stack = traceback.extract_stack(allFrames[th.ident])
self._allStacks[th.native_id] = stack self._allStacks[th.native_id] = stack
@ -2247,14 +2258,15 @@ class ClientManager:
def run(self, svcMgr): def run(self, svcMgr):
# self._printLastNumbers() # self._printLastNumbers()
global gConfig # global gConfig
# Prepare Tde Instance # Prepare Tde Instance
global gContainer global gContainer
tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance" tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance"
dbManager = DbManager(gConfig.connector_type, tInst.getDbTarget()) # Regular function cfg = Settings.getConfig()
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) dbManager = DbManager(cfg.connector_type, tInst.getDbTarget()) # Regular function
thPool = ThreadPool(cfg.num_threads, cfg.max_steps)
self.tc = ThreadCoordinator(thPool, dbManager) self.tc = ThreadCoordinator(thPool, dbManager)
Logging.info("Starting client instance: {}".format(tInst)) Logging.info("Starting client instance: {}".format(tInst))
@ -2267,7 +2279,8 @@ class ClientManager:
# Release global variables # Release global variables
gConfig = None # gConfig = None
Settings.clearConfig()
gSvcMgr = None gSvcMgr = None
logger = None logger = None
@ -2298,7 +2311,7 @@ class ClientManager:
class MainExec: class MainExec:
def __init__(self): def __init__(self):
self._clientMgr = None self._clientMgr = None
self._svcMgr = None # type: ServiceManager self._svcMgr = None # type: Optional[ServiceManager]
signal.signal(signal.SIGTERM, self.sigIntHandler) signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler) signal.signal(signal.SIGINT, self.sigIntHandler)
@ -2318,7 +2331,7 @@ class MainExec:
def runClient(self): def runClient(self):
global gSvcMgr global gSvcMgr
if gConfig.auto_start_service: if Settings.getConfig().auto_start_service:
gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert
gSvcMgr.startTaosServices() # we start, don't run gSvcMgr.startTaosServices() # we start, don't run
@ -2327,13 +2340,13 @@ class MainExec:
try: try:
ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
except requests.exceptions.ConnectionError as err: except requests.exceptions.ConnectionError as err:
Logging.warning("Failed to open REST connection to DB: {}".format(err.getMessage())) Logging.warning("Failed to open REST connection to DB: {}".format(err))
# don't raise # don't raise
return ret return ret
def runService(self): def runService(self):
global gSvcMgr global gSvcMgr
gSvcMgr = self._svcMgr = ServiceManager(gConfig.num_dnodes) # save it in a global variable TODO: hack alert gSvcMgr = self._svcMgr = ServiceManager(Settings.getConfig().num_dnodes) # save it in a global variable TODO: hack alert
gSvcMgr.run() # run to some end state gSvcMgr.run() # run to some end state
gSvcMgr = self._svcMgr = None gSvcMgr = self._svcMgr = None
@ -2467,20 +2480,20 @@ class MainExec:
action='store_true', action='store_true',
help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)') help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
global gConfig # global gConfig
gConfig = parser.parse_args() config = parser.parse_args()
Settings.setConfig(gConfig) # TODO: fix this hack, consolidate this global var Settings.setConfig(config) # TODO: fix this hack, consolidate this global var
# Sanity check for arguments # Sanity check for arguments
if gConfig.use_shadow_db and gConfig.max_dbs>1 : if Settings.getConfig().use_shadow_db and Settings.getConfig().max_dbs>1 :
raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1") raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1")
Logging.clsInit(gConfig) Logging.clsInit(Settings.getConfig())
Dice.seed(0) # initial seeding of dice Dice.seed(0) # initial seeding of dice
def run(self): def run(self):
if gConfig.run_tdengine: # run server if Settings.getConfig().run_tdengine: # run server
try: try:
self.runService() self.runService()
return 0 # success return 0 # success

View File

@ -5,6 +5,7 @@ import time
import threading import threading
import requests import requests
from requests.auth import HTTPBasicAuth from requests.auth import HTTPBasicAuth
from crash_gen.types import QueryResult
import taos import taos
from util.sql import * from util.sql import *
@ -18,7 +19,7 @@ import datetime
import traceback import traceback
# from .service_manager import TdeInstance # from .service_manager import TdeInstance
import crash_gen.settings from crash_gen.settings import Settings
class DbConn: class DbConn:
TYPE_NATIVE = "native-c" TYPE_NATIVE = "native-c"
@ -79,7 +80,7 @@ class DbConn:
raise RuntimeError("Cannot query database until connection is open") raise RuntimeError("Cannot query database until connection is open")
nRows = self.query(sql) nRows = self.query(sql)
if nRows != 1: if nRows != 1:
raise taos.error.ProgrammingError( raise CrashGenError(
"Unexpected result for query: {}, rows = {}".format(sql, nRows), "Unexpected result for query: {}, rows = {}".format(sql, nRows),
(CrashGenError.INVALID_EMPTY_RESULT if nRows==0 else CrashGenError.INVALID_MULTIPLE_RESULT) (CrashGenError.INVALID_EMPTY_RESULT if nRows==0 else CrashGenError.INVALID_MULTIPLE_RESULT)
) )
@ -115,7 +116,7 @@ class DbConn:
try: try:
self.execute(sql) self.execute(sql)
return True # ignore num of results, return success return True # ignore num of results, return success
except taos.error.ProgrammingError as err: except taos.error.Error as err:
return False # failed, for whatever TAOS reason return False # failed, for whatever TAOS reason
# Not possile to reach here, non-TAOS exception would have been thrown # Not possile to reach here, non-TAOS exception would have been thrown
@ -126,7 +127,7 @@ class DbConn:
def openByType(self): def openByType(self):
raise RuntimeError("Unexpected execution, should be overriden") raise RuntimeError("Unexpected execution, should be overriden")
def getQueryResult(self): def getQueryResult(self) -> QueryResult :
raise RuntimeError("Unexpected execution, should be overriden") raise RuntimeError("Unexpected execution, should be overriden")
def getResultRows(self): def getResultRows(self):
@ -221,7 +222,7 @@ class DbConnRest(DbConn):
class MyTDSql: class MyTDSql:
# Class variables # Class variables
_clsLock = threading.Lock() # class wide locking _clsLock = threading.Lock() # class wide locking
longestQuery = None # type: str longestQuery = '' # type: str
longestQueryTime = 0.0 # seconds longestQueryTime = 0.0 # seconds
lqStartTime = 0.0 lqStartTime = 0.0
# lqEndTime = 0.0 # Not needed, as we have the two above already # lqEndTime = 0.0 # Not needed, as we have the two above already
@ -261,7 +262,7 @@ class MyTDSql:
cls.lqStartTime = startTime cls.lqStartTime = startTime
# Now write to the shadow database # Now write to the shadow database
if crash_gen.settings.gConfig.use_shadow_db: if Settings.getConfig().use_shadow_db:
if sql[:11] == "INSERT INTO": if sql[:11] == "INSERT INTO":
if sql[:16] == "INSERT INTO db_0": if sql[:16] == "INSERT INTO db_0":
sql2 = "INSERT INTO db_s" + sql[16:] sql2 = "INSERT INTO db_s" + sql[16:]
@ -453,31 +454,11 @@ class DbManager():
''' Release the underlying DB connection upon deletion of DbManager ''' ''' Release the underlying DB connection upon deletion of DbManager '''
self.cleanUp() self.cleanUp()
def getDbConn(self): def getDbConn(self) -> DbConn :
if self._dbConn is None:
raise CrashGenError("Unexpected empty DbConn")
return self._dbConn return self._dbConn
# TODO: not used any more, to delete
def pickAndAllocateTable(self): # pick any table, and "use" it
return self.tableNumQueue.pickAndAllocate()
# TODO: Not used any more, to delete
def addTable(self):
with self._lock:
tIndex = self.tableNumQueue.push()
return tIndex
# Not used any more, to delete
def releaseTable(self, i): # return the table back, so others can use it
self.tableNumQueue.release(i)
# TODO: not used any more, delete
def getTableNameToDelete(self):
tblNum = self.tableNumQueue.pop() # TODO: race condition!
if (not tblNum): # maybe false
return False
return "table_{}".format(tblNum)
def cleanUp(self): def cleanUp(self):
if self._dbConn: if self._dbConn:
self._dbConn.close() self._dbConn.close()

View File

@ -3,6 +3,7 @@ import random
import logging import logging
import os import os
import sys import sys
from typing import Optional
import taos import taos
@ -39,11 +40,11 @@ class MyLoggingAdapter(logging.LoggerAdapter):
class Logging: class Logging:
logger = None logger = None # type: Optional[MyLoggingAdapter]
@classmethod @classmethod
def getLogger(cls): def getLogger(cls):
return logger return cls.logger
@classmethod @classmethod
def clsInit(cls, gConfig): # TODO: refactor away gConfig def clsInit(cls, gConfig): # TODO: refactor away gConfig
@ -60,7 +61,7 @@ class Logging:
# Logging adapter, to be used as a logger # Logging adapter, to be used as a logger
# print("setting logger variable") # print("setting logger variable")
# global logger # global logger
cls.logger = MyLoggingAdapter(_logger, []) cls.logger = MyLoggingAdapter(_logger, {})
if (gConfig.debug): if (gConfig.debug):
cls.logger.setLevel(logging.DEBUG) # default seems to be INFO cls.logger.setLevel(logging.DEBUG) # default seems to be INFO
@ -84,6 +85,7 @@ class Logging:
cls.logger.error(msg) cls.logger.error(msg)
class Status: class Status:
STATUS_EMPTY = 99
STATUS_STARTING = 1 STATUS_STARTING = 1
STATUS_RUNNING = 2 STATUS_RUNNING = 2
STATUS_STOPPING = 3 STATUS_STOPPING = 3
@ -95,12 +97,16 @@ class Status:
def __repr__(self): def __repr__(self):
return "[Status: v={}]".format(self._status) return "[Status: v={}]".format(self._status)
def set(self, status): def set(self, status: int):
self._status = status self._status = status
def get(self): def get(self):
return self._status return self._status
def isEmpty(self):
''' Empty/Undefined '''
return self._status == Status.STATUS_EMPTY
def isStarting(self): def isStarting(self):
return self._status == Status.STATUS_STARTING return self._status == Status.STATUS_STARTING
@ -117,6 +123,9 @@ class Status:
def isStable(self): def isStable(self):
return self.isRunning() or self.isStopped() return self.isRunning() or self.isStopped()
def isActive(self):
return self.isStarting() or self.isRunning() or self.isStopping()
# Deterministic random number generator # Deterministic random number generator
class Dice(): class Dice():
seeded = False # static, uninitialized seeded = False # static, uninitialized

View File

@ -3,12 +3,12 @@ from __future__ import annotations
import os import os
import io import io
import sys import sys
from enum import Enum
import threading import threading
import signal import signal
import logging import logging
import time import time
from subprocess import PIPE, Popen, TimeoutExpired from subprocess import PIPE, Popen, TimeoutExpired
from typing import IO, List, NewType, Optional from typing import IO, List, NewType, Optional
try: try:
@ -16,12 +16,12 @@ try:
except: except:
print("Psutil module needed, please install: sudo pip3 install psutil") print("Psutil module needed, please install: sudo pip3 install psutil")
sys.exit(-1) sys.exit(-1)
from queue import Queue, Empty from queue import Queue, Empty
from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status
from .db import DbConn, DbTarget from crash_gen.db import DbConn, DbTarget
import crash_gen.settings from crash_gen.settings import Settings
from crash_gen.types import DirPath
class TdeInstance(): class TdeInstance():
""" """
@ -70,7 +70,10 @@ class TdeInstance():
self._fepPort = fepPort self._fepPort = fepPort
self._tInstNum = tInstNum self._tInstNum = tInstNum
self._smThread = ServiceManagerThread()
# An "Tde Instance" will *contain* a "sub process" object, with will/may use a thread internally
# self._smThread = ServiceManagerThread()
self._subProcess = None # type: Optional[TdeSubProcess]
def getDbTarget(self): def getDbTarget(self):
return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port) return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port)
@ -155,21 +158,21 @@ quorum 2
def getExecFile(self): # .../taosd def getExecFile(self): # .../taosd
return self._buildDir + "/build/bin/taosd" return self._buildDir + "/build/bin/taosd"
def getRunDir(self): # TODO: rename to "root dir" ?! def getRunDir(self) -> DirPath : # TODO: rename to "root dir" ?!
return self._buildDir + self._subdir return DirPath(self._buildDir + self._subdir)
def getCfgDir(self): # path, not file def getCfgDir(self) -> DirPath : # path, not file
return self.getRunDir() + "/cfg" return DirPath(self.getRunDir() + "/cfg")
def getLogDir(self): def getLogDir(self) -> DirPath :
return self.getRunDir() + "/log" return DirPath(self.getRunDir() + "/log")
def getHostAddr(self): def getHostAddr(self):
return "127.0.0.1" return "127.0.0.1"
def getServiceCmdLine(self): # to start the instance def getServiceCmdLine(self): # to start the instance
cmdLine = [] cmdLine = []
if crash_gen.settings.gConfig.track_memory_leaks: if Settings.getConfig().track_memory_leaks:
Logging.info("Invoking VALGRIND on service...") Logging.info("Invoking VALGRIND on service...")
cmdLine = ['valgrind', '--leak-check=yes'] cmdLine = ['valgrind', '--leak-check=yes']
# TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
@ -199,27 +202,46 @@ quorum 2
dbc.close() dbc.close()
def getStatus(self): def getStatus(self):
return self._smThread.getStatus() # return self._smThread.getStatus()
if self._subProcess is None:
return Status(Status.STATUS_EMPTY)
return self._subProcess.getStatus()
def getSmThread(self): # def getSmThread(self):
return self._smThread # return self._smThread
def start(self): def start(self):
if not self.getStatus().isStopped(): if self.getStatus().isActive():
raise CrashGenError("Cannot start instance from status: {}".format(self.getStatus())) raise CrashGenError("Cannot start instance from status: {}".format(self.getStatus()))
Logging.info("Starting TDengine instance: {}".format(self)) Logging.info("Starting TDengine instance: {}".format(self))
self.generateCfgFile() # service side generates config file, client does not self.generateCfgFile() # service side generates config file, client does not
self.rotateLogs() self.rotateLogs()
self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions # self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions
self._subProcess = TdeSubProcess(self.getServiceCmdLine(), self.getLogDir())
def stop(self): def stop(self):
self._smThread.stop() self._subProcess.stop()
self._subProcess = None
def isFirst(self): def isFirst(self):
return self._tInstNum == 0 return self._tInstNum == 0
def printFirst10Lines(self):
if self._subProcess is None:
Logging.warning("Incorrect TI status for procIpcBatch-10 operation")
return
self._subProcess.procIpcBatch(trimToTarget=10, forceOutput=True)
def procIpcBatch(self):
if self._subProcess is None:
Logging.warning("Incorrect TI status for procIpcBatch operation")
return
self._subProcess.procIpcBatch() # may enounter EOF and change status to STOPPED
if self._subProcess.getStatus().isStopped():
self._subProcess.stop()
self._subProcess = None
class TdeSubProcess: class TdeSubProcess:
""" """
@ -237,16 +259,21 @@ class TdeSubProcess:
# RET_TIME_OUT = -3 # RET_TIME_OUT = -3
# RET_SUCCESS = -4 # RET_SUCCESS = -4
def __init__(self, po: Popen): def __init__(self, cmdLine: List[str], logDir: DirPath):
self._popen = po # type: Popen # Create the process + managing thread immediately
# if tInst is None:
# raise CrashGenError("Empty instance not allowed in TdeSubProcess") Logging.info("Attempting to start TAOS sub process...")
# self._tInst = tInst # Default create at ServiceManagerThread self._popen = self._start(cmdLine) # the actual sub process
self._smThread = ServiceManagerThread(self, logDir) # A thread to manage the sub process, mostly to process the IO
Logging.info("Successfully started TAOS process: {}".format(self))
def __repr__(self): def __repr__(self):
# if self.subProcess is None: # if self.subProcess is None:
# return '[TdeSubProc: Empty]' # return '[TdeSubProc: Empty]'
return '[TdeSubProc: pid = {}]'.format(self.getPid()) return '[TdeSubProc: pid = {}, status = {}]'.format(
self.getPid(), self.getStatus() )
def getStdOut(self): def getStdOut(self):
return self._popen.stdout return self._popen.stdout
@ -261,14 +288,14 @@ class TdeSubProcess:
def getPid(self): def getPid(self):
return self._popen.pid return self._popen.pid
@classmethod def _start(self, cmdLine) -> Popen :
def start(cls, cmdLine):
ON_POSIX = 'posix' in sys.builtin_module_names ON_POSIX = 'posix' in sys.builtin_module_names
# Sanity check # Sanity check
# if self.subProcess: # already there # if self.subProcess: # already there
# raise RuntimeError("Corrupt process state") # raise RuntimeError("Corrupt process state")
# Prepare environment variables for coverage information # Prepare environment variables for coverage information
# Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment
myEnv = os.environ.copy() myEnv = os.environ.copy()
@ -279,7 +306,7 @@ class TdeSubProcess:
# print("Starting TDengine via Shell: {}".format(cmdLineStr)) # print("Starting TDengine via Shell: {}".format(cmdLineStr))
# useShell = True # Needed to pass environments into it # useShell = True # Needed to pass environments into it
popen = Popen( return Popen(
' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine, ' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine,
shell=True, # Always use shell, since we need to pass ENV vars shell=True, # Always use shell, since we need to pass ENV vars
stdout=PIPE, stdout=PIPE,
@ -287,15 +314,15 @@ class TdeSubProcess:
close_fds=ON_POSIX, close_fds=ON_POSIX,
env=myEnv env=myEnv
) # had text=True, which interferred with reading EOF ) # had text=True, which interferred with reading EOF
return cls(popen)
STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process? STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm
@classmethod def stop(self):
def stop(cls, tsp: TdeSubProcess):
""" """
Stop a sub process, DO NOT return anything, process all conditions INSIDE Stop a sub process, DO NOT return anything, process all conditions INSIDE.
Calling function should immediately delete/unreference the object
Common POSIX signal values (from man -7 signal): Common POSIX signal values (from man -7 signal):
SIGHUP 1 SIGHUP 1
@ -315,11 +342,17 @@ class TdeSubProcess:
""" """
# self._popen should always be valid. # self._popen should always be valid.
# if not self.subProcess: Logging.info("Terminating TDengine service running as the sub process...")
# Logging.error("Sub process already stopped") if self.getStatus().isStopped():
# return Logging.info("Service already stopped")
return
if self.getStatus().isStopping():
Logging.info("Service is already being stopped, pid: {}".format(self.getPid()))
return
retCode = tsp._popen.poll() # ret -N means killed with signal N, otherwise it's from exit(N) self.setStatus(Status.STATUS_STOPPING)
retCode = self._popen.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
if retCode: # valid return code, process ended if retCode: # valid return code, process ended
# retCode = -retCode # only if valid # retCode = -retCode # only if valid
Logging.warning("TSP.stop(): process ended itself") Logging.warning("TSP.stop(): process ended itself")
@ -327,9 +360,12 @@ class TdeSubProcess:
return return
# process still alive, let's interrupt it # process still alive, let's interrupt it
cls._stopForSure(tsp._popen, cls.STOP_SIGNAL) # success if no exception self._stopForSure(self._popen, self.STOP_SIGNAL) # success if no exception
# sub process should end, then IPC queue should end, causing IO thread to end # sub process should end, then IPC queue should end, causing IO thread to end
self._smThread.stop() # stop for sure too
self.setStatus(Status.STATUS_STOPPED)
@classmethod @classmethod
def _stopForSure(cls, proc: Popen, sig: int): def _stopForSure(cls, proc: Popen, sig: int):
@ -357,13 +393,13 @@ class TdeSubProcess:
Logging.info("Killing sub-sub process {} with signal {}".format(child.pid, sig)) Logging.info("Killing sub-sub process {} with signal {}".format(child.pid, sig))
child.send_signal(sig) child.send_signal(sig)
try: try:
retCode = child.wait(20) retCode = child.wait(20) # type: ignore
if (- retCode) == signal.SIGSEGV: # Crashed if (- retCode) == signal.SIGSEGV: # type: ignore # Crashed
Logging.warning("Process {} CRASHED, please check CORE file!".format(child.pid)) Logging.warning("Process {} CRASHED, please check CORE file!".format(child.pid))
elif (- retCode) == sig : elif (- retCode) == sig : # type: ignore
Logging.info("Sub-sub process terminated with expected return code {}".format(sig)) Logging.info("Sub-sub process terminated with expected return code {}".format(sig))
else: else:
Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode)) Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode)) # type: ignore
return True # terminated successfully return True # terminated successfully
except psutil.TimeoutExpired as err: except psutil.TimeoutExpired as err:
Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig)) Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig))
@ -408,6 +444,15 @@ class TdeSubProcess:
return return
raise CrashGenError("Failed to stop process, pid={}".format(pid)) raise CrashGenError("Failed to stop process, pid={}".format(pid))
def getStatus(self):
return self._smThread.getStatus()
def setStatus(self, status):
self._smThread.setStatus(status)
def procIpcBatch(self, trimToTarget=0, forceOutput=False):
self._smThread.procIpcBatch(trimToTarget, forceOutput)
class ServiceManager: class ServiceManager:
PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process
@ -504,10 +549,10 @@ class ServiceManager:
def isActive(self): def isActive(self):
""" """
Determine if the service/cluster is active at all, i.e. at least Determine if the service/cluster is active at all, i.e. at least
one thread is not "stopped". one instance is active
""" """
for ti in self._tInsts: for ti in self._tInsts:
if not ti.getStatus().isStopped(): if ti.getStatus().isActive():
return True return True
return False return False
@ -545,10 +590,10 @@ class ServiceManager:
# while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here # while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here
status = ti.getStatus() status = ti.getStatus()
if status.isRunning(): if status.isRunning():
th = ti.getSmThread() # th = ti.getSmThread()
th.procIpcBatch() # regular processing, ti.procIpcBatch() # regular processing,
if status.isStopped(): if status.isStopped():
th.procIpcBatch() # one last time? ti.procIpcBatch() # one last time?
# self._updateThreadStatus() # self._updateThreadStatus()
time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round
@ -578,7 +623,8 @@ class ServiceManager:
if not ti.isFirst(): if not ti.isFirst():
tFirst = self._getFirstInstance() tFirst = self._getFirstInstance()
tFirst.createDnode(ti.getDbTarget()) tFirst.createDnode(ti.getDbTarget())
ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines ti.printFirst10Lines()
# ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines
def stopTaosServices(self): def stopTaosServices(self):
with self._lock: with self._lock:
@ -624,21 +670,24 @@ class ServiceManagerThread:
""" """
MAX_QUEUE_SIZE = 10000 MAX_QUEUE_SIZE = 10000
def __init__(self): def __init__(self, subProc: TdeSubProcess, logDir: str):
# Set the sub process # Set the sub process
self._tdeSubProcess = None # type: TdeSubProcess # self._tdeSubProcess = None # type: TdeSubProcess
# Arrange the TDengine instance # Arrange the TDengine instance
# self._tInstNum = tInstNum # instance serial number in cluster, ZERO based # self._tInstNum = tInstNum # instance serial number in cluster, ZERO based
# self._tInst = tInst or TdeInstance() # Need an instance # self._tInst = tInst or TdeInstance() # Need an instance
self._thread = None # The actual thread, # type: threading.Thread # self._thread = None # type: Optional[threading.Thread] # The actual thread, # type: threading.Thread
self._thread2 = None # watching stderr # self._thread2 = None # type: Optional[threading.Thread] Thread # watching stderr
self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually. self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually.
self._start(subProc, logDir)
def __repr__(self): def __repr__(self):
return "[SvcMgrThread: status={}, subProc={}]".format( raise CrashGenError("SMT status moved to TdeSubProcess")
self.getStatus(), self._tdeSubProcess) # return "[SvcMgrThread: status={}, subProc={}]".format(
# self.getStatus(), self._tdeSubProcess)
def getStatus(self): def getStatus(self):
''' '''
@ -646,29 +695,33 @@ class ServiceManagerThread:
''' '''
return self._status return self._status
def setStatus(self, statusVal: int):
self._status.set(statusVal)
# Start the thread (with sub process), and wait for the sub service # Start the thread (with sub process), and wait for the sub service
# to become fully operational # to become fully operational
def start(self, cmdLine : str, logDir: str): def _start(self, subProc :TdeSubProcess, logDir: str):
''' '''
Request the manager thread to start a new sub process, and manage it. Request the manager thread to start a new sub process, and manage it.
:param cmdLine: the command line to invoke :param cmdLine: the command line to invoke
:param logDir: the logging directory, to hold stdout/stderr files :param logDir: the logging directory, to hold stdout/stderr files
''' '''
if self._thread: # if self._thread:
raise RuntimeError("Unexpected _thread") # raise RuntimeError("Unexpected _thread")
if self._tdeSubProcess: # if self._tdeSubProcess:
raise RuntimeError("TDengine sub process already created/running") # raise RuntimeError("TDengine sub process already created/running")
Logging.info("Attempting to start TAOS service: {}".format(self)) # Moved to TdeSubProcess
# Logging.info("Attempting to start TAOS service: {}".format(self))
self._status.set(Status.STATUS_STARTING) self._status.set(Status.STATUS_STARTING)
self._tdeSubProcess = TdeSubProcess.start(cmdLine) # TODO: verify process is running # self._tdeSubProcess = TdeSubProcess.start(cmdLine) # TODO: verify process is running
self._ipcQueue = Queue() # type: Queue self._ipcQueue = Queue() # type: Queue
self._thread = threading.Thread( # First thread captures server OUTPUT self._thread = threading.Thread( # First thread captures server OUTPUT
target=self.svcOutputReader, target=self.svcOutputReader,
args=(self._tdeSubProcess.getStdOut(), self._ipcQueue, logDir)) args=(subProc.getStdOut(), self._ipcQueue, logDir))
self._thread.daemon = True # thread dies with the program self._thread.daemon = True # thread dies with the program
self._thread.start() self._thread.start()
time.sleep(0.01) time.sleep(0.01)
@ -680,7 +733,7 @@ class ServiceManagerThread:
self._thread2 = threading.Thread( # 2nd thread captures server ERRORs self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
target=self.svcErrorReader, target=self.svcErrorReader,
args=(self._tdeSubProcess.getStdErr(), self._ipcQueue, logDir)) args=(subProc.getStdErr(), self._ipcQueue, logDir))
self._thread2.daemon = True # thread dies with the program self._thread2.daemon = True # thread dies with the program
self._thread2.start() self._thread2.start()
time.sleep(0.01) time.sleep(0.01)
@ -695,14 +748,14 @@ class ServiceManagerThread:
Progress.emit(Progress.SERVICE_START_NAP) Progress.emit(Progress.SERVICE_START_NAP)
# print("_zz_", end="", flush=True) # print("_zz_", end="", flush=True)
if self._status.isRunning(): if self._status.isRunning():
Logging.info("[] TDengine service READY to process requests") Logging.info("[] TDengine service READY to process requests: pid={}".format(subProc.getPid()))
Logging.info("[] TAOS service started: {}".format(self)) # Logging.info("[] TAOS service started: {}".format(self))
# self._verifyDnode(self._tInst) # query and ensure dnode is ready # self._verifyDnode(self._tInst) # query and ensure dnode is ready
# Logging.debug("[] TAOS Dnode verified: {}".format(self)) # Logging.debug("[] TAOS Dnode verified: {}".format(self))
return # now we've started return # now we've started
# TODO: handle failure-to-start better? # TODO: handle failure-to-start better?
self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output
raise RuntimeError("TDengine service did not start successfully: {}".format(self)) raise RuntimeError("TDengine service DID NOT achieve READY status: pid={}".format(subProc.getPid()))
def _verifyDnode(self, tInst: TdeInstance): def _verifyDnode(self, tInst: TdeInstance):
dbc = DbConn.createNative(tInst.getDbTarget()) dbc = DbConn.createNative(tInst.getDbTarget())
@ -722,29 +775,23 @@ class ServiceManagerThread:
break break
if not isValid: if not isValid:
print("Failed to start dnode, sleep for a while") print("Failed to start dnode, sleep for a while")
time.sleep(600) time.sleep(10.0)
raise RuntimeError("Failed to start Dnode, expected port not found: {}". raise RuntimeError("Failed to start Dnode, expected port not found: {}".
format(tInst.getPort())) format(tInst.getPort()))
dbc.close() dbc.close()
def stop(self): def stop(self):
# can be called from both main thread or signal handler # can be called from both main thread or signal handler
Logging.info("Terminating TDengine service running as the sub process...")
if self.getStatus().isStopped():
Logging.info("Service already stopped")
return
if self.getStatus().isStopping():
Logging.info("Service is already being stopped, pid: {}".format(self._tdeSubProcess.getPid()))
return
# Linux will send Control-C generated SIGINT to the TDengine process # Linux will send Control-C generated SIGINT to the TDengine process
# already, ref: # already, ref:
# https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
if not self._tdeSubProcess: # if not self._tdeSubProcess:
raise RuntimeError("sub process object missing") # raise RuntimeError("sub process object missing")
self._status.set(Status.STATUS_STOPPING) # self._status.set(Status.STATUS_STOPPING)
TdeSubProcess.stop(self._tdeSubProcess) # must stop, no matter what # TdeSubProcess.stop(self._tdeSubProcess) # must stop, no matter what
self._tdeSubProcess = None # self._tdeSubProcess = None
# if not self._tdeSubProcess.stop(): # everything withing # if not self._tdeSubProcess.stop(): # everything withing
# if self._tdeSubProcess.isRunning(): # still running, should now never happen # if self._tdeSubProcess.isRunning(): # still running, should now never happen
# Logging.error("FAILED to stop sub process, it is still running... pid = {}".format( # Logging.error("FAILED to stop sub process, it is still running... pid = {}".format(
@ -757,29 +804,28 @@ class ServiceManagerThread:
outputLines = 10 # for last output outputLines = 10 # for last output
if self.getStatus().isStopped(): if self.getStatus().isStopped():
self.procIpcBatch(outputLines) # one last time self.procIpcBatch(outputLines) # one last time
Logging.debug("End of TDengine Service Output: {}".format(self)) Logging.debug("End of TDengine Service Output")
Logging.info("----- TDengine Service (managed by SMT) is now terminated -----\n") Logging.info("----- TDengine Service (managed by SMT) is now terminated -----\n")
else: else:
print("WARNING: SMT did not terminate as expected: {}".format(self)) print("WARNING: SMT did not terminate as expected")
def join(self): def join(self):
# TODO: sanity check # TODO: sanity check
if not self.getStatus().isStopping(): s = self.getStatus()
if s.isStopping() or s.isStopped(): # we may be stopping ourselves, or have been stopped/killed by others
if self._thread or self._thread2 :
if self._thread:
self._thread.join()
self._thread = None
if self._thread2: # STD ERR thread
self._thread2.join()
self._thread2 = None
else:
Logging.warning("Joining empty thread, doing nothing")
else:
raise RuntimeError( raise RuntimeError(
"SMT.Join(): Unexpected status: {}".format(self._status)) "SMT.Join(): Unexpected status: {}".format(self._status))
if self._thread or self._thread2 :
if self._thread:
self._thread.join()
self._thread = None
if self._thread2: # STD ERR thread
self._thread2.join()
self._thread2 = None
else:
print("Joining empty thread, doing nothing")
self._status.set(Status.STATUS_STOPPED)
def _trimQueue(self, targetSize): def _trimQueue(self, targetSize):
if targetSize <= 0: if targetSize <= 0:
return # do nothing return # do nothing
@ -798,6 +844,10 @@ class ServiceManagerThread:
TD_READY_MSG = "TDengine is initialized successfully" TD_READY_MSG = "TDengine is initialized successfully"
def procIpcBatch(self, trimToTarget=0, forceOutput=False): def procIpcBatch(self, trimToTarget=0, forceOutput=False):
'''
Process a batch of STDOUT/STDERR data, until we read EMPTY from
the pipe.
'''
self._trimQueue(trimToTarget) # trim if necessary self._trimQueue(trimToTarget) # trim if necessary
# Process all the output generated by the underlying sub process, # Process all the output generated by the underlying sub process,
# managed by IO thread # managed by IO thread
@ -887,7 +937,8 @@ class ServiceManagerThread:
# queue.put(line) # queue.put(line)
# meaning sub process must have died # meaning sub process must have died
Logging.info("EOF for TDengine STDOUT: {}".format(self)) Logging.info("EOF found TDengine STDOUT, marking the process as terminated")
self.setStatus(Status.STATUS_STOPPED)
out.close() # Close the stream out.close() # Close the stream
fOut.close() # Close the output file fOut.close() # Close the output file
@ -898,6 +949,6 @@ class ServiceManagerThread:
for line in iter(err.readline, b''): for line in iter(err.readline, b''):
fErr.write(line) fErr.write(line)
Logging.info("TDengine STDERR: {}".format(line)) Logging.info("TDengine STDERR: {}".format(line))
Logging.info("EOF for TDengine STDERR: {}".format(self)) Logging.info("EOF for TDengine STDERR")
err.close() err.close()
fErr.close() fErr.close()

View File

@ -1,15 +1,29 @@
from __future__ import annotations from __future__ import annotations
import argparse import argparse
from typing import Optional
gConfig: argparse.Namespace from crash_gen.misc import CrashGenError
# gConfig: Optional[argparse.Namespace]
class Settings:
    """Class-level holder for the program's configuration namespace.

    The configuration is stored on the class itself, so every caller
    shares the same value; no instance is ever needed.
    """

    # The active configuration; None until setConfig() has been called.
    _config = None  # type: Optional[argparse.Namespace]

    @classmethod
    def init(cls):
        """Reset the holder to its initial state, with no configuration."""
        cls._config = None

    @classmethod
    def setConfig(cls, config: argparse.Namespace):
        """Store *config* as the active configuration."""
        cls._config = config

    @classmethod
    def getConfig(cls) -> argparse.Namespace:
        """Return the active configuration.

        Raises:
            CrashGenError: if no configuration has been set.
        """
        # TODO: check items instead of exposing everything
        if cls._config is None:
            raise CrashGenError("invalid state")
        return cls._config

    @classmethod
    def clearConfig(cls):
        """Discard the active configuration."""
        cls._config = None

View File

@ -0,0 +1,5 @@
from typing import Any, List, NewType
# Distinct static type layered over str, so type checkers can tell these
# values apart from arbitrary strings; presumably meant for directory paths
# (going by the name). At runtime the value is still a plain str.
DirPath = NewType('DirPath', str)
# Distinct static type layered over a list of lists of arbitrary values;
# presumably rows of column values returned by a query -- confirm against callers.
QueryResult = NewType('QueryResult', List[List[Any]])