diff --git a/docs/en/07-develop/09-udf.md b/docs/en/07-develop/09-udf.md index 138378e450..d39721de17 100644 --- a/docs/en/07-develop/09-udf.md +++ b/docs/en/07-develop/09-udf.md @@ -298,13 +298,53 @@ select max_vol(vol1, vol2, vol3, deviceid) from battery; +#### Aggregate Function Example 3 Split string and calculate average value [extract_avg](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/extract_avg.c) + +The `extract_avg` function converts a comma-separated string sequence into a set of numerical values, counts the results of all rows, and calculates the final average. Note when implementing: +- `interBuf->numOfResult` needs to return 1 or 0 and cannot be used for count. +- Count can use additional caches, such as the `SumCount` structure. +- Use `varDataVal` to obtain the string. + +Create table: + +```shell +create table scores(ts timestamp, varStr varchar(128)); +``` + +Create custom function: + +```shell +create aggregate function extract_avg as '/root/udf/libextract_avg.so' outputtype double bufsize 16 language 'C'; +``` + +Use custom function: + +```shell +select extract_avg(valStr) from scores; +``` + +Generate `.so` file +```bash +gcc -g -O0 -fPIC -shared extract_vag.c -o libextract_avg.so +``` + +
+max_vol.c + +```c +{{#include tests/script/sh/max_vol.c}} +``` + +
+ + ## Developing UDFs in Python Language ### Environment Setup The specific steps to prepare the environment are as follows: -- Step 1, prepare the Python runtime environment. +- Step 1, prepare the Python runtime environment. If you compile and install Python locally, be sure to enable the `--enable-shared` option, otherwise the subsequent installation of taospyudf will fail due to failure to generate a shared library. - Step 2, install the Python package taospyudf. The command is as follows. ```shell diff --git a/docs/zh/07-develop/09-udf.md b/docs/zh/07-develop/09-udf.md index d0f9c93652..9438df0b1c 100644 --- a/docs/zh/07-develop/09-udf.md +++ b/docs/zh/07-develop/09-udf.md @@ -237,7 +237,7 @@ typedef struct SUdfInterBuf { #### 标量函数示例 [bit_and](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/bit_and.c) -bit_add 实现多列的按位与功能。如果只有一列,返回这一列。bit_add 忽略空值。 +bit_and 实现多列的按位与功能。如果只有一列,返回这一列。bit_and 忽略空值。
bit_and.c @@ -287,12 +287,46 @@ select max_vol(vol1, vol2, vol3, deviceid) from battery;
+#### 聚合函数示例3 切分字符串求平均值 [extract_avg](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/extract_avg.c) + +`extract_avg` 函数是将一个逗号分隔的字符串数列转为一组数值,统计所有行的结果,计算最终平均值。实现时需注意: +- `interBuf->numOfResult` 需要返回 1 或者 0,不能用于 count 计数。 +- count 计数可使用额外的缓存,例如 `SumCount` 结构体。 +- 字符串的获取需使用`varDataVal`。 + +创建表: +```bash +create table scores(ts timestamp, varStr varchar(128)); +``` +创建自定义函数: +```bash +create aggregate function extract_avg as '/root/udf/libextract_avg.so' outputtype double bufsize 16 language 'C'; +``` +使用自定义函数: +```bash +select extract_avg(valStr) from scores; +``` + +生成 `.so` 文件 +```bash +gcc -g -O0 -fPIC -shared extract_vag.c -o libextract_avg.so +``` + +
+extract_avg.c + +```c +{{#include tests/script/sh/extract_avg.c}} +``` + +
+ ## 用 Python 语言开发 UDF ### 准备环境 准备环境的具体步骤如下: -- 第 1 步,准备好 Python 运行环境。 +- 第 1 步,准备好 Python 运行环境。本地编译安装 python 注意打开 `--enable-shared` 选项,不然后续安装 taospyudf 会因无法生成共享库而导致失败。 - 第 2 步,安装 Python 包 taospyudf。命令如下。 ```shell pip3 install taospyudf diff --git a/tests/script/sh/extract_avg.c b/tests/script/sh/extract_avg.c new file mode 100644 index 0000000000..508a73f7eb --- /dev/null +++ b/tests/script/sh/extract_avg.c @@ -0,0 +1,128 @@ +#include +#include +#include +#include "taos.h" +#include "taoserror.h" +#include "taosudf.h" + +// Define a structure to store sum and count +typedef struct { + double sum; + int count; +} SumCount; + +// initialization function +DLL_EXPORT int32_t extract_avg_init() { + udfTrace("extract_avg_init: Initializing UDF"); + return TSDB_CODE_SUCCESS; +} + +DLL_EXPORT int32_t extract_avg_start(SUdfInterBuf *interBuf) { + int32_t bufLen = sizeof(SumCount); + if (interBuf->bufLen < bufLen) { + udfError("extract_avg_start: Failed to execute UDF since input buflen:%d < %d", interBuf->bufLen, bufLen); + return TSDB_CODE_UDF_INVALID_BUFSIZE; + } + + // Initialize sum and count + SumCount *sumCount = (SumCount *)interBuf->buf; + sumCount->sum = 0.0; + sumCount->count = 0; + + interBuf->numOfResult = 0; + + udfTrace("extract_avg_start: Initialized sum=0.0, count=0"); + return TSDB_CODE_SUCCESS; +} + +DLL_EXPORT int32_t extract_avg(SUdfDataBlock *inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { + udfTrace("extract_avg: Processing data block with %d rows", inputBlock->numOfRows); + + // Check the number of columns in the input data block + if (inputBlock->numOfCols != 1) { + udfError("extract_avg: Invalid number of columns. Expected 1, got %d", inputBlock->numOfCols); + return TSDB_CODE_UDF_INVALID_INPUT; + } + + // Get the input column + SUdfColumn *inputCol = inputBlock->udfCols[0]; + + if (inputCol->colMeta.type != TSDB_DATA_TYPE_VARCHAR) { + udfError("extract_avg: Invalid data type. Expected VARCHAR, got %d", inputCol->colMeta.type); + return TSDB_CODE_UDF_INVALID_INPUT; + } + + // Read the current sum and count from interBuf + SumCount *sumCount = (SumCount *)interBuf->buf; + udfTrace("extract_avg: Starting with sum=%f, count=%d", sumCount->sum, sumCount->count); + + for (int i = 0; i < inputBlock->numOfRows; i++) { + if (udfColDataIsNull(inputCol, i)) { + udfTrace("extract_avg: Skipping NULL value at row %d", i); + continue; + } + + char *buf = (char *)udfColDataGetData(inputCol, i); + + char data[64]; + memset(data, 0, 64); + memcpy(data, varDataVal(buf), varDataLen(buf)); + + udfTrace("extract_avg: Processing row %d, data='%s'", i, data); + + char *rest = data; + char *token; + while ((token = strtok_r(rest, ",", &rest))) { + while (*token == ' ') token++; + int tokenLen = strlen(token); + while (tokenLen > 0 && token[tokenLen - 1] == ' ') token[--tokenLen] = '\0'; + + if (tokenLen == 0) { + udfTrace("extract_avg: Empty string encountered at row %d", i); + continue; + } + + char *endPtr; + double value = strtod(token, &endPtr); + + if (endPtr == token || *endPtr != '\0') { + udfError("extract_avg: Failed to convert string '%s' to double at row %d", token, i); + continue; + } + + sumCount->sum += value; + sumCount->count++; + udfTrace("extract_avg: Updated sum=%f, count=%d", sumCount->sum, sumCount->count); + } + } + + newInterBuf->bufLen = sizeof(SumCount); + newInterBuf->buf = (char *)malloc(newInterBuf->bufLen); + if (newInterBuf->buf == NULL) { + udfError("extract_avg: Failed to allocate memory for newInterBuf"); + return TSDB_CODE_UDF_INTERNAL_ERROR; + } + memcpy(newInterBuf->buf, sumCount, newInterBuf->bufLen); + newInterBuf->numOfResult = 0; + + udfTrace("extract_avg: Final sum=%f, count=%d", sumCount->sum, sumCount->count); + return TSDB_CODE_SUCCESS; +} + +DLL_EXPORT int32_t extract_avg_finish(SUdfInterBuf *interBuf, SUdfInterBuf *result) { + SumCount *sumCount = (SumCount *)interBuf->buf; + + double avg = (sumCount->count > 0) ? (sumCount->sum / sumCount->count) : 0.0; + + *(double *)result->buf = avg; + result->bufLen = sizeof(double); + result->numOfResult = sumCount->count > 0 ? 1 : 0; + + udfTrace("extract_avg_finish: Final result=%f (sum=%f, count=%d)", avg, sumCount->sum, sumCount->count); + return TSDB_CODE_SUCCESS; +} + +DLL_EXPORT int32_t extract_avg_destroy() { + udfTrace("extract_avg_destroy: Cleaning up UDF"); + return TSDB_CODE_SUCCESS; +}