now able to replicate TD-1038
This commit is contained in:
parent
cbe2602ef1
commit
fc1cfb56e7
|
@ -238,7 +238,7 @@ class WorkerThread:
|
||||||
|
|
||||||
|
|
||||||
class ThreadCoordinator:
|
class ThreadCoordinator:
|
||||||
WORKER_THREAD_TIMEOUT = 30
|
WORKER_THREAD_TIMEOUT = 60 # one minute
|
||||||
|
|
||||||
def __init__(self, pool: ThreadPool, dbManager):
|
def __init__(self, pool: ThreadPool, dbManager):
|
||||||
self._curStep = -1 # first step is 0
|
self._curStep = -1 # first step is 0
|
||||||
|
@ -388,7 +388,9 @@ class ThreadCoordinator:
|
||||||
except taos.error.ProgrammingError as err:
|
except taos.error.ProgrammingError as err:
|
||||||
transitionFailed = True
|
transitionFailed = True
|
||||||
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
|
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
|
||||||
logger.info("Transition failed: errno=0x{:X}, msg: {}".format(errno2, err))
|
errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
|
||||||
|
logger.info(errMsg)
|
||||||
|
self._execStats.registerFailure(errMsg)
|
||||||
|
|
||||||
# Then we move on to the next step
|
# Then we move on to the next step
|
||||||
self._releaseAllWorkerThreads(transitionFailed)
|
self._releaseAllWorkerThreads(transitionFailed)
|
||||||
|
@ -812,7 +814,7 @@ class DbConnNative(DbConn):
|
||||||
buildPath = root[:len(root) - len("/build/bin")]
|
buildPath = root[:len(root) - len("/build/bin")]
|
||||||
break
|
break
|
||||||
if buildPath == None:
|
if buildPath == None:
|
||||||
raise RuntimeError("Failed to determine buildPath, selfPath={}".format(self_path))
|
raise RuntimeError("Failed to determine buildPath, selfPath={}".format(selfPath))
|
||||||
return buildPath
|
return buildPath
|
||||||
|
|
||||||
|
|
||||||
|
@ -2325,7 +2327,7 @@ class ServiceManagerThread:
|
||||||
if self._tdeSubProcess.isRunning(): # still running
|
if self._tdeSubProcess.isRunning(): # still running
|
||||||
print(
|
print(
|
||||||
"FAILED to stop sub process, it is still running... pid = {}".format(
|
"FAILED to stop sub process, it is still running... pid = {}".format(
|
||||||
self.subProcess.pid))
|
self._tdeSubProcess.pid))
|
||||||
else:
|
else:
|
||||||
self._tdeSubProcess = None # not running any more
|
self._tdeSubProcess = None # not running any more
|
||||||
self.join() # stop the thread, change the status, etc.
|
self.join() # stop the thread, change the status, etc.
|
||||||
|
@ -2469,15 +2471,19 @@ class TdeSubProcess:
|
||||||
|
|
||||||
|
|
||||||
svcCmd = [taosdPath, '-c', cfgPath]
|
svcCmd = [taosdPath, '-c', cfgPath]
|
||||||
|
svcCmdSingle = "{} -c {}".format(taosdPath, cfgPath)
|
||||||
# svcCmd = ['vmstat', '1']
|
# svcCmd = ['vmstat', '1']
|
||||||
if self.subProcess: # already there
|
if self.subProcess: # already there
|
||||||
raise RuntimeError("Corrupt process state")
|
raise RuntimeError("Corrupt process state")
|
||||||
|
|
||||||
|
# print("Starting service: {}".format(svcCmd))
|
||||||
self.subProcess = subprocess.Popen(
|
self.subProcess = subprocess.Popen(
|
||||||
svcCmd,
|
# svcCmd, shell=False,
|
||||||
|
svcCmdSingle, shell=True, # capture core dump?
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
# bufsize=1, # not supported in binary mode
|
# bufsize=1, # not supported in binary mode
|
||||||
close_fds=ON_POSIX) # had text=True, which interferred with reading EOF
|
close_fds=ON_POSIX
|
||||||
|
) # had text=True, which interferred with reading EOF
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
if not self.subProcess:
|
if not self.subProcess:
|
||||||
|
|
Loading…
Reference in New Issue