Merge branch '3.0' of https://github.com/taosdata/TDengine into TEST/3.0/TD-28332

This commit is contained in:
zk66214 2024-01-18 20:08:18 +08:00
commit a2ceb580c5
8 changed files with 195 additions and 181 deletions

View File

@ -119,6 +119,8 @@ class TDTestCase(TBase):
# stop taosd test taos as server
sc.dnodeStop(idx)
etool.exeBinFile("taos", f'-n server', wait=False)
time.sleep(3)
eos.exe("pkill -9 taos")
# run
def run(self):

View File

@ -52,13 +52,14 @@ class TDTestCase(TBase):
tdLog.info(f"do query.")
# __group_key
sql = f"select count(*),_group_key(uti),uti from {self.stb} partition by uti;"
tdSql.execute(sql)
tdSql.checkRows(251)
sql = f"select count(*),_group_key(uti),uti from {self.stb} partition by uti"
tdSql.query(sql)
# column index 1 value same with 2
tdSql.checkSameColumn(1, 2)
sql = f"select count(*),_group_key(usi) from {self.stb} group by usi;"
tdSql.execute(sql)
tdSql.checkRows(997)
sql = f"select count(*),_group_key(usi),usi from {self.stb} group by usi limit 100;"
tdSql.query(sql)
tdSql.checkSameColumn(1, 2)
# tail
sql1 = "select ts,ui from d0 order by ts desc limit 5 offset 2;"

View File

@ -193,10 +193,10 @@ class TBase:
# sql
rows1 = tdSql.query(sql1,queryTimes=2)
res1 = copy.deepcopy(tdSql.queryResult)
res1 = copy.deepcopy(tdSql.res)
tdSql.query(sql2,queryTimes=2)
res2 = tdSql.queryResult
res2 = tdSql.res
rowlen1 = len(res1)
rowlen2 = len(res2)

View File

