doc: add example code for udf
This commit is contained in:
parent
53771fbebf
commit
ada3a7b8e7
|
@ -75,13 +75,32 @@ int32_t udf_destroy()
|
||||||
|
|
||||||
用 C 语言开发标量函数的模板如下。
|
用 C 语言开发标量函数的模板如下。
|
||||||
```c
|
```c
|
||||||
|
#include "taos.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "taosudf.h"
|
||||||
|
|
||||||
|
// Initialization function. If no initialization, we can skip definition of it.
|
||||||
|
// The initialization function shall be concatenation of the udf name and _init suffix
|
||||||
|
// @return error number defined in taoserror.h
|
||||||
int32_t scalarfn_init() {
|
int32_t scalarfn_init() {
|
||||||
|
// initialization.
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Scalar function main computation function
|
||||||
|
// @param inputDataBlock, input data block composed of multiple columns with each column defined by SUdfColumn
|
||||||
|
// @param resultColumn, output column
|
||||||
|
// @return error number defined in taoserror.h
|
||||||
int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn* resultColumn) {
|
int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn* resultColumn) {
|
||||||
|
// read data from inputDataBlock and process, then output to resultColumn.
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cleanup function. If no cleanup related processing, we can skip definition of it.
|
||||||
|
// The destroy function shall be concatenation of the udf name and _destroy suffix.
|
||||||
|
// @return error number defined in taoserror.h
|
||||||
int32_t scalarfn_destroy() {
|
int32_t scalarfn_destroy() {
|
||||||
|
// clean up
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
@ -89,19 +108,52 @@ int32_t scalarfn_destroy() {
|
||||||
|
|
||||||
用C语言开发聚合函数的模板如下。
|
用C语言开发聚合函数的模板如下。
|
||||||
```c
|
```c
|
||||||
|
#include "taos.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "taosudf.h"
|
||||||
|
|
||||||
|
// Initialization function. If no initialization, we can skip definition of it.
|
||||||
|
// The initialization function shall be concatenation of the udf name and _init suffix
|
||||||
|
// @return error number defined in taoserror.h
|
||||||
int32_t aggfn_init() {
|
int32_t aggfn_init() {
|
||||||
|
// initialization.
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Aggregate start function. The intermediate value or the state(@interBuf) is initialized in this function.
|
||||||
|
// The function name shall be concatenation of udf name and _start suffix
|
||||||
|
// @param interbuf intermediate value to initialize
|
||||||
|
// @return error number defined in taoserror.h
|
||||||
int32_t aggfn_start(SUdfInterBuf* interBuf) {
|
int32_t aggfn_start(SUdfInterBuf* interBuf) {
|
||||||
|
// initialize intermediate value in interBuf
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Aggregate reduce function. This function aggregate old state(@interbuf) and one data bock(inputBlock) and output a new state(@newInterBuf).
|
||||||
|
// @param inputBlock input data block
|
||||||
|
// @param interBuf old state
|
||||||
|
// @param newInterBuf new state
|
||||||
|
// @return error number defined in taoserror.h
|
||||||
int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
|
int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
|
||||||
|
// read from inputBlock and interBuf and output to newInterBuf
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result) {
|
|
||||||
|
// Aggregate function finish function. This function transforms the intermediate value(@interBuf) into the final output(@result).
|
||||||
|
// The function name must be concatenation of aggfn and _finish suffix.
|
||||||
|
// @interBuf : intermediate value
|
||||||
|
// @result: final result
|
||||||
|
// @return error number defined in taoserror.h
|
||||||
|
int32_t int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result) {
|
||||||
|
// read data from inputDataBlock and process, then output to result
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cleanup function. If no cleanup related processing, we can skip definition of it.
|
||||||
|
// The destroy function shall be concatenation of the udf name and _destroy suffix.
|
||||||
|
// @return error number defined in taoserror.h
|
||||||
int32_t aggfn_destroy() {
|
int32_t aggfn_destroy() {
|
||||||
|
// clean up
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
@ -116,6 +168,119 @@ gcc -g -O0 -fPIC -shared bit_and.c -o libbitand.so
|
||||||
|
|
||||||
为了保证可靠运行,推荐使用 7.5 及以上版本的 GCC。
|
为了保证可靠运行,推荐使用 7.5 及以上版本的 GCC。
|
||||||
|
|
||||||
|
### C UDF 数据结构
|
||||||
|
```c
|
||||||
|
typedef struct SUdfColumnMeta {
|
||||||
|
int16_t type;
|
||||||
|
int32_t bytes;
|
||||||
|
uint8_t precision;
|
||||||
|
uint8_t scale;
|
||||||
|
} SUdfColumnMeta;
|
||||||
|
|
||||||
|
typedef struct SUdfColumnData {
|
||||||
|
int32_t numOfRows;
|
||||||
|
int32_t rowsAlloc;
|
||||||
|
union {
|
||||||
|
struct {
|
||||||
|
int32_t nullBitmapLen;
|
||||||
|
char *nullBitmap;
|
||||||
|
int32_t dataLen;
|
||||||
|
char *data;
|
||||||
|
} fixLenCol;
|
||||||
|
|
||||||
|
struct {
|
||||||
|
int32_t varOffsetsLen;
|
||||||
|
int32_t *varOffsets;
|
||||||
|
int32_t payloadLen;
|
||||||
|
char *payload;
|
||||||
|
int32_t payloadAllocLen;
|
||||||
|
} varLenCol;
|
||||||
|
};
|
||||||
|
} SUdfColumnData;
|
||||||
|
|
||||||
|
typedef struct SUdfColumn {
|
||||||
|
SUdfColumnMeta colMeta;
|
||||||
|
bool hasNull;
|
||||||
|
SUdfColumnData colData;
|
||||||
|
} SUdfColumn;
|
||||||
|
|
||||||
|
typedef struct SUdfDataBlock {
|
||||||
|
int32_t numOfRows;
|
||||||
|
int32_t numOfCols;
|
||||||
|
SUdfColumn **udfCols;
|
||||||
|
} SUdfDataBlock;
|
||||||
|
|
||||||
|
typedef struct SUdfInterBuf {
|
||||||
|
int32_t bufLen;
|
||||||
|
char *buf;
|
||||||
|
int8_t numOfResult; //zero or one
|
||||||
|
} SUdfInterBuf;
|
||||||
|
```
|
||||||
|
数据结构说明如下:
|
||||||
|
|
||||||
|
- SUdfDataBlock 数据块包含行数 numOfRows 和列数 numCols。udfCols[i] (0 \<= i \<= numCols-1)表示每一列数据,类型为SUdfColumn*。
|
||||||
|
- SUdfColumn 包含列的数据类型定义 colMeta 和列的数据 colData。
|
||||||
|
- SUdfColumnMeta 成员定义同 taos.h 数据类型定义。
|
||||||
|
- SUdfColumnData 数据可以变长,varLenCol 定义变长数据,fixLenCol 定义定长数据。
|
||||||
|
- SUdfInterBuf 定义中间结构 buffer,以及 buffer 中结果个数 numOfResult
|
||||||
|
|
||||||
|
为了更好的操作以上数据结构,提供了一些便利函数,定义在 taosudf.h。
|
||||||
|
|
||||||
|
|
||||||
|
### C UDF 示例代码
|
||||||
|
|
||||||
|
#### 标量函数示例 [bit_and](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/bit_and.c)
|
||||||
|
|
||||||
|
bit_add 实现多列的按位与功能。如果只有一列,返回这一列。bit_add 忽略空值。
|
||||||
|
|
||||||
|
<details>
|
||||||
|
<summary>bit_and.c</summary>
|
||||||
|
|
||||||
|
```c
|
||||||
|
{{#include tests/script/sh/bit_and.c}}
|
||||||
|
```
|
||||||
|
|
||||||
|
</details>
|
||||||
|
|
||||||
|
#### 聚合函数示例1 返回值为数值类型 [l2norm](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/l2norm.c)
|
||||||
|
|
||||||
|
l2norm 实现了输入列的所有数据的二阶范数,即对每个数据先平方,再累加求和,最后开方。
|
||||||
|
|
||||||
|
<details>
|
||||||
|
<summary>l2norm.c</summary>
|
||||||
|
|
||||||
|
```c
|
||||||
|
{{#include tests/script/sh/l2norm.c}}
|
||||||
|
```
|
||||||
|
|
||||||
|
</details>
|
||||||
|
|
||||||
|
#### 聚合函数示例2 返回值为字符串类型 [max_vol](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/max_vol.c)
|
||||||
|
|
||||||
|
max_vol 实现了从多个输入的电压列中找到最大电压,返回由设备 ID + 最大电压所在(行,列)+ 最大电压值 组成的组合字符串值
|
||||||
|
|
||||||
|
创建表:
|
||||||
|
```bash
|
||||||
|
create table battery(ts timestamp, vol1 float, vol2 float, vol3 float, deviceId varchar(16));
|
||||||
|
```
|
||||||
|
创建自定义函数:
|
||||||
|
```bash
|
||||||
|
create aggregate function max_vol as '/root/udf/libmaxvol.so' outputtype binary(64) bufsize 10240 language 'C';
|
||||||
|
```
|
||||||
|
使用自定义函数:
|
||||||
|
```bash
|
||||||
|
select max_vol(vol1, vol2, vol3, deviceid) from battery;
|
||||||
|
```
|
||||||
|
|
||||||
|
<details>
|
||||||
|
<summary>max_vol.c</summary>
|
||||||
|
|
||||||
|
```c
|
||||||
|
{{#include tests/script/sh/max_vol.c}}
|
||||||
|
```
|
||||||
|
|
||||||
|
</details>
|
||||||
|
|
||||||
## 用 Python 语言开发 UDF
|
## 用 Python 语言开发 UDF
|
||||||
|
|
||||||
### 准备环境
|
### 准备环境
|
||||||
|
@ -129,6 +294,13 @@ gcc -g -O0 -fPIC -shared bit_and.c -o libbitand.so
|
||||||
- 第3步,执行命令 ldconfig。
|
- 第3步,执行命令 ldconfig。
|
||||||
- 第4步,启动 taosd 服务。
|
- 第4步,启动 taosd 服务。
|
||||||
|
|
||||||
|
安装过程中会编译 C++ 源码,因此系统上要有 cmake 和 gcc。编译生成的 libtaospyudf.so 文件自动会被复制到 /usr/local/lib/ 目录,因此如果是非 root 用户,安装时需加 sudo。安装完可以检查这个目录是否有了这个文件:
|
||||||
|
|
||||||
|
```shell
|
||||||
|
root@slave11 ~/udf $ ls -l /usr/local/lib/libtaos*
|
||||||
|
-rw-r--r-- 1 root root 671344 May 24 22:54 /usr/local/lib/libtaospyudf.so
|
||||||
|
```
|
||||||
|
|
||||||
### 接口定义
|
### 接口定义
|
||||||
|
|
||||||
当使用 Python 语言开发 UDF 时,需要实现规定的接口函数。具体要求如下。
|
当使用 Python 语言开发 UDF 时,需要实现规定的接口函数。具体要求如下。
|
||||||
|
@ -458,7 +630,7 @@ def process(block):
|
||||||
for i in range(rows)]
|
for i in range(rows)]
|
||||||
```
|
```
|
||||||
|
|
||||||
UDF 框架会将 TDengine 的 timestamp 类型映射为 Python 的 int 类型,所以这个函数只接受一个表示毫秒数的整数。process 方法先做参数检查,然后用 moment 包替换时间的星期为星期日,最后格式化输出。输出的字符串长度是固定的10个字符长,因此可以这样创建 UDF 函数:
|
UDF 框架会将 TDengine 的 timestamp 类型映射为 Python 的 int 类型,所以这个函数只接受一个表示毫秒数的整数。process 方法先做参数检查,然后用 moment 包替换时间的星期为星期日,最后格式化输出。输出的字符串长度是固定的 10 个字符长,因此可以这样创建 UDF 函数:
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
create function nextsunday as '/root/udf/nextsunday.py' outputtype binary(10) language 'Python';
|
create function nextsunday as '/root/udf/nextsunday.py' outputtype binary(10) language 'Python';
|
||||||
|
@ -625,6 +797,45 @@ close log file: spread.log
|
||||||
|
|
||||||
通过这个示例,我们学会了如何定义聚合函数,并打印自定义的日志信息。
|
通过这个示例,我们学会了如何定义聚合函数,并打印自定义的日志信息。
|
||||||
|
|
||||||
|
### 更多 Python UDF 示例代码
|
||||||
|
#### 标量函数示例 [pybitand](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pybitand.py)
|
||||||
|
|
||||||
|
pybitand 实现多列的按位与功能。如果只有一列,返回这一列。pybitand 忽略空值。
|
||||||
|
|
||||||
|
<details>
|
||||||
|
<summary>pybitand.py</summary>
|
||||||
|
|
||||||
|
```Python
|
||||||
|
{{#include tests/script/sh/pybitand.py}}
|
||||||
|
```
|
||||||
|
|
||||||
|
</details>
|
||||||
|
|
||||||
|
#### 聚合函数示例 [pyl2norm](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pyl2norm.py)
|
||||||
|
|
||||||
|
pyl2norm 实现了输入列的所有数据的二阶范数,即对每个数据先平方,再累加求和,最后开方。
|
||||||
|
|
||||||
|
<details>
|
||||||
|
<summary>pyl2norm.py</summary>
|
||||||
|
|
||||||
|
```c
|
||||||
|
{{#include tests/script/sh/pyl2norm.py}}
|
||||||
|
```
|
||||||
|
|
||||||
|
</details>
|
||||||
|
|
||||||
|
#### 聚合函数示例 [pycumsum](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pycumsum.py)
|
||||||
|
|
||||||
|
pycumsum 使用 numpy 计算输入列所有数据的累积和。
|
||||||
|
<details>
|
||||||
|
<summary>pycumsum.py</summary>
|
||||||
|
|
||||||
|
```c
|
||||||
|
{{#include tests/script/sh/pycumsum.py}}
|
||||||
|
```
|
||||||
|
|
||||||
|
</details>
|
||||||
|
|
||||||
## 管理 UDF
|
## 管理 UDF
|
||||||
|
|
||||||
在集群中管理 UDF 的过程涉及创建、使用和维护这些函数。用户可以通过 SQL 在集群中创建和管理 UDF,一旦创建成功,集群的所有用户都可以在 SQL 中使用这些函数。由于 UDF 存储在集群的 mnode 上,因此即使重启集群,已经创建的 UDF 也仍然可用。
|
在集群中管理 UDF 的过程涉及创建、使用和维护这些函数。用户可以通过 SQL 在集群中创建和管理 UDF,一旦创建成功,集群的所有用户都可以在 SQL 中使用这些函数。由于 UDF 存储在集群的 mnode 上,因此即使重启集群,已经创建的 UDF 也仍然可用。
|
||||||
|
@ -637,7 +848,7 @@ close log file: spread.log
|
||||||
|
|
||||||
创建标量函数的 SQL 语法如下。
|
创建标量函数的 SQL 语法如下。
|
||||||
```sql
|
```sql
|
||||||
CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'Python';
|
CREATE OR REPLACE FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'Python';
|
||||||
```
|
```
|
||||||
各参数说明如下。
|
各参数说明如下。
|
||||||
- or replace:如果函数已经存在,则会修改已有的函数属性。
|
- or replace:如果函数已经存在,则会修改已有的函数属性。
|
||||||
|
@ -651,7 +862,7 @@ CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'P
|
||||||
|
|
||||||
创建聚合函数的 SQL 语法如下。
|
创建聚合函数的 SQL 语法如下。
|
||||||
```sql
|
```sql
|
||||||
CREATE AGGREGATE FUNCTION function_name library_path OUTPUTTYPE output_type LANGUAGE 'Python';
|
CREATE OR REPLACE AGGREGATE FUNCTION function_name library_path OUTPUTTYPE output_type LANGUAGE 'Python';
|
||||||
```
|
```
|
||||||
|
|
||||||
其中,buffer_size 表示中间计算结果的缓冲区大小,单位是字节。其他参数的含义与标量函数相同。
|
其中,buffer_size 表示中间计算结果的缓冲区大小,单位是字节。其他参数的含义与标量函数相同。
|
||||||
|
@ -675,3 +886,10 @@ DROP FUNCTION function_name;
|
||||||
show functions;
|
show functions;
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### 查看函数信息
|
||||||
|
|
||||||
|
同名的 UDF 每更新一次,版本号会增加 1。
|
||||||
|
```sql
|
||||||
|
select * from ins_functions \G;
|
||||||
|
```
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue