From dc72a1a60c7c206f1e2d03a14fea3d756997a8de Mon Sep 17 00:00:00 2001 From: Steven Li Date: Wed, 21 Oct 2020 07:54:47 +0000 Subject: [PATCH] Split crash_gen tool into different functional files/modules --- tests/pytest/crash_gen/crash_gen.py | 914 +++------------------- tests/pytest/crash_gen/misc.py | 133 ++++ tests/pytest/crash_gen/service_manager.py | 633 +++++++++++++++ 3 files changed, 873 insertions(+), 807 deletions(-) create mode 100644 tests/pytest/crash_gen/misc.py create mode 100644 tests/pytest/crash_gen/service_manager.py diff --git a/tests/pytest/crash_gen/crash_gen.py b/tests/pytest/crash_gen/crash_gen.py index dbd4eab9e7..f369f5a3e8 100755 --- a/tests/pytest/crash_gen/crash_gen.py +++ b/tests/pytest/crash_gen/crash_gen.py @@ -19,17 +19,15 @@ from util.sql import * from util.cases import * from util.dnodes import * from util.log import * -from queue import Queue, Empty -from typing import IO from typing import Set from typing import Dict from typing import List from requests.auth import HTTPBasicAuth import textwrap -import datetime -import logging import time +import datetime import random +import logging import threading import requests import copy @@ -38,19 +36,14 @@ import getopt import sys import os -import io import signal import traceback import resource from guppy import hpy import gc -import subprocess -try: - import psutil -except: - print("Psutil module needed, please install: sudo pip3 install psutil") - sys.exit(-1) +from .service_manager import ServiceManager, TdeInstance +from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress # Require Python 3 if sys.version_info[0] < 3: @@ -62,19 +55,12 @@ if sys.version_info[0] < 3: # ConfigNameSpace = argparse.Namespace gConfig: argparse.Namespace gSvcMgr: ServiceManager # TODO: refactor this hack, use dep injection -logger: logging.Logger +# logger: logging.Logger gContainer: Container # def runThread(wt: WorkerThread): # wt.run() -class CrashGenError(Exception): - def __init__(self, msg=None, errno=None): - self.msg = msg - self.errno = errno - - def __str__(self): - return self.msg class WorkerThread: def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator, @@ -107,10 +93,10 @@ class WorkerThread: # self._dbInUse = False # if "use db" was executed already def logDebug(self, msg): - logger.debug(" TRD[{}] {}".format(self._tid, msg)) + Logging.debug(" TRD[{}] {}".format(self._tid, msg)) def logInfo(self, msg): - logger.info(" TRD[{}] {}".format(self._tid, msg)) + Logging.info(" TRD[{}] {}".format(self._tid, msg)) # def dbInUse(self): # return self._dbInUse @@ -129,10 +115,10 @@ class WorkerThread: def run(self): # initialization after thread starts, in the thread context # self.isSleeping = False - logger.info("Starting to run thread: {}".format(self._tid)) + Logging.info("Starting to run thread: {}".format(self._tid)) if (gConfig.per_thread_db_connection): # type: ignore - logger.debug("Worker thread openning database connection") + Logging.debug("Worker thread openning database connection") self._dbConn.open() self._doTaskLoop() @@ -142,7 +128,7 @@ class WorkerThread: if self._dbConn.isOpen: #sometimes it is not open self._dbConn.close() else: - logger.warning("Cleaning up worker thread, dbConn already closed") + Logging.warning("Cleaning up worker thread, dbConn already closed") def _doTaskLoop(self): # while self._curStep < self._pool.maxSteps: @@ -153,15 +139,15 @@ class WorkerThread: tc.crossStepBarrier() # shared barrier first, INCLUDING the last one except threading.BrokenBarrierError as 
err: # main thread timed out print("_bto", end="") - logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out") + Logging.debug("[TRD] Worker thread exiting due to main thread barrier time-out") break - logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid)) + Logging.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid)) self.crossStepGate() # then per-thread gate, after being tapped - logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid)) + Logging.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid)) if not self._tc.isRunning(): print("_wts", end="") - logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") + Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more) @@ -180,15 +166,15 @@ class WorkerThread: raise # Fetch a task from the Thread Coordinator - logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid)) + Logging.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid)) task = tc.fetchTask() # Execute such a task - logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format( + Logging.debug("[TRD] Worker thread [{}] about to execute task: {}".format( self._tid, task.__class__.__name__)) task.execute(self) tc.saveExecutedTask(task) - logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) + Logging.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) # self._dbInUse = False # there may be changes between steps # print("_wtd", end=None) # worker thread died @@ -211,7 +197,7 @@ class WorkerThread: self.verifyThreadSelf() # only allowed by ourselves # Wait again at the "gate", waiting to be "tapped" - logger.debug( + Logging.debug( "[TRD] Worker thread {} about to cross the step gate".format( self._tid)) self._stepGate.wait() @@ -224,7 +210,7 @@ class WorkerThread: self.verifyThreadMain() # only allowed for main thread if self._thread.is_alive(): - logger.debug("[TRD] Tapping worker thread {}".format(self._tid)) + Logging.debug("[TRD] Tapping worker thread {}".format(self._tid)) self._stepGate.set() # wake up! time.sleep(0) # let the released thread run a bit else: @@ -269,7 +255,7 @@ class ThreadCoordinator: self._stepBarrier = threading.Barrier( self._pool.numThreads + 1) # one barrier for all threads self._execStats = ExecutionStats() - self._runStatus = MainExec.STATUS_RUNNING + self._runStatus = Status.STATUS_RUNNING self._initDbs() def getTaskExecutor(self): @@ -282,14 +268,14 @@ class ThreadCoordinator: self._stepBarrier.wait(timeout) def requestToStop(self): - self._runStatus = MainExec.STATUS_STOPPING + self._runStatus = Status.STATUS_STOPPING self._execStats.registerFailure("User Interruption") def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout): maxSteps = gConfig.max_steps # type: ignore if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9 return True - if self._runStatus != MainExec.STATUS_RUNNING: + if self._runStatus != Status.STATUS_RUNNING: return True if transitionFailed: return True @@ -310,7 +296,7 @@ class ThreadCoordinator: def _releaseAllWorkerThreads(self, transitionFailed): self._curStep += 1 # we are about to get into next step. TODO: race condition here! 
# Now not all threads had time to go to sleep - logger.debug( + Logging.debug( "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) # A new TE for the new step @@ -318,7 +304,7 @@ class ThreadCoordinator: if not transitionFailed: # only if not failed self._te = TaskExecutor(self._curStep) - logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format( + Logging.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format( self._curStep)) # Now not all threads had time to go to sleep # Worker threads will wake up at this point, and each execute it's own task self.tapAllThreads() # release all worker thread from their "gates" @@ -327,10 +313,10 @@ class ThreadCoordinator: # Now main thread (that's us) is ready to enter a step # let other threads go past the pool barrier, but wait at the # thread gate - logger.debug("[TRD] Main thread about to cross the barrier") + Logging.debug("[TRD] Main thread about to cross the barrier") self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT) self._stepBarrier.reset() # Other worker threads should now be at the "gate" - logger.debug("[TRD] Main thread finished crossing the barrier") + Logging.debug("[TRD] Main thread finished crossing the barrier") def _doTransition(self): transitionFailed = False @@ -338,11 +324,11 @@ class ThreadCoordinator: for x in self._dbs: db = x # type: Database sm = db.getStateMachine() - logger.debug("[STT] starting transitions for DB: {}".format(db.getName())) + Logging.debug("[STT] starting transitions for DB: {}".format(db.getName())) # at end of step, transiton the DB state tasksForDb = db.filterTasks(self._executedTasks) sm.transition(tasksForDb, self.getDbManager().getDbConn()) - logger.debug("[STT] transition ended for DB: {}".format(db.getName())) + Logging.debug("[STT] transition ended for DB: {}".format(db.getName())) # Due to limitation (or maybe not) of the TD Python library, # we cannot share connections across threads @@ -350,14 +336,14 @@ class ThreadCoordinator: # Moving below to task loop # if sm.hasDatabase(): # for t in self._pool.threadList: - # logger.debug("[DB] use db for all worker threads") + # Logging.debug("[DB] use db for all worker threads") # t.useDb() # t.execSql("use db") # main thread executing "use # db" on behalf of every worker thread except taos.error.ProgrammingError as err: if (err.msg == 'network unavailable'): # broken DB connection - logger.info("DB connection broken, execution failed") + Logging.info("DB connection broken, execution failed") traceback.print_stack() transitionFailed = True self._te = None # Not running any more @@ -370,7 +356,7 @@ class ThreadCoordinator: self.resetExecutedTasks() # clear the tasks after we are done # Get ready for next step - logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed)) + Logging.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed)) return transitionFailed def run(self): @@ -384,8 +370,9 @@ class ThreadCoordinator: hasAbortedTask = False workerTimeout = False while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout): - if not gConfig.debug: # print this only if we are not in debug mode - print(".", end="", flush=True) + if not gConfig.debug: # print this only if we are not in debug mode + Progress.emit(Progress.STEP_BOUNDARY) + # print(".", end="", flush=True) # if (self._curStep % 2) == 0: # print memory usage once every 10 steps # memUsage = 
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss # print("[m:{}]".format(memUsage), end="", flush=True) # print memory usage @@ -397,8 +384,9 @@ class ThreadCoordinator: try: self._syncAtBarrier() # For now just cross the barrier + Progress.emit(Progress.END_THREAD_STEP) except threading.BrokenBarrierError as err: - logger.info("Main loop aborted, caused by worker thread time-out") + Logging.info("Main loop aborted, caused by worker thread time-out") self._execStats.registerFailure("Aborted due to worker thread timeout") print("\n\nWorker Thread time-out detected, important thread info:") ts = ThreadStacks() @@ -411,7 +399,7 @@ class ThreadCoordinator: # threads are QUIET. hasAbortedTask = self._hasAbortedTask() # from previous step if hasAbortedTask: - logger.info("Aborted task encountered, exiting test program") + Logging.info("Aborted task encountered, exiting test program") self._execStats.registerFailure("Aborted Task Encountered") break # do transition only if tasks are error free @@ -422,29 +410,30 @@ class ThreadCoordinator: transitionFailed = True errno2 = Helper.convertErrno(err.errno) # correct error scheme errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err) - logger.info(errMsg) + Logging.info(errMsg) traceback.print_exc() self._execStats.registerFailure(errMsg) # Then we move on to the next step + Progress.emit(Progress.BEGIN_THREAD_STEP) self._releaseAllWorkerThreads(transitionFailed) if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate" - logger.debug("Abnormal ending of main thraed") + Logging.debug("Abnormal ending of main thraed") elif workerTimeout: - logger.debug("Abnormal ending of main thread, due to worker timeout") + Logging.debug("Abnormal ending of main thread, due to worker timeout") else: # regular ending, workers waiting at "barrier" - logger.debug("Regular ending, main thread waiting for all worker threads to stop...") + Logging.debug("Regular ending, main thread waiting for all worker threads to stop...") self._syncAtBarrier() self._te = None # No more executor, time to end - logger.debug("Main thread tapping all threads one last time...") + Logging.debug("Main thread tapping all threads one last time...") self.tapAllThreads() # Let the threads run one last time - logger.debug("\r\n\n--> Main thread ready to finish up...") - logger.debug("Main thread joining all threads") + Logging.debug("\r\n\n--> Main thread ready to finish up...") + Logging.debug("Main thread joining all threads") self._pool.joinAll() # Get all threads to finish - logger.info("\nAll worker threads finished") + Logging.info("\nAll worker threads finished") self._execStats.endExec() def cleanup(self): # free resources @@ -476,7 +465,7 @@ class ThreadCoordinator: wakeSeq.append(i) else: wakeSeq.insert(0, i) - logger.debug( + Logging.debug( "[TRD] Main thread waking up worker threads: {}".format( str(wakeSeq))) # TODO: set dice seed to a deterministic value @@ -524,13 +513,6 @@ class ThreadCoordinator: with self._lock: self._executedTasks.append(task) -# We define a class to run a number of threads in locking steps. 
- -class Helper: - @classmethod - def convertErrno(cls, errno): - return errno if (errno > 0) else 0x80000000 + errno - class ThreadPool: def __init__(self, numThreads, maxSteps): self.numThreads = numThreads @@ -548,7 +530,7 @@ class ThreadPool: def joinAll(self): for workerThread in self.threadList: - logger.debug("Joining thread...") + Logging.debug("Joining thread...") workerThread._thread.join() def cleanup(self): @@ -605,7 +587,7 @@ class LinearQueue(): def allocate(self, i): with self._lock: - # logger.debug("LQ allocating item {}".format(i)) + # Logging.debug("LQ allocating item {}".format(i)) if (i in self.inUse): raise RuntimeError( "Cannot re-use same index in queue: {}".format(i)) @@ -613,7 +595,7 @@ class LinearQueue(): def release(self, i): with self._lock: - # logger.debug("LQ releasing item {}".format(i)) + # Logging.debug("LQ releasing item {}".format(i)) self.inUse.remove(i) # KeyError possible, TODO: why? def size(self): @@ -673,9 +655,12 @@ class DbConn: # below implemented by child classes self.openByType() - logger.debug("[DB] data connection opened, type = {}".format(self._type)) + Logging.debug("[DB] data connection opened, type = {}".format(self._type)) self.isOpen = True + def close(self): + raise RuntimeError("Unexpected execution, should be overriden") + def queryScalar(self, sql) -> int: return self._queryAny(sql) @@ -755,7 +740,7 @@ class DbConnRest(DbConn): if (not self.isOpen): raise RuntimeError("Cannot clean up database until connection is open") # Do nothing for REST - logger.debug("[DB] REST Database connection closed") + Logging.debug("[DB] REST Database connection closed") self.isOpen = False def _doSql(self, sql): @@ -793,9 +778,9 @@ class DbConnRest(DbConn): if (not self.isOpen): raise RuntimeError( "Cannot execute database commands until connection is open") - logger.debug("[SQL-REST] Executing SQL: {}".format(sql)) + Logging.debug("[SQL-REST] Executing SQL: {}".format(sql)) nRows = self._doSql(sql) - logger.debug( + Logging.debug( "[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) return nRows @@ -884,127 +869,6 @@ class MyTDSql: raise return self.affectedRows -class TdeInstance(): - """ - A class to capture the *static* information of a TDengine instance, - including the location of the various files/directories, and basica - configuration. 
- """ - - @classmethod - def _getBuildPath(cls): - selfPath = os.path.dirname(os.path.realpath(__file__)) - if ("community" in selfPath): - projPath = selfPath[:selfPath.find("communit")] - else: - projPath = selfPath[:selfPath.find("tests")] - - buildPath = None - for root, dirs, files in os.walk(projPath): - if ("taosd" in files): - rootRealPath = os.path.dirname(os.path.realpath(root)) - if ("packaging" not in rootRealPath): - buildPath = root[:len(root) - len("/build/bin")] - break - if buildPath == None: - raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}" - .format(selfPath, projPath)) - return buildPath - - def __init__(self, subdir='test'): - self._buildDir = self._getBuildPath() - self._subdir = '/' + subdir # TODO: tolerate "/" - - def __repr__(self): - return "[TdeInstance: {}, subdir={}]".format(self._buildDir, self._subdir) - - def generateCfgFile(self): - # buildPath = self.getBuildPath() - # taosdPath = self._buildPath + "/build/bin/taosd" - - cfgDir = self.getCfgDir() - cfgFile = cfgDir + "/taos.cfg" # TODO: inquire if this is fixed - if os.path.exists(cfgFile): - if os.path.isfile(cfgFile): - logger.warning("Config file exists already, skip creation: {}".format(cfgFile)) - return # cfg file already exists, nothing to do - else: - raise CrashGenError("Invalid config file: {}".format(cfgFile)) - # Now that the cfg file doesn't exist - if os.path.exists(cfgDir): - if not os.path.isdir(cfgDir): - raise CrashGenError("Invalid config dir: {}".format(cfgDir)) - # else: good path - else: - os.makedirs(cfgDir, exist_ok=True) # like "mkdir -p" - # Now we have a good cfg dir - cfgValues = { - 'runDir': self.getRunDir(), - 'ip': '127.0.0.1', # TODO: change to a network addressable ip - 'port': 6030, - } - cfgTemplate = """ -dataDir {runDir}/data -logDir {runDir}/log - -charset UTF-8 - -firstEp {ip}:{port} -fqdn {ip} -serverPort {port} - -# was all 135 below -dDebugFlag 135 -cDebugFlag 135 -rpcDebugFlag 135 -qDebugFlag 135 -# httpDebugFlag 143 -# asyncLog 0 -# tables 10 -maxtablesPerVnode 10 -rpcMaxTime 101 -# cache 2 -keep 36500 -# walLevel 2 -walLevel 1 -# -# maxConnections 100 -""" - cfgContent = cfgTemplate.format_map(cfgValues) - f = open(cfgFile, "w") - f.write(cfgContent) - f.close() - - def rotateLogs(self): - logPath = self.getLogDir() - # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397 - if os.path.exists(logPath): - logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S') - logger.info("Saving old log files to: {}".format(logPathSaved)) - os.rename(logPath, logPathSaved) - # os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms - - - def getExecFile(self): # .../taosd - return self._buildDir + "/build/bin/taosd" - - def getRunDir(self): # TODO: rename to "root dir" ?! - return self._buildDir + self._subdir - - def getCfgDir(self): # path, not file - return self.getRunDir() + "/cfg" - - def getLogDir(self): - return self.getRunDir() + "/log" - - def getHostAddr(self): - return "127.0.0.1" - - def getServiceCommand(self): # to start the instance - return [self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() - - - class DbConnNative(DbConn): # Class variables _lock = threading.Lock() @@ -1028,7 +892,7 @@ class DbConnNative(DbConn): with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!! 
if not cls._connInfoDisplayed: cls._connInfoDisplayed = True # updating CLASS variable - logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) + Logging.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) # Make the connection # self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable # self._cursor = self._conn.cursor() @@ -1052,16 +916,16 @@ class DbConnNative(DbConn): with cls._lock: cls.totalConnections -= 1 - logger.debug("[DB] Database connection closed") + Logging.debug("[DB] Database connection closed") self.isOpen = False def execute(self, sql): if (not self.isOpen): raise RuntimeError("Cannot execute database commands until connection is open") - logger.debug("[SQL] Executing SQL: {}".format(sql)) + Logging.debug("[SQL] Executing SQL: {}".format(sql)) self._lastSql = sql nRows = self._tdSql.execute(sql) - logger.debug( + Logging.debug( "[SQL] Execution Result, nRows = {}, SQL = {}".format( nRows, sql)) return nRows @@ -1070,10 +934,10 @@ class DbConnNative(DbConn): if (not self.isOpen): raise RuntimeError( "Cannot query database until connection is open") - logger.debug("[SQL] Executing SQL: {}".format(sql)) + Logging.debug("[SQL] Executing SQL: {}".format(sql)) self._lastSql = sql nRows = self._tdSql.query(sql) - logger.debug( + Logging.debug( "[SQL] Query Result, nRows = {}, SQL = {}".format( nRows, sql)) return nRows @@ -1337,7 +1201,7 @@ class StateMechine: def init(self, dbc: DbConn): # late initailization, don't save the dbConn self._curState = self._findCurrentState(dbc) # starting state - logger.debug("Found Starting State: {}".format(self._curState)) + Logging.debug("Found Starting State: {}".format(self._curState)) # TODO: seems no lnoger used, remove? def getCurrentState(self): @@ -1375,7 +1239,7 @@ class StateMechine: raise RuntimeError( "No suitable task types found for state: {}".format( self._curState)) - logger.debug( + Logging.debug( "[OPS] Tasks found for state {}: {}".format( self._curState, typesToStrings(taskTypes))) @@ -1385,27 +1249,27 @@ class StateMechine: ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state dbName =self._db.getName() if not dbc.existsDatabase(dbName): # dbc.hasDatabases(): # no database?! 
- logger.debug( "[STT] empty database found, between {} and {}".format(ts, time.time())) + Logging.debug( "[STT] empty database found, between {} and {}".format(ts, time.time())) return StateEmpty() # did not do this when openning connection, and this is NOT the worker # thread, which does this on their own dbc.use(dbName) if not dbc.hasTables(): # no tables - logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) + Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) return StateDbOnly() sTable = self._db.getFixedSuperTable() if sTable.hasRegTables(dbc, dbName): # no regular tables - logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) + Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) return StateSuperTableOnly() else: # has actual tables - logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) + Logging.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) return StateHasData() # We transition the system to a new state by examining the current state itself def transition(self, tasks, dbc: DbConn): if (len(tasks) == 0): # before 1st step, or otherwise empty - logger.debug("[STT] Starting State: {}".format(self._curState)) + Logging.debug("[STT] Starting State: {}".format(self._curState)) return # do nothing # this should show up in the server log, separating steps @@ -1441,7 +1305,7 @@ class StateMechine: # Nothing for sure newState = self._findCurrentState(dbc) - logger.debug("[STT] New DB state determined: {}".format(newState)) + Logging.debug("[STT] New DB state determined: {}".format(newState)) # can old state move to new state through the tasks? self._curState.verifyTasksToState(tasks, newState) self._curState = newState @@ -1459,7 +1323,7 @@ class StateMechine: # read data task, default to 10: TODO: change to a constant weights.append(10) i = self._weighted_choice_sub(weights) - # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) + # Logging.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) return taskTypes[i] # ref: @@ -1538,7 +1402,7 @@ class Database: t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years t4 = datetime.datetime.fromtimestamp( t3.timestamp() + elSec2) # see explanation above - logger.info("Setting up TICKS to start from: {}".format(t4)) + Logging.info("Setting up TICKS to start from: {}".format(t4)) return t4 @classmethod @@ -1689,10 +1553,10 @@ class TaskExecutor(): self._boundedList.add(n) # def logInfo(self, msg): - # logger.info(" T[{}.x]: ".format(self._curStep) + msg) + # Logging.info(" T[{}.x]: ".format(self._curStep) + msg) # def logDebug(self, msg): - # logger.debug(" T[{}.x]: ".format(self._curStep) + msg) + # Logging.debug(" T[{}.x]: ".format(self._curStep) + msg) class Task(): @@ -1705,7 +1569,7 @@ class Task(): @classmethod def allocTaskNum(cls): Task.taskSn += 1 # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy - # logger.debug("Allocating taskSN: {}".format(Task.taskSn)) + # Logging.debug("Allocating taskSN: {}".format(Task.taskSn)) return Task.taskSn def __init__(self, execStats: ExecutionStats, db: Database): @@ -1717,7 +1581,7 @@ class Task(): # Assign an incremental task serial number self._taskNum = self.allocTaskNum() - # logger.debug("Creating new task {}...".format(self._taskNum)) + # Logging.debug("Creating new task {}...".format(self._taskNum)) self._execStats = execStats self._db = db # A task is always 
associated/for a specific DB @@ -1781,7 +1645,7 @@ class Task(): elif msg.find("duplicated column names") != -1: # also alter table tag issues return True elif gSvcMgr and (not gSvcMgr.isStable()): # We are managing service, and ... - logger.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg)) + Logging.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg)) return True return False # Not an acceptable error @@ -1922,13 +1786,13 @@ class ExecutionStats: self._failureReason = reason def printStats(self): - logger.info( + Logging.info( "----------------------------------------------------------------------") - logger.info( + Logging.info( "| Crash_Gen test {}, with the following stats:". format( "FAILED (reason: {})".format( self._failureReason) if self._failed else "SUCCEEDED")) - logger.info("| Task Execution Times (success/total):") + Logging.info("| Task Execution Times (success/total):") execTimesAny = 0.001 # avoid div by zero for k, n in self._execTimes.items(): execTimesAny += n[0] @@ -1939,28 +1803,28 @@ class ExecutionStats: errStrs = ["0x{:X}:{}".format(eno, n) for (eno, n) in errors.items()] # print("error strings = {}".format(errStrs)) errStr = ", ".join(errStrs) - logger.info("| {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr)) + Logging.info("| {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr)) - logger.info( + Logging.info( "| Total Tasks Executed (success or not): {} ".format(execTimesAny)) - logger.info( + Logging.info( "| Total Tasks In Progress at End: {}".format( self._tasksInProgress)) - logger.info( + Logging.info( "| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format( self._accRunTime)) - logger.info( + Logging.info( "| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime / execTimesAny)) - logger.info( + Logging.info( "| Total Elapsed Time (from wall clock): {:.3f} seconds".format( self._elapsedTime)) - logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList())) - logger.info("| Active DB Native Connections (now): {}".format(DbConnNative.totalConnections)) - logger.info("| Longest native query time: {:.3f} seconds, started: {}". + Logging.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList())) + Logging.info("| Active DB Native Connections (now): {}".format(DbConnNative.totalConnections)) + Logging.info("| Longest native query time: {:.3f} seconds, started: {}". 
format(MyTDSql.longestQueryTime, time.strftime("%x %X", time.localtime(MyTDSql.lqStartTime))) ) - logger.info("| Longest native query: {}".format(MyTDSql.longestQuery)) - logger.info( + Logging.info("| Longest native query: {}".format(MyTDSql.longestQuery)) + Logging.info( "----------------------------------------------------------------------") @@ -2030,7 +1894,7 @@ class TaskDropDb(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): self.execWtSql(wt, "drop database {}".format(self._db.getName())) - logger.debug("[OPS] database dropped at {}".format(time.time())) + Logging.debug("[OPS] database dropped at {}".format(time.time())) class TaskCreateSuperTable(StateTransitionTask): @classmethod @@ -2043,7 +1907,7 @@ class TaskCreateSuperTable(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): if not self._db.exists(wt.getDbConn()): - logger.debug("Skipping task, no DB yet") + Logging.debug("Skipping task, no DB yet") return sTable = self._db.getFixedSuperTable() # type: TdSuperTable @@ -2078,7 +1942,7 @@ class TdSuperTable: dbc.query("select TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later except taos.error.ProgrammingError as err: errno2 = Helper.convertErrno(err.errno) - logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err)) + Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err)) raise qr = dbc.getQueryResult() @@ -2193,7 +2057,7 @@ class TaskReadData(StateTransitionTask): dbc.execute("select {} from {}.{}".format(aggExpr, dbName, sTable.getName())) except taos.error.ProgrammingError as err: errno2 = Helper.convertErrno(err.errno) - logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql())) + Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql())) raise class TaskDropSuperTable(StateTransitionTask): @@ -2224,7 +2088,7 @@ class TaskDropSuperTable(StateTransitionTask): errno2 = Helper.convertErrno(err.errno) if (errno2 in [0x362]): # mnode invalid table name isSuccess = False - logger.debug("[DB] Acceptable error when dropping a table") + Logging.debug("[DB] Acceptable error when dropping a table") continue # try to delete next regular table if (not tickOutput): @@ -2304,20 +2168,19 @@ class TaskAddData(StateTransitionTask): # Track which table is being actively worked on activeTable: Set[int] = set() - # We use these two files to record operations to DB, useful for power-off - # tests - fAddLogReady = None - fAddLogDone = None + # We use these two files to record operations to DB, useful for power-off tests + fAddLogReady = None # type: TextIOWrapper + fAddLogDone = None # type: TextIOWrapper @classmethod def prepToRecordOps(cls): if gConfig.record_ops: if (cls.fAddLogReady is None): - logger.info( + Logging.info( "Recording in a file operations to be performed...") cls.fAddLogReady = open("add_log_ready.txt", "w") if (cls.fAddLogDone is None): - logger.info("Recording in a file operations completed...") + Logging.info("Recording in a file operations completed...") cls.fAddLogDone = open("add_log_done.txt", "w") @classmethod @@ -2393,553 +2256,8 @@ class TaskAddData(StateTransitionTask): self.activeTable.discard(i) # not raising an error, unlike remove -# Deterministic random number generator -class Dice(): - seeded = False # static, uninitialized - @classmethod - def seed(cls, s): # static - if 
(cls.seeded): - raise RuntimeError( - "Cannot seed the random generator more than once") - cls.verifyRNG() - random.seed(s) - cls.seeded = True # TODO: protect against multi-threading - @classmethod - def verifyRNG(cls): # Verify that the RNG is determinstic - random.seed(0) - x1 = random.randrange(0, 1000) - x2 = random.randrange(0, 1000) - x3 = random.randrange(0, 1000) - if (x1 != 864 or x2 != 394 or x3 != 776): - raise RuntimeError("System RNG is not deterministic") - - @classmethod - def throw(cls, stop): # get 0 to stop-1 - return cls.throwRange(0, stop) - - @classmethod - def throwRange(cls, start, stop): # up to stop-1 - if (not cls.seeded): - raise RuntimeError("Cannot throw dice before seeding it") - return random.randrange(start, stop) - - @classmethod - def choice(cls, cList): - return random.choice(cList) - - -class LoggingFilter(logging.Filter): - def filter(self, record: logging.LogRecord): - if (record.levelno >= logging.INFO): - return True # info or above always log - - # Commenting out below to adjust... - - # if msg.startswith("[TRD]"): - # return False - return True - - -class MyLoggingAdapter(logging.LoggerAdapter): - def process(self, msg, kwargs): - return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs - # return '[%s] %s' % (self.extra['connid'], msg), kwargs - - -class ServiceManager: - PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process - - def __init__(self, numDnodes = 1): - logger.info("TDengine Service Manager (TSM) created") - self._numDnodes = numDnodes # >1 means we have a cluster - # signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec - # signal.signal(signal.SIGINT, self.sigIntHandler) - # signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler! - - self.inSigHandler = False - # self._status = MainExec.STATUS_RUNNING # set inside - # _startTaosService() - self.svcMgrThreads = [] # type: List[ServiceManagerThread] - for i in range(0, numDnodes): - self.svcMgrThreads.append(ServiceManagerThread(i)) - - self._lock = threading.Lock() - # self._isRestarting = False - - def _doMenu(self): - choice = "" - while True: - print("\nInterrupting Service Program, Choose an Action: ") - print("1: Resume") - print("2: Terminate") - print("3: Restart") - # Remember to update the if range below - # print("Enter Choice: ", end="", flush=True) - while choice == "": - choice = input("Enter Choice: ") - if choice != "": - break # done with reading repeated input - if choice in ["1", "2", "3"]: - break # we are done with whole method - print("Invalid choice, please try again.") - choice = "" # reset - return choice - - def sigUsrHandler(self, signalNumber, frame): - print("Interrupting main thread execution upon SIGUSR1") - if self.inSigHandler: # already - print("Ignoring repeated SIG...") - return # do nothing if it's already not running - self.inSigHandler = True - - choice = self._doMenu() - if choice == "1": - self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue? 
- elif choice == "2": - self.stopTaosServices() - elif choice == "3": # Restart - self.restart() - else: - raise RuntimeError("Invalid menu choice: {}".format(choice)) - - self.inSigHandler = False - - def sigIntHandler(self, signalNumber, frame): - print("ServiceManager: INT Signal Handler starting...") - if self.inSigHandler: - print("Ignoring repeated SIG_INT...") - return - self.inSigHandler = True - - self.stopTaosServices() - print("ServiceManager: INT Signal Handler returning...") - self.inSigHandler = False - - def sigHandlerResume(self): - print("Resuming TDengine service manager (main thread)...\n\n") - - # def _updateThreadStatus(self): - # if self.svcMgrThread: # valid svc mgr thread - # if self.svcMgrThread.isStopped(): # done? - # self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate? - # self.svcMgrThread = None # no more - - def isActive(self): - """ - Determine if the service/cluster is active at all, i.e. at least - one thread is not "stopped". - """ - for thread in self.svcMgrThreads: - if not thread.isStopped(): - return True - return False - - # def isRestarting(self): - # """ - # Determine if the service/cluster is being "restarted", i.e., at least - # one thread is in "restarting" status - # """ - # for thread in self.svcMgrThreads: - # if thread.isRestarting(): - # return True - # return False - - def isStable(self): - """ - Determine if the service/cluster is "stable", i.e. all of the - threads are in "stable" status. - """ - for thread in self.svcMgrThreads: - if not thread.isStable(): - return False - return True - - def _procIpcAll(self): - while self.isActive(): - for thread in self.svcMgrThreads: # all thread objects should always be valid - # while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here - if thread.isRunning(): - thread.procIpcBatch() # regular processing, - if thread.isStopped(): - thread.procIpcBatch() # one last time? - # self._updateThreadStatus() - elif thread.isRetarting(): - print("Service restarting...") - # else this thread is stopped - - time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round - # raise CrashGenError("dummy") - print("Service Manager Thread (with subprocess) ended, main thread exiting...") - - def startTaosServices(self): - with self._lock: - if self.isActive(): - raise RuntimeError("Cannot start TAOS service(s) when one/some may already be running") - - # Find if there's already a taosd service, and then kill it - for proc in psutil.process_iter(): - if proc.name() == 'taosd': - print("Killing an existing TAOSD process in 2 seconds... 
press CTRL-C to interrupe") - time.sleep(2.0) - proc.kill() - # print("Process: {}".format(proc.name())) - - # self.svcMgrThread = ServiceManagerThread() # create the object - for thread in self.svcMgrThreads: - thread.start() - thread.procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines - - def stopTaosServices(self): - with self._lock: - if not self.isActive(): - logger.warning("Cannot stop TAOS service(s), already not active") - return - - for thread in self.svcMgrThreads: - thread.stop() - - def run(self): - self.startTaosServices() - self._procIpcAll() # pump/process all the messages, may encounter SIG + restart - if self.isActive(): # if sig handler hasn't destroyed it by now - self.stopTaosServices() # should have started already - - def restart(self): - if not self.isStable(): - logger.warning("Cannot restart service/cluster, when not stable") - return - - # self._isRestarting = True - if self.isActive(): - self.stopTaosServices() - else: - logger.warning("Service not active when restart requested") - - self.startTaosService() - # self._isRestarting = False - - # def isRunning(self): - # return self.svcMgrThread != None - - # def isRestarting(self): - # return self._isRestarting - -class ServiceManagerThread: - """ - A class representing a dedicated thread which manages the "sub process" - of the TDengine service, interacting with its STDOUT/ERR. - - It takes a TdeInstance parameter at creation time, or create a default - """ - MAX_QUEUE_SIZE = 10000 - - def __init__(self, tInstNum = 0, tInst : TdeInstance = None): - # Set the sub process - self._tdeSubProcess = None # type: TdeSubProcess - - # Arrange the TDengine instance - self._tInstNum = tInstNum # instance serial number in cluster, ZERO based - self._tInst = tInst or TdeInstance() # Need an instance - - self._thread = None # The actual thread, # type: threading.Thread - self._status = MainExec.STATUS_STOPPED # The status of the underlying service, actually. 
- - def __repr__(self): - return "[SvcMgrThread: tInstNum={}]".format(self._tInstNum) - - def getStatus(self): - return self._status - - def isStarting(self): - return self._status == MainExec.STATUS_STARTING - - def isRunning(self): - # return self._thread and self._thread.is_alive() - return self._status == MainExec.STATUS_RUNNING - - def isStopping(self): - return self._status == MainExec.STATUS_STOPPING - - def isStopped(self): - return self._status == MainExec.STATUS_STOPPED - - def isStable(self): - return self.isRunning() or self.isStopped() - - # Start the thread (with sub process), and wait for the sub service - # to become fully operational - def start(self): - if self._thread: - raise RuntimeError("Unexpected _thread") - if self._tdeSubProcess: - raise RuntimeError("TDengine sub process already created/running") - - logger.info("Attempting to start TAOS service: {}".format(self)) - - self._status = MainExec.STATUS_STARTING - self._tdeSubProcess = TdeSubProcess(self._tInst) - self._tdeSubProcess.start() - - self._ipcQueue = Queue() - self._thread = threading.Thread( # First thread captures server OUTPUT - target=self.svcOutputReader, - args=(self._tdeSubProcess.getStdOut(), self._ipcQueue)) - self._thread.daemon = True # thread dies with the program - self._thread.start() - - self._thread2 = threading.Thread( # 2nd thread captures server ERRORs - target=self.svcErrorReader, - args=(self._tdeSubProcess.getStdErr(), self._ipcQueue)) - self._thread2.daemon = True # thread dies with the program - self._thread2.start() - - # wait for service to start - for i in range(0, 100): - time.sleep(1.0) - # self.procIpcBatch() # don't pump message during start up - print("_zz_", end="", flush=True) - if self._status == MainExec.STATUS_RUNNING: - logger.info("[] TDengine service READY to process requests") - logger.info("[] TAOS service started: {}".format(self)) - return # now we've started - # TODO: handle failure-to-start better? - self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output - raise RuntimeError("TDengine service did not start successfully: {}".format(self)) - - def stop(self): - # can be called from both main thread or signal handler - print("Terminating TDengine service running as the sub process...") - if self.isStopped(): - print("Service already stopped") - return - if self.isStopping(): - print("Service is already being stopped") - return - # Linux will send Control-C generated SIGINT to the TDengine process - # already, ref: - # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes - if not self._tdeSubProcess: - raise RuntimeError("sub process object missing") - - self._status = MainExec.STATUS_STOPPING - retCode = self._tdeSubProcess.stop() - print("Attempted to stop sub process, got return code: {}".format(retCode)) - if (retCode==-11): # SGV - logger.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)") - - if self._tdeSubProcess.isRunning(): # still running - print("FAILED to stop sub process, it is still running... pid = {}".format( - self._tdeSubProcess.getPid())) - else: - self._tdeSubProcess = None # not running any more - self.join() # stop the thread, change the status, etc. 
- - # Check if it's really stopped - outputLines = 20 # for last output - if self.isStopped(): - self.procIpcBatch(outputLines) # one last time - print("End of TDengine Service Output: {}".format(self)) - print("----- TDengine Service (managed by SMT) is now terminated -----\n") - else: - print("WARNING: SMT did not terminate as expected: {}".format(self)) - - def join(self): - # TODO: sanity check - if not self.isStopping(): - raise RuntimeError( - "Unexpected status when ending svc mgr thread: {}".format( - self._status)) - - if self._thread: - self._thread.join() - self._thread = None - self._status = MainExec.STATUS_STOPPED - # STD ERR thread - self._thread2.join() - self._thread2 = None - else: - print("Joining empty thread, doing nothing") - - def _trimQueue(self, targetSize): - if targetSize <= 0: - return # do nothing - q = self._ipcQueue - if (q.qsize() <= targetSize): # no need to trim - return - - logger.debug("Triming IPC queue to target size: {}".format(targetSize)) - itemsToTrim = q.qsize() - targetSize - for i in range(0, itemsToTrim): - try: - q.get_nowait() - except Empty: - break # break out of for loop, no more trimming - - TD_READY_MSG = "TDengine is initialized successfully" - - def procIpcBatch(self, trimToTarget=0, forceOutput=False): - self._trimQueue(trimToTarget) # trim if necessary - # Process all the output generated by the underlying sub process, - # managed by IO thread - print("<", end="", flush=True) - while True: - try: - line = self._ipcQueue.get_nowait() # getting output at fast speed - self._printProgress("_o") - except Empty: - # time.sleep(2.3) # wait only if there's no output - # no more output - print(".>", end="", flush=True) - return # we are done with THIS BATCH - else: # got line, printing out - if forceOutput: - logger.info(line) - else: - logger.debug(line) - print(">", end="", flush=True) - - _ProgressBars = ["--", "//", "||", "\\\\"] - - def _printProgress(self, msg): # TODO: assuming 2 chars - print(msg, end="", flush=True) - pBar = self._ProgressBars[Dice.throw(4)] - print(pBar, end="", flush=True) - print('\b\b\b\b', end="", flush=True) - - def svcOutputReader(self, out: IO, queue): - # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python - # print("This is the svcOutput Reader...") - # for line in out : - for line in iter(out.readline, b''): - # print("Finished reading a line: {}".format(line)) - # print("Adding item to queue...") - try: - line = line.decode("utf-8").rstrip() - except UnicodeError: - print("\nNon-UTF8 server output: {}\n".format(line)) - - # This might block, and then causing "out" buffer to block - queue.put(line) - self._printProgress("_i") - - if self._status == MainExec.STATUS_STARTING: # we are starting, let's see if we have started - if line.find(self.TD_READY_MSG) != -1: # found - logger.info("Waiting for the service to become FULLY READY") - time.sleep(1.0) # wait for the server to truly start. 
TODO: remove this - logger.info("Service instance #{} is now FULLY READY".format(self._tInstNum)) - self._status = MainExec.STATUS_RUNNING - - # Trim the queue if necessary: TODO: try this 1 out of 10 times - self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size - - if self.isStopping(): # TODO: use thread status instead - # WAITING for stopping sub process to finish its outptu - print("_w", end="", flush=True) - - # queue.put(line) - # meaning sub process must have died - print("\nNo more output from IO thread managing TDengine service") - out.close() - - def svcErrorReader(self, err: IO, queue): - for line in iter(err.readline, b''): - print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line)) - - -class TdeSubProcess: - """ - A class to to represent the actual sub process that is the run-time - of a TDengine instance. - - It takes a TdeInstance object as its parameter, with the rationale being - "a sub process runs an instance". - """ - - def __init__(self, tInst : TdeInstance): - self.subProcess = None - if tInst is None: - raise CrashGenError("Empty instance not allowed in TdeSubProcess") - self._tInst = tInst # Default create at ServiceManagerThread - - def getStdOut(self): - return self.subProcess.stdout - - def getStdErr(self): - return self.subProcess.stderr - - def isRunning(self): - return self.subProcess is not None - - def getPid(self): - return self.subProcess.pid - - # Repalced by TdeInstance class - # def getBuildPath(self): - # selfPath = os.path.dirname(os.path.realpath(__file__)) - # if ("community" in selfPath): - # projPath = selfPath[:selfPath.find("communit")] - # else: - # projPath = selfPath[:selfPath.find("tests")] - - # for root, dirs, files in os.walk(projPath): - # if ("taosd" in files): - # rootRealPath = os.path.dirname(os.path.realpath(root)) - # if ("packaging" not in rootRealPath): - # buildPath = root[:len(root) - len("/build/bin")] - # break - # return buildPath - - def start(self): - ON_POSIX = 'posix' in sys.builtin_module_names - - # Sanity check - if self.subProcess: # already there - raise RuntimeError("Corrupt process state") - - # global gContainer - # tInst = gContainer.defTdeInstance = TdeInstance('test3') # creae the instance - self._tInst.generateCfgFile() # service side generates config file, client does not - - self._tInst.rotateLogs() - - print("Starting TDengine instance: {}".format(self._tInst)) - self.subProcess = subprocess.Popen( - self._tInst.getServiceCommand(), - shell=False, - # svcCmdSingle, shell=True, # capture core dump? 
- stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - # bufsize=1, # not supported in binary mode - close_fds=ON_POSIX - ) # had text=True, which interferred with reading EOF - - def stop(self): - if not self.subProcess: - print("Sub process already stopped") - return -1 - - retCode = self.subProcess.poll() # contains real sub process return code - if retCode: # valid return code, process ended - self.subProcess = None - else: # process still alive, let's interrupt it - print( - "Sub process is running, sending SIG_INT and waiting for it to terminate...") - # sub process should end, then IPC queue should end, causing IO - # thread to end - self.subProcess.send_signal(signal.SIGINT) - try: - self.subProcess.wait(10) - retCode = self.subProcess.returncode - except subprocess.TimeoutExpired as err: - print("Time out waiting for TDengine service process to exit") - retCode = -3 - else: - print("TDengine service process terminated successfully from SIG_INT") - retCode = -4 - self.subProcess = None - return retCode class ThreadStacks: # stack info for all threads def __init__(self): @@ -2976,17 +2294,17 @@ class ClientManager: # signal.signal(signal.SIGTERM, self.sigIntHandler) # signal.signal(signal.SIGINT, self.sigIntHandler) - self._status = MainExec.STATUS_RUNNING + self._status = Status.STATUS_RUNNING self.tc = None self.inSigHandler = False def sigIntHandler(self, signalNumber, frame): - if self._status != MainExec.STATUS_RUNNING: + if self._status != Status.STATUS_RUNNING: print("Repeated SIGINT received, forced exit...") # return # do nothing if it's already not running sys.exit(-1) - self._status = MainExec.STATUS_STOPPING # immediately set our status + self._status = Status.STATUS_STOPPING # immediately set our status print("ClientManager: Terminating program...") self.tc.requestToStop() @@ -3110,11 +2428,6 @@ class ClientManager: self.tc.printStats() class MainExec: - STATUS_STARTING = 1 - STATUS_RUNNING = 2 - STATUS_STOPPING = 3 - STATUS_STOPPED = 4 - def __init__(self): self._clientMgr = None self._svcMgr = None @@ -3147,7 +2460,7 @@ class MainExec: try: ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside except requests.exceptions.ConnectionError as err: - logger.warning("Failed to open REST connection to DB: {}".format(err.getMessage())) + Logging.warning("Failed to open REST connection to DB: {}".format(err.getMessage())) # don't raise return ret @@ -3255,20 +2568,7 @@ class MainExec: global gConfig gConfig = parser.parse_args() - # Logging Stuff - global logger - _logger = logging.getLogger('CrashGen') # real logger - _logger.addFilter(LoggingFilter()) - ch = logging.StreamHandler() - _logger.addHandler(ch) - - # Logging adapter, to be used as a logger - logger = MyLoggingAdapter(_logger, []) - - if (gConfig.debug): - logger.setLevel(logging.DEBUG) # default seems to be INFO - else: - logger.setLevel(logging.INFO) + Logging.clsInit(gConfig) Dice.seed(0) # initial seeding of dice diff --git a/tests/pytest/crash_gen/misc.py b/tests/pytest/crash_gen/misc.py new file mode 100644 index 0000000000..08e50e5070 --- /dev/null +++ b/tests/pytest/crash_gen/misc.py @@ -0,0 +1,133 @@ +import threading +import random +import logging + + +class CrashGenError(Exception): + def __init__(self, msg=None, errno=None): + self.msg = msg + self.errno = errno + + def __str__(self): + return self.msg + + +class LoggingFilter(logging.Filter): + def filter(self, record: logging.LogRecord): + if (record.levelno >= logging.INFO): + return True # info or above always log + + # Commenting out 
below to adjust... + + # if msg.startswith("[TRD]"): + # return False + return True + + +class MyLoggingAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): + return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs + # return '[%s] %s' % (self.extra['connid'], msg), kwargs + + +class Logging: + logger = None + + @classmethod + def getLogger(cls): + return logger + + @classmethod + def clsInit(cls, gConfig): # TODO: refactor away gConfig + if cls.logger: + return + + # Logging Stuff + # global misc.logger + _logger = logging.getLogger('CrashGen') # real logger + _logger.addFilter(LoggingFilter()) + ch = logging.StreamHandler() + _logger.addHandler(ch) + + # Logging adapter, to be used as a logger + print("setting logger variable") + # global logger + cls.logger = MyLoggingAdapter(_logger, []) + + if (gConfig.debug): + cls.logger.setLevel(logging.DEBUG) # default seems to be INFO + else: + cls.logger.setLevel(logging.INFO) + + @classmethod + def info(cls, msg): + cls.logger.info(msg) + + @classmethod + def debug(cls, msg): + cls.logger.debug(msg) + + @classmethod + def warning(cls, msg): + cls.logger.warning(msg) + +class Status: + STATUS_STARTING = 1 + STATUS_RUNNING = 2 + STATUS_STOPPING = 3 + STATUS_STOPPED = 4 + +# Deterministic random number generator +class Dice(): + seeded = False # static, uninitialized + + @classmethod + def seed(cls, s): # static + if (cls.seeded): + raise RuntimeError( + "Cannot seed the random generator more than once") + cls.verifyRNG() + random.seed(s) + cls.seeded = True # TODO: protect against multi-threading + + @classmethod + def verifyRNG(cls): # Verify that the RNG is determinstic + random.seed(0) + x1 = random.randrange(0, 1000) + x2 = random.randrange(0, 1000) + x3 = random.randrange(0, 1000) + if (x1 != 864 or x2 != 394 or x3 != 776): + raise RuntimeError("System RNG is not deterministic") + + @classmethod + def throw(cls, stop): # get 0 to stop-1 + return cls.throwRange(0, stop) + + @classmethod + def throwRange(cls, start, stop): # up to stop-1 + if (not cls.seeded): + raise RuntimeError("Cannot throw dice before seeding it") + return random.randrange(start, stop) + + @classmethod + def choice(cls, cList): + return random.choice(cList) + +class Helper: + @classmethod + def convertErrno(cls, errno): + return errno if (errno > 0) else 0x80000000 + errno + +class Progress: + STEP_BOUNDARY = 0 + BEGIN_THREAD_STEP = 1 + END_THREAD_STEP = 2 + tokens = { + STEP_BOUNDARY: '.', + BEGIN_THREAD_STEP: '[', + END_THREAD_STEP: '] ' + } + + @classmethod + def emit(cls, token): + print(cls.tokens[token], end="", flush=True) diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py new file mode 100644 index 0000000000..cdb12303a2 --- /dev/null +++ b/tests/pytest/crash_gen/service_manager.py @@ -0,0 +1,633 @@ +import os +import io +import sys +import threading +import signal +import logging +import time +import subprocess + +from typing import IO + +try: + import psutil +except: + print("Psutil module needed, please install: sudo pip3 install psutil") + sys.exit(-1) + +from queue import Queue, Empty +from .misc import Logging, Status, CrashGenError, Dice + +class TdeInstance(): + """ + A class to capture the *static* information of a TDengine instance, + including the location of the various files/directories, and basica + configuration. 
+ """ + + @classmethod + def _getBuildPath(cls): + selfPath = os.path.dirname(os.path.realpath(__file__)) + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("communit")] + else: + projPath = selfPath[:selfPath.find("tests")] + + buildPath = None + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + if buildPath == None: + raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}" + .format(selfPath, projPath)) + return buildPath + + def __init__(self, subdir='test'): + self._buildDir = self._getBuildPath() + self._subdir = '/' + subdir # TODO: tolerate "/" + + def __repr__(self): + return "[TdeInstance: {}, subdir={}]".format(self._buildDir, self._subdir) + + def generateCfgFile(self): + # print("Logger = {}".format(logger)) + # buildPath = self.getBuildPath() + # taosdPath = self._buildPath + "/build/bin/taosd" + + cfgDir = self.getCfgDir() + cfgFile = cfgDir + "/taos.cfg" # TODO: inquire if this is fixed + if os.path.exists(cfgFile): + if os.path.isfile(cfgFile): + Logging.warning("Config file exists already, skip creation: {}".format(cfgFile)) + return # cfg file already exists, nothing to do + else: + raise CrashGenError("Invalid config file: {}".format(cfgFile)) + # Now that the cfg file doesn't exist + if os.path.exists(cfgDir): + if not os.path.isdir(cfgDir): + raise CrashGenError("Invalid config dir: {}".format(cfgDir)) + # else: good path + else: + os.makedirs(cfgDir, exist_ok=True) # like "mkdir -p" + # Now we have a good cfg dir + cfgValues = { + 'runDir': self.getRunDir(), + 'ip': '127.0.0.1', # TODO: change to a network addressable ip + 'port': 6030, + } + cfgTemplate = """ +dataDir {runDir}/data +logDir {runDir}/log + +charset UTF-8 + +firstEp {ip}:{port} +fqdn {ip} +serverPort {port} + +# was all 135 below +dDebugFlag 135 +cDebugFlag 135 +rpcDebugFlag 135 +qDebugFlag 135 +# httpDebugFlag 143 +# asyncLog 0 +# tables 10 +maxtablesPerVnode 10 +rpcMaxTime 101 +# cache 2 +keep 36500 +# walLevel 2 +walLevel 1 +# +# maxConnections 100 +""" + cfgContent = cfgTemplate.format_map(cfgValues) + f = open(cfgFile, "w") + f.write(cfgContent) + f.close() + + def rotateLogs(self): + logPath = self.getLogDir() + # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397 + if os.path.exists(logPath): + logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S') + Logging.info("Saving old log files to: {}".format(logPathSaved)) + os.rename(logPath, logPathSaved) + # os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms + + + def getExecFile(self): # .../taosd + return self._buildDir + "/build/bin/taosd" + + def getRunDir(self): # TODO: rename to "root dir" ?! + return self._buildDir + self._subdir + + def getCfgDir(self): # path, not file + return self.getRunDir() + "/cfg" + + def getLogDir(self): + return self.getRunDir() + "/log" + + def getHostAddr(self): + return "127.0.0.1" + + def getServiceCommand(self): # to start the instance + return [self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() + + +class TdeSubProcess: + """ + A class to to represent the actual sub process that is the run-time + of a TDengine instance. + + It takes a TdeInstance object as its parameter, with the rationale being + "a sub process runs an instance". 
+ """ + + def __init__(self, tInst : TdeInstance): + self.subProcess = None + if tInst is None: + raise CrashGenError("Empty instance not allowed in TdeSubProcess") + self._tInst = tInst # Default create at ServiceManagerThread + + def getStdOut(self): + return self.subProcess.stdout + + def getStdErr(self): + return self.subProcess.stderr + + def isRunning(self): + return self.subProcess is not None + + def getPid(self): + return self.subProcess.pid + + # Repalced by TdeInstance class + # def getBuildPath(self): + # selfPath = os.path.dirname(os.path.realpath(__file__)) + # if ("community" in selfPath): + # projPath = selfPath[:selfPath.find("communit")] + # else: + # projPath = selfPath[:selfPath.find("tests")] + + # for root, dirs, files in os.walk(projPath): + # if ("taosd" in files): + # rootRealPath = os.path.dirname(os.path.realpath(root)) + # if ("packaging" not in rootRealPath): + # buildPath = root[:len(root) - len("/build/bin")] + # break + # return buildPath + + def start(self): + ON_POSIX = 'posix' in sys.builtin_module_names + + # Sanity check + if self.subProcess: # already there + raise RuntimeError("Corrupt process state") + + # global gContainer + # tInst = gContainer.defTdeInstance = TdeInstance('test3') # creae the instance + self._tInst.generateCfgFile() # service side generates config file, client does not + + self._tInst.rotateLogs() + + print("Starting TDengine instance: {}".format(self._tInst)) + self.subProcess = subprocess.Popen( + self._tInst.getServiceCommand(), + shell=False, + # svcCmdSingle, shell=True, # capture core dump? + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + # bufsize=1, # not supported in binary mode + close_fds=ON_POSIX + ) # had text=True, which interferred with reading EOF + + def stop(self): + if not self.subProcess: + print("Sub process already stopped") + return -1 + + retCode = self.subProcess.poll() # contains real sub process return code + if retCode: # valid return code, process ended + self.subProcess = None + else: # process still alive, let's interrupt it + print( + "Sub process is running, sending SIG_INT and waiting for it to terminate...") + # sub process should end, then IPC queue should end, causing IO + # thread to end + self.subProcess.send_signal(signal.SIGINT) + try: + self.subProcess.wait(10) + retCode = self.subProcess.returncode + except subprocess.TimeoutExpired as err: + print("Time out waiting for TDengine service process to exit") + retCode = -3 + else: + print("TDengine service process terminated successfully from SIG_INT") + retCode = -4 + self.subProcess = None + return retCode + + +class ServiceManager: + PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process + + def __init__(self, numDnodes = 1): + Logging.info("TDengine Service Manager (TSM) created") + self._numDnodes = numDnodes # >1 means we have a cluster + # signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec + # signal.signal(signal.SIGINT, self.sigIntHandler) + # signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler! 
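+        # Note: inSigHandler (set below) is a re-entrancy guard; sigUsrHandler/sigIntHandler
+        # ignore repeated signals while it is True.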
+ + self.inSigHandler = False + # self._status = MainExec.STATUS_RUNNING # set inside + # _startTaosService() + self.svcMgrThreads = [] # type: List[ServiceManagerThread] + for i in range(0, numDnodes): + self.svcMgrThreads.append(ServiceManagerThread(i)) + + self._lock = threading.Lock() + # self._isRestarting = False + + def _doMenu(self): + choice = "" + while True: + print("\nInterrupting Service Program, Choose an Action: ") + print("1: Resume") + print("2: Terminate") + print("3: Restart") + # Remember to update the if range below + # print("Enter Choice: ", end="", flush=True) + while choice == "": + choice = input("Enter Choice: ") + if choice != "": + break # done with reading repeated input + if choice in ["1", "2", "3"]: + break # we are done with whole method + print("Invalid choice, please try again.") + choice = "" # reset + return choice + + def sigUsrHandler(self, signalNumber, frame): + print("Interrupting main thread execution upon SIGUSR1") + if self.inSigHandler: # already + print("Ignoring repeated SIG...") + return # do nothing if it's already not running + self.inSigHandler = True + + choice = self._doMenu() + if choice == "1": + self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue? + elif choice == "2": + self.stopTaosServices() + elif choice == "3": # Restart + self.restart() + else: + raise RuntimeError("Invalid menu choice: {}".format(choice)) + + self.inSigHandler = False + + def sigIntHandler(self, signalNumber, frame): + print("ServiceManager: INT Signal Handler starting...") + if self.inSigHandler: + print("Ignoring repeated SIG_INT...") + return + self.inSigHandler = True + + self.stopTaosServices() + print("ServiceManager: INT Signal Handler returning...") + self.inSigHandler = False + + def sigHandlerResume(self): + print("Resuming TDengine service manager (main thread)...\n\n") + + # def _updateThreadStatus(self): + # if self.svcMgrThread: # valid svc mgr thread + # if self.svcMgrThread.isStopped(): # done? + # self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate? + # self.svcMgrThread = None # no more + + def isActive(self): + """ + Determine if the service/cluster is active at all, i.e. at least + one thread is not "stopped". + """ + for thread in self.svcMgrThreads: + if not thread.isStopped(): + return True + return False + + # def isRestarting(self): + # """ + # Determine if the service/cluster is being "restarted", i.e., at least + # one thread is in "restarting" status + # """ + # for thread in self.svcMgrThreads: + # if thread.isRestarting(): + # return True + # return False + + def isStable(self): + """ + Determine if the service/cluster is "stable", i.e. all of the + threads are in "stable" status. + """ + for thread in self.svcMgrThreads: + if not thread.isStable(): + return False + return True + + def _procIpcAll(self): + while self.isActive(): + for thread in self.svcMgrThreads: # all thread objects should always be valid + # while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here + if thread.isRunning(): + thread.procIpcBatch() # regular processing, + if thread.isStopped(): + thread.procIpcBatch() # one last time? 
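+                        # (this final batch drains output the IO thread queued up before the service stopped)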
+                    # self._updateThreadStatus()
+                elif thread.isRestarting():
+                    print("Service restarting...")
+                # else this thread is stopped
+
+            time.sleep(self.PAUSE_BETWEEN_IPC_CHECK)  # pause, before next round
+        # raise CrashGenError("dummy")
+        print("Service Manager Thread (with subprocess) ended, main thread exiting...")
+
+    def startTaosServices(self):
+        with self._lock:
+            if self.isActive():
+                raise RuntimeError("Cannot start TAOS service(s) when one/some may already be running")
+
+            # Find if there's already a taosd service, and then kill it
+            for proc in psutil.process_iter():
+                if proc.name() == 'taosd':
+                    print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt")
+                    time.sleep(2.0)
+                    proc.kill()
+                # print("Process: {}".format(proc.name()))
+
+            # self.svcMgrThread = ServiceManagerThread() # create the object
+            for thread in self.svcMgrThreads:
+                thread.start()
+                thread.procIpcBatch(trimToTarget=10, forceOutput=True)  # for printing 10 lines
+
+    def stopTaosServices(self):
+        with self._lock:
+            if not self.isActive():
+                Logging.warning("Cannot stop TAOS service(s), already not active")
+                return
+
+            for thread in self.svcMgrThreads:
+                thread.stop()
+
+    def run(self):
+        self.startTaosServices()
+        self._procIpcAll()  # pump/process all the messages, may encounter SIG + restart
+        if self.isActive():  # if sig handler hasn't destroyed it by now
+            self.stopTaosServices()  # should have started already
+
+    def restart(self):
+        if not self.isStable():
+            Logging.warning("Cannot restart service/cluster when not stable")
+            return
+
+        # self._isRestarting = True
+        if self.isActive():
+            self.stopTaosServices()
+        else:
+            Logging.warning("Service not active when restart requested")
+
+        self.startTaosServices()
+        # self._isRestarting = False
+
+    # def isRunning(self):
+    #     return self.svcMgrThread != None
+
+    # def isRestarting(self):
+    #     return self._isRestarting
+
+class ServiceManagerThread:
+    """
+    A class representing a dedicated thread which manages the "sub process"
+    of the TDengine service, interacting with its STDOUT/ERR.
+
+    It takes a TdeInstance parameter at creation time, or creates a default one.
+    """
+    MAX_QUEUE_SIZE = 10000
+
+    def __init__(self, tInstNum = 0, tInst : TdeInstance = None):
+        # Set the sub process
+        self._tdeSubProcess = None  # type: TdeSubProcess
+
+        # Arrange the TDengine instance
+        self._tInstNum = tInstNum  # instance serial number in cluster, ZERO based
+        self._tInst = tInst or TdeInstance()  # Need an instance
+
+        self._thread = None  # The actual thread  # type: threading.Thread
+        self._status = Status.STATUS_STOPPED  # The status of the underlying service, actually.
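+
+    # Status transitions, as implemented by start()/stop()/join() and svcOutputReader below:
+    #   STOPPED --start()--> STARTING --(TD_READY_MSG seen)--> RUNNING
+    #   RUNNING --stop()--> STOPPING --join()--> STOPPED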
+ + def __repr__(self): + return "[SvcMgrThread: tInstNum={}]".format(self._tInstNum) + + def getStatus(self): + return self._status + + def isStarting(self): + return self._status == Status.STATUS_STARTING + + def isRunning(self): + # return self._thread and self._thread.is_alive() + return self._status == Status.STATUS_RUNNING + + def isStopping(self): + return self._status == Status.STATUS_STOPPING + + def isStopped(self): + return self._status == Status.STATUS_STOPPED + + def isStable(self): + return self.isRunning() or self.isStopped() + + # Start the thread (with sub process), and wait for the sub service + # to become fully operational + def start(self): + if self._thread: + raise RuntimeError("Unexpected _thread") + if self._tdeSubProcess: + raise RuntimeError("TDengine sub process already created/running") + + Logging.info("Attempting to start TAOS service: {}".format(self)) + + self._status = Status.STATUS_STARTING + self._tdeSubProcess = TdeSubProcess(self._tInst) + self._tdeSubProcess.start() + + self._ipcQueue = Queue() + self._thread = threading.Thread( # First thread captures server OUTPUT + target=self.svcOutputReader, + args=(self._tdeSubProcess.getStdOut(), self._ipcQueue)) + self._thread.daemon = True # thread dies with the program + self._thread.start() + + self._thread2 = threading.Thread( # 2nd thread captures server ERRORs + target=self.svcErrorReader, + args=(self._tdeSubProcess.getStdErr(), self._ipcQueue)) + self._thread2.daemon = True # thread dies with the program + self._thread2.start() + + # wait for service to start + for i in range(0, 100): + time.sleep(1.0) + # self.procIpcBatch() # don't pump message during start up + print("_zz_", end="", flush=True) + if self._status == Status.STATUS_RUNNING: + Logging.info("[] TDengine service READY to process requests") + Logging.info("[] TAOS service started: {}".format(self)) + return # now we've started + # TODO: handle failure-to-start better? + self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output + raise RuntimeError("TDengine service did not start successfully: {}".format(self)) + + def stop(self): + # can be called from both main thread or signal handler + print("Terminating TDengine service running as the sub process...") + if self.isStopped(): + print("Service already stopped") + return + if self.isStopping(): + print("Service is already being stopped") + return + # Linux will send Control-C generated SIGINT to the TDengine process + # already, ref: + # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes + if not self._tdeSubProcess: + raise RuntimeError("sub process object missing") + + self._status = Status.STATUS_STOPPING + retCode = self._tdeSubProcess.stop() + print("Attempted to stop sub process, got return code: {}".format(retCode)) + if (retCode==-11): # SGV + Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)") + + if self._tdeSubProcess.isRunning(): # still running + print("FAILED to stop sub process, it is still running... pid = {}".format( + self._tdeSubProcess.getPid())) + else: + self._tdeSubProcess = None # not running any more + self.join() # stop the thread, change the status, etc. 
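+            # (join() above also moves _status to STATUS_STOPPED, which the check below relies on)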
+ + # Check if it's really stopped + outputLines = 20 # for last output + if self.isStopped(): + self.procIpcBatch(outputLines) # one last time + print("End of TDengine Service Output: {}".format(self)) + print("----- TDengine Service (managed by SMT) is now terminated -----\n") + else: + print("WARNING: SMT did not terminate as expected: {}".format(self)) + + def join(self): + # TODO: sanity check + if not self.isStopping(): + raise RuntimeError( + "Unexpected status when ending svc mgr thread: {}".format( + self._status)) + + if self._thread: + self._thread.join() + self._thread = None + self._status = Status.STATUS_STOPPED + # STD ERR thread + self._thread2.join() + self._thread2 = None + else: + print("Joining empty thread, doing nothing") + + def _trimQueue(self, targetSize): + if targetSize <= 0: + return # do nothing + q = self._ipcQueue + if (q.qsize() <= targetSize): # no need to trim + return + + Logging.debug("Triming IPC queue to target size: {}".format(targetSize)) + itemsToTrim = q.qsize() - targetSize + for i in range(0, itemsToTrim): + try: + q.get_nowait() + except Empty: + break # break out of for loop, no more trimming + + TD_READY_MSG = "TDengine is initialized successfully" + + def procIpcBatch(self, trimToTarget=0, forceOutput=False): + self._trimQueue(trimToTarget) # trim if necessary + # Process all the output generated by the underlying sub process, + # managed by IO thread + print("<", end="", flush=True) + while True: + try: + line = self._ipcQueue.get_nowait() # getting output at fast speed + self._printProgress("_o") + except Empty: + # time.sleep(2.3) # wait only if there's no output + # no more output + print(".>", end="", flush=True) + return # we are done with THIS BATCH + else: # got line, printing out + if forceOutput: + Logging.info(line) + else: + Logging.debug(line) + print(">", end="", flush=True) + + _ProgressBars = ["--", "//", "||", "\\\\"] + + def _printProgress(self, msg): # TODO: assuming 2 chars + print(msg, end="", flush=True) + pBar = self._ProgressBars[Dice.throw(4)] + print(pBar, end="", flush=True) + print('\b\b\b\b', end="", flush=True) + + def svcOutputReader(self, out: IO, queue): + # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python + # print("This is the svcOutput Reader...") + # for line in out : + for line in iter(out.readline, b''): + # print("Finished reading a line: {}".format(line)) + # print("Adding item to queue...") + try: + line = line.decode("utf-8").rstrip() + except UnicodeError: + print("\nNon-UTF8 server output: {}\n".format(line)) + + # This might block, and then causing "out" buffer to block + queue.put(line) + self._printProgress("_i") + + if self._status == Status.STATUS_STARTING: # we are starting, let's see if we have started + if line.find(self.TD_READY_MSG) != -1: # found + Logging.info("Waiting for the service to become FULLY READY") + time.sleep(1.0) # wait for the server to truly start. 
TODO: remove this
+                    Logging.info("Service instance #{} is now FULLY READY".format(self._tInstNum))
+                    self._status = Status.STATUS_RUNNING
+
+            # Trim the queue if necessary: TODO: try this 1 out of 10 times
+            self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10)  # trim to 90% size
+
+            if self.isStopping():  # TODO: use thread status instead
+                # WAITING for the stopping sub process to finish its output
+                print("_w", end="", flush=True)
+
+            # queue.put(line)
+        # reaching here means the sub process must have died
+        print("\nNo more output from IO thread managing TDengine service")
+        out.close()
+
+    def svcErrorReader(self, err: IO, queue):
+        for line in iter(err.readline, b''):
+            print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
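
For context, a minimal sketch of how the split-out modules can be driven, assuming a caller
similar to what crash_gen.py's main program does. The main() wiring and the --debug flag here
are illustrative assumptions and not part of this commit; Logging, Dice and ServiceManager are
the names introduced in the new modules above.

    # Minimal driver sketch (assumption, for illustration only)
    import argparse

    from crash_gen.misc import Logging, Dice
    from crash_gen.service_manager import ServiceManager

    def main():
        parser = argparse.ArgumentParser()
        parser.add_argument('--debug', action='store_true')  # assumed flag, feeds gConfig.debug
        gConfig = parser.parse_args()

        Logging.clsInit(gConfig)   # one-time logger setup (DEBUG if --debug, else INFO)
        Dice.seed(0)               # must seed before any Dice.throw(), e.g. the progress bar

        svcMgr = ServiceManager(numDnodes=1)   # one ServiceManagerThread per dnode
        svcMgr.run()               # start taosd, pump its stdout/stderr, then stop

    if __name__ == "__main__":
        main()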