test: check stream task status in common.py

This commit is contained in:
chenhaoran 2024-08-28 20:54:42 +08:00
parent 6e8d31a2df
commit 639ec1211d
5 changed files with 121 additions and 47 deletions

View File

@ -980,7 +980,7 @@ class TDCom:
tdSql.execute(f'drop stream if exists {stream_name};') tdSql.execute(f'drop stream if exists {stream_name};')
def check_stream_info_string(self, info): def check_stream_wal_info(self, wal_info):
# This method is defined for the 'info' column of the 'information_schema.ins_stream_tasks'. # This method is defined for the 'info' column of the 'information_schema.ins_stream_tasks'.
# Define the regular expression pattern to match the required format # Define the regular expression pattern to match the required format
# This pattern looks for a number followed by an optional space and then a pair of square brackets # This pattern looks for a number followed by an optional space and then a pair of square brackets
@ -988,7 +988,7 @@ class TDCom:
pattern = r'(\d+)\s*\[(\d+),\s*(\d+)\]' pattern = r'(\d+)\s*\[(\d+),\s*(\d+)\]'
# Use the search function from the re module to find a match in the string # Use the search function from the re module to find a match in the string
match = re.search(pattern, info) match = re.search(pattern, wal_info)
# Check if a match was found # Check if a match was found
if match: if match:
@ -1001,14 +1001,8 @@ class TDCom:
# If no match was found, or the pattern does not match the expected format, return False # If no match was found, or the pattern does not match the expected format, return False
return False return False
def print_error_frame_info(self, sql, elm, expect_elm):
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, elm, expect_elm)
# tdLog.info("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
raise Exception("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
def check_stream_task_status(self, stream_name, vgroups, timeout=60): def check_stream_task_status(self, stream_name, vgroups, stream_timeout=None):
"""check stream status """check stream status
Args: Args:
@ -1017,37 +1011,114 @@ class TDCom:
Returns: Returns:
str: status str: status
""" """
#check stream task rows timeout = self.stream_timeout if stream_timeout is None else stream_timeout
sql = f"select `task_id`,node_id,stream_name,status,info,history_task_id from information_schema.ins_stream_tasks where stream_name='{stream_name}' and `level`='source';"
tdSql.query(sql) #check stream task rows
sql_task_all = f"select `task_id`,node_id,stream_name,status,info,history_task_id from information_schema.ins_stream_tasks where stream_name='{stream_name}' and `level`='source';"
sql_task_status = f"select distinct(status) from information_schema.ins_stream_tasks where stream_name='{stream_name}' and `level`='source';"
sql_task_history = f"select distinct(history_task_id) from information_schema.ins_stream_tasks where stream_name='{stream_name}' and `level`='source';"
tdSql.query(sql_task_all)
tdSql.checkRows(vgroups) tdSql.checkRows(vgroups)
#check stream task status #check stream task status
checktimes=0 checktimes = 1
while checktimes < timeout: check_stream_success = 0
check_stream_success = 0 vgroup_num = 0
while checktimes <= timeout:
tdLog.notice(f"checktimes:{checktimes}")
try: try:
checktimes += 1 result_task_alll = tdSql.query(sql_task_all,row_tag=True)
for i in range(vgroups): result_task_alll_rows = tdSql.query(sql_task_all)
tdSql.query(sql) result_task_status = tdSql.query(sql_task_status,row_tag=True)
if tdSql.queryResult[i][3] == "ready" and self.check_stream_info_string(tdSql.queryResult[i][4]) and tdSql.queryResult[i][5] == None: result_task_status_rows = tdSql.query(sql_task_status)
check_stream_success += 1 result_task_history = tdSql.query(sql_task_history,row_tag=True)
tdLog.info(f"check stream task list[{check_stream_success}] sucessfully :") result_task_history_rows = tdSql.query(sql_task_history)
else:
break tdLog.notice(f"Try to check stream status, check times: {checktimes} and stream task list[{check_stream_success}]")
# print(f"result_task_status:{result_task_status},result_task_history:{result_task_history},result_task_alll:{result_task_alll}")
if result_task_status_rows == 1 and result_task_status ==[('ready',)] :
if result_task_history_rows == 1 and result_task_history == [(None,)] :
for vgroup_num in range(vgroups):
if self.check_stream_wal_info(result_task_alll[vgroup_num][4]) :
check_stream_success += 1
tdLog.info(f"check stream task list[{check_stream_success}] sucessfully :")
else:
check_stream_success = 0
break
if check_stream_success == vgroups: if check_stream_success == vgroups:
break break
time.sleep(1) time.sleep(1)
tdLog.notice(f"Try to check stream status again, check times: {checktimes}") checktimes += 1
if checktimes == timeout: vgroup_num = vgroup_num
self.print_error_frame_info(sql,tdSql.queryResult,"status is ready,info is finished and history_task_id is NULL")
except Exception as e: except Exception as e:
tdLog.notice(f"Try to check stream status again, check times: {checktimes}") tdLog.notice(f"Try to check stream status again, check times: {checktimes}")
if checktimes == timeout: checktimes += 1
self.print_error_frame_info(sql,tdSql.queryResult,"status is ready,info is finished and history_task_id is NULL") tdSql.print_error_frame_info(result_task_alll[vgroup_num],"status is ready,info is finished and history_task_id is NULL",sql_task_all)
i+=1 else:
time.sleep(1) checktimes_end = checktimes - 1
tdLog.notice(f"it has spend {checktimes_end} for checking stream task status but it failed")
if checktimes_end == timeout:
tdSql.print_error_frame_info(result_task_alll[vgroup_num],"status is ready,info is finished and history_task_id is NULL",sql_task_all)
# def check_stream_task_status(self, stream_name, vgroups, stream_timeout=None):
# """check stream status
# Args:
# stream_name (str): stream_name
# vgroups (int): vgroups
# Returns:
# str: status
# """
# timeout = self.stream_timeout if stream_timeout is None else stream_timeout
# #check stream task rows
# sql_task_all = f"select `task_id`,node_id,stream_name,status,info,history_task_id from information_schema.ins_stream_tasks where stream_name='{stream_name}' and `level`='source';"
# sql_task_status = f"select distinct(status) from information_schema.ins_stream_tasks where stream_name='{stream_name}' and `level`='source';"
# sql_task_history = f"select distinct(history_task_id) from information_schema.ins_stream_tasks where stream_name='{stream_name}' and `level`='source';"
# tdSql.query(sql_task_all)
# tdSql.checkRows(vgroups)
# #check stream task status
# checktimes = 1
# check_stream_success = 0
# vgroup_num = 0
# while checktimes <= timeout:
# print(f"checktimes:{checktimes}")
# try:
# result_task_alll = tdSql.query(sql_task_all,row_tag=True)
# result_task_alll_rows = tdSql.query(sql_task_all)
# result_task_status = tdSql.query(sql_task_status,row_tag=True)
# result_task_status_rows = tdSql.query(sql_task_status)
# result_task_history = tdSql.query(sql_task_history,row_tag=True)
# result_task_history_rows = tdSql.query(sql_task_history)
# tdLog.notice(f"Try to check stream status, check times: {checktimes} and stream task list[{check_stream_success}]")
# print(f"result_task_status:{result_task_status},result_task_history:{result_task_history},result_task_alll:{result_task_alll}")
# for vgroup_num in range(vgroups):
# if result_task_alll[vgroup_num][3] == "ready" and self.check_stream_wal_info(result_task_alll[vgroup_num][4]) and result_task_alll[vgroup_num][5] == None:
# check_stream_success += 1
# tdLog.info(f"check stream task list[{check_stream_success}] sucessfully :")
# else:
# check_stream_success = 0
# break
# if check_stream_success == vgroups:
# break
# time.sleep(1)
# checktimes += 1
# vgroup_num = vgroup_num
# except Exception as e:
# tdLog.notice(f"Try to check stream status again, check times: {checktimes}")
# checktimes += 1
# tdSql.print_error_frame_info(result_task_alll[vgroup_num],"status is ready,info is finished and history_task_id is NULL",sql_task_all)
# else:
# checktimes_end = checktimes - 1
# tdLog.notice(f"it has spend {checktimes_end} for checking stream task status but it failed")
# if checktimes_end == timeout:
# tdSql.print_error_frame_info(result_task_alll[vgroup_num],"status is ready,info is finished and history_task_id is NULL",sql_task_all)

View File

@ -61,9 +61,10 @@ class TDSql:
def close(self): def close(self):
self.cursor.close() self.cursor.close()
def print_error_frame_info(self, elm, expect_elm): def print_error_frame_info(self, elm, expect_elm, sql=None):
caller = inspect.getframeinfo(inspect.stack()[1][0]) caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, elm, expect_elm) print_sql = self.sql if sql is None else sql
args = (caller.filename, caller.lineno, print_sql, elm, expect_elm)
# tdLog.info("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args) # tdLog.info("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
raise Exception("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args) raise Exception("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)

