This commit is contained in:
wenzhouwww@live.cn 2022-11-10 20:23:00 +08:00
parent 86b5b89531
commit 20071f09a7
2 changed files with 196 additions and 46 deletions

View File

@ -420,10 +420,12 @@ class ThreadCoordinator:
except threading.BrokenBarrierError as err:
self._execStats.registerFailure("Aborted due to worker thread timeout")
Logging.error("\n")
Logging.error("Main loop aborted, caused by worker thread(s) time-out of {} seconds".format(
ThreadCoordinator.WORKER_THREAD_TIMEOUT))
Logging.error("TAOS related threads blocked at (stack frames top-to-bottom):")
ts = ThreadStacks()
ts.record_current_time(time.time()) # record thread exit time at current moment
ts.print(filterInternal=True)
workerTimeout = True
@ -547,7 +549,12 @@ class ThreadCoordinator:
# pick a task type for current state
db = self.pickDatabase()
taskType = db.getStateMachine().pickTaskType() # dynamic name of class
if Dice.throw(2)==1:
taskType = db.getStateMachine().pickTaskType() # dynamic name of class
else:
taskType = db.getStateMachine().balance_pickTaskType() # and an method can get balance task types
pass
return taskType(self._execStats, db) # create a task from it
def resetExecutedTasks(self):
@ -679,6 +686,8 @@ class AnyState:
CAN_CREATE_TOPIC = 3 # super table must exists
CAN_CREATE_CONSUMERS = 3
CAN_DROP_FIXED_SUPER_TABLE = 4
CAN_DROP_TOPIC = 4
CAN_DROP_STREAM = 4
CAN_ADD_DATA = 5
CAN_READ_DATA = 6
CAN_DELETE_DATA = 6
@ -734,13 +743,19 @@ class AnyState:
def canCreateTopic(self):
return self._info[self.CAN_CREATE_TOPIC]
def canDropTopic(self):
return self._info[self.CAN_DROP_TOPIC]
def canCreateConsumers(self):
return self._info[self.CAN_CREATE_CONSUMERS]
def canCreateStream(self):
def canCreateStreams(self):
return self._info[self.CAN_CREATE_STREAM]
def canDropStream(self):
return self._info[self.CAN_DROP_STREAM]
def canAddData(self):
return self._info[self.CAN_ADD_DATA]
@ -919,7 +934,7 @@ class StateHasData(AnyState):
): # only if we didn't create one
# we shouldn't have dropped it
self.assertNoTask(tasks, TaskDropDb)
if (not self.hasTask(tasks, TaskCreateSuperTable)
if not( self.hasTask(tasks, TaskCreateSuperTable)
): # if we didn't create the table
# we should not have a task that drops it
self.assertNoTask(tasks, TaskDropSuperTable)
@ -1075,6 +1090,28 @@ class StateMechine:
# Logging.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
return taskTypes[i]
def balance_pickTaskType(self):
# all the task types we can choose from at curent state
BasicTypes = self.getTaskTypes()
weightsTypes = BasicTypes.copy()
# this matrixs can balance the Frequency of different types of tasks
weight_matrixs = {'TaskDropDb': 5 , 'TaskDropTopics': 20 , 'TaskDropStreams':10 , 'TaskDropStreamTables':10 ,
'TaskReadData':50 , 'TaskDropSuperTable':5 , 'TaskAlterTags':3 , 'TaskAddData':10,
'TaskDeleteData':10 , 'TaskCreateDb':10 , 'TaskCreateStream': 3, 'TaskCreateTopic' :3,
'TaskCreateConsumers':10, 'TaskCreateSuperTable': 10 } # task type : weghts matrixs
for task , weights in weight_matrixs.items():
for basicType in BasicTypes:
if basicType.__name__ == task:
for _ in range(weights):
weightsTypes.append(basicType)
task = random.sample(weightsTypes,1)
return task[0]
# ref:
# https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
def _weighted_choice_sub(self, weights) -> int:
@ -1376,6 +1413,10 @@ class Task():
0x396, # Database in creating status
0x386, # Database in droping status
0x03E1, # failed on tmq_subscribe ,topic not exist
0x03ed , # Topic must be dropped first, SQL: drop database db_0
0x0203 , # Invalid value
1000 # REST catch-all error
@ -1693,19 +1734,24 @@ class TaskDropDb(StateTransitionTask):
# drop topics before drop db
if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()):
# if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()):
self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None)
self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),self._db.getFixedSuperTableName)
self.execWtSql(wt, "drop database {}".format(self._db.getName()))
# self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None)
# self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),self._db.getFixedSuperTableName)
try:
self.queryWtSql(wt, "drop database {}".format(self._db.getName())) # drop database maybe failed ,because topic exists
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x0203]: # drop maybe failed
pass
Logging.debug("[OPS] database dropped at {}".format(time.time()))
'''
# Streams will generator TD-20237 it will crash taosd , start this task when this issue fixed
class TaskCreateStream(StateTransitionTask):
@classmethod
@ -1714,7 +1760,7 @@ class TaskCreateStream(StateTransitionTask):
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canCreateStream()
return state.canCreateStreams()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
dbname = self._db.getName()
@ -1783,7 +1829,6 @@ class TaskCreateStream(StateTransitionTask):
if errno in [0x03f0]: # stream already exists
# stream need drop before drop table
pass
'''
class TaskCreateTopic(StateTransitionTask):
@ -1855,7 +1900,7 @@ class TaskCreateTopic(StateTransitionTask):
Logging.debug("[OPS] topic is creating at {}".format(time.time()))
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x03f0]: # topic already exists
if errno in [0x03f0 ]: # topic already exists
# topic need drop before drop table
pass
@ -1877,7 +1922,7 @@ class TaskCreateTopic(StateTransitionTask):
topic_sql = 'create topic {} AS STABLE {}.{} '.format(stable_topic,dbname,stbname)
try:
self.execWtSql(wt, "use {}".format(dbname))
self.execWtSql(wt, topic_sql)
self.queryWtSql(wt, topic_sql)
Logging.debug("[OPS] stable topic is creating at {}".format(time.time()))
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
@ -1897,7 +1942,81 @@ class TaskCreateTopic(StateTransitionTask):
pass
else:
pass
class TaskDropTopics(StateTransitionTask):
@classmethod
def getEndState(cls):
return StateHasData()
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canDropTopic()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
dbname = self._db.getName()
if not self._db.exists(wt.getDbConn()):
Logging.debug("Skipping task, no DB yet")
return
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place
tblName = sTable.getName()
if sTable.hasTopics(wt.getDbConn()):
sTable.dropTopics(wt.getDbConn(),dbname,None) # drop topics of database
sTable.dropTopics(wt.getDbConn(),dbname,tblName) # drop topics of stable
class TaskDropStreams(StateTransitionTask):
@classmethod
def getEndState(cls):
return StateHasData()
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canDropStream()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# dbname = self._db.getName()
if not self._db.exists(wt.getDbConn()):
Logging.debug("Skipping task, no DB yet")
return
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place
# tblName = sTable.getName()
if sTable.hasStreams(wt.getDbConn()):
sTable.dropStreams(wt.getDbConn()) # drop stream of database
# sTable.dropStreamTables(wt.getDbConn()) # drop streamtables of stable
class TaskDropStreamTables(StateTransitionTask):
@classmethod
def getEndState(cls):
return StateHasData()
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canDropStream()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# dbname = self._db.getName()
if not self._db.exists(wt.getDbConn()):
Logging.debug("Skipping task, no DB yet")
return
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place
# tblName = sTable.getName()
if sTable.hasStreamTables(wt.getDbConn()):
# sTable.dropStreams(wt.getDbConn())
sTable.dropStreamTables(wt.getDbConn()) # drop stream tables
class TaskCreateConsumers(StateTransitionTask):
@ -1918,9 +2037,10 @@ class TaskCreateConsumers(StateTransitionTask):
# wt.execSql("use db") # should always be in place
# create Consumers
if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task
if sTable.hasTopics(wt.getDbConn()):
sTable.createConsumer(wt.getDbConn(),random.randint(1,10))
# if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task
if sTable.hasTopics(wt.getDbConn()):
sTable.createConsumer(wt.getDbConn(),random.randint(1,10))
pass
else:
print(" restful not support tmq consumers")
return
@ -1968,17 +2088,17 @@ class TdSuperTable:
dbName = self._dbName
if self.exists(dbc) : # if myself exists
fullTableName = dbName + '.' + self._stName
if self.hasStreams(dbc):
self.dropStreams(dbc)
self.dropStreamTables(dbc)
if self.hasTopics(dbc):
self.dropTopics(dbName,None)
self.dropTopics(dbName,self._stName)
# if self.hasStreams(dbc):
# self.dropStreams(dbc)
# self.dropStreamTables(dbc)
# if self.hasTopics(dbc):
# self.dropTopics(dbName,None)
# self.dropTopics(dbName,self._stName)
try:
dbc.execute("DROP TABLE {}".format(fullTableName))
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [1011,0x3F3,0x03f3,0x2662]: # table doesn't exist
if errno in [1011,0x3F3,0x03f3,0x2662]: # table doesn't exist # Stream must be dropped first, SQL: DROP TABLE db_0.fs_table
pass
# # stream need drop before drop table
# for stream in self.getStreamName():
@ -2011,17 +2131,21 @@ class TdSuperTable:
if dbc.existsSuperTable(self._stName):
if dropIfExists:
if self.hasStreams(dbc):
self.dropStreams(dbc)
self.dropStreamTables(dbc)
# if self.hasStreams(dbc):
# self.dropStreams(dbc)
# self.dropStreamTables(dbc)
# drop topics before drop stables
if self.hasTopics(dbc):
self.dropTopics(dbc,self._dbName,None)
self.dropTopics(dbc,self._dbName,self._stName )
# # drop topics before drop stables
# if self.hasTopics(dbc):
# self.dropTopics(dbc,self._dbName,None)
# self.dropTopics(dbc,self._dbName,self._stName )
dbc.execute("DROP TABLE {}".format(fullTableName))
try:
dbc.execute("DROP TABLE {}".format(fullTableName))
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [1011,0x3F3,0x03f3,0x2662]: # table doesn't exist # Stream must be dropped first, SQL: DROP TABLE db_0.fs_table
pass
pass
# dbc.execute("DROP TABLE {}".format(fullTableName))
@ -2124,6 +2248,7 @@ class TdSuperTable:
if dbname in topic[0] and topic[0].startswith("database"):
try:
dbc.execute('drop topic {}'.format(topic[0]))
Logging.debug("[OPS] topic {} is droping at {}".format(topic,time.time()))
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x03EB]: # Topic subscribed cannot be dropped
@ -2139,6 +2264,7 @@ class TdSuperTable:
for topic in topics:
if topic[0].startswith(self._dbName) and topic[0].endswith('topic'):
dbc.execute('drop topic {}'.format(topic[0]))
Logging.debug("[OPS] topic {} is droping at {}".format(topic,time.time()))
return True
else:
return True
@ -2454,15 +2580,16 @@ class TaskDropSuperTable(StateTransitionTask):
# Drop the super table itself
tblName = self._db.getFixedSuperTableName()
# drop streams before drop stables
if self._db.getFixedSuperTable().hasStreams(wt.getDbConn()):
self._db.getFixedSuperTable().dropStreams(wt.getDbConn())
self._db.getFixedSuperTable().dropStreamTables(wt.getDbConn())
# # drop streams before drop stables
# if self._db.getFixedSuperTable().hasStreams(wt.getDbConn()):
# self._db.getFixedSuperTable().dropStreams(wt.getDbConn())
# self._db.getFixedSuperTable().dropStreamTables(wt.getDbConn())
# # drop topics before drop stables
# if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()):
# self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None)
# self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),tblName)
# drop topics before drop stables
if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()):
self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None)
self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),tblName)
try:
self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
@ -2948,6 +3075,9 @@ class ThreadStacks: # stack info for all threads
shortTid = th.native_id % 10000 #type: ignore
self._allStacks[shortTid] = stack # Was using th.native_id
def record_current_time(self,current_time):
self.current_time = current_time
def print(self, filteredEndName = None, filterInternal = False):
for shortTid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom
lastFrame = stack[-1]
@ -2962,9 +3092,11 @@ class ThreadStacks: # stack info for all threads
continue # ignore
# Now print
print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(shortTid))
lastSqlForThread = DbConn.fetchSqlForThread(shortTid)
time_cost = DbConn.get_time_cost()
print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid, time_cost ,lastSqlForThread))
last_sql_commit_time = DbConn.get_save_sql_time(shortTid)
# time_cost = DbConn.get_time_cost()
print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid, self.current_time-last_sql_commit_time ,lastSqlForThread))
stackFrame = 0
for frame in stack: # was using: reversed(stack)
# print(frame)

