diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index d8946780a2..1ef89dec4e 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -1761,7 +1761,10 @@ class TaskCreateSuperTable(StateTransitionTask): sTable = self._db.getFixedSuperTable() # type: TdSuperTable # wt.execSql("use db") # should always be in place - + + if sTable.hasStreams(wt.getDbConn()) or sTable.hasStreamTables(wt.getDbConn()): + sTable.dropStreams(wt.getDbConn()) + sTable.dropStreamTables(wt.getDbConn()) sTable.create(wt.getDbConn(), {'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT, 'color': TdDataType.BINARY16}, { 'b': TdDataType.BINARY200, 'f': TdDataType.FLOAT}, @@ -2557,16 +2560,20 @@ class TaskDeleteData(StateTransitionTask): # Now read it back and verify, we might encounter an error if table is dropped if Config.getConfig().verify_data: # only if command line asks for it try: - readBack = dbc.queryScalar("SELECT * from {}.{} WHERE ts='{}'". + dbc.query("SELECT * from {}.{} WHERE ts='{}'". format(db.getName(), regTableName, nextTick)) - if readBack == None : - pass + result = dbc.getQueryResult() + if len(result)==0: + # means data has been delete + print("D1",end="") # DF means delete failed + else: + print("DF",end="") # DF means delete failed except taos.error.ProgrammingError as err: errno = Helper.convertErrno(err.errno) - if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result - print("D1",end="") # D1 means delete data success and only 1 record + # if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result + # print("D1",end="") # D1 means delete data success and only 1 record - elif errno in [0x218, 0x362]: # table doesn't exist + if errno in [0x218, 0x362,0x2662]: # table doesn't exist # do nothing pass else: @@ -2612,16 +2619,20 @@ class TaskDeleteData(StateTransitionTask): # Now read it back and verify, we might encounter an error if table is dropped if Config.getConfig().verify_data: # only if command line asks for it try: - readBack = dbc.queryScalar("SELECT * from {}.{} ". - format(db.getName(), regTableName)) - if readBack == None : - pass + dbc.query("SELECT * from {}.{} WHERE ts='{}'". + format(db.getName(), regTableName, nextTick)) + result = dbc.getQueryResult() + if len(result)==0: + # means data has been delete + print("DA",end="") + else: + print("DF",end="") # DF means delete failed except taos.error.ProgrammingError as err: errno = Helper.convertErrno(err.errno) - if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result - print("Da",end="") # Da means delete data success and for all datas + # if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result + # print("Da",end="") # Da means delete data success and for all datas - elif errno in [0x218, 0x362]: # table doesn't exist + if errno in [0x218, 0x362,0x2662]: # table doesn't exist # do nothing pass else: diff --git a/tests/pytest/crash_gen/shared/db.py b/tests/pytest/crash_gen/shared/db.py index 60c830f4f7..f97e4025e5 100644 --- a/tests/pytest/crash_gen/shared/db.py +++ b/tests/pytest/crash_gen/shared/db.py @@ -26,9 +26,12 @@ class DbConn: TYPE_NATIVE = "native-c" TYPE_REST = "rest-api" TYPE_INVALID = "invalid" + + # class variables lastSqlFromThreads : dict[int, str] = {} # stored by thread id, obtained from threading.current_thread().ident%10000 + spendThreads : dict[int, float] = {} # stored by thread id, obtained from threading.current_thread().ident%10000 @classmethod def saveSqlForCurrentThread(cls, sql: str): @@ -37,15 +40,36 @@ class DbConn: run into a dead-lock situation, we can pick out the deadlocked thread, and use that information to find what what SQL statement is stuck. ''' + th = threading.current_thread() shortTid = th.native_id % 10000 #type: ignore cls.lastSqlFromThreads[shortTid] = sql # Save this for later @classmethod - def fetchSqlForThread(cls, shortTid : int) -> str : + def fetchSqlForThread(cls, shortTid : int) -> str : + + print("=======================") if shortTid not in cls.lastSqlFromThreads: raise CrashGenError("No last-attempted-SQL found for thread id: {}".format(shortTid)) - return cls.lastSqlFromThreads[shortTid] + return cls.lastSqlFromThreads[shortTid] + + + @classmethod + def sql_exec_spend(cls, cost: float): + ''' + Let us save the last SQL statement on a per-thread basis, so that when later we + run into a dead-lock situation, we can pick out the deadlocked thread, and use + that information to find what what SQL statement is stuck. + ''' + th = threading.current_thread() + shortTid = th.native_id % 10000 #type: ignore + cls.spendThreads[shortTid] = cost # Save this for later + + @classmethod + def get_time_cost(cls) ->float: + th = threading.current_thread() + shortTid = th.native_id % 10000 #type: ignore + return cls.spendThreads.get(shortTid) @classmethod def create(cls, connType, dbTarget): @@ -61,6 +85,7 @@ class DbConn: def createNative(cls, dbTarget) -> DbConn: return cls.create(cls.TYPE_NATIVE, dbTarget) + @classmethod def createRest(cls, dbTarget) -> DbConn: return cls.create(cls.TYPE_REST, dbTarget) @@ -75,6 +100,7 @@ class DbConn: return "[DbConn: type={}, target={}]".format(self._type, self._dbTarget) def getLastSql(self): + return self._lastSql def open(self): @@ -184,13 +210,19 @@ class DbConnRest(DbConn): def _doSql(self, sql): self._lastSql = sql # remember this, last SQL attempted self.saveSqlForCurrentThread(sql) # Save in global structure too. #TODO: combine with above - try: + time_cost = -1 + time_start = time.time() + try: r = requests.post(self._url, data = sql, - auth = HTTPBasicAuth('root', 'taosdata')) + auth = HTTPBasicAuth('root', 'taosdata')) except: print("REST API Failure (TODO: more info here)") + self.sql_exec_spend(-2) raise + finally: + time_cost = time.time()- time_start + self.sql_exec_spend(time_cost) rj = r.json() # Sanity check for the "Json Result" if ('status' not in rj): @@ -223,6 +255,8 @@ class DbConnRest(DbConn): "[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) return nRows + + def query(self, sql): # return rows affected return self.execute(sql) @@ -336,6 +370,7 @@ class MyTDSql: raise return self.affectedRows + class DbTarget: def __init__(self, cfgPath, hostAddr, port): self.cfgPath = cfgPath @@ -355,6 +390,7 @@ class DbConnNative(DbConn): # _connInfoDisplayed = False # TODO: find another way to display this totalConnections = 0 # Not private totalRequests = 0 + time_cost = -1 def __init__(self, dbTarget): super().__init__(dbTarget) @@ -413,8 +449,19 @@ class DbConnNative(DbConn): "Cannot exec SQL unless db connection is open", CrashGenError.DB_CONNECTION_NOT_OPEN) Logging.debug("[SQL] Executing SQL: {}".format(sql)) self._lastSql = sql + time_cost = -1 + nRows = 0 + time_start = time.time() self.saveSqlForCurrentThread(sql) # Save in global structure too. #TODO: combine with above - nRows = self._tdSql.execute(sql) + try: + nRows= self._tdSql.execute(sql) + except Exception as e: + self.sql_exec_spend(-2) + finally: + time_cost = time.time() - time_start + self.sql_exec_spend(time_cost) + + cls = self.__class__ cls.totalRequests += 1 Logging.debug(