docs: udf function, extract_avg (#30244)

This commit is contained in:
xinsheng Ren 2025-03-18 17:52:21 +08:00 committed by GitHub
parent 5a93f5f61e
commit 789f36edda
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 205 additions and 3 deletions

View File

@ -298,13 +298,53 @@ select max_vol(vol1, vol2, vol3, deviceid) from battery;
</details>
#### Aggregate Function Example 3 Split string and calculate average value [extract_avg](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/extract_avg.c)
The `extract_avg` function converts a comma-separated string sequence into a set of numerical values, counts the results of all rows, and calculates the final average. Note when implementing:
- `interBuf->numOfResult` needs to return 1 or 0 and cannot be used for count.
- Count can use additional caches, such as the `SumCount` structure.
- Use `varDataVal` to obtain the string.
Create table:
```shell
create table scores(ts timestamp, varStr varchar(128));
```
Create custom function:
```shell
create aggregate function extract_avg as '/root/udf/libextract_avg.so' outputtype double bufsize 16 language 'C';
```
Use custom function:
```shell
select extract_avg(valStr) from scores;
```
Generate `.so` file
```bash
gcc -g -O0 -fPIC -shared extract_vag.c -o libextract_avg.so
```
<details>
<summary>max_vol.c</summary>
```c
{{#include tests/script/sh/max_vol.c}}
```
</details>
## Developing UDFs in Python Language
### Environment Setup
The specific steps to prepare the environment are as follows:
- Step 1, prepare the Python runtime environment.
- Step 1, prepare the Python runtime environment. If you compile and install Python locally, be sure to enable the `--enable-shared` option, otherwise the subsequent installation of taospyudf will fail due to failure to generate a shared library.
- Step 2, install the Python package taospyudf. The command is as follows.
```shell

View File

@ -237,7 +237,7 @@ typedef struct SUdfInterBuf {
#### 标量函数示例 [bit_and](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/bit_and.c)
bit_add 实现多列的按位与功能。如果只有一列返回这一列。bit_add 忽略空值。
bit_and 实现多列的按位与功能。如果只有一列返回这一列。bit_and 忽略空值。
<details>
<summary>bit_and.c</summary>
@ -287,12 +287,46 @@ select max_vol(vol1, vol2, vol3, deviceid) from battery;
</details>
#### 聚合函数示例3 切分字符串求平均值 [extract_avg](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/extract_avg.c)
`extract_avg` 函数是将一个逗号分隔的字符串数列转为一组数值,统计所有行的结果,计算最终平均值。实现时需注意:
- `interBuf->numOfResult` 需要返回 1 或者 0不能用于 count 计数。
- count 计数可使用额外的缓存,例如 `SumCount` 结构体。
- 字符串的获取需使用`varDataVal`。
创建表:
```bash
create table scores(ts timestamp, varStr varchar(128));
```
创建自定义函数:
```bash
create aggregate function extract_avg as '/root/udf/libextract_avg.so' outputtype double bufsize 16 language 'C';
```
使用自定义函数:
```bash
select extract_avg(valStr) from scores;
```
生成 `.so` 文件
```bash
gcc -g -O0 -fPIC -shared extract_vag.c -o libextract_avg.so
```
<details>
<summary>extract_avg.c</summary>
```c
{{#include tests/script/sh/extract_avg.c}}
```
</details>
## 用 Python 语言开发 UDF
### 准备环境
准备环境的具体步骤如下:
- 第 1 步,准备好 Python 运行环境。
- 第 1 步,准备好 Python 运行环境。本地编译安装 python 注意打开 `--enable-shared` 选项,不然后续安装 taospyudf 会因无法生成共享库而导致失败。
- 第 2 步,安装 Python 包 taospyudf。命令如下。
```shell
pip3 install taospyudf

View File

@ -0,0 +1,128 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taos.h"
#include "taoserror.h"
#include "taosudf.h"
// Define a structure to store sum and count
typedef struct {
double sum;
int count;
} SumCount;
// initialization function
DLL_EXPORT int32_t extract_avg_init() {
udfTrace("extract_avg_init: Initializing UDF");
return TSDB_CODE_SUCCESS;
}
DLL_EXPORT int32_t extract_avg_start(SUdfInterBuf *interBuf) {
int32_t bufLen = sizeof(SumCount);
if (interBuf->bufLen < bufLen) {
udfError("extract_avg_start: Failed to execute UDF since input buflen:%d < %d", interBuf->bufLen, bufLen);
return TSDB_CODE_UDF_INVALID_BUFSIZE;
}
// Initialize sum and count
SumCount *sumCount = (SumCount *)interBuf->buf;
sumCount->sum = 0.0;
sumCount->count = 0;
interBuf->numOfResult = 0;
udfTrace("extract_avg_start: Initialized sum=0.0, count=0");
return TSDB_CODE_SUCCESS;
}
DLL_EXPORT int32_t extract_avg(SUdfDataBlock *inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
udfTrace("extract_avg: Processing data block with %d rows", inputBlock->numOfRows);
// Check the number of columns in the input data block
if (inputBlock->numOfCols != 1) {
udfError("extract_avg: Invalid number of columns. Expected 1, got %d", inputBlock->numOfCols);
return TSDB_CODE_UDF_INVALID_INPUT;
}
// Get the input column
SUdfColumn *inputCol = inputBlock->udfCols[0];
if (inputCol->colMeta.type != TSDB_DATA_TYPE_VARCHAR) {
udfError("extract_avg: Invalid data type. Expected VARCHAR, got %d", inputCol->colMeta.type);
return TSDB_CODE_UDF_INVALID_INPUT;
}
// Read the current sum and count from interBuf
SumCount *sumCount = (SumCount *)interBuf->buf;
udfTrace("extract_avg: Starting with sum=%f, count=%d", sumCount->sum, sumCount->count);
for (int i = 0; i < inputBlock->numOfRows; i++) {
if (udfColDataIsNull(inputCol, i)) {
udfTrace("extract_avg: Skipping NULL value at row %d", i);
continue;
}
char *buf = (char *)udfColDataGetData(inputCol, i);
char data[64];
memset(data, 0, 64);
memcpy(data, varDataVal(buf), varDataLen(buf));
udfTrace("extract_avg: Processing row %d, data='%s'", i, data);
char *rest = data;
char *token;
while ((token = strtok_r(rest, ",", &rest))) {
while (*token == ' ') token++;
int tokenLen = strlen(token);
while (tokenLen > 0 && token[tokenLen - 1] == ' ') token[--tokenLen] = '\0';
if (tokenLen == 0) {
udfTrace("extract_avg: Empty string encountered at row %d", i);
continue;
}
char *endPtr;
double value = strtod(token, &endPtr);
if (endPtr == token || *endPtr != '\0') {
udfError("extract_avg: Failed to convert string '%s' to double at row %d", token, i);
continue;
}
sumCount->sum += value;
sumCount->count++;
udfTrace("extract_avg: Updated sum=%f, count=%d", sumCount->sum, sumCount->count);
}
}
newInterBuf->bufLen = sizeof(SumCount);
newInterBuf->buf = (char *)malloc(newInterBuf->bufLen);
if (newInterBuf->buf == NULL) {
udfError("extract_avg: Failed to allocate memory for newInterBuf");
return TSDB_CODE_UDF_INTERNAL_ERROR;
}
memcpy(newInterBuf->buf, sumCount, newInterBuf->bufLen);
newInterBuf->numOfResult = 0;
udfTrace("extract_avg: Final sum=%f, count=%d", sumCount->sum, sumCount->count);
return TSDB_CODE_SUCCESS;
}
DLL_EXPORT int32_t extract_avg_finish(SUdfInterBuf *interBuf, SUdfInterBuf *result) {
SumCount *sumCount = (SumCount *)interBuf->buf;
double avg = (sumCount->count > 0) ? (sumCount->sum / sumCount->count) : 0.0;
*(double *)result->buf = avg;
result->bufLen = sizeof(double);
result->numOfResult = sumCount->count > 0 ? 1 : 0;
udfTrace("extract_avg_finish: Final result=%f (sum=%f, count=%d)", avg, sumCount->sum, sumCount->count);
return TSDB_CODE_SUCCESS;
}
DLL_EXPORT int32_t extract_avg_destroy() {
udfTrace("extract_avg_destroy: Cleaning up UDF");
return TSDB_CODE_SUCCESS;
}