Merge pull request #6213 from taosdata/feature/crash_gen2
Enhancing crash_gen tool to report memory leaks in clustering mode
This commit is contained in:
commit
59df7a6e2c
|
@ -22,7 +22,7 @@ from queue import Queue, Empty
|
||||||
from .shared.config import Config
|
from .shared.config import Config
|
||||||
from .shared.db import DbTarget, DbConn
|
from .shared.db import DbTarget, DbConn
|
||||||
from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice
|
from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice
|
||||||
from .shared.types import DirPath
|
from .shared.types import DirPath, IpcStream
|
||||||
|
|
||||||
# from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status
|
# from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status
|
||||||
# from crash_gen.db import DbConn, DbTarget
|
# from crash_gen.db import DbConn, DbTarget
|
||||||
|
@ -177,13 +177,12 @@ quorum 2
|
||||||
return "127.0.0.1"
|
return "127.0.0.1"
|
||||||
|
|
||||||
def getServiceCmdLine(self): # to start the instance
|
def getServiceCmdLine(self): # to start the instance
|
||||||
cmdLine = []
|
|
||||||
if Config.getConfig().track_memory_leaks:
|
if Config.getConfig().track_memory_leaks:
|
||||||
Logging.info("Invoking VALGRIND on service...")
|
Logging.info("Invoking VALGRIND on service...")
|
||||||
cmdLine = ['valgrind', '--leak-check=yes']
|
return ['exec /usr/bin/valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()]
|
||||||
# TODO: move "exec -c" into Popen(), so we can both "use shell" and NOT fork, so as not to lose kill control
|
else:
|
||||||
cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subprocess.Popen()
|
# TODO: move "exec -c" into Popen(), so we can both "use shell" and NOT fork, so as not to lose kill control
|
||||||
return cmdLine
|
return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subprocess.Popen()
|
||||||
|
|
||||||
def _getDnodes(self, dbc):
|
def _getDnodes(self, dbc):
|
||||||
dbc.query("show dnodes")
|
dbc.query("show dnodes")
|
||||||
|
@ -281,16 +280,16 @@ class TdeSubProcess:
|
||||||
return '[TdeSubProc: pid = {}, status = {}]'.format(
|
return '[TdeSubProc: pid = {}, status = {}]'.format(
|
||||||
self.getPid(), self.getStatus() )
|
self.getPid(), self.getStatus() )
|
||||||
|
|
||||||
def getStdOut(self) -> BinaryIO :
|
def getIpcStdOut(self) -> IpcStream :
|
||||||
if self._popen.universal_newlines : # alias of text_mode
|
if self._popen.universal_newlines : # alias of text_mode
|
||||||
raise CrashGenError("We need binary mode for STDOUT IPC")
|
raise CrashGenError("We need binary mode for STDOUT IPC")
|
||||||
# Logging.info("Type of stdout is: {}".format(type(self._popen.stdout)))
|
# Logging.info("Type of stdout is: {}".format(type(self._popen.stdout)))
|
||||||
return typing.cast(BinaryIO, self._popen.stdout)
|
return typing.cast(IpcStream, self._popen.stdout)
|
||||||
|
|
||||||
def getStdErr(self) -> BinaryIO :
|
def getIpcStdErr(self) -> IpcStream :
|
||||||
if self._popen.universal_newlines : # alias of text_mode
|
if self._popen.universal_newlines : # alias of text_mode
|
||||||
raise CrashGenError("We need binary mode for STDERR IPC")
|
raise CrashGenError("We need binary mode for STDERR IPC")
|
||||||
return typing.cast(BinaryIO, self._popen.stderr)
|
return typing.cast(IpcStream, self._popen.stderr)
|
||||||
|
|
||||||
# Now it's always running, since we matched the life cycle
|
# Now it's always running, since we matched the life cycle
|
||||||
# def isRunning(self):
|
# def isRunning(self):
|
||||||
|
@ -301,11 +300,6 @@ class TdeSubProcess:
|
||||||
|
|
||||||
def _start(self, cmdLine) -> Popen :
|
def _start(self, cmdLine) -> Popen :
|
||||||
ON_POSIX = 'posix' in sys.builtin_module_names
|
ON_POSIX = 'posix' in sys.builtin_module_names
|
||||||
|
|
||||||
# Sanity check
|
|
||||||
# if self.subProcess: # already there
|
|
||||||
# raise RuntimeError("Corrupt process state")
|
|
||||||
|
|
||||||
|
|
||||||
# Prepare environment variables for coverage information
|
# Prepare environment variables for coverage information
|
||||||
# Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment
|
# Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment
|
||||||
|
@ -314,9 +308,8 @@ class TdeSubProcess:
|
||||||
|
|
||||||
# print(myEnv)
|
# print(myEnv)
|
||||||
# print("Starting TDengine with env: ", myEnv.items())
|
# print("Starting TDengine with env: ", myEnv.items())
|
||||||
# print("Starting TDengine via Shell: {}".format(cmdLineStr))
|
print("Starting TDengine: {}".format(cmdLine))
|
||||||
|
|
||||||
# useShell = True # Needed to pass environments into it
|
|
||||||
return Popen(
|
return Popen(
|
||||||
' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine,
|
' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine,
|
||||||
shell=True, # Always use shell, since we need to pass ENV vars
|
shell=True, # Always use shell, since we need to pass ENV vars
|
||||||
|
@ -732,19 +725,19 @@ class ServiceManagerThread:
|
||||||
self._ipcQueue = Queue() # type: Queue
|
self._ipcQueue = Queue() # type: Queue
|
||||||
self._thread = threading.Thread( # First thread captures server OUTPUT
|
self._thread = threading.Thread( # First thread captures server OUTPUT
|
||||||
target=self.svcOutputReader,
|
target=self.svcOutputReader,
|
||||||
args=(subProc.getStdOut(), self._ipcQueue, logDir))
|
args=(subProc.getIpcStdOut(), self._ipcQueue, logDir))
|
||||||
self._thread.daemon = True # thread dies with the program
|
self._thread.daemon = True # thread dies with the program
|
||||||
self._thread.start()
|
self._thread.start()
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
if not self._thread.is_alive(): # What happened?
|
if not self._thread.is_alive(): # What happened?
|
||||||
Logging.info("Failed to started process to monitor STDOUT")
|
Logging.info("Failed to start process to monitor STDOUT")
|
||||||
self.stop()
|
self.stop()
|
||||||
raise CrashGenError("Failed to start thread to monitor STDOUT")
|
raise CrashGenError("Failed to start thread to monitor STDOUT")
|
||||||
Logging.info("Successfully started process to monitor STDOUT")
|
Logging.info("Successfully started process to monitor STDOUT")
|
||||||
|
|
||||||
self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
|
self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
|
||||||
target=self.svcErrorReader,
|
target=self.svcErrorReader,
|
||||||
args=(subProc.getStdErr(), self._ipcQueue, logDir))
|
args=(subProc.getIpcStdErr(), self._ipcQueue, logDir))
|
||||||
self._thread2.daemon = True # thread dies with the program
|
self._thread2.daemon = True # thread dies with the program
|
||||||
self._thread2.start()
|
self._thread2.start()
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
|
@ -887,14 +880,19 @@ class ServiceManagerThread:
|
||||||
print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437')))
|
print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437')))
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _textChunkGenerator(self, streamIn: BinaryIO, logDir: str, logFile: str
|
def _textChunkGenerator(self, streamIn: IpcStream, logDir: str, logFile: str
|
||||||
) -> Generator[TextChunk, None, None]:
|
) -> Generator[TextChunk, None, None]:
|
||||||
'''
|
'''
|
||||||
Take an input stream with binary data, produce a generator of decoded
|
Take an input stream with binary data (likely from Popen), produce a generator of decoded
|
||||||
"text chunks", and also save the original binary data in a log file.
|
"text chunks".
|
||||||
|
|
||||||
|
Side effect: it also saves the original binary data in a log file.
|
||||||
'''
|
'''
|
||||||
os.makedirs(logDir, exist_ok=True)
|
os.makedirs(logDir, exist_ok=True)
|
||||||
logF = open(os.path.join(logDir, logFile), 'wb')
|
logF = open(os.path.join(logDir, logFile), 'wb')
|
||||||
|
if logF is None:
|
||||||
|
Logging.error("Failed to open log file (binary write): {}/{}".format(logDir, logFile))
|
||||||
|
return
|
||||||
for bChunk in iter(streamIn.readline, b''):
|
for bChunk in iter(streamIn.readline, b''):
|
||||||
logF.write(bChunk) # Write to log file immediately
|
logF.write(bChunk) # Write to log file immediately
|
||||||
tChunk = self._decodeBinaryChunk(bChunk) # decode
|
tChunk = self._decodeBinaryChunk(bChunk) # decode
|
||||||
|
@ -902,14 +900,14 @@ class ServiceManagerThread:
|
||||||
yield tChunk # TODO: split into actual text lines
|
yield tChunk # TODO: split into actual text lines
|
||||||
|
|
||||||
# At the end...
|
# At the end...
|
||||||
streamIn.close() # Close the stream
|
streamIn.close() # Close the incoming stream
|
||||||
logF.close() # Close the output file
|
logF.close() # Close the log file
|
||||||
|
|
||||||
def svcOutputReader(self, stdOut: BinaryIO, queue, logDir: str):
|
def svcOutputReader(self, ipcStdOut: IpcStream, queue, logDir: str):
|
||||||
'''
|
'''
|
||||||
The infinite routine that processes the STDOUT stream for the sub process being managed.
|
The infinite routine that processes the STDOUT stream for the sub process being managed.
|
||||||
|
|
||||||
:param stdOut: the IO stream object used to fetch the data from
|
:param ipcStdOut: the IO stream object used to fetch the data from
|
||||||
:param queue: the queue where we dump the roughly parsed chunk-by-chunk text data
|
:param queue: the queue where we dump the roughly parsed chunk-by-chunk text data
|
||||||
:param logDir: where we should dump a verbatim output file
|
:param logDir: where we should dump a verbatim output file
|
||||||
'''
|
'''
|
||||||
|
@ -917,7 +915,7 @@ class ServiceManagerThread:
|
||||||
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
|
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
|
||||||
# print("This is the svcOutput Reader...")
|
# print("This is the svcOutput Reader...")
|
||||||
# stdOut.readline() # Skip the first output? TODO: remove?
|
# stdOut.readline() # Skip the first output? TODO: remove?
|
||||||
for tChunk in self._textChunkGenerator(stdOut, logDir, 'stdout.log') :
|
for tChunk in self._textChunkGenerator(ipcStdOut, logDir, 'stdout.log') :
|
||||||
queue.put(tChunk) # tChunk guaranteed not to be None
|
queue.put(tChunk) # tChunk guaranteed not to be None
|
||||||
self._printProgress("_i")
|
self._printProgress("_i")
|
||||||
|
|
||||||
|
@ -940,12 +938,12 @@ class ServiceManagerThread:
|
||||||
Logging.info("EOF found TDengine STDOUT, marking the process as terminated")
|
Logging.info("EOF found TDengine STDOUT, marking the process as terminated")
|
||||||
self.setStatus(Status.STATUS_STOPPED)
|
self.setStatus(Status.STATUS_STOPPED)
|
||||||
|
|
||||||
def svcErrorReader(self, stdErr: BinaryIO, queue, logDir: str):
|
def svcErrorReader(self, ipcStdErr: IpcStream, queue, logDir: str):
|
||||||
# os.makedirs(logDir, exist_ok=True)
|
# os.makedirs(logDir, exist_ok=True)
|
||||||
# logFile = os.path.join(logDir,'stderr.log')
|
# logFile = os.path.join(logDir,'stderr.log')
|
||||||
# fErr = open(logFile, 'wb')
|
# fErr = open(logFile, 'wb')
|
||||||
# for line in iter(err.readline, b''):
|
# for line in iter(err.readline, b''):
|
||||||
for tChunk in self._textChunkGenerator(stdErr, logDir, 'stderr.log') :
|
for tChunk in self._textChunkGenerator(ipcStdErr, logDir, 'stderr.log') :
|
||||||
queue.put(tChunk) # tChunk guaranteed not to be None
|
queue.put(tChunk) # tChunk guaranteed not to be None
|
||||||
# fErr.write(line)
|
# fErr.write(line)
|
||||||
Logging.info("TDengine STDERR: {}".format(tChunk))
|
Logging.info("TDengine STDERR: {}".format(tChunk))
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
from typing import Any, List, Dict, NewType
|
from typing import Any, BinaryIO, List, Dict, NewType
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
|
||||||
DirPath = NewType('DirPath', str)
|
DirPath = NewType('DirPath', str)
|
||||||
|
@ -26,3 +26,5 @@ class TdDataType(Enum):
|
||||||
|
|
||||||
TdColumns = Dict[str, TdDataType]
|
TdColumns = Dict[str, TdDataType]
|
||||||
TdTags = Dict[str, TdDataType]
|
TdTags = Dict[str, TdDataType]
|
||||||
|
|
||||||
|
IpcStream = NewType('IpcStream', BinaryIO)
|
Loading…
Reference in New Issue