Added service restart to crash_gen tool, discovered meter id mismatch problem
This commit is contained in:
parent
9315b99ca4
commit
91c1fe45b6
|
@ -59,13 +59,12 @@ if sys.version_info[0] < 3:
|
||||||
# Command-line/Environment Configurations, will set a bit later
|
# Command-line/Environment Configurations, will set a bit later
|
||||||
# ConfigNameSpace = argparse.Namespace
|
# ConfigNameSpace = argparse.Namespace
|
||||||
gConfig = argparse.Namespace() # Dummy value, will be replaced later
|
gConfig = argparse.Namespace() # Dummy value, will be replaced later
|
||||||
|
gSvcMgr = None # TODO: refactor this hack, use dep injection
|
||||||
logger = None
|
logger = None
|
||||||
|
|
||||||
|
|
||||||
def runThread(wt: WorkerThread):
|
def runThread(wt: WorkerThread):
|
||||||
wt.run()
|
wt.run()
|
||||||
|
|
||||||
|
|
||||||
class CrashGenError(Exception):
|
class CrashGenError(Exception):
|
||||||
def __init__(self, msg=None, errno=None):
|
def __init__(self, msg=None, errno=None):
|
||||||
self.msg = msg
|
self.msg = msg
|
||||||
|
@ -206,22 +205,13 @@ class WorkerThread:
|
||||||
time.sleep(0) # let the released thread run a bit
|
time.sleep(0) # let the released thread run a bit
|
||||||
|
|
||||||
def execSql(self, sql): # TODO: expose DbConn directly
|
def execSql(self, sql): # TODO: expose DbConn directly
|
||||||
if (gConfig.per_thread_db_connection):
|
return self.getDbConn().execute(sql)
|
||||||
return self._dbConn.execute(sql)
|
|
||||||
else:
|
|
||||||
return self._tc.getDbManager().getDbConn().execute(sql)
|
|
||||||
|
|
||||||
def querySql(self, sql): # TODO: expose DbConn directly
|
def querySql(self, sql): # TODO: expose DbConn directly
|
||||||
if (gConfig.per_thread_db_connection):
|
return self.getDbConn().query(sql)
|
||||||
return self._dbConn.query(sql)
|
|
||||||
else:
|
|
||||||
return self._tc.getDbManager().getDbConn().query(sql)
|
|
||||||
|
|
||||||
def getQueryResult(self):
|
def getQueryResult(self):
|
||||||
if (gConfig.per_thread_db_connection):
|
return self.getDbConn().getQueryResult()
|
||||||
return self._dbConn.getQueryResult()
|
|
||||||
else:
|
|
||||||
return self._tc.getDbManager().getDbConn().getQueryResult()
|
|
||||||
|
|
||||||
def getDbConn(self):
|
def getDbConn(self):
|
||||||
if (gConfig.per_thread_db_connection):
|
if (gConfig.per_thread_db_connection):
|
||||||
|
@ -239,6 +229,8 @@ class WorkerThread:
|
||||||
|
|
||||||
|
|
||||||
class ThreadCoordinator:
|
class ThreadCoordinator:
|
||||||
|
WORKER_THREAD_TIMEOUT = 30
|
||||||
|
|
||||||
def __init__(self, pool: ThreadPool, dbManager):
|
def __init__(self, pool: ThreadPool, dbManager):
|
||||||
self._curStep = -1 # first step is 0
|
self._curStep = -1 # first step is 0
|
||||||
self._pool = pool
|
self._pool = pool
|
||||||
|
@ -309,7 +301,7 @@ class ThreadCoordinator:
|
||||||
# let other threads go past the pool barrier, but wait at the
|
# let other threads go past the pool barrier, but wait at the
|
||||||
# thread gate
|
# thread gate
|
||||||
logger.debug("[TRD] Main thread about to cross the barrier")
|
logger.debug("[TRD] Main thread about to cross the barrier")
|
||||||
self.crossStepBarrier(timeout=15)
|
self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT)
|
||||||
self._stepBarrier.reset() # Other worker threads should now be at the "gate"
|
self._stepBarrier.reset() # Other worker threads should now be at the "gate"
|
||||||
logger.debug("[TRD] Main thread finished crossing the barrier")
|
logger.debug("[TRD] Main thread finished crossing the barrier")
|
||||||
|
|
||||||
|
@ -586,6 +578,10 @@ class DbConn:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.isOpen = False
|
self.isOpen = False
|
||||||
self._type = self.TYPE_INVALID
|
self._type = self.TYPE_INVALID
|
||||||
|
self._lastSql = None
|
||||||
|
|
||||||
|
def getLastSql(self):
|
||||||
|
return self._lastSql
|
||||||
|
|
||||||
def open(self):
|
def open(self):
|
||||||
if (self.isOpen):
|
if (self.isOpen):
|
||||||
|
@ -674,10 +670,11 @@ class DbConnRest(DbConn):
|
||||||
self.isOpen = False
|
self.isOpen = False
|
||||||
|
|
||||||
def _doSql(self, sql):
|
def _doSql(self, sql):
|
||||||
|
self._lastSql = sql # remember this, last SQL attempted
|
||||||
try:
|
try:
|
||||||
r = requests.post(self._url,
|
r = requests.post(self._url,
|
||||||
data = sql,
|
data = sql,
|
||||||
auth = HTTPBasicAuth('root', 'taosdata'))
|
auth = HTTPBasicAuth('root', 'taosdata'))
|
||||||
except:
|
except:
|
||||||
print("REST API Failure (TODO: more info here)")
|
print("REST API Failure (TODO: more info here)")
|
||||||
raise
|
raise
|
||||||
|
@ -1116,7 +1113,7 @@ class StateMechine:
|
||||||
self._curState = self._findCurrentState() # starting state
|
self._curState = self._findCurrentState() # starting state
|
||||||
# transitition target probabilities, indexed with value of STATE_EMPTY,
|
# transitition target probabilities, indexed with value of STATE_EMPTY,
|
||||||
# STATE_DB_ONLY, etc.
|
# STATE_DB_ONLY, etc.
|
||||||
self._stateWeights = [1, 3, 5, 15]
|
self._stateWeights = [1, 2, 10, 40]
|
||||||
|
|
||||||
def getCurrentState(self):
|
def getCurrentState(self):
|
||||||
return self._curState
|
return self._curState
|
||||||
|
@ -1332,8 +1329,8 @@ class DbManager():
|
||||||
|
|
||||||
def getNextTick(self):
|
def getNextTick(self):
|
||||||
with self._lock: # prevent duplicate tick
|
with self._lock: # prevent duplicate tick
|
||||||
if Dice.throw(10) == 0: # 1 in 10 chance
|
if Dice.throw(20) == 0: # 1 in 20 chance
|
||||||
return self._lastTick + datetime.timedelta(0, -100)
|
return self._lastTick + datetime.timedelta(0, -100) # Go back in time 100 seconds
|
||||||
else: # regular
|
else: # regular
|
||||||
# add one second to it
|
# add one second to it
|
||||||
self._lastTick += datetime.timedelta(0, 1)
|
self._lastTick += datetime.timedelta(0, 1)
|
||||||
|
@ -1448,7 +1445,6 @@ class Task():
|
||||||
# logger.debug("Creating new task {}...".format(self._taskNum))
|
# logger.debug("Creating new task {}...".format(self._taskNum))
|
||||||
|
|
||||||
self._execStats = execStats
|
self._execStats = execStats
|
||||||
self._lastSql = "" # last SQL executed/attempted
|
|
||||||
|
|
||||||
def isSuccess(self):
|
def isSuccess(self):
|
||||||
return self._err is None
|
return self._err is None
|
||||||
|
@ -1492,6 +1488,8 @@ class Task():
|
||||||
1000 # REST catch-all error
|
1000 # REST catch-all error
|
||||||
]:
|
]:
|
||||||
return True # These are the ALWAYS-ACCEPTABLE ones
|
return True # These are the ALWAYS-ACCEPTABLE ones
|
||||||
|
elif (errno in [ 0x0B ]) and gConfig.auto_start_service:
|
||||||
|
return True # We may get "network unavilable" when restarting service
|
||||||
elif errno == 0x200 : # invalid SQL, we need to div in a bit more
|
elif errno == 0x200 : # invalid SQL, we need to div in a bit more
|
||||||
if msg.find("invalid column name") != -1:
|
if msg.find("invalid column name") != -1:
|
||||||
return True
|
return True
|
||||||
|
@ -1513,8 +1511,8 @@ class Task():
|
||||||
"[-] executing task {}...".format(self.__class__.__name__))
|
"[-] executing task {}...".format(self.__class__.__name__))
|
||||||
|
|
||||||
self._err = None
|
self._err = None
|
||||||
self._execStats.beginTaskType(
|
self._execStats.beginTaskType(self.__class__.__name__) # mark beginning
|
||||||
self.__class__.__name__) # mark beginning
|
errno2 = None
|
||||||
try:
|
try:
|
||||||
self._executeInternal(te, wt) # TODO: no return value?
|
self._executeInternal(te, wt) # TODO: no return value?
|
||||||
except taos.error.ProgrammingError as err:
|
except taos.error.ProgrammingError as err:
|
||||||
|
@ -1522,16 +1520,17 @@ class Task():
|
||||||
err.errno > 0) else 0x80000000 + err.errno # correct error scheme
|
err.errno > 0) else 0x80000000 + err.errno # correct error scheme
|
||||||
if (gConfig.continue_on_exception): # user choose to continue
|
if (gConfig.continue_on_exception): # user choose to continue
|
||||||
self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
|
self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
|
||||||
errno2, err, self._lastSql))
|
errno2, err, wt.getDbConn().getLastSql()))
|
||||||
self._err = err
|
self._err = err
|
||||||
elif self._isErrAcceptable(errno2, err.__str__()):
|
elif self._isErrAcceptable(errno2, err.__str__()):
|
||||||
self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
|
self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
|
||||||
errno2, err, self._lastSql))
|
errno2, err, wt.getDbConn().getLastSql()))
|
||||||
print("_", end="", flush=True)
|
print("_", end="", flush=True)
|
||||||
self._err = err
|
self._err = err
|
||||||
else: # not an acceptable error
|
else: # not an acceptable error
|
||||||
errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
|
errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
|
||||||
errno2, err, self._lastSql)
|
self.__class__.__name__,
|
||||||
|
errno2, err, wt.getDbConn().getLastSql())
|
||||||
self.logDebug(errMsg)
|
self.logDebug(errMsg)
|
||||||
if gConfig.debug:
|
if gConfig.debug:
|
||||||
# raise # so that we see full stack
|
# raise # so that we see full stack
|
||||||
|
@ -1555,25 +1554,22 @@ class Task():
|
||||||
except BaseException:
|
except BaseException:
|
||||||
self.logDebug(
|
self.logDebug(
|
||||||
"[=] Unexpected exception, SQL: {}".format(
|
"[=] Unexpected exception, SQL: {}".format(
|
||||||
self._lastSql))
|
wt.getDbConn().getLastSql()))
|
||||||
raise
|
raise
|
||||||
self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
|
self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
|
||||||
|
|
||||||
self.logDebug("[X] task execution completed, {}, status: {}".format(
|
self.logDebug("[X] task execution completed, {}, status: {}".format(
|
||||||
self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
|
self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
|
||||||
# TODO: merge with above.
|
# TODO: merge with above.
|
||||||
self._execStats.incExecCount(self.__class__.__name__, self.isSuccess())
|
self._execStats.incExecCount(self.__class__.__name__, self.isSuccess(), errno2)
|
||||||
|
|
||||||
def execSql(self, sql):
|
def execSql(self, sql):
|
||||||
self._lastSql = sql
|
|
||||||
return self._dbManager.execute(sql)
|
return self._dbManager.execute(sql)
|
||||||
|
|
||||||
def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
|
def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
|
||||||
self._lastSql = sql
|
|
||||||
return wt.execSql(sql)
|
return wt.execSql(sql)
|
||||||
|
|
||||||
def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
|
def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
|
||||||
self._lastSql = sql
|
|
||||||
return wt.querySql(sql)
|
return wt.querySql(sql)
|
||||||
|
|
||||||
def getQueryResult(self, wt: WorkerThread): # execute an SQL on the worker thread
|
def getQueryResult(self, wt: WorkerThread): # execute an SQL on the worker thread
|
||||||
|
@ -1588,6 +1584,7 @@ class ExecutionStats:
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
self._firstTaskStartTime = None
|
self._firstTaskStartTime = None
|
||||||
self._execStartTime = None
|
self._execStartTime = None
|
||||||
|
self._errors = {}
|
||||||
self._elapsedTime = 0.0 # total elapsed time
|
self._elapsedTime = 0.0 # total elapsed time
|
||||||
self._accRunTime = 0.0 # accumulated run time
|
self._accRunTime = 0.0 # accumulated run time
|
||||||
|
|
||||||
|
@ -1607,13 +1604,18 @@ class ExecutionStats:
|
||||||
def endExec(self):
|
def endExec(self):
|
||||||
self._elapsedTime = time.time() - self._execStartTime
|
self._elapsedTime = time.time() - self._execStartTime
|
||||||
|
|
||||||
def incExecCount(self, klassName, isSuccess): # TODO: add a lock here
|
def incExecCount(self, klassName, isSuccess, eno=None): # TODO: add a lock here
|
||||||
if klassName not in self._execTimes:
|
if klassName not in self._execTimes:
|
||||||
self._execTimes[klassName] = [0, 0]
|
self._execTimes[klassName] = [0, 0]
|
||||||
t = self._execTimes[klassName] # tuple for the data
|
t = self._execTimes[klassName] # tuple for the data
|
||||||
t[0] += 1 # index 0 has the "total" execution times
|
t[0] += 1 # index 0 has the "total" execution times
|
||||||
if isSuccess:
|
if isSuccess:
|
||||||
t[1] += 1 # index 1 has the "success" execution times
|
t[1] += 1 # index 1 has the "success" execution times
|
||||||
|
if eno != None:
|
||||||
|
if klassName not in self._errors:
|
||||||
|
self._errors[klassName] = {}
|
||||||
|
errors = self._errors[klassName]
|
||||||
|
errors[eno] = errors[eno]+1 if eno in errors else 1
|
||||||
|
|
||||||
def beginTaskType(self, klassName):
|
def beginTaskType(self, klassName):
|
||||||
with self._lock:
|
with self._lock:
|
||||||
|
@ -1643,7 +1645,14 @@ class ExecutionStats:
|
||||||
execTimesAny = 0
|
execTimesAny = 0
|
||||||
for k, n in self._execTimes.items():
|
for k, n in self._execTimes.items():
|
||||||
execTimesAny += n[0]
|
execTimesAny += n[0]
|
||||||
logger.info("| {0:<24}: {1}/{2}".format(k, n[1], n[0]))
|
errStr = None
|
||||||
|
if k in self._errors:
|
||||||
|
errors = self._errors[k]
|
||||||
|
# print("errors = {}".format(errors))
|
||||||
|
errStrs = ["0x{:X}:{}".format(eno, n) for (eno, n) in errors.items()]
|
||||||
|
# print("error strings = {}".format(errStrs))
|
||||||
|
errStr = ", ".join(errStrs)
|
||||||
|
logger.info("| {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr))
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"| Total Tasks Executed (success or not): {} ".format(execTimesAny))
|
"| Total Tasks Executed (success or not): {} ".format(execTimesAny))
|
||||||
|
@ -1742,11 +1751,10 @@ class TaskCreateSuperTable(StateTransitionTask):
|
||||||
logger.debug("Skipping task, no DB yet")
|
logger.debug("Skipping task, no DB yet")
|
||||||
return
|
return
|
||||||
|
|
||||||
tblName = self._dbManager.getFixedSuperTableName()
|
sTable = self._dbManager.getFixedSuperTable()
|
||||||
# wt.execSql("use db") # should always be in place
|
# wt.execSql("use db") # should always be in place
|
||||||
self.execWtSql(
|
sTable.create(wt.getDbConn(), {'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'})
|
||||||
wt,
|
# self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
|
||||||
"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
|
|
||||||
# No need to create the regular tables, INSERT will do that
|
# No need to create the regular tables, INSERT will do that
|
||||||
# automatically
|
# automatically
|
||||||
|
|
||||||
|
@ -1755,6 +1763,14 @@ class TdSuperTable:
|
||||||
def __init__(self, stName):
|
def __init__(self, stName):
|
||||||
self._stName = stName
|
self._stName = stName
|
||||||
|
|
||||||
|
def create(self, dbc, cols: dict, tags: dict):
|
||||||
|
sql = "CREATE TABLE db.{} ({}) TAGS ({})".format(
|
||||||
|
self._stName,
|
||||||
|
",".join(['%s %s'%(k,v) for (k,v) in cols.items()]),
|
||||||
|
",".join(['%s %s'%(k,v) for (k,v) in tags.items()])
|
||||||
|
)
|
||||||
|
dbc.execute(sql)
|
||||||
|
|
||||||
def getRegTables(self, dbc: DbConn):
|
def getRegTables(self, dbc: DbConn):
|
||||||
try:
|
try:
|
||||||
dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later
|
dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later
|
||||||
|
@ -1773,12 +1789,56 @@ class TdSuperTable:
|
||||||
sql = "select tbname from {} where tbname in ('{}')".format(self._stName, regTableName)
|
sql = "select tbname from {} where tbname in ('{}')".format(self._stName, regTableName)
|
||||||
if dbc.query(sql) >= 1 : # reg table exists already
|
if dbc.query(sql) >= 1 : # reg table exists already
|
||||||
return
|
return
|
||||||
sql = "CREATE TABLE {} USING {} tags ('{}', {})".format(
|
sql = "CREATE TABLE {} USING {} tags ({})".format(
|
||||||
regTableName, self._stName,
|
regTableName, self._stName, self._getTagStrForSql(dbc)
|
||||||
'xyz', '33'
|
|
||||||
)
|
)
|
||||||
dbc.execute(sql)
|
dbc.execute(sql)
|
||||||
|
|
||||||
|
def _getTagStrForSql(self, dbc) :
|
||||||
|
tags = self._getTags(dbc)
|
||||||
|
tagStrs = []
|
||||||
|
for tagName in tags:
|
||||||
|
tagType = tags[tagName]
|
||||||
|
if tagType == 'BINARY':
|
||||||
|
tagStrs.append("'Beijing-Shanghai-LosAngeles'")
|
||||||
|
elif tagType == 'FLOAT':
|
||||||
|
tagStrs.append('9.9')
|
||||||
|
elif tagType == 'INT':
|
||||||
|
tagStrs.append('88')
|
||||||
|
else:
|
||||||
|
raise RuntimeError("Unexpected tag type: {}".format(tagType))
|
||||||
|
return ", ".join(tagStrs)
|
||||||
|
|
||||||
|
def _getTags(self, dbc) -> dict:
|
||||||
|
dbc.query("DESCRIBE {}".format(self._stName))
|
||||||
|
stCols = dbc.getQueryResult()
|
||||||
|
# print(stCols)
|
||||||
|
ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type
|
||||||
|
# print("Tags retrieved: {}".format(ret))
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def addTag(self, dbc, tagName, tagType):
|
||||||
|
if tagName in self._getTags(dbc): # already
|
||||||
|
return
|
||||||
|
# sTable.addTag("extraTag", "int")
|
||||||
|
sql = "alter table db.{} add tag {} {}".format(self._stName, tagName, tagType)
|
||||||
|
dbc.execute(sql)
|
||||||
|
|
||||||
|
def dropTag(self, dbc, tagName):
|
||||||
|
if not tagName in self._getTags(dbc): # don't have this tag
|
||||||
|
return
|
||||||
|
sql = "alter table db.{} drop tag {}".format(self._stName, tagName)
|
||||||
|
dbc.execute(sql)
|
||||||
|
|
||||||
|
def changeTag(self, dbc, oldTag, newTag):
|
||||||
|
tags = self._getTags(dbc)
|
||||||
|
if not oldTag in tags: # don't have this tag
|
||||||
|
return
|
||||||
|
if newTag in tags: # already have this tag
|
||||||
|
return
|
||||||
|
sql = "alter table db.{} change tag {} {}".format(self._stName, oldTag, newTag)
|
||||||
|
dbc.execute(sql)
|
||||||
|
|
||||||
class TaskReadData(StateTransitionTask):
|
class TaskReadData(StateTransitionTask):
|
||||||
@classmethod
|
@classmethod
|
||||||
def getEndState(cls):
|
def getEndState(cls):
|
||||||
|
@ -1796,7 +1856,7 @@ class TaskReadData(StateTransitionTask):
|
||||||
wt.getDbConn().close()
|
wt.getDbConn().close()
|
||||||
wt.getDbConn().open()
|
wt.getDbConn().open()
|
||||||
|
|
||||||
for rTbName in sTable.getRegTables(self._dbManager.getDbConn()): # regular tables
|
for rTbName in sTable.getRegTables(wt.getDbConn()): # regular tables
|
||||||
aggExpr = Dice.choice(['*', 'count(*)', 'avg(speed)',
|
aggExpr = Dice.choice(['*', 'count(*)', 'avg(speed)',
|
||||||
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
|
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
|
||||||
'sum(speed)', 'stddev(speed)',
|
'sum(speed)', 'stddev(speed)',
|
||||||
|
@ -1805,7 +1865,7 @@ class TaskReadData(StateTransitionTask):
|
||||||
self.execWtSql(wt, "select {} from db.{}".format(aggExpr, rTbName))
|
self.execWtSql(wt, "select {} from db.{}".format(aggExpr, rTbName))
|
||||||
except taos.error.ProgrammingError as err:
|
except taos.error.ProgrammingError as err:
|
||||||
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno
|
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno
|
||||||
logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
|
logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, wt.getDbConn().getLastSql()))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
class TaskDropSuperTable(StateTransitionTask):
|
class TaskDropSuperTable(StateTransitionTask):
|
||||||
|
@ -1864,20 +1924,54 @@ class TaskAlterTags(StateTransitionTask):
|
||||||
return state.canDropFixedSuperTable() # if we can drop it, we can alter tags
|
return state.canDropFixedSuperTable() # if we can drop it, we can alter tags
|
||||||
|
|
||||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
tblName = self._dbManager.getFixedSuperTableName()
|
# tblName = self._dbManager.getFixedSuperTableName()
|
||||||
|
dbc = wt.getDbConn()
|
||||||
|
sTable = self._dbManager.getFixedSuperTable()
|
||||||
dice = Dice.throw(4)
|
dice = Dice.throw(4)
|
||||||
if dice == 0:
|
if dice == 0:
|
||||||
sql = "alter table db.{} add tag extraTag int".format(tblName)
|
sTable.addTag(dbc, "extraTag", "int")
|
||||||
|
# sql = "alter table db.{} add tag extraTag int".format(tblName)
|
||||||
elif dice == 1:
|
elif dice == 1:
|
||||||
sql = "alter table db.{} drop tag extraTag".format(tblName)
|
sTable.dropTag(dbc, "extraTag")
|
||||||
|
# sql = "alter table db.{} drop tag extraTag".format(tblName)
|
||||||
elif dice == 2:
|
elif dice == 2:
|
||||||
sql = "alter table db.{} drop tag newTag".format(tblName)
|
sTable.dropTag(dbc, "newTag")
|
||||||
|
# sql = "alter table db.{} drop tag newTag".format(tblName)
|
||||||
else: # dice == 3
|
else: # dice == 3
|
||||||
sql = "alter table db.{} change tag extraTag newTag".format(
|
sTable.changeTag(dbc, "extraTag", "newTag")
|
||||||
tblName)
|
# sql = "alter table db.{} change tag extraTag newTag".format(tblName)
|
||||||
|
|
||||||
self.execWtSql(wt, sql)
|
class TaskRestartService(StateTransitionTask):
|
||||||
|
_isRunning = False
|
||||||
|
_classLock = threading.Lock()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def getEndState(cls):
|
||||||
|
return None # meaning doesn't affect state
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def canBeginFrom(cls, state: AnyState):
|
||||||
|
if gConfig.auto_start_service:
|
||||||
|
return state.canDropFixedSuperTable() # Basicallly when we have the super table
|
||||||
|
return False # don't run this otherwise
|
||||||
|
|
||||||
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
|
if not gConfig.auto_start_service: # only execute when we are in -a mode
|
||||||
|
print("_a", end="", flush=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
with self._classLock:
|
||||||
|
if self._isRunning:
|
||||||
|
print("Skipping restart task, another running already")
|
||||||
|
return
|
||||||
|
self._isRunning = True
|
||||||
|
|
||||||
|
if Dice.throw(50) == 0: # 1 in N chance
|
||||||
|
dbc = wt.getDbConn()
|
||||||
|
dbc.execute("show databases") # simple delay, align timing with other workers
|
||||||
|
gSvcMgr.restart()
|
||||||
|
|
||||||
|
self._isRunning = False
|
||||||
|
|
||||||
class TaskAddData(StateTransitionTask):
|
class TaskAddData(StateTransitionTask):
|
||||||
# Track which table is being actively worked on
|
# Track which table is being actively worked on
|
||||||
|
@ -1908,7 +2002,7 @@ class TaskAddData(StateTransitionTask):
|
||||||
return state.canAddData()
|
return state.canAddData()
|
||||||
|
|
||||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
ds = self._dbManager
|
ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
|
||||||
tblSeq = list(range(
|
tblSeq = list(range(
|
||||||
self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
|
self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
|
||||||
random.shuffle(tblSeq)
|
random.shuffle(tblSeq)
|
||||||
|
@ -1920,7 +2014,7 @@ class TaskAddData(StateTransitionTask):
|
||||||
|
|
||||||
sTable = ds.getFixedSuperTable()
|
sTable = ds.getFixedSuperTable()
|
||||||
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
|
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
|
||||||
sTable.ensureTable(ds.getDbConn(), regTableName) # Ensure the table exists
|
sTable.ensureTable(wt.getDbConn(), regTableName) # Ensure the table exists
|
||||||
|
|
||||||
for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table
|
for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table
|
||||||
nextInt = ds.getNextInt()
|
nextInt = ds.getNextInt()
|
||||||
|
@ -2013,6 +2107,8 @@ class SvcManager:
|
||||||
# self._status = MainExec.STATUS_RUNNING # set inside
|
# self._status = MainExec.STATUS_RUNNING # set inside
|
||||||
# _startTaosService()
|
# _startTaosService()
|
||||||
self.svcMgrThread = None
|
self.svcMgrThread = None
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._isRestarting = False
|
||||||
|
|
||||||
def _doMenu(self):
|
def _doMenu(self):
|
||||||
choice = ""
|
choice = ""
|
||||||
|
@ -2047,9 +2143,8 @@ class SvcManager:
|
||||||
self.sigHandlerResume()
|
self.sigHandlerResume()
|
||||||
elif choice == "2":
|
elif choice == "2":
|
||||||
self.stopTaosService()
|
self.stopTaosService()
|
||||||
elif choice == "3":
|
elif choice == "3": # Restart
|
||||||
self.stopTaosService()
|
self.restart()
|
||||||
self.startTaosService()
|
|
||||||
else:
|
else:
|
||||||
raise RuntimeError("Invalid menu choice: {}".format(choice))
|
raise RuntimeError("Invalid menu choice: {}".format(choice))
|
||||||
|
|
||||||
|
@ -2076,57 +2171,79 @@ class SvcManager:
|
||||||
self.svcMgrThread = None # no more
|
self.svcMgrThread = None # no more
|
||||||
|
|
||||||
def _procIpcAll(self):
|
def _procIpcAll(self):
|
||||||
while self.svcMgrThread: # for as long as the svc mgr thread is still here
|
while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here
|
||||||
self.svcMgrThread.procIpcBatch() # regular processing,
|
if self.isRunning():
|
||||||
|
self.svcMgrThread.procIpcBatch() # regular processing,
|
||||||
|
self._checkServiceManagerThread()
|
||||||
|
elif self.isRetarting():
|
||||||
|
print("Service restarting...")
|
||||||
time.sleep(0.5) # pause, before next round
|
time.sleep(0.5) # pause, before next round
|
||||||
self._checkServiceManagerThread()
|
|
||||||
print(
|
print(
|
||||||
"Service Manager Thread (with subprocess) has ended, main thread now exiting...")
|
"Service Manager Thread (with subprocess) has ended, main thread now exiting...")
|
||||||
|
|
||||||
def startTaosService(self):
|
def startTaosService(self):
|
||||||
if self.svcMgrThread:
|
with self._lock:
|
||||||
raise RuntimeError("Cannot start TAOS service when one may already be running")
|
if self.svcMgrThread:
|
||||||
|
raise RuntimeError("Cannot start TAOS service when one may already be running")
|
||||||
|
|
||||||
# Find if there's already a taosd service, and then kill it
|
# Find if there's already a taosd service, and then kill it
|
||||||
for proc in psutil.process_iter():
|
for proc in psutil.process_iter():
|
||||||
if proc.name() == 'taosd':
|
if proc.name() == 'taosd':
|
||||||
print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe")
|
print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe")
|
||||||
time.sleep(2.0)
|
time.sleep(2.0)
|
||||||
proc.kill()
|
proc.kill()
|
||||||
# print("Process: {}".format(proc.name()))
|
# print("Process: {}".format(proc.name()))
|
||||||
|
|
||||||
self.svcMgrThread = ServiceManagerThread() # create the object
|
self.svcMgrThread = ServiceManagerThread() # create the object
|
||||||
self.svcMgrThread.start()
|
self.svcMgrThread.start()
|
||||||
print("TAOS service started, printing out output...")
|
print("TAOS service started, printing out output...")
|
||||||
self.svcMgrThread.procIpcBatch(
|
self.svcMgrThread.procIpcBatch(
|
||||||
trimToTarget=10,
|
trimToTarget=10,
|
||||||
forceOutput=True) # for printing 10 lines
|
forceOutput=True) # for printing 10 lines
|
||||||
print("TAOS service started")
|
print("TAOS service started")
|
||||||
|
|
||||||
def stopTaosService(self, outputLines=20):
|
def stopTaosService(self, outputLines=20):
|
||||||
if not self.isRunning():
|
with self._lock:
|
||||||
logger.warning("Cannot stop TAOS service, not running")
|
if not self.isRunning():
|
||||||
return
|
logger.warning("Cannot stop TAOS service, not running")
|
||||||
|
return
|
||||||
|
|
||||||
print("Terminating Service Manager Thread (SMT) execution...")
|
print("Terminating Service Manager Thread (SMT) execution...")
|
||||||
self.svcMgrThread.stop()
|
self.svcMgrThread.stop()
|
||||||
if self.svcMgrThread.isStopped():
|
if self.svcMgrThread.isStopped():
|
||||||
self.svcMgrThread.procIpcBatch(outputLines) # one last time
|
self.svcMgrThread.procIpcBatch(outputLines) # one last time
|
||||||
self.svcMgrThread = None
|
self.svcMgrThread = None
|
||||||
print("----- End of TDengine Service Output -----\n")
|
print("----- End of TDengine Service Output -----\n")
|
||||||
print("SMT execution terminated")
|
print("SMT execution terminated")
|
||||||
else:
|
else:
|
||||||
print("WARNING: SMT did not terminate as expected")
|
print("WARNING: SMT did not terminate as expected")
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.startTaosService()
|
self.startTaosService()
|
||||||
self._procIpcAll() # pump/process all the messages
|
self._procIpcAll() # pump/process all the messages, may encounter SIG + restart
|
||||||
if self.isRunning(): # if sig handler hasn't destroyed it by now
|
if self.isRunning(): # if sig handler hasn't destroyed it by now
|
||||||
self.stopTaosService() # should have started already
|
self.stopTaosService() # should have started already
|
||||||
|
|
||||||
|
def restart(self):
|
||||||
|
if self._isRestarting:
|
||||||
|
logger.warning("Cannot restart service when it's already restarting")
|
||||||
|
return
|
||||||
|
|
||||||
|
self._isRestarting = True
|
||||||
|
if self.isRunning():
|
||||||
|
self.stopTaosService()
|
||||||
|
else:
|
||||||
|
logger.warning("Service not running when restart requested")
|
||||||
|
|
||||||
|
self.startTaosService()
|
||||||
|
self._isRestarting = False
|
||||||
|
|
||||||
def isRunning(self):
|
def isRunning(self):
|
||||||
return self.svcMgrThread != None
|
return self.svcMgrThread != None
|
||||||
|
|
||||||
|
def isRestarting(self):
|
||||||
|
return self._isRestarting
|
||||||
|
|
||||||
class ServiceManagerThread:
|
class ServiceManagerThread:
|
||||||
MAX_QUEUE_SIZE = 10000
|
MAX_QUEUE_SIZE = 10000
|
||||||
|
|
||||||
|
@ -2513,7 +2630,6 @@ class ClientManager:
|
||||||
self.tc.printStats()
|
self.tc.printStats()
|
||||||
self.tc.getDbManager().cleanUp()
|
self.tc.getDbManager().cleanUp()
|
||||||
|
|
||||||
|
|
||||||
class MainExec:
|
class MainExec:
|
||||||
STATUS_STARTING = 1
|
STATUS_STARTING = 1
|
||||||
STATUS_RUNNING = 2
|
STATUS_RUNNING = 2
|
||||||
|
@ -2541,16 +2657,29 @@ class MainExec:
|
||||||
self._clientMgr.sigIntHandler(signalNumber, frame)
|
self._clientMgr.sigIntHandler(signalNumber, frame)
|
||||||
|
|
||||||
def runClient(self):
|
def runClient(self):
|
||||||
|
global gSvcMgr
|
||||||
if gConfig.auto_start_service:
|
if gConfig.auto_start_service:
|
||||||
self._svcMgr = SvcManager()
|
self._svcMgr = SvcManager()
|
||||||
|
gSvcMgr = self._svcMgr # hack alert
|
||||||
self._svcMgr.startTaosService() # we start, don't run
|
self._svcMgr.startTaosService() # we start, don't run
|
||||||
|
|
||||||
self._clientMgr = ClientManager()
|
self._clientMgr = ClientManager()
|
||||||
ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
|
ret = None
|
||||||
|
try:
|
||||||
|
ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
|
||||||
|
except requests.exceptions.ConnectionError as err:
|
||||||
|
logger.warning("Failed to open REST connection to DB")
|
||||||
|
# don't raise
|
||||||
|
return ret
|
||||||
|
|
||||||
def runService(self):
|
def runService(self):
|
||||||
|
global gSvcMgr
|
||||||
self._svcMgr = SvcManager()
|
self._svcMgr = SvcManager()
|
||||||
|
gSvcMgr = self._svcMgr # save it in a global variable TODO: hack alert
|
||||||
|
|
||||||
self._svcMgr.run() # run to some end state
|
self._svcMgr.run() # run to some end state
|
||||||
|
self._svcMgr = None
|
||||||
|
gSvcMgr = None
|
||||||
|
|
||||||
def runTemp(self): # for debugging purposes
|
def runTemp(self): # for debugging purposes
|
||||||
# # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
|
# # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
|
||||||
|
|
Loading…
Reference in New Issue