tetst:check stream task status in common.py
This commit is contained in:
parent
87dd15ed68
commit
6e8d31a2df
|
@ -979,6 +979,79 @@ class TDCom:
|
||||||
for stream_name in stream_name_list:
|
for stream_name in stream_name_list:
|
||||||
tdSql.execute(f'drop stream if exists {stream_name};')
|
tdSql.execute(f'drop stream if exists {stream_name};')
|
||||||
|
|
||||||
|
|
||||||
|
def check_stream_info_string(self, info):
|
||||||
|
# This method is defined for the 'info' column of the 'information_schema.ins_stream_tasks'.
|
||||||
|
# Define the regular expression pattern to match the required format
|
||||||
|
# This pattern looks for a number followed by an optional space and then a pair of square brackets
|
||||||
|
# containing two numbers separated by a comma.
|
||||||
|
pattern = r'(\d+)\s*\[(\d+),\s*(\d+)\]'
|
||||||
|
|
||||||
|
# Use the search function from the re module to find a match in the string
|
||||||
|
match = re.search(pattern, info)
|
||||||
|
|
||||||
|
# Check if a match was found
|
||||||
|
if match:
|
||||||
|
# Extract the numbers from the matching groups
|
||||||
|
first_number = int(match.group(1)) # The number before the brackets
|
||||||
|
second_number = int(match.group(3)) # The second number inside the brackets
|
||||||
|
|
||||||
|
# Compare the extracted numbers and return the result
|
||||||
|
return first_number == second_number
|
||||||
|
|
||||||
|
# If no match was found, or the pattern does not match the expected format, return False
|
||||||
|
return False
|
||||||
|
|
||||||
|
def print_error_frame_info(self, sql, elm, expect_elm):
|
||||||
|
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||||
|
args = (caller.filename, caller.lineno, sql, elm, expect_elm)
|
||||||
|
# tdLog.info("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
|
||||||
|
raise Exception("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
|
||||||
|
|
||||||
|
def check_stream_task_status(self, stream_name, vgroups, timeout=60):
|
||||||
|
"""check stream status
|
||||||
|
|
||||||
|
Args:
|
||||||
|
stream_name (str): stream_name
|
||||||
|
vgroups (int): vgroups
|
||||||
|
Returns:
|
||||||
|
str: status
|
||||||
|
"""
|
||||||
|
#check stream task rows
|
||||||
|
sql = f"select `task_id`,node_id,stream_name,status,info,history_task_id from information_schema.ins_stream_tasks where stream_name='{stream_name}' and `level`='source';"
|
||||||
|
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(vgroups)
|
||||||
|
#check stream task status
|
||||||
|
checktimes=0
|
||||||
|
while checktimes < timeout:
|
||||||
|
check_stream_success = 0
|
||||||
|
try:
|
||||||
|
checktimes += 1
|
||||||
|
for i in range(vgroups):
|
||||||
|
tdSql.query(sql)
|
||||||
|
if tdSql.queryResult[i][3] == "ready" and self.check_stream_info_string(tdSql.queryResult[i][4]) and tdSql.queryResult[i][5] == None:
|
||||||
|
check_stream_success += 1
|
||||||
|
tdLog.info(f"check stream task list[{check_stream_success}] sucessfully :")
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
if check_stream_success == vgroups:
|
||||||
|
break
|
||||||
|
time.sleep(1)
|
||||||
|
tdLog.notice(f"Try to check stream status again, check times: {checktimes}")
|
||||||
|
if checktimes == timeout:
|
||||||
|
self.print_error_frame_info(sql,tdSql.queryResult,"status is ready,info is finished and history_task_id is NULL")
|
||||||
|
except Exception as e:
|
||||||
|
tdLog.notice(f"Try to check stream status again, check times: {checktimes}")
|
||||||
|
if checktimes == timeout:
|
||||||
|
self.print_error_frame_info(sql,tdSql.queryResult,"status is ready,info is finished and history_task_id is NULL")
|
||||||
|
i+=1
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def drop_db(self, dbname="test"):
|
def drop_db(self, dbname="test"):
|
||||||
"""drop a db
|
"""drop a db
|
||||||
|
|
||||||
|
|
|
@ -61,6 +61,12 @@ class TDSql:
|
||||||
def close(self):
|
def close(self):
|
||||||
self.cursor.close()
|
self.cursor.close()
|
||||||
|
|
||||||
|
def print_error_frame_info(self, elm, expect_elm):
|
||||||
|
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||||
|
args = (caller.filename, caller.lineno, self.sql, elm, expect_elm)
|
||||||
|
# tdLog.info("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
|
||||||
|
raise Exception("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
|
||||||
|
|
||||||
def prepare(self, dbname="db", drop=True, **kwargs):
|
def prepare(self, dbname="db", drop=True, **kwargs):
|
||||||
tdLog.info(f"prepare database:{dbname}")
|
tdLog.info(f"prepare database:{dbname}")
|
||||||
s = 'reset query cache'
|
s = 'reset query cache'
|
||||||
|
@ -331,13 +337,14 @@ class TDSql:
|
||||||
return self.queryRows
|
return self.queryRows
|
||||||
|
|
||||||
def checkRows(self, expectedRows):
|
def checkRows(self, expectedRows):
|
||||||
if self.queryRows == expectedRows:
|
self.checkEqual(self.queryRows, expectedRows)
|
||||||
tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectedRows))
|
# if self.queryRows == expectedRows:
|
||||||
return True
|
# tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectedRows))
|
||||||
else:
|
# return True
|
||||||
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
# else:
|
||||||
args = (caller.filename, caller.lineno, self.sql, self.queryRows, expectedRows)
|
# caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||||
tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args)
|
# args = (caller.filename, caller.lineno, self.sql, self.queryRows, expectedRows)
|
||||||
|
# tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args)
|
||||||
|
|
||||||
def checkRows_not_exited(self, expectedRows):
|
def checkRows_not_exited(self, expectedRows):
|
||||||
"""
|
"""
|
||||||
|
@ -640,12 +647,8 @@ class TDSql:
|
||||||
if self.__check_equal(elm, expect_elm):
|
if self.__check_equal(elm, expect_elm):
|
||||||
tdLog.info("sql:%s, elm:%s == expect_elm:%s" % (self.sql, elm, expect_elm))
|
tdLog.info("sql:%s, elm:%s == expect_elm:%s" % (self.sql, elm, expect_elm))
|
||||||
return
|
return
|
||||||
|
self.print_error_frame_info(elm, expect_elm)
|
||||||
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
|
||||||
args = (caller.filename, caller.lineno, self.sql, elm, expect_elm)
|
|
||||||
# tdLog.info("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
|
|
||||||
raise Exception("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
|
|
||||||
|
|
||||||
def checkNotEqual(self, elm, expect_elm):
|
def checkNotEqual(self, elm, expect_elm):
|
||||||
if elm != expect_elm:
|
if elm != expect_elm:
|
||||||
tdLog.info("sql:%s, elm:%s != expect_elm:%s" % (self.sql, elm, expect_elm))
|
tdLog.info("sql:%s, elm:%s != expect_elm:%s" % (self.sql, elm, expect_elm))
|
||||||
|
|
Loading…
Reference in New Issue