From 43bbbfd8669de91298f11a616312d8eb40937719 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Thu, 27 Aug 2020 03:58:53 +0000 Subject: [PATCH 1/4] Crash_gen tool minor tweaks, hoping to run 10x5000 with -a option --- tests/pytest/crash_gen.py | 55 ++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 7d3eb959c0..f67aeaf00e 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -161,6 +161,18 @@ class WorkerThread: logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break + # Before we fetch the task and run it, let's ensure we properly "use" the database + try: + self.useDb() # might encounter exceptions. TODO: catch + except taos.error.ProgrammingError as err: + errno = Helper.convertErrno(err.errno) + if errno == 0x383 : # invalid database + # ignore + dummy = 0 + else: + print("\nCaught programming error. errno=0x{:X}, msg={} ".format(errno, err.msg)) + raise + # Fetch a task from the Thread Coordinator logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid)) task = tc.fetchTask() @@ -324,10 +336,12 @@ class ThreadCoordinator: logger.debug("[STT] transition ended") # Due to limitation (or maybe not) of the Python library, # we cannot share connections across threads - if sm.hasDatabase(): - for t in self._pool.threadList: - logger.debug("[DB] use db for all worker threads") - t.useDb() + # Here we are in main thread, we cannot operate the connections created in workers + # Moving below to task loop + # if sm.hasDatabase(): + # for t in self._pool.threadList: + # logger.debug("[DB] use db for all worker threads") + # t.useDb() # t.execSql("use db") # main thread executing "use # db" on behalf of every worker thread except taos.error.ProgrammingError as err: @@ -387,7 +401,7 @@ class ThreadCoordinator: transitionFailed = self._doTransition() # To start, we end step -1 first except taos.error.ProgrammingError as err: transitionFailed = True - errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme + errno2 = Helper.convertErrno(err.errno) # correct error scheme errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err) logger.info(errMsg) self._execStats.registerFailure(errMsg) @@ -468,6 +482,10 @@ class ThreadCoordinator: # We define a class to run a number of threads in locking steps. +class Helper: + @classmethod + def convertErrno(cls, errno): + return errno if (errno > 0) else 0x80000000 + errno class ThreadPool: def __init__(self, numThreads, maxSteps): @@ -613,8 +631,7 @@ class DbConn: def resetDb(self): # reset the whole database, etc. if (not self.isOpen): - raise RuntimeError( - "Cannot reset database until connection is open") + raise RuntimeError("Cannot reset database until connection is open") # self._tdSql.prepare() # Recreate database, etc. self.execute('drop database if exists db') @@ -681,8 +698,7 @@ class DbConnRest(DbConn): def close(self): if (not self.isOpen): - raise RuntimeError( - "Cannot clean up database until connection is open") + raise RuntimeError("Cannot clean up database until connection is open") # Do nothing for REST logger.debug("[DB] REST Database connection closed") self.isOpen = False @@ -796,8 +812,7 @@ class DbConnNative(DbConn): super().__init__() self._type = self.TYPE_NATIVE self._conn = None - self._cursor = None - + self._cursor = None def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) @@ -814,7 +829,7 @@ class DbConnNative(DbConn): buildPath = root[:len(root) - len("/build/bin")] break if buildPath == None: - raise RuntimeError("Failed to determine buildPath, selfPath={}".format(selfPath)) + raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}".format(selfPath, projPath)) return buildPath @@ -839,8 +854,7 @@ class DbConnNative(DbConn): def close(self): if (not self.isOpen): - raise RuntimeError( - "Cannot clean up database until connection is open") + raise RuntimeError("Cannot clean up database until connection is open") self._tdSql.close() logger.debug("[DB] Database connection closed") self.isOpen = False @@ -1528,7 +1542,7 @@ class Task(): try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme + errno2 = Helper.convertErrno(err.errno) if (gConfig.continue_on_exception): # user choose to continue self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( errno2, err, wt.getDbConn().getLastSql())) @@ -1789,7 +1803,7 @@ class TdSuperTable: try: dbc.query("select TBNAME from db.{}".format(self._stName)) # TODO: analyze result set later except taos.error.ProgrammingError as err: - errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno + errno2 = Helper.convertErrno(err.errno) logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err)) raise @@ -1891,7 +1905,7 @@ class TaskReadData(StateTransitionTask): if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?! dbc.execute("select {} from db.{}".format(aggExpr, sTable.getName())) except taos.error.ProgrammingError as err: - errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno + errno2 = Helper.convertErrno(err.errno) logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql())) raise @@ -1920,9 +1934,8 @@ class TaskDropSuperTable(StateTransitionTask): self.execWtSql(wt, "drop table {}".format( regTableName)) # nRows always 0, like MySQL except taos.error.ProgrammingError as err: - # correcting for strange error number scheme - errno2 = err.errno if ( - err.errno > 0) else 0x80000000 + err.errno + # correcting for strange error number scheme + errno2 = Helper.convertErrno(err.errno) if (errno2 in [0x362]): # mnode invalid table name isSuccess = False logger.debug( @@ -2437,7 +2450,7 @@ class ServiceManagerThread: if self._status == MainExec.STATUS_STARTING: # we are starting, let's see if we have started if line.find(self.TD_READY_MSG) != -1: # found logger.info("Waiting for the service to become FULLY READY") - time.sleep(1.0) # wait for the server to truly start. TODO: remove this + time.sleep(5.0) # wait for the server to truly start. TODO: remove this logger.info("Service is now FULLY READY") self._status = MainExec.STATUS_RUNNING From 2b74699d3592ca1a521fae5ff6f0ca90a1fa421c Mon Sep 17 00:00:00 2001 From: Steven Li Date: Thu, 27 Aug 2020 04:47:43 +0000 Subject: [PATCH 2/4] Adjusted crash_gen tool to tolerate non UTF8 server output --- tests/pytest/crash_gen.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index f67aeaf00e..fd48edae05 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -166,7 +166,7 @@ class WorkerThread: self.useDb() # might encounter exceptions. TODO: catch except taos.error.ProgrammingError as err: errno = Helper.convertErrno(err.errno) - if errno == 0x383 : # invalid database + if errno in [0x383, 0x386, 0x00B, 0x014] : # invalid database, dropping, Unable to establish connection, Database not ready # ignore dummy = 0 else: @@ -2442,7 +2442,11 @@ class ServiceManagerThread: for line in iter(out.readline, b''): # print("Finished reading a line: {}".format(line)) # print("Adding item to queue...") - line = line.decode("utf-8").rstrip() + try: + line = line.decode("utf-8").rstrip() + except UnicodeError: + print("\nNon-UTF8 server output: {}\n".format(line)) + # This might block, and then causing "out" buffer to block queue.put(line) self._printProgress("_i") From 4d122b7adbd486d2ffe2d0292061060af8a7440a Mon Sep 17 00:00:00 2001 From: Steven Li Date: Thu, 27 Aug 2020 05:04:27 +0000 Subject: [PATCH 3/4] Adjusted crash_gen tool to re-connect after server restart disruption --- tests/pytest/crash_gen.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index fd48edae05..3ffe77c6a4 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -163,6 +163,9 @@ class WorkerThread: # Before we fetch the task and run it, let's ensure we properly "use" the database try: + if (gConfig.per_thread_db_connection): # most likely TRUE + if not self._dbConn.isOpen: # might have been closed during server auto-restart + self._dbConn.open() self.useDb() # might encounter exceptions. TODO: catch except taos.error.ProgrammingError as err: errno = Helper.convertErrno(err.errno) @@ -861,8 +864,7 @@ class DbConnNative(DbConn): def execute(self, sql): if (not self.isOpen): - raise RuntimeError( - "Cannot execute database commands until connection is open") + raise RuntimeError("Cannot execute database commands until connection is open") logger.debug("[SQL] Executing SQL: {}".format(sql)) self._lastSql = sql nRows = self._tdSql.execute(sql) @@ -2454,7 +2456,7 @@ class ServiceManagerThread: if self._status == MainExec.STATUS_STARTING: # we are starting, let's see if we have started if line.find(self.TD_READY_MSG) != -1: # found logger.info("Waiting for the service to become FULLY READY") - time.sleep(5.0) # wait for the server to truly start. TODO: remove this + time.sleep(1.0) # wait for the server to truly start. TODO: remove this logger.info("Service is now FULLY READY") self._status = MainExec.STATUS_RUNNING From 2c6c3775ed214d9171cf4c15c7f634c1ed59700f Mon Sep 17 00:00:00 2001 From: Steven Li Date: Sun, 30 Aug 2020 02:47:50 +0000 Subject: [PATCH 4/4] Fixed a major problem in crash_gen tool, now it properly releases DB connections --- tests/pytest/crash_gen.py | 65 ++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 3ffe77c6a4..7588e03e17 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -766,27 +766,32 @@ class DbConnRest(DbConn): class MyTDSql: - def __init__(self): + def __init__(self, hostAddr, cfgPath): + # Make the DB connection + self._conn = taos.connect(host=hostAddr, config=cfgPath) + self._cursor = self._conn.cursor() + self.queryRows = 0 self.queryCols = 0 self.affectedRows = 0 - def init(self, cursor, log=True): - self.cursor = cursor + # def init(self, cursor, log=True): + # self.cursor = cursor # if (log): # caller = inspect.getframeinfo(inspect.stack()[1][0]) # self.cursor.log(caller.filename + ".sql") def close(self): - self.cursor.close() + self._conn.close() # TODO: very important, cursor close does NOT close DB connection! + self._cursor.close() def query(self, sql): self.sql = sql try: - self.cursor.execute(sql) - self.queryResult = self.cursor.fetchall() + self._cursor.execute(sql) + self.queryResult = self._cursor.fetchall() self.queryRows = len(self.queryResult) - self.queryCols = len(self.cursor.description) + self.queryCols = len(self._cursor.description) except Exception as e: # caller = inspect.getframeinfo(inspect.stack()[1][0]) # args = (caller.filename, caller.lineno, sql, repr(e)) @@ -797,7 +802,7 @@ class MyTDSql: def execute(self, sql): self.sql = sql try: - self.affectedRows = self.cursor.execute(sql) + self.affectedRows = self._cursor.execute(sql) except Exception as e: # caller = inspect.getframeinfo(inspect.stack()[1][0]) # args = (caller.filename, caller.lineno, sql, repr(e)) @@ -810,12 +815,13 @@ class DbConnNative(DbConn): # Class variables _lock = threading.Lock() _connInfoDisplayed = False + totalConnections = 0 # Not private def __init__(self): super().__init__() self._type = self.TYPE_NATIVE self._conn = None - self._cursor = None + # self._cursor = None def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) @@ -832,7 +838,8 @@ class DbConnNative(DbConn): buildPath = root[:len(root) - len("/build/bin")] break if buildPath == None: - raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}".format(selfPath, projPath)) + raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}" + .format(selfPath, projPath)) return buildPath @@ -840,25 +847,34 @@ class DbConnNative(DbConn): cfgPath = self.getBuildPath() + "/test/cfg" hostAddr = "127.0.0.1" - with self._lock: # force single threading for opening DB connections - if not self._connInfoDisplayed: - self.__class__._connInfoDisplayed = True # updating CLASS variable - logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) - - self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable - self._cursor = self._conn.cursor() + cls = self.__class__ # Get the class, to access class variables + with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!! + if not cls._connInfoDisplayed: + cls._connInfoDisplayed = True # updating CLASS variable + logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) + # Make the connection + # self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable + # self._cursor = self._conn.cursor() + # Record the count in the class + self._tdSql = MyTDSql(hostAddr, cfgPath) # making DB connection + cls.totalConnections += 1 - self._cursor.execute('reset query cache') + self._tdSql.execute('reset query cache') # self._cursor.execute('use db') # do this at the beginning of every # Open connection - self._tdSql = MyTDSql() - self._tdSql.init(self._cursor) - + # self._tdSql = MyTDSql() + # self._tdSql.init(self._cursor) + def close(self): if (not self.isOpen): raise RuntimeError("Cannot clean up database until connection is open") self._tdSql.close() + # Decrement the class wide counter + cls = self.__class__ # Get the class, to access class variables + with cls._lock: + cls.totalConnections -= 1 + logger.debug("[DB] Database connection closed") self.isOpen = False @@ -1694,9 +1710,8 @@ class ExecutionStats: logger.info( "| Total Elapsed Time (from wall clock): {:.3f} seconds".format( self._elapsedTime)) - logger.info( - "| Top numbers written: {}".format( - TaskExecutor.getBoundedList())) + logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList())) + logger.info("| Total Number of Active DB Native Connections: {}".format(DbConnNative.totalConnections)) logger.info( "----------------------------------------------------------------------") @@ -2474,7 +2489,7 @@ class ServiceManagerThread: def svcErrorReader(self, err: IO, queue): for line in iter(err.readline, b''): - print("\nTD Svc STDERR: {}".format(line)) + print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line)) class TdeSubProcess: