Merge pull request #20662 from taosdata/test/TD-22889-3.0

test: add CI case udfpy_main.py for Python UDFs

Commit 183d9bc4fb
@@ -120,6 +120,7 @@
 ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py
 ,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
 ,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py
+,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
@@ -17,6 +17,7 @@ import time
 import datetime
 import inspect
 import importlib
+import traceback
 from util.log import *
@@ -75,6 +76,7 @@ class TDCases:
             case.run()
         except Exception as e:
             tdLog.notice(repr(e))
+            traceback.print_exc()
             tdLog.exit("%s failed" % (fileName))
         case.stop()
         runNum += 1
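For context, the effect of this one-line change in isolation (a standalone sketch, nothing TDengine-specific): repr(e) logs only the exception message, while traceback.print_exc() also recovers the failing file and line for the CI log.

import traceback

def run_case(case):
    try:
        case()
    except Exception as e:
        print(repr(e))           # before: message only, no location
        traceback.print_exc()    # after: full stack trace of the failure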
@@ -0,0 +1,23 @@ (new file: 0-others/udfpy/af_count.py; filenames inferred from the create_udf_* calls in udfpy_main.py below)
import pickle

def init():
    pass

def destroy():
    pass

def start():
    return pickle.dumps(0)

def finish(buf):
    count = pickle.loads(buf)
    return count

def reduce(datablock, buf):
    (rows, cols) = datablock.shape()
    count = pickle.loads(buf)
    for i in range(rows):
        val = datablock.data(i, 0)
        if val is not None:
            count += 1
    return pickle.dumps(count)
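The five functions above form the Python aggregate UDF protocol this PR exercises: start() seeds a pickled state buffer, reduce() folds each incoming data block into it, and finish() unpickles the final result. A minimal local harness, assuming the engine calls them in that order; MockBlock is hypothetical and only mimics the shape()/data() interface used by reduce():

import pickle

class MockBlock:
    def __init__(self, column):
        self.column = column            # one column of values
    def shape(self):
        return (len(self.column), 1)    # (rows, cols)
    def data(self, i, j):
        return self.column[i]

# run in the same module as the functions above
buf = start()                               # pickled initial count of 0
buf = reduce(MockBlock([1, None, 3]), buf)  # counts the two non-null rows
buf = reduce(MockBlock([7]), buf)           # a second block adds one more
print(finish(buf))                          # prints 3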
@@ -0,0 +1,30 @@ (new file: 0-others/udfpy/af_min.py)
import pickle

def init():
    pass

def destroy():
    pass

def start():
    return pickle.dumps([])

def finish(buf):
    mins = pickle.loads(buf)
    min_val = None
    for min in mins:
        if min_val is None or (min is not None and min < min_val):
            min_val = min
    return min_val

def reduce(datablock, buf):
    (rows, cols) = datablock.shape()
    mins = pickle.loads(buf)
    min = None
    for i in range(rows):
        val = datablock.data(i, 0)
        if min is None or (val is not None and val < min):
            min = val
    if min is not None:
        mins.append(min)
    return pickle.dumps(mins)
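For reference, udfpy_main.py (later in this diff) registers this file twice under different output types; the statement built by its create_udf_af() looks like this sketch, where the path is a placeholder and 204800 is the test's bufsize of 200 * 1024:

# path is illustrative; udfpy_main.py derives it from its own location
sql = ('create aggregate function af_min_int as '
       '"/path/to/udfpy/af_min.py" outputtype int '
       'bufsize 204800 language "Python"')
# the test then checks it against the builtin:
check = 'select min(col3), af_min_int(col3) from meters'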
@@ -0,0 +1,19 @@ (new file: 0-others/udfpy/af_null.py)
import pickle

def init():
    pass

def destroy():
    pass

def start():
    return pickle.dumps([])

def finish(buf):
    return None

def reduce(datablock, buf):
    (rows, cols) = datablock.shape()
    mins = pickle.loads(buf)
    mins.append(None)
    return pickle.dumps(mins)
@@ -0,0 +1,26 @@ (new file: 0-others/udfpy/af_sum.py)
import pickle

def init():
    pass

def destroy():
    pass

def start():
    return pickle.dumps(None)

def finish(buf):
    sum = pickle.loads(buf)
    return sum

def reduce(datablock, buf):
    (rows, cols) = datablock.shape()
    sum = pickle.loads(buf)
    for i in range(rows):
        val = datablock.data(i, 0)
        if val is not None:
            if sum is None:
                sum = val
            else:
                sum += val
    return pickle.dumps(sum)
@@ -0,0 +1,27 @@ (new file: 0-others/udfpy/sf_concat_nch.py)
# init
def init():
    pass

# destroy
def destroy():
    pass

def process(block):
    (nrows, ncols) = block.shape()
    results = []
    for i in range(nrows):
        row = []
        for j in range(ncols):
            val = block.data(i, j)
            if val is None:
                row = None
                break
            row.append(val.decode('utf_32_le'))
        if row is None:
            results.append(None)
        else:
            row_str = ''.join(row)
            results.append(row_str.encode('utf_32_le'))
    return results
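This nchar variant decodes and re-encodes with utf_32_le; judging by this test (the data inserted later even embeds the phrase "codepage is utf_32_le"), nchar payloads reach Python UDFs as UTF-32 LE bytes. A standalone round-trip sketch:

raw = "nch0涛思数据".encode("utf_32_le")       # what block.data(i, j) yields for nchar
assert raw.decode("utf_32_le") == "nch0涛思数据"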
@@ -0,0 +1,26 @@ (new file: 0-others/udfpy/sf_concat_var.py)
# init
def init():
    pass

# destroy
def destroy():
    pass

def process(block):
    (nrows, ncols) = block.shape()
    results = []
    for i in range(nrows):
        row = []
        for j in range(ncols):
            val = block.data(i, j)
            if val is None:
                row = None
                break
            row.append(val.decode('utf-8'))
        if row is None:
            results.append(None)
        else:
            results.append(''.join(row))
    return results
@@ -0,0 +1,23 @@ (new file: 0-others/udfpy/sf_multi_args.py)
# init
def init():
    pass

# destroy
def destroy():
    pass

# return origin column one value
def process(block):
    (nrows, ncols) = block.shape()
    results = []
    for i in range(nrows):
        rows = []
        for j in range(ncols):
            val = block.data(i, j)
            if type(val) is bytes:
                rows.append(val.decode('utf-8'))
            else:
                rows.append(repr(val))
        results.append(','.join(rows))
    return results
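sf_multi_args echoes every argument column back as one comma-joined string, which is exactly what verify_same_multi_values() in the test below splits and compares column by column. A hypothetical two-column block traces the loops above:

class MockBlock:
    def __init__(self, rows):
        self.rows = rows
    def shape(self):
        return (len(self.rows), len(self.rows[0]))  # (rows, cols)
    def data(self, i, j):
        return self.rows[i][j]

# run in the same module as process() above
print(process(MockBlock([[1, b"abc"], [None, b"xy"]])))
# -> ['1,abc', 'None,xy']  (bytes are utf-8 decoded, everything else repr()ed)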
@@ -0,0 +1,16 @@ (new file: 0-others/udfpy/sf_null.py)

# init
def init():
    pass

# destroy
def destroy():
    pass

# return origin column one value
def process(block):
    (rows, cols) = block.shape()
    results = []
    for i in range(rows):
        results.append(None)
    return results
@@ -0,0 +1,15 @@ (new file: 0-others/udfpy/sf_origin.py)
# init
def init():
    pass

# destroy
def destroy():
    pass

# return origin column one value
def process(block):
    (rows, cols) = block.shape()
    results = []
    for i in range(rows):
        results.append(block.data(i,0))
    return results
@@ -0,0 +1,464 @@ (new file: 0-others/udfpy_main.py)
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-


from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *

import random
import os
import subprocess


class PerfDB:
    def __init__(self):
        self.sqls = []
        self.spends = []

    # execute
    def execute(self, sql):
        print(f" perfdb execute {sql}")
        stime = time.time()
        ret = tdSql.execute(sql, 1)
        spend = time.time() - stime

        self.sqls.append(sql)
        self.spends.append(spend)
        return ret

    # query
    def query(self, sql):
        print(f" perfdb query {sql}")
        start = time.time()
        ret = tdSql.query(sql, None, 1)
        spend = time.time() - start
        self.sqls.append(sql)
        self.spends.append(spend)
        return ret


class TDTestCase:
    def init(self, conn, logSql, replicaVar=1):
        self.replicaVar = int(replicaVar)
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        self.setsql = TDSetSql()

        # udf path
        self.udf_path = os.path.dirname(os.path.realpath(__file__)) + "/udfpy"

        self.column_dict = {
            'ts': 'timestamp',
            'col1': 'tinyint',
            'col2': 'smallint',
            'col3': 'int',
            'col4': 'bigint',
            'col5': 'tinyint unsigned',
            'col6': 'smallint unsigned',
            'col7': 'int unsigned',
            'col8': 'bigint unsigned',
            'col9': 'float',
            'col10': 'double',
            'col11': 'bool',
            'col12': 'varchar(120)',
            'col13': 'nchar(100)',
        }
        self.tag_dict = {
            't1': 'tinyint',
            't2': 'smallint',
            't3': 'int',
            't4': 'bigint',
            't5': 'tinyint unsigned',
            't6': 'smallint unsigned',
            't7': 'int unsigned',
            't8': 'bigint unsigned',
            't9': 'float',
            't10': 'double',
            't11': 'bool',
            't12': 'varchar(120)',
            't13': 'nchar(100)',
        }

    def set_stb_sql(self,stbname,column_dict,tag_dict):
        column_sql = ''
        tag_sql = ''
        for k,v in column_dict.items():
            column_sql += f"{k} {v}, "
        for k,v in tag_dict.items():
            tag_sql += f"{k} {v}, "
        create_stb_sql = f'create stable {stbname} ({column_sql[:-2]}) tags ({tag_sql[:-2]})'
        return create_stb_sql

    # create stable and child tables
    def create_table(self, stbname, tbname, count):
        tdSql.execute("create database db wal_retention_period 4")
        tdSql.execute('use db')
        self.child_count = count
        self.stbname = stbname
        self.tbname = tbname

        # create stable
        create_table_sql = self.set_stb_sql(stbname, self.column_dict, self.tag_dict)
        tdSql.execute(create_table_sql)

        batch_size = 1000
        # create child table
        for i in range(count):
            ti = i % 128
            tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}"'
            sql = f'create table {tbname}{i} using {stbname} tags({tags});'
            tdSql.execute(sql)
            if i % batch_size == 0:
                tdLog.info(f" create child table {i} ...")

        tdLog.info(f" create {count} child tables ok.")

    # create with dicts
    def create_sf_dicts(self, dicts, filename):
        for fun_name, out_type in dicts.items():
            sql = f' create function {fun_name} as "{self.udf_path}/{filename}" outputtype {out_type} language "Python" '
            tdSql.execute(sql)
            tdLog.info(sql)

    # create_udfpy_function
    def create_scalar_udfpy(self):
        # scalar funciton
        self.scalar_funs = {
            'sf0': 'timestamp',
            'sf1': 'tinyint',
            'sf2': 'smallint',
            'sf3': 'int',
            'sf4': 'bigint',
            'sf5': 'tinyint unsigned',
            'sf6': 'smallint unsigned',
            'sf7': 'int unsigned',
            'sf8': 'bigint unsigned',
            'sf9': 'float',
            'sf10': 'double',
            'sf11': 'bool',
            'sf12': 'varchar(120)',
            'sf13': 'nchar(100)'
        }
        # agg function
        self.agg_funs = {
            'af1': 'tinyint',
            'af2': 'smallint',
            'af3': 'int',
            'af4': 'bigint',
            'af5': 'tinyint unsigned',
            'af6': 'smallint unsigned',
            'af7': 'int unsigned',
            'af8': 'bigint unsigned',
            'af9': 'float',
            'af10': 'double',
            'af11': 'bool',
            'af12': 'varchar(120)',
            'af13': 'nchar(100)',
            'af14': 'timestamp'
        }

        # multi_args
        self.create_sf_dicts(self.scalar_funs, "sf_origin.py")
        fun_name = "sf_multi_args"
        self.create_udf_sf(fun_name, f'{fun_name}.py', "binary(1024)")

        # all type check null
        for col_name, col_type in self.column_dict.items():
            self.create_udf_sf(f"sf_null_{col_name}", "sf_null.py", col_type)

        # concat
        fun_name = "sf_concat_var"
        self.create_udf_sf(fun_name, f'{fun_name}.py', "varchar(1024)")
        fun_name = "sf_concat_nch"
        self.create_udf_sf(fun_name, f'{fun_name}.py', "nchar(1024)")

    # fun_name == fun_name.py
    def create_udf_sf(self, fun_name, file_name, out_type):
        sql = f'create function {fun_name} as "{self.udf_path}/{file_name}" outputtype {out_type} language "Python" '
        tdSql.execute(sql)
        tdLog.info(sql)

    def create_udf_af(self, fun_name, file_name, out_type, bufsize):
        sql = f'create aggregate function {fun_name} as "{self.udf_path}/{file_name}" outputtype {out_type} bufsize {bufsize} language "Python" '
        tdSql.execute(sql)
        tdLog.info(sql)

    # sql1 query result eual with sql2
    def verify_same_result(self, sql1, sql2):
        # query
        result1 = tdSql.getResult(sql1)
        tdSql.query(sql2)

        for i, row in enumerate(result1):
            for j , val in enumerate(row):
                tdSql.checkData(i, j, result1[i][j])

    # same value like select col1, udf_fun1(col1) from st
    def verify_same_value(self, sql, col=0):
        tdSql.query(sql)
        nrows = tdSql.getRows()
        for i in range(nrows):
            val = tdSql.getData(i, col)
            tdSql.checkData(i, col + 1, val)

    # verify multi values
    def verify_same_multi_values(self, sql):
        tdSql.query(sql)
        nrows = tdSql.getRows()
        for i in range(nrows):
            udf_val = tdSql.getData(i, 0)
            vals = udf_val.split(',')
            for j,val in enumerate(vals, 1):
                tdSql.checkData(i, j, val)

    # query multi-args
    def query_multi_args(self):
        cols = list(self.column_dict.keys()) + list(self.tag_dict.keys())
        cols.remove("col13")
        cols.remove("t13")
        cols.remove("ts")
        ncols = len(cols)
        print(cols)
        for i in range(2, ncols):
            sample = random.sample(cols, i)
            print(sample)
            cols_name = ','.join(sample)
            sql = f'select sf_multi_args({cols_name}),{cols_name} from {self.stbname} limit 10'
            self.verify_same_multi_values(sql)
            tdLog.info(sql)

    # query_udfpy
    def query_scalar_udfpy(self):
        # col
        for col_name, col_type in self.column_dict.items():
            for fun_name, out_type in self.scalar_funs.items():
                if col_type == out_type :
                    sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname} limit 10'
                    tdLog.info(sql)
                    self.verify_same_value(sql)
                    sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} limit 100) order by b,a desc'
                    tdLog.info(sql)
                    self.verify_same_value(sql)

        # multi-args
        self.query_multi_args()

        # all type check null
        for col_name, col_type in self.column_dict.items():
            fun_name = f"sf_null_{col_name}"
            sql = f'select {fun_name}({col_name}) from {self.stbname}'
            tdSql.query(sql)
            if col_type != "timestamp":
                tdSql.checkData(0, 0, "None")
            else:
                val = tdSql.getData(0, 0)
                if val is not None:
                    tdLog.exit(f" check {sql} not expect None.")

        # concat
        sql = f'select sf_concat_var(col12, t12), concat(col12, t12) from {self.stbname} limit 1000'
        self.verify_same_value(sql)
        sql = f'select sf_concat_nch(col13, t13), concat(col13, t13) from {self.stbname} limit 1000'
        self.verify_same_value(sql)

    # create aggregate
    def create_aggr_udfpy(self):
        bufsize = 200 * 1024
        # all type check null
        for col_name, col_type in self.column_dict.items():
            self.create_udf_af(f"af_null_{col_name}", "af_null.py", col_type, bufsize)

        # min
        file_name = "af_min.py"
        fun_name = "af_min_float"
        self.create_udf_af(fun_name, file_name, f"float", bufsize)
        fun_name = "af_min_int"
        self.create_udf_af(fun_name, file_name, f"int", bufsize)

        # sum
        file_name = "af_sum.py"
        fun_name = "af_sum_float"
        self.create_udf_af(fun_name, file_name, f"float", bufsize)
        fun_name = "af_sum_int"
        self.create_udf_af(fun_name, file_name, f"int", bufsize)
        fun_name = "af_sum_bigint"
        self.create_udf_af(fun_name, file_name, f"bigint", bufsize)

        # count
        file_name = "af_count.py"
        fun_name = "af_count_float"
        self.create_udf_af(fun_name, file_name, f"float", bufsize)
        fun_name = "af_count_int"
        self.create_udf_af(fun_name, file_name, f"int", bufsize)
        fun_name = "af_count_bigint"
        self.create_udf_af(fun_name, file_name, f"bigint", bufsize)

    # query aggregate
    def query_aggr_udfpy(self) :
        # all type check null
        for col_name, col_type in self.column_dict.items():
            fun_name = f"af_null_{col_name}"
            sql = f'select {fun_name}({col_name}) from {self.stbname}'
            tdSql.query(sql)
            if col_type != "timestamp":
                tdSql.checkData(0, 0, "None")
            else:
                val = tdSql.getData(0, 0)
                if val is not None:
                    tdLog.exit(f" check {sql} not expect None.")

        # min
        sql = f'select min(col3), af_min_int(col3) from {self.stbname}'
        self.verify_same_value(sql)
        sql = f'select min(col7), af_min_int(col7) from {self.stbname}'
        self.verify_same_value(sql)
        sql = f'select min(col9), af_min_float(col9) from {self.stbname}'
        self.verify_same_value(sql)

        # sum
        sql = f'select sum(col1), af_sum_int(col1) from d0'
        self.verify_same_value(sql)
        sql = f'select sum(col3), af_sum_bigint(col3) from {self.stbname}'
        self.verify_same_value(sql)
        sql = f'select sum(col9), af_sum_float(col9) from {self.stbname}'
        self.verify_same_value(sql)

        # count
        sql = f'select count(col1), af_count_int(col1) from {self.stbname}'
        self.verify_same_value(sql)
        sql = f'select count(col7), af_count_bigint(col7) from {self.stbname}'
        self.verify_same_value(sql)
        sql = f'select count(col8), af_count_float(col8) from {self.stbname}'
        self.verify_same_value(sql)

        # nest
        sql = f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {self.stbname})'
        self.verify_same_value(sql)
        # group by
        sql = f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {self.stbname} group by tbname)'
        self.verify_same_value(sql)
        # two filed expr
        sql = f'select sum(col1+col2),af_sum_float(col1+col2) from {self.stbname};'
        self.verify_same_value(sql)
        # interval
        sql = f'select af_sum_float(col2+col3),sum(col3+col2) from {self.stbname} interval(1s)'
        self.verify_same_value(sql)

    # insert to child table d1 data
    def insert_data(self, tbname, rows):
        ts = 1670000000000
        values = ""
        batch_size = 500
        child_name = ""
        for i in range(self.child_count):
            for j in range(rows):
                tj = j % 128
                cols = f'{tj},{tj},{j},{j},{tj},{tj},{j},{j},{j}.000{j},{j}.000{j},true,"var{j}","nch{j}涛思数据codepage is utf_32_le"'
                value = f'({ts+j},{cols})'
                if values == "":
                    values = value
                else:
                    values += f",{value}"
                if j % batch_size == 0 or j + 1 == rows:
                    sql = f'insert into {tbname}{i} values {values};'
                    tdSql.execute(sql)
                    tdLog.info(f" child table={i} rows={j} insert data.")
                    values = ""

        # partial columns upate
        sql = f'insert into {tbname}0(ts, col1, col9, col11) values(now, 100, 200, 0)'
        tdSql.execute(sql)
        sql = f'insert into {tbname}0(ts, col2, col5, col8) values(now, 100, 200, 300)'
        tdSql.execute(sql)
        sql = f'insert into {tbname}0(ts, col3, col7, col13) values(now, null, null, null)'
        tdSql.execute(sql)
        sql = f'insert into {tbname}0(ts) values(now)'
        tdSql.execute(sql)
        tdLog.info(f" insert {rows} to child table {self.child_count} .")

    # create stream
    def create_stream(self):
        sql = f"create stream ma into sta subtable(concat('sta_',tbname)) \
              as select _wstart,count(col1),af_count_bigint(col1) from {self.stbname} partition by tbname interval(1s);"
        tdSql.execute(sql)
        tdLog.info(sql)

    # query stream
    def verify_stream(self):
        sql = f"select * from sta limit 10"
        self.verify_same_value(sql, 1)

    # create tmq
    def create_tmq(self):
        sql = f"create topic topa as select concat(col12,t12),sf_concat_var(col12,t12) from {self.stbname};"
        tdSql.execute(sql)
        tdLog.info(sql)

    def install_taospy(self):
        tdLog.info("install taospyudf...")
        packs = ["taospyudf"]
        for pack in packs:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-i', 'https://pypi.org/simple', '-U', pack])
        tdLog.info("call ldconfig...")
        os.system("ldconfig")
        tdLog.info("install taospyudf successfully.")

    # run
    def run(self):
        self.install_taospy()

        # var
        stable = "meters"
        tbname = "d"
        count = 10
        rows = 5000
        # do
        self.create_table(stable, tbname, count)

        # create
        self.create_scalar_udfpy()
        self.create_aggr_udfpy()

        # create stream
        self.create_stream()

        # create tmq
        self.create_tmq()

        # insert data
        self.insert_data(tbname, rows)

        # query
        self.query_scalar_udfpy()
        self.query_aggr_udfpy()

        # show performance


    def stop(self):
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)


tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
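For local reproduction, the CI case list at the top of this diff suggests the case runs without ./pytest.sh, presumably from the system-test suite directory: python3 ./test.py -f 0-others/udfpy_main.py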
@@ -146,7 +146,7 @@ class TDTestCase:
         for i in range(expectRows):
             totalConsumeRows += resultList[i]
 
-        tdSql.query(queryString)
+        tdSql.query(queryString, None, 50)
         totalRowsFromQury = tdSql.getRows()
 
         tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
@@ -236,7 +236,7 @@ class TDTestCase:
         for i in range(expectRows):
             totalConsumeRows += resultList[i]
 
-        tdSql.query(queryString)
+        tdSql.query(queryString, None, 50)
         totalRowsFromQuery = tdSql.getRows()
 
         tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery))
@@ -84,15 +84,15 @@ SWords shellCommands[] = {
     {"create table <anyword> using <stb_name> tags(", 0, 0, NULL},
     {"create database <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> "
      "<anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> "
-     "<db_options> <anyword> <db_options> <anyword> ;",
-     0, 0, NULL},
+     "<db_options> <anyword> <db_options> <anyword> ;", 0, 0, NULL},
     {"create dnode <anyword>", 0, 0, NULL},
     {"create index <anyword> on <stb_name> ()", 0, 0, NULL},
     {"create mnode on dnode <dnode_id> ;", 0, 0, NULL},
     {"create qnode on dnode <dnode_id> ;", 0, 0, NULL},
     {"create stream <anyword> into <anyword> as select", 0, 0, NULL},  // 26 append sub sql
     {"create topic <anyword> as select", 0, 0, NULL},  // 27 append sub sql
-    {"create function ", 0, 0, NULL},
+    {"create function <anyword> as <anyword> outputtype <data_types> language <udf_language>", 0, 0, NULL},
+    {"create aggregate function <anyword> as <anyword> outputtype <data_types> bufsize <anyword> language <udf_language>", 0, 0, NULL},
     {"create user <anyword> pass <anyword> sysinfo 0;", 0, 0, NULL},
     {"create user <anyword> pass <anyword> sysinfo 1;", 0, 0, NULL},
     {"describe <all_table>", 0, 0, NULL},
@@ -105,7 +105,7 @@ SWords shellCommands[] = {
     {"drop qnode on dnode <dnode_id> ;", 0, 0, NULL},
     {"drop user <user_name> ;", 0, 0, NULL},
     // 40
-    {"drop function", 0, 0, NULL},
+    {"drop function <udf_name> ;", 0, 0, NULL},
     {"drop consumer group <anyword> on ", 0, 0, NULL},
     {"drop topic <topic_name> ;", 0, 0, NULL},
     {"drop stream <stream_name> ;", 0, 0, NULL},
@@ -272,6 +272,8 @@ char* key_systable[] = {
     "ins_subscriptions", "ins_streams", "ins_stream_tasks", "ins_vnodes", "ins_user_privileges", "perf_connections",
     "perf_queries", "perf_consumers", "perf_trans", "perf_apps"};
 
+char* udf_language[] = {"\'Python\'", "\'C\'"};
+
 //
 // ------- global variant define ---------
 //
@@ -291,25 +293,29 @@ bool waitAutoFill = false;
 #define WT_VAR_USERNAME 4
 #define WT_VAR_TOPIC 5
 #define WT_VAR_STREAM 6
-#define WT_VAR_ALLTABLE 7
-#define WT_VAR_FUNC 8
-#define WT_VAR_KEYWORD 9
-#define WT_VAR_TBACTION 10
-#define WT_VAR_DBOPTION 11
-#define WT_VAR_ALTER_DBOPTION 12
-#define WT_VAR_DATATYPE 13
-#define WT_VAR_KEYTAGS 14
-#define WT_VAR_ANYWORD 15
-#define WT_VAR_TBOPTION 16
-#define WT_VAR_USERACTION 17
-#define WT_VAR_KEYSELECT 18
-#define WT_VAR_SYSTABLE 19
-
-#define WT_VAR_CNT 20
-
-#define WT_FROM_DB_MAX 6  // max get content from db
+#define WT_VAR_UDFNAME 7
+
+#define WT_FROM_DB_MAX 7  // max get content from db
 #define WT_FROM_DB_CNT (WT_FROM_DB_MAX + 1)
+
+#define WT_VAR_ALLTABLE 8
+#define WT_VAR_FUNC 9
+#define WT_VAR_KEYWORD 10
+#define WT_VAR_TBACTION 11
+#define WT_VAR_DBOPTION 12
+#define WT_VAR_ALTER_DBOPTION 13
+#define WT_VAR_DATATYPE 14
+#define WT_VAR_KEYTAGS 15
+#define WT_VAR_ANYWORD 16
+#define WT_VAR_TBOPTION 17
+#define WT_VAR_USERACTION 18
+#define WT_VAR_KEYSELECT 19
+#define WT_VAR_SYSTABLE 20
+#define WT_VAR_LANGUAGE 21
+
+#define WT_VAR_CNT 22
 
 #define WT_TEXT 0xFF
 
 char dbName[256] = "";  // save use database name;
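The renumbering above is not cosmetic: indices 0 through WT_FROM_DB_MAX appear to be reserved for vars whose candidates are fetched live from the server, one per entry in varSqls below. Slotting WT_VAR_UDFNAME (filled from "show functions;") in at 7 raises WT_FROM_DB_MAX from 6 to 7, shifts every statically generated var up by one, and appends the new WT_VAR_LANGUAGE at 21, growing WT_VAR_CNT to 22.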
@@ -319,13 +325,13 @@ TdThreadMutex tiresMutex;
 // save thread handle obtain var name from db server
 TdThread* threads[WT_FROM_DB_CNT];
 // obtain var name with sql from server
-char varTypes[WT_VAR_CNT][64] = {"<db_name>", "<stb_name>", "<tb_name>", "<dnode_id>", "<user_name>",
-                                 "<topic_name>", "<stream_name>", "<all_table>", "<function>", "<keyword>",
-                                 "<tb_actions>", "<db_options>", "<alter_db_options>", "<data_types>", "<key_tags>",
-                                 "<anyword>", "<tb_options>", "<user_actions>", "<key_select>"};
+char varTypes[WT_VAR_CNT][64] = {
+    "<db_name>", "<stb_name>", "<tb_name>", "<dnode_id >", "<user_name>", "<topic_name>", "<stream_name>",
+    "<udf_name>", "<all_table>", "<function>", "<keyword>", "<tb_actions>", "<db_options>", "<alter_db_options>",
+    "<data_types>", "<key_tags>", "<anyword>", "<tb_options>", "<user_actions>", "<key_select>", "<sys_table>", "<udf_language>"};
 
 char varSqls[WT_FROM_DB_CNT][64] = {"show databases;", "show stables;", "show tables;", "show dnodes;",
-                                    "show users;", "show topics;", "show streams;"};
+                                    "show users;", "show topics;", "show streams;", "show functions;"};
 
 // var words current cursor, if user press any one key except tab, cursorVar can be reset to -1
 int cursorVar = -1;
@@ -390,7 +396,8 @@ void showHelp() {
     create qnode on dnode <dnode_id> ;\n\
     create stream <stream_name> into <stb_name> as select ...\n\
     create topic <topic_name> as select ...\n\
-    create function ...\n\
+    create function <udf_name> as <file_name> outputtype <data_types> language \'C\' | \'Python\' ;\n\
+    create aggregate function <udf_name> as <file_name> outputtype <data_types> bufsize <bufsize_bytes> language \'C\' | \'Python\';\n\
     create user <user_name> pass <password> ...\n\
     ----- D ----- \n\
     describe <all_table>\n\
@@ -401,7 +408,7 @@ void showHelp() {
     drop mnode on dnode <dnode_id> ;\n\
     drop qnode on dnode <dnode_id> ;\n\
     drop user <user_name> ;\n\
-    drop function <function_name>;\n\
+    drop function <udf_name>;\n\
     drop consumer group ... \n\
     drop topic <topic_name> ;\n\
     drop stream <stream_name> ;\n\
@@ -643,6 +650,7 @@ bool shellAutoInit() {
   GenerateVarType(WT_VAR_USERACTION, user_actions, sizeof(user_actions) / sizeof(char*));
   GenerateVarType(WT_VAR_KEYSELECT, key_select, sizeof(key_select) / sizeof(char*));
   GenerateVarType(WT_VAR_SYSTABLE, key_systable, sizeof(key_systable) / sizeof(char*));
+  GenerateVarType(WT_VAR_LANGUAGE, udf_language, sizeof(udf_language) / sizeof(char*));
 
   return true;
 }