This commit is contained in:
wenzhouwww@live.cn 2022-11-08 09:41:42 +08:00
parent 17a502f569
commit 8a5427c95b
2 changed files with 77 additions and 19 deletions

View File

@ -1761,7 +1761,10 @@ class TaskCreateSuperTable(StateTransitionTask):
sTable = self._db.getFixedSuperTable() # type: TdSuperTable sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place # wt.execSql("use db") # should always be in place
if sTable.hasStreams(wt.getDbConn()) or sTable.hasStreamTables(wt.getDbConn()):
sTable.dropStreams(wt.getDbConn())
sTable.dropStreamTables(wt.getDbConn())
sTable.create(wt.getDbConn(), sTable.create(wt.getDbConn(),
{'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT, 'color': TdDataType.BINARY16}, { {'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT, 'color': TdDataType.BINARY16}, {
'b': TdDataType.BINARY200, 'f': TdDataType.FLOAT}, 'b': TdDataType.BINARY200, 'f': TdDataType.FLOAT},
@ -2557,16 +2560,20 @@ class TaskDeleteData(StateTransitionTask):
# Now read it back and verify, we might encounter an error if table is dropped # Now read it back and verify, we might encounter an error if table is dropped
if Config.getConfig().verify_data: # only if command line asks for it if Config.getConfig().verify_data: # only if command line asks for it
try: try:
readBack = dbc.queryScalar("SELECT * from {}.{} WHERE ts='{}'". dbc.query("SELECT * from {}.{} WHERE ts='{}'".
format(db.getName(), regTableName, nextTick)) format(db.getName(), regTableName, nextTick))
if readBack == None : result = dbc.getQueryResult()
pass if len(result)==0:
# means data has been delete
print("D1",end="") # DF means delete failed
else:
print("DF",end="") # DF means delete failed
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result # if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result
print("D1",end="") # D1 means delete data success and only 1 record # print("D1",end="") # D1 means delete data success and only 1 record
elif errno in [0x218, 0x362]: # table doesn't exist if errno in [0x218, 0x362,0x2662]: # table doesn't exist
# do nothing # do nothing
pass pass
else: else:
@ -2612,16 +2619,20 @@ class TaskDeleteData(StateTransitionTask):
# Now read it back and verify, we might encounter an error if table is dropped # Now read it back and verify, we might encounter an error if table is dropped
if Config.getConfig().verify_data: # only if command line asks for it if Config.getConfig().verify_data: # only if command line asks for it
try: try:
readBack = dbc.queryScalar("SELECT * from {}.{} ". dbc.query("SELECT * from {}.{} WHERE ts='{}'".
format(db.getName(), regTableName)) format(db.getName(), regTableName, nextTick))
if readBack == None : result = dbc.getQueryResult()
pass if len(result)==0:
# means data has been delete
print("DA",end="")
else:
print("DF",end="") # DF means delete failed
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result # if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result
print("Da",end="") # Da means delete data success and for all datas # print("Da",end="") # Da means delete data success and for all datas
elif errno in [0x218, 0x362]: # table doesn't exist if errno in [0x218, 0x362,0x2662]: # table doesn't exist
# do nothing # do nothing
pass pass
else: else:

View File

@ -26,9 +26,12 @@ class DbConn:
TYPE_NATIVE = "native-c" TYPE_NATIVE = "native-c"
TYPE_REST = "rest-api" TYPE_REST = "rest-api"
TYPE_INVALID = "invalid" TYPE_INVALID = "invalid"
# class variables # class variables
lastSqlFromThreads : dict[int, str] = {} # stored by thread id, obtained from threading.current_thread().ident%10000 lastSqlFromThreads : dict[int, str] = {} # stored by thread id, obtained from threading.current_thread().ident%10000
spendThreads : dict[int, float] = {} # stored by thread id, obtained from threading.current_thread().ident%10000
@classmethod @classmethod
def saveSqlForCurrentThread(cls, sql: str): def saveSqlForCurrentThread(cls, sql: str):
@ -37,15 +40,36 @@ class DbConn:
run into a dead-lock situation, we can pick out the deadlocked thread, and use run into a dead-lock situation, we can pick out the deadlocked thread, and use
that information to find what what SQL statement is stuck. that information to find what what SQL statement is stuck.
''' '''
th = threading.current_thread() th = threading.current_thread()
shortTid = th.native_id % 10000 #type: ignore shortTid = th.native_id % 10000 #type: ignore
cls.lastSqlFromThreads[shortTid] = sql # Save this for later cls.lastSqlFromThreads[shortTid] = sql # Save this for later
@classmethod @classmethod
def fetchSqlForThread(cls, shortTid : int) -> str : def fetchSqlForThread(cls, shortTid : int) -> str :
print("=======================")
if shortTid not in cls.lastSqlFromThreads: if shortTid not in cls.lastSqlFromThreads:
raise CrashGenError("No last-attempted-SQL found for thread id: {}".format(shortTid)) raise CrashGenError("No last-attempted-SQL found for thread id: {}".format(shortTid))
return cls.lastSqlFromThreads[shortTid] return cls.lastSqlFromThreads[shortTid]
@classmethod
def sql_exec_spend(cls, cost: float):
'''
Let us save the last SQL statement on a per-thread basis, so that when later we
run into a dead-lock situation, we can pick out the deadlocked thread, and use
that information to find what what SQL statement is stuck.
'''
th = threading.current_thread()
shortTid = th.native_id % 10000 #type: ignore
cls.spendThreads[shortTid] = cost # Save this for later
@classmethod
def get_time_cost(cls) ->float:
th = threading.current_thread()
shortTid = th.native_id % 10000 #type: ignore
return cls.spendThreads.get(shortTid)
@classmethod @classmethod
def create(cls, connType, dbTarget): def create(cls, connType, dbTarget):
@ -61,6 +85,7 @@ class DbConn:
def createNative(cls, dbTarget) -> DbConn: def createNative(cls, dbTarget) -> DbConn:
return cls.create(cls.TYPE_NATIVE, dbTarget) return cls.create(cls.TYPE_NATIVE, dbTarget)
@classmethod @classmethod
def createRest(cls, dbTarget) -> DbConn: def createRest(cls, dbTarget) -> DbConn:
return cls.create(cls.TYPE_REST, dbTarget) return cls.create(cls.TYPE_REST, dbTarget)
@ -75,6 +100,7 @@ class DbConn:
return "[DbConn: type={}, target={}]".format(self._type, self._dbTarget) return "[DbConn: type={}, target={}]".format(self._type, self._dbTarget)
def getLastSql(self): def getLastSql(self):
return self._lastSql return self._lastSql
def open(self): def open(self):
@ -184,13 +210,19 @@ class DbConnRest(DbConn):
def _doSql(self, sql): def _doSql(self, sql):
self._lastSql = sql # remember this, last SQL attempted self._lastSql = sql # remember this, last SQL attempted
self.saveSqlForCurrentThread(sql) # Save in global structure too. #TODO: combine with above self.saveSqlForCurrentThread(sql) # Save in global structure too. #TODO: combine with above
try: time_cost = -1
time_start = time.time()
try:
r = requests.post(self._url, r = requests.post(self._url,
data = sql, data = sql,
auth = HTTPBasicAuth('root', 'taosdata')) auth = HTTPBasicAuth('root', 'taosdata'))
except: except:
print("REST API Failure (TODO: more info here)") print("REST API Failure (TODO: more info here)")
self.sql_exec_spend(-2)
raise raise
finally:
time_cost = time.time()- time_start
self.sql_exec_spend(time_cost)
rj = r.json() rj = r.json()
# Sanity check for the "Json Result" # Sanity check for the "Json Result"
if ('status' not in rj): if ('status' not in rj):
@ -223,6 +255,8 @@ class DbConnRest(DbConn):
"[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) "[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows return nRows
def query(self, sql): # return rows affected def query(self, sql): # return rows affected
return self.execute(sql) return self.execute(sql)
@ -336,6 +370,7 @@ class MyTDSql:
raise raise
return self.affectedRows return self.affectedRows
class DbTarget: class DbTarget:
def __init__(self, cfgPath, hostAddr, port): def __init__(self, cfgPath, hostAddr, port):
self.cfgPath = cfgPath self.cfgPath = cfgPath
@ -355,6 +390,7 @@ class DbConnNative(DbConn):
# _connInfoDisplayed = False # TODO: find another way to display this # _connInfoDisplayed = False # TODO: find another way to display this
totalConnections = 0 # Not private totalConnections = 0 # Not private
totalRequests = 0 totalRequests = 0
time_cost = -1
def __init__(self, dbTarget): def __init__(self, dbTarget):
super().__init__(dbTarget) super().__init__(dbTarget)
@ -413,8 +449,19 @@ class DbConnNative(DbConn):
"Cannot exec SQL unless db connection is open", CrashGenError.DB_CONNECTION_NOT_OPEN) "Cannot exec SQL unless db connection is open", CrashGenError.DB_CONNECTION_NOT_OPEN)
Logging.debug("[SQL] Executing SQL: {}".format(sql)) Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql self._lastSql = sql
time_cost = -1
nRows = 0
time_start = time.time()
self.saveSqlForCurrentThread(sql) # Save in global structure too. #TODO: combine with above self.saveSqlForCurrentThread(sql) # Save in global structure too. #TODO: combine with above
nRows = self._tdSql.execute(sql) try:
nRows= self._tdSql.execute(sql)
except Exception as e:
self.sql_exec_spend(-2)
finally:
time_cost = time.time() - time_start
self.sql_exec_spend(time_cost)
cls = self.__class__ cls = self.__class__
cls.totalRequests += 1 cls.totalRequests += 1
Logging.debug( Logging.debug(