diff --git a/tests/pytest/crash_gen/crash_gen.py b/tests/pytest/crash_gen/crash_gen.py
index b1d79f54c3..3f662fac73 100755
--- a/tests/pytest/crash_gen/crash_gen.py
+++ b/tests/pytest/crash_gen/crash_gen.py
@@ -1780,8 +1780,8 @@ class Task():
                 return True
             elif msg.find("duplicated column names") != -1: # also alter table tag issues
                 return True
-        elif (gSvcMgr!=None) and gSvcMgr.isRestarting():
-            logger.info("Ignoring error when service is restarting: errno = {}, msg = {}".format(errno, msg))
+        elif gSvcMgr and (not gSvcMgr.isStable()): # We are managing service, and it is starting/stopping
+            logger.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg))
             return True

         return False # Not an acceptable error
@@ -2451,8 +2451,9 @@ class MyLoggingAdapter(logging.LoggerAdapter):
 class ServiceManager:
     PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process

-    def __init__(self):
-        print("Starting TDengine Service Manager")
+    def __init__(self, numDnodes = 1):
+        logger.info("TDengine Service Manager (TSM) created")
+        self._numDnodes = numDnodes # >1 means we have a cluster
         # signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec
         # signal.signal(signal.SIGINT, self.sigIntHandler)
         # signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
@@ -2460,9 +2461,12 @@ class ServiceManager:
         self.inSigHandler = False
         # self._status = MainExec.STATUS_RUNNING # set inside
         # _startTaosService()
-        self.svcMgrThread = None # type: ServiceManagerThread
+        self.svcMgrThreads = [] # type: List[ServiceManagerThread]
+        for i in range(0, numDnodes):
+            self.svcMgrThreads.append(ServiceManagerThread(i))
+
         self._lock = threading.Lock()
-        self._isRestarting = False
+        # self._isRestarting = False

     def _doMenu(self):
         choice = ""
@@ -2494,7 +2498,7 @@ class ServiceManager:
         if choice == "1":
             self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue?
         elif choice == "2":
-            self.stopTaosService()
+            self.stopTaosServices()
         elif choice == "3": # Restart
             self.restart()
         else:
@@ -2509,33 +2513,70 @@ class ServiceManager:
             return
         self.inSigHandler = True

-        self.stopTaosService()
+        self.stopTaosServices()
         print("ServiceManager: INT Signal Handler returning...")
         self.inSigHandler = False

     def sigHandlerResume(self):
         print("Resuming TDengine service manager (main thread)...\n\n")

-    def _updateThreadStatus(self):
-        if self.svcMgrThread: # valid svc mgr thread
-            if self.svcMgrThread.isStopped(): # done?
-                self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate?
-                self.svcMgrThread = None # no more
+    # def _updateThreadStatus(self):
+    #     if self.svcMgrThread: # valid svc mgr thread
+    #         if self.svcMgrThread.isStopped(): # done?
+    #             self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate?
+    #             self.svcMgrThread = None # no more
+
+    def isActive(self):
+        """
+        Determine if the service/cluster is active at all, i.e. at least
+        one thread is not "stopped".
+        """
+        for thread in self.svcMgrThreads:
+            if not thread.isStopped():
+                return True
+        return False
+
+    # def isRestarting(self):
+    #     """
+    #     Determine if the service/cluster is being "restarted", i.e., at least
+    #     one thread is in "restarting" status
+    #     """
+    #     for thread in self.svcMgrThreads:
+    #         if thread.isRestarting():
+    #             return True
+    #     return False
+
+    def isStable(self):
+        """
+        Determine if the service/cluster is "stable", i.e. all of the
+        threads are in "stable" status.
+        """
+        for thread in self.svcMgrThreads:
+            if not thread.isStable():
+                return False
+        return True

     def _procIpcAll(self):
-        while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here
-            if self.isRunning():
-                self.svcMgrThread.procIpcBatch() # regular processing,
-                self._updateThreadStatus()
-            elif self.isRetarting():
-                print("Service restarting...")
+        while self.isActive():
+            for thread in self.svcMgrThreads: # all thread objects should always be valid
+            # while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here
+                if thread.isRunning():
+                    thread.procIpcBatch() # regular processing,
+                    if thread.isStopped():
+                        thread.procIpcBatch() # one last time?
+                    # self._updateThreadStatus()
+                elif not thread.isStable(): # starting or stopping; replaces the misspelled, undefined isRetarting()
+                    print("Service restarting...")
+                # else this thread is stopped
+
             time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round
+        # raise CrashGenError("dummy")
         print("Service Manager Thread (with subprocess) ended, main thread exiting...")

-    def startTaosService(self):
+    def startTaosServices(self):
         with self._lock:
-            if self.svcMgrThread:
-                raise RuntimeError("Cannot start TAOS service when one may already be running")
+            if self.isActive():
+                raise RuntimeError("Cannot start TAOS service(s) when one/some may already be running")

             # Find if there's already a taosd service, and then kill it
             for proc in psutil.process_iter():
@@ -2545,53 +2586,45 @@ class ServiceManager:
                     proc.kill()
                 # print("Process: {}".format(proc.name()))

-            self.svcMgrThread = ServiceManagerThread() # create the object
-            print("Attempting to start TAOS service started, printing out output...")
-            self.svcMgrThread.start()
-            self.svcMgrThread.procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines
-            print("TAOS service started")
+            # self.svcMgrThread = ServiceManagerThread() # create the object
+            for thread in self.svcMgrThreads:
+                thread.start()
+                thread.procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines

-    def stopTaosService(self, outputLines=20):
+    def stopTaosServices(self):
         with self._lock:
