Moved over changes from feature/crash_gen to feature/crash_gen_master
This commit is contained in:
parent
289b805388
commit
b6ee0c8fc7
|
@ -6,6 +6,25 @@ To effectively test and debug our TDengine product, we have developed a simple t
|
||||||
exercise various functions of the system in a randomized fashion, hoping to expose
|
exercise various functions of the system in a randomized fashion, hoping to expose
|
||||||
maximum number of problems, hopefully without a pre-determined scenario.
|
maximum number of problems, hopefully without a pre-determined scenario.
|
||||||
|
|
||||||
|
# Features
|
||||||
|
|
||||||
|
This tool can run as a test client with the following features:
|
||||||
|
|
||||||
|
1. Any number of concurrent threads
|
||||||
|
1. Any number of test steps/loops
|
||||||
|
1. Auto-create and writing to multiple databases
|
||||||
|
1. Ignore specific error codes
|
||||||
|
1. Write small or large data blocks
|
||||||
|
1. Auto-generate out-of-sequence data, if needed
|
||||||
|
1. Verify the result of write operations
|
||||||
|
1. Concurrent writing to a shadow database for later data verification
|
||||||
|
1. User specified number of replicas to use, against clusters
|
||||||
|
|
||||||
|
This tool can also use to start a TDengine service, either in stand-alone mode or
|
||||||
|
cluster mode. The features include:
|
||||||
|
|
||||||
|
1. User specified number of D-Nodes to create/use.
|
||||||
|
|
||||||
# Preparation
|
# Preparation
|
||||||
|
|
||||||
To run this tool, please ensure the followed preparation work is done first.
|
To run this tool, please ensure the followed preparation work is done first.
|
||||||
|
@ -16,7 +35,7 @@ To run this tool, please ensure the followed preparation work is done first.
|
||||||
Ubuntu 20.04LTS as our own development environment, and suggest you also use such
|
Ubuntu 20.04LTS as our own development environment, and suggest you also use such
|
||||||
an environment if possible.
|
an environment if possible.
|
||||||
|
|
||||||
# Simple Execution
|
# Simple Execution as Client Test Tool
|
||||||
|
|
||||||
To run the tool with the simplest method, follow the steps below:
|
To run the tool with the simplest method, follow the steps below:
|
||||||
|
|
||||||
|
@ -28,19 +47,21 @@ To run the tool with the simplest method, follow the steps below:
|
||||||
|
|
||||||
That's it!
|
That's it!
|
||||||
|
|
||||||
# Running Clusters
|
# Running Server-side Clusters
|
||||||
|
|
||||||
This tool also makes it easy to test/verify the clustering capabilities of TDengine. You
|
This tool also makes it easy to test/verify the clustering capabilities of TDengine. You
|
||||||
can start a cluster quite easily with the following command:
|
can start a cluster quite easily with the following command:
|
||||||
|
|
||||||
```
|
```
|
||||||
$ cd tests/pytest/
|
$ cd tests/pytest/
|
||||||
$ ./crash_gen.sh -e -o 3
|
$ rm -rf ../../build/cluster_dnode_?; ./crash_gen.sh -e -o 3 # first part optional
|
||||||
```
|
```
|
||||||
|
|
||||||
The `-e` option above tells the tool to start the service, and do not run any tests, while
|
The `-e` option above tells the tool to start the service, and do not run any tests, while
|
||||||
the `-o 3` option tells the tool to start 3 DNodes and join them together in a cluster.
|
the `-o 3` option tells the tool to start 3 DNodes and join them together in a cluster.
|
||||||
Obviously you can adjust the the number here.
|
Obviously you can adjust the the number here. The `rm -rf` command line is optional
|
||||||
|
to clean up previous cluster data, so that we can start from a clean state with no data
|
||||||
|
at all.
|
||||||
|
|
||||||
## Behind the Scenes
|
## Behind the Scenes
|
||||||
|
|
||||||
|
@ -89,8 +110,9 @@ The exhaustive features of the tool is available through the `-h` option:
|
||||||
|
|
||||||
```
|
```
|
||||||
$ ./crash_gen.sh -h
|
$ ./crash_gen.sh -h
|
||||||
usage: crash_gen_bootstrap.py [-h] [-a] [-b MAX_DBS] [-c CONNECTOR_TYPE] [-d] [-e] [-g IGNORE_ERRORS] [-i MAX_REPLICAS] [-l] [-n] [-o NUM_DNODES] [-p] [-r]
|
usage: crash_gen_bootstrap.py [-h] [-a] [-b MAX_DBS] [-c CONNECTOR_TYPE] [-d] [-e] [-g IGNORE_ERRORS]
|
||||||
[-s MAX_STEPS] [-t NUM_THREADS] [-v] [-x]
|
[-i NUM_REPLICAS] [-k] [-l] [-m] [-n]
|
||||||
|
[-o NUM_DNODES] [-p] [-r] [-s MAX_STEPS] [-t NUM_THREADS] [-v] [-w] [-x]
|
||||||
|
|
||||||
TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
|
TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -109,11 +131,14 @@ optional arguments:
|
||||||
-e, --run-tdengine Run TDengine service in foreground (default: false)
|
-e, --run-tdengine Run TDengine service in foreground (default: false)
|
||||||
-g IGNORE_ERRORS, --ignore-errors IGNORE_ERRORS
|
-g IGNORE_ERRORS, --ignore-errors IGNORE_ERRORS
|
||||||
Ignore error codes, comma separated, 0x supported (default: None)
|
Ignore error codes, comma separated, 0x supported (default: None)
|
||||||
-i MAX_REPLICAS, --max-replicas MAX_REPLICAS
|
-i NUM_REPLICAS, --num-replicas NUM_REPLICAS
|
||||||
Maximum number of replicas to use, when testing against clusters. (default: 1)
|
Number (fixed) of replicas to use, when testing against clusters. (default: 1)
|
||||||
|
-k, --track-memory-leaks
|
||||||
|
Use Valgrind tool to track memory leaks (default: false)
|
||||||
-l, --larger-data Write larger amount of data during write operations (default: false)
|
-l, --larger-data Write larger amount of data during write operations (default: false)
|
||||||
|
-m, --mix-oos-data Mix out-of-sequence data into the test data stream (default: true)
|
||||||
-n, --dynamic-db-table-names
|
-n, --dynamic-db-table-names
|
||||||
Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false)
|
Use non-fixed names for dbs/tables, for -b, useful for multi-instance executions (default: false)
|
||||||
-o NUM_DNODES, --num-dnodes NUM_DNODES
|
-o NUM_DNODES, --num-dnodes NUM_DNODES
|
||||||
Number of Dnodes to initialize, used with -e option. (default: 1)
|
Number of Dnodes to initialize, used with -e option. (default: 1)
|
||||||
-p, --per-thread-db-connection
|
-p, --per-thread-db-connection
|
||||||
|
@ -124,6 +149,7 @@ optional arguments:
|
||||||
-t NUM_THREADS, --num-threads NUM_THREADS
|
-t NUM_THREADS, --num-threads NUM_THREADS
|
||||||
Number of threads to run (default: 10)
|
Number of threads to run (default: 10)
|
||||||
-v, --verify-data Verify data written in a number of places by reading back (default: false)
|
-v, --verify-data Verify data written in a number of places by reading back (default: false)
|
||||||
|
-w, --use-shadow-db Use a shaddow database to verify data integrity (default: false)
|
||||||
-x, --continue-on-exception
|
-x, --continue-on-exception
|
||||||
Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)
|
Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)
|
||||||
```
|
```
|
||||||
|
|
|
@ -1574,9 +1574,9 @@ class TaskCreateDb(StateTransitionTask):
|
||||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
# was: self.execWtSql(wt, "create database db")
|
# was: self.execWtSql(wt, "create database db")
|
||||||
repStr = ""
|
repStr = ""
|
||||||
if gConfig.max_replicas != 1:
|
if gConfig.num_replicas != 1:
|
||||||
# numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N
|
# numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N
|
||||||
numReplica = gConfig.max_replicas # fixed, always
|
numReplica = gConfig.num_replicas # fixed, always
|
||||||
repStr = "replica {}".format(numReplica)
|
repStr = "replica {}".format(numReplica)
|
||||||
updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active
|
updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active
|
||||||
dbName = self._db.getName()
|
dbName = self._db.getName()
|
||||||
|
@ -2394,11 +2394,16 @@ class MainExec:
|
||||||
help='Ignore error codes, comma separated, 0x supported (default: None)')
|
help='Ignore error codes, comma separated, 0x supported (default: None)')
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'-i',
|
'-i',
|
||||||
'--max-replicas',
|
'--num-replicas',
|
||||||
action='store',
|
action='store',
|
||||||
default=1,
|
default=1,
|
||||||
type=int,
|
type=int,
|
||||||
help='Maximum number of replicas to use, when testing against clusters. (default: 1)')
|
help='Number (fixed) of replicas to use, when testing against clusters. (default: 1)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-k',
|
||||||
|
'--track-memory-leaks',
|
||||||
|
action='store_true',
|
||||||
|
help='Use Valgrind tool to track memory leaks (default: false)')
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'-l',
|
'-l',
|
||||||
'--larger-data',
|
'--larger-data',
|
||||||
|
|
|
@ -19,6 +19,7 @@ from queue import Queue, Empty
|
||||||
|
|
||||||
from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
|
from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
|
||||||
from .db import DbConn, DbTarget
|
from .db import DbConn, DbTarget
|
||||||
|
import crash_gen.settings
|
||||||
|
|
||||||
class TdeInstance():
|
class TdeInstance():
|
||||||
"""
|
"""
|
||||||
|
@ -132,6 +133,7 @@ keep 36500
|
||||||
walLevel 1
|
walLevel 1
|
||||||
#
|
#
|
||||||
# maxConnections 100
|
# maxConnections 100
|
||||||
|
quorum 2
|
||||||
"""
|
"""
|
||||||
cfgContent = cfgTemplate.format_map(cfgValues)
|
cfgContent = cfgTemplate.format_map(cfgValues)
|
||||||
f = open(cfgFile, "w")
|
f = open(cfgFile, "w")
|
||||||
|
@ -164,7 +166,12 @@ walLevel 1
|
||||||
return "127.0.0.1"
|
return "127.0.0.1"
|
||||||
|
|
||||||
def getServiceCmdLine(self): # to start the instance
|
def getServiceCmdLine(self): # to start the instance
|
||||||
return [self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
|
cmdLine = []
|
||||||
|
if crash_gen.settings.gConfig.track_memory_leaks:
|
||||||
|
Logging.info("Invoking VALGRIND on service...")
|
||||||
|
cmdLine = ['valgrind', '--leak-check=yes']
|
||||||
|
cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
|
||||||
|
return cmdLine
|
||||||
|
|
||||||
def _getDnodes(self, dbc):
|
def _getDnodes(self, dbc):
|
||||||
dbc.query("show dnodes")
|
dbc.query("show dnodes")
|
||||||
|
@ -202,7 +209,7 @@ walLevel 1
|
||||||
self.generateCfgFile() # service side generates config file, client does not
|
self.generateCfgFile() # service side generates config file, client does not
|
||||||
self.rotateLogs()
|
self.rotateLogs()
|
||||||
|
|
||||||
self._smThread.start(self.getServiceCmdLine())
|
self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._smThread.stop()
|
self._smThread.stop()
|
||||||
|
@ -225,7 +232,7 @@ class TdeSubProcess:
|
||||||
# RET_SUCCESS = -4
|
# RET_SUCCESS = -4
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.subProcess = None
|
self.subProcess = None # type: subprocess.Popen
|
||||||
# if tInst is None:
|
# if tInst is None:
|
||||||
# raise CrashGenError("Empty instance not allowed in TdeSubProcess")
|
# raise CrashGenError("Empty instance not allowed in TdeSubProcess")
|
||||||
# self._tInst = tInst # Default create at ServiceManagerThread
|
# self._tInst = tInst # Default create at ServiceManagerThread
|
||||||
|
@ -263,7 +270,7 @@ class TdeSubProcess:
|
||||||
# print("Starting TDengine with env: ", myEnv.items())
|
# print("Starting TDengine with env: ", myEnv.items())
|
||||||
# print("Starting TDengine via Shell: {}".format(cmdLineStr))
|
# print("Starting TDengine via Shell: {}".format(cmdLineStr))
|
||||||
|
|
||||||
useShell = True
|
useShell = True # Needed to pass environments into it
|
||||||
self.subProcess = subprocess.Popen(
|
self.subProcess = subprocess.Popen(
|
||||||
# ' '.join(cmdLine) if useShell else cmdLine,
|
# ' '.join(cmdLine) if useShell else cmdLine,
|
||||||
# shell=useShell,
|
# shell=useShell,
|
||||||
|
@ -276,12 +283,12 @@ class TdeSubProcess:
|
||||||
env=myEnv
|
env=myEnv
|
||||||
) # had text=True, which interferred with reading EOF
|
) # had text=True, which interferred with reading EOF
|
||||||
|
|
||||||
STOP_SIGNAL = signal.SIGKILL # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
|
STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
|
||||||
SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm
|
SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
"""
|
"""
|
||||||
Stop a sub process, and try to return a meaningful return code.
|
Stop a sub process, DO NOT return anything, process all conditions INSIDE
|
||||||
|
|
||||||
Common POSIX signal values (from man -7 signal):
|
Common POSIX signal values (from man -7 signal):
|
||||||
SIGHUP 1
|
SIGHUP 1
|
||||||
|
@ -301,40 +308,99 @@ class TdeSubProcess:
|
||||||
"""
|
"""
|
||||||
if not self.subProcess:
|
if not self.subProcess:
|
||||||
Logging.error("Sub process already stopped")
|
Logging.error("Sub process already stopped")
|
||||||
return # -1
|
return
|
||||||
|
|
||||||
retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
|
retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
|
||||||
if retCode: # valid return code, process ended
|
if retCode: # valid return code, process ended
|
||||||
retCode = -retCode # only if valid
|
# retCode = -retCode # only if valid
|
||||||
Logging.warning("TSP.stop(): process ended itself")
|
Logging.warning("TSP.stop(): process ended itself")
|
||||||
self.subProcess = None
|
self.subProcess = None
|
||||||
return retCode
|
return
|
||||||
|
|
||||||
# process still alive, let's interrupt it
|
# process still alive, let's interrupt it
|
||||||
Logging.info("Terminate running process, send SIG_{} and wait...".format(self.STOP_SIGNAL))
|
self._stopForSure(self.subProcess, self.STOP_SIGNAL) # success if no exception
|
||||||
# sub process should end, then IPC queue should end, causing IO thread to end
|
|
||||||
topSubProc = psutil.Process(self.subProcess.pid)
|
|
||||||
for child in topSubProc.children(recursive=True): # or parent.children() for recursive=False
|
|
||||||
child.send_signal(self.STOP_SIGNAL)
|
|
||||||
time.sleep(0.2) # 200 ms
|
|
||||||
# topSubProc.send_signal(sig) # now kill the main sub process (likely the Shell)
|
|
||||||
|
|
||||||
self.subProcess.send_signal(self.STOP_SIGNAL) # main sub process (likely the Shell)
|
|
||||||
self.subProcess.wait(20)
|
|
||||||
retCode = self.subProcess.returncode # should always be there
|
|
||||||
# May throw subprocess.TimeoutExpired exception above, therefore
|
|
||||||
# The process is guranteed to have ended by now
|
|
||||||
self.subProcess = None
|
self.subProcess = None
|
||||||
if retCode == self.SIG_KILL_RETCODE:
|
|
||||||
Logging.info("TSP.stop(): sub proc KILLED, as expected")
|
# sub process should end, then IPC queue should end, causing IO thread to end
|
||||||
elif retCode == (- self.STOP_SIGNAL):
|
|
||||||
Logging.info("TSP.stop(), sub process STOPPED, as expected")
|
@classmethod
|
||||||
elif retCode != 0: # != (- signal.SIGINT):
|
def _stopForSure(cls, proc: subprocess.Popen, sig: int):
|
||||||
Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format(
|
'''
|
||||||
self.STOP_SIGNAL, retCode))
|
Stop a process and all sub processes with a singal, and SIGKILL if necessary
|
||||||
else:
|
'''
|
||||||
Logging.info("TSP.stop(): sub proc successfully terminated with SIG {}".format(self.STOP_SIGNAL))
|
def doKillTdService(proc: subprocess.Popen, sig: int):
|
||||||
return - retCode
|
Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig))
|
||||||
|
proc.send_signal(sig)
|
||||||
|
try:
|
||||||
|
retCode = proc.wait(20)
|
||||||
|
if (- retCode) == signal.SIGSEGV: # Crashed
|
||||||
|
Logging.warning("Process {} CRASHED, please check CORE file!".format(proc.pid))
|
||||||
|
elif (- retCode) == sig :
|
||||||
|
Logging.info("TD service terminated with expected return code {}".format(sig))
|
||||||
|
else:
|
||||||
|
Logging.warning("TD service terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))
|
||||||
|
return True # terminated successfully
|
||||||
|
except subprocess.TimeoutExpired as err:
|
||||||
|
Logging.warning("Failed to kill sub-sub process {} with signal {}".format(proc.pid, sig))
|
||||||
|
return False # failed to terminate
|
||||||
|
|
||||||
|
|
||||||
|
def doKillChild(child: psutil.Process, sig: int):
|
||||||
|
Logging.info("Killing sub-sub process {} with signal {}".format(child.pid, sig))
|
||||||
|
child.send_signal(sig)
|
||||||
|
try:
|
||||||
|
retCode = child.wait(20)
|
||||||
|
if (- retCode) == signal.SIGSEGV: # Crashed
|
||||||
|
Logging.warning("Process {} CRASHED, please check CORE file!".format(child.pid))
|
||||||
|
elif (- retCode) == sig :
|
||||||
|
Logging.info("Sub-sub process terminated with expected return code {}".format(sig))
|
||||||
|
else:
|
||||||
|
Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))
|
||||||
|
return True # terminated successfully
|
||||||
|
except psutil.TimeoutExpired as err:
|
||||||
|
Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig))
|
||||||
|
return False # did not terminate
|
||||||
|
|
||||||
|
def doKill(proc: subprocess.Popen, sig: int):
|
||||||
|
pid = proc.pid
|
||||||
|
try:
|
||||||
|
topSubProc = psutil.Process(pid)
|
||||||
|
for child in topSubProc.children(recursive=True): # or parent.children() for recursive=False
|
||||||
|
Logging.warning("Unexpected child to be killed")
|
||||||
|
doKillChild(child, sig)
|
||||||
|
except psutil.NoSuchProcess as err:
|
||||||
|
Logging.info("Process not found, can't kill, pid = {}".format(pid))
|
||||||
|
|
||||||
|
return doKillTdService(proc, sig)
|
||||||
|
# TODO: re-examine if we need to kill the top process, which is always the SHELL for now
|
||||||
|
# try:
|
||||||
|
# proc.wait(1) # SHELL process here, may throw subprocess.TimeoutExpired exception
|
||||||
|
# # expRetCode = self.SIG_KILL_RETCODE if sig==signal.SIGKILL else (-sig)
|
||||||
|
# # if retCode == expRetCode:
|
||||||
|
# # Logging.info("Process terminated with expected return code {}".format(retCode))
|
||||||
|
# # else:
|
||||||
|
# # Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(expRetCode, retCode))
|
||||||
|
# # return True # success
|
||||||
|
# except subprocess.TimeoutExpired as err:
|
||||||
|
# Logging.warning("Failed to kill process {} with signal {}".format(pid, sig))
|
||||||
|
# return False # failed to kill
|
||||||
|
|
||||||
|
def softKill(proc, sig):
|
||||||
|
return doKill(proc, sig)
|
||||||
|
|
||||||
|
def hardKill(proc):
|
||||||
|
return doKill(proc, signal.SIGKILL)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
pid = proc.pid
|
||||||
|
Logging.info("Terminate running processes under {}, with SIG #{} and wait...".format(pid, sig))
|
||||||
|
if softKill(proc, sig):
|
||||||
|
return# success
|
||||||
|
if sig != signal.SIGKILL: # really was soft above
|
||||||
|
if hardKill(proc):
|
||||||
|
return
|
||||||
|
raise CrashGenError("Failed to stop process, pid={}".format(pid))
|
||||||
|
|
||||||
class ServiceManager:
|
class ServiceManager:
|
||||||
PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process
|
PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process
|
||||||
|
@ -560,7 +626,8 @@ class ServiceManagerThread:
|
||||||
# self._tInstNum = tInstNum # instance serial number in cluster, ZERO based
|
# self._tInstNum = tInstNum # instance serial number in cluster, ZERO based
|
||||||
# self._tInst = tInst or TdeInstance() # Need an instance
|
# self._tInst = tInst or TdeInstance() # Need an instance
|
||||||
|
|
||||||
self._thread = None # The actual thread, # type: threading.Thread
|
self._thread = None # The actual thread, # type: threading.Thread
|
||||||
|
self._thread2 = None # watching stderr
|
||||||
self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually.
|
self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually.
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
|
@ -568,11 +635,20 @@ class ServiceManagerThread:
|
||||||
self.getStatus(), self._tdeSubProcess)
|
self.getStatus(), self._tdeSubProcess)
|
||||||
|
|
||||||
def getStatus(self):
|
def getStatus(self):
|
||||||
|
'''
|
||||||
|
Get the status of the process being managed. (misnomer alert!)
|
||||||
|
'''
|
||||||
return self._status
|
return self._status
|
||||||
|
|
||||||
# Start the thread (with sub process), and wait for the sub service
|
# Start the thread (with sub process), and wait for the sub service
|
||||||
# to become fully operational
|
# to become fully operational
|
||||||
def start(self, cmdLine):
|
def start(self, cmdLine : str, logDir: str):
|
||||||
|
'''
|
||||||
|
Request the manager thread to start a new sub process, and manage it.
|
||||||
|
|
||||||
|
:param cmdLine: the command line to invoke
|
||||||
|
:param logDir: the logging directory, to hold stdout/stderr files
|
||||||
|
'''
|
||||||
if self._thread:
|
if self._thread:
|
||||||
raise RuntimeError("Unexpected _thread")
|
raise RuntimeError("Unexpected _thread")
|
||||||
if self._tdeSubProcess:
|
if self._tdeSubProcess:
|
||||||
|
@ -582,20 +658,30 @@ class ServiceManagerThread:
|
||||||
|
|
||||||
self._status.set(Status.STATUS_STARTING)
|
self._status.set(Status.STATUS_STARTING)
|
||||||
self._tdeSubProcess = TdeSubProcess()
|
self._tdeSubProcess = TdeSubProcess()
|
||||||
self._tdeSubProcess.start(cmdLine)
|
self._tdeSubProcess.start(cmdLine) # TODO: verify process is running
|
||||||
|
|
||||||
self._ipcQueue = Queue()
|
self._ipcQueue = Queue()
|
||||||
self._thread = threading.Thread( # First thread captures server OUTPUT
|
self._thread = threading.Thread( # First thread captures server OUTPUT
|
||||||
target=self.svcOutputReader,
|
target=self.svcOutputReader,
|
||||||
args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
|
args=(self._tdeSubProcess.getStdOut(), self._ipcQueue, logDir))
|
||||||
self._thread.daemon = True # thread dies with the program
|
self._thread.daemon = True # thread dies with the program
|
||||||
self._thread.start()
|
self._thread.start()
|
||||||
|
time.sleep(0.01)
|
||||||
|
if not self._thread.is_alive(): # What happened?
|
||||||
|
Logging.info("Failed to started process to monitor STDOUT")
|
||||||
|
self.stop()
|
||||||
|
raise CrashGenError("Failed to start thread to monitor STDOUT")
|
||||||
|
Logging.info("Successfully started process to monitor STDOUT")
|
||||||
|
|
||||||
self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
|
self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
|
||||||
target=self.svcErrorReader,
|
target=self.svcErrorReader,
|
||||||
args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
|
args=(self._tdeSubProcess.getStdErr(), self._ipcQueue, logDir))
|
||||||
self._thread2.daemon = True # thread dies with the program
|
self._thread2.daemon = True # thread dies with the program
|
||||||
self._thread2.start()
|
self._thread2.start()
|
||||||
|
time.sleep(0.01)
|
||||||
|
if not self._thread2.is_alive():
|
||||||
|
self.stop()
|
||||||
|
raise CrashGenError("Failed to start thread to monitor STDERR")
|
||||||
|
|
||||||
# wait for service to start
|
# wait for service to start
|
||||||
for i in range(0, 100):
|
for i in range(0, 100):
|
||||||
|
@ -643,7 +729,7 @@ class ServiceManagerThread:
|
||||||
Logging.info("Service already stopped")
|
Logging.info("Service already stopped")
|
||||||
return
|
return
|
||||||
if self.getStatus().isStopping():
|
if self.getStatus().isStopping():
|
||||||
Logging.info("Service is already being stopped")
|
Logging.info("Service is already being stopped, pid: {}".format(self._tdeSubProcess.getPid()))
|
||||||
return
|
return
|
||||||
# Linux will send Control-C generated SIGINT to the TDengine process
|
# Linux will send Control-C generated SIGINT to the TDengine process
|
||||||
# already, ref:
|
# already, ref:
|
||||||
|
@ -653,14 +739,14 @@ class ServiceManagerThread:
|
||||||
|
|
||||||
self._status.set(Status.STATUS_STOPPING)
|
self._status.set(Status.STATUS_STOPPING)
|
||||||
# retCode = self._tdeSubProcess.stop()
|
# retCode = self._tdeSubProcess.stop()
|
||||||
try:
|
# try:
|
||||||
retCode = self._tdeSubProcess.stop()
|
# retCode = self._tdeSubProcess.stop()
|
||||||
# print("Attempted to stop sub process, got return code: {}".format(retCode))
|
# # print("Attempted to stop sub process, got return code: {}".format(retCode))
|
||||||
if retCode == signal.SIGSEGV : # SGV
|
# if retCode == signal.SIGSEGV : # SGV
|
||||||
Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
|
# Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
|
||||||
except subprocess.TimeoutExpired as err:
|
# except subprocess.TimeoutExpired as err:
|
||||||
Logging.info("Time out waiting for TDengine service process to exit")
|
# Logging.info("Time out waiting for TDengine service process to exit")
|
||||||
else:
|
if not self._tdeSubProcess.stop(): # everything withing
|
||||||
if self._tdeSubProcess.isRunning(): # still running, should now never happen
|
if self._tdeSubProcess.isRunning(): # still running, should now never happen
|
||||||
Logging.error("FAILED to stop sub process, it is still running... pid = {}".format(
|
Logging.error("FAILED to stop sub process, it is still running... pid = {}".format(
|
||||||
self._tdeSubProcess.getPid()))
|
self._tdeSubProcess.getPid()))
|
||||||
|
@ -683,16 +769,18 @@ class ServiceManagerThread:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"SMT.Join(): Unexpected status: {}".format(self._status))
|
"SMT.Join(): Unexpected status: {}".format(self._status))
|
||||||
|
|
||||||
if self._thread:
|
if self._thread or self._thread2 :
|
||||||
self._thread.join()
|
if self._thread:
|
||||||
self._thread = None
|
self._thread.join()
|
||||||
self._status.set(Status.STATUS_STOPPED)
|
self._thread = None
|
||||||
# STD ERR thread
|
if self._thread2: # STD ERR thread
|
||||||
self._thread2.join()
|
self._thread2.join()
|
||||||
self._thread2 = None
|
self._thread2 = None
|
||||||
else:
|
else:
|
||||||
print("Joining empty thread, doing nothing")
|
print("Joining empty thread, doing nothing")
|
||||||
|
|
||||||
|
self._status.set(Status.STATUS_STOPPED)
|
||||||
|
|
||||||
def _trimQueue(self, targetSize):
|
def _trimQueue(self, targetSize):
|
||||||
if targetSize <= 0:
|
if targetSize <= 0:
|
||||||
return # do nothing
|
return # do nothing
|
||||||
|
@ -739,11 +827,22 @@ class ServiceManagerThread:
|
||||||
print(pBar, end="", flush=True)
|
print(pBar, end="", flush=True)
|
||||||
print('\b\b\b\b', end="", flush=True)
|
print('\b\b\b\b', end="", flush=True)
|
||||||
|
|
||||||
def svcOutputReader(self, out: IO, queue):
|
def svcOutputReader(self, out: IO, queue, logDir: str):
|
||||||
|
'''
|
||||||
|
The infinite routine that processes the STDOUT stream for the sub process being managed.
|
||||||
|
|
||||||
|
:param out: the IO stream object used to fetch the data from
|
||||||
|
:param queue: the queue where we dump the roughly parsed line-by-line data
|
||||||
|
:param logDir: where we should dump a verbatim output file
|
||||||
|
'''
|
||||||
|
os.makedirs(logDir, exist_ok=True)
|
||||||
|
logFile = os.path.join(logDir,'stdout.log')
|
||||||
|
fOut = open(logFile, 'wb')
|
||||||
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
|
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
|
||||||
# print("This is the svcOutput Reader...")
|
# print("This is the svcOutput Reader...")
|
||||||
# for line in out :
|
# for line in out :
|
||||||
for line in iter(out.readline, b''):
|
for line in iter(out.readline, b''):
|
||||||
|
fOut.write(line)
|
||||||
# print("Finished reading a line: {}".format(line))
|
# print("Finished reading a line: {}".format(line))
|
||||||
# print("Adding item to queue...")
|
# print("Adding item to queue...")
|
||||||
try:
|
try:
|
||||||
|
@ -772,10 +871,16 @@ class ServiceManagerThread:
|
||||||
# queue.put(line)
|
# queue.put(line)
|
||||||
# meaning sub process must have died
|
# meaning sub process must have died
|
||||||
Logging.info("EOF for TDengine STDOUT: {}".format(self))
|
Logging.info("EOF for TDengine STDOUT: {}".format(self))
|
||||||
out.close()
|
out.close() # Close the stream
|
||||||
|
fOut.close() # Close the output file
|
||||||
|
|
||||||
def svcErrorReader(self, err: IO, queue):
|
def svcErrorReader(self, err: IO, queue, logDir: str):
|
||||||
|
os.makedirs(logDir, exist_ok=True)
|
||||||
|
logFile = os.path.join(logDir,'stderr.log')
|
||||||
|
fErr = open(logFile, 'wb')
|
||||||
for line in iter(err.readline, b''):
|
for line in iter(err.readline, b''):
|
||||||
|
fErr.write(line)
|
||||||
Logging.info("TDengine STDERR: {}".format(line))
|
Logging.info("TDengine STDERR: {}".format(line))
|
||||||
Logging.info("EOF for TDengine STDERR: {}".format(self))
|
Logging.info("EOF for TDengine STDERR: {}".format(self))
|
||||||
err.close()
|
err.close()
|
||||||
|
fErr.close()
|
Loading…
Reference in New Issue