diff --git a/cmake/cmake.define b/cmake/cmake.define
index a33db902ca..5639d212d7 100644
--- a/cmake/cmake.define
+++ b/cmake/cmake.define
@@ -115,9 +115,7 @@ ELSE ()
ENDIF ()
MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}")
- IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64")
- ADD_DEFINITIONS("-D_TD_ARM_")
- ELSE ()
+ IF (TD_INTEL_64 OR TD_INTEL_32)
ADD_DEFINITIONS("-msse4.2")
IF("${FMA_SUPPORT}" MATCHES "true")
MESSAGE(STATUS "turn fma function support on")
diff --git a/cmake/cmake.platform b/cmake/cmake.platform
index 5c6ffd4b10..49e730a885 100644
--- a/cmake/cmake.platform
+++ b/cmake/cmake.platform
@@ -85,10 +85,14 @@ IF ("${CPUTYPE}" STREQUAL "")
MESSAGE(STATUS "The current platform is aarch32")
SET(PLATFORM_ARCH_STR "arm")
SET(TD_ARM_32 TRUE)
+ ADD_DEFINITIONS("-D_TD_ARM_")
+ ADD_DEFINITIONS("-D_TD_ARM_32")
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64")
MESSAGE(STATUS "The current platform is aarch64")
SET(PLATFORM_ARCH_STR "arm64")
SET(TD_ARM_64 TRUE)
+ ADD_DEFINITIONS("-D_TD_ARM_")
+ ADD_DEFINITIONS("-D_TD_ARM_64")
ENDIF ()
ELSE ()
# if generate ARM version:
@@ -96,15 +100,21 @@ ELSE ()
IF (${CPUTYPE} MATCHES "aarch32")
SET(PLATFORM_ARCH_STR "arm")
MESSAGE(STATUS "input cpuType: aarch32")
+ ADD_DEFINITIONS("-D_TD_ARM_")
+ ADD_DEFINITIONS("-D_TD_ARM_32")
SET(TD_ARM_32 TRUE)
ELSEIF (${CPUTYPE} MATCHES "aarch64")
SET(PLATFORM_ARCH_STR "arm64")
MESSAGE(STATUS "input cpuType: aarch64")
+ ADD_DEFINITIONS("-D_TD_ARM_")
+ ADD_DEFINITIONS("-D_TD_ARM_64")
SET(TD_ARM_64 TRUE)
ELSEIF (${CPUTYPE} MATCHES "mips64")
SET(PLATFORM_ARCH_STR "mips")
MESSAGE(STATUS "input cpuType: mips64")
SET(TD_MIPS_64 TRUE)
+ ADD_DEFINITIONS("-D_TD_MIPS_")
+ ADD_DEFINITIONS("-D_TD_MIPS_64")
ELSEIF (${CPUTYPE} MATCHES "x64")
SET(PLATFORM_ARCH_STR "amd64")
MESSAGE(STATUS "input cpuType: x64")
diff --git a/docs/zh/05-get-started/01-docker.md b/docs/zh/05-get-started/01-docker.md
index 43d13e2864..741f6dfeab 100644
--- a/docs/zh/05-get-started/01-docker.md
+++ b/docs/zh/05-get-started/01-docker.md
@@ -10,9 +10,11 @@ title: 通过 Docker 快速体验 TDengine
如果已经安装了 docker, 只需执行下面的命令。
```shell
-docker run -d -p 6030-6049:6030-6049 -p 6030-6049:6030-6049/udp tdengine/tdengine
+docker run -d -p 6030:6030 -p 6041:6041 -p 6043-6049:6043-6049 -p 6043-6049:6043-6049/udp tdengine/tdengine
```
+注意:TDengine 3.0 服务端仅使用 6030 TCP 端口。6041 为 taosAdapter 提供 REST 服务所使用的端口。6043-6049 为 taosAdapter 提供第三方应用接入所使用的端口,可根据需要选择是否打开。
+
确定该容器已经启动并且在正常运行
```shell
@@ -104,7 +106,7 @@ docker run -d --network=host --name tdengine-taosd -e TAOS_DISABLE_ADAPTER=true
```
- 该命令将在数据库 test 下面自动创建一张超级表 meters,该超级表下有 1 万张表,表名为 "d0" 到 "d9999",每张表有 1 万条记录,每条记录有 (ts, current, voltage, phase) 四个字段,时间戳从 "2017-07-14 10:40:00 000" 到 "2017-07-14 10:40:09 999",每张表带有标签 location 和 groupId,groupId 被设置为 1 到 10, location 被设置为 "San Francisco" 或者 "Los Angeles"。
+ 该命令将在数据库 test 下面自动创建一张超级表 meters,该超级表下有 1 万张表,表名为 "d0" 到 "d9999",每张表有 1 万条记录,每条记录有 (ts, current, voltage, phase) 四个字段,时间戳从 "2017-07-14 10:40:00 000" 到 "2017-07-14 10:40:09 999",每张表带有标签 location 和 groupId,groupId 被设置为 1 到 10, location 被设置为 "San Francisco" 或者 "Los Angeles"等城市名称。
这条命令很快完成 1 亿条记录的插入。具体时间取决于硬件性能。
@@ -126,10 +128,10 @@ taos> select count(*) from test.meters;
taos> select avg(current), max(voltage), min(phase) from test.meters;
```
-查询 location="California.SanFrancisco" 的记录总条数:
+查询 location="San Francisco" 的记录总条数:
```sql
-taos> select count(*) from test.meters where location="California.SanFrancisco";
+taos> select count(*) from test.meters where location="San Francisco";
```
查询 groupId=10 的所有记录的平均值、最大值、最小值等:
diff --git a/docs/zh/07-develop/09-udf.md b/docs/zh/07-develop/09-udf.md
index b8ae618105..08da9e296c 100644
--- a/docs/zh/07-develop/09-udf.md
+++ b/docs/zh/07-develop/09-udf.md
@@ -6,56 +6,144 @@ description: "支持用户编码的聚合函数和标量函数,在查询中嵌
在有些应用场景中,应用逻辑需要的查询无法直接使用系统内置的函数来表示。利用 UDF 功能,TDengine 可以插入用户编写的处理代码并在查询中使用它们,就能够很方便地解决特殊应用场景中的使用需求。 UDF 通常以数据表中的一列数据做为输入,同时支持以嵌套子查询的结果作为输入。
-从 2.2.0.0 版本开始,TDengine 支持通过 C/C++ 语言进行 UDF 定义。接下来结合示例讲解 UDF 的使用方法。
+TDengine 支持通过 C/C++ 语言进行 UDF 定义。接下来结合示例讲解 UDF 的使用方法。
-用户可以通过 UDF 实现两类函数: 标量函数 和 聚合函数。
+用户可以通过 UDF 实现两类函数: 标量函数和聚合函数。标量函数对每行数据返回一个值,如求绝对值 abs,正弦函数 sin,字符串拼接函数 concat 等。聚合函数对多行数据返回一个值,如求平均数 avg,最大值 max 等。
-## 用 C/C++ 语言来定义 UDF
+实现 UDF 时,需要实现规定的接口函数
+- 标量函数需要实现标量接口函数 scalarfn 。
+- 聚合函数需要实现聚合接口函数 aggfn_start , aggfn , aggfn_finish。
+- 如果需要初始化,实现 udf_init;如果需要清理工作,实现 udf_destroy。
-### 标量函数
+接口函数的名称是 UDF 名称,或者是 UDF 名称和特定后缀(_start, _finish, _init, _destroy)的连接。列表中的scalarfn,aggfn, udf需要替换成udf函数名。
-用户可以按照下列函数模板定义自己的标量计算函数
+## 实现标量函数
+标量函数实现模板如下
+```c
+#include "taos.h"
+#include "taoserror.h"
+#include "taosudf.h"
- `int32_t udf(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)`
+// initialization function. if no initialization, we can skip definition of it. The initialization function shall be concatenation of the udf name and _init suffix
+// @return error number defined in taoserror.h
+int32_t scalarfn_init() {
+ // initialization.
+ return TSDB_CODE_SUCCESS;
+}
+
+// scalar function main computation function
+// @param inputDataBlock, input data block composed of multiple columns with each column defined by SUdfColumn
+// @param resultColumn, output column
+// @return error number defined in taoserror.h
+int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn* resultColumn) {
+ // read data from inputDataBlock and process, then output to resultColumn.
+ return TSDB_CODE_SUCCESS;
+}
+
+// cleanup function. if no cleanup related processing, we can skip definition of it. The destroy function shall be concatenation of the udf name and _destroy suffix.
+// @return error number defined in taoserror.h
+int32_t scalarfn_destroy() {
+ // clean up
+ return TSDB_CODE_SUCCESS;
+}
+```
+scalarfn 为函数名的占位符,需要替换成函数名,如bit_and。
+
+## 实现聚合函数
+
+聚合函数的实现模板如下
+```c
+#include "taos.h"
+#include "taoserror.h"
+#include "taosudf.h"
+
+// Initialization function. if no initialization, we can skip definition of it. The initialization function shall be concatenation of the udf name and _init suffix
+// @return error number defined in taoserror.h
+int32_t aggfn_init() {
+ // initialization.
+ return TSDB_CODE_SUCCESS;
+}
+
+// aggregate start function. The intermediate value or the state(@interBuf) is initialized in this function. The function name shall be concatenation of udf name and _start suffix
+// @param interbuf intermediate value to intialize
+// @return error number defined in taoserror.h
+int32_t aggfn_start(SUdfInterBuf* interBuf) {
+ // initialize intermediate value in interBuf
+ return TSDB_CODE_SUCCESS;
+}
+
+// aggregate reduce function. This function aggregate old state(@interbuf) and one data bock(inputBlock) and output a new state(@newInterBuf).
+// @param inputBlock input data block
+// @param interBuf old state
+// @param newInterBuf new state
+// @return error number defined in taoserror.h
+int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
+ // read from inputBlock and interBuf and output to newInterBuf
+ return TSDB_CODE_SUCCESS;
+}
+
+// aggregate function finish function. This function transforms the intermediate value(@interBuf) into the final output(@result). The function name must be concatenation of aggfn and _finish suffix.
+// @interBuf : intermediate value
+// @result: final result
+// @return error number defined in taoserror.h
+int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result) {
+ // read data from inputDataBlock and process, then output to result
+ return TSDB_CODE_SUCCESS;
+}
+
+// cleanup function. if no cleanup related processing, we can skip definition of it. The destroy function shall be concatenation of the udf name and _destroy suffix.
+// @return error number defined in taoserror.h
+int32_t aggfn_destroy() {
+ // clean up
+ return TSDB_CODE_SUCCESS;
+}
+```
+aggfn为函数名的占位符,需要修改为自己的函数名,如l2norm。
+
+## 接口函数定义
+
+接口函数的名称是 udf 名称,或者是 udf 名称和特定后缀(_start, _finish, _init, _destroy)的连接。以下描述中函数名称中的 scalarfn,aggfn, udf 需要替换成udf函数名。
+
+接口函数返回值表示是否成功,如果错误返回错误代码,错误代码见taoserror.h。
+
+接口函数参数类型见数据结构定义。
+
+### 标量接口函数
+
+ `int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)`
- 其中 udf 是函数名的占位符,以上述模板实现的函数对行数据块进行标量计算。
+ 其中 scalarfn 是函数名的占位符。这个函数对数据块进行标量计算,通过设置 resultColumn 结构体中的变量设置值
-- scalarFunction 中各参数的具体含义是:
+参数的具体含义是:
- inputDataBlock: 输入的数据块
- - resultColumn: 输出列
+ - resultColumn: 输出列。
-### 聚合函数
+### 聚合接口函数
-用户可以按照如下函数模板定义自己的聚合函数。
+`int32_t aggfn_start(SUdfInterBuf *interBuf)`
-`int32_t udf_start(SUdfInterBuf *interBuf)`
+`int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf)`
-`int32_t udf(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf)`
+`int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result)`
-`int32_t udf_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result)`
-其中 udf 是函数名的占位符。其中各参数的具体含义是:
+其中 aggfn 是函数名的占位符。首先调用aggfn_start生成结果buffer,然后相关的数据会被分为多个行数据块,对每个数据块调用 aggfn 用数据块更新中间结果,最后再调用 aggfn_finish 从中间结果产生最终结果,最终结果只能含 0 或 1 条结果数据。
+参数的具体含义是:
- interBuf:中间结果 buffer。
- inputBlock:输入的数据块。
- newInterBuf:新的中间结果buffer。
- result:最终结果。
-其计算过程为:首先调用udf_start生成结果buffer,然后相关的数据会被分为多个行数据块,对每个行数据块调用 udf 用数据块更新中间结果,最后再调用 udf_finish 从中间结果产生最终结果,最终结果只能含 0 或 1 条结果数据。
-
### UDF 初始化和销毁
`int32_t udf_init()`
`int32_t udf_destroy()`
-其中 udf 是函数名的占位符。udf_init 完成初始化工作。 udf_destroy 完成清理工作。
+其中 udf 是函数名的占位符。udf_init 完成初始化工作。 udf_destroy 完成清理工作。如果没有初始化工作,无需定义udf_init函数。如果没有清理工作,无需定义udf_destroy函数。
-:::note
-如果对应的函数不需要具体的功能,也需要实现一个空函数。
-:::
-
-### UDF 数据结构
+## UDF 数据结构
```c
typedef struct SUdfColumnMeta {
int16_t type;
@@ -103,6 +191,13 @@ typedef struct SUdfInterBuf {
int8_t numOfResult; //zero or one
} SUdfInterBuf;
```
+数据结构说明如下:
+
+- SUdfDataBlock 数据块包含行数 numOfRows 和列数 numCols。udfCols[i] (0 <= i <= numCols-1)表示每一列数据,类型为SUdfColumn*。
+- SUdfColumn 包含列的数据类型定义 colMeta 和列的数据 colData。
+- SUdfColumnMeta 成员定义同 taos.h 数据类型定义。
+- SUdfColumnData 数据可以变长,varLenCol 定义变长数据,fixLenCol 定义定长数据。
+- SUdfInterBuf 定义中间结构 buffer,以及 buffer 中结果个数 numOfResult
为了更好的操作以上数据结构,提供了一些便利函数,定义在 taosudf.h。
@@ -118,75 +213,15 @@ gcc -g -O0 -fPIC -shared add_one.c -o add_one.so
这样就准备好了动态链接库 add_one.so 文件,可以供后文创建 UDF 时使用了。为了保证可靠的系统运行,编译器 GCC 推荐使用 7.5 及以上版本。
-## 在系统中管理和使用 UDF
-
-### 创建 UDF
-
-用户可以通过 SQL 指令在系统中加载客户端所在主机上的 UDF 函数库(不能通过 RESTful 接口或 HTTP 管理界面来进行这一过程)。一旦创建成功,则当前 TDengine 集群的所有用户都可以在 SQL 指令中使用这些函数。UDF 存储在系统的 MNode 节点上,因此即使重启 TDengine 系统,已经创建的 UDF 也仍然可用。
-
-在创建 UDF 时,需要区分标量函数和聚合函数。如果创建时声明了错误的函数类别,则可能导致通过 SQL 指令调用函数时出错。此外,用户需要保证输入数据类型与 UDF 程序匹配,UDF 输出数据类型与 OUTPUTTYPE 匹配。
-
-- 创建标量函数
-```sql
-CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type;
-```
-
- - function_name:标量函数未来在 SQL 中被调用时的函数名,必须与函数实现中 udf 的实际名称一致;
- - library_path:包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
- - output_type:此函数计算结果的数据类型名称;
-
- 例如,如下语句可以把 libbitand.so 创建为系统中可用的 UDF:
-
- ```sql
- CREATE FUNCTION bit_and AS "/home/taos/udf_example/libbitand.so" OUTPUTTYPE INT;
- ```
-
-- 创建聚合函数:
-```sql
-CREATE AGGREGATE FUNCTION function_name AS library_path OUTPUTTYPE output_type [ BUFSIZE buffer_size ];
-```
-
- - function_name:聚合函数未来在 SQL 中被调用时的函数名,必须与函数实现中 udfNormalFunc 的实际名称一致;
- - library_path:包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
- - output_type:此函数计算结果的数据类型,与上文中 udfNormalFunc 的 itype 参数不同,这里不是使用数字表示法,而是直接写类型名称即可;
- - buffer_size:中间计算结果的缓冲区大小,单位是字节。如果不使用可以不设置。
-
- 例如,如下语句可以把 libsqrsum.so 创建为系统中可用的 UDF:
-
- ```sql
- CREATE AGGREGATE FUNCTION sqr_sum AS "/home/taos/udf_example/libsqrsum.so" OUTPUTTYPE DOUBLE bufsize 8;
- ```
-
-### 管理 UDF
-
-- 删除指定名称的用户定义函数:
-```
-DROP FUNCTION function_name;
-```
-
-- function_name:此参数的含义与 CREATE 指令中的 function_name 参数一致,也即要删除的函数的名字,例如
-```sql
-DROP FUNCTION bit_and;
-```
-- 显示系统中当前可用的所有 UDF:
-```sql
-SHOW FUNCTIONS;
-```
-
-### 调用 UDF
-
-在 SQL 指令中,可以直接以在系统中创建 UDF 时赋予的函数名来调用用户定义函数。例如:
-```sql
-SELECT X(c1,c2) FROM table/stable;
-```
-
-表示对名为 c1, c2 的数据列调用名为 X 的用户定义函数。SQL 指令中用户定义函数可以配合 WHERE 等查询特性来使用。
-
+## 管理和使用UDF
+关于如何管理和使用UDF,参见[UDF使用说明](../12-taos-sql/26-udf.md)
## 示例代码
### 标量函数示例 [bit_and](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/bit_and.c)
+bit_and 实现多列的按位与功能。如果只有一列,返回这一列。bit_and 忽略空值。
+
bit_and.c
@@ -196,13 +231,15 @@ SELECT X(c1,c2) FROM table/stable;
-### 聚合函数示例 [sqr_sum](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/sqr_sum.c)
+### 聚合函数示例 [l2norm](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/l2norm.c)
+
+l2norm 实现了输入列的所有数据的二阶范数,即对每个数据先平方,再累加求和,最后开方。
-sqr_sum.c
+l2norm.c
```c
-{{#include tests/script/sh/sqr_sum.c}}
+{{#include tests/script/sh/l2norm.c}}
```
diff --git a/docs/zh/12-taos-sql/26-udf.md b/docs/zh/12-taos-sql/26-udf.md
index 1292206311..7ddcad298b 100644
--- a/docs/zh/12-taos-sql/26-udf.md
+++ b/docs/zh/12-taos-sql/26-udf.md
@@ -4,34 +4,65 @@ title: 用户自定义函数
---
除了 TDengine 的内置函数以外,用户还可以编写自己的函数逻辑并加入TDengine系统中。
+## 创建 UDF
-## 创建函数
+用户可以通过 SQL 指令在系统中加载客户端所在主机上的 UDF 函数库(不能通过 RESTful 接口或 HTTP 管理界面来进行这一过程)。一旦创建成功,则当前 TDengine 集群的所有用户都可以在 SQL 指令中使用这些函数。UDF 存储在系统的 MNode 节点上,因此即使重启 TDengine 系统,已经创建的 UDF 也仍然可用。
+在创建 UDF 时,需要区分标量函数和聚合函数。如果创建时声明了错误的函数类别,则可能导致通过 SQL 指令调用函数时出错。此外,用户需要保证输入数据类型与 UDF 程序匹配,UDF 输出数据类型与 OUTPUTTYPE 匹配。
+
+- 创建标量函数
```sql
-CREATE [AGGREGATE] FUNCTION func_name AS library_path OUTPUTTYPE type_name [BUFSIZE buffer_size]
+CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type;
```
-语法说明:
+ - function_name:标量函数未来在 SQL 中被调用时的函数名,必须与函数实现中 udf 的实际名称一致;
+ - library_path:包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
+ - output_type:此函数计算结果的数据类型名称;
-AGGREGATE:标识此函数是标量函数还是聚集函数。
-func_name:函数名,必须与函数实现中 udf 的实际名称一致。
-library_path:包含UDF函数实现的动态链接库的绝对路径,是在客户端侧主机上的绝对路径。
-type_name:标识此函数的返回类型。
-buffer_size:中间结果的缓冲区大小,单位是字节。不设置则默认为0。
+ 例如,如下语句可以把 libbitand.so 创建为系统中可用的 UDF:
+ ```sql
+ CREATE FUNCTION bit_and AS "/home/taos/udf_example/libbitand.so" OUTPUTTYPE INT;
+ ```
+
+- 创建聚合函数:
+```sql
+CREATE AGGREGATE FUNCTION function_name AS library_path OUTPUTTYPE output_type [ BUFSIZE buffer_size ];
+```
+
+ - function_name:聚合函数未来在 SQL 中被调用时的函数名,必须与函数实现中 udfNormalFunc 的实际名称一致;
+ - library_path:包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
+ - output_type:此函数计算结果的数据类型,与上文中 udfNormalFunc 的 itype 参数不同,这里不是使用数字表示法,而是直接写类型名称即可;
+ - buffer_size:中间计算结果的缓冲区大小,单位是字节。如果不使用可以不设置。
+
+ 例如,如下语句可以把 libl2norm.so 创建为系统中可用的 UDF:
+
+ ```sql
+ CREATE AGGREGATE FUNCTION l2norm AS "/home/taos/udf_example/libl2norm.so" OUTPUTTYPE DOUBLE bufsize 8;
+ ```
关于如何开发自定义函数,请参考 [UDF使用说明](../../develop/udf)。
-## 删除自定义函数
+## 管理 UDF
+- 删除指定名称的用户定义函数:
```
DROP FUNCTION function_name;
```
-- function_name:此参数的含义与 CREATE 指令中的 function_name 参数一致,也即要删除的函数的名字,例如
-
-
-## 显示 UDF
-
+- function_name:此参数的含义与 CREATE 指令中的 function_name 参数一致,也即要删除的函数的名字,例如bit_and, l2norm
```sql
-SHOW FUNCTION;
+DROP FUNCTION bit_and;
```
+- 显示系统中当前可用的所有 UDF:
+```sql
+SHOW FUNCTIONS;
+```
+
+## 调用 UDF
+
+在 SQL 指令中,可以直接以在系统中创建 UDF 时赋予的函数名来调用用户定义函数。例如:
+```sql
+SELECT X(c1,c2) FROM table/stable;
+```
+
+表示对名为 c1, c2 的数据列调用名为 X 的用户定义函数。SQL 指令中用户定义函数可以配合 WHERE 等查询特性来使用。
diff --git a/docs/zh/12-taos-sql/index.md b/docs/zh/12-taos-sql/index.md
index cb01b3a918..900fff1ba2 100644
--- a/docs/zh/12-taos-sql/index.md
+++ b/docs/zh/12-taos-sql/index.md
@@ -5,11 +5,12 @@ description: "TAOS SQL 支持的语法规则、主要查询功能、支持的 SQ
本文档说明 TAOS SQL 支持的语法规则、主要查询功能、支持的 SQL 查询函数,以及常用技巧等内容。阅读本文档需要读者具有基本的 SQL 语言的基础。
-TAOS SQL 是用户对 TDengine 进行数据写入和查询的主要工具。TAOS SQL 为了便于用户快速上手,在一定程度上提供与标准 SQL 类似的风格和模式。严格意义上,TAOS SQL 并不是也不试图提供标准的 SQL 语法。此外,由于 TDengine 针对的时序性结构化数据不提供删除功能,因此在 TAO SQL 中不提供数据删除的相关功能。
+TAOS SQL 是用户对 TDengine 进行数据写入和查询的主要工具。TAOS SQL 提供标准的 SQL 语法,并针对时序数据和业务的特点优化和新增了许多语法和功能。TAOS SQL 语句的最大长度为 1M。TAOS SQL 不支持关键字的缩写,例如 DELETE 不能缩写为 DEL。
本章节 SQL 语法遵循如下约定:
-- <\> 里的内容是用户需要输入的,但不要输入 <\> 本身
+- 用大写字母表示关键字,但 SQL 本身并不区分关键字和标识符的大小写
+- 用小写字母表示需要用户输入的内容
- \[ \] 表示内容为可选项,但不能输入 [] 本身
- | 表示多选一,选择其中一个即可,但不能输入 | 本身
- … 表示前面的项可重复多个
diff --git a/docs/zh/14-reference/03-connector/java.mdx b/docs/zh/14-reference/03-connector/java.mdx
index 88a3674671..7a107bd04d 100644
--- a/docs/zh/14-reference/03-connector/java.mdx
+++ b/docs/zh/14-reference/03-connector/java.mdx
@@ -9,7 +9,7 @@ description: TDengine Java 连接器基于标准 JDBC API 实现, 并提供原
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
-`taos-jdbcdriver` 是 TDengine 的官方 Java 语言连接器,Java 开发人员可以通过它开发存取 TDengine 数据库的应用软件。`taos-jdbcdriver` 实现了 JDBC driver 标准的接口,并提供两种形式的连接器。一种是通过 TDengine 客户端驱动程序(taosc)原生连接 TDengine 实例,支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能,一种是通过 taosAdapter 提供的 REST 接口连接 TDengine 实例(2.4.0.0 及更高版本)。REST 连接实现的功能集合和原生连接有少量不同。
+`taos-jdbcdriver` 是 TDengine 的官方 Java 语言连接器,Java 开发人员可以通过它开发存取 TDengine 数据库的应用软件。`taos-jdbcdriver` 实现了 JDBC driver 标准的接口,并提供两种形式的连接器。一种是通过 TDengine 客户端驱动程序(taosc)原生连接 TDengine 实例,支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能,一种是通过 taosAdapter 提供的 REST 接口连接 TDengine 实例。REST 连接实现的功能集合和原生连接有少量不同。

@@ -41,19 +41,19 @@ REST 连接支持所有能运行 Java 的平台。
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对应类型转换如下:
-| TDengine DataType | JDBCType (driver 版本 < 2.0.24) | JDBCType (driver 版本 >= 2.0.24) |
-| ----------------- | --------------------------------- | ---------------------------------- |
-| TIMESTAMP | java.lang.Long | java.sql.Timestamp |
-| INT | java.lang.Integer | java.lang.Integer |
-| BIGINT | java.lang.Long | java.lang.Long |
-| FLOAT | java.lang.Float | java.lang.Float |
-| DOUBLE | java.lang.Double | java.lang.Double |
-| SMALLINT | java.lang.Short | java.lang.Short |
-| TINYINT | java.lang.Byte | java.lang.Byte |
-| BOOL | java.lang.Boolean | java.lang.Boolean |
-| BINARY | java.lang.String | byte array |
-| NCHAR | java.lang.String | java.lang.String |
-| JSON | - | java.lang.String |
+| TDengine DataType | JDBCType |
+| ----------------- | ---------------------------------- |
+| TIMESTAMP | java.sql.Timestamp |
+| INT | java.lang.Integer |
+| BIGINT | java.lang.Long |
+| FLOAT | java.lang.Float |
+| DOUBLE | java.lang.Double |
+| SMALLINT | java.lang.Short |
+| TINYINT | java.lang.Byte |
+| BOOL | java.lang.Boolean |
+| BINARY | byte array |
+| NCHAR | java.lang.String |
+| JSON | java.lang.String |
**注意**:JSON 类型仅在 tag 中支持。
@@ -198,7 +198,7 @@ url 中的配置参数如下:
- user:登录 TDengine 用户名,默认值 'root'。
- password:用户登录密码,默认值 'taosdata'。
-- batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。从 taos-jdbcdriver-2.0.38 和 TDengine 2.4.0.12 版本开始,JDBC REST 连接增加批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
+- batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。从 taos-jdbcdriver-2.0.38 开始,JDBC REST 连接增加批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
- charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。
- batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false:不再执行失败 SQL 后的任何语句。默认值为:false。
- httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 5000。
@@ -216,7 +216,7 @@ url 中的配置参数如下:
INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6);
```
-- 从 taos-jdbcdriver-2.0.36 和 TDengine 2.2.0.0 版本开始,如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行 sql:insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);
+- 从 taos-jdbcdriver-2.0.36 开始,如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行 sql:insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);
:::
@@ -358,11 +358,11 @@ JDBC 连接器可能报错的错误码包括 3 种:JDBC driver 本身的报错
具体的错误码请参考:
- [TDengine Java Connector](https://github.com/taosdata/taos-connector-jdbc/blob/main/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java)
-- [TDengine_ERROR_CODE](https://github.com/taosdata/TDengine/blob/develop/src/inc/taoserror.h)
+- [TDengine_ERROR_CODE](../error-code)
### 通过参数绑定写入数据
-从 2.1.2.0 版本开始,TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据写入(INSERT)场景的支持。采用这种方式写入数据时,能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
+TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据写入(INSERT)场景的支持。采用这种方式写入数据时,能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
**注意**:
@@ -630,7 +630,7 @@ public void setNString(int columnIndex, ArrayList list, int size) throws
### 无模式写入
-从 2.2.0.0 版本开始,TDengine 增加了对无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](/reference/schemaless/)。
+TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../schemaless)。
**注意**:
@@ -670,55 +670,127 @@ public class SchemalessInsertTest {
TDengine Java 连接器支持订阅功能,应用 API 如下:
-#### 创建订阅
+#### 创建 Topic
```java
-TSDBSubscribe sub = ((TSDBConnection)conn).subscribe("topic", "select * from meters", false);
+Connection connection = DriverManager.getConnection(url, properties);
+Statement statement = connection.createStatement();
+statement.executeUpdate("create topic if not exists topic_speed as select ts, speed from speed_table");
```
`subscribe` 方法的三个参数含义如下:
-- topic:订阅的主题(即名称),此参数是订阅的唯一标识
-- sql:订阅的查询语句,此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据
-- restart:如果订阅已经存在,是重新开始,还是继续之前的订阅
+- topic_speed:订阅的主题(即名称),此参数是订阅的唯一标识。
+- sql:订阅的查询语句,此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据。
-如上面的例子将使用 SQL 语句 `select * from meters` 创建一个名为 `topic` 的订阅,如果这个订阅已经存在,将继续之前的查询进度,而不是从头开始消费所有的数据。
+如上面的例子将使用 SQL 语句 `select ts, speed from speed_table` 创建一个名为 `topic_speed` 的订阅。
+
+#### 创建 Consumer
+
+```java
+Properties config = new Properties();
+config.setProperty("enable.auto.commit", "true");
+config.setProperty("group.id", "group1");
+config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer");
+
+TaosConsumer consumer = new TaosConsumer<>(config);
+```
+
+- enable.auto.commit: 是否允许自动提交。
+- group.id: consumer 所在的 group。
+- value.deserializer: 结果集反序列化方法,可以继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer`,并指定结果集 bean,实现反序列化。也可以继承 `com.taosdata.jdbc.tmq.Deserializer`,根据 SQL 的 resultSet 自定义反序列化方式。
+- 其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group)
#### 订阅消费数据
```java
-int total = 0;
while(true) {
- TSDBResultSet rs = sub.consume();
- int count = 0;
- while(rs.next()) {
- count++;
- }
- total += count;
- System.out.printf("%d rows consumed, total %d\n", count, total);
- Thread.sleep(1000);
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+ for (ResultBean record : records) {
+ process(record);
+ }
}
```
-`consume` 方法返回一个结果集,其中包含从上次 `consume` 到目前为止的所有新数据。请务必按需选择合理的调用 `consume` 的频率(如例子中的 `Thread.sleep(1000)`),否则会给服务端造成不必要的压力。
+`poll` 方法返回一个结果集,其中包含从上次 `poll` 到目前为止的所有新数据。请务必按需选择合理的调用 `poll` 的频率(如例子中的 `Duration.ofMillis(100)`),否则会给服务端造成不必要的压力。
#### 关闭订阅
```java
-sub.close(true);
+consumer.close();
```
-`close` 方法关闭一个订阅。如果其参数为 `true` 表示保留订阅进度信息,后续可以创建同名订阅继续消费数据;如为 `false` 则不保留订阅进度。
-
-### 关闭资源
+### 使用示例如下:
```java
-resultSet.close();
-stmt.close();
-conn.close();
-```
+public abstract class ConsumerLoop {
+ private final TaosConsumer consumer;
+ private final List topics;
+ private final AtomicBoolean shutdown;
+ private final CountDownLatch shutdownLatch;
-> `注意务必要将 connection 进行关闭`,否则会出现连接泄露。
+ public ConsumerLoop() throws SQLException {
+ Properties config = new Properties();
+ config.setProperty("msg.with.table.name", "true");
+ config.setProperty("enable.auto.commit", "true");
+ config.setProperty("group.id", "group1");
+ config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer");
+
+ this.consumer = new TaosConsumer<>(config);
+ this.topics = Collections.singletonList("topic_speed");
+ this.shutdown = new AtomicBoolean(false);
+ this.shutdownLatch = new CountDownLatch(1);
+ }
+
+ public abstract void process(ResultBean result);
+
+ public void pollData() throws SQLException {
+ try {
+ consumer.subscribe(topics);
+
+ while (!shutdown.get()) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+ for (ResultBean record : records) {
+ process(record);
+ }
+ }
+ } finally {
+ consumer.close();
+ shutdownLatch.countDown();
+ }
+ }
+
+ public void shutdown() throws InterruptedException {
+ shutdown.set(true);
+ shutdownLatch.await();
+ }
+
+ static class ResultDeserializer extends ReferenceDeserializer {
+
+ }
+
+ static class ResultBean {
+ private Timestamp ts;
+ private int speed;
+
+ public Timestamp getTs() {
+ return ts;
+ }
+
+ public void setTs(Timestamp ts) {
+ this.ts = ts;
+ }
+
+ public int getSpeed() {
+ return speed;
+ }
+
+ public void setSpeed(int speed) {
+ this.speed = speed;
+ }
+ }
+}
+```
### 与连接池使用
@@ -787,20 +859,6 @@ public static void main(String[] args) throws Exception {
> 更多 druid 使用问题请查看[官方说明](https://github.com/alibaba/druid)。
-**注意事项:**
-
-- TDengine `v1.6.4.1` 版本开始提供了一个专门用于心跳检测的函数 `select server_status()`,所以在使用连接池时推荐使用 `select server_status()` 进行 Validation Query。
-
-如下所示,`select server_status()` 执行成功会返回 `1`。
-
-```sql
-taos> select server_status();
-server_status()|
-================
-1 |
-Query OK, 1 row(s) in set (0.000141s)
-```
-
### 更多示例程序
示例程序源码位于 `TDengine/examples/JDBC` 下:
@@ -811,7 +869,7 @@ Query OK, 1 row(s) in set (0.000141s)
- SpringJdbcTemplate:Spring JdbcTemplate 中使用 taos-jdbcdriver。
- mybatisplus-demo:Springboot + Mybatis 中使用 taos-jdbcdriver。
-请参考:[JDBC example](https://github.com/taosdata/TDengine/tree/develop/examples/JDBC)
+请参考:[JDBC example](https://github.com/taosdata/TDengine/tree/3.0/examples/JDBC)
## 最近更新记录
@@ -842,7 +900,7 @@ Query OK, 1 row(s) in set (0.000141s)
**解决方法**:重新安装 64 位 JDK。
-4. 其它问题请参考 [FAQ](/train-faq/faq)
+4. 其它问题请参考 [FAQ](../../../train-faq/faq)
## API 参考
diff --git a/examples/rust b/examples/rust
deleted file mode 160000
index 7ed7a97715..0000000000
--- a/examples/rust
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 7ed7a97715388fa144718764d6bf20f9bfc29a12
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 0dd137d532..8aacfec397 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -42,8 +42,8 @@ enum {
TASK_STATUS__DROPPING,
TASK_STATUS__FAIL,
TASK_STATUS__STOP,
- TASK_STATUS__PREPARE_RECOVER,
- TASK_STATUS__RECOVERING,
+ TASK_STATUS__RECOVER_DOWNSTREAM,
+ TASK_STATUS__RECOVER_SELF,
};
enum {
@@ -232,8 +232,8 @@ typedef struct {
} SStreamChildEpInfo;
typedef struct {
- int32_t nodeId;
- int32_t childId;
+ int32_t srcNodeId;
+ int32_t srcChildId;
int64_t stateSaveVer;
int64_t stateProcessedVer;
} SStreamCheckpointInfo;
@@ -394,9 +394,6 @@ typedef struct {
int32_t upstreamTaskId;
int32_t upstreamChildId;
int32_t upstreamNodeId;
-#if 0
- int64_t sourceVer;
-#endif
int32_t blockNum;
SArray* dataLen; // SArray
SArray* data; // SArray
@@ -426,6 +423,7 @@ typedef struct {
int32_t rspToTaskId;
} SStreamRetrieveRsp;
+#if 0
typedef struct {
int64_t streamId;
int32_t taskId;
@@ -462,13 +460,25 @@ int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq
int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp);
int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pRsp);
-typedef struct {
- int64_t streamId;
-} SPStreamTaskRecoverReq;
+int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
+int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
+#endif
typedef struct {
- int8_t reserved;
-} SPStreamTaskRecoverRsp;
+ int64_t streamId;
+ int32_t downstreamTaskId;
+ int32_t taskId;
+} SStreamRecoverDownstreamReq;
+
+typedef struct {
+ int64_t streamId;
+ int32_t downstreamTaskId;
+ int32_t taskId;
+ SArray* checkpointVer; // SArray
+} SStreamRecoverDownstreamRsp;
+
+int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
+int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, const SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
@@ -479,8 +489,6 @@ int32_t streamSetupTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
-int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
-int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
@@ -513,6 +521,7 @@ SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
+int32_t streamLoadTasks(SStreamMeta* pMeta);
#ifdef __cplusplus
}
diff --git a/source/client/src/TSDBJNIConnector.c b/source/client/src/TSDBJNIConnector.c
index 227c2fff18..38bbec24ce 100644
--- a/source/client/src/TSDBJNIConnector.c
+++ b/source/client/src/TSDBJNIConnector.c
@@ -757,7 +757,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp(J
int32_t code = taos_stmt_prepare(pStmt, str, len);
taosMemoryFreeClear(str);
if (code != TSDB_CODE_SUCCESS) {
- jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
+ jniError("prepareStmt jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
@@ -785,7 +785,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
if (code != TSDB_CODE_SUCCESS) {
(*env)->ReleaseStringUTFChars(env, jname, name);
- jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
+ jniError("bindTableName jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
@@ -860,7 +860,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI
(*env)->ReleaseStringUTFChars(env, tableName, name);
if (code != TSDB_CODE_SUCCESS) {
- jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
+ jniError("tableNameTags jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return JNI_SUCCESS;
@@ -926,7 +926,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(
taosMemoryFreeClear(b);
if (code != TSDB_CODE_SUCCESS) {
- jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
+ jniError("bindColData jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
@@ -949,7 +949,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_addBatchImp(JNIEn
int32_t code = taos_stmt_add_batch(pStmt);
if (code != TSDB_CODE_SUCCESS) {
- jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
+ jniError("add batch jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
@@ -973,7 +973,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
int32_t code = taos_stmt_execute(pStmt);
if (code != TSDB_CODE_SUCCESS) {
- jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
+ jniError("execute batch jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
@@ -997,7 +997,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
int32_t code = taos_stmt_close(pStmt);
if (code != TSDB_CODE_SUCCESS) {
- jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
+ jniError("close stmt jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c
index 7031a1ebca..9475d1b51e 100644
--- a/source/client/src/clientHb.c
+++ b/source/client/src/clientHb.c
@@ -327,7 +327,13 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
while (pIter != NULL) {
int64_t *rid = pIter;
SRequestObj *pRequest = acquireRequest(*rid);
- if (NULL == pRequest || pRequest->killed) {
+ if (NULL == pRequest) {
+ pIter = taosHashIterate(pObj->pRequests, pIter);
+ continue;
+ }
+
+ if (pRequest->killed) {
+ releaseRequest(*rid);
pIter = taosHashIterate(pObj->pRequests, pIter);
continue;
}
diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c
index b85d2a67ac..4917a01c55 100644
--- a/source/client/src/clientImpl.c
+++ b/source/client/src/clientImpl.c
@@ -283,7 +283,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
- code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false);
+ code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, true);
}
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c
index 272d71715a..0d41d35721 100644
--- a/source/client/src/clientMain.c
+++ b/source/client/src/clientMain.c
@@ -182,6 +182,7 @@ void taos_free_result(TAOS_RES *res) {
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
+ tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId);
destroyRequest(pRequest);
} else if (TD_RES_TMQ(res)) {
SMqRspObj *pRsp = (SMqRspObj *)res;
@@ -482,7 +483,7 @@ void taos_stop_query(TAOS_RES *res) {
int32_t numOfFields = taos_num_fields(pRequest);
// It is not a query, no need to stop.
if (numOfFields == 0) {
- tscDebug("request %" PRIx64 " no need to be killed since not query", pRequest->requestId);
+ tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId);
return;
}
@@ -852,7 +853,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
}
pRequest->code =
- setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, false);
+ setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
pRequest->code = code;
diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c
index 4217cf08b3..0c4cf23c4e 100644
--- a/source/client/src/clientMsgHandler.c
+++ b/source/client/src/clientMsgHandler.c
@@ -389,7 +389,7 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
code = buildShowVariablesRsp(rsp.variables, &pRes);
}
if (TSDB_CODE_SUCCESS == code) {
- code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, false);
+ code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true);
}
tFreeSShowVariablesRsp(&rsp);
diff --git a/source/common/src/systable.c b/source/common/src/systable.c
index 11faaaad97..2a7c6ca471 100644
--- a/source/common/src/systable.c
+++ b/source/common/src/systable.c
@@ -24,7 +24,7 @@
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
static const SSysDbTableSchema dnodesSchema[] = {
- {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_SMALLINT},
+ {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "support_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
@@ -66,7 +66,7 @@ static const SSysDbTableSchema bnodesSchema[] = {
};
static const SSysDbTableSchema clusterSchema[] = {
- {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_BIGINT},
+ {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
};
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index a0f02d96f9..8d3deff495 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -355,7 +355,11 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) {
static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddDir(pCfg, "dataDir", tsDataDir, 0) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, 0) != 0) return -1;
+
+ tsNumOfSupportVnodes = tsNumOfCores * 2;
+ tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, 0) != 0) return -1;
+
if (cfgAddInt32(pCfg, "maxShellConns", tsMaxShellConns, 10, 50000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 10, 1000000, 0) != 0) return -1;
@@ -423,7 +427,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 2, 1024, 0) != 0) return -1;
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
- tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, TSDB_MAX_MSG_SIZE * 10000L);
+ tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, 0) != 0)
return -1;
diff --git a/source/dnode/mnode/impl/inc/mndDb.h b/source/dnode/mnode/impl/inc/mndDb.h
index 38a5ecd273..cea0a43b61 100644
--- a/source/dnode/mnode/impl/inc/mndDb.h
+++ b/source/dnode/mnode/impl/inc/mndDb.h
@@ -28,6 +28,7 @@ SDbObj *mndAcquireDb(SMnode *pMnode, const char *db);
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq);
+bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
const char *mndGetDbStr(const char *src);
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index c4da9b5c3d..455da6a40e 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -162,8 +162,9 @@ typedef struct {
int64_t lastExecTime;
int32_t lastAction;
int32_t lastErrorNo;
- tmsg_t lastMsgType;
SEpSet lastEpset;
+ tmsg_t lastMsgType;
+ tmsg_t originRpcType;
char dbname1[TSDB_TABLE_FNAME_LEN];
char dbname2[TSDB_TABLE_FNAME_LEN];
int32_t startFunc;
diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c
index 66f81a3dba..8edfcad2c6 100644
--- a/source/dnode/mnode/impl/src/mndDb.c
+++ b/source/dnode/mnode/impl/src/mndDb.c
@@ -1495,8 +1495,34 @@ static const char *getCacheModelStr(int8_t cacheModel) {
return "unknown";
}
-static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, int32_t rows, int64_t numOfTables,
- bool sysDb, ESdbStatus objStatus, bool sysinfo) {
+bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb) {
+ if (pDb->cfg.replications == 1) return true;
+
+ SSdb *pSdb = pMnode->pSdb;
+ void *pIter = NULL;
+ bool isReady = true;
+ while (1) {
+ SVgObj *pVgroup = NULL;
+ pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
+ if (pIter == NULL) break;
+
+ if (pVgroup->dbUid == pDb->uid && pVgroup->replica > 1) {
+ bool hasLeader = false;
+ for (int32_t i = 0; i < pVgroup->replica; ++i) {
+ if (pVgroup->vnodeGid[i].role == TAOS_SYNC_STATE_LEADER) {
+ hasLeader = true;
+ }
+ }
+ if (!hasLeader) isReady = false;
+ }
+ sdbRelease(pSdb, pVgroup);
+ }
+
+ return isReady;
+}
+
+static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, int32_t rows,
+ int64_t numOfTables, bool sysDb, ESdbStatus objStatus, bool sysinfo) {
int32_t cols = 0;
int32_t bytes = pShow->pMeta->pSchemas[cols].bytes;
char *buf = taosMemoryMalloc(bytes);
@@ -1509,8 +1535,16 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
}
char *statusStr = "ready";
- if (objStatus == SDB_STATUS_CREATING) statusStr = "creating";
- if (objStatus == SDB_STATUS_DROPPING) statusStr = "dropping";
+ if (objStatus == SDB_STATUS_CREATING) {
+ statusStr = "creating";
+ } else if (objStatus == SDB_STATUS_DROPPING) {
+ statusStr = "dropping";
+ } else {
+ if (!sysDb && !mndIsDbReady(pMnode, pDb)) {
+ statusStr = "unsynced";
+ }
+ }
+
char statusVstr[24] = {0};
STR_WITH_SIZE_TO_VARSTR(statusVstr, statusStr, strlen(statusStr));
@@ -1693,7 +1727,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
setInformationSchemaDbCfg(&infoschemaDb);
size_t numOfTables = 0;
getInfosDbMeta(NULL, &numOfTables);
- dumpDbInfoData(pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
+ mndDumpDbInfoData(pMnode, pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
numOfRows += 1;
@@ -1701,7 +1735,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
setPerfSchemaDbCfg(&perfschemaDb);
numOfTables = 0;
getPerfDbMeta(NULL, &numOfTables);
- dumpDbInfoData(pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
+ mndDumpDbInfoData(pMnode, pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
numOfRows += 1;
pShow->sysDbRsp = true;
@@ -1714,7 +1748,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) == 0) {
int32_t numOfTables = 0;
sdbTraverse(pSdb, SDB_VGROUP, mndGetTablesOfDbFp, &numOfTables, NULL, NULL);
- dumpDbInfoData(pBlock, pDb, pShow, numOfRows, numOfTables, false, objStatus, sysinfo);
+ mndDumpDbInfoData(pMnode, pBlock, pDb, pShow, numOfRows, numOfTables, false, objStatus, sysinfo);
numOfRows++;
}
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index ba9bb2982f..7bde05aca5 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -504,6 +504,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
return 0;
}
+#if 0
static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
SMStreamTaskRecoverReq *pReq = taosMemoryCalloc(1, sizeof(SMStreamTaskRecoverReq));
if (pReq == NULL) {
@@ -540,7 +541,6 @@ static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
return 0;
}
-#if 0
int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
if (pStream->isDistributed) {
int32_t lv = taosArrayGetSize(pStream->tasks);
diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c
index b2a0e6aac8..17b4336465 100644
--- a/source/dnode/mnode/impl/src/mndTrans.c
+++ b/source/dnode/mnode/impl/src/mndTrans.c
@@ -124,8 +124,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT8(pRaw, dataPos, pTrans->exec, _OVER)
SDB_SET_INT8(pRaw, dataPos, pTrans->oper, _OVER)
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
- SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
- SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
+ SDB_SET_INT16(pRaw, dataPos, pTrans->originRpcType, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
@@ -282,13 +281,12 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT8(pRaw, dataPos, &exec, _OVER)
SDB_GET_INT8(pRaw, dataPos, &oper, _OVER)
SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
- SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
- SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
pTrans->stage = stage;
pTrans->policy = policy;
pTrans->conflict = conflict;
pTrans->exec = exec;
pTrans->oper = oper;
+ SDB_GET_INT16(pRaw, dataPos, &pTrans->originRpcType, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
@@ -611,6 +609,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
if (pReq != NULL) {
taosArrayPush(pTrans->pRpcArray, &pReq->info);
+ pTrans->originRpcType = pReq->msgType;
}
mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
return pTrans;
@@ -910,6 +909,23 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
}
+ if (pTrans->originRpcType == TDMT_MND_CREATE_DB) {
+ mDebug("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType));
+ SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname1);
+ if (pDb != NULL) {
+ for (int32_t j = 0; j < 12; j++) {
+ bool ready = mndIsDbReady(pMnode, pDb);
+ if (!ready) {
+ mDebug("trans:%d, db:%s not ready yet, wait %d times", pTrans->id, pTrans->dbname1, j);
+ taosMsleep(1000);
+ } else {
+ break;
+ }
+ }
+ }
+ mndReleaseDb(pMnode, pDb);
+ }
+
tmsgSendRsp(&rspMsg);
}
}
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index f7da287ca1..02ca248054 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -777,6 +777,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
}
}
+#if 0
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
@@ -789,18 +790,6 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
}
}
-int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
- SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
- int32_t taskId = pRsp->taskId;
- SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
- if (pTask) {
- streamProcessDispatchRsp(pTask, pRsp);
- return 0;
- } else {
- return -1;
- }
-}
-
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->rspTaskId;
@@ -813,6 +802,19 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
return -1;
}
}
+#endif
+
+int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
+ SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+ int32_t taskId = pRsp->taskId;
+ SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
+ if (pTask) {
+ streamProcessDispatchRsp(pTask, pRsp);
+ return 0;
+ } else {
+ return -1;
+ }
+}
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
@@ -873,6 +875,8 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
.code = 0,
};
streamProcessDispatchReq(pTask, &req, &rsp, false);
+ rpcFreeCont(pMsg->pCont);
+ taosFreeQitem(pMsg);
return;
}
@@ -883,4 +887,6 @@ FAIL:
.info = pMsg->info,
};
tmsgSendRsp(&rsp);
+ rpcFreeCont(pMsg->pCont);
+ taosFreeQitem(pMsg);
}
diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c
index 0f8ec07016..b8e94de115 100644
--- a/source/dnode/vnode/src/vnd/vnodeSvr.c
+++ b/source/dnode/vnode/src/vnd/vnodeSvr.c
@@ -334,14 +334,14 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_STREAM_TASK_DISPATCH:
// return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, pInfo->workerId != 0);
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
- case TDMT_STREAM_TASK_RECOVER:
- return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
+ /*case TDMT_STREAM_TASK_RECOVER:*/
+ /*return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);*/
case TDMT_STREAM_RETRIEVE:
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
- case TDMT_STREAM_TASK_RECOVER_RSP:
- return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
+ /*case TDMT_STREAM_TASK_RECOVER_RSP:*/
+ /*return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);*/
case TDMT_STREAM_RETRIEVE_RSP:
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
default:
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index 096d2b5b22..8439cf700d 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -732,18 +732,19 @@ typedef struct SStreamSessionAggOperatorInfo {
} SStreamSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo {
- SSDataBlock* pRes;
- STimeWindow win;
- SInterval interval;
- int64_t current;
- SArray* pPrevRow; // SArray
- SArray* pNextRow; // SArray
- bool isPrevRowSet;
- bool isNextRowSet;
- int32_t fillType; // fill type
- SColumn tsCol; // primary timestamp column
- SExprSupp scalarSup; // scalar calculation
- struct SFillColInfo* pFillColInfo; // fill column info
+ SSDataBlock* pRes;
+ STimeWindow win;
+ SInterval interval;
+ int64_t current;
+ SArray* pPrevRow; // SArray
+ SArray* pNextRow; // SArray
+ SArray* pLinearInfo; // SArray
+ bool isPrevRowSet;
+ bool isNextRowSet;
+ int32_t fillType; // fill type
+ SColumn tsCol; // primary timestamp column
+ SExprSupp scalarSup; // scalar calculation
+ struct SFillColInfo* pFillColInfo; // fill column info
} STimeSliceOperatorInfo;
typedef struct SStateWindowOperatorInfo {
diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h
index b604794dad..c2de48d0eb 100644
--- a/source/libs/executor/inc/tfill.h
+++ b/source/libs/executor/inc/tfill.h
@@ -33,11 +33,20 @@ typedef struct SFillColInfo {
SVariant fillVal;
} SFillColInfo;
+typedef struct SFillLinearInfo {
+ SPoint start;
+ SPoint end;
+ bool hasNull;
+ bool fillLastPoint;
+ int16_t type;
+ int32_t bytes;
+} SFillLinearInfo;
+
typedef struct {
SSchema col;
char* tagVal;
} SFillTagColInfo;
-
+
typedef struct SFillInfo {
TSKEY start; // start timestamp
TSKEY end; // endKey for fill
diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c
index c5d68676d2..bc266cc33e 100644
--- a/source/libs/executor/src/tfill.c
+++ b/source/libs/executor/src/tfill.c
@@ -669,4 +669,4 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const str
}
return pFillCol;
-}
\ No newline at end of file
+}
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index e7d9a8d8b2..0fec1e61d2 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -2087,6 +2087,34 @@ static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock
pSliceInfo->isNextRowSet = true;
}
+static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
+ int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
+ for (int32_t i = 0; i < numOfCols; ++i) {
+ SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
+ SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
+ SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i);
+
+ // null data should not be kept since it can not be used to perform interpolation
+ if (!colDataIsNull_s(pColInfoData, i)) {
+ int64_t startKey = *(int64_t*)colDataGetData(pTsCol, rowIndex);
+ int64_t endKey = *(int64_t*)colDataGetData(pTsCol, rowIndex + 1);
+ pLinearInfo->start.key = startKey;
+ pLinearInfo->end.key = endKey;
+
+ char* val;
+ val = colDataGetData(pColInfoData, rowIndex);
+ memcpy(pLinearInfo->start.val, val, pLinearInfo->bytes);
+ val = colDataGetData(pColInfoData, rowIndex + 1);
+ memcpy(pLinearInfo->end.val, val, pLinearInfo->bytes);
+
+ pLinearInfo->hasNull = false;
+ } else {
+ pLinearInfo->hasNull = true;
+ }
+ }
+
+}
+
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pBlock,
SSDataBlock* pResBlock) {
int32_t rows = pResBlock->info.rows;
@@ -2115,52 +2143,41 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
- colDataAppend(pDst, rows, (char*)&v, false);
+ colDataAppend(pDst, rows, (char *)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
- colDataAppend(pDst, rows, (char*)&v, false);
+ colDataAppend(pDst, rows, (char *)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
- colDataAppend(pDst, rows, (char*)&v, false);
+ colDataAppend(pDst, rows, (char *)&v, false);
}
pResBlock->info.rows += 1;
break;
}
case TSDB_FILL_LINEAR: {
-#if 0
- if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
- || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
-// goto interp_exit;
- }
+ SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot);
- double v1 = -1, v2 = -1;
- GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val);
- GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val);
+ SPoint start = pLinearInfo->start;
+ SPoint end = pLinearInfo->end;
+ SPoint current = {.key = pSliceInfo->current};
+ current.val = taosMemoryCalloc(pLinearInfo->bytes, 1);
- SPoint point1 = {.key = ts, .val = &v1};
- SPoint point2 = {.key = nextTs, .val = &v2};
- SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
+ // before interp range, do not fill
+ if (start.key == INT64_MIN || end.key == INT64_MAX) {
+ break;
+ }
- int32_t srcType = pCtx->inputType;
- if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
- setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
- } else {
- bool exceedMax = false, exceedMin = false;
- taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin);
- if (exceedMax || exceedMin) {
- __compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0);
- if (func(&pCtx->start.val, &pCtx->end.val) <= 0) {
- COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val);
- } else {
- COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val);
- }
- }
- }
-#endif
- // TODO: pResBlock->info.rows += 1;
+ if (pLinearInfo->hasNull) {
+ colDataAppendNULL(pDst, rows);
+ } else {
+ taosGetLinearInterpolationVal(&current, pLinearInfo->type, &start, &end, pLinearInfo->type);
+ colDataAppend(pDst, rows, (char *)current.val, false);
+ }
+
+ pResBlock->info.rows += 1;
break;
}
case TSDB_FILL_PREV: {
@@ -2246,6 +2263,55 @@ static int32_t initNextRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
return TSDB_CODE_SUCCESS;
}
+static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
+ if (pInfo->pLinearInfo != NULL) {
+ return TSDB_CODE_SUCCESS;
+ }
+
+ pInfo->pLinearInfo = taosArrayInit(4, sizeof(SFillLinearInfo));
+ if (pInfo->pNextRow == NULL) {
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
+
+ int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
+ for (int32_t i = 0; i < numOfCols; ++i) {
+ SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
+
+ SFillLinearInfo linearInfo = {0};
+ linearInfo.start.key = INT64_MIN;
+ linearInfo.end.key = INT64_MAX;
+ linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes);
+ linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes);
+ linearInfo.hasNull = false;
+ linearInfo.fillLastPoint = false;
+ linearInfo.type = pColInfo->info.type;
+ linearInfo.bytes = pColInfo->info.bytes;
+ taosArrayPush(pInfo->pLinearInfo, &linearInfo);
+ }
+
+ return TSDB_CODE_SUCCESS;
+}
+
+static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
+ int32_t code;
+ code = initPrevRowsKeeper(pInfo, pBlock);
+ if (code != TSDB_CODE_SUCCESS) {
+ return TSDB_CODE_FAILED;
+ }
+
+ code = initNextRowsKeeper(pInfo, pBlock);
+ if (code != TSDB_CODE_SUCCESS) {
+ return TSDB_CODE_FAILED;
+ }
+
+ code = initFillLinearInfo(pInfo, pBlock);
+ if (code != TSDB_CODE_SUCCESS) {
+ return TSDB_CODE_FAILED;
+ }
+
+ return TSDB_CODE_SUCCESS;
+}
+
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
@@ -2278,13 +2344,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
break;
}
- int32_t code;
- code = initPrevRowsKeeper(pSliceInfo, pBlock);
- if (code != TSDB_CODE_SUCCESS) {
- longjmp(pTaskInfo->env, code);
- }
-
- code = initNextRowsKeeper(pSliceInfo, pBlock);
+ int32_t code = initKeeperInfo(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
@@ -2312,46 +2372,103 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
pResBlock->info.rows += 1;
doKeepPrevRows(pSliceInfo, pBlock, i);
- pSliceInfo->current =
- taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
- if (pSliceInfo->current > pSliceInfo->win.ekey) {
- doSetOperatorCompleted(pOperator);
- break;
- }
+ // for linear interpolation, always fill value between this and next points;
+ // if its the first point in data block, also fill values between previous(if there's any) and this point;
+ // if its the last point in data block, no need to fill, but reserve this point as the start value for next data block.
+ if (pSliceInfo->fillType == TSDB_FILL_LINEAR) {
+ doKeepLinearInfo(pSliceInfo, pBlock, i);
+ pSliceInfo->current =
+ taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
+ if (i < pBlock->info.rows - 1) {
+ int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
+ if (nextTs > pSliceInfo->current) {
+ while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
+ genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
+ pSliceInfo->current =
+ taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
+ if (pResBlock->info.rows >= pResBlock->info.capacity) {
+ break;
+ }
+ }
- if (pResBlock->info.rows >= pResBlock->info.capacity) {
- break;
- }
- } else if (ts < pSliceInfo->current) {
- // in case interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
- doKeepPrevRows(pSliceInfo, pBlock, i);
-
- if (i < pBlock->info.rows - 1) {
- // in case interpolation window starts and ends between two datapoints, fill(next) need to interpolate
- doKeepNextRows(pSliceInfo, pBlock, i + 1);
- int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
- if (nextTs > pSliceInfo->current) {
- while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
- genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
- pSliceInfo->current =
- taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
- if (pResBlock->info.rows >= pResBlock->info.capacity) {
+ if (pSliceInfo->current > pSliceInfo->win.ekey) {
+ doSetOperatorCompleted(pOperator);
break;
}
+ } else {
+ // ignore current row, and do nothing
}
-
- if (pSliceInfo->current > pSliceInfo->win.ekey) {
- doSetOperatorCompleted(pOperator);
- break;
- }
- } else {
- // ignore current row, and do nothing
+ } else { // it is the last row of current block
+ }
+ } else { // non-linear interpolation
+ pSliceInfo->current =
+ taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
+ if (pSliceInfo->current > pSliceInfo->win.ekey) {
+ doSetOperatorCompleted(pOperator);
+ break;
+ }
+
+ if (pResBlock->info.rows >= pResBlock->info.capacity) {
+ break;
+ }
+ }
+ } else if (ts < pSliceInfo->current) {
+ // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
+ doKeepPrevRows(pSliceInfo, pBlock, i);
+
+ if (pSliceInfo->fillType == TSDB_FILL_LINEAR) {
+ doKeepLinearInfo(pSliceInfo, pBlock, i);
+ //pSliceInfo->current =
+ // taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
+ if (i < pBlock->info.rows - 1) {
+ int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
+ if (nextTs > pSliceInfo->current) {
+ while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
+ genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
+ pSliceInfo->current =
+ taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
+ if (pResBlock->info.rows >= pResBlock->info.capacity) {
+ break;
+ }
+ }
+
+ if (pSliceInfo->current > pSliceInfo->win.ekey) {
+ doSetOperatorCompleted(pOperator);
+ break;
+ }
+ } else {
+ // ignore current row, and do nothing
+ }
+ } else { // it is the last row of current block
+ }
+ } else { // non-linear interpolation
+ if (i < pBlock->info.rows - 1) {
+ // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
+ doKeepNextRows(pSliceInfo, pBlock, i + 1);
+ int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
+ if (nextTs > pSliceInfo->current) {
+ while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
+ genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
+ pSliceInfo->current =
+ taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
+ if (pResBlock->info.rows >= pResBlock->info.capacity) {
+ break;
+ }
+ }
+
+ if (pSliceInfo->current > pSliceInfo->win.ekey) {
+ doSetOperatorCompleted(pOperator);
+ break;
+ }
+ } else {
+ // ignore current row, and do nothing
+ }
+ } else { // it is the last row of current block
+ doKeepPrevRows(pSliceInfo, pBlock, i);
}
- } else { // it is the last row of current block
- doKeepPrevRows(pSliceInfo, pBlock, i);
}
} else { // ts > pSliceInfo->current
- // in case interpolation window starts and ends between two datapoints, fill(next) need to interpolate
+ // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
@@ -2380,11 +2497,39 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
pResBlock->info.rows += 1;
doKeepPrevRows(pSliceInfo, pBlock, i);
- pSliceInfo->current =
- taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
- if (pResBlock->info.rows >= pResBlock->info.capacity) {
- break;
+ if (pSliceInfo->fillType == TSDB_FILL_LINEAR) {
+ doKeepLinearInfo(pSliceInfo, pBlock, i);
+ pSliceInfo->current =
+ taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
+ if (i < pBlock->info.rows - 1) {
+ int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
+ if (nextTs > pSliceInfo->current) {
+ while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
+ genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
+ pSliceInfo->current =
+ taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
+ if (pResBlock->info.rows >= pResBlock->info.capacity) {
+ break;
+ }
+ }
+
+ if (pSliceInfo->current > pSliceInfo->win.ekey) {
+ doSetOperatorCompleted(pOperator);
+ break;
+ }
+ } else {
+ // ignore current row, and do nothing
+ }
+ } else { // it is the last row of current block
+ }
+ } else { // non-linear interpolation
+ pSliceInfo->current =
+ taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
+
+ if (pResBlock->info.rows >= pResBlock->info.capacity) {
+ break;
+ }
}
}
@@ -2446,6 +2591,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
initResultSizeInfo(&pOperator->resultInfo, 4096);
+ pInfo->pPrevRow = NULL;
+ pInfo->pNextRow = NULL;
+ pInfo->pLinearInfo = NULL;
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues);
pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->win = pInterpPhyNode->timeRange;
diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c
index d19c203ffe..aa526dd440 100644
--- a/source/libs/parser/src/parAstCreater.c
+++ b/source/libs/parser/src/parAstCreater.c
@@ -819,6 +819,10 @@ SNode* createDefaultDatabaseOptions(SAstCreateContext* pCxt) {
pOptions->numOfVgroups = TSDB_DEFAULT_VN_PER_DB;
pOptions->singleStable = TSDB_DEFAULT_DB_SINGLE_STABLE;
pOptions->schemaless = TSDB_DEFAULT_DB_SCHEMALESS;
+ pOptions->walRetentionPeriod = TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD;
+ pOptions->walRetentionSize = TSDB_DEFAULT_DB_WAL_RETENTION_SIZE;
+ pOptions->walRollPeriod = TSDB_DEFAULT_DB_WAL_ROLL_PERIOD;
+ pOptions->walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE;
return (SNode*)pOptions;
}
@@ -846,6 +850,10 @@ SNode* createAlterDatabaseOptions(SAstCreateContext* pCxt) {
pOptions->numOfVgroups = -1;
pOptions->singleStable = -1;
pOptions->schemaless = -1;
+ pOptions->walRetentionPeriod = -1;
+ pOptions->walRetentionSize = -1;
+ pOptions->walRollPeriod = -1;
+ pOptions->walSegmentSize = -1;
return (SNode*)pOptions;
}
diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp
index b513ff57ed..5cf6c30b31 100644
--- a/source/libs/parser/test/parInitialCTest.cpp
+++ b/source/libs/parser/test/parInitialCTest.cpp
@@ -111,6 +111,10 @@ TEST_F(ParserInitialCTest, createDatabase) {
expect.numOfVgroups = TSDB_DEFAULT_VN_PER_DB;
expect.numOfStables = TSDB_DEFAULT_DB_SINGLE_STABLE;
expect.schemaless = TSDB_DEFAULT_DB_SCHEMALESS;
+ expect.walRetentionPeriod = TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD;
+ expect.walRetentionSize = TSDB_DEFAULT_DB_WAL_RETENTION_SIZE;
+ expect.walRollPeriod = TSDB_DEFAULT_DB_WAL_ROLL_PERIOD;
+ expect.walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE;
};
auto setDbBufferFunc = [&](int32_t buffer) { expect.buffer = buffer; };
diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c
index cabca0dc0c..9d87fae62a 100644
--- a/source/libs/scheduler/src/schTask.c
+++ b/source/libs/scheduler/src/schTask.c
@@ -411,7 +411,7 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
if (pJob->fetched) {
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode));
- SCH_ERR_RET(rspCode);
+ SCH_ERR_JRET(rspCode);
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c
index a37cd4fd9e..f1ec7f5e04 100644
--- a/source/libs/scheduler/src/scheduler.c
+++ b/source/libs/scheduler/src/scheduler.c
@@ -154,6 +154,8 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
return;
}
+ SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", errCode:0x%x", *jobId, errCode);
+
schHandleJobDrop(pJob, errCode);
schReleaseJob(*jobId);
diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c
index e4bf90f308..1f8d742de4 100644
--- a/source/libs/stream/src/stream.c
+++ b/source/libs/stream/src/stream.c
@@ -229,6 +229,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
return 0;
}
+#if 0
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
@@ -267,6 +268,7 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
return 0;
}
+#endif
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 4a4d67b89a..ffb7c04bf2 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -19,23 +19,23 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
void* exec = pTask->exec.executor;
// set input
- SStreamQueueItem* pItem = (SStreamQueueItem*)data;
+ const SStreamQueueItem* pItem = (const SStreamQueueItem*)data;
if (pItem->type == STREAM_INPUT__GET_RES) {
- SStreamTrigger* pTrigger = (SStreamTrigger*)data;
+ const SStreamTrigger* pTrigger = (const SStreamTrigger*)data;
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
- SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
+ const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data;
qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
qSetMultiStreamInput(exec, pSubmit->data, 1, STREAM_INPUT__DATA_SUBMIT);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
- SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
- SArray* blocks = pBlock->blocks;
+ const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
+ SArray* blocks = pBlock->blocks;
qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
- SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
- SArray* blocks = pMerged->reqs;
+ const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data;
+ SArray* blocks = pMerged->reqs;
qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT);
} else {
@@ -51,8 +51,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
}
if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
- SSDataBlock block = {0};
- SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data;
+ SSDataBlock block = {0};
+ const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data;
ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
block.info.type = STREAM_PULL_OVER;
@@ -200,14 +200,13 @@ int32_t streamExecForAll(SStreamTask* pTask) {
streamTaskExecImpl(pTask, data, pRes);
qDebug("stream task %d exec end", pTask->taskId);
- streamFreeQitem(data);
-
if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
// TODO log failed ver
streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes);
+ streamFreeQitem(data);
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
@@ -224,10 +223,14 @@ int32_t streamExecForAll(SStreamTask* pTask) {
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
+ streamFreeQitem(data);
return -1;
}
/*streamQueueProcessSuccess(pTask->inputQueue);*/
+ } else {
+ taosArrayDestroy(pRes);
}
+ streamFreeQitem(data);
}
return 0;
}
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index 8faa22d643..b74e838628 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -48,8 +48,18 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
+ if (streamLoadTasks(pMeta) < 0) {
+ goto _err;
+ }
return pMeta;
+
_err:
+ if (pMeta->path) taosMemoryFree(pMeta->path);
+ if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
+ if (pMeta->pStateDb) tdbTbClose(pMeta->pStateDb);
+ if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
+ if (pMeta->db) tdbClose(pMeta->db);
+ taosMemoryFree(pMeta);
return NULL;
}
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c
index 28693d15a6..b2a7e00877 100644
--- a/source/libs/stream/src/streamRecover.c
+++ b/source/libs/stream/src/streamRecover.c
@@ -15,6 +15,7 @@
#include "streamInc.h"
+#if 0
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
@@ -86,17 +87,18 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp
tEndDecode(pDecoder);
return 0;
}
+#endif
int32_t tEncodeSStreamCheckpointInfo(SEncoder* pEncoder, const SStreamCheckpointInfo* pCheckpoint) {
- if (tEncodeI32(pEncoder, pCheckpoint->nodeId) < 0) return -1;
- if (tEncodeI32(pEncoder, pCheckpoint->childId) < 0) return -1;
+ if (tEncodeI32(pEncoder, pCheckpoint->srcNodeId) < 0) return -1;
+ if (tEncodeI32(pEncoder, pCheckpoint->srcChildId) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->stateProcessedVer) < 0) return -1;
return 0;
}
int32_t tDecodeSStreamCheckpointInfo(SDecoder* pDecoder, SStreamCheckpointInfo* pCheckpoint) {
- if (tDecodeI32(pDecoder, &pCheckpoint->nodeId) < 0) return -1;
- if (tDecodeI32(pDecoder, &pCheckpoint->childId) < 0) return -1;
+ if (tDecodeI32(pDecoder, &pCheckpoint->srcNodeId) < 0) return -1;
+ if (tDecodeI32(pDecoder, &pCheckpoint->srcChildId) < 0) return -1;
if (tDecodeI64(pDecoder, &pCheckpoint->stateProcessedVer) < 0) return -1;
return 0;
}
@@ -130,14 +132,13 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh
return 0;
}
-int32_t streamCheckSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
+int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL;
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
- int32_t sz = taosArrayGetSize(pTask->checkpointInfo);
SStreamMultiVgCheckpointInfo checkpoint;
- checkpoint.checkpointId = 0;
+ checkpoint.checkpointId = atomic_fetch_add_32(&pTask->nextCheckId, 1);
checkpoint.checkTs = taosGetTimestampMs();
checkpoint.streamId = pTask->streamId;
checkpoint.taskId = pTask->taskId;
@@ -169,16 +170,21 @@ int32_t streamCheckSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
goto FAIL;
}
+ int32_t sz = taosArrayGetSize(pTask->checkpointInfo);
+ for (int32_t i = 0; i < sz; i++) {
+ SStreamCheckpointInfo* pCheck = taosArrayGet(pTask->checkpointInfo, i);
+ pCheck->stateSaveVer = pCheck->stateProcessedVer;
+ }
+
taosMemoryFree(buf);
return 0;
FAIL:
if (buf) taosMemoryFree(buf);
return -1;
+ return 0;
}
-int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
- ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
- // load status
+int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
void* pVal = NULL;
int32_t vLen = 0;
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
@@ -196,29 +202,77 @@ int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0;
}
-int32_t streamCheckAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
+int32_t streamSaveSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
+ ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
+ return streamSaveStateInfo(pMeta, pTask);
+}
+
+int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
+ ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
+ return streamLoadStateInfo(pMeta, pTask);
+}
+
+int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
- // save and copy state
+ // TODO save and copy state
+
// save state info
+ if (streamSaveStateInfo(pMeta, pTask) < 0) {
+ return -1;
+ }
+ return 0;
+}
+
+int32_t streamFetchDownstreamStatus(SStreamTask* pTask) {
+ // set self status to recover_phase1
+ // build fetch status msg
+ // send fetch msg
+ atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_DOWNSTREAM);
+
+ if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
+ } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
+ } else {
+ ASSERT(0);
+ }
+ return 0;
+}
+
+int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, void* msg) {
+ // if failed, set timer and retry
+ // if successful
+ // add rsp state to partial recover hash
+ // if complete, begin actual recover
return 0;
}
int32_t streamRecoverAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
- // try recover sink level
- // after all sink level recovered, choose current state backend to recover
+ // recover sink level
+ // after all sink level recovered
+ // choose suitable state to recover
return 0;
}
-int32_t streamCheckSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
+int32_t streamSaveSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
- // try recover agg level
- //
+ // TODO: save and copy state
return 0;
}
int32_t streamRecoverSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
+ // if totLevel == 3
+ // fetch agg state
+ // recover from local state to agg state, not send msg
+ // recover from agg state to most recent log v1
+ // enable input queue, set status recover_phase2
+ // recover from v1 to queue msg v2, set status normal
+
+ // if totLevel == 2
+ // fetch sink state
+ // recover from local state to sink state v1, send msg
+ // enable input queue, set status recover_phase2
+ // recover from v1 to queue msg v2, set status normal
return 0;
}
diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c
index e1df181329..31d567a2ed 100644
--- a/source/libs/transport/src/transCli.c
+++ b/source/libs/transport/src/transCli.c
@@ -1271,6 +1271,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
transFreeMsg(pResp->pCont);
cliSchedMsgToNextNode(pMsg, pThrd);
return -1;
+ } else {
+ // change error code for taos client driver if retryCnt exceeds limit
+ if (0 == strncmp(pTransInst->label, "TSC", strlen("TSC"))) pResp->code = TSDB_CODE_APP_NOT_READY;
}
}
}
@@ -1381,6 +1384,7 @@ int transReleaseCliHandle(void* handle) {
tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);
if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
+ taosMemoryFree(cmsg);
return -1;
}
return 0;
diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c
index b76769bdb8..6c8e949b25 100644
--- a/source/os/src/osFile.c
+++ b/source/os/src/osFile.c
@@ -703,7 +703,11 @@ int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, in
int64_t sentbytes;
while (leftbytes > 0) {
+ #ifdef _TD_ARM_32
+ sentbytes = sendfile(pFileOut->fd, pFileIn->fd, (long int*)offset, leftbytes);
+ #else
sentbytes = sendfile(pFileOut->fd, pFileIn->fd, offset, leftbytes);
+ #endif
if (sentbytes == -1) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c
index 7cbc1cd555..dd61f7d225 100644
--- a/source/util/src/tcache.c
+++ b/source/util/src/tcache.c
@@ -35,7 +35,7 @@ typedef struct SCacheNode {
uint64_t addedTime; // the added time when this element is added or updated into cache
uint64_t lifespan; // life duration when this element should be remove from cache
int64_t expireTime; // expire time
- uint64_t signature;
+ void* signature;
struct STrashElem *pTNodeHeader; // point to trash node head
uint16_t keyLen : 15; // max key size: 32kb
bool inTrashcan : 1; // denote if it is in trash or not
@@ -208,7 +208,7 @@ static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force);
* @param pNode data node
*/
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheNode *pNode) {
- if (pNode->signature != (uint64_t)pNode) {
+ if (pNode->signature != pNode) {
uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
return;
}
@@ -226,7 +226,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheNode *
}
static FORCE_INLINE STrashElem *doRemoveElemInTrashcan(SCacheObj *pCacheObj, STrashElem *pElem) {
- if (pElem->pData->signature != (uint64_t)pElem->pData) {
+ if (pElem->pData->signature != pElem->pData) {
uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
return NULL;
}
@@ -494,7 +494,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
if (pCacheObj == NULL || data == NULL) return NULL;
SCacheNode *ptNode = (SCacheNode *)((char *)data - sizeof(SCacheNode));
- if (ptNode->signature != (uint64_t)ptNode) {
+ if (ptNode->signature != ptNode) {
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
return NULL;
}
@@ -511,7 +511,7 @@ void *taosCacheTransferData(SCacheObj *pCacheObj, void **data) {
if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
SCacheNode *ptNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode));
- if (ptNode->signature != (uint64_t)ptNode) {
+ if (ptNode->signature != ptNode) {
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
return NULL;
}
@@ -539,7 +539,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
// It happens when there is only one object in the cache, and two threads which has referenced this object
// start to free the it simultaneously [TD-1569].
SCacheNode *pNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode));
- if (pNode->signature != (uint64_t)pNode) {
+ if (pNode->signature != pNode) {
uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
return;
}
@@ -728,7 +728,7 @@ SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pDat
pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
pNewNode->lifespan = duration;
pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
- pNewNode->signature = (uint64_t)pNewNode;
+ pNewNode->signature = pNewNode;
pNewNode->size = (uint32_t)sizeInBytes;
return pNewNode;
diff --git a/tests/pytest/util/dnodes-random-fail.py b/tests/pytest/util/dnodes-random-fail.py
index 7cadca64a3..9b653844f8 100644
--- a/tests/pytest/util/dnodes-random-fail.py
+++ b/tests/pytest/util/dnodes-random-fail.py
@@ -24,12 +24,9 @@ class TDSimClient:
self.cfgDict = {
"numOfLogLines": "100000000",
- "numOfThreadsPerCore": "2.0",
"locale": "en_US.UTF-8",
"charset": "UTF-8",
"asyncLog": "0",
- "anyIp": "0",
- "sdbDebugFlag": "135",
"rpcDebugFlag": "135",
"tmrDebugFlag": "131",
"cDebugFlag": "135",
diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py
index 20e4e4abe6..e530695d1e 100644
--- a/tests/pytest/util/dnodes.py
+++ b/tests/pytest/util/dnodes.py
@@ -30,7 +30,6 @@ class TDSimClient:
self.path = path
self.cfgDict = {
"numOfLogLines": "100000000",
- "numOfThreadsPerCore": "2.0",
"locale": "en_US.UTF-8",
"charset": "UTF-8",
"asyncLog": "0",
@@ -40,6 +39,7 @@ class TDSimClient:
"udebugFlag": "143",
"jnidebugFlag": "143",
"qdebugFlag": "143",
+ "supportVnodes": "1024",
"telemetryReporting": "0",
}
@@ -117,8 +117,6 @@ class TDDnode:
self.valgrind = 0
self.remoteIP = ""
self.cfgDict = {
- "walLevel": "2",
- "fsync": "1000",
"monitor": "0",
"maxShellConns": "30000",
"locale": "en_US.UTF-8",
@@ -139,6 +137,7 @@ class TDDnode:
"qdebugFlag": "143",
"numOfLogLines": "100000000",
"statusInterval": "1",
+ "supportVnodes": "1024",
"telemetryReporting": "0"
}
diff --git a/tests/script/sh/compile_udf.sh b/tests/script/sh/compile_udf.sh
index 5ff3f2bc8a..c7148d7d7d 100755
--- a/tests/script/sh/compile_udf.sh
+++ b/tests/script/sh/compile_udf.sh
@@ -4,7 +4,7 @@ rm -rf /tmp/udf/libbitand.so /tmp/udf/libsqrsum.so
mkdir -p /tmp/udf
echo "compile udf bit_and and sqr_sum"
gcc -fPIC -shared sh/bit_and.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libbitand.so
-gcc -fPIC -shared sh/sqr_sum.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libsqrsum.so
+gcc -fPIC -shared sh/l2norm.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libl2norm.so
echo "debug show /tmp/udf/*.so"
ls /tmp/udf/*.so
diff --git a/tests/script/sh/sqr_sum.c b/tests/script/sh/l2norm.c
similarity index 84%
rename from tests/script/sh/sqr_sum.c
rename to tests/script/sh/l2norm.c
index af57f377ab..8ccdffb8d6 100644
--- a/tests/script/sh/sqr_sum.c
+++ b/tests/script/sh/l2norm.c
@@ -5,22 +5,22 @@
#include "taosudf.h"
-DLL_EXPORT int32_t sqr_sum_init() {
+DLL_EXPORT int32_t l2norm_init() {
return 0;
}
-DLL_EXPORT int32_t sqr_sum_destroy() {
+DLL_EXPORT int32_t l2norm_destroy() {
return 0;
}
-DLL_EXPORT int32_t sqr_sum_start(SUdfInterBuf *buf) {
+DLL_EXPORT int32_t l2norm_start(SUdfInterBuf *buf) {
*(int64_t*)(buf->buf) = 0;
buf->bufLen = sizeof(double);
buf->numOfResult = 0;
return 0;
}
-DLL_EXPORT int32_t sqr_sum(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
+DLL_EXPORT int32_t l2norm(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
double sumSquares = *(double*)interBuf->buf;
int8_t numNotNull = 0;
for (int32_t i = 0; i < block->numOfCols; ++i) {
@@ -67,7 +67,7 @@ DLL_EXPORT int32_t sqr_sum(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInt
return 0;
}
-DLL_EXPORT int32_t sqr_sum_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
+DLL_EXPORT int32_t l2norm_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
if (buf->numOfResult == 0) {
resultData->numOfResult = 0;
return 0;
diff --git a/tests/script/tsim/db/alter_option.sim b/tests/script/tsim/db/alter_option.sim
index 7df1f02713..00baa9b3f6 100644
--- a/tests/script/tsim/db/alter_option.sim
+++ b/tests/script/tsim/db/alter_option.sim
@@ -111,16 +111,16 @@ endi
if $data21_db != 1000 then # wal_level fsyncperiod
return -1
endi
-if $data22_db != 0 then #
+if $data22_db != 172800 then # wal_retention_period
return -1
endi
-if $data23_db != 0 then #
+if $data23_db != -1 then # wal_retention_size
return -1
endi
-if $data24_db != 0 then #
+if $data24_db != 86400 then # wal_roll_period
return -1
endi
-if $data25_db != 0 then #
+if $data25_db != 0 then # wal_segment_size
return -1
endi
diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim
index 7259b1e779..2f685f8e24 100644
--- a/tests/script/tsim/query/udf.sim
+++ b/tests/script/tsim/query/udf.sim
@@ -24,10 +24,10 @@ if $system_content == Windows_NT then
endi
if $system_content == Windows_NT then
sql create function bit_and as 'C:\\Windows\\Temp\\bitand.dll' outputtype int bufSize 8;
- sql create aggregate function sqr_sum as 'C:\\Windows\\Temp\\sqrsum.dll' outputtype double bufSize 8;
+ sql create aggregate function l2norm as 'C:\\Windows\\Temp\\l2norm.dll' outputtype double bufSize 8;
else
sql create function bit_and as '/tmp/udf/libbitand.so' outputtype int bufSize 8;
- sql create aggregate function sqr_sum as '/tmp/udf/libsqrsum.so' outputtype double bufSize 8;
+ sql create aggregate function l2norm as '/tmp/udf/libl2norm.so' outputtype double bufSize 8;
endi
sql show functions;
if $rows != 2 then
@@ -44,7 +44,7 @@ if $data10 != 2 then
return -1
endi
-sql select sqr_sum(f) from t;
+sql select l2norm(f) from t;
if $rows != 1 then
print expect 1, actual $rows
return -1
@@ -66,7 +66,7 @@ if $data10 != 1 then
return -1
endi
-sql select sqr_sum(f1, f2) from t2;
+sql select l2norm(f1, f2) from t2;
if $rows != 1 then
return -1
endi
@@ -95,7 +95,7 @@ if $data30 != NULL then
return -1
endi
-sql select sqr_sum(f1, f2) from t2;
+sql select l2norm(f1, f2) from t2;
print $rows, $data00
if $rows != 1 then
return -1
@@ -105,7 +105,7 @@ if $data00 != 2.645751311 then
endi
sql insert into t2 values(now+4s, 4, 8)(now+5s, 5, 9);
-sql select sqr_sum(f1-f2), sqr_sum(f1+f2) from t2;
+sql select l2norm(f1-f2), l2norm(f1+f2) from t2;
print $rows , $data00 , $data01
if $rows != 1 then
return -1;
@@ -117,7 +117,7 @@ if $data01 != 18.547236991 then
return -1
endi
-sql select sqr_sum(bit_and(f2, f1)), sqr_sum(bit_and(f1, f2)) from t2;
+sql select l2norm(bit_and(f2, f1)), l2norm(bit_and(f1, f2)) from t2;
print $rows , $data00 , $data01
if $rows != 1 then
return -1
@@ -129,7 +129,7 @@ if $data01 != 1.414213562 then
return -1
endi
-sql select sqr_sum(f2) from udf.t2 group by 1-bit_and(f1, f2) order by 1-bit_and(f1,f2);
+sql select l2norm(f2) from udf.t2 group by 1-bit_and(f1, f2) order by 1-bit_and(f1,f2);
print $rows , $data00 , $data10 , $data20
if $rows != 3 then
return -1
@@ -149,10 +149,10 @@ sql show functions;
if $rows != 1 then
return -1
endi
-if $data00 != @sqr_sum@ then
+if $data00 != @l2norm@ then
return -1
endi
-sql drop function sqr_sum;
+sql drop function l2norm;
sql show functions;
if $rows != 0 then
return -1
diff --git a/tools/taos-tools b/tools/taos-tools
deleted file mode 160000
index 3c7dafeea3..0000000000
--- a/tools/taos-tools
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 3c7dafeea3e558968165b73bee0f51024898e3da