-            if not self.isRunning():
-                logger.warning("Cannot stop TAOS service, not running")
+            if not self.isActive():
+                logger.warning("Cannot stop TAOS service(s), already not active")
                 return

-            print("Terminating Service Manager Thread (SMT) execution...")
-            self.svcMgrThread.stop()
-            if self.svcMgrThread.isStopped():
-                self.svcMgrThread.procIpcBatch(outputLines) # one last time
-                self.svcMgrThread = None
-                print("End of TDengine Service Output")
-                print("----- TDengine Service (managed by SMT) is now terminated -----\n")
-            else:
-                print("WARNING: SMT did not terminate as expected")
-
+            for thread in self.svcMgrThreads:
+                thread.stop()
+
     def run(self):
-        self.startTaosService()
+        self.startTaosServices()
         self._procIpcAll() # pump/process all the messages, may encounter SIG + restart
-        if self.isRunning(): # if sig handler hasn't destroyed it by now
-            self.stopTaosService() # should have started already
+        if self.isActive(): # if sig handler hasn't destroyed it by now
+            self.stopTaosServices() # should have started already

     def restart(self):
-        if self._isRestarting:
-            logger.warning("Cannot restart service when it's already restarting")
+        if not self.isStable():
+            logger.warning("Cannot restart service/cluster, when not stable")
             return

-        self._isRestarting = True
-        if self.isRunning():
-            self.stopTaosService()
+        # self._isRestarting = True
+        if self.isActive():
+            self.stopTaosServices()
         else:
-            logger.warning("Service not running when restart requested")
+            logger.warning("Service not active when restart requested")

-        self.startTaosService()
-        self._isRestarting = False
+        self.startTaosServices() # renamed by this patch; the old singular name no longer exists
+        # self._isRestarting = False

-    def isRunning(self):
-        return self.svcMgrThread != None
+    # def isRunning(self):
+    #     return self.svcMgrThread != None

-    def isRestarting(self):
-        return self._isRestarting
+    # def isRestarting(self):
+    #     return self._isRestarting

 class ServiceManagerThread:
     """
@@ -2602,15 +2635,26 @@ class ServiceManagerThread:
     """
     MAX_QUEUE_SIZE = 10000

-    def __init__(self, tInst : TdeInstance = None):
+    def __init__(self, tInstNum = 0, tInst : TdeInstance = None):
+        # Set the sub process
         self._tdeSubProcess = None # type: TdeSubProcess
-        self._tInst = tInst or TdeInstance() # Need an instance
-        self._thread = None
-        self._status = None
+
+        # Arrange the TDengine instance
+        self._tInstNum = tInstNum # instance serial number in cluster, ZERO based
+        self._tInst = tInst or TdeInstance() # Need an instance
+
+        self._thread = None # The actual thread, # type: threading.Thread
+        self._status = MainExec.STATUS_STOPPED # The status of the underlying service, actually.
+
+    def __repr__(self):
+        return "[SvcMgrThread: tInstNum={}]".format(self._tInstNum)

     def getStatus(self):
         return self._status

+    def isStarting(self):
+        return self._status == MainExec.STATUS_STARTING
+
     def isRunning(self):
         # return self._thread and self._thread.is_alive()
         return self._status == MainExec.STATUS_RUNNING
@@ -2621,6 +2665,9 @@ class ServiceManagerThread:
     def isStopped(self):
         return self._status == MainExec.STATUS_STOPPED

+    def isStable(self):
+        return self.isRunning() or self.isStopped()
+
     # Start the thread (with sub process), and wait for the sub service
     # to become fully operational
     def start(self):
@@ -2629,8 +2676,9 @@ class ServiceManagerThread:
         if self._tdeSubProcess:
             raise RuntimeError("TDengine sub process already created/running")

-        self._status = MainExec.STATUS_STARTING
+        logger.info("Attempting to start TAOS service: {}".format(self))

+        self._status = MainExec.STATUS_STARTING
         self._tdeSubProcess = TdeSubProcess(self._tInst)
         self._tdeSubProcess.start()

@@ -2654,10 +2702,11 @@ class ServiceManagerThread:
             print("_zz_", end="", flush=True)
             if self._status == MainExec.STATUS_RUNNING:
                 logger.info("[] TDengine service READY to process requests")
+                logger.info("[] TAOS service started: {}".format(self))
                 return # now we've started
-        # TODO: handle this better?
+        # TODO: handle failure-to-start better?
         self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output
-        raise RuntimeError("TDengine service did not start successfully")
+        raise RuntimeError("TDengine service did not start successfully: {}".format(self))

     def stop(self):
         # can be called from both main thread or signal handler
@@ -2687,6 +2736,15 @@ class ServiceManagerThread:
             self._tdeSubProcess = None # not running any more
             self.join() # stop the thread, change the status, etc.

+        # Check if it's really stopped
+        outputLines = 20 # for last output
+        if self.isStopped():
+            self.procIpcBatch(outputLines) # one last time
+            print("End of TDengine Service Output: {}".format(self))
+            print("----- TDengine Service (managed by SMT) is now terminated -----\n")
+        else:
+            print("WARNING: SMT did not terminate as expected: {}".format(self))
+
     def join(self):
         # TODO: sanity check
         if not self.isStopping():
@@ -2770,7 +2828,7 @@ class ServiceManagerThread:
             if line.find(self.TD_READY_MSG) != -1: # found
                 logger.info("Waiting for the service to become FULLY READY")
                 time.sleep(1.0) # wait for the server to truly start. TODO: remove this
-                logger.info("Service is now FULLY READY")
+                logger.info("Service instance #{} is now FULLY READY".format(self._tInstNum))
                 self._status = MainExec.STATUS_RUNNING

             # Trim the queue if necessary: TODO: try this 1 out of 10 times