commit
83862319a4
|
@ -410,7 +410,7 @@ def finish(buf: bytes) -> output_type:
|
||||||
#### 示例一
|
#### 示例一
|
||||||
|
|
||||||
编写一个只接收一个整数的 UDF 函数: 输入 n, 输出 ln(n^2 + 1)。
|
编写一个只接收一个整数的 UDF 函数: 输入 n, 输出 ln(n^2 + 1)。
|
||||||
首先编写一个 Python 文件,存在系统某个目录,比如 /root/udf/myfun.py 内容如下
|
首先编写一个 Python 文件,存在系统某个目录,比如 /root/udf/myfun.py 内容如下。
|
||||||
|
|
||||||
```python
|
```python
|
||||||
from math import log
|
from math import log
|
||||||
|
@ -426,23 +426,23 @@ def process(block):
|
||||||
return [log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
|
return [log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
|
||||||
```
|
```
|
||||||
|
|
||||||
这个文件包含 3 个函数, init 和 destroy 都是空函数,它们是 UDF 的生命周期函数,即使什么都不做也要定义。最关键的是 process 函数, 它接受一个数据块,这个数据块对象有两个方法:
|
这个文件包含 3 个函数, init 和 destroy 都是空函数,它们是 UDF 的生命周期函数,即使什么都不做也要定义。最关键的是 process 函数, 它接受一个数据块,这个数据块对象有两个方法。
|
||||||
1. shape() 返回数据块的行数和列数
|
1. shape() 返回数据块的行数和列数
|
||||||
2. data(i, j) 返回 i 行 j 列的数据
|
2. data(i, j) 返回 i 行 j 列的数据
|
||||||
标量函数的 process 方法传人的数据块有多少行,就需要返回多少个数据。上述代码中我们忽略的列数,因为我们只想对每行的第一个数做计算。
|
标量函数的 process 方法传入的数据块有多少行,就需要返回多少行数据。上述代码忽略列数,因为只需对每行的第一个数做计算。
|
||||||
接下来我们创建对应的 UDF 函数,在 TDengine CLI 中执行下面语句:
|
接下来创建对应的 UDF 函数,在 TDengine CLI 中执行下面语句。
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
create function myfun as '/root/udf/myfun.py' outputtype double language 'Python'
|
create function myfun as '/root/udf/myfun.py' outputtype double language 'Python'
|
||||||
```
|
```
|
||||||
其输出如下
|
其输出如下。
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
taos> create function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
|
taos> create function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
|
||||||
Create OK, 0 row(s) affected (0.005202s)
|
Create OK, 0 row(s) affected (0.005202s)
|
||||||
```
|
```
|
||||||
|
|
||||||
看起来很顺利,接下来 show 一下系统中所有的自定义函数,确认创建成功:
|
看起来很顺利,接下来 show 一下系统中所有的自定义函数,确认创建成功。
|
||||||
|
|
||||||
```text
|
```text
|
||||||
taos> show functions;
|
taos> show functions;
|
||||||
|
@ -452,7 +452,7 @@ taos> show functions;
|
||||||
Query OK, 1 row(s) in set (0.005767s)
|
Query OK, 1 row(s) in set (0.005767s)
|
||||||
```
|
```
|
||||||
|
|
||||||
接下来就来测试一下这个函数,测试之前先执行下面的 SQL 命令,制造些测试数据,在 TDengine CLI 中执行下述命令
|
接下来就来测试一下这个函数,测试之前先执行下面的 SQL 命令,制造些测试数据,在 TDengine CLI 中执行下述命令。
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
create database test;
|
create database test;
|
||||||
|
@ -462,7 +462,7 @@ insert into t values('2023-05-03 08:09:10', 2, 3, 4);
|
||||||
insert into t values('2023-05-10 07:06:05', 3, 4, 5);
|
insert into t values('2023-05-10 07:06:05', 3, 4, 5);
|
||||||
```
|
```
|
||||||
|
|
||||||
测试 myfun 函数:
|
测试 myfun 函数。
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
taos> select myfun(v1, v2) from t;
|
taos> select myfun(v1, v2) from t;
|
||||||
|
@ -471,13 +471,13 @@ DB error: udf function execution failure (0.011088s)
|
||||||
```
|
```
|
||||||
|
|
||||||
不幸的是执行失败了,什么原因呢?
|
不幸的是执行失败了,什么原因呢?
|
||||||
查看 udfd 进程的日志
|
查看 udfd 进程的日志。
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
tail -10 /var/log/taos/udfd.log
|
tail -10 /var/log/taos/udfd.log
|
||||||
```
|
```
|
||||||
|
|
||||||
发现以下错误信息:
|
发现以下错误信息。
|
||||||
|
|
||||||
```text
|
```text
|
||||||
05/24 22:46:28.733545 01665799 UDF ERROR can not load library libtaospyudf.so. error: operation not permitted
|
05/24 22:46:28.733545 01665799 UDF ERROR can not load library libtaospyudf.so. error: operation not permitted
|
||||||
|
@ -486,7 +486,7 @@ tail -10 /var/log/taos/udfd.log
|
||||||
|
|
||||||
错误很明确:没有加载到 Python 插件 libtaospyudf.so,如果遇到此错误,请参考前面的准备环境一节。
|
错误很明确:没有加载到 Python 插件 libtaospyudf.so,如果遇到此错误,请参考前面的准备环境一节。
|
||||||
|
|
||||||
修复环境错误后再次执行,如下:
|
修复环境错误后再次执行,如下。
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
taos> select myfun(v1) from t;
|
taos> select myfun(v1) from t;
|
||||||
|
@ -501,7 +501,7 @@ taos> select myfun(v1) from t;
|
||||||
|
|
||||||
#### 示例二
|
#### 示例二
|
||||||
|
|
||||||
上面的 myfun 虽然测试测试通过了,但是有两个缺点:
|
上面的 myfun 虽然测试测试通过了,但是有两个缺点。
|
||||||
|
|
||||||
1. 这个标量函数只接受 1 列数据作为输入,如果用户传入了多列也不会抛异常。
|
1. 这个标量函数只接受 1 列数据作为输入,如果用户传入了多列也不会抛异常。
|
||||||
|
|
||||||
|
@ -515,7 +515,7 @@ taos> select myfun(v1, v2) from t;
|
||||||
```
|
```
|
||||||
|
|
||||||
2. 没有处理 null 值。我们期望如果输入有 null,则会抛异常终止执行。
|
2. 没有处理 null 值。我们期望如果输入有 null,则会抛异常终止执行。
|
||||||
因此 process 函数改进如下:
|
因此 process 函数改进如下。
|
||||||
|
|
||||||
```python
|
```python
|
||||||
def process(block):
|
def process(block):
|
||||||
|
@ -525,13 +525,13 @@ def process(block):
|
||||||
return [ None if block.data(i, 0) is None else log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
|
return [ None if block.data(i, 0) is None else log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
|
||||||
```
|
```
|
||||||
|
|
||||||
然后执行下面的语句更新已有的 UDF:
|
然后执行下面的语句更新已有的 UDF。
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
create or replace function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
|
create or replace function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
|
||||||
```
|
```
|
||||||
|
|
||||||
再传入 myfun 两个参数,就会执行失败了
|
再传入 myfun 两个参数,就会执行失败了。
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
taos> select myfun(v1, v2) from t;
|
taos> select myfun(v1, v2) from t;
|
||||||
|
@ -539,7 +539,7 @@ taos> select myfun(v1, v2) from t;
|
||||||
DB error: udf function execution failure (0.014643s)
|
DB error: udf function execution failure (0.014643s)
|
||||||
```
|
```
|
||||||
|
|
||||||
但遗憾的是我们自定义的异常信息没有展示给用户,而是在插件的日志文件 /var/log/taos/taospyudf.log 中:
|
但遗憾的是我们自定义的异常信息没有展示给用户,而是在插件的日志文件 /var/log/taos/taospyudf.log 中。
|
||||||
|
|
||||||
```text
|
```text
|
||||||
2023-05-24 23:21:06.790 ERROR [1666188] [doPyUdfScalarProc@507] call pyUdfScalar proc function. context 0x7faade26d180. error: Exception: require 1 parameter but given 2
|
2023-05-24 23:21:06.790 ERROR [1666188] [doPyUdfScalarProc@507] call pyUdfScalar proc function. context 0x7faade26d180. error: Exception: require 1 parameter but given 2
|
||||||
|
@ -555,7 +555,7 @@ At:
|
||||||
#### 示例三
|
#### 示例三
|
||||||
|
|
||||||
编写一个 UDF:输入(x1, x2, ..., xn), 输出每个值和它们的序号的乘积的和: 1 * x1 + 2 * x2 + ... + n * xn。如果 x1 至 xn 中包含 null,则结果为 null。
|
编写一个 UDF:输入(x1, x2, ..., xn), 输出每个值和它们的序号的乘积的和: 1 * x1 + 2 * x2 + ... + n * xn。如果 x1 至 xn 中包含 null,则结果为 null。
|
||||||
这个示例与示例一的区别是,可以接受任意多列作为输入,且要处理每一列的值。编写 UDF 文件 /root/udf/nsum.py:
|
这个示例与示例一的区别是,可以接受任意多列作为输入,且要处理每一列的值。编写 UDF 文件 /root/udf/nsum.py。
|
||||||
|
|
||||||
```python
|
```python
|
||||||
def init():
|
def init():
|
||||||
|
@ -581,13 +581,13 @@ def process(block):
|
||||||
return result
|
return result
|
||||||
```
|
```
|
||||||
|
|
||||||
创建 UDF:
|
创建 UDF。
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
create function nsum as '/root/udf/nsum.py' outputtype double language 'Python';
|
create function nsum as '/root/udf/nsum.py' outputtype double language 'Python';
|
||||||
```
|
```
|
||||||
|
|
||||||
测试 UDF:
|
测试 UDF。
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
taos> insert into t values('2023-05-25 09:09:15', 6, null, 8);
|
taos> insert into t values('2023-05-25 09:09:15', 6, null, 8);
|
||||||
|
@ -606,13 +606,13 @@ Query OK, 4 row(s) in set (0.010653s)
|
||||||
#### 示例四
|
#### 示例四
|
||||||
|
|
||||||
编写一个 UDF,输入一个时间戳,输出距离这个时间最近的下一个周日。比如今天是 2023-05-25, 则下一个周日是 2023-05-28。
|
编写一个 UDF,输入一个时间戳,输出距离这个时间最近的下一个周日。比如今天是 2023-05-25, 则下一个周日是 2023-05-28。
|
||||||
完成这个函数要用到第三方库 momen。先安装这个库:
|
完成这个函数要用到第三方库 momen。先安装这个库。
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
pip3 install moment
|
pip3 install moment
|
||||||
```
|
```
|
||||||
|
|
||||||
然后编写 UDF 文件 /root/udf/nextsunday.py
|
然后编写 UDF 文件 /root/udf/nextsunday.py。
|
||||||
|
|
||||||
```python
|
```python
|
||||||
import moment
|
import moment
|
||||||
|
@ -636,13 +636,13 @@ def process(block):
|
||||||
for i in range(rows)]
|
for i in range(rows)]
|
||||||
```
|
```
|
||||||
|
|
||||||
UDF 框架会将 TDengine 的 timestamp 类型映射为 Python 的 int 类型,所以这个函数只接受一个表示毫秒数的整数。process 方法先做参数检查,然后用 moment 包替换时间的星期为星期日,最后格式化输出。输出的字符串长度是固定的 10 个字符长,因此可以这样创建 UDF 函数:
|
UDF 框架会将 TDengine 的 timestamp 类型映射为 Python 的 int 类型,所以这个函数只接受一个表示毫秒数的整数。process 方法先做参数检查,然后用 moment 包替换时间的星期为星期日,最后格式化输出。输出的字符串长度是固定的 10 个字符长,因此可以这样创建 UDF 函数。
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
create function nextsunday as '/root/udf/nextsunday.py' outputtype binary(10) language 'Python';
|
create function nextsunday as '/root/udf/nextsunday.py' outputtype binary(10) language 'Python';
|
||||||
```
|
```
|
||||||
|
|
||||||
此时测试函数,如果你是用 systemctl 启动的 taosd,肯定会遇到错误:
|
此时测试函数,如果你是用 systemctl 启动的 taosd,肯定会遇到错误。
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
taos> select ts, nextsunday(ts) from t;
|
taos> select ts, nextsunday(ts) from t;
|
||||||
|
@ -655,7 +655,7 @@ DB error: udf function execution failure (1.123615s)
|
||||||
2023-05-25 11:42:34.541 ERROR [1679419] [PyUdf::PyUdf@217] py udf load module failure. error ModuleNotFoundError: No module named 'moment'
|
2023-05-25 11:42:34.541 ERROR [1679419] [PyUdf::PyUdf@217] py udf load module failure. error ModuleNotFoundError: No module named 'moment'
|
||||||
```
|
```
|
||||||
|
|
||||||
这是因为 “moment” 所在位置不在 python udf 插件默认的库搜索路径中。怎么确认这一点呢?通过以下命令搜索 taospyudf.log:
|
这是因为 “moment” 所在位置不在 python udf 插件默认的库搜索路径中。怎么确认这一点呢?通过以下命令搜索 taospyudf.log。
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
grep 'sys path' taospyudf.log | tail -1
|
grep 'sys path' taospyudf.log | tail -1
|
||||||
|
@ -668,7 +668,7 @@ grep 'sys path' taospyudf.log | tail -1
|
||||||
```
|
```
|
||||||
|
|
||||||
发现 python udf 插件默认搜索的第三方库安装路径是: /lib/python3/dist-packages,而 moment 默认安装到了 /usr/local/lib/python3.8/dist-packages。下面我们修改 python udf 插件默认的库搜索路径。
|
发现 python udf 插件默认搜索的第三方库安装路径是: /lib/python3/dist-packages,而 moment 默认安装到了 /usr/local/lib/python3.8/dist-packages。下面我们修改 python udf 插件默认的库搜索路径。
|
||||||
先打开 python3 命令行,查看当前的 sys.path
|
先打开 python3 命令行,查看当前的 sys.path。
|
||||||
|
|
||||||
```python
|
```python
|
||||||
>>> import sys
|
>>> import sys
|
||||||
|
@ -676,13 +676,13 @@ grep 'sys path' taospyudf.log | tail -1
|
||||||
'/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages'
|
'/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages'
|
||||||
```
|
```
|
||||||
|
|
||||||
复制上面脚本的输出的字符串,然后编辑 /var/taos/taos.cfg 加入以下配置:
|
复制上面脚本的输出的字符串,然后编辑 /var/taos/taos.cfg 加入以下配置。
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
UdfdLdLibPath /usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages
|
UdfdLdLibPath /usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages
|
||||||
```
|
```
|
||||||
|
|
||||||
保存后执行 systemctl restart taosd, 再测试就不报错了:
|
保存后执行 systemctl restart taosd, 再测试就不报错了。
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
taos> select ts, nextsunday(ts) from t;
|
taos> select ts, nextsunday(ts) from t;
|
||||||
|
@ -698,7 +698,7 @@ Query OK, 4 row(s) in set (1.011474s)
|
||||||
#### 示例五
|
#### 示例五
|
||||||
|
|
||||||
编写一个聚合函数,计算某一列最大值和最小值的差。
|
编写一个聚合函数,计算某一列最大值和最小值的差。
|
||||||
聚合函数与标量函数的区别是:标量函数是多行输入对应多个输出,聚合函数是多行输入对应一个输出。聚合函数的执行过程有点像经典的 map-reduce 框架的执行过程,框架把数据分成若干块,每个 mapper 处理一个块,reducer 再把 mapper 的结果做聚合。不一样的地方在于,对于 TDengine Python UDF 中的 reduce 函数既有 map 的功能又有 reduce 的功能。reduce 函数接受两个参数:一个是自己要处理的数据,一个是别的任务执行 reduce 函数的处理结果。如下面的示例 /root/udf/myspread.py:
|
聚合函数与标量函数的区别是:标量函数是多行输入对应多个输出,聚合函数是多行输入对应一个输出。聚合函数的执行过程有点像经典的 map-reduce 框架的执行过程,框架把数据分成若干块,每个 mapper 处理一个块,reducer 再把 mapper 的结果做聚合。不一样的地方在于,对于 TDengine Python UDF 中的 reduce 函数既有 map 的功能又有 reduce 的功能。reduce 函数接受两个参数:一个是自己要处理的数据,一个是别的任务执行 reduce 函数的处理结果。如下面的示例 /root/udf/myspread.py。
|
||||||
|
|
||||||
```python
|
```python
|
||||||
import io
|
import io
|
||||||
|
@ -747,20 +747,20 @@ def finish(buf):
|
||||||
return max_number - min_number
|
return max_number - min_number
|
||||||
```
|
```
|
||||||
|
|
||||||
在这个示例中我们不光定义了一个聚合函数,还添加记录执行日志的功能,讲解如下:
|
在这个示例中我们不光定义了一个聚合函数,还添加记录执行日志的功能,讲解如下。
|
||||||
1. init 函数不再是空函数,而是打开了一个文件用于写执行日志
|
1. init 函数不再是空函数,而是打开了一个文件用于写执行日志
|
||||||
2. log 函数是记录日志的工具,自动将传入的对象转成字符串,加换行符输出
|
2. log 函数是记录日志的工具,自动将传入的对象转成字符串,加换行符输出
|
||||||
3. destroy 函数用来在执行结束关闭文件
|
3. destroy 函数用来在执行结束关闭文件
|
||||||
4. start 返回了初始的 buffer,用来存聚合函数的中间结果,我们把最大值初始化为负无穷大,最小值初始化为正无穷大
|
4. start 返回了初始的 buffer,用来存聚合函数的中间结果,我们把最大值初始化为负无穷大,最小值初始化为正无穷大
|
||||||
5. reduce 处理每个数据块并聚合结果
|
5. reduce 处理每个数据块并聚合结果
|
||||||
6. finish 函数将最终的 buffer 转换成最终的输出
|
6. finish 函数将最终的 buffer 转换成最终的输出
|
||||||
执行下面的 SQL语句创建对应的 UDF:
|
执行下面的 SQL语句创建对应的 UDF。
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
create or replace aggregate function myspread as '/root/udf/myspread.py' outputtype double bufsize 128 language 'Python';
|
create or replace aggregate function myspread as '/root/udf/myspread.py' outputtype double bufsize 128 language 'Python';
|
||||||
```
|
```
|
||||||
|
|
||||||
这个 SQL 语句与创建标量函数的 SQL 语句有两个重要区别:
|
这个 SQL 语句与创建标量函数的 SQL 语句有两个重要区别。
|
||||||
1. 增加了 aggregate 关键字
|
1. 增加了 aggregate 关键字
|
||||||
2. 增加了 bufsize 关键字,用来指定存储中间结果的内存大小,这个数值可以大于实际使用的数值。本例中间结果是两个浮点数组成的 tuple,序列化后实际占用大小只有 32 个字节,但指定的 bufsize 是128,可以用 python 命令行打印实际占用的字节数
|
2. 增加了 bufsize 关键字,用来指定存储中间结果的内存大小,这个数值可以大于实际使用的数值。本例中间结果是两个浮点数组成的 tuple,序列化后实际占用大小只有 32 个字节,但指定的 bufsize 是128,可以用 python 命令行打印实际占用的字节数
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue