Refactoring of crash_gen tool
This commit is contained in:
parent
b871621a88
commit
b43c5ba926
|
@ -38,9 +38,9 @@ import resource
|
||||||
from guppy import hpy
|
from guppy import hpy
|
||||||
import gc
|
import gc
|
||||||
|
|
||||||
from .service_manager import ServiceManager, TdeInstance
|
from crash_gen.service_manager import ServiceManager, TdeInstance
|
||||||
from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
|
from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
|
||||||
from .db import DbConn, MyTDSql, DbConnNative, DbManager
|
from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
|
||||||
|
|
||||||
import taos
|
import taos
|
||||||
import requests
|
import requests
|
||||||
|
@ -435,7 +435,7 @@ class ThreadCoordinator:
|
||||||
Logging.debug("\r\n\n--> Main thread ready to finish up...")
|
Logging.debug("\r\n\n--> Main thread ready to finish up...")
|
||||||
Logging.debug("Main thread joining all threads")
|
Logging.debug("Main thread joining all threads")
|
||||||
self._pool.joinAll() # Get all threads to finish
|
self._pool.joinAll() # Get all threads to finish
|
||||||
Logging.info("\nAll worker threads finished")
|
Logging.info(". . . All worker threads finished") # No CR/LF before
|
||||||
self._execStats.endExec()
|
self._execStats.endExec()
|
||||||
|
|
||||||
def cleanup(self): # free resources
|
def cleanup(self): # free resources
|
||||||
|
@ -1072,17 +1072,18 @@ class Database:
|
||||||
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
|
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
|
||||||
t4 = datetime.datetime.fromtimestamp(
|
t4 = datetime.datetime.fromtimestamp(
|
||||||
t3.timestamp() + elSec2) # see explanation above
|
t3.timestamp() + elSec2) # see explanation above
|
||||||
Logging.info("Setting up TICKS to start from: {}".format(t4))
|
Logging.debug("Setting up TICKS to start from: {}".format(t4))
|
||||||
return t4
|
return t4
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def getNextTick(cls):
|
def getNextTick(cls):
|
||||||
with cls._clsLock: # prevent duplicate tick
|
with cls._clsLock: # prevent duplicate tick
|
||||||
if cls._lastLaggingTick==0:
|
if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized
|
||||||
# 10k at 1/20 chance, should be enough to avoid overlaps
|
# 10k at 1/20 chance, should be enough to avoid overlaps
|
||||||
cls._lastLaggingTick = cls.setupLastTick() + datetime.timedelta(0, -10000)
|
tick = cls.setupLastTick()
|
||||||
if cls._lastTick==0: # should be quite a bit into the future
|
cls._lastTick = tick
|
||||||
cls._lastTick = cls.setupLastTick()
|
cls._lastLaggingTick = tick + datetime.timedelta(0, -10000)
|
||||||
|
# if : # should be quite a bit into the future
|
||||||
|
|
||||||
if Dice.throw(20) == 0: # 1 in 20 chance, return lagging tick
|
if Dice.throw(20) == 0: # 1 in 20 chance, return lagging tick
|
||||||
cls._lastLaggingTick += datetime.timedelta(0, 1) # Go back in time 100 seconds
|
cls._lastLaggingTick += datetime.timedelta(0, 1) # Go back in time 100 seconds
|
||||||
|
@ -1322,7 +1323,7 @@ class Task():
|
||||||
self._err = err
|
self._err = err
|
||||||
self._aborted = True
|
self._aborted = True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logInfo("Non-TAOS exception encountered")
|
Logging.info("Non-TAOS exception encountered with: {}".format(self.__class__.__name__))
|
||||||
self._err = e
|
self._err = e
|
||||||
self._aborted = True
|
self._aborted = True
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
@ -1566,8 +1567,11 @@ class TaskCreateSuperTable(StateTransitionTask):
|
||||||
|
|
||||||
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
|
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
|
||||||
# wt.execSql("use db") # should always be in place
|
# wt.execSql("use db") # should always be in place
|
||||||
|
|
||||||
sTable.create(wt.getDbConn(), self._db.getName(),
|
sTable.create(wt.getDbConn(), self._db.getName(),
|
||||||
{'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'})
|
{'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'},
|
||||||
|
dropIfExists = True
|
||||||
|
)
|
||||||
# self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
|
# self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
|
||||||
# No need to create the regular tables, INSERT will do that
|
# No need to create the regular tables, INSERT will do that
|
||||||
# automatically
|
# automatically
|
||||||
|
@ -1580,14 +1584,41 @@ class TdSuperTable:
|
||||||
def getName(self):
|
def getName(self):
|
||||||
return self._stName
|
return self._stName
|
||||||
|
|
||||||
|
def drop(self, dbc, dbName, skipCheck = False):
|
||||||
|
if self.exists(dbc, dbName) : # if myself exists
|
||||||
|
fullTableName = dbName + '.' + self._stName
|
||||||
|
dbc.execute("DROP TABLE {}".format(fullTableName))
|
||||||
|
else:
|
||||||
|
if not skipCheck:
|
||||||
|
raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName))
|
||||||
|
|
||||||
|
def exists(self, dbc, dbName):
|
||||||
|
dbc.execute("USE " + dbName)
|
||||||
|
return dbc.existsSuperTable(self._stName)
|
||||||
|
|
||||||
# TODO: odd semantic, create() method is usually static?
|
# TODO: odd semantic, create() method is usually static?
|
||||||
def create(self, dbc, dbName, cols: dict, tags: dict):
|
def create(self, dbc, dbName, cols: dict, tags: dict,
|
||||||
|
dropIfExists = False
|
||||||
|
):
|
||||||
|
|
||||||
'''Creating a super table'''
|
'''Creating a super table'''
|
||||||
sql = "CREATE TABLE {}.{} ({}) TAGS ({})".format(
|
dbc.execute("USE " + dbName)
|
||||||
dbName,
|
fullTableName = dbName + '.' + self._stName
|
||||||
self._stName,
|
if dbc.existsSuperTable(self._stName):
|
||||||
",".join(['%s %s'%(k,v) for (k,v) in cols.items()]),
|
if dropIfExists:
|
||||||
",".join(['%s %s'%(k,v) for (k,v) in tags.items()])
|
dbc.execute("DROP TABLE {}".format(fullTableName))
|
||||||
|
else: # error
|
||||||
|
raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))
|
||||||
|
|
||||||
|
# Now let's create
|
||||||
|
sql = "CREATE TABLE {} ({})".format(
|
||||||
|
fullTableName,
|
||||||
|
",".join(['%s %s'%(k,v) for (k,v) in cols.items()]))
|
||||||
|
if tags is None :
|
||||||
|
sql += " TAGS (dummy int) "
|
||||||
|
else:
|
||||||
|
sql += " TAGS ({})".format(
|
||||||
|
",".join(['%s %s'%(k,v) for (k,v) in tags.items()])
|
||||||
)
|
)
|
||||||
dbc.execute(sql)
|
dbc.execute(sql)
|
||||||
|
|
||||||
|
@ -1612,16 +1643,18 @@ class TdSuperTable:
|
||||||
|
|
||||||
# acquire a lock first, so as to be able to *verify*. More details in TD-1471
|
# acquire a lock first, so as to be able to *verify*. More details in TD-1471
|
||||||
fullTableName = dbName + '.' + regTableName
|
fullTableName = dbName + '.' + regTableName
|
||||||
task.lockTable(fullTableName)
|
if task is not None: # optional lock
|
||||||
|
task.lockTable(fullTableName)
|
||||||
Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table
|
Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table
|
||||||
print("(" + fullTableName[-3:] + ")", end="", flush=True)
|
# print("(" + fullTableName[-3:] + ")", end="", flush=True)
|
||||||
try:
|
try:
|
||||||
sql = "CREATE TABLE {} USING {}.{} tags ({})".format(
|
sql = "CREATE TABLE {} USING {}.{} tags ({})".format(
|
||||||
fullTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName)
|
fullTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName)
|
||||||
)
|
)
|
||||||
dbc.execute(sql)
|
dbc.execute(sql)
|
||||||
finally:
|
finally:
|
||||||
task.unlockTable(fullTableName) # no matter what
|
if task is not None:
|
||||||
|
task.unlockTable(fullTableName) # no matter what
|
||||||
|
|
||||||
def _getTagStrForSql(self, dbc, dbName: str) :
|
def _getTagStrForSql(self, dbc, dbName: str) :
|
||||||
tags = self._getTags(dbc, dbName)
|
tags = self._getTags(dbc, dbName)
|
||||||
|
@ -1840,7 +1873,7 @@ class TaskRestartService(StateTransitionTask):
|
||||||
|
|
||||||
with self._classLock:
|
with self._classLock:
|
||||||
if self._isRunning:
|
if self._isRunning:
|
||||||
print("Skipping restart task, another running already")
|
Logging.info("Skipping restart task, another running already")
|
||||||
return
|
return
|
||||||
self._isRunning = True
|
self._isRunning = True
|
||||||
|
|
||||||
|
@ -1999,7 +2032,7 @@ class ThreadStacks: # stack info for all threads
|
||||||
|
|
||||||
class ClientManager:
|
class ClientManager:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
print("Starting service manager")
|
Logging.info("Starting service manager")
|
||||||
# signal.signal(signal.SIGTERM, self.sigIntHandler)
|
# signal.signal(signal.SIGTERM, self.sigIntHandler)
|
||||||
# signal.signal(signal.SIGINT, self.sigIntHandler)
|
# signal.signal(signal.SIGINT, self.sigIntHandler)
|
||||||
|
|
||||||
|
@ -2101,7 +2134,7 @@ class ClientManager:
|
||||||
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
|
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
|
||||||
self.tc = ThreadCoordinator(thPool, dbManager)
|
self.tc = ThreadCoordinator(thPool, dbManager)
|
||||||
|
|
||||||
print("Starting client instance to: {}".format(tInst))
|
Logging.info("Starting client instance: {}".format(tInst))
|
||||||
self.tc.run()
|
self.tc.run()
|
||||||
# print("exec stats: {}".format(self.tc.getExecStats()))
|
# print("exec stats: {}".format(self.tc.getExecStats()))
|
||||||
# print("TC failed = {}".format(self.tc.isFailed()))
|
# print("TC failed = {}".format(self.tc.isFailed()))
|
|
@ -95,6 +95,11 @@ class DbConn:
|
||||||
# print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
|
# print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
|
||||||
return dbName in dbs # TODO: super weird type mangling seen, once here
|
return dbName in dbs # TODO: super weird type mangling seen, once here
|
||||||
|
|
||||||
|
def existsSuperTable(self, stName):
|
||||||
|
self.query("show stables")
|
||||||
|
sts = [v[0] for v in self.getQueryResult()]
|
||||||
|
return stName in sts
|
||||||
|
|
||||||
def hasTables(self):
|
def hasTables(self):
|
||||||
return self.query("show tables") > 0
|
return self.query("show tables") > 0
|
||||||
|
|
||||||
|
@ -240,6 +245,7 @@ class MyTDSql:
|
||||||
|
|
||||||
def _execInternal(self, sql):
|
def _execInternal(self, sql):
|
||||||
startTime = time.time()
|
startTime = time.time()
|
||||||
|
# Logging.debug("Executing SQL: " + sql)
|
||||||
ret = self._cursor.execute(sql)
|
ret = self._cursor.execute(sql)
|
||||||
# print("\nSQL success: {}".format(sql))
|
# print("\nSQL success: {}".format(sql))
|
||||||
queryTime = time.time() - startTime
|
queryTime = time.time() - startTime
|
||||||
|
|
|
@ -27,7 +27,7 @@ class LoggingFilter(logging.Filter):
|
||||||
|
|
||||||
class MyLoggingAdapter(logging.LoggerAdapter):
|
class MyLoggingAdapter(logging.LoggerAdapter):
|
||||||
def process(self, msg, kwargs):
|
def process(self, msg, kwargs):
|
||||||
return "[{}] {}".format(threading.get_ident() % 10000, msg), kwargs
|
return "[{:04d}] {}".format(threading.get_ident() % 10000, msg), kwargs
|
||||||
# return '[%s] %s' % (self.extra['connid'], msg), kwargs
|
# return '[%s] %s' % (self.extra['connid'], msg), kwargs
|
||||||
|
|
||||||
|
|
||||||
|
@ -51,7 +51,7 @@ class Logging:
|
||||||
_logger.addHandler(ch)
|
_logger.addHandler(ch)
|
||||||
|
|
||||||
# Logging adapter, to be used as a logger
|
# Logging adapter, to be used as a logger
|
||||||
print("setting logger variable")
|
# print("setting logger variable")
|
||||||
# global logger
|
# global logger
|
||||||
cls.logger = MyLoggingAdapter(_logger, [])
|
cls.logger = MyLoggingAdapter(_logger, [])
|
||||||
|
|
||||||
|
@ -166,7 +166,8 @@ class Progress:
|
||||||
SERVICE_RECONNECT_START = 4
|
SERVICE_RECONNECT_START = 4
|
||||||
SERVICE_RECONNECT_SUCCESS = 5
|
SERVICE_RECONNECT_SUCCESS = 5
|
||||||
SERVICE_RECONNECT_FAILURE = 6
|
SERVICE_RECONNECT_FAILURE = 6
|
||||||
CREATE_TABLE_ATTEMPT = 7
|
SERVICE_START_NAP = 7
|
||||||
|
CREATE_TABLE_ATTEMPT = 8
|
||||||
|
|
||||||
tokens = {
|
tokens = {
|
||||||
STEP_BOUNDARY: '.',
|
STEP_BOUNDARY: '.',
|
||||||
|
@ -176,6 +177,7 @@ class Progress:
|
||||||
SERVICE_RECONNECT_START: '<r.',
|
SERVICE_RECONNECT_START: '<r.',
|
||||||
SERVICE_RECONNECT_SUCCESS: '.r>',
|
SERVICE_RECONNECT_SUCCESS: '.r>',
|
||||||
SERVICE_RECONNECT_FAILURE: '.xr>',
|
SERVICE_RECONNECT_FAILURE: '.xr>',
|
||||||
|
SERVICE_START_NAP: '_zz',
|
||||||
CREATE_TABLE_ATTEMPT: '_c',
|
CREATE_TABLE_ATTEMPT: '_c',
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -47,6 +47,17 @@ class TdeInstance():
|
||||||
.format(selfPath, projPath))
|
.format(selfPath, projPath))
|
||||||
return buildPath
|
return buildPath
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def prepareGcovEnv(cls, env):
|
||||||
|
# Ref: https://gcc.gnu.org/onlinedocs/gcc/Cross-profiling.html
|
||||||
|
bPath = cls._getBuildPath() # build PATH
|
||||||
|
numSegments = len(bPath.split('/')) - 1 # "/x/TDengine/build" should yield 3
|
||||||
|
numSegments = numSegments - 1 # DEBUG only
|
||||||
|
env['GCOV_PREFIX'] = bPath + '/svc_gcov'
|
||||||
|
env['GCOV_PREFIX_STRIP'] = str(numSegments) # Strip every element, plus, ENV needs strings
|
||||||
|
Logging.info("Preparing GCOV environement to strip {} elements and use path: {}".format(
|
||||||
|
numSegments, env['GCOV_PREFIX'] ))
|
||||||
|
|
||||||
def __init__(self, subdir='test', tInstNum=0, port=6030, fepPort=6030):
|
def __init__(self, subdir='test', tInstNum=0, port=6030, fepPort=6030):
|
||||||
self._buildDir = self._getBuildPath()
|
self._buildDir = self._getBuildPath()
|
||||||
self._subdir = '/' + subdir # TODO: tolerate "/"
|
self._subdir = '/' + subdir # TODO: tolerate "/"
|
||||||
|
@ -217,6 +228,11 @@ class TdeSubProcess:
|
||||||
# raise CrashGenError("Empty instance not allowed in TdeSubProcess")
|
# raise CrashGenError("Empty instance not allowed in TdeSubProcess")
|
||||||
# self._tInst = tInst # Default create at ServiceManagerThread
|
# self._tInst = tInst # Default create at ServiceManagerThread
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
if self.subProcess is None:
|
||||||
|
return '[TdeSubProc: Empty]'
|
||||||
|
return '[TdeSubProc: pid = {}]'.format(self.getPid())
|
||||||
|
|
||||||
def getStdOut(self):
|
def getStdOut(self):
|
||||||
return self.subProcess.stdout
|
return self.subProcess.stdout
|
||||||
|
|
||||||
|
@ -236,16 +252,29 @@ class TdeSubProcess:
|
||||||
if self.subProcess: # already there
|
if self.subProcess: # already there
|
||||||
raise RuntimeError("Corrupt process state")
|
raise RuntimeError("Corrupt process state")
|
||||||
|
|
||||||
|
# Prepare environment variables for coverage information
|
||||||
|
# Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment
|
||||||
|
myEnv = os.environ.copy()
|
||||||
|
TdeInstance.prepareGcovEnv(myEnv)
|
||||||
|
|
||||||
|
# print(myEnv)
|
||||||
|
# print(myEnv.items())
|
||||||
|
# print("Starting TDengine via Shell: {}".format(cmdLineStr))
|
||||||
|
|
||||||
|
useShell = True
|
||||||
self.subProcess = subprocess.Popen(
|
self.subProcess = subprocess.Popen(
|
||||||
cmdLine,
|
' '.join(cmdLine) if useShell else cmdLine,
|
||||||
shell=False,
|
shell=useShell,
|
||||||
# svcCmdSingle, shell=True, # capture core dump?
|
# svcCmdSingle, shell=True, # capture core dump?
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
stderr=subprocess.PIPE,
|
||||||
# bufsize=1, # not supported in binary mode
|
# bufsize=1, # not supported in binary mode
|
||||||
close_fds=ON_POSIX
|
close_fds=ON_POSIX,
|
||||||
|
env=myEnv
|
||||||
) # had text=True, which interferred with reading EOF
|
) # had text=True, which interferred with reading EOF
|
||||||
|
|
||||||
|
STOP_SIGNAL = signal.SIGKILL # What signal to use (in kill) to stop a taosd process?
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
"""
|
"""
|
||||||
Stop a sub process, and try to return a meaningful return code.
|
Stop a sub process, and try to return a meaningful return code.
|
||||||
|
@ -267,7 +296,7 @@ class TdeSubProcess:
|
||||||
SIGUSR2 12
|
SIGUSR2 12
|
||||||
"""
|
"""
|
||||||
if not self.subProcess:
|
if not self.subProcess:
|
||||||
print("Sub process already stopped")
|
Logging.error("Sub process already stopped")
|
||||||
return # -1
|
return # -1
|
||||||
|
|
||||||
retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
|
retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
|
||||||
|
@ -278,20 +307,25 @@ class TdeSubProcess:
|
||||||
return retCode
|
return retCode
|
||||||
|
|
||||||
# process still alive, let's interrupt it
|
# process still alive, let's interrupt it
|
||||||
print("Terminate running process, send SIG_INT and wait...")
|
Logging.info("Terminate running process, send SIG_{} and wait...".format(self.STOP_SIGNAL))
|
||||||
# sub process should end, then IPC queue should end, causing IO thread to end
|
# sub process should end, then IPC queue should end, causing IO thread to end
|
||||||
# sig = signal.SIGINT
|
topSubProc = psutil.Process(self.subProcess.pid)
|
||||||
sig = signal.SIGKILL
|
for child in topSubProc.children(recursive=True): # or parent.children() for recursive=False
|
||||||
self.subProcess.send_signal(sig) # SIGNINT or SIGKILL
|
child.send_signal(self.STOP_SIGNAL)
|
||||||
|
time.sleep(0.2) # 200 ms
|
||||||
|
# topSubProc.send_signal(sig) # now kill the main sub process (likely the Shell)
|
||||||
|
|
||||||
|
self.subProcess.send_signal(self.STOP_SIGNAL) # main sub process (likely the Shell)
|
||||||
self.subProcess.wait(20)
|
self.subProcess.wait(20)
|
||||||
retCode = self.subProcess.returncode # should always be there
|
retCode = self.subProcess.returncode # should always be there
|
||||||
# May throw subprocess.TimeoutExpired exception above, therefore
|
# May throw subprocess.TimeoutExpired exception above, therefore
|
||||||
# The process is guranteed to have ended by now
|
# The process is guranteed to have ended by now
|
||||||
self.subProcess = None
|
self.subProcess = None
|
||||||
if retCode != 0: # != (- signal.SIGINT):
|
if retCode != 0: # != (- signal.SIGINT):
|
||||||
Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format(sig, retCode))
|
Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format(
|
||||||
|
self.STOP_SIGNAL, retCode))
|
||||||
else:
|
else:
|
||||||
Logging.info("TSP.stop(): sub proc successfully terminated with SIG {}".format(sig))
|
Logging.info("TSP.stop(): sub proc successfully terminated with SIG {}".format(self.STOP_SIGNAL))
|
||||||
return - retCode
|
return - retCode
|
||||||
|
|
||||||
class ServiceManager:
|
class ServiceManager:
|
||||||
|
@ -439,7 +473,7 @@ class ServiceManager:
|
||||||
|
|
||||||
time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round
|
time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round
|
||||||
# raise CrashGenError("dummy")
|
# raise CrashGenError("dummy")
|
||||||
print("Service Manager Thread (with subprocess) ended, main thread exiting...")
|
Logging.info("Service Manager Thread (with subprocess) ended, main thread exiting...")
|
||||||
|
|
||||||
def _getFirstInstance(self):
|
def _getFirstInstance(self):
|
||||||
return self._tInsts[0]
|
return self._tInsts[0]
|
||||||
|
@ -452,7 +486,7 @@ class ServiceManager:
|
||||||
# Find if there's already a taosd service, and then kill it
|
# Find if there's already a taosd service, and then kill it
|
||||||
for proc in psutil.process_iter():
|
for proc in psutil.process_iter():
|
||||||
if proc.name() == 'taosd':
|
if proc.name() == 'taosd':
|
||||||
print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt")
|
Logging.info("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt")
|
||||||
time.sleep(2.0)
|
time.sleep(2.0)
|
||||||
proc.kill()
|
proc.kill()
|
||||||
# print("Process: {}".format(proc.name()))
|
# print("Process: {}".format(proc.name()))
|
||||||
|
@ -559,7 +593,8 @@ class ServiceManagerThread:
|
||||||
for i in range(0, 100):
|
for i in range(0, 100):
|
||||||
time.sleep(1.0)
|
time.sleep(1.0)
|
||||||
# self.procIpcBatch() # don't pump message during start up
|
# self.procIpcBatch() # don't pump message during start up
|
||||||
print("_zz_", end="", flush=True)
|
Progress.emit(Progress.SERVICE_START_NAP)
|
||||||
|
# print("_zz_", end="", flush=True)
|
||||||
if self._status.isRunning():
|
if self._status.isRunning():
|
||||||
Logging.info("[] TDengine service READY to process requests")
|
Logging.info("[] TDengine service READY to process requests")
|
||||||
Logging.info("[] TAOS service started: {}".format(self))
|
Logging.info("[] TAOS service started: {}".format(self))
|
||||||
|
@ -595,12 +630,12 @@ class ServiceManagerThread:
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
# can be called from both main thread or signal handler
|
# can be called from both main thread or signal handler
|
||||||
print("Terminating TDengine service running as the sub process...")
|
Logging.info("Terminating TDengine service running as the sub process...")
|
||||||
if self.getStatus().isStopped():
|
if self.getStatus().isStopped():
|
||||||
print("Service already stopped")
|
Logging.info("Service already stopped")
|
||||||
return
|
return
|
||||||
if self.getStatus().isStopping():
|
if self.getStatus().isStopping():
|
||||||
print("Service is already being stopped")
|
Logging.info("Service is already being stopped")
|
||||||
return
|
return
|
||||||
# Linux will send Control-C generated SIGINT to the TDengine process
|
# Linux will send Control-C generated SIGINT to the TDengine process
|
||||||
# already, ref:
|
# already, ref:
|
||||||
|
@ -616,10 +651,10 @@ class ServiceManagerThread:
|
||||||
if retCode == signal.SIGSEGV : # SGV
|
if retCode == signal.SIGSEGV : # SGV
|
||||||
Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
|
Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
|
||||||
except subprocess.TimeoutExpired as err:
|
except subprocess.TimeoutExpired as err:
|
||||||
print("Time out waiting for TDengine service process to exit")
|
Logging.info("Time out waiting for TDengine service process to exit")
|
||||||
else:
|
else:
|
||||||
if self._tdeSubProcess.isRunning(): # still running, should now never happen
|
if self._tdeSubProcess.isRunning(): # still running, should now never happen
|
||||||
print("FAILED to stop sub process, it is still running... pid = {}".format(
|
Logging.error("FAILED to stop sub process, it is still running... pid = {}".format(
|
||||||
self._tdeSubProcess.getPid()))
|
self._tdeSubProcess.getPid()))
|
||||||
else:
|
else:
|
||||||
self._tdeSubProcess = None # not running any more
|
self._tdeSubProcess = None # not running any more
|
||||||
|
@ -683,9 +718,9 @@ class ServiceManagerThread:
|
||||||
return # we are done with THIS BATCH
|
return # we are done with THIS BATCH
|
||||||
else: # got line, printing out
|
else: # got line, printing out
|
||||||
if forceOutput:
|
if forceOutput:
|
||||||
Logging.info(line)
|
Logging.info('[TAOSD] ' + line)
|
||||||
else:
|
else:
|
||||||
Logging.debug(line)
|
Logging.debug('[TAOSD] ' + line)
|
||||||
print(">", end="", flush=True)
|
print(">", end="", flush=True)
|
||||||
|
|
||||||
_ProgressBars = ["--", "//", "||", "\\\\"]
|
_ProgressBars = ["--", "//", "||", "\\\\"]
|
||||||
|
@ -728,11 +763,11 @@ class ServiceManagerThread:
|
||||||
|
|
||||||
# queue.put(line)
|
# queue.put(line)
|
||||||
# meaning sub process must have died
|
# meaning sub process must have died
|
||||||
Logging.info("\nEnd of stream detected for TDengine STDOUT: {}".format(self))
|
Logging.info("EOF for TDengine STDOUT: {}".format(self))
|
||||||
out.close()
|
out.close()
|
||||||
|
|
||||||
def svcErrorReader(self, err: IO, queue):
|
def svcErrorReader(self, err: IO, queue):
|
||||||
for line in iter(err.readline, b''):
|
for line in iter(err.readline, b''):
|
||||||
print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
|
Logging.info("TDengine STDERR: {}".format(line))
|
||||||
Logging.info("\nEnd of stream detected for TDengine STDERR: {}".format(self))
|
Logging.info("EOF for TDengine STDERR: {}".format(self))
|
||||||
err.close()
|
err.close()
|
|
@ -11,7 +11,7 @@
|
||||||
###################################################################
|
###################################################################
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from crash_gen.crash_gen import MainExec
|
from crash_gen.crash_gen_main import MainExec
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue