test: add string concat function

This commit is contained in:
Alex Duan 2023-03-24 18:57:07 +08:00
parent 1434fea2de
commit 17f948deca
5 changed files with 199 additions and 36 deletions

View File

@ -0,0 +1,29 @@
import pickle
def init():
    """UDF lifecycle hook called once at load time; nothing to set up."""
    return None
def destroy():
    """UDF lifecycle hook called once at unload time; nothing to tear down."""
    return None
def start():
    """Begin an aggregation: the state buffer is an empty list of per-block counts."""
    initial_state = []
    return pickle.dumps(initial_state)
def finish(buf):
    """Finalize the aggregation: total all per-block counts stored in the state buffer.

    buf is a pickled list of ints; returns their sum (0 for an empty list).
    """
    return sum(pickle.loads(buf))
def reduce(datablock, buf):
    """Count non-NULL values in column 0 of this block and append the count to the state.

    datablock exposes shape() -> (rows, cols) and data(i, j); buf is the
    pickled list of counts accumulated so far. Returns the updated pickled list.
    """
    (rows, cols) = datablock.shape()
    counts = pickle.loads(buf)
    non_null = sum(1 for r in range(rows) if datablock.data(r, 0) is not None)
    counts.append(non_null)
    return pickle.dumps(counts)

View File

@ -11,18 +11,26 @@ def start():
def finish(buf): def finish(buf):
sums = pickle.loads(buf) sums = pickle.loads(buf)
all = 0 all = None
for sum in sums: for sum in sums:
all += sum if all is None:
all = sum
else:
all += sum
return all return all
def reduce(datablock, buf): def reduce(datablock, buf):
(rows, cols) = datablock.shape() (rows, cols) = datablock.shape()
sums = pickle.loads(buf) sums = pickle.loads(buf)
sum = 0 sum = None
for i in range(rows): for i in range(rows):
val = datablock.data(i, 0) val = datablock.data(i, 0)
if val is not None: if val is not None:
sum += val if sum is None:
sums.append(sum) sum = val
else:
sum += val
if sum is not None:
sums.append(sum)
return pickle.dumps(sums) return pickle.dumps(sums)

View File

@ -0,0 +1,23 @@
# init
def init():
    """UDF lifecycle hook called once at load time; nothing to set up."""
    return None
# destroy
def destroy():
    """UDF lifecycle hook called once at unload time; nothing to tear down."""
    return None
def process(block):
    """Concatenate the utf_32_le-encoded cells of each row into one encoded value per row.

    block exposes shape() -> (nrows, ncols) and data(i, j) returning bytes or None.
    If any cell is NULL, the whole result is [None].
    """
    (nrows, ncols) = block.shape()
    out = []
    for r in range(nrows):
        pieces = []
        for c in range(ncols):
            cell = block.data(r, c)
            if cell is None:
                # a single NULL cell makes the entire result NULL
                return [None]
            pieces.append(cell.decode('utf_32_le'))
        out.append(''.join(pieces).encode('utf_32_le'))
    return out

View File

@ -0,0 +1,22 @@
# init
def init():
    """UDF lifecycle hook called once at load time; nothing to set up."""
    return None
# destroy
def destroy():
    """UDF lifecycle hook called once at unload time; nothing to tear down."""
    return None
def process(block):
    """Concatenate the utf-8 cells of each row into a single string per row.

    block exposes shape() -> (nrows, ncols) and data(i, j) returning bytes or None.
    If any cell is NULL, the whole result is [None].
    """
    (nrows, ncols) = block.shape()
    joined = []
    for r in range(nrows):
        cells = []
        for c in range(ncols):
            cell = block.data(r, c)
            if cell is None:
                # a single NULL cell makes the entire result NULL
                return [None]
            cells.append(cell.decode('utf-8'))
        joined.append(''.join(cells))
    return joined

View File

