diff --git a/docs/zh/08-develop/09-udf.md b/docs/zh/08-develop/09-udf.md index b498db5c34..28457cb24d 100644 --- a/docs/zh/08-develop/09-udf.md +++ b/docs/zh/08-develop/09-udf.md @@ -75,13 +75,32 @@ int32_t udf_destroy() 用 C 语言开发标量函数的模板如下。 ```c +#include "taos.h" +#include "taoserror.h" +#include "taosudf.h" + +// Initialization function. If no initialization, we can skip definition of it. +// The initialization function shall be concatenation of the udf name and _init suffix +// @return error number defined in taoserror.h int32_t scalarfn_init() { + // initialization. return TSDB_CODE_SUCCESS; } + +// Scalar function main computation function +// @param inputDataBlock, input data block composed of multiple columns with each column defined by SUdfColumn +// @param resultColumn, output column +// @return error number defined in taoserror.h int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn* resultColumn) { + // read data from inputDataBlock and process, then output to resultColumn. return TSDB_CODE_SUCCESS; } + +// Cleanup function. If no cleanup related processing, we can skip definition of it. +// The destroy function shall be concatenation of the udf name and _destroy suffix. +// @return error number defined in taoserror.h int32_t scalarfn_destroy() { + // clean up return TSDB_CODE_SUCCESS; } ``` @@ -89,19 +108,52 @@ int32_t scalarfn_destroy() { 用C语言开发聚合函数的模板如下。 ```c +#include "taos.h" +#include "taoserror.h" +#include "taosudf.h" + +// Initialization function. If no initialization, we can skip definition of it. +// The initialization function shall be concatenation of the udf name and _init suffix +// @return error number defined in taoserror.h int32_t aggfn_init() { + // initialization. return TSDB_CODE_SUCCESS; } + +// Aggregate start function. The intermediate value or the state(@interBuf) is initialized in this function. +// The function name shall be concatenation of udf name and _start suffix +// @param interbuf intermediate value to initialize +// @return error number defined in taoserror.h int32_t aggfn_start(SUdfInterBuf* interBuf) { + // initialize intermediate value in interBuf return TSDB_CODE_SUCCESS; } + +// Aggregate reduce function. This function aggregate old state(@interbuf) and one data bock(inputBlock) and output a new state(@newInterBuf). +// @param inputBlock input data block +// @param interBuf old state +// @param newInterBuf new state +// @return error number defined in taoserror.h int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { + // read from inputBlock and interBuf and output to newInterBuf return TSDB_CODE_SUCCESS; } -int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result) { + +// Aggregate function finish function. This function transforms the intermediate value(@interBuf) into the final output(@result). +// The function name must be concatenation of aggfn and _finish suffix. +// @interBuf : intermediate value +// @result: final result +// @return error number defined in taoserror.h +int32_t int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result) { + // read data from inputDataBlock and process, then output to result return TSDB_CODE_SUCCESS; } + +// Cleanup function. If no cleanup related processing, we can skip definition of it. +// The destroy function shall be concatenation of the udf name and _destroy suffix. +// @return error number defined in taoserror.h int32_t aggfn_destroy() { + // clean up return TSDB_CODE_SUCCESS; } ``` @@ -116,6 +168,119 @@ gcc -g -O0 -fPIC -shared bit_and.c -o libbitand.so 为了保证可靠运行,推荐使用 7.5 及以上版本的 GCC。 +### C UDF 数据结构 +```c +typedef struct SUdfColumnMeta { + int16_t type; + int32_t bytes; + uint8_t precision; + uint8_t scale; +} SUdfColumnMeta; + +typedef struct SUdfColumnData { + int32_t numOfRows; + int32_t rowsAlloc; + union { + struct { + int32_t nullBitmapLen; + char *nullBitmap; + int32_t dataLen; + char *data; + } fixLenCol; + + struct { + int32_t varOffsetsLen; + int32_t *varOffsets; + int32_t payloadLen; + char *payload; + int32_t payloadAllocLen; + } varLenCol; + }; +} SUdfColumnData; + +typedef struct SUdfColumn { + SUdfColumnMeta colMeta; + bool hasNull; + SUdfColumnData colData; +} SUdfColumn; + +typedef struct SUdfDataBlock { + int32_t numOfRows; + int32_t numOfCols; + SUdfColumn **udfCols; +} SUdfDataBlock; + +typedef struct SUdfInterBuf { + int32_t bufLen; + char *buf; + int8_t numOfResult; //zero or one +} SUdfInterBuf; +``` +数据结构说明如下: + +- SUdfDataBlock 数据块包含行数 numOfRows 和列数 numCols。udfCols[i] (0 \<= i \<= numCols-1)表示每一列数据,类型为SUdfColumn*。 +- SUdfColumn 包含列的数据类型定义 colMeta 和列的数据 colData。 +- SUdfColumnMeta 成员定义同 taos.h 数据类型定义。 +- SUdfColumnData 数据可以变长,varLenCol 定义变长数据,fixLenCol 定义定长数据。 +- SUdfInterBuf 定义中间结构 buffer,以及 buffer 中结果个数 numOfResult + +为了更好的操作以上数据结构,提供了一些便利函数,定义在 taosudf.h。 + + +### C UDF 示例代码 + +#### 标量函数示例 [bit_and](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/bit_and.c) + +bit_add 实现多列的按位与功能。如果只有一列,返回这一列。bit_add 忽略空值。 + +
+bit_and.c + +```c +{{#include tests/script/sh/bit_and.c}} +``` + +
+ +#### 聚合函数示例1 返回值为数值类型 [l2norm](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/l2norm.c) + +l2norm 实现了输入列的所有数据的二阶范数,即对每个数据先平方,再累加求和,最后开方。 + +
+l2norm.c + +```c +{{#include tests/script/sh/l2norm.c}} +``` + +
+ +#### 聚合函数示例2 返回值为字符串类型 [max_vol](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/max_vol.c) + +max_vol 实现了从多个输入的电压列中找到最大电压,返回由设备 ID + 最大电压所在(行,列)+ 最大电压值 组成的组合字符串值 + +创建表: +```bash +create table battery(ts timestamp, vol1 float, vol2 float, vol3 float, deviceId varchar(16)); +``` +创建自定义函数: +```bash +create aggregate function max_vol as '/root/udf/libmaxvol.so' outputtype binary(64) bufsize 10240 language 'C'; +``` +使用自定义函数: +```bash +select max_vol(vol1, vol2, vol3, deviceid) from battery; +``` + +
+max_vol.c + +```c +{{#include tests/script/sh/max_vol.c}} +``` + +
+ ## 用 Python 语言开发 UDF ### 准备环境 @@ -129,6 +294,13 @@ gcc -g -O0 -fPIC -shared bit_and.c -o libbitand.so - 第3步,执行命令 ldconfig。 - 第4步,启动 taosd 服务。 +安装过程中会编译 C++ 源码,因此系统上要有 cmake 和 gcc。编译生成的 libtaospyudf.so 文件自动会被复制到 /usr/local/lib/ 目录,因此如果是非 root 用户,安装时需加 sudo。安装完可以检查这个目录是否有了这个文件: + +```shell +root@slave11 ~/udf $ ls -l /usr/local/lib/libtaos* +-rw-r--r-- 1 root root 671344 May 24 22:54 /usr/local/lib/libtaospyudf.so +``` + ### 接口定义 当使用 Python 语言开发 UDF 时,需要实现规定的接口函数。具体要求如下。 @@ -458,7 +630,7 @@ def process(block): for i in range(rows)] ``` -UDF 框架会将 TDengine 的 timestamp 类型映射为 Python 的 int 类型,所以这个函数只接受一个表示毫秒数的整数。process 方法先做参数检查,然后用 moment 包替换时间的星期为星期日,最后格式化输出。输出的字符串长度是固定的10个字符长,因此可以这样创建 UDF 函数: +UDF 框架会将 TDengine 的 timestamp 类型映射为 Python 的 int 类型,所以这个函数只接受一个表示毫秒数的整数。process 方法先做参数检查,然后用 moment 包替换时间的星期为星期日,最后格式化输出。输出的字符串长度是固定的 10 个字符长,因此可以这样创建 UDF 函数: ```sql create function nextsunday as '/root/udf/nextsunday.py' outputtype binary(10) language 'Python'; @@ -625,6 +797,45 @@ close log file: spread.log 通过这个示例,我们学会了如何定义聚合函数,并打印自定义的日志信息。 +### 更多 Python UDF 示例代码 +#### 标量函数示例 [pybitand](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pybitand.py) + +pybitand 实现多列的按位与功能。如果只有一列,返回这一列。pybitand 忽略空值。 + +
+pybitand.py + +```Python +{{#include tests/script/sh/pybitand.py}} +``` + +
+ +#### 聚合函数示例 [pyl2norm](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pyl2norm.py) + +pyl2norm 实现了输入列的所有数据的二阶范数,即对每个数据先平方,再累加求和,最后开方。 + +
+pyl2norm.py + +```c +{{#include tests/script/sh/pyl2norm.py}} +``` + +
+ +#### 聚合函数示例 [pycumsum](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pycumsum.py) + +pycumsum 使用 numpy 计算输入列所有数据的累积和。 +
+pycumsum.py + +```c +{{#include tests/script/sh/pycumsum.py}} +``` + +
+ ## 管理 UDF 在集群中管理 UDF 的过程涉及创建、使用和维护这些函数。用户可以通过 SQL 在集群中创建和管理 UDF,一旦创建成功,集群的所有用户都可以在 SQL 中使用这些函数。由于 UDF 存储在集群的 mnode 上,因此即使重启集群,已经创建的 UDF 也仍然可用。 @@ -637,7 +848,7 @@ close log file: spread.log 创建标量函数的 SQL 语法如下。 ```sql -CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'Python'; +CREATE OR REPLACE FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'Python'; ``` 各参数说明如下。 - or replace:如果函数已经存在,则会修改已有的函数属性。 @@ -651,7 +862,7 @@ CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'P 创建聚合函数的 SQL 语法如下。 ```sql -CREATE AGGREGATE FUNCTION function_name library_path OUTPUTTYPE output_type LANGUAGE 'Python'; +CREATE OR REPLACE AGGREGATE FUNCTION function_name library_path OUTPUTTYPE output_type LANGUAGE 'Python'; ``` 其中,buffer_size 表示中间计算结果的缓冲区大小,单位是字节。其他参数的含义与标量函数相同。 @@ -675,3 +886,10 @@ DROP FUNCTION function_name; show functions; ``` +### 查看函数信息 + +同名的 UDF 每更新一次,版本号会增加 1。 +```sql +select * from ins_functions \G; +``` +