Fixed a major problem in crash_gen tool, now it properly releases DB connections
This commit is contained in:
parent
6c2d806726
commit
2c6c3775ed
|
@ -766,27 +766,32 @@ class DbConnRest(DbConn):
|
||||||
|
|
||||||
|
|
||||||
class MyTDSql:
|
class MyTDSql:
|
||||||
def __init__(self):
|
def __init__(self, hostAddr, cfgPath):
|
||||||
|
# Make the DB connection
|
||||||
|
self._conn = taos.connect(host=hostAddr, config=cfgPath)
|
||||||
|
self._cursor = self._conn.cursor()
|
||||||
|
|
||||||
self.queryRows = 0
|
self.queryRows = 0
|
||||||
self.queryCols = 0
|
self.queryCols = 0
|
||||||
self.affectedRows = 0
|
self.affectedRows = 0
|
||||||
|
|
||||||
def init(self, cursor, log=True):
|
# def init(self, cursor, log=True):
|
||||||
self.cursor = cursor
|
# self.cursor = cursor
|
||||||
# if (log):
|
# if (log):
|
||||||
# caller = inspect.getframeinfo(inspect.stack()[1][0])
|
# caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||||
# self.cursor.log(caller.filename + ".sql")
|
# self.cursor.log(caller.filename + ".sql")
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.cursor.close()
|
self._conn.close() # TODO: very important, cursor close does NOT close DB connection!
|
||||||
|
self._cursor.close()
|
||||||
|
|
||||||
def query(self, sql):
|
def query(self, sql):
|
||||||
self.sql = sql
|
self.sql = sql
|
||||||
try:
|
try:
|
||||||
self.cursor.execute(sql)
|
self._cursor.execute(sql)
|
||||||
self.queryResult = self.cursor.fetchall()
|
self.queryResult = self._cursor.fetchall()
|
||||||
self.queryRows = len(self.queryResult)
|
self.queryRows = len(self.queryResult)
|
||||||
self.queryCols = len(self.cursor.description)
|
self.queryCols = len(self._cursor.description)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# caller = inspect.getframeinfo(inspect.stack()[1][0])
|
# caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||||
# args = (caller.filename, caller.lineno, sql, repr(e))
|
# args = (caller.filename, caller.lineno, sql, repr(e))
|
||||||
|
@ -797,7 +802,7 @@ class MyTDSql:
|
||||||
def execute(self, sql):
|
def execute(self, sql):
|
||||||
self.sql = sql
|
self.sql = sql
|
||||||
try:
|
try:
|
||||||
self.affectedRows = self.cursor.execute(sql)
|
self.affectedRows = self._cursor.execute(sql)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# caller = inspect.getframeinfo(inspect.stack()[1][0])
|
# caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||||
# args = (caller.filename, caller.lineno, sql, repr(e))
|
# args = (caller.filename, caller.lineno, sql, repr(e))
|
||||||
|
@ -810,12 +815,13 @@ class DbConnNative(DbConn):
|
||||||
# Class variables
|
# Class variables
|
||||||
_lock = threading.Lock()
|
_lock = threading.Lock()
|
||||||
_connInfoDisplayed = False
|
_connInfoDisplayed = False
|
||||||
|
totalConnections = 0 # Not private
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self._type = self.TYPE_NATIVE
|
self._type = self.TYPE_NATIVE
|
||||||
self._conn = None
|
self._conn = None
|
||||||
self._cursor = None
|
# self._cursor = None
|
||||||
|
|
||||||
def getBuildPath(self):
|
def getBuildPath(self):
|
||||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
@ -832,7 +838,8 @@ class DbConnNative(DbConn):
|
||||||
buildPath = root[:len(root) - len("/build/bin")]
|
buildPath = root[:len(root) - len("/build/bin")]
|
||||||
break
|
break
|
||||||
if buildPath == None:
|
if buildPath == None:
|
||||||
raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}".format(selfPath, projPath))
|
raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}"
|
||||||
|
.format(selfPath, projPath))
|
||||||
return buildPath
|
return buildPath
|
||||||
|
|
||||||
|
|
||||||
|
@ -840,25 +847,34 @@ class DbConnNative(DbConn):
|
||||||
cfgPath = self.getBuildPath() + "/test/cfg"
|
cfgPath = self.getBuildPath() + "/test/cfg"
|
||||||
hostAddr = "127.0.0.1"
|
hostAddr = "127.0.0.1"
|
||||||
|
|
||||||
with self._lock: # force single threading for opening DB connections
|
cls = self.__class__ # Get the class, to access class variables
|
||||||
if not self._connInfoDisplayed:
|
with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
|
||||||
self.__class__._connInfoDisplayed = True # updating CLASS variable
|
if not cls._connInfoDisplayed:
|
||||||
logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
|
cls._connInfoDisplayed = True # updating CLASS variable
|
||||||
|
logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
|
||||||
self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
|
# Make the connection
|
||||||
self._cursor = self._conn.cursor()
|
# self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
|
||||||
|
# self._cursor = self._conn.cursor()
|
||||||
|
# Record the count in the class
|
||||||
|
self._tdSql = MyTDSql(hostAddr, cfgPath) # making DB connection
|
||||||
|
cls.totalConnections += 1
|
||||||
|
|
||||||
self._cursor.execute('reset query cache')
|
self._tdSql.execute('reset query cache')
|
||||||
# self._cursor.execute('use db') # do this at the beginning of every
|
# self._cursor.execute('use db') # do this at the beginning of every
|
||||||
|
|
||||||
# Open connection
|
# Open connection
|
||||||
self._tdSql = MyTDSql()
|
# self._tdSql = MyTDSql()
|
||||||
self._tdSql.init(self._cursor)
|
# self._tdSql.init(self._cursor)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
if (not self.isOpen):
|
if (not self.isOpen):
|
||||||
raise RuntimeError("Cannot clean up database until connection is open")
|
raise RuntimeError("Cannot clean up database until connection is open")
|
||||||
self._tdSql.close()
|
self._tdSql.close()
|
||||||
|
# Decrement the class wide counter
|
||||||
|
cls = self.__class__ # Get the class, to access class variables
|
||||||
|
with cls._lock:
|
||||||
|
cls.totalConnections -= 1
|
||||||
|
|
||||||
logger.debug("[DB] Database connection closed")
|
logger.debug("[DB] Database connection closed")
|
||||||
self.isOpen = False
|
self.isOpen = False
|
||||||
|
|
||||||
|
@ -1694,9 +1710,8 @@ class ExecutionStats:
|
||||||
logger.info(
|
logger.info(
|
||||||
"| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
|
"| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
|
||||||
self._elapsedTime))
|
self._elapsedTime))
|
||||||
logger.info(
|
logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
|
||||||
"| Top numbers written: {}".format(
|
logger.info("| Total Number of Active DB Native Connections: {}".format(DbConnNative.totalConnections))
|
||||||
TaskExecutor.getBoundedList()))
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"----------------------------------------------------------------------")
|
"----------------------------------------------------------------------")
|
||||||
|
|
||||||
|
@ -2474,7 +2489,7 @@ class ServiceManagerThread:
|
||||||
|
|
||||||
def svcErrorReader(self, err: IO, queue):
|
def svcErrorReader(self, err: IO, queue):
|
||||||
for line in iter(err.readline, b''):
|
for line in iter(err.readline, b''):
|
||||||
print("\nTD Svc STDERR: {}".format(line))
|
print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
|
||||||
|
|
||||||
|
|
||||||
class TdeSubProcess:
|
class TdeSubProcess:
|
||||||
|
|
Loading…
Reference in New Issue