View File

@ -32,7 +32,7 @@ class DbConn:
# class variables
lastSqlFromThreads : dict[int, str] = {} # stored by thread id, obtained from threading.current_thread().ident%10000
spendThreads : dict[int, float] = {} # stored by thread id, obtained from threading.current_thread().ident%10000
current_time : dict[int, float] = {} # save current time
@classmethod
def saveSqlForCurrentThread(cls, sql: str):
'''
@ -44,6 +44,7 @@ class DbConn:
th = threading.current_thread()
shortTid = th.native_id % 10000 #type: ignore
cls.lastSqlFromThreads[shortTid] = sql # Save this for later
cls.record_save_sql_time()
@classmethod
def fetchSqlForThread(cls, shortTid : int) -> str :
@ -53,6 +54,25 @@ class DbConn:
raise CrashGenError("No last-attempted-SQL found for thread id: {}".format(shortTid))
return cls.lastSqlFromThreads[shortTid]
@classmethod
def get_save_sql_time(cls, shortTid : int):
'''
Let us save the last SQL statement on a per-thread basis, so that when later we
run into a dead-lock situation, we can pick out the deadlocked thread, and use
that information to find what what SQL statement is stuck.
'''
return cls.current_time[shortTid]
@classmethod
def record_save_sql_time(cls):
'''
Let us save the last SQL statement on a per-thread basis, so that when later we
run into a dead-lock situation, we can pick out the deadlocked thread, and use
that information to find what what SQL statement is stuck.
'''
th = threading.current_thread()
shortTid = th.native_id % 10000 #type: ignore
cls.current_time[shortTid] = float(time.time()) # Save this for later
@classmethod
def sql_exec_spend(cls, cost: float):
@ -460,7 +480,6 @@ class DbConnNative(DbConn):
finally:
time_cost = time.time() - time_start
self.sql_exec_spend(time_cost)
cls = self.__class__
cls.totalRequests += 1
@ -541,4 +560,3 @@ class DbManager():
self._dbConn.close()
self._dbConn = None
Logging.debug("DbManager closed DB connection...")