View File

@ -97,6 +97,6 @@ else
if [ $python_error -ne 0 ] || [ $python_taos_error -ne 0 ] ; then if [ $python_error -ne 0 ] || [ $python_taos_error -ne 0 ] ; then
cat ${LOG_DIR}/*.info |grep "#" | grep -w "TDinternal" cat ${LOG_DIR}/*.info |grep "#" | grep -w "TDinternal"
fi fi
cat ${LOG_DIR}/*.asan cat ${LOG_DIR}/*.asan |grep "#" | grep -w "TDinternal"
exit 1 exit 1
fi fi

View File

@ -15,42 +15,42 @@ fi
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
while [ -n "$PID" ]; do while [ -n "$PID" ]; do
echo kill -15 $PID echo kill -9 $PID
#pkill -15 taosd #pkill -9 taosd
kill -15 $PID kill -9 $PID
echo "Killing taosd processes" echo "Killing taosd processes"
if [ "$OS_TYPE" != "Darwin" ]; then if [ "$OS_TYPE" != "Darwin" ]; then
fuser -k -n tcp 6030 fuser -k -n tcp 6030
else else
lsof -nti:6030 | xargs kill -15 lsof -nti:6030 | xargs kill -9
fi fi
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
done done
PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'`
while [ -n "$PID" ]; do while [ -n "$PID" ]; do
echo kill -15 $PID echo kill -9 $PID
#pkill -9 taos #pkill -9 taos
kill -15 $PID kill -9 $PID
echo "Killing taos processes" echo "Killing taos processes"
if [ "$OS_TYPE" != "Darwin" ]; then if [ "$OS_TYPE" != "Darwin" ]; then
fuser -k -n tcp 6030 fuser -k -n tcp 6030
else else
lsof -nti:6030 | xargs kill -15 lsof -nti:6030 | xargs kill -9
fi fi
PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'`
done done
PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'`
while [ -n "$PID" ]; do while [ -n "$PID" ]; do
echo kill -15 $PID echo kill -9 $PID
#pkill -15 tmq_sim #pkill -9 tmq_sim
kill -15 $PID kill -9 $PID
echo "Killing tmq_sim processes" echo "Killing tmq_sim processes"
if [ "$OS_TYPE" != "Darwin" ]; then if [ "$OS_TYPE" != "Darwin" ]; then
fuser -k -n tcp 6030 fuser -k -n tcp 6030
else else
lsof -nti:6030 | xargs kill -15 lsof -nti:6030 | xargs kill -9
fi fi
PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'`
done done

View File

@ -22,6 +22,7 @@ from util.cases import tdCases
from util.sql import tdSql from util.sql import tdSql
from util.dnodes import tdDnodes from util.dnodes import tdDnodes
from util.dnodes import * from util.dnodes import *
from util.common import *
class TDTestCase: class TDTestCase:
updatecfgDict = {'maxSQLLength':1048576,'debugFlag': 135} updatecfgDict = {'maxSQLLength':1048576,'debugFlag': 135}
@ -158,7 +159,8 @@ class TDTestCase:
fake.pystr() ,fake.pystr() ,fake.pyfloat(),fake.pyfloat(),fake.random_int(min=-2147483647, max=2147483647, step=1))) fake.pystr() ,fake.pystr() ,fake.pyfloat(),fake.pyfloat(),fake.random_int(min=-2147483647, max=2147483647, step=1)))
# create stream # create stream
tdSql.execute('''create stream current_stream trigger at_once IGNORE EXPIRED 0 into stream_max_stable_1 as select _wstart as startts, _wend as wend, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 where ts is not null interval (5s);''') stream_name="current_stream"
tdSql.execute(f'''create stream {stream_name} trigger at_once IGNORE EXPIRED 0 into stream_max_stable_1 as select _wstart as startts, _wend as wend, max(q_int) as max_int, min(q_bigint) as min_int from stable_1 where ts is not null interval (5s);''')
# insert data positive # insert data positive
for i in range(num_random*n): for i in range(num_random*n):
@ -287,8 +289,8 @@ class TDTestCase:
tdSql.query("select count(*) from hn_table_1_r;") tdSql.query("select count(*) from hn_table_1_r;")
tdSql.checkData(0,0,num_random*n) tdSql.checkData(0,0,num_random*n)
sleep(5)
# stream data check # stream data check
tdCom.check_stream_task_status(stream_name,vgroups,90)
tdSql.query("select startts,wend,max_int from stream_max_stable_1 ;") tdSql.query("select startts,wend,max_int from stream_max_stable_1 ;")
tdSql.checkRows(20) tdSql.checkRows(20)
tdSql.query("select sum(max_int) from stream_max_stable_1 ;") tdSql.query("select sum(max_int) from stream_max_stable_1 ;")