Merge pull request #27143 from taosdata/docs/udf
doc: update python udf
This commit is contained in:
commit
ec5daedff2
|
@ -429,7 +429,9 @@ def process(block):
|
|||
这个文件包含 3 个函数, init 和 destroy 都是空函数,它们是 UDF 的生命周期函数,即使什么都不做也要定义。最关键的是 process 函数, 它接受一个数据块,这个数据块对象有两个方法。
|
||||
1. shape() 返回数据块的行数和列数
|
||||
2. data(i, j) 返回 i 行 j 列的数据
|
||||
标量函数的 process 方法传入的数据块有多少行,就需要返回多少行数据。上述代码忽略列数,因为只需对每行的第一个数做计算。
|
||||
|
||||
标量函数的 process 方法传入的数据块有多少行,就需要返回多少行数据。上述代码忽略列数,因为只需对每行的第一列做计算。
|
||||
|
||||
接下来创建对应的 UDF 函数,在 TDengine CLI 中执行下面语句。
|
||||
|
||||
```sql
|
||||
|
@ -442,7 +444,7 @@ taos> create function myfun as '/root/udf/myfun.py' outputtype double language '
|
|||
Create OK, 0 row(s) affected (0.005202s)
|
||||
```
|
||||
|
||||
看起来很顺利,接下来 show 一下系统中所有的自定义函数,确认创建成功。
|
||||
看起来很顺利,接下来查看系统中所有的自定义函数,确认创建成功。
|
||||
|
||||
```text
|
||||
taos> show functions;
|
||||
|
@ -452,7 +454,7 @@ taos> show functions;
|
|||
Query OK, 1 row(s) in set (0.005767s)
|
||||
```
|
||||
|
||||
接下来就来测试一下这个函数,测试之前先执行下面的 SQL 命令,制造些测试数据,在 TDengine CLI 中执行下述命令。
|
||||
生成测试数据,可以在 TDengine CLI 中执行下述命令。
|
||||
|
||||
```sql
|
||||
create database test;
|
||||
|
@ -470,8 +472,7 @@ taos> select myfun(v1, v2) from t;
|
|||
DB error: udf function execution failure (0.011088s)
|
||||
```
|
||||
|
||||
不幸的是执行失败了,什么原因呢?
|
||||
查看 udfd 进程的日志。
|
||||
不幸的是执行失败了,什么原因呢?查看 udfd 进程的日志。
|
||||
|
||||
```shell
|
||||
tail -10 /var/log/taos/udfd.log
|
||||
|
@ -514,8 +515,7 @@ taos> select myfun(v1, v2) from t;
|
|||
2.302585093 |
|
||||
```
|
||||
|
||||
2. 没有处理 null 值。我们期望如果输入有 null,则会抛异常终止执行。
|
||||
因此 process 函数改进如下。
|
||||
2. 没有处理 null 值。我们期望如果输入有 null,则会抛异常终止执行。因此 process 函数改进如下。
|
||||
|
||||
```python
|
||||
def process(block):
|
||||
|
@ -525,7 +525,7 @@ def process(block):
|
|||
return [ None if block.data(i, 0) is None else log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
|
||||
```
|
||||
|
||||
然后执行下面的语句更新已有的 UDF。
|
||||
执行如下语句更新已有的 UDF。
|
||||
|
||||
```sql
|
||||
create or replace function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
|
||||
|
@ -539,7 +539,7 @@ taos> select myfun(v1, v2) from t;
|
|||
DB error: udf function execution failure (0.014643s)
|
||||
```
|
||||
|
||||
但遗憾的是我们自定义的异常信息没有展示给用户,而是在插件的日志文件 /var/log/taos/taospyudf.log 中。
|
||||
自定义的异常信息打印在插件的日志文件 /var/log/taos/taospyudf.log 中。
|
||||
|
||||
```text
|
||||
2023-05-24 23:21:06.790 ERROR [1666188] [doPyUdfScalarProc@507] call pyUdfScalar proc function. context 0x7faade26d180. error: Exception: require 1 parameter but given 2
|
||||
|
@ -554,18 +554,17 @@ At:
|
|||
|
||||
#### 示例三
|
||||
|
||||
编写一个 UDF:输入(x1, x2, ..., xn), 输出每个值和它们的序号的乘积的和: 1 * x1 + 2 * x2 + ... + n * xn。如果 x1 至 xn 中包含 null,则结果为 null。
|
||||
这个示例与示例一的区别是,可以接受任意多列作为输入,且要处理每一列的值。编写 UDF 文件 /root/udf/nsum.py。
|
||||
输入(x1, x2, ..., xn), 输出每个值和它们的序号的乘积的和:1 * x1 + 2 * x2 + ... + n * xn。如果 x1 至 xn 中包含 null,则结果为 null。
|
||||
|
||||
本例与示例一的区别是,可以接受任意多列作为输入,且要处理每一列的值。编写 UDF 文件 /root/udf/nsum.py。
|
||||
|
||||
```python
|
||||
def init():
|
||||
pass
|
||||
|
||||
|
||||
def destroy():
|
||||
pass
|
||||
|
||||
|
||||
def process(block):
|
||||
rows, cols = block.shape()
|
||||
result = []
|
||||
|
@ -617,11 +616,9 @@ pip3 install moment
|
|||
```python
|
||||
import moment
|
||||
|
||||
|
||||
def init():
|
||||
pass
|
||||
|
||||
|
||||
def destroy():
|
||||
pass
|
||||
|
||||
|
@ -707,26 +704,21 @@ import pickle
|
|||
|
||||
LOG_FILE: io.TextIOBase = None
|
||||
|
||||
|
||||
def init():
|
||||
global LOG_FILE
|
||||
LOG_FILE = open("/var/log/taos/spread.log", "wt")
|
||||
log("init function myspead success")
|
||||
|
||||
|
||||
def log(o):
|
||||
LOG_FILE.write(str(o) + '\n')
|
||||
|
||||
|
||||
def destroy():
|
||||
log("close log file: spread.log")
|
||||
LOG_FILE.close()
|
||||
|
||||
|
||||
def start():
|
||||
return pickle.dumps((-math.inf, math.inf))
|
||||
|
||||
|
||||
def reduce(block, buf):
|
||||
max_number, min_number = pickle.loads(buf)
|
||||
log(f"initial max_number={max_number}, min_number={min_number}")
|
||||
|
@ -741,20 +733,20 @@ def reduce(block, buf):
|
|||
min_number = v
|
||||
return pickle.dumps((max_number, min_number))
|
||||
|
||||
|
||||
def finish(buf):
|
||||
max_number, min_number = pickle.loads(buf)
|
||||
return max_number - min_number
|
||||
```
|
||||
|
||||
在这个示例中我们不光定义了一个聚合函数,还添加记录执行日志的功能,讲解如下。
|
||||
1. init 函数不再是空函数,而是打开了一个文件用于写执行日志
|
||||
2. log 函数是记录日志的工具,自动将传入的对象转成字符串,加换行符输出
|
||||
3. destroy 函数用来在执行结束关闭文件
|
||||
4. start 返回了初始的 buffer,用来存聚合函数的中间结果,我们把最大值初始化为负无穷大,最小值初始化为正无穷大
|
||||
5. reduce 处理每个数据块并聚合结果
|
||||
6. finish 函数将最终的 buffer 转换成最终的输出
|
||||
执行下面的 SQL语句创建对应的 UDF。
|
||||
在这个示例中,我们不但定义了一个聚合函数,还增加了记录执行日志的功能。
|
||||
1. init 函数打开一个文件用于记录日志
|
||||
2. log 函数记录日志,自动将传入的对象转成字符串,加换行符输出
|
||||
3. destroy 函数在执行结束后关闭日志文件
|
||||
4. start 函数返回初始的 buffer,用来存聚合函数的中间结果,把最大值初始化为负无穷大,最小值初始化为正无穷大
|
||||
5. reduce 函数处理每个数据块并聚合结果
|
||||
6. finish 函数将 buffer 转换成最终的输出
|
||||
|
||||
执行下面 SQL 语句创建对应的 UDF。
|
||||
|
||||
```sql
|
||||
create or replace aggregate function myspread as '/root/udf/myspread.py' outputtype double bufsize 128 language 'Python';
|
||||
|
@ -785,7 +777,7 @@ taos> select spread(v1) from t;
|
|||
Query OK, 1 row(s) in set (0.005501s)
|
||||
```
|
||||
|
||||
最后,查看我们自己打印的执行日志,从日志可以看出,reduce 函数被执行了 3 次。执行过程中 max 值被更新了 4 次, min 值只被更新 1 次。
|
||||
最后,查看执行日志,可以看到 reduce 函数被执行了 3 次,执行过程中 max 值被更新了 4 次,min 值只被更新 1 次。
|
||||
|
||||
```shell
|
||||
root@slave11 /var/log/taos $ cat spread.log
|
||||
|
|
Loading…
Reference in New Issue