@ -22,6 +22,33 @@ import random
import os import os
class PerfDB:
    """Thin wrapper over the global tdSql helper that records each SQL
    statement and how long it took, for later performance reporting."""

    def __init__(self):
        # parallel lists: sqls[i] took spends[i] seconds
        self.sqls = []
        self.spends = []

    def execute(self, sql):
        """Run a write statement via tdSql, recording the statement and its duration."""
        print(f" perfdb execute {sql}")
        begin = time.time()
        result = tdSql.execute(sql, 1)
        self.sqls.append(sql)
        self.spends.append(time.time() - begin)
        return result

    def query(self, sql):
        """Run a query via tdSql, recording the statement and its duration."""
        print(f" perfdb query {sql}")
        begin = time.time()
        result = tdSql.query(sql, None, 1)
        self.sqls.append(sql)
        self.spends.append(time.time() - begin)
        return result
class TDTestCase: class TDTestCase:
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
@ -47,8 +74,7 @@ class TDTestCase:
'col10': 'double', 'col10': 'double',
'col11': 'bool', 'col11': 'bool',
'col12': 'varchar(20)', 'col12': 'varchar(20)',
'col13': 'nchar(20)', 'col13': 'nchar(100)',
'col14': 'timestamp'
} }
self.tag_dict = { self.tag_dict = {
't1': 'tinyint', 't1': 'tinyint',
@ -63,8 +89,7 @@ class TDTestCase:
't10': 'double', 't10': 'double',
't11': 'bool', 't11': 'bool',
't12': 'varchar(20)', 't12': 'varchar(20)',
't13': 'nchar(20)', 't13': 'nchar(100)',
't14': 'timestamp'
} }
def set_stb_sql(self,stbname,column_dict,tag_dict): def set_stb_sql(self,stbname,column_dict,tag_dict):
@ -93,7 +118,7 @@ class TDTestCase:
# create child table # create child table
for i in range(count): for i in range(count):
ti = i % 128 ti = i % 128
tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}",now' tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}"'
sql = f'create table {tbname}{i} using {stbname} tags({tags});' sql = f'create table {tbname}{i} using {stbname} tags({tags});'
tdSql.execute(sql) tdSql.execute(sql)
if i % batch_size == 0: if i % batch_size == 0:
@ -125,7 +150,7 @@ class TDTestCase:
'sf10': 'double', 'sf10': 'double',
'sf11': 'bool', 'sf11': 'bool',
'sf12': 'varchar(20)', 'sf12': 'varchar(20)',
'sf13': 'nchar(20)' 'sf13': 'nchar(100)'
} }
# agg function # agg function
self.agg_funs = { self.agg_funs = {
@ -141,11 +166,11 @@ class TDTestCase:
'af10': 'double', 'af10': 'double',
'af11': 'bool', 'af11': 'bool',
'af12': 'varchar(20)', 'af12': 'varchar(20)',
'af13': 'nchar(20)', 'af13': 'nchar(100)',
'af14': 'timestamp' 'af14': 'timestamp'
} }
# files # multi_args
self.create_sf_dicts(self.scalar_funs, "sf_origin.py") self.create_sf_dicts(self.scalar_funs, "sf_origin.py")
fun_name = "sf_multi_args" fun_name = "sf_multi_args"
self.create_udf_sf(fun_name, f'{fun_name}.py', "binary(1024)") self.create_udf_sf(fun_name, f'{fun_name}.py', "binary(1024)")
@ -154,6 +179,12 @@ class TDTestCase:
for col_name, col_type in self.column_dict.items(): for col_name, col_type in self.column_dict.items():
self.create_udf_sf(f"sf_null_{col_name}", "sf_null.py", col_type) self.create_udf_sf(f"sf_null_{col_name}", "sf_null.py", col_type)
# concat
fun_name = "sf_concat_var"
self.create_udf_sf(fun_name, f'{fun_name}.py', "varchar(1024)")
fun_name = "sf_concat_nch"
self.create_udf_sf(fun_name, f'{fun_name}.py', "nchar(1024)")
# fun_name == fun_name.py # fun_name == fun_name.py
def create_udf_sf(self, fun_name, file_name, out_type): def create_udf_sf(self, fun_name, file_name, out_type):
@ -200,15 +231,14 @@ class TDTestCase:
cols = list(self.column_dict.keys()) + list(self.tag_dict.keys()) cols = list(self.column_dict.keys()) + list(self.tag_dict.keys())
cols.remove("col13") cols.remove("col13")
cols.remove("t13") cols.remove("t13")
cols.remove("ts")
ncols = len(cols) ncols = len(cols)
print(cols) print(cols)
for i in range(2, ncols): for i in range(2, ncols):
print(i)
sample = random.sample(cols, i) sample = random.sample(cols, i)
print(sample) print(sample)
cols_name = ','.join(sample) cols_name = ','.join(sample)
sql = f'select sf_multi_args({cols_name}),{cols_name} from {self.stbname}' sql = f'select sf_multi_args({cols_name}),{cols_name} from {self.stbname} limit 10'
self.verify_same_multi_values(sql) self.verify_same_multi_values(sql)
@ -218,10 +248,10 @@ class TDTestCase:
for col_name, col_type in self.column_dict.items(): for col_name, col_type in self.column_dict.items():
for fun_name, out_type in self.scalar_funs.items(): for fun_name, out_type in self.scalar_funs.items():
if col_type == out_type : if col_type == out_type :
sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname}' sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname} limit 10'
tdLog.info(sql) tdLog.info(sql)
self.verify_same_value(sql) self.verify_same_value(sql)
sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} ) order by b,a desc' sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} ) order by b,a desc limit 10'
tdLog.info(sql) tdLog.info(sql)
self.verify_same_value(sql) self.verify_same_value(sql)
@ -229,12 +259,22 @@ class TDTestCase:
self.query_multi_args() self.query_multi_args()
# all type check null # all type check null
for col_name, col_type in self.column_dict.items(): for col_name, col_type in self.column_dict.items():
fun_name = f"sf_null_{col_name}" fun_name = f"sf_null_{col_name}"
sql = f'select {fun_name}({col_name}) from {self.stbname}' sql = f'select {fun_name}({col_name}) from {self.stbname}'
tdSql.query(sql) tdSql.query(sql)
tdSql.checkData(0, 0, "None") if col_type != "timestamp":
tdSql.checkData(0, 0, "None")
else:
val = tdSql.getData(0, 0)
if val is not None:
tdLog.exit(f" check {sql} not expect None.")
# concat
sql = f'select sf_concat_var(col12, t12), concat(col12, t12) from {self.stbname}'
self.verify_same_value(sql)
sql = f'select sf_concat_nch(col13, t13), concat(col13, t13) from {self.stbname}'
self.verify_same_value(sql)
# create aggregate # create aggregate
def create_aggr_udfpy(self): def create_aggr_udfpy(self):
@ -255,6 +295,17 @@ class TDTestCase:
self.create_udf_af(fun_name, file_name, f"float", 10*1024) self.create_udf_af(fun_name, file_name, f"float", 10*1024)
fun_name = "af_sum_int" fun_name = "af_sum_int"
self.create_udf_af(fun_name, file_name, f"int", 10*1024) self.create_udf_af(fun_name, file_name, f"int", 10*1024)
fun_name = "af_sum_bigint"
self.create_udf_af(fun_name, file_name, f"bigint", 10*1024)
# count
file_name = "af_count.py"
fun_name = "af_count_float"
self.create_udf_af(fun_name, file_name, f"float", 10*1024)
fun_name = "af_count_int"
self.create_udf_af(fun_name, file_name, f"int", 10*1024)
fun_name = "af_count_bigint"
self.create_udf_af(fun_name, file_name, f"bigint", 10*1024)
# query aggregate # query aggregate
@ -264,7 +315,12 @@ class TDTestCase:
fun_name = f"af_null_{col_name}" fun_name = f"af_null_{col_name}"
sql = f'select {fun_name}({col_name}) from {self.stbname}' sql = f'select {fun_name}({col_name}) from {self.stbname}'
tdSql.query(sql) tdSql.query(sql)
tdSql.checkData(0, 0, "None") if col_type != "timestamp":
tdSql.checkData(0, 0, "None")
else:
val = tdSql.getData(0, 0)
if val is not None:
tdLog.exit(f" check {sql} not expect None.")
# min # min
sql = f'select min(col3), af_min_int(col3) from {self.stbname}' sql = f'select min(col3), af_min_int(col3) from {self.stbname}'
@ -275,32 +331,55 @@ class TDTestCase:
self.verify_same_value(sql) self.verify_same_value(sql)
# sum # sum
sql = f'select sum(col3), af_sum_int(col3) from {self.stbname}' sql = f'select sum(col1), af_sum_int(col1) from d0'
self.verify_same_value(sql) self.verify_same_value(sql)
sql = f'select sum(col7), af_sum_int(col7) from {self.stbname}' sql = f'select sum(col3), af_sum_bigint(col3) from {self.stbname}'
self.verify_same_value(sql) self.verify_same_value(sql)
sql = f'select sum(col9), af_sum_float(col9) from {self.stbname}' sql = f'select sum(col9), af_sum_float(col9) from {self.stbname}'
self.verify_same_value(sql) self.verify_same_value(sql)
# count
sql = f'select count(col1), af_count_int(col1) from {self.stbname}'
self.verify_same_value(sql)
sql = f'select count(col7), af_count_bigint(col7) from {self.stbname}'
self.verify_same_value(sql)
sql = f'select count(col8), af_count_float(col8) from {self.stbname}'
self.verify_same_value(sql)
# nest
sql = f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {self.stbname})'
self.verify_same_value(sql)
# group by
sql = f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {self.stbname} group by tbname)'
self.verify_same_value(sql)
# two field expr
sql = f'select sum(col1+col2),af_sum_float(col1+col2) from {self.stbname};'
self.verify_same_value(sql)
# interval
sql = f'select af_sum_float(col2+col3),sum(col3+col2) from {self.stbname} interval(1s)'
self.verify_same_value(sql)
# insert to child table d1 data # insert to child table d1 data
def insert_data(self, tbname, rows): def insert_data(self, tbname, rows):
ts = 1670000000000 ts = 1670000000000
sqls = "" values = ""
batch_size = 300 batch_size = 300
child_name = ""
for i in range(self.child_count): for i in range(self.child_count):
for j in range(rows): for j in range(rows):
tj = j % 128 tj = j % 128
cols = f'{tj},{tj},{j},{j},{tj},{tj},{j},{j},{j}.000{j},{j}.000{j},true,"var{j}","nch{j}",now' cols = f'{tj},{tj},{j},{j},{tj},{tj},{j},{j},{j}.000{j},{j}.000{j},true,"var{j}","nch{j}涛思数据codepage is utf_32_le"'
sql = f'insert into {tbname}{i} values({ts+j},{cols});' value = f'({ts+j},{cols})'
sqls += sql if values == "":
if j % batch_size == 0: values = value
tdSql.execute(sqls) else:
values += f",{value}"
if j % batch_size == 0 or j + 1 == rows:
sql = f'insert into {tbname}{i} values {values};'
tdSql.execute(sql)
tdLog.info(f" child table={i} rows={j} insert data.") tdLog.info(f" child table={i} rows={j} insert data.")
sqls = "" values = ""
# end
if sqls != "":
tdSql.execute(sqls)
# partial columns update # partial columns update
sql = f'insert into {tbname}0(ts, col1, col9, col11) values(now, 100, 200, 0)' sql = f'insert into {tbname}0(ts, col1, col9, col11) values(now, 100, 200, 0)'
@ -319,8 +398,8 @@ class TDTestCase:
# var # var
stable = "meters" stable = "meters"
tbname = "d" tbname = "d"
count = 10000 count = 3
rows = 1000 rows = 1000000
# do # do
self.create_table(stable, tbname, count) self.create_table(stable, tbname, count)
self.insert_data(tbname, rows) self.insert_data(tbname, rows)
@ -333,6 +412,8 @@ class TDTestCase:
self.create_aggr_udfpy() self.create_aggr_udfpy()
self.query_aggr_udfpy() self.query_aggr_udfpy()
# show performance
def stop(self): def stop(self):
tdSql.close() tdSql.close()