@ -795,7 +795,7 @@ class TDCom:
def getOneRow(self, location, containElm):
res_list = list()
if 0 <= location < tdSql.queryRows:
for row in tdSql.queryResult:
for row in tdSql.res:
if row[location] == containElm:
res_list.append(row)
return res_list
@ -943,7 +943,7 @@ class TDCom:
"""drop all streams
"""
tdSql.query("show streams")
stream_name_list = list(map(lambda x: x[0], tdSql.queryResult))
stream_name_list = list(map(lambda x: x[0], tdSql.res))
for stream_name in stream_name_list:
tdSql.execute(f'drop stream if exists {stream_name};')
@ -962,7 +962,7 @@ class TDCom:
"""drop all databases
"""
tdSql.query("show databases;")
db_list = list(map(lambda x: x[0], tdSql.queryResult))
db_list = list(map(lambda x: x[0], tdSql.res))
for dbname in db_list:
if dbname not in self.white_list and "telegraf" not in dbname:
tdSql.execute(f'drop database if exists `{dbname}`')
@ -1412,7 +1412,7 @@ class TDCom:
input_function (str): scalar
"""
tdSql.query(sql)
res = tdSql.queryResult
res = tdSql.res
if input_function in ["acos", "asin", "atan", "cos", "log", "pow", "sin", "sqrt", "tan"]:
tdSql.checkEqual(res[1][1], "DOUBLE")
tdSql.checkEqual(res[2][1], "DOUBLE")
@ -1490,7 +1490,7 @@ class TDCom:
bigint: bigint-ts
"""
tdSql.query(f'select cast({str_ts} as bigint)')
return tdSql.queryResult[0][0]
return tdSql.res[0][0]
def cast_query_data(self, query_data):
"""cast query-result for existed-stb
@ -1514,7 +1514,7 @@ class TDCom:
tdSql.query(f'select cast("{v}" as binary(6))')
else:
tdSql.query(f'select cast("{v}" as {" ".join(col_tag_type_list[i].strip().split(" ")[1:])})')
query_data_l[i] = tdSql.queryResult[0][0]
query_data_l[i] = tdSql.res[0][0]
else:
query_data_l[i] = v
nl.append(tuple(query_data_l))
@ -1566,9 +1566,9 @@ class TDCom:
if tag_value_list:
dvalue = len(self.tag_type_str.split(',')) - defined_tag_count
tdSql.query(sql1)
res1 = tdSql.queryResult
res1 = tdSql.res
tdSql.query(sql2)
res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult
res2 = self.cast_query_data(tdSql.res) if tag_value_list or use_exist_stb else tdSql.res
tdSql.sql = sql1
new_list = list()
if tag_value_list:
@ -1601,10 +1601,10 @@ class TDCom:
tdLog.info("query retrying ...")
new_list = list()
tdSql.query(sql1)
res1 = tdSql.queryResult
res1 = tdSql.res
tdSql.query(sql2)
# res2 = tdSql.queryResult
res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult
# res2 = tdSql.res
res2 = self.cast_query_data(tdSql.res) if tag_value_list or use_exist_stb else tdSql.res
tdSql.sql = sql1
if tag_value_list:
@ -1643,10 +1643,10 @@ class TDCom:
tdLog.info("query retrying ...")
new_list = list()
tdSql.query(sql1)
res1 = tdSql.queryResult
res1 = tdSql.res
tdSql.query(sql2)
# res2 = tdSql.queryResult
res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult
# res2 = tdSql.res
res2 = self.cast_query_data(tdSql.res) if tag_value_list or use_exist_stb else tdSql.res
tdSql.sql = sql1
if tag_value_list:

View File

@ -86,10 +86,9 @@ class ConfigureyCluster:
count=0
while count < 5:
tdSql.query("select * from information_schema.ins_dnodes")
# tdLog.debug(tdSql.queryResult)
status=0
for i in range(self.dnodeNums):
if tdSql.queryResult[i][4] == "ready":
if tdSql.res[i][4] == "ready":
status+=1
# tdLog.debug(status)

View File

@ -78,6 +78,107 @@ class TDSql:
self.cursor.execute(s)
time.sleep(2)
#
# do execute
#
def query(self, sql, row_tag=None, queryTimes=10, count_expected_res=None):
self.sql = sql
i=1
while i <= queryTimes:
try:
self.cursor.execute(sql)
self.res = self.cursor.fetchall()
self.queryRows = len(self.res)
self.queryCols = len(self.cursor.description)
if count_expected_res is not None:
counter = 0
while count_expected_res != self.res[0][0]:
self.cursor.execute(sql)
self.res = self.cursor.fetchall()
if counter < queryTimes:
counter += 0.5
time.sleep(0.5)
else:
return False
if row_tag:
return self.res
return self.queryRows
except Exception as e:
tdLog.notice("Try to query again, query times: %d "%i)
if i == queryTimes:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
i+=1
time.sleep(1)
pass
def executeTimes(self, sql, times):
for i in range(times):
try:
return self.cursor.execute(sql)
except BaseException:
time.sleep(1)
continue
def execute(self, sql, queryTimes=30, show=False):
self.sql = sql
if show:
tdLog.info(sql)
i=1
while i <= queryTimes:
try:
self.affectedRows = self.cursor.execute(sql)
return self.affectedRows
except Exception as e:
tdLog.notice("Try to execute sql again, query times: %d "%i)
if i == queryTimes:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
i+=1
time.sleep(1)
pass
# execute many sql
def executes(self, sqls, queryTimes=30, show=False):
for sql in sqls:
self.execute(sql, queryTimes, show)
def waitedQuery(self, sql, expectRows, timeout):
tdLog.info("sql: %s, try to retrieve %d rows in %d seconds" % (sql, expectRows, timeout))
self.sql = sql
try:
for i in range(timeout):
self.cursor.execute(sql)
self.res = self.cursor.fetchall()
self.queryRows = len(self.res)
self.queryCols = len(self.cursor.description)
tdLog.info("sql: %s, try to retrieve %d rows,get %d rows" % (sql, expectRows, self.queryRows))
if self.queryRows >= expectRows:
return (self.queryRows, i)
time.sleep(1)
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
return (self.queryRows, timeout)
def is_err_sql(self, sql):
err_flag = True
try:
self.cursor.execute(sql)
except BaseException:
err_flag = False
return False if err_flag else True
def error(self, sql, expectedErrno = None, expectErrInfo = None):
caller = inspect.getframeinfo(inspect.stack()[1][0])
expectErrNotOccured = True
@ -95,7 +196,7 @@ class TDSql:
else:
self.queryRows = 0
self.queryCols = 0
self.queryResult = None
self.res = None
if expectedErrno != None:
if expectedErrno == self.errno:
@ -115,49 +216,25 @@ class TDSql:
return self.error_info
def query(self, sql, row_tag=None, queryTimes=10, count_expected_res=None):
#
# get session
#
def getData(self, row, col):
self.checkRowCol(row, col)
return self.res[row][col]
def getResult(self, sql):
self.sql = sql
i=1
while i <= queryTimes:
try:
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self.cursor.description)
if count_expected_res is not None:
counter = 0
while count_expected_res != self.queryResult[0][0]:
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
if counter < queryTimes:
counter += 0.5
time.sleep(0.5)
else:
return False
if row_tag:
return self.queryResult
return self.queryRows
except Exception as e:
tdLog.notice("Try to query again, query times: %d "%i)
if i == queryTimes:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
i+=1
time.sleep(1)
pass
def is_err_sql(self, sql):
err_flag = True
try:
self.cursor.execute(sql)
except BaseException:
err_flag = False
return False if err_flag else True
self.res = self.cursor.fetchall()
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
return self.res
def getVariable(self, search_attr):
'''
@ -193,29 +270,19 @@ class TDSql:
return col_name_list, col_type_list
return col_name_list
def waitedQuery(self, sql, expectRows, timeout):
tdLog.info("sql: %s, try to retrieve %d rows in %d seconds" % (sql, expectRows, timeout))
self.sql = sql
try:
for i in range(timeout):
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self.cursor.description)
tdLog.info("sql: %s, try to retrieve %d rows,get %d rows" % (sql, expectRows, self.queryRows))
if self.queryRows >= expectRows:
return (self.queryRows, i)
time.sleep(1)
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
return (self.queryRows, timeout)
def getRows(self):
return self.queryRows
# get first value
def getFirstValue(self, sql) :
self.query(sql)
return self.getData(0, 0)
#
# check session
#
def checkRows(self, expectRows):
if self.queryRows == expectRows:
tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectRows))
@ -273,26 +340,26 @@ class TDSql:
self.checkRowCol(row, col)
if self.queryResult[row][col] != data:
if self.res[row][col] != data:
if self.cursor.istype(col, "TIMESTAMP"):
# suppose user want to check nanosecond timestamp if a longer data passed``
if isinstance(data,str) :
if (len(data) >= 28):
if self.queryResult[row][col] == _parse_ns_timestamp(data):
if self.res[row][col] == _parse_ns_timestamp(data):
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
else:
if self.queryResult[row][col].astimezone(datetime.timezone.utc) == _parse_datetime(data).astimezone(datetime.timezone.utc):
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}")
if self.res[row][col].astimezone(datetime.timezone.utc) == _parse_datetime(data).astimezone(datetime.timezone.utc):
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.res[row][col]} == expect:{data}")
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
return
elif isinstance(data,int):
@ -304,72 +371,72 @@ class TDSql:
precision = 'ns'
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
return
success = False
if precision == 'ms':
dt_obj = self.queryResult[row][col]
dt_obj = self.res[row][col]
ts = int(int((dt_obj-datetime.datetime.fromtimestamp(0,dt_obj.tzinfo)).total_seconds())*1000) + int(dt_obj.microsecond/1000)
if ts == data:
success = True
elif precision == 'us':
dt_obj = self.queryResult[row][col]
dt_obj = self.res[row][col]
ts = int(int((dt_obj-datetime.datetime.fromtimestamp(0,dt_obj.tzinfo)).total_seconds())*1e6) + int(dt_obj.microsecond)
if ts == data:
success = True
elif precision == 'ns':
if data == self.queryResult[row][col]:
if data == self.res[row][col]:
success = True
if success:
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
return
elif isinstance(data,datetime.datetime):
dt_obj = self.queryResult[row][col]
dt_obj = self.res[row][col]
delt_data = data-datetime.datetime.fromtimestamp(0,data.tzinfo)
delt_result = self.queryResult[row][col] - datetime.datetime.fromtimestamp(0,self.queryResult[row][col].tzinfo)
delt_result = self.res[row][col] - datetime.datetime.fromtimestamp(0,self.res[row][col].tzinfo)
if delt_data == delt_result:
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
return
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
if str(self.queryResult[row][col]) == str(data):
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}")
if str(self.res[row][col]) == str(data):
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.res[row][col]} == expect:{data}")
if(show):
tdLog.info("check successfully")
return
elif isinstance(data, float):
if abs(data) >= 1 and abs((self.queryResult[row][col] - data) / data) <= 0.000001:
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}")
if abs(data) >= 1 and abs((self.res[row][col] - data) / data) <= 0.000001:
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.res[row][col]} == expect:{data}")
if(show):
tdLog.info("check successfully")
elif abs(data) < 1 and abs(self.queryResult[row][col] - data) <= 0.000001:
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}")
elif abs(data) < 1 and abs(self.res[row][col] - data) <= 0.000001:
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.res[row][col]} == expect:{data}")
if(show):
tdLog.info("check successfully")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
return
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
if(show):
tdLog.info("check successfully")
@ -397,23 +464,23 @@ class TDSql:
def checkDataNoExit(self, row, col, data):
if self.checkRowColNoExit(row, col) == False:
return False
if self.queryResult[row][col] != data:
if self.res[row][col] != data:
if self.cursor.istype(col, "TIMESTAMP"):
# suppose user want to check nanosecond timestamp if a longer data passed
if (len(data) >= 28):
if pd.to_datetime(self.queryResult[row][col]) == pd.to_datetime(data):
if pd.to_datetime(self.res[row][col]) == pd.to_datetime(data):
return True
else:
if self.queryResult[row][col] == _parse_datetime(data):
if self.res[row][col] == _parse_datetime(data):
return True
return False
if str(self.queryResult[row][col]) == str(data):
if str(self.res[row][col]) == str(data):
return True
elif isinstance(data, float):
if abs(data) >= 1 and abs((self.queryResult[row][col] - data) / data) <= 0.000001:
if abs(data) >= 1 and abs((self.res[row][col] - data) / data) <= 0.000001:
return True
elif abs(data) < 1 and abs(self.queryResult[row][col] - data) <= 0.000001:
elif abs(data) < 1 and abs(self.res[row][col] - data) <= 0.000001:
return True
else:
return False
@ -437,56 +504,6 @@ class TDSql:
self.query(sql)
self.checkData(row, col, data)
def getData(self, row, col):
self.checkRowCol(row, col)
return self.queryResult[row][col]
def getResult(self, sql):
self.sql = sql
try:
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
return self.queryResult
def executeTimes(self, sql, times):
for i in range(times):
try:
return self.cursor.execute(sql)
except BaseException:
time.sleep(1)
continue
def execute(self, sql, queryTimes=30, show=False):
self.sql = sql
if show:
tdLog.info(sql)
i=1
while i <= queryTimes:
try:
self.affectedRows = self.cursor.execute(sql)
return self.affectedRows
except Exception as e:
tdLog.notice("Try to execute sql again, query times: %d "%i)
if i == queryTimes:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
i+=1
time.sleep(1)
pass
# execute many sql
def executes(self, sqls, queryTimes=30, show=False):
for sql in sqls:
self.execute(sql, queryTimes, show)
def checkAffectedRows(self, expectAffectedRows):
if self.affectedRows != expectAffectedRows:
caller = inspect.getframeinfo(inspect.stack()[1][0])
@ -545,16 +562,21 @@ class TDSql:
self.query(sql)
self.checkData(0, 0, expectCnt)
# get first value
def getFirstValue(self, sql) :
self.query(sql)
return self.getData(0, 0)
# expect first value
def checkFirstValue(self, sql, expect):
self.query(sql)
self.checkData(0, 0, expect)
# colIdx1 value same with colIdx2
def checkSameColumn(self, c1, c2):
for i in range(self.queryRows):
if self.res[i][c1] != self.res[i][c2]:
tdLog.exit(f"Not same. row={i} col1={c1} col2={c2}. {self.res[i][c1]}!={self.res[i][c2]}")
tdLog.info(f"check {self.queryRows} rows two column value same. column index [{c1},{c2}]")
#
# others session
#
def get_times(self, time_str, precision="ms"):
caller = inspect.getframeinfo(inspect.stack()[1][0])

View File

@ -560,17 +560,6 @@ if __name__ == "__main__":
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else:
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
# tdSql.init(conn.cursor())
# tdSql.execute("create qnode on dnode 1")
# tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
# tdSql.query("show local variables;")
# for i in range(tdSql.queryRows):
# if tdSql.queryResult[i][0] == "queryPolicy" :
# if int(tdSql.queryResult[i][1]) == int(queryPolicy):
# tdLog.info('alter queryPolicy to %d successfully'%queryPolicy)
# else :
# tdLog.debug(tdSql.queryResult)
# tdLog.exit("alter queryPolicy to %d failed"%queryPolicy)
cursor = conn.cursor()
cursor.execute("create qnode on dnode 1")

View File

@ -17,6 +17,7 @@ fi
,,y,army,./pytest.sh python3 ./test.py -f enterprise/multi-level/mlevel_basic.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f enterprise/s3/s3_basic.py -L 3 -D 1
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3
,,n,army,python3 ./test.py -f community/cmdline/fullopt.py