From 70297a4648a6321ed2991cac27f6c41831b31ba3 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 22 Mar 2023 17:17:53 +0800 Subject: [PATCH 01/24] test: first submit udf python case --- tests/system-test/0-others/udfpy/af_min.py | 30 ++ tests/system-test/0-others/udfpy/af_null.py | 19 ++ tests/system-test/0-others/udfpy/af_sum.py | 28 ++ .../0-others/udfpy/sf_multi_args.py | 20 ++ tests/system-test/0-others/udfpy/sf_null.py | 16 + tests/system-test/0-others/udfpy/sf_origin.py | 15 + tests/system-test/0-others/udfpy_main.py | 298 ++++++++++++++++++ 7 files changed, 426 insertions(+) create mode 100644 tests/system-test/0-others/udfpy/af_min.py create mode 100644 tests/system-test/0-others/udfpy/af_null.py create mode 100644 tests/system-test/0-others/udfpy/af_sum.py create mode 100644 tests/system-test/0-others/udfpy/sf_multi_args.py create mode 100644 tests/system-test/0-others/udfpy/sf_null.py create mode 100644 tests/system-test/0-others/udfpy/sf_origin.py create mode 100644 tests/system-test/0-others/udfpy_main.py diff --git a/tests/system-test/0-others/udfpy/af_min.py b/tests/system-test/0-others/udfpy/af_min.py new file mode 100644 index 0000000000..0f4e579761 --- /dev/null +++ b/tests/system-test/0-others/udfpy/af_min.py @@ -0,0 +1,30 @@ +import pickle + +def init(): + pass + +def destroy(): + pass + +def start(): + return pickle.dumps([]) + +def finish(buf): + mins = pickle.loads(buf) + min_val = None + for min in mins: + if min < min_val: + min_val = min + return min_val + +def reduce(datablock, buf): + (rows, cols) = datablock.shape() + mins = pickle.loads(buf) + min = None + for i in range(rows): + val = datablock.data(i, 0) + if min is None or (val is not None and val < min) : + min = val + if min is not None: + mins.append(min) + return pickle.dumps(mins) diff --git a/tests/system-test/0-others/udfpy/af_null.py b/tests/system-test/0-others/udfpy/af_null.py new file mode 100644 index 0000000000..230eac6888 --- /dev/null +++ b/tests/system-test/0-others/udfpy/af_null.py @@ -0,0 +1,19 @@ +import pickle + +def init(): + pass + +def destroy(): + pass + +def start(): + return pickle.dumps([]) + +def finish(buf): + return None + +def reduce(datablock, buf): + (rows, cols) = datablock.shape() + mins = pickle.loads(buf) + mins.append(None) + return pickle.dumps(mins) diff --git a/tests/system-test/0-others/udfpy/af_sum.py b/tests/system-test/0-others/udfpy/af_sum.py new file mode 100644 index 0000000000..e32cf3fa31 --- /dev/null +++ b/tests/system-test/0-others/udfpy/af_sum.py @@ -0,0 +1,28 @@ +import pickle + +def init(): + pass + +def destroy(): + pass + +def start(): + return pickle.dumps([]) + +def finish(buf): + sums = pickle.loads(buf) + all = 0 + for sum in sums: + all += sum + return all + +def reduce(datablock, buf): + (rows, cols) = datablock.shape() + sums = pickle.loads(buf) + sum = 0 + for i in range(rows): + val = datablock.data(i, 0) + if val is not None: + sum += val + sums.append(sum) + return pickle.dumps(sums) diff --git a/tests/system-test/0-others/udfpy/sf_multi_args.py b/tests/system-test/0-others/udfpy/sf_multi_args.py new file mode 100644 index 0000000000..5d8194b3ad --- /dev/null +++ b/tests/system-test/0-others/udfpy/sf_multi_args.py @@ -0,0 +1,20 @@ +# init +def init(): + pass + +# destory +def destory(): + pass + +# return origin column one value +def process(block): + (nrows, ncols) = block.shape() + results = [] + for i in range(nrows): + rows = [] + for j in range(ncols): + val = block.data(i, j) + rows.append(val) + 
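# NOTE: ','.join() requires str elements, so numeric column values make this raise TypeError; PATCH 05/24 below reworks this loop to decode bytes and repr() other values first. +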
results.append(','.join(rows)) + return results + diff --git a/tests/system-test/0-others/udfpy/sf_null.py b/tests/system-test/0-others/udfpy/sf_null.py new file mode 100644 index 0000000000..a82b147d01 --- /dev/null +++ b/tests/system-test/0-others/udfpy/sf_null.py @@ -0,0 +1,16 @@ + +# init +def init(): + pass + +# destory +def destory(): + pass + +# return origin column one value +def process(block): + (rows, cols) = block.shape() + results = [] + for i in range(rows): + results.append(None) + return results \ No newline at end of file diff --git a/tests/system-test/0-others/udfpy/sf_origin.py b/tests/system-test/0-others/udfpy/sf_origin.py new file mode 100644 index 0000000000..7588d0402d --- /dev/null +++ b/tests/system-test/0-others/udfpy/sf_origin.py @@ -0,0 +1,15 @@ +# init +def init(): + pass + +# destory +def destory(): + pass + +# return origin column one value +def process(block): + (rows, cols) = block.shape() + results = [] + for i in range(rows): + results.append(block.data(i,0)) + return results diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py new file mode 100644 index 0000000000..9cc329ca19 --- /dev/null +++ b/tests/system-test/0-others/udfpy_main.py @@ -0,0 +1,298 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + + +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * +from util.sqlset import * + +import random +import os + + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + self.setsql = TDSetSql() + + # udf path + self.udf_path = os.path.dirname(os.path.realpath(__file__)) + "/udfpy" + + + self.column_dict = { + 'ts': 'timestamp', + 'col1': 'tinyint', + 'col2': 'smallint', + 'col3': 'int', + 'col4': 'bigint', + 'col5': 'tinyint unsigned', + 'col6': 'smallint unsigned', + 'col7': 'int unsigned', + 'col8': 'bigint unsigned', + 'col9': 'float', + 'col10': 'double', + 'col11': 'bool', + 'col12': 'varchar(20)', + 'col13': 'nchar(20)', + 'col14': 'timestamp' + } + self.tag_dict = { + 't1': 'tinyint', + 't2': 'smallint', + 't3': 'int', + 't4': 'bigint', + 't5': 'tinyint unsigned', + 't6': 'smallint unsigned', + 't7': 'int unsigned', + 't8': 'bigint unsigned', + 't9': 'float', + 't10': 'double', + 't11': 'bool', + 't12': 'varchar(20)', + 't13': 'nchar(20)', + 't14': 'timestamp' + } + + def set_stb_sql(self,stbname,column_dict,tag_dict): + column_sql = '' + tag_sql = '' + for k,v in column_dict.items(): + column_sql += f"{k} {v}, " + for k,v in tag_dict.items(): + tag_sql += f"{k} {v}, " + create_stb_sql = f'create stable {stbname} ({column_sql[:-2]}) tags ({tag_sql[:-2]})' + return create_stb_sql + + # create stable and child tables + def create_table(self, stbname, tbname, count): + tdSql.prepare() + tdSql.execute('use db') + self.child_count = count + self.stbname = stbname + self.tbname = tbname + + # create stable + create_table_sql = self.set_stb_sql(stbname, self.column_dict, self.tag_dict) + 
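# builds e.g. 'create stable meters (ts timestamp, col1 tinyint, ...) tags (t1 tinyint, ...)' from the dicts above +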
tdSql.execute(create_table_sql) + + # create child table + for i in range(count): + ti = i % 128 + tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}",now' + sql = f'create table {tbname}{i} using {stbname} tags({tags})' + tdSql.execute(sql) + + tdLog.info(f" create {count} child tables ok.") + + def create_udfpy_impl(self, funs, filename): + for name, outtype in funs.items(): + sql = f' create function {name} as "{self.udf_path}/{filename} {outtype} " language "Python" ' + tdSql.execute(sql) + + + def create_udfpy_dicts(self, dicts, filename): + for k,v in dicts: + self.create_udfpy_impl(k, v, filename) + + # create_udfpy_function + def create_udfpy_function(self): + # function + + + # scalar funciton + self.scalar_funs = { + 'sf1': 'tinyint', + 'sf2': 'smallint', + 'sf3': 'int', + 'sf4': 'bigint', + 'sf5': 'tinyint unsigned', + 'sf6': 'smallint unsigned', + 'sf7': 'int unsigned', + 'sf8': 'bigint unsigned', + 'sf9': 'float', + 'sf10': 'double', + 'sf11': 'bool', + 'sf12': 'varchar(20)', + 'sf13': 'nchar(20)', + 'sf14': 'timestamp' + } + # agg function + self.agg_funs = { + 'af1': 'tinyint', + 'af2': 'smallint', + 'af3': 'int', + 'af4': 'bigint', + 'af5': 'tinyint unsigned', + 'af6': 'smallint unsigned', + 'af7': 'int unsigned', + 'af8': 'bigint unsigned', + 'af9': 'float', + 'af10': 'double', + 'af11': 'bool', + 'af12': 'varchar(20)', + 'af13': 'nchar(20)', + 'af14': 'timestamp' + } + + # files + self.create_udfpy_function(self.scalar_funs, "fun_origin") + self.create_udf_sf("sf_multi_args", "binary(1024)") + + #self.create_udfpy_function(self.agg_funs, None) + + def create_udf_sf(self, fun_name, out_type): + sql = f'create function {fun_name} as {self.udf_path}{fun_name}.py {out_type} language "Python"' + tdSql.execute(sql) + + def create_udf_af(self, fun_name, out_type, bufsize): + sql = f'create aggregate function {fun_name} as {self.udf_path}{fun_name}.py {out_type} bufsize {bufsize} language "Python"' + tdSql.execute(sql) + + + # sql1 query result eual with sql2 + def verify_same_result(self, sql1, sql2): + # query + result1 = tdSql.getResult(sql1) + tdSql.query(sql2) + + for i, row in enumerate(result1): + for j , val in enumerate(row): + tdSql.checkData(i, j, result1[i][j]) + + # same value like select col1, udf_fun1(col1) from st + def verfiy_same_value(sql): + tdSql.query(sql) + nrows = tdSql.getRows() + for i in range(nrows): + val = tdSql.getData(i, 0) + tdSql.checkData(i, 1, val) + + # verify multi values + def verify_same_multi_values(self, sql): + tdSql.query(sql) + nrows = tdSql.getRows() + for i in range(nrows): + udf_val = tdSql.getData(i, 0) + vals = udf_val.split(',') + for j,val in enumerate(vals, 1): + tdSql.checkData(i, j, val) + + # query multi-args + def query_multi_args(self): + cols = self.column_dict.keys() + self.tag_dict.keys() + ncols = len(cols) + for i in range(2, ncols): + sample = random.sample(i) + cols_name = ','.join(sample) + sql = f'select sf_multi_args({cols_name}),{cols_name} from {self.stbname}' + self.verify_same_multi_values(sql) + + + # query_udfpy + def query_scalar_udfpy(self): + # col + for col_name, col_type in self.column_dict: + for fun_name, out_type in self.scalar_funs: + sql = f'select {col_name} {fun_name}({col_name}) from {self.stbname}' + self.verify_same_value(sql) + + # multi-args + self.query_multi_args() + + # create aggregate + def create_aggr_udfpy(self): + # all type check null + for col_name, col_type in self.column_dict: + self.create_udf_af(f"af_null_{col_name}", f"{col_type}", 
10*1024*1024) + + # min + self.create_udf_af(f"af_min_float", f"float", 10*1024*1024) + self.create_udf_af(f"af_min_int", f"int", 10*1024*1024) + + # sum + self.create_udf_af(f"af_sum_float", f"float", 100*1024*1024) + self.create_udf_af(f"af_sum_int", f"sum", 100*1024*1024) + + + # query aggregate + def query_aggr_udfpy(self) : + # all type check null + for col_name, col_type in self.column_dict: + fun_name = f"af_null_{col_name}" + sql = f'select {fun_name}(col_name) from {self.stbname}' + tdSql.query(sql) + tdSql.checkData(0, 0, "NULL") + + # min + sql = f'select min(col3), af_min_int(col3) from {self.stbname}' + self.verfiy_same_value(sql) + sql = f'select min(col7), af_min_int(col7) from {self.stbname}' + self.verfiy_same_value(sql) + sql = f'select min(col9), af_min_float(col9) from {self.stbname}' + self.verfiy_same_value(sql) + + # sum + sql = f'select sum(col3), af_sum_int(col3) from {self.stbname}' + self.verfiy_same_value(sql) + sql = f'select sum(col7), af_sum_int(col7) from {self.stbname}' + self.verfiy_same_value(sql) + sql = f'select sum(col9), af_sum_float(col9) from {self.stbname}' + self.verfiy_same_value(sql) + + + + + # insert to child table d1 data + def insert_data(self, tbname, rows): + ts = 1670000000000 + for i in range(self.child_count): + for j in range(rows): + ti = j % 128 + cols = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}",now' + sql = f'insert into {tbname}{i} values({ts+j},{cols});' + tdSql.execute(sql) + + tdLog.info(f" insert {rows} for each child table.") + + + # run + def run(self): + # var + stable = "meters" + tbname = "d" + count = 100 + # do + self.create_table(stable, tbname, count) + self.insert_data(tbname, 1000) + + # scalar + self.create_scalar_udfpy() + self.query_scalar_udfpy() + + # aggregate + self.create_aggr_udfpy() + self.query_aggr_udfpy() + + + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file From 8ac1e7f02c62496db1b63a44bb9366e646344a87 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 22 Mar 2023 17:18:24 +0800 Subject: [PATCH 02/24] test: first submit udf python case --- tests/system-test/0-others/udfpy_main.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index 9cc329ca19..1b5a1bab38 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -252,8 +252,6 @@ class TDTestCase: self.verfiy_same_value(sql) sql = f'select sum(col9), af_sum_float(col9) from {self.stbname}' self.verfiy_same_value(sql) - - # insert to child table d1 data @@ -266,6 +264,15 @@ class TDTestCase: sql = f'insert into {tbname}{i} values({ts+j},{cols});' tdSql.execute(sql) + # partial columns upate + sql = f'insert into {tbname}0(ts, col1, col9, col11) values(now, 100, 200, 0)' + tdSql.execute(sql) + sql = f'insert into {tbname}0(ts, col2, col5, col8) values(now, 100, 200, 300)' + tdSql.execute(sql) + sql = f'insert into {tbname}0(ts, col3, col7, col13) values(now, null, null, null)' + tdSql.execute(sql) + sql = f'insert into {tbname}0(ts) values(now)' + tdSql.execute(sql) tdLog.info(f" insert {rows} for each child table.") @@ -288,8 +295,6 @@ class TDTestCase: self.query_aggr_udfpy() - - def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) From 
cb59cc830846f040fc064312cabd78a7226342b1 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 22 Mar 2023 18:28:50 +0800 Subject: [PATCH 03/24] test: select nested query of udf --- tests/system-test/0-others/udfpy_main.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index 1b5a1bab38..0de48269e9 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -207,8 +207,11 @@ class TDTestCase: # col for col_name, col_type in self.column_dict: for fun_name, out_type in self.scalar_funs: - sql = f'select {col_name} {fun_name}({col_name}) from {self.stbname}' + sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname}' self.verify_same_value(sql) + sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} ) order by b,a desc' + self.verify_same_value(sql) + # multi-args self.query_multi_args() From 6345952f8829b9219f785d95458c987c9d0661c2 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 22 Mar 2023 20:30:25 +0800 Subject: [PATCH 04/24] test: debug python file --- tests/pytest/util/cases.py | 2 + .../0-others/udfpy/sf_multi_args.py | 4 +- tests/system-test/0-others/udfpy/sf_null.py | 4 +- tests/system-test/0-others/udfpy/sf_origin.py | 4 +- tests/system-test/0-others/udfpy_main.py | 45 +++++++++---------- 5 files changed, 29 insertions(+), 30 deletions(-) diff --git a/tests/pytest/util/cases.py b/tests/pytest/util/cases.py index 4830d2f8b0..536b8f30d3 100644 --- a/tests/pytest/util/cases.py +++ b/tests/pytest/util/cases.py @@ -17,6 +17,7 @@ import time import datetime import inspect import importlib +import traceback from util.log import * @@ -75,6 +76,7 @@ class TDCases: case.run() except Exception as e: tdLog.notice(repr(e)) + traceback.print_exc() tdLog.exit("%s failed" % (fileName)) case.stop() runNum += 1 diff --git a/tests/system-test/0-others/udfpy/sf_multi_args.py b/tests/system-test/0-others/udfpy/sf_multi_args.py index 5d8194b3ad..f5585b4e87 100644 --- a/tests/system-test/0-others/udfpy/sf_multi_args.py +++ b/tests/system-test/0-others/udfpy/sf_multi_args.py @@ -2,8 +2,8 @@ def init(): pass -# destory -def destory(): +# destroy +def destroy(): pass # return origin column one value diff --git a/tests/system-test/0-others/udfpy/sf_null.py b/tests/system-test/0-others/udfpy/sf_null.py index a82b147d01..c22ca95b19 100644 --- a/tests/system-test/0-others/udfpy/sf_null.py +++ b/tests/system-test/0-others/udfpy/sf_null.py @@ -3,8 +3,8 @@ def init(): pass -# destory -def destory(): +# destroy +def destroy(): pass # return origin column one value diff --git a/tests/system-test/0-others/udfpy/sf_origin.py b/tests/system-test/0-others/udfpy/sf_origin.py index 7588d0402d..9158e044d2 100644 --- a/tests/system-test/0-others/udfpy/sf_origin.py +++ b/tests/system-test/0-others/udfpy/sf_origin.py @@ -2,8 +2,8 @@ def init(): pass -# destory -def destory(): +# destroy +def destroy(): pass # return origin column one value diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index 0de48269e9..07fd64d983 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -98,21 +98,15 @@ class TDTestCase: tdLog.info(f" create {count} child tables ok.") - def create_udfpy_impl(self, funs, filename): - for name, outtype in funs.items(): - sql = f' create function {name} as 
"{self.udf_path}/{filename} {outtype} " language "Python" ' + # create with dicts + def create_sf_dicts(self, dicts, filename): + for fun_name, out_type in dicts.items(): + sql = f' create function {fun_name} as "{self.udf_path}/{filename}" outputtype {out_type} language "Python" ' tdSql.execute(sql) - - - def create_udfpy_dicts(self, dicts, filename): - for k,v in dicts: - self.create_udfpy_impl(k, v, filename) + tdLog.info(sql) # create_udfpy_function - def create_udfpy_function(self): - # function - - + def create_scalar_udfpy(self): # scalar funciton self.scalar_funs = { 'sf1': 'tinyint', @@ -149,18 +143,19 @@ class TDTestCase: } # files - self.create_udfpy_function(self.scalar_funs, "fun_origin") + self.create_sf_dicts(self.scalar_funs, "sf_origin.py") self.create_udf_sf("sf_multi_args", "binary(1024)") - #self.create_udfpy_function(self.agg_funs, None) - + # fun_name == fun_name.py def create_udf_sf(self, fun_name, out_type): - sql = f'create function {fun_name} as {self.udf_path}{fun_name}.py {out_type} language "Python"' + sql = f'create function {fun_name} as "{self.udf_path}/{fun_name}.py" outputtype {out_type} language "Python" ' tdSql.execute(sql) + tdLog.info(sql) def create_udf_af(self, fun_name, out_type, bufsize): - sql = f'create aggregate function {fun_name} as {self.udf_path}{fun_name}.py {out_type} bufsize {bufsize} language "Python"' + sql = f'create aggregate function {fun_name} as "{self.udf_path}/{fun_name}.py" outputtype {out_type} bufsize {bufsize} language "Python" ' tdSql.execute(sql) + tdLog.info(sql) # sql1 query result eual with sql2 @@ -174,7 +169,7 @@ class TDTestCase: tdSql.checkData(i, j, result1[i][j]) # same value like select col1, udf_fun1(col1) from st - def verfiy_same_value(sql): + def verify_same_value(sql): tdSql.query(sql) nrows = tdSql.getRows() for i in range(nrows): @@ -205,11 +200,13 @@ class TDTestCase: # query_udfpy def query_scalar_udfpy(self): # col - for col_name, col_type in self.column_dict: - for fun_name, out_type in self.scalar_funs: + for col_name, col_type in self.column_dict.items(): + for fun_name, out_type in self.scalar_funs.items(): sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname}' + tdLog.info(sql) self.verify_same_value(sql) sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} ) order by b,a desc' + tdLog.info(sql) self.verify_same_value(sql) @@ -262,8 +259,8 @@ class TDTestCase: ts = 1670000000000 for i in range(self.child_count): for j in range(rows): - ti = j % 128 - cols = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}",now' + tj = j % 128 + cols = f'{tj},{tj},{j},{j},{tj},{tj},{j},{j},{j}.000{j},{j}.000{j},true,"var{j}","nch{j}",now' sql = f'insert into {tbname}{i} values({ts+j},{cols});' tdSql.execute(sql) @@ -284,10 +281,10 @@ class TDTestCase: # var stable = "meters" tbname = "d" - count = 100 + count = 10 # do self.create_table(stable, tbname, count) - self.insert_data(tbname, 1000) + self.insert_data(tbname, 100) # scalar self.create_scalar_udfpy() From e2e76f9e287ab828e79118429c76f329942b480f Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Thu, 23 Mar 2023 19:22:40 +0800 Subject: [PATCH 05/24] test: udfpy_main.py case passed ! 
---
 .../system-test/0-others/tag_index_cluster.py | 171 ++++++++++++++++++
 tests/system-test/0-others/udfpy/af_min.py | 2 +-
 .../0-others/udfpy/sf_multi_args.py | 5 +-
 tests/system-test/0-others/udfpy_main.py | 71 +++++---
 4 files changed, 218 insertions(+), 31 deletions(-)
 create mode 100644 tests/system-test/0-others/tag_index_cluster.py

diff --git a/tests/system-test/0-others/tag_index_cluster.py b/tests/system-test/0-others/tag_index_cluster.py
new file mode 100644
index 0000000000..b1ae74f567
--- /dev/null
+++ b/tests/system-test/0-others/tag_index_cluster.py
@@ -0,0 +1,171 @@
+from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
+import taos
+import sys
+import time
+import os
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import *
+from util.dnodes import TDDnodes
+from util.dnodes import TDDnode
+import time
+import socket
+import subprocess
+
+class MyDnodes(TDDnodes):
+    def __init__(self, dnodes_lists):
+        super(MyDnodes, self).__init__()
+        self.dnodes = dnodes_lists  # dnode must be TDDnode instance
+        self.simDeployed = False
+
+class TagCluster:
+    noConn = True
+    def init(self, conn, logSql, replicaVar=1):
+        tdLog.debug(f"start to execute {__file__}")
+        self.TDDnodes = None
+        self.deploy_cluster(5)
+        self.master_dnode = self.TDDnodes.dnodes[0]
+        self.host = self.master_dnode.cfgDict["fqdn"]
+        conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"], config=self.master_dnode.cfgDir)
+        tdSql.init(conn1.cursor())
+
+
+    def getBuildPath(self):
+        selfPath = os.path.dirname(os.path.realpath(__file__))
+
+        if ("community" in selfPath):
+            projPath = selfPath[:selfPath.find("community")]
+        else:
+            projPath = selfPath[:selfPath.find("tests")]
+
+        for root, dirs, files in os.walk(projPath):
+            if ("taosd" in files or "taosd.exe" in files):
+                rootRealPath = os.path.dirname(os.path.realpath(root))
+                if ("packaging" not in rootRealPath):
+                    buildPath = root[:len(root) - len("/build/bin")]
+                    break
+        return buildPath
+
+
+    def deploy_cluster(self, dnodes_nums):
+
+        testCluster = False
+        valgrind = 0
+        hostname = socket.gethostname()
+        dnodes = []
+        start_port = 6030
+        for num in range(1, dnodes_nums+1):
+            dnode = TDDnode(num)
+            dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}")
+            dnode.addExtraCfg("fqdn", f"{hostname}")
+            dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}")
+            dnode.addExtraCfg("monitorFqdn", hostname)
+            dnode.addExtraCfg("monitorPort", 7043)
+            dnodes.append(dnode)
+
+        self.TDDnodes = MyDnodes(dnodes)
+        self.TDDnodes.init("")
+        self.TDDnodes.setTestCluster(testCluster)
+        self.TDDnodes.setValgrind(valgrind)
+
+        self.TDDnodes.setAsan(tdDnodes.getAsan())
+        self.TDDnodes.stopAll()
+        for dnode in self.TDDnodes.dnodes:
+            self.TDDnodes.deploy(dnode.index, {})
+
+        for dnode in self.TDDnodes.dnodes:
+            self.TDDnodes.starttaosd(dnode.index)
+
+        # create cluster
+        for dnode in self.TDDnodes.dnodes[1:]:
+            # print(dnode.cfgDict)
+            dnode_id = dnode.cfgDict["fqdn"] + ":" + dnode.cfgDict["serverPort"]
+            dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0]
+            dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
+            cmd = f"{self.getBuildPath()}/build/bin/taos -h {dnode_first_host} -P {dnode_first_port} -s \"create dnode \\\"{dnode_id}\\\"\""
+            print(cmd)
+            os.system(cmd)
+
+        time.sleep(2)
+        tdLog.info(" create cluster done! 
") + + def five_dnode_one_mnode(self): + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + tdSql.checkData(0,4,'ready') + tdSql.checkData(4,4,'ready') + tdSql.query("select * from information_schema.ins_mnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(0,2,'leader') + tdSql.checkData(0,3,'ready') + + + tdSql.error("create mnode on dnode 1;") + tdSql.error("drop mnode on dnode 1;") + + tdSql.execute("drop database if exists db") + tdSql.execute("create database if not exists db replica 1 duration 300") + tdSql.execute("use db") + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + tdSql.query('select * from information_schema.ins_databases;') + tdSql.checkData(2,5,'on') + tdSql.error("alter database db strict 'off'") + # tdSql.execute('alter database db strict 'on'') + # tdSql.query('select * from information_schema.ins_databases;') + # tdSql.checkData(2,5,'on') + + def getConnection(self, dnode): + host = dnode.cfgDict["fqdn"] + port = dnode.cfgDict["serverPort"] + config_dir = dnode.cfgDir + return taos.connect(host=host, port=int(port), config=config_dir) + + def check_alive(self): + # check cluster alive + tdLog.printNoPrefix("======== test cluster alive: ") + tdSql.checkDataLoop(0, 0, 1, "show cluster alive;", 20, 0.5) + + tdSql.query("show db.alive;") + tdSql.checkData(0, 0, 1) + + # stop 3 dnode + self.TDDnodes.stoptaosd(3) + tdSql.checkDataLoop(0, 0, 2, "show cluster alive;", 20, 0.5) + + tdSql.query("show db.alive;") + tdSql.checkData(0, 0, 2) + + # stop 2 dnode + self.TDDnodes.stoptaosd(2) + tdSql.checkDataLoop(0, 0, 0, "show cluster alive;", 20, 0.5) + + tdSql.query("show db.alive;") + tdSql.checkData(0, 0, 0) + + + def run(self): + # print(self.master_dnode.cfgDict) + self.five_dnode_one_mnode() + # check cluster and db alive + self.check_alive() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") diff --git a/tests/system-test/0-others/udfpy/af_min.py b/tests/system-test/0-others/udfpy/af_min.py index 0f4e579761..9f1aadf414 100644 --- a/tests/system-test/0-others/udfpy/af_min.py +++ b/tests/system-test/0-others/udfpy/af_min.py @@ -13,7 +13,7 @@ def finish(buf): mins = pickle.loads(buf) min_val = None for min in mins: - if min < min_val: + if min_val is None or (min is not None and min < min_val): min_val = min return min_val diff --git a/tests/system-test/0-others/udfpy/sf_multi_args.py b/tests/system-test/0-others/udfpy/sf_multi_args.py index f5585b4e87..1026661d8d 100644 --- a/tests/system-test/0-others/udfpy/sf_multi_args.py +++ b/tests/system-test/0-others/udfpy/sf_multi_args.py @@ -14,7 +14,10 @@ def process(block): rows = [] for j in range(ncols): val = block.data(i, j) - rows.append(val) + if type(val) is bytes: + rows.append(val.decode('utf-8')) + else: + rows.append(repr(val)) results.append(','.join(rows)) return results diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index 07fd64d983..04c1f5061d 100644 --- a/tests/system-test/0-others/udfpy_main.py 
+++ b/tests/system-test/0-others/udfpy_main.py @@ -109,6 +109,7 @@ class TDTestCase: def create_scalar_udfpy(self): # scalar funciton self.scalar_funs = { + 'sf0': 'timestamp', 'sf1': 'tinyint', 'sf2': 'smallint', 'sf3': 'int', @@ -121,8 +122,7 @@ class TDTestCase: 'sf10': 'double', 'sf11': 'bool', 'sf12': 'varchar(20)', - 'sf13': 'nchar(20)', - 'sf14': 'timestamp' + 'sf13': 'nchar(20)' } # agg function self.agg_funs = { @@ -152,8 +152,8 @@ class TDTestCase: tdSql.execute(sql) tdLog.info(sql) - def create_udf_af(self, fun_name, out_type, bufsize): - sql = f'create aggregate function {fun_name} as "{self.udf_path}/{fun_name}.py" outputtype {out_type} bufsize {bufsize} language "Python" ' + def create_udf_af(self, fun_name, filename, out_type, bufsize): + sql = f'create aggregate function {fun_name} as "{self.udf_path}/{filename}" outputtype {out_type} bufsize {bufsize} language "Python" ' tdSql.execute(sql) tdLog.info(sql) @@ -169,7 +169,7 @@ class TDTestCase: tdSql.checkData(i, j, result1[i][j]) # same value like select col1, udf_fun1(col1) from st - def verify_same_value(sql): + def verify_same_value(self, sql): tdSql.query(sql) nrows = tdSql.getRows() for i in range(nrows): @@ -188,10 +188,16 @@ class TDTestCase: # query multi-args def query_multi_args(self): - cols = self.column_dict.keys() + self.tag_dict.keys() + cols = list(self.column_dict.keys()) + list(self.tag_dict.keys()) + cols.remove("col13") + cols.remove("t13") ncols = len(cols) + print(cols) + for i in range(2, ncols): - sample = random.sample(i) + print(i) + sample = random.sample(cols, i) + print(sample) cols_name = ','.join(sample) sql = f'select sf_multi_args({cols_name}),{cols_name} from {self.stbname}' self.verify_same_multi_values(sql) @@ -202,12 +208,13 @@ class TDTestCase: # col for col_name, col_type in self.column_dict.items(): for fun_name, out_type in self.scalar_funs.items(): - sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname}' - tdLog.info(sql) - self.verify_same_value(sql) - sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} ) order by b,a desc' - tdLog.info(sql) - self.verify_same_value(sql) + if col_type == out_type : + sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname}' + tdLog.info(sql) + self.verify_same_value(sql) + sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} ) order by b,a desc' + tdLog.info(sql) + self.verify_same_value(sql) # multi-args @@ -216,42 +223,48 @@ class TDTestCase: # create aggregate def create_aggr_udfpy(self): # all type check null - for col_name, col_type in self.column_dict: - self.create_udf_af(f"af_null_{col_name}", f"{col_type}", 10*1024*1024) + for col_name, col_type in self.column_dict.items(): + self.create_udf_af(f"af_null_{col_name}", "af_null.py", col_type, 10*1024) # min - self.create_udf_af(f"af_min_float", f"float", 10*1024*1024) - self.create_udf_af(f"af_min_int", f"int", 10*1024*1024) + file_name = "af_min.py" + fun_name = "af_min_float" + self.create_udf_af(fun_name, file_name, f"float", 10*1024) + fun_name = "af_min_int" + self.create_udf_af(fun_name, file_name, f"int", 10*1024) # sum - self.create_udf_af(f"af_sum_float", f"float", 100*1024*1024) - self.create_udf_af(f"af_sum_int", f"sum", 100*1024*1024) + file_name = "af_sum.py" + fun_name = "af_sum_float" + self.create_udf_af(fun_name, file_name, f"float", 10*1024) + fun_name = "af_sum_int" + self.create_udf_af(fun_name, file_name, f"int", 10*1024) # query aggregate def 
query_aggr_udfpy(self) : # all type check null - for col_name, col_type in self.column_dict: + for col_name, col_type in self.column_dict.items(): fun_name = f"af_null_{col_name}" - sql = f'select {fun_name}(col_name) from {self.stbname}' + sql = f'select {fun_name}({col_name}) from {self.stbname}' tdSql.query(sql) - tdSql.checkData(0, 0, "NULL") + tdSql.checkData(0, 0, "None") # min sql = f'select min(col3), af_min_int(col3) from {self.stbname}' - self.verfiy_same_value(sql) + self.verify_same_value(sql) sql = f'select min(col7), af_min_int(col7) from {self.stbname}' - self.verfiy_same_value(sql) + self.verify_same_value(sql) sql = f'select min(col9), af_min_float(col9) from {self.stbname}' - self.verfiy_same_value(sql) + self.verify_same_value(sql) # sum sql = f'select sum(col3), af_sum_int(col3) from {self.stbname}' - self.verfiy_same_value(sql) + self.verify_same_value(sql) sql = f'select sum(col7), af_sum_int(col7) from {self.stbname}' - self.verfiy_same_value(sql) + self.verify_same_value(sql) sql = f'select sum(col9), af_sum_float(col9) from {self.stbname}' - self.verfiy_same_value(sql) + self.verify_same_value(sql) # insert to child table d1 data @@ -284,7 +297,7 @@ class TDTestCase: count = 10 # do self.create_table(stable, tbname, count) - self.insert_data(tbname, 100) + self.insert_data(tbname, 10) # scalar self.create_scalar_udfpy() From 2cf21bd134cebf17dfd4e52a1de722ac1c661cc9 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 24 Mar 2023 10:16:45 +0800 Subject: [PATCH 06/24] test: update code --- tests/system-test/0-others/udfpy_main.py | 50 ++++++++++++++++++------ 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index 04c1f5061d..75a7e8b308 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -89,12 +89,15 @@ class TDTestCase: create_table_sql = self.set_stb_sql(stbname, self.column_dict, self.tag_dict) tdSql.execute(create_table_sql) + batch_size = 1000 # create child table for i in range(count): ti = i % 128 tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}",now' - sql = f'create table {tbname}{i} using {stbname} tags({tags})' - tdSql.execute(sql) + sql = f'create table {tbname}{i} using {stbname} tags({tags});' + tdSql.execute(sql) + if i % batch_size == 0: + tdLog.info(f" create child table {i} ...") tdLog.info(f" create {count} child tables ok.") @@ -144,16 +147,22 @@ class TDTestCase: # files self.create_sf_dicts(self.scalar_funs, "sf_origin.py") - self.create_udf_sf("sf_multi_args", "binary(1024)") + fun_name = "sf_multi_args" + self.create_udf_sf(fun_name, f'{fun_name}.py', "binary(1024)") + + # all type check null + for col_name, col_type in self.column_dict.items(): + self.create_udf_sf(f"sf_null_{col_name}", "sf_null.py", col_type) + # fun_name == fun_name.py - def create_udf_sf(self, fun_name, out_type): - sql = f'create function {fun_name} as "{self.udf_path}/{fun_name}.py" outputtype {out_type} language "Python" ' + def create_udf_sf(self, fun_name, file_name, out_type): + sql = f'create function {fun_name} as "{self.udf_path}/{file_name}" outputtype {out_type} language "Python" ' tdSql.execute(sql) tdLog.info(sql) - def create_udf_af(self, fun_name, filename, out_type, bufsize): - sql = f'create aggregate function {fun_name} as "{self.udf_path}/{filename}" outputtype {out_type} bufsize {bufsize} language "Python" ' + def create_udf_af(self, fun_name, 
file_name, out_type, bufsize): + sql = f'create aggregate function {fun_name} as "{self.udf_path}/{file_name}" outputtype {out_type} bufsize {bufsize} language "Python" ' tdSql.execute(sql) tdLog.info(sql) @@ -216,10 +225,17 @@ class TDTestCase: tdLog.info(sql) self.verify_same_value(sql) - # multi-args self.query_multi_args() + # all type check null + for col_name, col_type in self.column_dict.items(): + fun_name = f"sf_null_{col_name}" + sql = f'select {fun_name}({col_name}) from {self.stbname}' + tdSql.query(sql) + tdSql.checkData(0, 0, "None") + + # create aggregate def create_aggr_udfpy(self): # all type check null @@ -270,12 +286,21 @@ class TDTestCase: # insert to child table d1 data def insert_data(self, tbname, rows): ts = 1670000000000 + sqls = "" + batch_size = 300 for i in range(self.child_count): for j in range(rows): tj = j % 128 cols = f'{tj},{tj},{j},{j},{tj},{tj},{j},{j},{j}.000{j},{j}.000{j},true,"var{j}","nch{j}",now' sql = f'insert into {tbname}{i} values({ts+j},{cols});' - tdSql.execute(sql) + sqls += sql + if j % batch_size == 0: + tdSql.execute(sqls) + tdLog.info(f" child table={i} rows={j} insert data.") + sqls = "" + # end + if sqls != "": + tdSql.execute(sqls) # partial columns upate sql = f'insert into {tbname}0(ts, col1, col9, col11) values(now, 100, 200, 0)' @@ -286,7 +311,7 @@ class TDTestCase: tdSql.execute(sql) sql = f'insert into {tbname}0(ts) values(now)' tdSql.execute(sql) - tdLog.info(f" insert {rows} for each child table.") + tdLog.info(f" insert {rows} to child table {self.child_count} .") # run @@ -294,10 +319,11 @@ class TDTestCase: # var stable = "meters" tbname = "d" - count = 10 + count = 10000 + rows = 1000 # do self.create_table(stable, tbname, count) - self.insert_data(tbname, 10) + self.insert_data(tbname, rows) # scalar self.create_scalar_udfpy() From 17f948deca8101c74b7f30575b8d2e1dcc55ce5c Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 24 Mar 2023 18:57:07 +0800 Subject: [PATCH 07/24] test: add string concat function --- tests/system-test/0-others/udfpy/af_count.py | 29 ++++ tests/system-test/0-others/udfpy/af_sum.py | 18 ++- .../0-others/udfpy/sf_concat_nch.py | 23 +++ .../0-others/udfpy/sf_concat_var.py | 22 +++ tests/system-test/0-others/udfpy_main.py | 143 ++++++++++++++---- 5 files changed, 199 insertions(+), 36 deletions(-) create mode 100644 tests/system-test/0-others/udfpy/af_count.py create mode 100644 tests/system-test/0-others/udfpy/sf_concat_nch.py create mode 100644 tests/system-test/0-others/udfpy/sf_concat_var.py diff --git a/tests/system-test/0-others/udfpy/af_count.py b/tests/system-test/0-others/udfpy/af_count.py new file mode 100644 index 0000000000..285ef96b55 --- /dev/null +++ b/tests/system-test/0-others/udfpy/af_count.py @@ -0,0 +1,29 @@ +import pickle + +def init(): + pass + +def destroy(): + pass + +def start(): + return pickle.dumps([]) + +def finish(buf): + counts = pickle.loads(buf) + all_count = 0 + for count in counts: + all_count += count + + return all_count + +def reduce(datablock, buf): + (rows, cols) = datablock.shape() + counts = pickle.loads(buf) + batch_count = 0 + for i in range(rows): + val = datablock.data(i, 0) + if val is not None: + batch_count += 1 + counts.append(batch_count) + return pickle.dumps(counts) diff --git a/tests/system-test/0-others/udfpy/af_sum.py b/tests/system-test/0-others/udfpy/af_sum.py index e32cf3fa31..ac7aa16924 100644 --- a/tests/system-test/0-others/udfpy/af_sum.py +++ b/tests/system-test/0-others/udfpy/af_sum.py @@ -11,18 +11,26 @@ def start(): def 
finish(buf): sums = pickle.loads(buf) - all = 0 + all = None for sum in sums: - all += sum + if all is None: + all = sum + else: + all += sum return all def reduce(datablock, buf): (rows, cols) = datablock.shape() sums = pickle.loads(buf) - sum = 0 + sum = None for i in range(rows): val = datablock.data(i, 0) if val is not None: - sum += val - sums.append(sum) + if sum is None: + sum = val + else: + sum += val + + if sum is not None: + sums.append(sum) return pickle.dumps(sums) diff --git a/tests/system-test/0-others/udfpy/sf_concat_nch.py b/tests/system-test/0-others/udfpy/sf_concat_nch.py new file mode 100644 index 0000000000..c64bfa8ad3 --- /dev/null +++ b/tests/system-test/0-others/udfpy/sf_concat_nch.py @@ -0,0 +1,23 @@ +# init +def init(): + pass + +# destroy +def destroy(): + pass + +def process(block): + (nrows, ncols) = block.shape() + results = [] + for i in range(nrows): + row = [] + for j in range(ncols): + val = block.data(i, j) + if val is None: + return [None] + row.append(val.decode('utf_32_le')) + row_str = ''.join(row) + results.append(row_str.encode('utf_32_le')) + return results + + diff --git a/tests/system-test/0-others/udfpy/sf_concat_var.py b/tests/system-test/0-others/udfpy/sf_concat_var.py new file mode 100644 index 0000000000..0a63821aa7 --- /dev/null +++ b/tests/system-test/0-others/udfpy/sf_concat_var.py @@ -0,0 +1,22 @@ +# init +def init(): + pass + +# destroy +def destroy(): + pass + +def process(block): + (nrows, ncols) = block.shape() + results = [] + for i in range(nrows): + row = [] + for j in range(ncols): + val = block.data(i, j) + if val is None: + return [None] + row.append(val.decode('utf-8')) + results.append(''.join(row)) + return results + + diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index 75a7e8b308..b4fd77f93a 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -22,6 +22,33 @@ import random import os +class PerfDB: + def __init__(self): + self.sqls = [] + self.spends = [] + + # execute + def execute(self, sql): + print(f" perfdb execute {sql}") + stime = time.time() + ret = tdSql.execute(sql, 1) + spend = time.time() - stime + + self.sqls.append(sql) + self.spends.append(spend) + return ret + + # query + def query(self, sql): + print(f" perfdb query {sql}") + start = time.time() + ret = tdSql.query(sql, None, 1) + spend = time.time() - start + self.sqls.append(sql) + self.spends.append(spend) + return ret + + class TDTestCase: def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -47,8 +74,7 @@ class TDTestCase: 'col10': 'double', 'col11': 'bool', 'col12': 'varchar(20)', - 'col13': 'nchar(20)', - 'col14': 'timestamp' + 'col13': 'nchar(100)', } self.tag_dict = { 't1': 'tinyint', @@ -63,8 +89,7 @@ class TDTestCase: 't10': 'double', 't11': 'bool', 't12': 'varchar(20)', - 't13': 'nchar(20)', - 't14': 'timestamp' + 't13': 'nchar(100)', } def set_stb_sql(self,stbname,column_dict,tag_dict): @@ -93,7 +118,7 @@ class TDTestCase: # create child table for i in range(count): ti = i % 128 - tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}",now' + tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}"' sql = f'create table {tbname}{i} using {stbname} tags({tags});' tdSql.execute(sql) if i % batch_size == 0: @@ -125,7 +150,7 @@ class TDTestCase: 'sf10': 'double', 'sf11': 'bool', 'sf12': 'varchar(20)', - 'sf13': 'nchar(20)' + 'sf13': 'nchar(100)' } # agg 
function self.agg_funs = { @@ -141,11 +166,11 @@ class TDTestCase: 'af10': 'double', 'af11': 'bool', 'af12': 'varchar(20)', - 'af13': 'nchar(20)', + 'af13': 'nchar(100)', 'af14': 'timestamp' } - # files + # multi_args self.create_sf_dicts(self.scalar_funs, "sf_origin.py") fun_name = "sf_multi_args" self.create_udf_sf(fun_name, f'{fun_name}.py', "binary(1024)") @@ -154,6 +179,12 @@ class TDTestCase: for col_name, col_type in self.column_dict.items(): self.create_udf_sf(f"sf_null_{col_name}", "sf_null.py", col_type) + # concat + fun_name = "sf_concat_var" + self.create_udf_sf(fun_name, f'{fun_name}.py', "varchar(1024)") + fun_name = "sf_concat_nch" + self.create_udf_sf(fun_name, f'{fun_name}.py', "nchar(1024)") + # fun_name == fun_name.py def create_udf_sf(self, fun_name, file_name, out_type): @@ -200,15 +231,14 @@ class TDTestCase: cols = list(self.column_dict.keys()) + list(self.tag_dict.keys()) cols.remove("col13") cols.remove("t13") + cols.remove("ts") ncols = len(cols) print(cols) - for i in range(2, ncols): - print(i) sample = random.sample(cols, i) print(sample) cols_name = ','.join(sample) - sql = f'select sf_multi_args({cols_name}),{cols_name} from {self.stbname}' + sql = f'select sf_multi_args({cols_name}),{cols_name} from {self.stbname} limit 10' self.verify_same_multi_values(sql) @@ -218,10 +248,10 @@ class TDTestCase: for col_name, col_type in self.column_dict.items(): for fun_name, out_type in self.scalar_funs.items(): if col_type == out_type : - sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname}' + sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname} limit 10' tdLog.info(sql) self.verify_same_value(sql) - sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} ) order by b,a desc' + sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} ) order by b,a desc limit 10' tdLog.info(sql) self.verify_same_value(sql) @@ -229,12 +259,22 @@ class TDTestCase: self.query_multi_args() # all type check null - for col_name, col_type in self.column_dict.items(): + for col_name, col_type in self.column_dict.items(): fun_name = f"sf_null_{col_name}" sql = f'select {fun_name}({col_name}) from {self.stbname}' tdSql.query(sql) - tdSql.checkData(0, 0, "None") + if col_type != "timestamp": + tdSql.checkData(0, 0, "None") + else: + val = tdSql.getData(0, 0) + if val is not None: + tdLog.exit(f" check {sql} not expect None.") + # concat + sql = f'select sf_concat_var(col12, t12), concat(col12, t12) from {self.stbname}' + self.verify_same_value(sql) + sql = f'select sf_concat_nch(col13, t13), concat(col13, t13) from {self.stbname}' + self.verify_same_value(sql) # create aggregate def create_aggr_udfpy(self): @@ -255,6 +295,17 @@ class TDTestCase: self.create_udf_af(fun_name, file_name, f"float", 10*1024) fun_name = "af_sum_int" self.create_udf_af(fun_name, file_name, f"int", 10*1024) + fun_name = "af_sum_bigint" + self.create_udf_af(fun_name, file_name, f"bigint", 10*1024) + + # count + file_name = "af_count.py" + fun_name = "af_count_float" + self.create_udf_af(fun_name, file_name, f"float", 10*1024) + fun_name = "af_count_int" + self.create_udf_af(fun_name, file_name, f"int", 10*1024) + fun_name = "af_count_bigint" + self.create_udf_af(fun_name, file_name, f"bigint", 10*1024) # query aggregate @@ -264,7 +315,12 @@ class TDTestCase: fun_name = f"af_null_{col_name}" sql = f'select {fun_name}({col_name}) from {self.stbname}' tdSql.query(sql) - tdSql.checkData(0, 0, "None") + if col_type != 
"timestamp": + tdSql.checkData(0, 0, "None") + else: + val = tdSql.getData(0, 0) + if val is not None: + tdLog.exit(f" check {sql} not expect None.") # min sql = f'select min(col3), af_min_int(col3) from {self.stbname}' @@ -275,32 +331,55 @@ class TDTestCase: self.verify_same_value(sql) # sum - sql = f'select sum(col3), af_sum_int(col3) from {self.stbname}' + sql = f'select sum(col1), af_sum_int(col1) from d0' self.verify_same_value(sql) - sql = f'select sum(col7), af_sum_int(col7) from {self.stbname}' + sql = f'select sum(col3), af_sum_bigint(col3) from {self.stbname}' self.verify_same_value(sql) sql = f'select sum(col9), af_sum_float(col9) from {self.stbname}' self.verify_same_value(sql) - + + # count + sql = f'select count(col1), af_count_int(col1) from {self.stbname}' + self.verify_same_value(sql) + sql = f'select count(col7), af_count_bigint(col7) from {self.stbname}' + self.verify_same_value(sql) + sql = f'select count(col8), af_count_float(col8) from {self.stbname}' + self.verify_same_value(sql) + + # nest + sql = f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {self.stbname})' + self.verify_same_value(sql) + # group by + sql = f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {self.stbname} group by tbname)' + self.verify_same_value(sql) + # two filed expr + sql = f'select sum(col1+col2),af_sum_float(col1+col2) from {self.stbname};' + self.verify_same_value(sql) + # interval + sql = f'select af_sum_float(col2+col3),sum(col3+col2) from {self.stbname} interval(1s)' + self.verify_same_value(sql) + # insert to child table d1 data def insert_data(self, tbname, rows): ts = 1670000000000 - sqls = "" + values = "" batch_size = 300 + child_name = "" for i in range(self.child_count): for j in range(rows): tj = j % 128 - cols = f'{tj},{tj},{j},{j},{tj},{tj},{j},{j},{j}.000{j},{j}.000{j},true,"var{j}","nch{j}",now' - sql = f'insert into {tbname}{i} values({ts+j},{cols});' - sqls += sql - if j % batch_size == 0: - tdSql.execute(sqls) + cols = f'{tj},{tj},{j},{j},{tj},{tj},{j},{j},{j}.000{j},{j}.000{j},true,"var{j}","nch{j}涛思数据codepage is utf_32_le"' + value = f'({ts+j},{cols})' + if values == "": + values = value + else: + values += f",{value}" + if j % batch_size == 0 or j + 1 == rows: + sql = f'insert into {tbname}{i} values {values};' + tdSql.execute(sql) tdLog.info(f" child table={i} rows={j} insert data.") - sqls = "" - # end - if sqls != "": - tdSql.execute(sqls) + values = "" # partial columns upate sql = f'insert into {tbname}0(ts, col1, col9, col11) values(now, 100, 200, 0)' @@ -319,8 +398,8 @@ class TDTestCase: # var stable = "meters" tbname = "d" - count = 10000 - rows = 1000 + count = 3 + rows = 1000000 # do self.create_table(stable, tbname, count) self.insert_data(tbname, rows) @@ -333,6 +412,8 @@ class TDTestCase: self.create_aggr_udfpy() self.query_aggr_udfpy() + # show performance + def stop(self): tdSql.close() From d4c63827c2427ba4569bbf664237169e9fd88b4b Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 24 Mar 2023 19:44:08 +0800 Subject: [PATCH 08/24] test: case all passed --- tests/system-test/0-others/udfpy/sf_concat_nch.py | 10 +++++++--- tests/system-test/0-others/udfpy/sf_concat_var.py | 8 ++++++-- tests/system-test/0-others/udfpy_main.py | 10 +++++----- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/tests/system-test/0-others/udfpy/sf_concat_nch.py b/tests/system-test/0-others/udfpy/sf_concat_nch.py index c64bfa8ad3..84d8eb2c96 100644 --- 
a/tests/system-test/0-others/udfpy/sf_concat_nch.py +++ b/tests/system-test/0-others/udfpy/sf_concat_nch.py @@ -14,10 +14,14 @@ def process(block): for j in range(ncols): val = block.data(i, j) if val is None: - return [None] + row = None + break row.append(val.decode('utf_32_le')) - row_str = ''.join(row) - results.append(row_str.encode('utf_32_le')) + if row is None: + results.append(None) + else: + row_str = ''.join(row) + results.append(row_str.encode('utf_32_le')) return results diff --git a/tests/system-test/0-others/udfpy/sf_concat_var.py b/tests/system-test/0-others/udfpy/sf_concat_var.py index 0a63821aa7..fc8292c718 100644 --- a/tests/system-test/0-others/udfpy/sf_concat_var.py +++ b/tests/system-test/0-others/udfpy/sf_concat_var.py @@ -14,9 +14,13 @@ def process(block): for j in range(ncols): val = block.data(i, j) if val is None: - return [None] + row = None + break row.append(val.decode('utf-8')) - results.append(''.join(row)) + if row is None: + results.append(None) + else: + results.append(''.join(row)) return results diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index b4fd77f93a..a2176a041c 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -73,7 +73,7 @@ class TDTestCase: 'col9': 'float', 'col10': 'double', 'col11': 'bool', - 'col12': 'varchar(20)', + 'col12': 'varchar(120)', 'col13': 'nchar(100)', } self.tag_dict = { @@ -88,7 +88,7 @@ class TDTestCase: 't9': 'float', 't10': 'double', 't11': 'bool', - 't12': 'varchar(20)', + 't12': 'varchar(120)', 't13': 'nchar(100)', } @@ -149,7 +149,7 @@ class TDTestCase: 'sf9': 'float', 'sf10': 'double', 'sf11': 'bool', - 'sf12': 'varchar(20)', + 'sf12': 'varchar(120)', 'sf13': 'nchar(100)' } # agg function @@ -165,7 +165,7 @@ class TDTestCase: 'af9': 'float', 'af10': 'double', 'af11': 'bool', - 'af12': 'varchar(20)', + 'af12': 'varchar(120)', 'af13': 'nchar(100)', 'af14': 'timestamp' } @@ -399,7 +399,7 @@ class TDTestCase: stable = "meters" tbname = "d" count = 3 - rows = 1000000 + rows = 1000 # do self.create_table(stable, tbname, count) self.insert_data(tbname, rows) From f95f6c9f788ef4c362a4f1b3cd668618e43ffe8c Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 25 Mar 2023 09:15:13 +0800 Subject: [PATCH 09/24] feat: auto tab add create function --- tools/shell/src/shellAuto.c | 57 +++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index a8986351b7..318d3574d2 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -92,7 +92,7 @@ SWords shellCommands[] = { {"create qnode on dnode ;", 0, 0, NULL}, {"create stream into as select", 0, 0, NULL}, // 26 append sub sql {"create topic as select", 0, 0, NULL}, // 27 append sub sql - {"create function ", 0, 0, NULL}, + {"create function as outputtype language ", 0, 0, NULL}, {"create user pass sysinfo 0;", 0, 0, NULL}, {"create user pass sysinfo 1;", 0, 0, NULL}, {"describe ", 0, 0, NULL}, @@ -105,7 +105,7 @@ SWords shellCommands[] = { {"drop qnode on dnode ;", 0, 0, NULL}, {"drop user ;", 0, 0, NULL}, // 40 - {"drop function", 0, 0, NULL}, + {"drop function ;", 0, 0, NULL}, {"drop consumer group on ", 0, 0, NULL}, {"drop topic ;", 0, 0, NULL}, {"drop stream ;", 0, 0, NULL}, @@ -272,6 +272,8 @@ char* key_systable[] = { "ins_subscriptions", "ins_streams", "ins_stream_tasks", "ins_vnodes", "ins_user_privileges", "perf_connections", "perf_queries", 
"perf_consumers", "perf_trans", "perf_apps"}; +char* language[] = {"\'Python\'", "\'C\'"} + // // ------- global variant define --------- // @@ -291,25 +293,29 @@ bool waitAutoFill = false; #define WT_VAR_USERNAME 4 #define WT_VAR_TOPIC 5 #define WT_VAR_STREAM 6 -#define WT_VAR_ALLTABLE 7 -#define WT_VAR_FUNC 8 -#define WT_VAR_KEYWORD 9 -#define WT_VAR_TBACTION 10 -#define WT_VAR_DBOPTION 11 -#define WT_VAR_ALTER_DBOPTION 12 -#define WT_VAR_DATATYPE 13 -#define WT_VAR_KEYTAGS 14 -#define WT_VAR_ANYWORD 15 -#define WT_VAR_TBOPTION 16 -#define WT_VAR_USERACTION 17 -#define WT_VAR_KEYSELECT 18 -#define WT_VAR_SYSTABLE 19 +#define WT_VAR_UDFNAME 7 -#define WT_VAR_CNT 20 - -#define WT_FROM_DB_MAX 6 // max get content from db +#define WT_FROM_DB_MAX 7 // max get content from db #define WT_FROM_DB_CNT (WT_FROM_DB_MAX + 1) +#define WT_VAR_ALLTABLE 8 +#define WT_VAR_FUNC 9 +#define WT_VAR_KEYWORD 10 +#define WT_VAR_TBACTION 11 +#define WT_VAR_DBOPTION 12 +#define WT_VAR_ALTER_DBOPTION 13 +#define WT_VAR_DATATYPE 14 +#define WT_VAR_KEYTAGS 15 +#define WT_VAR_ANYWORD 16 +#define WT_VAR_TBOPTION 17 +#define WT_VAR_USERACTION 18 +#define WT_VAR_KEYSELECT 19 +#define WT_VAR_SYSTABLE 20 +#define WT_VAR_LANGUAGE 21 + +#define WT_VAR_CNT 22 + + #define WT_TEXT 0xFF char dbName[256] = ""; // save use database name; @@ -319,13 +325,13 @@ TdThreadMutex tiresMutex; // save thread handle obtain var name from db server TdThread* threads[WT_FROM_DB_CNT]; // obtain var name with sql from server -char varTypes[WT_VAR_CNT][64] = {"", "", "", "", "", - "", "", "", "", "", - "", "", "", "", "", - "", "", "", ""}; +char varTypes[WT_VAR_CNT][64] = { + "", "", "", "", "", "", "", + "", "", "", "", "", "", "", + "", "", "", "", "", "", "sys_table", "language"}; char varSqls[WT_FROM_DB_CNT][64] = {"show databases;", "show stables;", "show tables;", "show dnodes;", - "show users;", "show topics;", "show streams;"}; + "show users;", "show topics;", "show streams;", "show functions;"}; // var words current cursor, if user press any one key except tab, cursorVar can be reset to -1 int cursorVar = -1; @@ -390,7 +396,7 @@ void showHelp() { create qnode on dnode ;\n\ create stream into as select ...\n\ create topic as select ...\n\ - create function ...\n\ + create function as outputtype language \'C\' | \'Python\' ;\n\ create user pass ...\n\ ----- D ----- \n\ describe \n\ @@ -401,7 +407,7 @@ void showHelp() { drop mnode on dnode ;\n\ drop qnode on dnode ;\n\ drop user ;\n\ - drop function ;\n\ + drop function ;\n\ drop consumer group ... 
\n\ drop topic ;\n\ drop stream ;\n\ @@ -643,6 +649,7 @@ bool shellAutoInit() { GenerateVarType(WT_VAR_USERACTION, user_actions, sizeof(user_actions) / sizeof(char*)); GenerateVarType(WT_VAR_KEYSELECT, key_select, sizeof(key_select) / sizeof(char*)); GenerateVarType(WT_VAR_SYSTABLE, key_systable, sizeof(key_systable) / sizeof(char*)); + GenerateVarType(WT_VAR_LANGUAGE, key_systable, sizeof(language) / sizeof(char*)); return true; } From ddc1689759a97635af0cb4ad710edead6a2f6580 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 25 Mar 2023 09:20:39 +0800 Subject: [PATCH 10/24] feat: auto tab add create function --- tools/shell/src/shellAuto.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 318d3574d2..14dcef86e2 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -272,7 +272,7 @@ char* key_systable[] = { "ins_subscriptions", "ins_streams", "ins_stream_tasks", "ins_vnodes", "ins_user_privileges", "perf_connections", "perf_queries", "perf_consumers", "perf_trans", "perf_apps"}; -char* language[] = {"\'Python\'", "\'C\'"} +char* language[] = {"\'Python\'", "\'C\'"}; // // ------- global variant define --------- From d6b972c2a143dcbe74007a20d5fe2ca27c0b7b17 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 25 Mar 2023 09:29:01 +0800 Subject: [PATCH 11/24] feat: auto tab add create function1 --- tools/shell/src/shellAuto.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 14dcef86e2..554f3edf1e 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -93,6 +93,7 @@ SWords shellCommands[] = { {"create stream into as select", 0, 0, NULL}, // 26 append sub sql {"create topic as select", 0, 0, NULL}, // 27 append sub sql {"create function as outputtype language ", 0, 0, NULL}, + {"create aggregate function as outputtype bufsize language ", 0, 0, NULL}, {"create user pass sysinfo 0;", 0, 0, NULL}, {"create user pass sysinfo 1;", 0, 0, NULL}, {"describe ", 0, 0, NULL}, @@ -397,6 +398,7 @@ void showHelp() { create stream into as select ...\n\ create topic as select ...\n\ create function as outputtype language \'C\' | \'Python\' ;\n\ + create aggregate function as outputtype bufsize language \'C\' | \'Python\';\n\ create user pass ...\n\ ----- D ----- \n\ describe \n\ From 6a16926098bd66982539c0b36abd0d76c05aa395 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 25 Mar 2023 09:35:06 +0800 Subject: [PATCH 12/24] feat: auto tab add create function1 --- tools/shell/src/shellAuto.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 554f3edf1e..7b00984734 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -92,8 +92,8 @@ SWords shellCommands[] = { {"create qnode on dnode ;", 0, 0, NULL}, {"create stream into as select", 0, 0, NULL}, // 26 append sub sql {"create topic as select", 0, 0, NULL}, // 27 append sub sql - {"create function as outputtype language ", 0, 0, NULL}, - {"create aggregate function as outputtype bufsize language ", 0, 0, NULL}, + {"create function as outputtype language ", 0, 0, NULL}, + {"create aggregate function as outputtype bufsize language ", 0, 0, NULL}, {"create user pass sysinfo 0;", 0, 0, NULL}, {"create user pass sysinfo 1;", 0, 0, NULL}, {"describe ", 0, 0, NULL}, @@ -273,7 +273,7 @@ char* key_systable[] = { 
"ins_subscriptions", "ins_streams", "ins_stream_tasks", "ins_vnodes", "ins_user_privileges", "perf_connections", "perf_queries", "perf_consumers", "perf_trans", "perf_apps"}; -char* language[] = {"\'Python\'", "\'C\'"}; +char* udf_language[] = {"\'Python\'", "\'C\'"}; // // ------- global variant define --------- @@ -651,7 +651,7 @@ bool shellAutoInit() { GenerateVarType(WT_VAR_USERACTION, user_actions, sizeof(user_actions) / sizeof(char*)); GenerateVarType(WT_VAR_KEYSELECT, key_select, sizeof(key_select) / sizeof(char*)); GenerateVarType(WT_VAR_SYSTABLE, key_systable, sizeof(key_systable) / sizeof(char*)); - GenerateVarType(WT_VAR_LANGUAGE, key_systable, sizeof(language) / sizeof(char*)); + GenerateVarType(WT_VAR_LANGUAGE, udf_language, sizeof(udf_language) / sizeof(char*)); return true; } From a52af4a07e64b59e84e2422f1891f606f9cde375 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 25 Mar 2023 10:21:12 +0800 Subject: [PATCH 13/24] feat: auto tab fix udf_language --- tools/shell/src/shellAuto.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 7b00984734..b2b9d122fa 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -109,7 +109,7 @@ SWords shellCommands[] = { {"drop function ;", 0, 0, NULL}, {"drop consumer group on ", 0, 0, NULL}, {"drop topic ;", 0, 0, NULL}, - {"drop stream ;", 0, 0, NULL}, + {"drop stream ;", 0, {"explain select", 0, 0, NULL}, // 44 append sub sql {"flush database ;", 0, 0, NULL}, {"help;", 0, 0, NULL}, @@ -329,7 +329,7 @@ TdThread* threads[WT_FROM_DB_CNT]; char varTypes[WT_VAR_CNT][64] = { "", "", "", "", "", "", "", "", "", "", "", "", "", "", - "", "", "", "", "", "", "sys_table", "language"}; + "", "", "", "", "", "", "sys_table", "udf_language"}; char varSqls[WT_FROM_DB_CNT][64] = {"show databases;", "show stables;", "show tables;", "show dnodes;", "show users;", "show topics;", "show streams;", "show functions;"}; From 11be6c6f85a9be39ab93b29086fe7e0cf1ef5601 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 25 Mar 2023 10:25:14 +0800 Subject: [PATCH 14/24] feat: auto tab fix udf_language1 --- tools/shell/src/shellAuto.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index b2b9d122fa..b28855ad2e 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -84,8 +84,7 @@ SWords shellCommands[] = { {"create table using tags(", 0, 0, NULL}, {"create database " " " - " ;", - 0, 0, NULL}, + " ;", 0, 0, NULL}, {"create dnode ", 0, 0, NULL}, {"create index on ()", 0, 0, NULL}, {"create mnode on dnode ;", 0, 0, NULL}, @@ -109,7 +108,7 @@ SWords shellCommands[] = { {"drop function ;", 0, 0, NULL}, {"drop consumer group on ", 0, 0, NULL}, {"drop topic ;", 0, 0, NULL}, - {"drop stream ;", 0, + {"drop stream ;", 0, NULL}, {"explain select", 0, 0, NULL}, // 44 append sub sql {"flush database ;", 0, 0, NULL}, {"help;", 0, 0, NULL}, @@ -329,7 +328,7 @@ TdThread* threads[WT_FROM_DB_CNT]; char varTypes[WT_VAR_CNT][64] = { "", "", "", "", "", "", "", "", "", "", "", "", "", "", - "", "", "", "", "", "", "sys_table", "udf_language"}; + "", "", "", "", "", "", "", ""}; char varSqls[WT_FROM_DB_CNT][64] = {"show databases;", "show stables;", "show tables;", "show dnodes;", "show users;", "show topics;", "show streams;", "show functions;"}; From 018f922661e9cff36c062e069edf8459ae77c962 Mon Sep 17 00:00:00 2001 From: Alex Duan 
<417921451@qq.com> Date: Sat, 25 Mar 2023 10:26:31 +0800 Subject: [PATCH 15/24] feat: auto tab fix udf_language2 --- tools/shell/src/shellAuto.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index b28855ad2e..140720af81 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -108,7 +108,7 @@ SWords shellCommands[] = { {"drop function ;", 0, 0, NULL}, {"drop consumer group on ", 0, 0, NULL}, {"drop topic ;", 0, 0, NULL}, - {"drop stream ;", 0, NULL}, + {"drop stream ;", 0, 0, NULL}, {"explain select", 0, 0, NULL}, // 44 append sub sql {"flush database ;", 0, 0, NULL}, {"help;", 0, 0, NULL}, From 1b3f9f4a08f0f69bb3e35361a8ecab6e1f2abd7c Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sun, 26 Mar 2023 21:36:59 +0800 Subject: [PATCH 16/24] test: change batch size --- tests/system-test/0-others/udfpy_main.py | 38 +++++++++++++----------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index a2176a041c..55a60d4314 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -240,6 +240,7 @@ class TDTestCase: cols_name = ','.join(sample) sql = f'select sf_multi_args({cols_name}),{cols_name} from {self.stbname} limit 10' self.verify_same_multi_values(sql) + tdLog.info(sql) # query_udfpy @@ -251,7 +252,7 @@ class TDTestCase: sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname} limit 10' tdLog.info(sql) self.verify_same_value(sql) - sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} ) order by b,a desc limit 10' + sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} limit 100) order by b,a desc' tdLog.info(sql) self.verify_same_value(sql) @@ -271,41 +272,43 @@ class TDTestCase: tdLog.exit(f" check {sql} not expect None.") # concat - sql = f'select sf_concat_var(col12, t12), concat(col12, t12) from {self.stbname}' + sql = f'select sf_concat_var(col12, t12), concat(col12, t12) from {self.stbname} limit 1000' self.verify_same_value(sql) - sql = f'select sf_concat_nch(col13, t13), concat(col13, t13) from {self.stbname}' + sql = f'select sf_concat_nch(col13, t13), concat(col13, t13) from {self.stbname} limit 1000' self.verify_same_value(sql) # create aggregate def create_aggr_udfpy(self): + + bufsize = 200 * 1024 # all type check null for col_name, col_type in self.column_dict.items(): - self.create_udf_af(f"af_null_{col_name}", "af_null.py", col_type, 10*1024) + self.create_udf_af(f"af_null_{col_name}", "af_null.py", col_type, bufsize) # min file_name = "af_min.py" fun_name = "af_min_float" - self.create_udf_af(fun_name, file_name, f"float", 10*1024) + self.create_udf_af(fun_name, file_name, f"float", bufsize) fun_name = "af_min_int" - self.create_udf_af(fun_name, file_name, f"int", 10*1024) + self.create_udf_af(fun_name, file_name, f"int", bufsize) # sum file_name = "af_sum.py" fun_name = "af_sum_float" - self.create_udf_af(fun_name, file_name, f"float", 10*1024) + self.create_udf_af(fun_name, file_name, f"float", bufsize) fun_name = "af_sum_int" - self.create_udf_af(fun_name, file_name, f"int", 10*1024) + self.create_udf_af(fun_name, file_name, f"int", bufsize) fun_name = "af_sum_bigint" - self.create_udf_af(fun_name, file_name, f"bigint", 10*1024) + self.create_udf_af(fun_name, file_name, f"bigint", bufsize) # count file_name = "af_count.py" 
fun_name = "af_count_float" - self.create_udf_af(fun_name, file_name, f"float", 10*1024) + self.create_udf_af(fun_name, file_name, f"float", bufsize) fun_name = "af_count_int" - self.create_udf_af(fun_name, file_name, f"int", 10*1024) + self.create_udf_af(fun_name, file_name, f"int", bufsize) fun_name = "af_count_bigint" - self.create_udf_af(fun_name, file_name, f"bigint", 10*1024) + self.create_udf_af(fun_name, file_name, f"bigint", bufsize) # query aggregate @@ -364,7 +367,7 @@ class TDTestCase: def insert_data(self, tbname, rows): ts = 1670000000000 values = "" - batch_size = 300 + batch_size = 500 child_name = "" for i in range(self.child_count): for j in range(rows): @@ -399,17 +402,16 @@ class TDTestCase: stable = "meters" tbname = "d" count = 3 - rows = 1000 + rows = 3000000 # do self.create_table(stable, tbname, count) self.insert_data(tbname, rows) - # scalar + # create self.create_scalar_udfpy() - self.query_scalar_udfpy() - - # aggregate self.create_aggr_udfpy() + # query + self.query_scalar_udfpy() self.query_aggr_udfpy() # show performance From 6d42ddb7f9828f1929469418eb0ccd47f8a123da Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Mon, 27 Mar 2023 19:58:47 +0800 Subject: [PATCH 17/24] test: add udfpy_main.py to ci --- tests/parallel_test/cases.task | 1 + tests/system-test/0-others/udfpy/af_count.py | 20 ++++++-------------- tests/system-test/0-others/udfpy_main.py | 4 ++-- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 6e0b180ed8..00e7c6ddd1 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -120,6 +120,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,n,system-test,python3 ./test.py -f 0-others/compatibility.py ,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py +,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py diff --git a/tests/system-test/0-others/udfpy/af_count.py b/tests/system-test/0-others/udfpy/af_count.py index 285ef96b55..226e02235f 100644 --- a/tests/system-test/0-others/udfpy/af_count.py +++ b/tests/system-test/0-others/udfpy/af_count.py @@ -1,5 +1,3 @@ -import pickle - def init(): pass @@ -7,23 +5,17 @@ def destroy(): pass def start(): - return pickle.dumps([]) + return pickle.dumps(0) def finish(buf): - counts = pickle.loads(buf) - all_count = 0 - for count in counts: - all_count += count - - return all_count + count = pickle.loads(buf) + return count def reduce(datablock, buf): (rows, cols) = datablock.shape() - counts = pickle.loads(buf) - batch_count = 0 + count = pickle.loads(buf) for i in range(rows): val = datablock.data(i, 0) if val is not None: - batch_count += 1 - counts.append(batch_count) - return pickle.dumps(counts) + count += 1 + return pickle.dumps(count) \ No newline at end of file diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index 55a60d4314..eaadfbdbd6 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -401,8 +401,8 @@ class TDTestCase: # var stable = "meters" tbname = "d" - count = 3 - rows = 3000000 + count = 10 + rows = 50000 # do self.create_table(stable, tbname, count) self.insert_data(tbname, rows) 
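[Editor's note] The af_count.py rewrite above replaces the list of per-block partial counts with a single pickled running total, so the intermediate buffer stays flat no matter how many blocks reduce() visits. For readers following the aggregate UDF hook protocol these scripts exercise (init/destroy around the function's lifetime, then start once, reduce once per data block, finish once), here is a minimal self-contained sketch of the same counting logic; FakeBlock is a hypothetical stand-in for the block object the engine passes to reduce(), imitating only the shape() and data(i, j) accessors the test scripts use:

    import pickle

    def start():
        # Fresh state: no rows seen yet.
        return pickle.dumps(0)

    def reduce(datablock, buf):
        # Called once per incoming data block; fold its rows into the state.
        (rows, cols) = datablock.shape()
        count = pickle.loads(buf)
        for i in range(rows):
            if datablock.data(i, 0) is not None:  # NULL values are not counted
                count += 1
        return pickle.dumps(count)

    def finish(buf):
        # Turn the accumulated state into the final scalar result.
        return pickle.loads(buf)

    class FakeBlock:
        """Hypothetical stand-in for the engine's data block object."""
        def __init__(self, column):
            self.column = column
        def shape(self):
            return (len(self.column), 1)
        def data(self, i, j):
            return self.column[i]

    # Drive the hooks the way the engine would: start, reduce per block, finish.
    buf = start()
    for block in (FakeBlock([1, None, 3]), FakeBlock([4, 5])):
        buf = reduce(block, buf)
    assert finish(buf) == 4  # four non-NULL values across both blocks

Note that the patch above also drops af_count.py's import pickle line while still calling pickle; the next patch restores the import and applies the same single-value buffer idea to af_sum.py, which is presumably what the "high performance version" in its subject refers to.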
From 70043090dcd89668a14cd13b4e3bae2b5410e3b9 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Tue, 28 Mar 2023 10:55:27 +0800 Subject: [PATCH 18/24] test: change sum to high performance version --- tests/system-test/0-others/udfpy/af_count.py | 2 ++ tests/system-test/0-others/udfpy/af_sum.py | 22 ++++-------- tests/system-test/0-others/udfpy_main.py | 37 +++++++++++++++++--- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/tests/system-test/0-others/udfpy/af_count.py b/tests/system-test/0-others/udfpy/af_count.py index 226e02235f..ce29abca13 100644 --- a/tests/system-test/0-others/udfpy/af_count.py +++ b/tests/system-test/0-others/udfpy/af_count.py @@ -1,3 +1,5 @@ +import pickle + def init(): pass diff --git a/tests/system-test/0-others/udfpy/af_sum.py b/tests/system-test/0-others/udfpy/af_sum.py index ac7aa16924..8b88aba56c 100644 --- a/tests/system-test/0-others/udfpy/af_sum.py +++ b/tests/system-test/0-others/udfpy/af_sum.py @@ -7,30 +7,20 @@ def destroy(): pass def start(): - return pickle.dumps([]) + return pickle.dumps(None) def finish(buf): - sums = pickle.loads(buf) - all = None - for sum in sums: - if all is None: - all = sum - else: - all += sum - return all + sum = pickle.loads(buf) + return sum def reduce(datablock, buf): (rows, cols) = datablock.shape() - sums = pickle.loads(buf) - sum = None + sum = pickle.loads(buf) for i in range(rows): val = datablock.data(i, 0) if val is not None: if sum is None: sum = val else: - sum += val - - if sum is not None: - sums.append(sum) - return pickle.dumps(sums) + sum += val + return pickle.dumps(sum) diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index eaadfbdbd6..e76795ac28 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -209,12 +209,12 @@ class TDTestCase: tdSql.checkData(i, j, result1[i][j]) # same value like select col1, udf_fun1(col1) from st - def verify_same_value(self, sql): + def verify_same_value(self, sql, col=0): tdSql.query(sql) nrows = tdSql.getRows() for i in range(nrows): - val = tdSql.getData(i, 0) - tdSql.checkData(i, 1, val) + val = tdSql.getData(i, col) + tdSql.checkData(i, col + 1, val) # verify multi values def verify_same_multi_values(self, sql): @@ -395,6 +395,24 @@ class TDTestCase: tdSql.execute(sql) tdLog.info(f" insert {rows} to child table {self.child_count} .") + + # create stream + def create_stream(self): + sql = f"create stream ma into sta subtable(concat('sta_',tbname)) \ + as select _wstart,count(col1),af_count_bigint(col1) from {self.stbname} partition by tbname interval(1s);" + tdSql.execute(sql) + tdLog.info(sql) + + # query stream + def verify_stream(self): + sql = f"select * from sta limit 10" + self.verify_same_value(sql, 1) + + # create tmq + def create_tmq(self): + sql = f"create topic topa as select concat(col12,t12),sf_concat_var(col12,t12) from {self.stbname};" + tdSql.execute(sql) + tdLog.info(sql) # run def run(self): @@ -402,14 +420,23 @@ class TDTestCase: stable = "meters" tbname = "d" count = 10 - rows = 50000 + rows = 5000 # do self.create_table(stable, tbname, count) - self.insert_data(tbname, rows) # create self.create_scalar_udfpy() self.create_aggr_udfpy() + + # create stream + self.create_stream() + + # create tmq + self.create_tmq() + + # insert data + self.insert_data(tbname, rows) + # query self.query_scalar_udfpy() self.query_aggr_udfpy() From 9b729c289bc268dc3fdef8dc1f76d3b6fb9cd2a1 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> 
Date: Tue, 28 Mar 2023 12:54:57 +0800 Subject: [PATCH 19/24] test: add install taospyudf --- tests/system-test/0-others/udfpy_main.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index e76795ac28..2dd97ea2c5 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -20,6 +20,7 @@ from util.sqlset import * import random import os +import subprocess class PerfDB: @@ -414,8 +415,17 @@ class TDTestCase: tdSql.execute(sql) tdLog.info(sql) + def install_taospy(self): + tdLog.info("install taospyudf...") + packs = ["taospyudf"] + for pack in packs: + subprocess.check_call([sys.executable, '-m', 'pip', 'install', pack]) + tdLog.info("install taospyudf successfully.") + # run def run(self): + self.install_taospy() + # var stable = "meters" tbname = "d" From 9289f7de8bdde5f9eb3fdfe46775e1597276ed87 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Tue, 28 Mar 2023 13:58:40 +0800 Subject: [PATCH 20/24] test: add retention for topic --- tests/system-test/0-others/udfpy_main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index 2dd97ea2c5..23fbfd1e09 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -105,7 +105,7 @@ class TDTestCase: # create stable and child tables def create_table(self, stbname, tbname, count): - tdSql.prepare() + tdSql.execute("create database db wal_retention_period 4") tdSql.execute('use db') self.child_count = count self.stbname = stbname From e98fe94eb2ee2a9b4570a489225870efbcd8af48 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Tue, 28 Mar 2023 15:08:38 +0800 Subject: [PATCH 21/24] test: add ldconfig call --- tests/system-test/0-others/udfpy_main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index 23fbfd1e09..c6b2af0f83 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -420,6 +420,8 @@ class TDTestCase: packs = ["taospyudf"] for pack in packs: subprocess.check_call([sys.executable, '-m', 'pip', 'install', pack]) + tdLog.info("call ldconfig...") + os.system("ldconfig") tdLog.info("install taospyudf successfully.") # run From 883734a6b833a8f5c0dd81e37ee6e6083c564d83 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 29 Mar 2023 11:33:18 +0800 Subject: [PATCH 22/24] fix: use official pypi repository --- tests/system-test/0-others/udfpy_main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index c6b2af0f83..916b032edb 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -419,7 +419,7 @@ class TDTestCase: tdLog.info("install taospyudf...") packs = ["taospyudf"] for pack in packs: - subprocess.check_call([sys.executable, '-m', 'pip', 'install', pack]) + subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-i', 'https://pypi.org/simple', '-U', pack]) tdLog.info("call ldconfig...") os.system("ldconfig") tdLog.info("install taospyudf successfully.") @@ -461,4 +461,4 @@ class TDTestCase: tdLog.success("%s successfully executed" % __file__) tdCases.addWindows(__file__, TDTestCase()) -tdCases.addLinux(__file__, TDTestCase()) \ No
newline at end of file +tdCases.addLinux(__file__, TDTestCase()) From df2ecea34ff3173086aadec468670cc474edf747 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 29 Mar 2023 16:57:15 +0800 Subject: [PATCH 23/24] test: tag_index_cluster.py file was added by mistake --- .../system-test/0-others/tag_index_cluster.py | 171 ------------------ 1 file changed, 171 deletions(-) delete mode 100644 tests/system-test/0-others/tag_index_cluster.py diff --git a/tests/system-test/0-others/tag_index_cluster.py b/tests/system-test/0-others/tag_index_cluster.py deleted file mode 100644 index b1ae74f567..0000000000 --- a/tests/system-test/0-others/tag_index_cluster.py +++ /dev/null @@ -1,171 +0,0 @@ -from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE -import taos -import sys -import time -import os - -from util.log import * -from util.sql import * -from util.cases import * -from util.dnodes import * -from util.dnodes import TDDnodes -from util.dnodes import TDDnode -import time -import socket -import subprocess - -class MyDnodes(TDDnodes): - def __init__(self ,dnodes_lists): - super(MyDnodes,self).__init__() - self.dnodes = dnodes_lists # dnode must be TDDnode instance - self.simDeployed = False - -class TagCluster: - noConn = True - def init(self, conn, logSql, replicaVar=1): - tdLog.debug(f"start to excute {__file__}") - self.TDDnodes = None - self.depoly_cluster(5) - self.master_dnode = self.TDDnodes.dnodes[0] - self.host=self.master_dnode.cfgDict["fqdn"] - conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir) - tdSql.init(conn1.cursor()) - - - def getBuildPath(self): - selfPath = os.path.dirname(os.path.realpath(__file__)) - - if ("community" in selfPath): - projPath = selfPath[:selfPath.find("community")] - else: - projPath = selfPath[:selfPath.find("tests")] - - for root, dirs, files in os.walk(projPath): - if ("taosd" in files or "taosd.exe" in files): - rootRealPath = os.path.dirname(os.path.realpath(root)) - if ("packaging" not in rootRealPath): - buildPath = root[:len(root) - len("/build/bin")] - break - return buildPath - - - def depoly_cluster(self ,dnodes_nums): - - testCluster = False - valgrind = 0 - hostname = socket.gethostname() - dnodes = [] - start_port = 6030 - for num in range(1, dnodes_nums+1): - dnode = TDDnode(num) - dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}") - dnode.addExtraCfg("fqdn", f"{hostname}") - dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}") - dnode.addExtraCfg("monitorFqdn", hostname) - dnode.addExtraCfg("monitorPort", 7043) - dnodes.append(dnode) - - self.TDDnodes = MyDnodes(dnodes) - self.TDDnodes.init("") - self.TDDnodes.setTestCluster(testCluster) - self.TDDnodes.setValgrind(valgrind) - - self.TDDnodes.setAsan(tdDnodes.getAsan()) - self.TDDnodes.stopAll() - for dnode in self.TDDnodes.dnodes: - self.TDDnodes.deploy(dnode.index,{}) - - for dnode in self.TDDnodes.dnodes: - self.TDDnodes.starttaosd(dnode.index) - - # create cluster - for dnode in self.TDDnodes.dnodes[1:]: - # print(dnode.cfgDict) - dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"] - dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0] - dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1] - cmd = f"{self.getBuildPath()}/build/bin/taos -h {dnode_first_host} -P {dnode_first_port} -s \"create dnode \\\"{dnode_id}\\\"\"" - print(cmd) - os.system(cmd) - - time.sleep(2) - tdLog.info(" create cluster done! 
") - - def five_dnode_one_mnode(self): - tdSql.query("select * from information_schema.ins_dnodes;") - tdSql.checkData(0,1,'%s:6030'%self.host) - tdSql.checkData(4,1,'%s:6430'%self.host) - tdSql.checkData(0,4,'ready') - tdSql.checkData(4,4,'ready') - tdSql.query("select * from information_schema.ins_mnodes;") - tdSql.checkData(0,1,'%s:6030'%self.host) - tdSql.checkData(0,2,'leader') - tdSql.checkData(0,3,'ready') - - - tdSql.error("create mnode on dnode 1;") - tdSql.error("drop mnode on dnode 1;") - - tdSql.execute("drop database if exists db") - tdSql.execute("create database if not exists db replica 1 duration 300") - tdSql.execute("use db") - tdSql.execute( - '''create table stb1 - (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) - tags (t1 int) - ''' - ) - tdSql.execute( - ''' - create table t1 - (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) - ''' - ) - for i in range(4): - tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') - - tdSql.query('select * from information_schema.ins_databases;') - tdSql.checkData(2,5,'on') - tdSql.error("alter database db strict 'off'") - # tdSql.execute('alter database db strict 'on'') - # tdSql.query('select * from information_schema.ins_databases;') - # tdSql.checkData(2,5,'on') - - def getConnection(self, dnode): - host = dnode.cfgDict["fqdn"] - port = dnode.cfgDict["serverPort"] - config_dir = dnode.cfgDir - return taos.connect(host=host, port=int(port), config=config_dir) - - def check_alive(self): - # check cluster alive - tdLog.printNoPrefix("======== test cluster alive: ") - tdSql.checkDataLoop(0, 0, 1, "show cluster alive;", 20, 0.5) - - tdSql.query("show db.alive;") - tdSql.checkData(0, 0, 1) - - # stop 3 dnode - self.TDDnodes.stoptaosd(3) - tdSql.checkDataLoop(0, 0, 2, "show cluster alive;", 20, 0.5) - - tdSql.query("show db.alive;") - tdSql.checkData(0, 0, 2) - - # stop 2 dnode - self.TDDnodes.stoptaosd(2) - tdSql.checkDataLoop(0, 0, 0, "show cluster alive;", 20, 0.5) - - tdSql.query("show db.alive;") - tdSql.checkData(0, 0, 0) - - - def run(self): - # print(self.master_dnode.cfgDict) - self.five_dnode_one_mnode() - # check cluster and db alive - self.check_alive() - - def stop(self): - tdSql.close() - tdLog.success(f"{__file__} successfully executed") From 7568a89553d955cc763d336ce50f32bf971c6f8d Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 29 Mar 2023 18:44:24 +0800 Subject: [PATCH 24/24] test: tmqDnodeRestart.py set query retry times from 10 to 50 --- tests/system-test/7-tmq/tmqDnodeRestart.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/7-tmq/tmqDnodeRestart.py b/tests/system-test/7-tmq/tmqDnodeRestart.py index afd54c9d02..3ad7d7692d 100644 --- a/tests/system-test/7-tmq/tmqDnodeRestart.py +++ b/tests/system-test/7-tmq/tmqDnodeRestart.py @@ -146,7 +146,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - tdSql.query(queryString) + tdSql.query(queryString, None, 50) totalRowsFromQury = tdSql.getRows() tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury)) @@ -236,7 +236,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - tdSql.query(queryString) + tdSql.query(queryString, None, 50) totalRowsFromQuery = tdSql.getRows() tdLog.info("act consume rows: %d, expect consume rows: 
%d"%(totalConsumeRows, totalRowsFromQuery))
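[Editor's note] The final hunk raises the retry budget that tmqDnodeRestart.py passes to tdSql.query from the default of 10 to 50 (per the patch subject), so a consumer still catching up after a dnode restart does not fail the row-count check prematurely. The real retry loop lives in the test framework's util/sql.py and is not shown in this series; the sketch below only illustrates the behaviour such a query-times parameter implies, and every name in it is an assumption, not the actual util.sql API:

    import time

    def query_with_retry(run_query, sql, query_times=50, interval=1.0):
        # Hypothetical helper: retry a query until it succeeds or the
        # retry budget is exhausted, sleeping between attempts.
        last_err = None
        for _ in range(query_times):
            try:
                return run_query(sql)
            except Exception as err:
                last_err = err
                time.sleep(interval)  # give the server time before retrying
        raise last_err

    # Example: a query that fails twice before data is ready succeeds on try 3.
    attempts = {"n": 0}
    def flaky(sql):
        attempts["n"] += 1
        if attempts["n"] < 3:
            raise RuntimeError("consumer not caught up yet")
        return [("ok",)]

    print(query_with_retry(flaky, "select count(*) from db.stb;", interval=0.1))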