diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index 1141ca403d..f65301b66c 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -979,6 +979,79 @@ class TDCom: for stream_name in stream_name_list: tdSql.execute(f'drop stream if exists {stream_name};') + + def check_stream_info_string(self, info): + # This method is defined for the 'info' column of the 'information_schema.ins_stream_tasks'. + # Define the regular expression pattern to match the required format + # This pattern looks for a number followed by an optional space and then a pair of square brackets + # containing two numbers separated by a comma. + pattern = r'(\d+)\s*\[(\d+),\s*(\d+)\]' + + # Use the search function from the re module to find a match in the string + match = re.search(pattern, info) + + # Check if a match was found + if match: + # Extract the numbers from the matching groups + first_number = int(match.group(1)) # The number before the brackets + second_number = int(match.group(3)) # The second number inside the brackets + + # Compare the extracted numbers and return the result + return first_number == second_number + + # If no match was found, or the pattern does not match the expected format, return False + return False + + def print_error_frame_info(self, sql, elm, expect_elm): + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno, sql, elm, expect_elm) + # tdLog.info("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args) + raise Exception("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args) + + def check_stream_task_status(self, stream_name, vgroups, timeout=60): + """check stream status + + Args: + stream_name (str): stream_name + vgroups (int): vgroups + Returns: + str: status + """ + #check stream task rows + sql = f"select `task_id`,node_id,stream_name,status,info,history_task_id from information_schema.ins_stream_tasks where stream_name='{stream_name}' and `level`='source';" + + tdSql.query(sql) + tdSql.checkRows(vgroups) + #check stream task status + checktimes=0 + while checktimes < timeout: + check_stream_success = 0 + try: + checktimes += 1 + for i in range(vgroups): + tdSql.query(sql) + if tdSql.queryResult[i][3] == "ready" and self.check_stream_info_string(tdSql.queryResult[i][4]) and tdSql.queryResult[i][5] == None: + check_stream_success += 1 + tdLog.info(f"check stream task list[{check_stream_success}] sucessfully :") + else: + break + if check_stream_success == vgroups: + break + time.sleep(1) + tdLog.notice(f"Try to check stream status again, check times: {checktimes}") + if checktimes == timeout: + self.print_error_frame_info(sql,tdSql.queryResult,"status is ready,info is finished and history_task_id is NULL") + except Exception as e: + tdLog.notice(f"Try to check stream status again, check times: {checktimes}") + if checktimes == timeout: + self.print_error_frame_info(sql,tdSql.queryResult,"status is ready,info is finished and history_task_id is NULL") + i+=1 + time.sleep(1) + + + + + def drop_db(self, dbname="test"): """drop a db diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 7f6b129bb9..c121241414 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -61,6 +61,12 @@ class TDSql: def close(self): self.cursor.close() + def print_error_frame_info(self, elm, expect_elm): + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno, self.sql, elm, expect_elm) + # tdLog.info("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args) + raise Exception("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args) + def prepare(self, dbname="db", drop=True, **kwargs): tdLog.info(f"prepare database:{dbname}") s = 'reset query cache' @@ -331,13 +337,14 @@ class TDSql: return self.queryRows def checkRows(self, expectedRows): - if self.queryRows == expectedRows: - tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectedRows)) - return True - else: - caller = inspect.getframeinfo(inspect.stack()[1][0]) - args = (caller.filename, caller.lineno, self.sql, self.queryRows, expectedRows) - tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args) + self.checkEqual(self.queryRows, expectedRows) + # if self.queryRows == expectedRows: + # tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectedRows)) + # return True + # else: + # caller = inspect.getframeinfo(inspect.stack()[1][0]) + # args = (caller.filename, caller.lineno, self.sql, self.queryRows, expectedRows) + # tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args) def checkRows_not_exited(self, expectedRows): """ @@ -640,12 +647,8 @@ class TDSql: if self.__check_equal(elm, expect_elm): tdLog.info("sql:%s, elm:%s == expect_elm:%s" % (self.sql, elm, expect_elm)) return - - caller = inspect.getframeinfo(inspect.stack()[1][0]) - args = (caller.filename, caller.lineno, self.sql, elm, expect_elm) - # tdLog.info("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args) - raise Exception("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args) - + self.print_error_frame_info(elm, expect_elm) + def checkNotEqual(self, elm, expect_elm): if elm != expect_elm: tdLog.info("sql:%s, elm:%s != expect_elm:%s" % (self.sql, elm, expect_elm))