Merge pull request #5806 from taosdata/enhance/TD-3079
[TD-3079]<enhance>: use multi-level storage with vnode's wal files
This commit is contained in:
commit
9f5301a318
|
@ -997,8 +997,8 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
|
|||
}
|
||||
|
||||
if ( rpcIsReq(pHead->msgType) ) {
|
||||
terrno = rpcProcessReqHead(pConn, pHead);
|
||||
pConn->connType = pRecv->connType;
|
||||
terrno = rpcProcessReqHead(pConn, pHead);
|
||||
|
||||
// stop idle timer
|
||||
taosTmrStopA(&pConn->pIdleTimer);
|
||||
|
|
|
@ -232,9 +232,28 @@ int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static void vnodeFindWalRootDir(int32_t vgId, char *walRootDir) {
|
||||
char vnodeDir[TSDB_FILENAME_LEN] = "\0";
|
||||
snprintf(vnodeDir, TSDB_FILENAME_LEN, "/vnode/vnode%d/wal", vgId);
|
||||
|
||||
TDIR *tdir = tfsOpendir(vnodeDir);
|
||||
if (!tdir) return;
|
||||
|
||||
const TFILE *tfile = tfsReaddir(tdir);
|
||||
if (!tfile) {
|
||||
tfsClosedir(tdir);
|
||||
return;
|
||||
}
|
||||
|
||||
sprintf(walRootDir, "%s/vnode/vnode%d", TFS_DISK_PATH(tfile->level, tfile->id), vgId);
|
||||
|
||||
tfsClosedir(tdir);
|
||||
}
|
||||
|
||||
int32_t vnodeOpen(int32_t vgId) {
|
||||
char temp[TSDB_FILENAME_LEN * 3];
|
||||
char rootDir[TSDB_FILENAME_LEN * 2];
|
||||
char walRootDir[TSDB_FILENAME_LEN * 2] = {0};
|
||||
snprintf(rootDir, TSDB_FILENAME_LEN * 2, "%s/vnode%d", tsVnodeDir, vgId);
|
||||
|
||||
SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
|
||||
|
@ -321,7 +340,21 @@ int32_t vnodeOpen(int32_t vgId) {
|
|||
}
|
||||
}
|
||||
|
||||
sprintf(temp, "%s/wal", rootDir);
|
||||
// walRootDir for wal & syncInfo.path (not empty dir of /vnode/vnode{pVnode->vgId}/wal)
|
||||
vnodeFindWalRootDir(pVnode->vgId, walRootDir);
|
||||
if (walRootDir[0] == 0) {
|
||||
int level = -1, id = -1;
|
||||
|
||||
tfsAllocDisk(TFS_PRIMARY_LEVEL, &level, &id);
|
||||
if (level < 0 || id < 0) {
|
||||
vnodeCleanUp(pVnode);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
sprintf(walRootDir, "%s/vnode/vnode%d", TFS_DISK_PATH(level, id), vgId);
|
||||
}
|
||||
|
||||
sprintf(temp, "%s/wal", walRootDir);
|
||||
pVnode->walCfg.vgId = pVnode->vgId;
|
||||
pVnode->wal = walOpen(temp, &pVnode->walCfg);
|
||||
if (pVnode->wal == NULL) {
|
||||
|
@ -353,7 +386,7 @@ int32_t vnodeOpen(int32_t vgId) {
|
|||
|
||||
pVnode->events = NULL;
|
||||
|
||||
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
|
||||
vDebug("vgId:%d, vnode is opened in %s - %s, pVnode:%p", pVnode->vgId, rootDir, walRootDir, pVnode);
|
||||
|
||||
vnodeAddIntoHash(pVnode);
|
||||
|
||||
|
@ -361,7 +394,7 @@ int32_t vnodeOpen(int32_t vgId) {
|
|||
syncInfo.vgId = pVnode->vgId;
|
||||
syncInfo.version = pVnode->version;
|
||||
syncInfo.syncCfg = pVnode->syncCfg;
|
||||
tstrncpy(syncInfo.path, rootDir, TSDB_FILENAME_LEN);
|
||||
tstrncpy(syncInfo.path, walRootDir, TSDB_FILENAME_LEN);
|
||||
syncInfo.getWalInfoFp = vnodeGetWalInfo;
|
||||
syncInfo.writeToCacheFp = vnodeWriteToCache;
|
||||
syncInfo.confirmForward = vnodeConfirmForard;
|
||||
|
|
|
@ -6,6 +6,25 @@ To effectively test and debug our TDengine product, we have developed a simple t
|
|||
exercise various functions of the system in a randomized fashion, hoping to expose
|
||||
maximum number of problems, hopefully without a pre-determined scenario.
|
||||
|
||||
# Features
|
||||
|
||||
This tool can run as a test client with the following features:
|
||||
|
||||
1. Any number of concurrent threads
|
||||
1. Any number of test steps/loops
|
||||
1. Auto-creating and writing to multiple databases
|
||||
1. Ignore specific error codes
|
||||
1. Write small or large data blocks
|
||||
1. Auto-generate out-of-sequence data, if needed
|
||||
1. Verify the result of write operations
|
||||
1. Concurrent writing to a shadow database for later data verification
|
||||
1. User specified number of replicas to use, against clusters
|
||||
|
||||
This tool can also be used to start a TDengine service, either in stand-alone mode or
|
||||
cluster mode. The features include:
|
||||
|
||||
1. User specified number of D-Nodes to create/use.
|
||||
|
||||
# Preparation
|
||||
|
||||
To run this tool, please ensure the following preparation work is done first.
|
||||
|
@ -16,7 +35,7 @@ To run this tool, please ensure the followed preparation work is done first.
|
|||
Ubuntu 20.04LTS as our own development environment, and suggest you also use such
|
||||
an environment if possible.
|
||||
|
||||
# Simple Execution
|
||||
# Simple Execution as Client Test Tool
|
||||
|
||||
To run the tool with the simplest method, follow the steps below:
|
||||
|
||||
|
@ -28,19 +47,21 @@ To run the tool with the simplest method, follow the steps below:
|
|||
|
||||
That's it!
|
||||
|
||||
# Running Clusters
|
||||
# Running Server-side Clusters
|
||||
|
||||
This tool also makes it easy to test/verify the clustering capabilities of TDengine. You
|
||||
can start a cluster quite easily with the following command:
|
||||
|
||||
```
|
||||
$ cd tests/pytest/
|
||||
$ ./crash_gen.sh -e -o 3
|
||||
$ rm -rf ../../build/cluster_dnode_?; ./crash_gen.sh -e -o 3 # first part optional
|
||||
```
|
||||
|
||||
The `-e` option above tells the tool to start the service, and not to run any tests, while
|
||||
the `-o 3` option tells the tool to start 3 DNodes and join them together in a cluster.
|
||||
Obviously you can adjust the the number here.
|
||||
Obviously you can adjust the number here. The `rm -rf` command line is optional
|
||||
to clean up previous cluster data, so that we can start from a clean state with no data
|
||||
at all.
|
||||
|
||||
## Behind the Scenes
|
||||
|
||||
|
@ -89,8 +110,9 @@ The exhaustive features of the tool is available through the `-h` option:
|
|||
|
||||
```
|
||||
$ ./crash_gen.sh -h
|
||||
usage: crash_gen_bootstrap.py [-h] [-a] [-b MAX_DBS] [-c CONNECTOR_TYPE] [-d] [-e] [-g IGNORE_ERRORS] [-i MAX_REPLICAS] [-l] [-n] [-o NUM_DNODES] [-p] [-r]
|
||||
[-s MAX_STEPS] [-t NUM_THREADS] [-v] [-x]
|
||||
usage: crash_gen_bootstrap.py [-h] [-a] [-b MAX_DBS] [-c CONNECTOR_TYPE] [-d] [-e] [-g IGNORE_ERRORS]
|
||||
[-i NUM_REPLICAS] [-k] [-l] [-m] [-n]
|
||||
[-o NUM_DNODES] [-p] [-r] [-s MAX_STEPS] [-t NUM_THREADS] [-v] [-w] [-x]
|
||||
|
||||
TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
|
||||
---------------------------------------------------------------------
|
||||
|
@ -109,11 +131,14 @@ optional arguments:
|
|||
-e, --run-tdengine Run TDengine service in foreground (default: false)
|
||||
-g IGNORE_ERRORS, --ignore-errors IGNORE_ERRORS
|
||||
Ignore error codes, comma separated, 0x supported (default: None)
|
||||
-i MAX_REPLICAS, --max-replicas MAX_REPLICAS
|
||||
Maximum number of replicas to use, when testing against clusters. (default: 1)
|
||||
-i NUM_REPLICAS, --num-replicas NUM_REPLICAS
|
||||
Number (fixed) of replicas to use, when testing against clusters. (default: 1)
|
||||
-k, --track-memory-leaks
|
||||
Use Valgrind tool to track memory leaks (default: false)
|
||||
-l, --larger-data Write larger amount of data during write operations (default: false)
|
||||
-m, --mix-oos-data Mix out-of-sequence data into the test data stream (default: true)
|
||||
-n, --dynamic-db-table-names
|
||||
Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false)
|
||||
Use non-fixed names for dbs/tables, for -b, useful for multi-instance executions (default: false)
|
||||
-o NUM_DNODES, --num-dnodes NUM_DNODES
|
||||
Number of Dnodes to initialize, used with -e option. (default: 1)
|
||||
-p, --per-thread-db-connection
|
||||
|
@ -124,6 +149,7 @@ optional arguments:
|
|||
-t NUM_THREADS, --num-threads NUM_THREADS
|
||||
Number of threads to run (default: 10)
|
||||
-v, --verify-data Verify data written in a number of places by reading back (default: false)
|
||||
-w, --use-shadow-db Use a shaddow database to verify data integrity (default: false)
|
||||
-x, --continue-on-exception
|
||||
Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)
|
||||
```
|
||||
|
|
|
@ -1574,9 +1574,9 @@ class TaskCreateDb(StateTransitionTask):
|
|||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
# was: self.execWtSql(wt, "create database db")
|
||||
repStr = ""
|
||||
if gConfig.max_replicas != 1:
|
||||
if gConfig.num_replicas != 1:
|
||||
# numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N
|
||||
numReplica = gConfig.max_replicas # fixed, always
|
||||
numReplica = gConfig.num_replicas # fixed, always
|
||||
repStr = "replica {}".format(numReplica)
|
||||
updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active
|
||||
dbName = self._db.getName()
|
||||
|
@ -2394,11 +2394,16 @@ class MainExec:
|
|||
help='Ignore error codes, comma separated, 0x supported (default: None)')
|
||||
parser.add_argument(
|
||||
'-i',
|
||||
'--max-replicas',
|
||||
'--num-replicas',
|
||||
action='store',
|
||||
default=1,
|
||||
type=int,
|
||||
help='Maximum number of replicas to use, when testing against clusters. (default: 1)')
|
||||
help='Number (fixed) of replicas to use, when testing against clusters. (default: 1)')
|
||||
parser.add_argument(
|
||||
'-k',
|
||||
'--track-memory-leaks',
|
||||
action='store_true',
|
||||
help='Use Valgrind tool to track memory leaks (default: false)')
|
||||
parser.add_argument(
|
||||
'-l',
|
||||
'--larger-data',
|
||||
|
|
|
@ -19,6 +19,7 @@ from queue import Queue, Empty
|
|||
|
||||
from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
|
||||
from .db import DbConn, DbTarget
|
||||
import crash_gen.settings
|
||||
|
||||
class TdeInstance():
|
||||
"""
|
||||
|
@ -132,6 +133,7 @@ keep 36500
|
|||
walLevel 1
|
||||
#
|
||||
# maxConnections 100
|
||||
quorum 2
|
||||
"""
|
||||
cfgContent = cfgTemplate.format_map(cfgValues)
|
||||
f = open(cfgFile, "w")
|
||||
|
@ -164,7 +166,12 @@ walLevel 1
|
|||
return "127.0.0.1"
|
||||
|
||||
def getServiceCmdLine(self): # to start the instance
|
||||
return [self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
|
||||
cmdLine = []
|
||||
if crash_gen.settings.gConfig.track_memory_leaks:
|
||||
Logging.info("Invoking VALGRIND on service...")
|
||||
cmdLine = ['valgrind', '--leak-check=yes']
|
||||
cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
|
||||
return cmdLine
|
||||
|
||||
def _getDnodes(self, dbc):
|
||||
dbc.query("show dnodes")
|
||||
|
@ -202,7 +209,7 @@ walLevel 1
|
|||
self.generateCfgFile() # service side generates config file, client does not
|
||||
self.rotateLogs()
|
||||
|
||||
self._smThread.start(self.getServiceCmdLine())
|
||||
self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions
|
||||
|
||||
def stop(self):
|
||||
self._smThread.stop()
|
||||
|
@ -225,7 +232,7 @@ class TdeSubProcess:
|
|||
# RET_SUCCESS = -4
|
||||
|
||||
def __init__(self):
|
||||
self.subProcess = None
|
||||
self.subProcess = None # type: subprocess.Popen
|
||||
# if tInst is None:
|
||||
# raise CrashGenError("Empty instance not allowed in TdeSubProcess")
|
||||
# self._tInst = tInst # Default create at ServiceManagerThread
|
||||
|
@ -263,7 +270,7 @@ class TdeSubProcess:
|
|||
# print("Starting TDengine with env: ", myEnv.items())
|
||||
# print("Starting TDengine via Shell: {}".format(cmdLineStr))
|
||||
|
||||
useShell = True
|
||||
useShell = True # Needed to pass environments into it
|
||||
self.subProcess = subprocess.Popen(
|
||||
# ' '.join(cmdLine) if useShell else cmdLine,
|
||||
# shell=useShell,
|
||||
|
@ -276,12 +283,12 @@ class TdeSubProcess:
|
|||
env=myEnv
|
||||
) # had text=True, which interferred with reading EOF
|
||||
|
||||
STOP_SIGNAL = signal.SIGKILL # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
|
||||
STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
|
||||
SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Stop a sub process, and try to return a meaningful return code.
|
||||
Stop a sub process, DO NOT return anything, process all conditions INSIDE
|
||||
|
||||
Common POSIX signal values (from man -7 signal):
|
||||
SIGHUP 1
|
||||
|
@ -301,40 +308,99 @@ class TdeSubProcess:
|
|||
"""
|
||||
if not self.subProcess:
|
||||
Logging.error("Sub process already stopped")
|
||||
return # -1
|
||||
return
|
||||
|
||||
retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
|
||||
if retCode: # valid return code, process ended
|
||||
retCode = -retCode # only if valid
|
||||
# retCode = -retCode # only if valid
|
||||
Logging.warning("TSP.stop(): process ended itself")
|
||||
self.subProcess = None
|
||||
return retCode
|
||||
return
|
||||
|
||||
# process still alive, let's interrupt it
|
||||
Logging.info("Terminate running process, send SIG_{} and wait...".format(self.STOP_SIGNAL))
|
||||
# sub process should end, then IPC queue should end, causing IO thread to end
|
||||
topSubProc = psutil.Process(self.subProcess.pid)
|
||||
for child in topSubProc.children(recursive=True): # or parent.children() for recursive=False
|
||||
child.send_signal(self.STOP_SIGNAL)
|
||||
time.sleep(0.2) # 200 ms
|
||||
# topSubProc.send_signal(sig) # now kill the main sub process (likely the Shell)
|
||||
|
||||
self.subProcess.send_signal(self.STOP_SIGNAL) # main sub process (likely the Shell)
|
||||
self.subProcess.wait(20)
|
||||
retCode = self.subProcess.returncode # should always be there
|
||||
# May throw subprocess.TimeoutExpired exception above, therefore
|
||||
# The process is guranteed to have ended by now
|
||||
self._stopForSure(self.subProcess, self.STOP_SIGNAL) # success if no exception
|
||||
self.subProcess = None
|
||||
if retCode == self.SIG_KILL_RETCODE:
|
||||
Logging.info("TSP.stop(): sub proc KILLED, as expected")
|
||||
elif retCode == (- self.STOP_SIGNAL):
|
||||
Logging.info("TSP.stop(), sub process STOPPED, as expected")
|
||||
elif retCode != 0: # != (- signal.SIGINT):
|
||||
Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format(
|
||||
self.STOP_SIGNAL, retCode))
|
||||
|
||||
# sub process should end, then IPC queue should end, causing IO thread to end
|
||||
|
||||
@classmethod
|
||||
def _stopForSure(cls, proc: subprocess.Popen, sig: int):
|
||||
'''
|
||||
Stop a process and all sub processes with a singal, and SIGKILL if necessary
|
||||
'''
|
||||
def doKillTdService(proc: subprocess.Popen, sig: int):
|
||||
Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig))
|
||||
proc.send_signal(sig)
|
||||
try:
|
||||
retCode = proc.wait(20)
|
||||
if (- retCode) == signal.SIGSEGV: # Crashed
|
||||
Logging.warning("Process {} CRASHED, please check CORE file!".format(proc.pid))
|
||||
elif (- retCode) == sig :
|
||||
Logging.info("TD service terminated with expected return code {}".format(sig))
|
||||
else:
|
||||
Logging.info("TSP.stop(): sub proc successfully terminated with SIG {}".format(self.STOP_SIGNAL))
|
||||
return - retCode
|
||||
Logging.warning("TD service terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))
|
||||
return True # terminated successfully
|
||||
except subprocess.TimeoutExpired as err:
|
||||
Logging.warning("Failed to kill sub-sub process {} with signal {}".format(proc.pid, sig))
|
||||
return False # failed to terminate
|
||||
|
||||
|
||||
def doKillChild(child: psutil.Process, sig: int):
|
||||
Logging.info("Killing sub-sub process {} with signal {}".format(child.pid, sig))
|
||||
child.send_signal(sig)
|
||||
try:
|
||||
retCode = child.wait(20)
|
||||
if (- retCode) == signal.SIGSEGV: # Crashed
|
||||
Logging.warning("Process {} CRASHED, please check CORE file!".format(child.pid))
|
||||
elif (- retCode) == sig :
|
||||
Logging.info("Sub-sub process terminated with expected return code {}".format(sig))
|
||||
else:
|
||||
Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))
|
||||
return True # terminated successfully
|
||||
except psutil.TimeoutExpired as err:
|
||||
Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig))
|
||||
return False # did not terminate
|
||||
|
||||
def doKill(proc: subprocess.Popen, sig: int):
|
||||
pid = proc.pid
|
||||
try:
|
||||
topSubProc = psutil.Process(pid)
|
||||
for child in topSubProc.children(recursive=True): # or parent.children() for recursive=False
|
||||
Logging.warning("Unexpected child to be killed")
|
||||
doKillChild(child, sig)
|
||||
except psutil.NoSuchProcess as err:
|
||||
Logging.info("Process not found, can't kill, pid = {}".format(pid))
|
||||
|
||||
return doKillTdService(proc, sig)
|
||||
# TODO: re-examine if we need to kill the top process, which is always the SHELL for now
|
||||
# try:
|
||||
# proc.wait(1) # SHELL process here, may throw subprocess.TimeoutExpired exception
|
||||
# # expRetCode = self.SIG_KILL_RETCODE if sig==signal.SIGKILL else (-sig)
|
||||
# # if retCode == expRetCode:
|
||||
# # Logging.info("Process terminated with expected return code {}".format(retCode))
|
||||
# # else:
|
||||
# # Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(expRetCode, retCode))
|
||||
# # return True # success
|
||||
# except subprocess.TimeoutExpired as err:
|
||||
# Logging.warning("Failed to kill process {} with signal {}".format(pid, sig))
|
||||
# return False # failed to kill
|
||||
|
||||
def softKill(proc, sig):
|
||||
return doKill(proc, sig)
|
||||
|
||||
def hardKill(proc):
|
||||
return doKill(proc, signal.SIGKILL)
|
||||
|
||||
|
||||
|
||||
pid = proc.pid
|
||||
Logging.info("Terminate running processes under {}, with SIG #{} and wait...".format(pid, sig))
|
||||
if softKill(proc, sig):
|
||||
return# success
|
||||
if sig != signal.SIGKILL: # really was soft above
|
||||
if hardKill(proc):
|
||||
return
|
||||
raise CrashGenError("Failed to stop process, pid={}".format(pid))
|
||||
|
||||
class ServiceManager:
|
||||
PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process
|
||||
|
@ -561,6 +627,7 @@ class ServiceManagerThread:
|
|||
# self._tInst = tInst or TdeInstance() # Need an instance
|
||||
|
||||
self._thread = None # The actual thread, # type: threading.Thread
|
||||
self._thread2 = None # watching stderr
|
||||
self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually.
|
||||
|
||||
def __repr__(self):
|
||||
|
@ -568,11 +635,20 @@ class ServiceManagerThread:
|
|||
self.getStatus(), self._tdeSubProcess)
|
||||
|
||||
def getStatus(self):
    '''
    Return the Status object held by this manager.

    Note the naming: the value describes the underlying service being
    managed, not the state of the Python thread object itself.
    '''
    currentStatus = self._status
    return currentStatus
|
||||
|
||||
# Start the thread (with sub process), and wait for the sub service
|
||||
# to become fully operational
|
||||
def start(self, cmdLine):
|
||||
def start(self, cmdLine : str, logDir: str):
|
||||
'''
|
||||
Request the manager thread to start a new sub process, and manage it.
|
||||
|
||||
:param cmdLine: the command line to invoke
|
||||
:param logDir: the logging directory, to hold stdout/stderr files
|
||||
'''
|
||||
if self._thread:
|
||||
raise RuntimeError("Unexpected _thread")
|
||||
if self._tdeSubProcess:
|
||||
|
@ -582,20 +658,30 @@ class ServiceManagerThread:
|
|||
|
||||
self._status.set(Status.STATUS_STARTING)
|
||||
self._tdeSubProcess = TdeSubProcess()
|
||||
self._tdeSubProcess.start(cmdLine)
|
||||
self._tdeSubProcess.start(cmdLine) # TODO: verify process is running
|
||||
|
||||
self._ipcQueue = Queue()
|
||||
self._thread = threading.Thread( # First thread captures server OUTPUT
|
||||
target=self.svcOutputReader,
|
||||
args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
|
||||
args=(self._tdeSubProcess.getStdOut(), self._ipcQueue, logDir))
|
||||
self._thread.daemon = True # thread dies with the program
|
||||
self._thread.start()
|
||||
time.sleep(0.01)
|
||||
if not self._thread.is_alive(): # What happened?
|
||||
Logging.info("Failed to started process to monitor STDOUT")
|
||||
self.stop()
|
||||
raise CrashGenError("Failed to start thread to monitor STDOUT")
|
||||
Logging.info("Successfully started process to monitor STDOUT")
|
||||
|
||||
self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
|
||||
target=self.svcErrorReader,
|
||||
args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
|
||||
args=(self._tdeSubProcess.getStdErr(), self._ipcQueue, logDir))
|
||||
self._thread2.daemon = True # thread dies with the program
|
||||
self._thread2.start()
|
||||
time.sleep(0.01)
|
||||
if not self._thread2.is_alive():
|
||||
self.stop()
|
||||
raise CrashGenError("Failed to start thread to monitor STDERR")
|
||||
|
||||
# wait for service to start
|
||||
for i in range(0, 100):
|
||||
|
@ -643,7 +729,7 @@ class ServiceManagerThread:
|
|||
Logging.info("Service already stopped")
|
||||
return
|
||||
if self.getStatus().isStopping():
|
||||
Logging.info("Service is already being stopped")
|
||||
Logging.info("Service is already being stopped, pid: {}".format(self._tdeSubProcess.getPid()))
|
||||
return
|
||||
# Linux will send Control-C generated SIGINT to the TDengine process
|
||||
# already, ref:
|
||||
|
@ -653,14 +739,14 @@ class ServiceManagerThread:
|
|||
|
||||
self._status.set(Status.STATUS_STOPPING)
|
||||
# retCode = self._tdeSubProcess.stop()
|
||||
try:
|
||||
retCode = self._tdeSubProcess.stop()
|
||||
# print("Attempted to stop sub process, got return code: {}".format(retCode))
|
||||
if retCode == signal.SIGSEGV : # SGV
|
||||
Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
|
||||
except subprocess.TimeoutExpired as err:
|
||||
Logging.info("Time out waiting for TDengine service process to exit")
|
||||
else:
|
||||
# try:
|
||||
# retCode = self._tdeSubProcess.stop()
|
||||
# # print("Attempted to stop sub process, got return code: {}".format(retCode))
|
||||
# if retCode == signal.SIGSEGV : # SGV
|
||||
# Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
|
||||
# except subprocess.TimeoutExpired as err:
|
||||
# Logging.info("Time out waiting for TDengine service process to exit")
|
||||
if not self._tdeSubProcess.stop(): # everything withing
|
||||
if self._tdeSubProcess.isRunning(): # still running, should now never happen
|
||||
Logging.error("FAILED to stop sub process, it is still running... pid = {}".format(
|
||||
self._tdeSubProcess.getPid()))
|
||||
|
@ -683,16 +769,18 @@ class ServiceManagerThread:
|
|||
raise RuntimeError(
|
||||
"SMT.Join(): Unexpected status: {}".format(self._status))
|
||||
|
||||
if self._thread or self._thread2 :
|
||||
if self._thread:
|
||||
self._thread.join()
|
||||
self._thread = None
|
||||
self._status.set(Status.STATUS_STOPPED)
|
||||
# STD ERR thread
|
||||
if self._thread2: # STD ERR thread
|
||||
self._thread2.join()
|
||||
self._thread2 = None
|
||||
else:
|
||||
print("Joining empty thread, doing nothing")
|
||||
|
||||
self._status.set(Status.STATUS_STOPPED)
|
||||
|
||||
def _trimQueue(self, targetSize):
|
||||
if targetSize <= 0:
|
||||
return # do nothing
|
||||
|
@ -739,11 +827,22 @@ class ServiceManagerThread:
|
|||
print(pBar, end="", flush=True)
|
||||
print('\b\b\b\b', end="", flush=True)
|
||||
|
||||
def svcOutputReader(self, out: IO, queue):
|
||||
def svcOutputReader(self, out: IO, queue, logDir: str):
|
||||
'''
|
||||
The infinite routine that processes the STDOUT stream for the sub process being managed.
|
||||
|
||||
:param out: the IO stream object used to fetch the data from
|
||||
:param queue: the queue where we dump the roughly parsed line-by-line data
|
||||
:param logDir: where we should dump a verbatim output file
|
||||
'''
|
||||
os.makedirs(logDir, exist_ok=True)
|
||||
logFile = os.path.join(logDir,'stdout.log')
|
||||
fOut = open(logFile, 'wb')
|
||||
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
|
||||
# print("This is the svcOutput Reader...")
|
||||
# for line in out :
|
||||
for line in iter(out.readline, b''):
|
||||
fOut.write(line)
|
||||
# print("Finished reading a line: {}".format(line))
|
||||
# print("Adding item to queue...")
|
||||
try:
|
||||
|
@ -772,10 +871,16 @@ class ServiceManagerThread:
|
|||
# queue.put(line)
|
||||
# meaning sub process must have died
|
||||
Logging.info("EOF for TDengine STDOUT: {}".format(self))
|
||||
out.close()
|
||||
out.close() # Close the stream
|
||||
fOut.close() # Close the output file
|
||||
|
||||
def svcErrorReader(self, err: IO, queue):
|
||||
def svcErrorReader(self, err: IO, queue, logDir: str):
    '''
    Drain the STDERR stream of the managed sub process until EOF.

    Every line read is written verbatim to <logDir>/stderr.log and also
    reported through Logging.info().

    :param err: the byte stream to read from; it is closed here when reading ends
    :param queue: accepted for symmetry with the STDOUT reader; not used here
    :param logDir: directory that receives stderr.log (created if missing)
    '''
    os.makedirs(logDir, exist_ok=True)
    logFile = os.path.join(logDir, 'stderr.log')
    # "with" + "finally" guarantee that the log file and the stream are both
    # closed even if a read or write raises part-way through.
    with open(logFile, 'wb') as fErr:
        try:
            for line in iter(err.readline, b''):
                fErr.write(line)
                Logging.info("TDengine STDERR: {}".format(line))
            Logging.info("EOF for TDengine STDERR: {}".format(self))
        finally:
            err.close()
|
Loading…
Reference in New Issue