diff --git a/tests/system-test/0-others/udfpy/af_count.py b/tests/system-test/0-others/udfpy/af_count.py
new file mode 100644
index 0000000000..285ef96b55
--- /dev/null
+++ b/tests/system-test/0-others/udfpy/af_count.py
@@ -0,0 +1,29 @@
+import pickle
+
+def init():
+    pass
+
+def destroy():
+    pass
+
+def start():
+    return pickle.dumps([])
+
+def finish(buf):
+    counts = pickle.loads(buf)
+    all_count = 0
+    for count in counts:
+        all_count += count
+
+    return all_count
+
+def reduce(datablock, buf):
+    (rows, cols) = datablock.shape()
+    counts = pickle.loads(buf)
+    batch_count = 0
+    for i in range(rows):
+        val = datablock.data(i, 0)
+        if val is not None:
+            batch_count += 1
+    counts.append(batch_count)
+    return pickle.dumps(counts)
diff --git a/tests/system-test/0-others/udfpy/af_sum.py b/tests/system-test/0-others/udfpy/af_sum.py
index e32cf3fa31..ac7aa16924 100644
--- a/tests/system-test/0-others/udfpy/af_sum.py
+++ b/tests/system-test/0-others/udfpy/af_sum.py
@@ -11,18 +11,26 @@ def start():
 
 def finish(buf):
     sums = pickle.loads(buf)
-    all = 0
+    all = None
     for sum in sums:
-        all += sum
+        if all is None:
+            all = sum
+        else:
+            all += sum
     return all
 
 def reduce(datablock, buf):
     (rows, cols) = datablock.shape()
     sums = pickle.loads(buf)
-    sum = 0
+    sum = None
     for i in range(rows):
         val = datablock.data(i, 0)
         if val is not None:
-            sum += val
-    sums.append(sum)
+            if sum is None:
+                sum = val
+            else:
+                sum += val
+
+    if sum is not None:
+        sums.append(sum)
     return pickle.dumps(sums)
diff --git a/tests/system-test/0-others/udfpy/sf_concat_nch.py b/tests/system-test/0-others/udfpy/sf_concat_nch.py
new file mode 100644
index 0000000000..c64bfa8ad3
--- /dev/null
+++ b/tests/system-test/0-others/udfpy/sf_concat_nch.py
@@ -0,0 +1,23 @@
+# init
+def init():
+    pass
+
+# destroy
+def destroy():
+    pass
+
+def process(block):
+    (nrows, ncols) = block.shape()
+    results = []
+    for i in range(nrows):
+        row = []
+        for j in range(ncols):
+            val = block.data(i, j)
+            if val is None:
+                return [None]
+            row.append(val.decode('utf_32_le'))
+        row_str = ''.join(row)
+        results.append(row_str.encode('utf_32_le'))
+    return results
+
+
diff --git a/tests/system-test/0-others/udfpy/sf_concat_var.py b/tests/system-test/0-others/udfpy/sf_concat_var.py
new file mode 100644
index 0000000000..0a63821aa7
--- /dev/null
+++ b/tests/system-test/0-others/udfpy/sf_concat_var.py
@@ -0,0 +1,22 @@
+# init
+def init():
+    pass
+
+# destroy
+def destroy():
+    pass
+
+def process(block):
+    (nrows, ncols) = block.shape()
+    results = []
+    for i in range(nrows):
+        row = []
+        for j in range(ncols):
+            val = block.data(i, j)
+            if val is None:
+                return [None]
+            row.append(val.decode('utf-8'))
+        results.append(''.join(row))
+    return results
+
+
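The aggregate scripts keep their intermediate state in the opaque byte buffer TDengine threads through the calls: start() seeds a pickled list of per-block partials, reduce() folds one data block into it, and finish() collapses the list into the final value. The change to af_sum.py above is about SQL NULL semantics: a sum over nothing but NULLs must yield NULL rather than 0, so the accumulator now starts as None and all-NULL blocks append no partial at all. A minimal harness for exercising the hooks outside the server (FakeBlock is hypothetical and only mimics the shape()/data() accessors the scripts rely on):

    import af_sum  # the module above; its hooks are plain top-level functions

    class FakeBlock:
        # hypothetical stand-in for the engine's data block: one column of values
        def __init__(self, values):
            self.values = values
        def shape(self):
            return (len(self.values), 1)
        def data(self, i, j):
            return self.values[i]

    buf = af_sum.start()                                # pickled empty list
    buf = af_sum.reduce(FakeBlock([1, 2, None]), buf)   # partial sum 3
    buf = af_sum.reduce(FakeBlock([None, None]), buf)   # all-NULL block appends nothing
    print(af_sum.finish(buf))                           # 3; None if every value were NULL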
diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py
index 75a7e8b308..b4fd77f93a 100644
--- a/tests/system-test/0-others/udfpy_main.py
+++ b/tests/system-test/0-others/udfpy_main.py
@@ -22,6 +22,33 @@ import random
 import os
 
+class PerfDB:
+    def __init__(self):
+        self.sqls = []
+        self.spends = []
+
+    # execute
+    def execute(self, sql):
+        print(f" perfdb execute {sql}")
+        stime = time.time()
+        ret = tdSql.execute(sql, 1)
+        spend = time.time() - stime
+
+        self.sqls.append(sql)
+        self.spends.append(spend)
+        return ret
+
+    # query
+    def query(self, sql):
+        print(f" perfdb query {sql}")
+        start = time.time()
+        ret = tdSql.query(sql, None, 1)
+        spend = time.time() - start
+        self.sqls.append(sql)
+        self.spends.append(spend)
+        return ret
+
+
 class TDTestCase:
     def init(self, conn, logSql, replicaVar=1):
         self.replicaVar = int(replicaVar)
@@ -47,8 +74,7 @@ class TDTestCase:
             'col10': 'double',
             'col11': 'bool',
             'col12': 'varchar(20)',
-            'col13': 'nchar(20)',
-            'col14': 'timestamp'
+            'col13': 'nchar(100)',
         }
         self.tag_dict = {
             't1': 'tinyint',
@@ -63,8 +89,7 @@ class TDTestCase:
             't10': 'double',
             't11': 'bool',
             't12': 'varchar(20)',
-            't13': 'nchar(20)',
-            't14': 'timestamp'
+            't13': 'nchar(100)',
         }
 
     def set_stb_sql(self,stbname,column_dict,tag_dict):
@@ -93,7 +118,7 @@ class TDTestCase:
         # create child table
         for i in range(count):
             ti = i % 128
-            tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}",now'
+            tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}"'
             sql = f'create table {tbname}{i} using {stbname} tags({tags});'
             tdSql.execute(sql)
             if i % batch_size == 0:
@@ -125,7 +150,7 @@ class TDTestCase:
             'sf10': 'double',
             'sf11': 'bool',
             'sf12': 'varchar(20)',
-            'sf13': 'nchar(20)'
+            'sf13': 'nchar(100)'
         }
         # agg function
         self.agg_funs = {
@@ -141,11 +166,11 @@ class TDTestCase:
             'af10': 'double',
             'af11': 'bool',
             'af12': 'varchar(20)',
-            'af13': 'nchar(20)',
+            'af13': 'nchar(100)',
             'af14': 'timestamp'
         }
 
-        # files
+        # multi_args
         self.create_sf_dicts(self.scalar_funs, "sf_origin.py")
         fun_name = "sf_multi_args"
         self.create_udf_sf(fun_name, f'{fun_name}.py', "binary(1024)")
@@ -154,6 +179,12 @@ class TDTestCase:
         for col_name, col_type in self.column_dict.items():
             self.create_udf_sf(f"sf_null_{col_name}", "sf_null.py", col_type)
 
+        # concat
+        fun_name = "sf_concat_var"
+        self.create_udf_sf(fun_name, f'{fun_name}.py', "varchar(1024)")
+        fun_name = "sf_concat_nch"
+        self.create_udf_sf(fun_name, f'{fun_name}.py', "nchar(1024)")
+
     # fun_name == fun_name.py
     def create_udf_sf(self, fun_name, file_name, out_type):
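The create_udf_sf/create_udf_af helpers invoked above are defined further down in this file; their bodies fall outside this diff's context, but they presumably wrap TDengine's DDL for Python UDFs, which binds a function name to a script path, an output type, and, for aggregates, a state-buffer size. A sketch only, with self.udf_path standing in for wherever the test stages the scripts:

    # sketch, not part of the diff
    def create_udf_sf(self, fun_name, file_name, out_type):
        tdSql.execute(f'create function {fun_name} as "{self.udf_path}/{file_name}" '
                      f'outputtype {out_type} language "Python"')

    def create_udf_af(self, fun_name, file_name, out_type, bufsize):
        tdSql.execute(f'create aggregate function {fun_name} as "{self.udf_path}/{file_name}" '
                      f'outputtype {out_type} bufsize {bufsize} language "Python"')

One script can back several function names with different output types (sf_concat_var.py serves varchar and sf_concat_nch.py nchar above; below, af_sum.py is registered as af_sum_int, af_sum_bigint and af_sum_float), because the declared outputtype, not the script, fixes the result column's type.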
@@ -200,15 +231,14 @@ class TDTestCase:
         cols = list(self.column_dict.keys()) + list(self.tag_dict.keys())
         cols.remove("col13")
         cols.remove("t13")
+        cols.remove("ts")
         ncols = len(cols)
         print(cols)
-
         for i in range(2, ncols):
-            print(i)
             sample = random.sample(cols, i)
             print(sample)
             cols_name = ','.join(sample)
-            sql = f'select sf_multi_args({cols_name}),{cols_name} from {self.stbname}'
+            sql = f'select sf_multi_args({cols_name}),{cols_name} from {self.stbname} limit 10'
             self.verify_same_multi_values(sql)
 
 
@@ -218,10 +248,10 @@ class TDTestCase:
         for col_name, col_type in self.column_dict.items():
             for fun_name, out_type in self.scalar_funs.items():
                 if col_type == out_type :
-                    sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname}'
+                    sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname} limit 10'
                     tdLog.info(sql)
                     self.verify_same_value(sql)
-                    sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} ) order by b,a desc'
+                    sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} ) order by b,a desc limit 10'
                     tdLog.info(sql)
                     self.verify_same_value(sql)
 
@@ -229,12 +259,22 @@ class TDTestCase:
         self.query_multi_args()
 
         # all type check null
-        for col_name, col_type in self.column_dict.items():
+        for col_name, col_type in self.column_dict.items():
             fun_name = f"sf_null_{col_name}"
             sql = f'select {fun_name}({col_name}) from {self.stbname}'
             tdSql.query(sql)
-            tdSql.checkData(0, 0, "None")
+            if col_type != "timestamp":
+                tdSql.checkData(0, 0, "None")
+            else:
+                val = tdSql.getData(0, 0)
+                if val is not None:
+                    tdLog.exit(f" check {sql} expected None but got {val}.")
 
+        # concat
+        sql = f'select sf_concat_var(col12, t12), concat(col12, t12) from {self.stbname}'
+        self.verify_same_value(sql)
+        sql = f'select sf_concat_nch(col13, t13), concat(col13, t13) from {self.stbname}'
+        self.verify_same_value(sql)
 
     # create aggregate
     def create_aggr_udfpy(self):
@@ -255,6 +295,17 @@ class TDTestCase:
         self.create_udf_af(fun_name, file_name, f"float", 10*1024)
         fun_name = "af_sum_int"
         self.create_udf_af(fun_name, file_name, f"int", 10*1024)
+        fun_name = "af_sum_bigint"
+        self.create_udf_af(fun_name, file_name, f"bigint", 10*1024)
+
+        # count
+        file_name = "af_count.py"
+        fun_name = "af_count_float"
+        self.create_udf_af(fun_name, file_name, f"float", 10*1024)
+        fun_name = "af_count_int"
+        self.create_udf_af(fun_name, file_name, f"int", 10*1024)
+        fun_name = "af_count_bigint"
+        self.create_udf_af(fun_name, file_name, f"bigint", 10*1024)
 
 
     # query aggregate
@@ -264,7 +315,12 @@ class TDTestCase:
             fun_name = f"af_null_{col_name}"
             sql = f'select {fun_name}({col_name}) from {self.stbname}'
             tdSql.query(sql)
-            tdSql.checkData(0, 0, "None")
+            if col_type != "timestamp":
+                tdSql.checkData(0, 0, "None")
+            else:
+                val = tdSql.getData(0, 0)
+                if val is not None:
+                    tdLog.exit(f" check {sql} expected None but got {val}.")
 
         # min
         sql = f'select min(col3), af_min_int(col3) from {self.stbname}'
@@ -275,32 +331,55 @@ class TDTestCase:
         self.verify_same_value(sql)
 
         # sum
-        sql = f'select sum(col3), af_sum_int(col3) from {self.stbname}'
+        sql = f'select sum(col1), af_sum_int(col1) from d0'
         self.verify_same_value(sql)
-        sql = f'select sum(col7), af_sum_int(col7) from {self.stbname}'
+        sql = f'select sum(col3), af_sum_bigint(col3) from {self.stbname}'
         self.verify_same_value(sql)
         sql = f'select sum(col9), af_sum_float(col9) from {self.stbname}'
         self.verify_same_value(sql)
-
+
+        # count
+        sql = f'select count(col1), af_count_int(col1) from {self.stbname}'
+        self.verify_same_value(sql)
+        sql = f'select count(col7), af_count_bigint(col7) from {self.stbname}'
+        self.verify_same_value(sql)
+        sql = f'select count(col8), af_count_float(col8) from {self.stbname}'
+        self.verify_same_value(sql)
+
+        # nest
+        sql = f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {self.stbname})'
+        self.verify_same_value(sql)
+        # group by
+        sql = f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {self.stbname} group by tbname)'
+        self.verify_same_value(sql)
+        # two field expr
+        sql = f'select sum(col1+col2),af_sum_float(col1+col2) from {self.stbname};'
+        self.verify_same_value(sql)
+        # interval
+        sql = f'select af_sum_float(col2+col3),sum(col3+col2) from {self.stbname} interval(1s)'
+        self.verify_same_value(sql)
+
 
     # insert to child table d1 data
     def insert_data(self, tbname, rows):
         ts = 1670000000000
-        sqls = ""
+        values = ""
         batch_size = 300
+        child_name = ""
         for i in range(self.child_count):
             for j in range(rows):
                 tj = j % 128
-                cols = f'{tj},{tj},{j},{j},{tj},{tj},{j},{j},{j}.000{j},{j}.000{j},true,"var{j}","nch{j}",now'
-                sql = f'insert into {tbname}{i} values({ts+j},{cols});'
-                sqls += sql
-                if j % batch_size == 0:
-                    tdSql.execute(sqls)
+                cols = f'{tj},{tj},{j},{j},{tj},{tj},{j},{j},{j}.000{j},{j}.000{j},true,"var{j}","nch{j}涛思数据codepage is utf_32_le"'
+                value = f'({ts+j},{cols})'
+                if values == "":
+                    values = value
+                else:
+                    values += f",{value}"
+                if j % batch_size == 0 or j + 1 == rows:
+                    sql = f'insert into {tbname}{i} values {values};'
+                    tdSql.execute(sql)
                     tdLog.info(f" child table={i} rows={j} insert data.")
-                    sqls = ""
-        # end
-        if sqls != "":
-            tdSql.execute(sqls)
+                    values = ""
 
         # partial columns upate
         sql = f'insert into {tbname}0(ts, col1, col9, col11) values(now, 100, 200, 0)'
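Two changes in insert_data above: each nchar value now embeds multi-byte text (涛思数据) to exercise sf_concat_nch.py's UTF-32-LE decoding, and rows are batched into one multi-row INSERT instead of a string of single-row statements, which matters once the row count rises to 1000000 below. The flush condition `j % batch_size == 0 or j + 1 == rows` always drains the tail batch (it also fires at j == 0, sending the first row alone). A generic form of the pattern, a sketch rather than the test's exact code:

    def batches(n, batch_size):
        # yield index batches; the final, possibly partial batch is always emitted
        batch = []
        for j in range(n):
            batch.append(j)
            if (j + 1) % batch_size == 0 or j + 1 == n:
                yield batch
                batch = []

    # list(batches(7, 3)) -> [[0, 1, 2], [3, 4, 5], [6]]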
@@ -319,8 +398,8 @@ class TDTestCase:
         # var
         stable = "meters"
         tbname = "d"
-        count = 10000
-        rows = 1000
+        count = 3
+        rows = 1000000
         # do
         self.create_table(stable, tbname, count)
         self.insert_data(tbname, rows)
@@ -333,6 +412,8 @@ class TDTestCase:
         self.create_aggr_udfpy()
         self.query_aggr_udfpy()
 
+        # show performance
+
     def stop(self):
         tdSql.close()
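The PerfDB class added at the top of udfpy_main.py wraps tdSql and records every statement with its wall-clock cost, but nothing in run() uses it yet; the new `# show performance` comment marks where that would go. A sketch of how it could compare a native aggregate against its UDF twin, using only the methods the class defines (run inside the test case, where self.stbname exists; note PerfDB calls time.time(), so the module needs `import time`, which the hunk's context neither shows nor rules out):

    perf = PerfDB()
    perf.query(f'select count(col1) from {self.stbname}')         # native
    perf.query(f'select af_count_int(col1) from {self.stbname}')  # Python UDF
    for sql, spend in zip(perf.sqls, perf.spends):
        tdLog.info(f" spend={spend:.3f}s sql={sql}")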