Merge branch '3.0' into feature/TD-11274-3.0

This commit is contained in:
Cary Xu 2022-08-09 16:57:51 +08:00
commit dda6761549
45 changed files with 843 additions and 385 deletions

View File

@ -115,9 +115,7 @@ ELSE ()
ENDIF ()
MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}")
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64")
ADD_DEFINITIONS("-D_TD_ARM_")
ELSE ()
IF (TD_INTEL_64 OR TD_INTEL_32)
ADD_DEFINITIONS("-msse4.2")
IF("${FMA_SUPPORT}" MATCHES "true")
MESSAGE(STATUS "turn fma function support on")

View File

@ -85,10 +85,14 @@ IF ("${CPUTYPE}" STREQUAL "")
MESSAGE(STATUS "The current platform is aarch32")
SET(PLATFORM_ARCH_STR "arm")
SET(TD_ARM_32 TRUE)
ADD_DEFINITIONS("-D_TD_ARM_")
ADD_DEFINITIONS("-D_TD_ARM_32")
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64")
MESSAGE(STATUS "The current platform is aarch64")
SET(PLATFORM_ARCH_STR "arm64")
SET(TD_ARM_64 TRUE)
ADD_DEFINITIONS("-D_TD_ARM_")
ADD_DEFINITIONS("-D_TD_ARM_64")
ENDIF ()
ELSE ()
# if generate ARM version:
@ -96,15 +100,21 @@ ELSE ()
IF (${CPUTYPE} MATCHES "aarch32")
SET(PLATFORM_ARCH_STR "arm")
MESSAGE(STATUS "input cpuType: aarch32")
ADD_DEFINITIONS("-D_TD_ARM_")
ADD_DEFINITIONS("-D_TD_ARM_32")
SET(TD_ARM_32 TRUE)
ELSEIF (${CPUTYPE} MATCHES "aarch64")
SET(PLATFORM_ARCH_STR "arm64")
MESSAGE(STATUS "input cpuType: aarch64")
ADD_DEFINITIONS("-D_TD_ARM_")
ADD_DEFINITIONS("-D_TD_ARM_64")
SET(TD_ARM_64 TRUE)
ELSEIF (${CPUTYPE} MATCHES "mips64")
SET(PLATFORM_ARCH_STR "mips")
MESSAGE(STATUS "input cpuType: mips64")
SET(TD_MIPS_64 TRUE)
ADD_DEFINITIONS("-D_TD_MIPS_")
ADD_DEFINITIONS("-D_TD_MIPS_64")
ELSEIF (${CPUTYPE} MATCHES "x64")
SET(PLATFORM_ARCH_STR "amd64")
MESSAGE(STATUS "input cpuType: x64")

View File

@ -10,9 +10,11 @@ title: 通过 Docker 快速体验 TDengine
如果已经安装了 docker 只需执行下面的命令。
```shell
docker run -d -p 6030-6049:6030-6049 -p 6030-6049:6030-6049/udp tdengine/tdengine
docker run -d -p 6030:6030 -p 6041/6041 -p 6043-6049/6043-6049 -p 6043-6049:6043-6049/udp tdengine/tdengine
```
注意TDengine 3.0 服务端仅使用 6030 TCP 端口。6041 为 taosAdapter 所使用提供 REST 服务端口。6043-6049 为 taosAdapter 提供第三方应用接入所使用端口,可根据需要选择是否打开。
确定该容器已经启动并且在正常运行
```shell
@ -104,7 +106,7 @@ docker run -d --network=host --name tdengine-taosd -e TAOS_DISABLE_ADAPTER=true
```
该命令将在数据库 test 下面自动创建一张超级表 meters该超级表下有 1 万张表,表名为 "d0" 到 "d9999",每张表有 1 万条记录,每条记录有 (ts, current, voltage, phase) 四个字段,时间戳从 "2017-07-14 10:40:00 000" 到 "2017-07-14 10:40:09 999",每张表带有标签 location 和 groupIdgroupId 被设置为 1 到 10 location 被设置为 "San Francisco" 或者 "Los Angeles"。
该命令将在数据库 test 下面自动创建一张超级表 meters该超级表下有 1 万张表,表名为 "d0" 到 "d9999",每张表有 1 万条记录,每条记录有 (ts, current, voltage, phase) 四个字段,时间戳从 "2017-07-14 10:40:00 000" 到 "2017-07-14 10:40:09 999",每张表带有标签 location 和 groupIdgroupId 被设置为 1 到 10 location 被设置为 "San Francisco" 或者 "Los Angeles"等城市名称
这条命令很快完成 1 亿条记录的插入。具体时间取决于硬件性能。
@ -126,10 +128,10 @@ taos> select count(*) from test.meters;
taos> select avg(current), max(voltage), min(phase) from test.meters;
```
查询 location="California.SanFrancisco" 的记录总条数:
查询 location="San Francisco" 的记录总条数:
```sql
taos> select count(*) from test.meters where location="California.SanFrancisco";
taos> select count(*) from test.meters where location="San Francisco";
```
查询 groupId=10 的所有记录的平均值、最大值、最小值等:

View File

@ -6,56 +6,144 @@ description: "支持用户编码的聚合函数和标量函数,在查询中嵌
在有些应用场景中,应用逻辑需要的查询无法直接使用系统内置的函数来表示。利用 UDF 功能TDengine 可以插入用户编写的处理代码并在查询中使用它们,就能够很方便地解决特殊应用场景中的使用需求。 UDF 通常以数据表中的一列数据做为输入,同时支持以嵌套子查询的结果作为输入。
从 2.2.0.0 版本开始,TDengine 支持通过 C/C++ 语言进行 UDF 定义。接下来结合示例讲解 UDF 的使用方法。
TDengine 支持通过 C/C++ 语言进行 UDF 定义。接下来结合示例讲解 UDF 的使用方法。
用户可以通过 UDF 实现两类函数: 标量函数 和 聚合函数。
用户可以通过 UDF 实现两类函数: 标量函数和聚合函数。标量函数对每行数据返回一个值,如求绝对值 abs正弦函数 sin字符串拼接函数 concat 等。聚合函数对多行数据进行返回一个值,如求平均数 avg最大值 max 等
## 用 C/C++ 语言来定义 UDF
实现 UDF 时,需要实现规定的接口函数
- 标量函数需要实现标量接口函数 scalarfn 。
- 聚合函数需要实现聚合接口函数 aggfn_start aggfn aggfn_finish。
- 如果需要初始化,实现 udf_init如果需要清理工作实现udf_destory。
### 标量函数
接口函数的名称是 UDF 名称,或者是 UDF 名称和特定后缀_start, _finish, _init, _destroy)的连接。列表中的scalarfnaggfn, udf需要替换成udf函数名。
用户可以按照下列函数模板定义自己的标量计算函数
## 实现标量函数
标量函数实现模板如下
```c
#include "taos.h"
#include "taoserror.h"
#include "taosudf.h"
`int32_t udf(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)`
// initialization function. if no initialization, we can skip definition of it. The initialization function shall be concatenation of the udf name and _init suffix
// @return error number defined in taoserror.h
int32_t scalarfn_init() {
// initialization.
return TSDB_CODE_SUCCESS;
}
// scalar function main computation function
// @param inputDataBlock, input data block composed of multiple columns with each column defined by SUdfColumn
// @param resultColumn, output column
// @return error number defined in taoserror.h
int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn* resultColumn) {
// read data from inputDataBlock and process, then output to resultColumn.
return TSDB_CODE_SUCCESS;
}
// cleanup function. if no cleanup related processing, we can skip definition of it. The destroy function shall be concatenation of the udf name and _destroy suffix.
// @return error number defined in taoserror.h
int32_t scalarfn_destroy() {
// clean up
return TSDB_CODE_SUCCESS;
}
```
scalarfn 为函数名的占位符需要替换成函数名如bit_and。
## 实现聚合函数
聚合函数的实现模板如下
```c
#include "taos.h"
#include "taoserror.h"
#include "taosudf.h"
// Initialization function. if no initialization, we can skip definition of it. The initialization function shall be concatenation of the udf name and _init suffix
// @return error number defined in taoserror.h
int32_t aggfn_init() {
// initialization.
return TSDB_CODE_SUCCESS;
}
// aggregate start function. The intermediate value or the state(@interBuf) is initialized in this function. The function name shall be concatenation of udf name and _start suffix
// @param interbuf intermediate value to intialize
// @return error number defined in taoserror.h
int32_t aggfn_start(SUdfInterBuf* interBuf) {
// initialize intermediate value in interBuf
return TSDB_CODE_SUCESS;
}
// aggregate reduce function. This function aggregate old state(@interbuf) and one data bock(inputBlock) and output a new state(@newInterBuf).
// @param inputBlock input data block
// @param interBuf old state
// @param newInterBuf new state
// @return error number defined in taoserror.h
int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
// read from inputBlock and interBuf and output to newInterBuf
return TSDB_CODE_SUCCESS;
}
// aggregate function finish function. This function transforms the intermediate value(@interBuf) into the final output(@result). The function name must be concatenation of aggfn and _finish suffix.
// @interBuf : intermediate value
// @result: final result
// @return error number defined in taoserror.h
int32_t int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result) {
// read data from inputDataBlock and process, then output to result
return TSDB_CODE_SUCCESS;
}
// cleanup function. if no cleanup related processing, we can skip definition of it. The destroy function shall be concatenation of the udf name and _destroy suffix.
// @return error number defined in taoserror.h
int32_t aggfn_destroy() {
// clean up
return TSDB_CODE_SUCCESS;
}
```
aggfn为函数名的占位符需要修改为自己的函数名如l2norm。
## 接口函数定义
接口函数的名称是 udf 名称,或者是 udf 名称和特定后缀_start, _finish, _init, _destroy)的连接。以下描述中函数名称中的 scalarfnaggfn, udf 需要替换成udf函数名。
接口函数返回值表示是否成功如果错误返回错误代码错误代码见taoserror.h。
接口函数参数类型见数据结构定义。
### 标量接口函数
`int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)`
其中 udf 是函数名的占位符,以上述模板实现的函数对行数据块进行标量计算。
其中 scalarFn 是函数名的占位符。这个函数对数据块进行标量计算通过设置resultColumn结构体中的变量设置值
- scalarFunction 中各参数的具体含义是:
参数的具体含义是:
- inputDataBlock: 输入的数据块
- resultColumn: 输出列
- resultColumn: 输出列。输出列
### 聚合函数
### 聚合接口函数
用户可以按照如下函数模板定义自己的聚合函数。
`int32_t aggfn_start(SUdfInterBuf *interBuf)`
`int32_t udf_start(SUdfInterBuf *interBuf)`
`int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf)`
`int32_t udf(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf)`
`int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result)`
`int32_t udf_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result)`
其中 udf 是函数名的占位符。其中各参数的具体含义是:
其中 aggfn 是函数名的占位符。首先调用aggfn_start生成结果buffer然后相关的数据会被分为多个行数据块对每个数据块调用 aggfn 用数据块更新中间结果,最后再调用 aggfn_finish 从中间结果产生最终结果,最终结果只能含 0 或 1 条结果数据。
参数的具体含义是:
- interBuf中间结果 buffer。
- inputBlock输入的数据块。
- newInterBuf新的中间结果buffer。
- result最终结果。
其计算过程为首先调用udf_start生成结果buffer然后相关的数据会被分为多个行数据块对每个行数据块调用 udf 用数据块更新中间结果,最后再调用 udf_finish 从中间结果产生最终结果,最终结果只能含 0 或 1 条结果数据。
### UDF 初始化和销毁
`int32_t udf_init()`
`int32_t udf_destroy()`
其中 udf 是函数名的占位符。udf_init 完成初始化工作。 udf_destroy 完成清理工作。
其中 udf 是函数名的占位符。udf_init 完成初始化工作。 udf_destroy 完成清理工作。如果没有初始化工作无需定义udf_init函数。如果没有清理工作无需定义udf_destroy函数。
:::note
如果对应的函数不需要具体的功能,也需要实现一个空函数。
:::
### UDF 数据结构
## UDF 数据结构
```c
typedef struct SUdfColumnMeta {
int16_t type;
@ -103,6 +191,13 @@ typedef struct SUdfInterBuf {
int8_t numOfResult; //zero or one
} SUdfInterBuf;
```
数据结构说明如下:
- SUdfDataBlock 数据块包含行数 numOfRows 和列数 numCols。udfCols[i] (0 <= i <= numCols-1)表示每一列数据类型为SUdfColumn*。
- SUdfColumn 包含列的数据类型定义 colMeta 和列的数据 colData。
- SUdfColumnMeta 成员定义同 taos.h 数据类型定义。
- SUdfColumnData 数据可以变长varLenCol 定义变长数据fixLenCol 定义定长数据。
- SUdfInterBuf 定义中间结构 buffer以及 buffer 中结果个数 numOfResult
为了更好的操作以上数据结构,提供了一些便利函数,定义在 taosudf.h。
@ -118,75 +213,15 @@ gcc -g -O0 -fPIC -shared add_one.c -o add_one.so
这样就准备好了动态链接库 add_one.so 文件,可以供后文创建 UDF 时使用了。为了保证可靠的系统运行,编译器 GCC 推荐使用 7.5 及以上版本。
## 在系统中管理和使用 UDF
### 创建 UDF
用户可以通过 SQL 指令在系统中加载客户端所在主机上的 UDF 函数库(不能通过 RESTful 接口或 HTTP 管理界面来进行这一过程)。一旦创建成功,则当前 TDengine 集群的所有用户都可以在 SQL 指令中使用这些函数。UDF 存储在系统的 MNode 节点上,因此即使重启 TDengine 系统,已经创建的 UDF 也仍然可用。
在创建 UDF 时,需要区分标量函数和聚合函数。如果创建时声明了错误的函数类别,则可能导致通过 SQL 指令调用函数时出错。此外,用户需要保证输入数据类型与 UDF 程序匹配UDF 输出数据类型与 OUTPUTTYPE 匹配。
- 创建标量函数
```sql
CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type;
```
- function_name标量函数未来在 SQL 中被调用时的函数名,必须与函数实现中 udf 的实际名称一致;
- library_path包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
- output_type此函数计算结果的数据类型名称
例如,如下语句可以把 libbitand.so 创建为系统中可用的 UDF
```sql
CREATE FUNCTION bit_and AS "/home/taos/udf_example/libbitand.so" OUTPUTTYPE INT;
```
- 创建聚合函数:
```sql
CREATE AGGREGATE FUNCTION function_name AS library_path OUTPUTTYPE output_type [ BUFSIZE buffer_size ];
```
- function_name聚合函数未来在 SQL 中被调用时的函数名,必须与函数实现中 udfNormalFunc 的实际名称一致;
- library_path包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
- output_type此函数计算结果的数据类型与上文中 udfNormalFunc 的 itype 参数不同,这里不是使用数字表示法,而是直接写类型名称即可;
- buffer_size中间计算结果的缓冲区大小单位是字节。如果不使用可以不设置。
例如,如下语句可以把 libsqrsum.so 创建为系统中可用的 UDF
```sql
CREATE AGGREGATE FUNCTION sqr_sum AS "/home/taos/udf_example/libsqrsum.so" OUTPUTTYPE DOUBLE bufsize 8;
```
### 管理 UDF
- 删除指定名称的用户定义函数:
```
DROP FUNCTION function_name;
```
- function_name此参数的含义与 CREATE 指令中的 function_name 参数一致,也即要删除的函数的名字,例如
```sql
DROP FUNCTION bit_and;
```
- 显示系统中当前可用的所有 UDF
```sql
SHOW FUNCTIONS;
```
### 调用 UDF
在 SQL 指令中,可以直接以在系统中创建 UDF 时赋予的函数名来调用用户定义函数。例如:
```sql
SELECT X(c1,c2) FROM table/stable;
```
表示对名为 c1, c2 的数据列调用名为 X 的用户定义函数。SQL 指令中用户定义函数可以配合 WHERE 等查询特性来使用。
## 管理和使用UDF
关于如何管理和使用UDF参见[UDF使用说明](../12-taos-sql/26-udf.md)
## 示例代码
### 标量函数示例 [bit_and](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/bit_and.c)
bit_add 实现多列的按位与功能。如果只有一列返回这一列。bit_add 忽略空值。
<details>
<summary>bit_and.c</summary>
@ -196,13 +231,15 @@ SELECT X(c1,c2) FROM table/stable;
</details>
### 聚合函数示例 [sqr_sum](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/sqr_sum.c)
### 聚合函数示例 [l2norm](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/l2norm.c)
l2norm 实现了输入列的所有数据的二阶范数,即对每个数据先平方,再累加求和,最后开方。
<details>
<summary>sqr_sum.c</summary>
<summary>l2norm.c</summary>
```c
{{#include tests/script/sh/sqr_sum.c}}
{{#include tests/script/sh/l2norm.c}}
```
</details>

View File

@ -4,34 +4,65 @@ title: 用户自定义函数
---
除了 TDengine 的内置函数以外用户还可以编写自己的函数逻辑并加入TDengine系统中。
## 创建 UDF
## 创建函数
用户可以通过 SQL 指令在系统中加载客户端所在主机上的 UDF 函数库(不能通过 RESTful 接口或 HTTP 管理界面来进行这一过程)。一旦创建成功,则当前 TDengine 集群的所有用户都可以在 SQL 指令中使用这些函数。UDF 存储在系统的 MNode 节点上,因此即使重启 TDengine 系统,已经创建的 UDF 也仍然可用。
在创建 UDF 时,需要区分标量函数和聚合函数。如果创建时声明了错误的函数类别,则可能导致通过 SQL 指令调用函数时出错。此外,用户需要保证输入数据类型与 UDF 程序匹配UDF 输出数据类型与 OUTPUTTYPE 匹配。
- 创建标量函数
```sql
CREATE [AGGREGATE] FUNCTION func_name AS library_path OUTPUTTYPE type_name [BUFSIZE buffer_size]
CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type;
```
语法说明:
- function_name标量函数未来在 SQL 中被调用时的函数名,必须与函数实现中 udf 的实际名称一致;
- library_path包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
- output_type此函数计算结果的数据类型名称
AGGREGATE标识此函数是标量函数还是聚集函数。
func_name函数名必须与函数实现中 udf 的实际名称一致。
library_path包含UDF函数实现的动态链接库的绝对路径是在客户端侧主机上的绝对路径。
type_name标识此函数的返回类型。
buffer_size中间结果的缓冲区大小单位是字节。不设置则默认为0。
例如,如下语句可以把 libbitand.so 创建为系统中可用的 UDF
```sql
CREATE FUNCTION bit_and AS "/home/taos/udf_example/libbitand.so" OUTPUTTYPE INT;
```
- 创建聚合函数:
```sql
CREATE AGGREGATE FUNCTION function_name AS library_path OUTPUTTYPE output_type [ BUFSIZE buffer_size ];
```
- function_name聚合函数未来在 SQL 中被调用时的函数名,必须与函数实现中 udfNormalFunc 的实际名称一致;
- library_path包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
- output_type此函数计算结果的数据类型与上文中 udfNormalFunc 的 itype 参数不同,这里不是使用数字表示法,而是直接写类型名称即可;
- buffer_size中间计算结果的缓冲区大小单位是字节。如果不使用可以不设置。
例如,如下语句可以把 libl2norm.so 创建为系统中可用的 UDF
```sql
CREATE AGGREGATE FUNCTION l2norm AS "/home/taos/udf_example/libl2norm.so" OUTPUTTYPE DOUBLE bufsize 8;
```
关于如何开发自定义函数,请参考 [UDF使用说明](../../develop/udf)。
## 删除自定义函数
## 管理 UDF
- 删除指定名称的用户定义函数:
```
DROP FUNCTION function_name;
```
- function_name此参数的含义与 CREATE 指令中的 function_name 参数一致,也即要删除的函数的名字,例如
## 显示 UDF
- function_name此参数的含义与 CREATE 指令中的 function_name 参数一致也即要删除的函数的名字例如bit_and, l2norm
```sql
SHOW FUNCTION;
DROP FUNCTION bit_and;
```
- 显示系统中当前可用的所有 UDF
```sql
SHOW FUNCTIONS;
```
## 调用 UDF
在 SQL 指令中,可以直接以在系统中创建 UDF 时赋予的函数名来调用用户定义函数。例如:
```sql
SELECT X(c1,c2) FROM table/stable;
```
表示对名为 c1, c2 的数据列调用名为 X 的用户定义函数。SQL 指令中用户定义函数可以配合 WHERE 等查询特性来使用。

View File

@ -5,11 +5,12 @@ description: "TAOS SQL 支持的语法规则、主要查询功能、支持的 SQ
本文档说明 TAOS SQL 支持的语法规则、主要查询功能、支持的 SQL 查询函数,以及常用技巧等内容。阅读本文档需要读者具有基本的 SQL 语言的基础。
TAOS SQL 是用户对 TDengine 进行数据写入和查询的主要工具。TAOS SQL 为了便于用户快速上手,在一定程度上提供与标准 SQL 类似的风格和模式。严格意义上TAOS SQL 并不是也不试图提供标准的 SQL 语法。此外,由于 TDengine 针对的时序性结构化数据不提供删除功能,因此在 TAO SQL 中不提供数据删除的相关功能
TAOS SQL 是用户对 TDengine 进行数据写入和查询的主要工具。TAOS SQL 提供标准的 SQL 语法并针对时序数据和业务的特点优化和新增了许多语法和功能。TAOS SQL 语句的最大长度为 1M。TAOS SQL 不支持关键字的缩写,例如 DELETE 不能缩写为 DEL
本章节 SQL 语法遵循如下约定:
- <\> 里的内容是用户需要输入的但不要输入 <\> 本身
- 用大写字母表示关键字,但 SQL 本身并不区分关键字和标识符的大小写
- 用小写字母表示需要用户输入的内容
- \[ \] 表示内容为可选项,但不能输入 [] 本身
- | 表示多选一,选择其中一个即可,但不能输入 | 本身
- … 表示前面的项可重复多个

View File

@ -9,7 +9,7 @@ description: TDengine Java 连接器基于标准 JDBC API 实现, 并提供原
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
`taos-jdbcdriver` 是 TDengine 的官方 Java 语言连接器Java 开发人员可以通过它开发存取 TDengine 数据库的应用软件。`taos-jdbcdriver` 实现了 JDBC driver 标准的接口,并提供两种形式的连接器。一种是通过 TDengine 客户端驱动程序taosc原生连接 TDengine 实例支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能,一种是通过 taosAdapter 提供的 REST 接口连接 TDengine 实例2.4.0.0 及更高版本)。REST 连接实现的功能集合和原生连接有少量不同。
`taos-jdbcdriver` 是 TDengine 的官方 Java 语言连接器Java 开发人员可以通过它开发存取 TDengine 数据库的应用软件。`taos-jdbcdriver` 实现了 JDBC driver 标准的接口,并提供两种形式的连接器。一种是通过 TDengine 客户端驱动程序taosc原生连接 TDengine 实例支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能,一种是通过 taosAdapter 提供的 REST 接口连接 TDengine 实例。REST 连接实现的功能集合和原生连接有少量不同。
![TDengine Database Connector Java](tdengine-jdbc-connector.webp)
@ -41,19 +41,19 @@ REST 连接支持所有能运行 Java 的平台。
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对应类型转换如下:
| TDengine DataType | JDBCType driver 版本 < 2.0.24 | JDBCType driver 版本 >= 2.0.24 |
| ----------------- | --------------------------------- | ---------------------------------- |
| TIMESTAMP | java.lang.Long | java.sql.Timestamp |
| INT | java.lang.Integer | java.lang.Integer |
| BIGINT | java.lang.Long | java.lang.Long |
| FLOAT | java.lang.Float | java.lang.Float |
| DOUBLE | java.lang.Double | java.lang.Double |
| SMALLINT | java.lang.Short | java.lang.Short |
| TINYINT | java.lang.Byte | java.lang.Byte |
| BOOL | java.lang.Boolean | java.lang.Boolean |
| BINARY | java.lang.String | byte array |
| NCHAR | java.lang.String | java.lang.String |
| JSON | - | java.lang.String |
| TDengine DataType | JDBCType |
| ----------------- | ---------------------------------- |
| TIMESTAMP | java.sql.Timestamp |
| INT | java.lang.Integer |
| BIGINT | java.lang.Long |
| FLOAT | java.lang.Float |
| DOUBLE | java.lang.Double |
| SMALLINT | java.lang.Short |
| TINYINT | java.lang.Byte |
| BOOL | java.lang.Boolean |
| BINARY | byte array |
| NCHAR | java.lang.String |
| JSON | java.lang.String |
**注意**JSON 类型仅在 tag 中支持。
@ -198,7 +198,7 @@ url 中的配置参数如下:
- user登录 TDengine 用户名,默认值 'root'。
- password用户登录密码默认值 'taosdata'。
- batchfetch: true在执行查询时批量拉取结果集false逐行拉取结果集。默认值为false。逐行拉取结果集使用 HTTP 方式进行数据传输。从 taos-jdbcdriver-2.0.38 和 TDengine 2.4.0.12 版本开始JDBC REST 连接增加批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTPWebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
- batchfetch: true在执行查询时批量拉取结果集false逐行拉取结果集。默认值为false。逐行拉取结果集使用 HTTP 方式进行数据传输。从 taos-jdbcdriver-2.0.38 开始JDBC REST 连接增加批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTPWebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
- charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。
- batchErrorIgnoretrue在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false不再执行失败 SQL 后的任何语句。默认值为false。
- httpConnectTimeout: 连接超时时间,单位 ms 默认值为 5000。
@ -216,7 +216,7 @@ url 中的配置参数如下:
INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6);
```
- 从 taos-jdbcdriver-2.0.36 和 TDengine 2.2.0.0 版本开始,如果在 url 中指定了 dbname那么JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url在 SQL 中不需要指定 dbname。例如url 为 jdbc:TAOS-RS://127.0.0.1:6041/test那么可以执行 sqlinsert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);
- 从 taos-jdbcdriver-2.0.36 开始,如果在 url 中指定了 dbname那么JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url在 SQL 中不需要指定 dbname。例如url 为 jdbc:TAOS-RS://127.0.0.1:6041/test那么可以执行 sqlinsert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);
:::
@ -358,11 +358,11 @@ JDBC 连接器可能报错的错误码包括 3 种JDBC driver 本身的报错
具体的错误码请参考:
- [TDengine Java Connector](https://github.com/taosdata/taos-connector-jdbc/blob/main/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java)
- [TDengine_ERROR_CODE](https://github.com/taosdata/TDengine/blob/develop/src/inc/taoserror.h)
- [TDengine_ERROR_CODE](../error-code)
### 通过参数绑定写入数据
从 2.1.2.0 版本开始,TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据写入INSERT场景的支持。采用这种方式写入数据时能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据写入INSERT场景的支持。采用这种方式写入数据时能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
**注意**
@ -630,7 +630,7 @@ public void setNString(int columnIndex, ArrayList<String> list, int size) throws
### 无模式写入
从 2.2.0.0 版本开始TDengine 增加了对无模式写入功能。无模式写入兼容 InfluxDB 的 行协议Line Protocol、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](/reference/schemaless/)。
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议Line Protocol、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../schemaless)。
**注意**
@ -670,55 +670,127 @@ public class SchemalessInsertTest {
TDengine Java 连接器支持订阅功能,应用 API 如下:
#### 创建订阅
#### 创建 Topic
```java
TSDBSubscribe sub = ((TSDBConnection)conn).subscribe("topic", "select * from meters", false);
Connection connection = DriverManager.getConnection(url, properties);
Statement statement = connection.createStatement();
statement.executeUpdate("create topic if not exists topic_speed as select ts, speed from speed_table");
```
`subscribe` 方法的三个参数含义如下:
- topic订阅的主题即名称此参数是订阅的唯一标识
- sql订阅的查询语句此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据
- restart如果订阅已经存在是重新开始还是继续之前的订阅
- topic_speed订阅的主题即名称此参数是订阅的唯一标识。
- sql订阅的查询语句此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据。
如上面的例子将使用 SQL 语句 `select * from meters` 创建一个名为 `topic` 的订阅,如果这个订阅已经存在,将继续之前的查询进度,而不是从头开始消费所有的数据。
如上面的例子将使用 SQL 语句 `select ts, speed from speed_table` 创建一个名为 `topic_speed` 的订阅。
#### 创建 Consumer
```java
Properties config = new Properties();
config.setProperty("enable.auto.commit", "true");
config.setProperty("group.id", "group1");
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer");
TaosConsumer consumer = new TaosConsumer<>(config);
```
- enable.auto.commit: 是否允许自动提交。
- group.id: consumer: 所在的 group。
- value.deserializer: 结果集反序列化方法,可以继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer`,并指定结果集 bean实现反序列化。也可以继承 `com.taosdata.jdbc.tmq.Deserializer`,根据 SQL 的 resultSet 自定义反序列化方式。
- 其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group)
#### 订阅消费数据
```java
int total = 0;
while(true) {
TSDBResultSet rs = sub.consume();
int count = 0;
while(rs.next()) {
count++;
}
total += count;
System.out.printf("%d rows consumed, total %d\n", count, total);
Thread.sleep(1000);
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ResultBean record : records) {
process(record);
}
}
```
`consume` 方法返回一个结果集,其中包含从上次 `consume` 到目前为止的所有新数据。请务必按需选择合理的调用 `consume` 的频率(如例子中的 `Thread.sleep(1000)`),否则会给服务端造成不必要的压力。
`poll` 方法返回一个结果集,其中包含从上次 `poll` 到目前为止的所有新数据。请务必按需选择合理的调用 `poll` 的频率(如例子中的 `Duration.ofMillis(100)`),否则会给服务端造成不必要的压力。
#### 关闭订阅
```java
sub.close(true);
consumer.close()
```
`close` 方法关闭一个订阅。如果其参数为 `true` 表示保留订阅进度信息,后续可以创建同名订阅继续消费数据;如为 `false` 则不保留订阅进度。
### 关闭资源
### 使用示例如下:
```java
resultSet.close();
stmt.close();
conn.close();
```
public abstract class ConsumerLoop {
private final TaosConsumer<ResultBean> consumer;
private final List<String> topics;
private final AtomicBoolean shutdown;
private final CountDownLatch shutdownLatch;
> `注意务必要将 connection 进行关闭`,否则会出现连接泄露。
public ConsumerLoop() throws SQLException {
Properties config = new Properties();
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("group.id", "group1");
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer");
this.consumer = new TaosConsumer<>(config);
this.topics = Collections.singletonList("topic_speed");
this.shutdown = new AtomicBoolean(false);
this.shutdownLatch = new CountDownLatch(1);
}
public abstract void process(ResultBean result);
public void pollData() throws SQLException {
try {
consumer.subscribe(topics);
while (!shutdown.get()) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ResultBean record : records) {
process(record);
}
}
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
public void shutdown() throws InterruptedException {
shutdown.set(true);
shutdownLatch.await();
}
static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
static class ResultBean {
private Timestamp ts;
private int speed;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public int getSpeed() {
return speed;
}
public void setSpeed(int speed) {
this.speed = speed;
}
}
}
```
### 与连接池使用
@ -787,20 +859,6 @@ public static void main(String[] args) throws Exception {
> 更多 druid 使用问题请查看[官方说明](https://github.com/alibaba/druid)。
**注意事项:**
- TDengine `v1.6.4.1` 版本开始提供了一个专门用于心跳检测的函数 `select server_status()`,所以在使用连接池时推荐使用 `select server_status()` 进行 Validation Query。
如下所示,`select server_status()` 执行成功会返回 `1`。
```sql
taos> select server_status();
server_status()|
================
1 |
Query OK, 1 row(s) in set (0.000141s)
```
### 更多示例程序
示例程序源码位于 `TDengine/examples/JDBC` 下:
@ -811,7 +869,7 @@ Query OK, 1 row(s) in set (0.000141s)
- SpringJdbcTemplateSpring JdbcTemplate 中使用 taos-jdbcdriver。
- mybatisplus-demoSpringboot + Mybatis 中使用 taos-jdbcdriver。
请参考:[JDBC example](https://github.com/taosdata/TDengine/tree/develop/examples/JDBC)
请参考:[JDBC example](https://github.com/taosdata/TDengine/tree/3.0/examples/JDBC)
## 最近更新记录
@ -842,7 +900,7 @@ Query OK, 1 row(s) in set (0.000141s)
**解决方法**:重新安装 64 位 JDK。
4. 其它问题请参考 [FAQ](/train-faq/faq)
4. 其它问题请参考 [FAQ](../../../train-faq/faq)
## API 参考

@ -1 +0,0 @@
Subproject commit 7ed7a97715388fa144718764d6bf20f9bfc29a12

View File

@ -42,8 +42,8 @@ enum {
TASK_STATUS__DROPPING,
TASK_STATUS__FAIL,
TASK_STATUS__STOP,
TASK_STATUS__PREPARE_RECOVER,
TASK_STATUS__RECOVERING,
TASK_STATUS__RECOVER_DOWNSTREAM,
TASK_STATUS__RECOVER_SELF,
};
enum {
@ -232,8 +232,8 @@ typedef struct {
} SStreamChildEpInfo;
typedef struct {
int32_t nodeId;
int32_t childId;
int32_t srcNodeId;
int32_t srcChildId;
int64_t stateSaveVer;
int64_t stateProcessedVer;
} SStreamCheckpointInfo;
@ -394,9 +394,6 @@ typedef struct {
int32_t upstreamTaskId;
int32_t upstreamChildId;
int32_t upstreamNodeId;
#if 0
int64_t sourceVer;
#endif
int32_t blockNum;
SArray* dataLen; // SArray<int32_t>
SArray* data; // SArray<SRetrieveTableRsp*>
@ -426,6 +423,7 @@ typedef struct {
int32_t rspToTaskId;
} SStreamRetrieveRsp;
#if 0
typedef struct {
int64_t streamId;
int32_t taskId;
@ -462,13 +460,25 @@ int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq
int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp);
int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pRsp);
typedef struct {
int64_t streamId;
} SPStreamTaskRecoverReq;
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
#endif
typedef struct {
int8_t reserved;
} SPStreamTaskRecoverRsp;
int64_t streamId;
int32_t downstreamTaskId;
int32_t taskId;
} SStreamRecoverDownstreamReq;
typedef struct {
int64_t streamId;
int32_t downstreamTaskId;
int32_t taskId;
SArray* checkpointVer; // SArray<SStreamCheckpointInfo>
} SStreamRecoverDownstreamRsp;
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, const SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
@ -479,8 +489,6 @@ int32_t streamSetupTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
@ -513,6 +521,7 @@ SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta);
#ifdef __cplusplus
}

View File

@ -757,7 +757,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp(J
int32_t code = taos_stmt_prepare(pStmt, str, len);
taosMemoryFreeClear(str);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
jniError("prepareStmt jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
@ -785,7 +785,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
if (code != TSDB_CODE_SUCCESS) {
(*env)->ReleaseStringUTFChars(env, jname, name);
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
jniError("bindTableName jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
@ -860,7 +860,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI
(*env)->ReleaseStringUTFChars(env, tableName, name);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
jniError("tableNameTags jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return JNI_SUCCESS;
@ -926,7 +926,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(
taosMemoryFreeClear(b);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
jniError("bindColData jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
@ -949,7 +949,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_addBatchImp(JNIEn
int32_t code = taos_stmt_add_batch(pStmt);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
jniError("add batch jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
@ -973,7 +973,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
int32_t code = taos_stmt_execute(pStmt);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
jniError("excute batch jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
@ -997,7 +997,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
int32_t code = taos_stmt_close(pStmt);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
jniError("close stmt jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}

View File

@ -327,7 +327,13 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
while (pIter != NULL) {
int64_t *rid = pIter;
SRequestObj *pRequest = acquireRequest(*rid);
if (NULL == pRequest || pRequest->killed) {
if (NULL == pRequest) {
pIter = taosHashIterate(pObj->pRequests, pIter);
continue;
}
if (pRequest->killed) {
releaseRequest(*rid);
pIter = taosHashIterate(pObj->pRequests, pIter);
continue;
}

View File

@ -283,7 +283,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false);
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, true);
}
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

View File

@ -182,6 +182,7 @@ void taos_free_result(TAOS_RES *res) {
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId);
destroyRequest(pRequest);
} else if (TD_RES_TMQ(res)) {
SMqRspObj *pRsp = (SMqRspObj *)res;
@ -482,7 +483,7 @@ void taos_stop_query(TAOS_RES *res) {
int32_t numOfFields = taos_num_fields(pRequest);
// It is not a query, no need to stop.
if (numOfFields == 0) {
tscDebug("request %" PRIx64 " no need to be killed since not query", pRequest->requestId);
tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId);
return;
}
@ -852,7 +853,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
}
pRequest->code =
setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, false);
setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
pRequest->code = code;

View File

@ -389,7 +389,7 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
code = buildShowVariablesRsp(rsp.variables, &pRes);
}
if (TSDB_CODE_SUCCESS == code) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, false);
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true);
}
tFreeSShowVariablesRsp(&rsp);

View File

@ -24,7 +24,7 @@
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
static const SSysDbTableSchema dnodesSchema[] = {
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "support_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
@ -66,7 +66,7 @@ static const SSysDbTableSchema bnodesSchema[] = {
};
static const SSysDbTableSchema clusterSchema[] = {
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
};

View File

@ -355,7 +355,11 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) {
static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddDir(pCfg, "dataDir", tsDataDir, 0) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, 0) != 0) return -1;
tsNumOfSupportVnodes = tsNumOfCores * 2;
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxShellConns", tsMaxShellConns, 10, 50000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 10, 1000000, 0) != 0) return -1;
@ -423,7 +427,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 2, 1024, 0) != 0) return -1;
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, TSDB_MAX_MSG_SIZE * 10000L);
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, 0) != 0)
return -1;

View File

@ -28,6 +28,7 @@ SDbObj *mndAcquireDb(SMnode *pMnode, const char *db);
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq);
bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
const char *mndGetDbStr(const char *src);

View File

@ -162,8 +162,9 @@ typedef struct {
int64_t lastExecTime;
int32_t lastAction;
int32_t lastErrorNo;
tmsg_t lastMsgType;
SEpSet lastEpset;
tmsg_t lastMsgType;
tmsg_t originRpcType;
char dbname1[TSDB_TABLE_FNAME_LEN];
char dbname2[TSDB_TABLE_FNAME_LEN];
int32_t startFunc;

View File

@ -1495,8 +1495,34 @@ static const char *getCacheModelStr(int8_t cacheModel) {
return "unknown";
}
static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, int32_t rows, int64_t numOfTables,
bool sysDb, ESdbStatus objStatus, bool sysinfo) {
bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb) {
if (pDb->cfg.replications == 1) return true;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
bool isReady = true;
while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid && pVgroup->replica > 1) {
bool hasLeader = false;
for (int32_t i = 0; i < pVgroup->replica; ++i) {
if (pVgroup->vnodeGid[i].role == TAOS_SYNC_STATE_LEADER) {
hasLeader = true;
}
}
if (!hasLeader) isReady = false;
}
sdbRelease(pSdb, pVgroup);
}
return isReady;
}
static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, int32_t rows,
int64_t numOfTables, bool sysDb, ESdbStatus objStatus, bool sysinfo) {
int32_t cols = 0;
int32_t bytes = pShow->pMeta->pSchemas[cols].bytes;
char *buf = taosMemoryMalloc(bytes);
@ -1509,8 +1535,16 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
}
char *statusStr = "ready";
if (objStatus == SDB_STATUS_CREATING) statusStr = "creating";
if (objStatus == SDB_STATUS_DROPPING) statusStr = "dropping";
if (objStatus == SDB_STATUS_CREATING) {
statusStr = "creating";
} else if (objStatus == SDB_STATUS_DROPPING) {
statusStr = "dropping";
} else {
if (!sysDb && !mndIsDbReady(pMnode, pDb)) {
statusStr = "unsynced";
}
}
char statusVstr[24] = {0};
STR_WITH_SIZE_TO_VARSTR(statusVstr, statusStr, strlen(statusStr));
@ -1693,7 +1727,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
setInformationSchemaDbCfg(&infoschemaDb);
size_t numOfTables = 0;
getInfosDbMeta(NULL, &numOfTables);
dumpDbInfoData(pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
mndDumpDbInfoData(pMnode, pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
numOfRows += 1;
@ -1701,7 +1735,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
setPerfSchemaDbCfg(&perfschemaDb);
numOfTables = 0;
getPerfDbMeta(NULL, &numOfTables);
dumpDbInfoData(pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
mndDumpDbInfoData(pMnode, pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
numOfRows += 1;
pShow->sysDbRsp = true;
@ -1714,7 +1748,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) == 0) {
int32_t numOfTables = 0;
sdbTraverse(pSdb, SDB_VGROUP, mndGetTablesOfDbFp, &numOfTables, NULL, NULL);
dumpDbInfoData(pBlock, pDb, pShow, numOfRows, numOfTables, false, objStatus, sysinfo);
mndDumpDbInfoData(pMnode, pBlock, pDb, pShow, numOfRows, numOfTables, false, objStatus, sysinfo);
numOfRows++;
}

View File

@ -504,6 +504,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
return 0;
}
#if 0
static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
SMStreamTaskRecoverReq *pReq = taosMemoryCalloc(1, sizeof(SMStreamTaskRecoverReq));
if (pReq == NULL) {
@ -540,7 +541,6 @@ static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
return 0;
}
#if 0
int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
if (pStream->isDistributed) {
int32_t lv = taosArrayGetSize(pStream->tasks);

View File

@ -124,8 +124,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT8(pRaw, dataPos, pTrans->exec, _OVER)
SDB_SET_INT8(pRaw, dataPos, pTrans->oper, _OVER)
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->originRpcType, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
@ -282,13 +281,12 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT8(pRaw, dataPos, &exec, _OVER)
SDB_GET_INT8(pRaw, dataPos, &oper, _OVER)
SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
pTrans->stage = stage;
pTrans->policy = policy;
pTrans->conflict = conflict;
pTrans->exec = exec;
pTrans->oper = oper;
SDB_GET_INT16(pRaw, dataPos, &pTrans->originRpcType, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
@ -611,6 +609,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
if (pReq != NULL) {
taosArrayPush(pTrans->pRpcArray, &pReq->info);
pTrans->originRpcType = pReq->msgType;
}
mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
return pTrans;
@ -910,6 +909,23 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
}
if (pTrans->originRpcType == TDMT_MND_CREATE_DB) {
mDebug("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType));
SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname1);
if (pDb != NULL) {
for (int32_t j = 0; j < 12; j++) {
bool ready = mndIsDbReady(pMnode, pDb);
if (!ready) {
mDebug("trans:%d, db:%s not ready yet, wait %d times", pTrans->id, pTrans->dbname1, j);
taosMsleep(1000);
} else {
break;
}
}
}
mndReleaseDb(pMnode, pDb);
}
tmsgSendRsp(&rspMsg);
}
}

View File

@ -777,6 +777,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
}
}
#if 0
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
@ -789,18 +790,6 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
}
}
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp);
return 0;
} else {
return -1;
}
}
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->rspTaskId;
@ -813,6 +802,19 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
return -1;
}
}
#endif
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp);
return 0;
} else {
return -1;
}
}
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
@ -873,6 +875,8 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
.code = 0,
};
streamProcessDispatchReq(pTask, &req, &rsp, false);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
return;
}
@ -883,4 +887,6 @@ FAIL:
.info = pMsg->info,
};
tmsgSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}

View File

@ -334,14 +334,14 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_STREAM_TASK_DISPATCH:
// return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, pInfo->workerId != 0);
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
case TDMT_STREAM_TASK_RECOVER:
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
/*case TDMT_STREAM_TASK_RECOVER:*/
/*return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);*/
case TDMT_STREAM_RETRIEVE:
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RECOVER_RSP:
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
/*case TDMT_STREAM_TASK_RECOVER_RSP:*/
/*return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);*/
case TDMT_STREAM_RETRIEVE_RSP:
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
default:

View File

@ -732,18 +732,19 @@ typedef struct SStreamSessionAggOperatorInfo {
} SStreamSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo {
SSDataBlock* pRes;
STimeWindow win;
SInterval interval;
int64_t current;
SArray* pPrevRow; // SArray<SGroupValue>
SArray* pNextRow; // SArray<SGroupValue>
bool isPrevRowSet;
bool isNextRowSet;
int32_t fillType; // fill type
SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation
struct SFillColInfo* pFillColInfo; // fill column info
SSDataBlock* pRes;
STimeWindow win;
SInterval interval;
int64_t current;
SArray* pPrevRow; // SArray<SGroupValue>
SArray* pNextRow; // SArray<SGroupValue>
SArray* pLinearInfo; // SArray<SFillLinearInfo>
bool isPrevRowSet;
bool isNextRowSet;
int32_t fillType; // fill type
SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation
struct SFillColInfo* pFillColInfo; // fill column info
} STimeSliceOperatorInfo;
typedef struct SStateWindowOperatorInfo {

View File

@ -33,11 +33,20 @@ typedef struct SFillColInfo {
SVariant fillVal;
} SFillColInfo;
typedef struct SFillLinearInfo {
SPoint start;
SPoint end;
bool hasNull;
bool fillLastPoint;
int16_t type;
int32_t bytes;
} SFillLinearInfo;
typedef struct {
SSchema col;
char* tagVal;
} SFillTagColInfo;
typedef struct SFillInfo {
TSKEY start; // start timestamp
TSKEY end; // endKey for fill

View File

@ -669,4 +669,4 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const str
}
return pFillCol;
}
}

View File

@ -2087,6 +2087,34 @@ static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock
pSliceInfo->isNextRowSet = true;
}
static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i);
// null data should not be kept since it can not be used to perform interpolation
if (!colDataIsNull_s(pColInfoData, i)) {
int64_t startKey = *(int64_t*)colDataGetData(pTsCol, rowIndex);
int64_t endKey = *(int64_t*)colDataGetData(pTsCol, rowIndex + 1);
pLinearInfo->start.key = startKey;
pLinearInfo->end.key = endKey;
char* val;
val = colDataGetData(pColInfoData, rowIndex);
memcpy(pLinearInfo->start.val, val, pLinearInfo->bytes);
val = colDataGetData(pColInfoData, rowIndex + 1);
memcpy(pLinearInfo->end.val, val, pLinearInfo->bytes);
pLinearInfo->hasNull = false;
} else {
pLinearInfo->hasNull = true;
}
}
}
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pBlock,
SSDataBlock* pResBlock) {
int32_t rows = pResBlock->info.rows;
@ -2115,52 +2143,41 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char*)&v, false);
colDataAppend(pDst, rows, (char *)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char*)&v, false);
colDataAppend(pDst, rows, (char *)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char*)&v, false);
colDataAppend(pDst, rows, (char *)&v, false);
}
pResBlock->info.rows += 1;
break;
}
case TSDB_FILL_LINEAR: {
#if 0
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
// goto interp_exit;
}
SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot);
double v1 = -1, v2 = -1;
GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val);
GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val);
SPoint start = pLinearInfo->start;
SPoint end = pLinearInfo->end;
SPoint current = {.key = pSliceInfo->current};
current.val = taosMemoryCalloc(pLinearInfo->bytes, 1);
SPoint point1 = {.key = ts, .val = &v1};
SPoint point2 = {.key = nextTs, .val = &v2};
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
// before interp range, do not fill
if (start.key == INT64_MIN || end.key == INT64_MAX) {
break;
}
int32_t srcType = pCtx->inputType;
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
bool exceedMax = false, exceedMin = false;
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin);
if (exceedMax || exceedMin) {
__compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0);
if (func(&pCtx->start.val, &pCtx->end.val) <= 0) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val);
} else {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val);
}
}
}
#endif
// TODO: pResBlock->info.rows += 1;
if (pLinearInfo->hasNull) {
colDataAppendNULL(pDst, rows);
} else {
taosGetLinearInterpolationVal(&current, pLinearInfo->type, &start, &end, pLinearInfo->type);
colDataAppend(pDst, rows, (char *)current.val, false);
}
pResBlock->info.rows += 1;
break;
}
case TSDB_FILL_PREV: {
@ -2246,6 +2263,55 @@ static int32_t initNextRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
return TSDB_CODE_SUCCESS;
}
static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
if (pInfo->pLinearInfo != NULL) {
return TSDB_CODE_SUCCESS;
}
pInfo->pLinearInfo = taosArrayInit(4, sizeof(SFillLinearInfo));
if (pInfo->pNextRow == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
SFillLinearInfo linearInfo = {0};
linearInfo.start.key = INT64_MIN;
linearInfo.end.key = INT64_MAX;
linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes);
linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes);
linearInfo.hasNull = false;
linearInfo.fillLastPoint = false;
linearInfo.type = pColInfo->info.type;
linearInfo.bytes = pColInfo->info.bytes;
taosArrayPush(pInfo->pLinearInfo, &linearInfo);
}
return TSDB_CODE_SUCCESS;
}
static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
int32_t code;
code = initPrevRowsKeeper(pInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
code = initNextRowsKeeper(pInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
code = initFillLinearInfo(pInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
@ -2278,13 +2344,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
break;
}
int32_t code;
code = initPrevRowsKeeper(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
code = initNextRowsKeeper(pSliceInfo, pBlock);
int32_t code = initKeeperInfo(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
@ -2312,46 +2372,103 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
pResBlock->info.rows += 1;
doKeepPrevRows(pSliceInfo, pBlock, i);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
// for linear interpolation, always fill value between this and next points;
// if its the first point in data block, also fill values between previous(if there's any) and this point;
// if its the last point in data block, no need to fill, but reserve this point as the start value for next data block.
if (pSliceInfo->fillType == TSDB_FILL_LINEAR) {
doKeepLinearInfo(pSliceInfo, pBlock, i);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (i < pBlock->info.rows - 1) {
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
}
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
} else if (ts < pSliceInfo->current) {
// in case interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
doKeepPrevRows(pSliceInfo, pBlock, i);
if (i < pBlock->info.rows - 1) {
// in case interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i + 1);
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
} else {
// ignore current row, and do nothing
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
} else {
// ignore current row, and do nothing
} else { // it is the last row of current block
}
} else { // non-linear interpolation
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
}
} else if (ts < pSliceInfo->current) {
// in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
doKeepPrevRows(pSliceInfo, pBlock, i);
if (pSliceInfo->fillType == TSDB_FILL_LINEAR) {
doKeepLinearInfo(pSliceInfo, pBlock, i);
//pSliceInfo->current =
// taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (i < pBlock->info.rows - 1) {
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
} else {
// ignore current row, and do nothing
}
} else { // it is the last row of current block
}
} else { // non-linear interpolation
if (i < pBlock->info.rows - 1) {
// in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i + 1);
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
} else {
// ignore current row, and do nothing
}
} else { // it is the last row of current block
doKeepPrevRows(pSliceInfo, pBlock, i);
}
} else { // it is the last row of current block
doKeepPrevRows(pSliceInfo, pBlock, i);
}
} else { // ts > pSliceInfo->current
// in case interpolation window starts and ends between two datapoints, fill(next) need to interpolate
// in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
@ -2380,11 +2497,39 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
pResBlock->info.rows += 1;
doKeepPrevRows(pSliceInfo, pBlock, i);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
if (pSliceInfo->fillType == TSDB_FILL_LINEAR) {
doKeepLinearInfo(pSliceInfo, pBlock, i);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (i < pBlock->info.rows - 1) {
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
} else {
// ignore current row, and do nothing
}
} else { // it is the last row of current block
}
} else { // non-linear interpolation
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
}
}
@ -2446,6 +2591,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pPrevRow = NULL;
pInfo->pNextRow = NULL;
pInfo->pLinearInfo = NULL;
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues);
pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->win = pInterpPhyNode->timeRange;

View File

@ -819,6 +819,10 @@ SNode* createDefaultDatabaseOptions(SAstCreateContext* pCxt) {
pOptions->numOfVgroups = TSDB_DEFAULT_VN_PER_DB;
pOptions->singleStable = TSDB_DEFAULT_DB_SINGLE_STABLE;
pOptions->schemaless = TSDB_DEFAULT_DB_SCHEMALESS;
pOptions->walRetentionPeriod = TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD;
pOptions->walRetentionSize = TSDB_DEFAULT_DB_WAL_RETENTION_SIZE;
pOptions->walRollPeriod = TSDB_DEFAULT_DB_WAL_ROLL_PERIOD;
pOptions->walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE;
return (SNode*)pOptions;
}
@ -846,6 +850,10 @@ SNode* createAlterDatabaseOptions(SAstCreateContext* pCxt) {
pOptions->numOfVgroups = -1;
pOptions->singleStable = -1;
pOptions->schemaless = -1;
pOptions->walRetentionPeriod = -1;
pOptions->walRetentionSize = -1;
pOptions->walRollPeriod = -1;
pOptions->walSegmentSize = -1;
return (SNode*)pOptions;
}

View File

@ -111,6 +111,10 @@ TEST_F(ParserInitialCTest, createDatabase) {
expect.numOfVgroups = TSDB_DEFAULT_VN_PER_DB;
expect.numOfStables = TSDB_DEFAULT_DB_SINGLE_STABLE;
expect.schemaless = TSDB_DEFAULT_DB_SCHEMALESS;
expect.walRetentionPeriod = TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD;
expect.walRetentionSize = TSDB_DEFAULT_DB_WAL_RETENTION_SIZE;
expect.walRollPeriod = TSDB_DEFAULT_DB_WAL_ROLL_PERIOD;
expect.walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE;
};
auto setDbBufferFunc = [&](int32_t buffer) { expect.buffer = buffer; };

View File

@ -411,7 +411,7 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
if (pJob->fetched) {
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode));
SCH_ERR_RET(rspCode);
SCH_ERR_JRET(rspCode);
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

View File

@ -154,6 +154,8 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
return;
}
SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", errCode:0x%x", *jobId, errCode);
schHandleJobDrop(pJob, errCode);
schReleaseJob(*jobId);

View File

@ -229,6 +229,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
return 0;
}
#if 0
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
@ -267,6 +268,7 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
return 0;
}
#endif
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);

View File

@ -19,23 +19,23 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
void* exec = pTask->exec.executor;
// set input
SStreamQueueItem* pItem = (SStreamQueueItem*)data;
const SStreamQueueItem* pItem = (const SStreamQueueItem*)data;
if (pItem->type == STREAM_INPUT__GET_RES) {
SStreamTrigger* pTrigger = (SStreamTrigger*)data;
const SStreamTrigger* pTrigger = (const SStreamTrigger*)data;
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data;
qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
qSetMultiStreamInput(exec, pSubmit->data, 1, STREAM_INPUT__DATA_SUBMIT);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
SArray* blocks = pBlock->blocks;
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
SArray* blocks = pBlock->blocks;
qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
SArray* blocks = pMerged->reqs;
const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data;
SArray* blocks = pMerged->reqs;
qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT);
} else {
@ -51,8 +51,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
}
if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SSDataBlock block = {0};
SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data;
SSDataBlock block = {0};
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data;
ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
block.info.type = STREAM_PULL_OVER;
@ -200,14 +200,13 @@ int32_t streamExecForAll(SStreamTask* pTask) {
streamTaskExecImpl(pTask, data, pRes);
qDebug("stream task %d exec end", pTask->taskId);
streamFreeQitem(data);
if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
// TODO log failed ver
streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes);
streamFreeQitem(data);
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
@ -224,10 +223,14 @@ int32_t streamExecForAll(SStreamTask* pTask) {
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
streamFreeQitem(data);
return -1;
}
/*streamQueueProcessSuccess(pTask->inputQueue);*/
} else {
taosArrayDestroy(pRes);
}
streamFreeQitem(data);
}
return 0;
}

View File

@ -48,8 +48,18 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
if (streamLoadTasks(pMeta) < 0) {
goto _err;
}
return pMeta;
_err:
if (pMeta->path) taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pStateDb) tdbTbClose(pMeta->pStateDb);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->db) tdbClose(pMeta->db);
taosMemoryFree(pMeta);
return NULL;
}

View File

@ -15,6 +15,7 @@
#include "streamInc.h"
#if 0
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
@ -86,17 +87,18 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp
tEndDecode(pDecoder);
return 0;
}
#endif
int32_t tEncodeSStreamCheckpointInfo(SEncoder* pEncoder, const SStreamCheckpointInfo* pCheckpoint) {
if (tEncodeI32(pEncoder, pCheckpoint->nodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->childId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->srcNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->srcChildId) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->stateProcessedVer) < 0) return -1;
return 0;
}
int32_t tDecodeSStreamCheckpointInfo(SDecoder* pDecoder, SStreamCheckpointInfo* pCheckpoint) {
if (tDecodeI32(pDecoder, &pCheckpoint->nodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->childId) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->srcNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->srcChildId) < 0) return -1;
if (tDecodeI64(pDecoder, &pCheckpoint->stateProcessedVer) < 0) return -1;
return 0;
}
@ -130,14 +132,13 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh
return 0;
}
int32_t streamCheckSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL;
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
int32_t sz = taosArrayGetSize(pTask->checkpointInfo);
SStreamMultiVgCheckpointInfo checkpoint;
checkpoint.checkpointId = 0;
checkpoint.checkpointId = atomic_fetch_add_32(&pTask->nextCheckId, 1);
checkpoint.checkTs = taosGetTimestampMs();
checkpoint.streamId = pTask->streamId;
checkpoint.taskId = pTask->taskId;
@ -169,16 +170,21 @@ int32_t streamCheckSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
goto FAIL;
}
int32_t sz = taosArrayGetSize(pTask->checkpointInfo);
for (int32_t i = 0; i < sz; i++) {
SStreamCheckpointInfo* pCheck = taosArrayGet(pTask->checkpointInfo, i);
pCheck->stateSaveVer = pCheck->stateProcessedVer;
}
taosMemoryFree(buf);
return 0;
FAIL:
if (buf) taosMemoryFree(buf);
return -1;
return 0;
}
int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
// load status
int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
void* pVal = NULL;
int32_t vLen = 0;
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
@ -196,29 +202,77 @@ int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0;
}
int32_t streamCheckAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t streamSaveSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
return streamSaveStateInfo(pMeta, pTask);
}
int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
return streamLoadStateInfo(pMeta, pTask);
}
int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
// save and copy state
// TODO save and copy state
// save state info
if (streamSaveStateInfo(pMeta, pTask) < 0) {
return -1;
}
return 0;
}
int32_t streamFetchDownstreamStatus(SStreamTask* pTask) {
// set self status to recover_phase1
// build fetch status msg
// send fetch msg
atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_DOWNSTREAM);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
} else {
ASSERT(0);
}
return 0;
}
int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, void* msg) {
// if failed, set timer and retry
// if successful
// add rsp state to partial recover hash
// if complete, begin actual recover
return 0;
}
int32_t streamRecoverAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
// try recover sink level
// after all sink level recovered, choose current state backend to recover
// recover sink level
// after all sink level recovered
// choose suitable state to recover
return 0;
}
int32_t streamCheckSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t streamSaveSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
// try recover agg level
//
// TODO: save and copy state
return 0;
}
int32_t streamRecoverSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
// if totLevel == 3
// fetch agg state
// recover from local state to agg state, not send msg
// recover from agg state to most recent log v1
// enable input queue, set status recover_phase2
// recover from v1 to queue msg v2, set status normal
// if totLevel == 2
// fetch sink state
// recover from local state to sink state v1, send msg
// enable input queue, set status recover_phase2
// recover from v1 to queue msg v2, set status normal
return 0;
}

View File

@ -1271,6 +1271,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
transFreeMsg(pResp->pCont);
cliSchedMsgToNextNode(pMsg, pThrd);
return -1;
} else {
// change error code for taos client driver if retryCnt exceeds limit
if (0 == strncmp(pTransInst->label, "TSC", strlen("TSC"))) pResp->code = TSDB_CODE_APP_NOT_READY;
}
}
}
@ -1381,6 +1384,7 @@ int transReleaseCliHandle(void* handle) {
tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);
if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
taosMemoryFree(cmsg);
return -1;
}
return 0;

View File

@ -703,7 +703,11 @@ int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, in
int64_t sentbytes;
while (leftbytes > 0) {
#ifdef _TD_ARM_32
sentbytes = sendfile(pFileOut->fd, pFileIn->fd, (long int*)offset, leftbytes);
#else
sentbytes = sendfile(pFileOut->fd, pFileIn->fd, offset, leftbytes);
#endif
if (sentbytes == -1) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
continue;

View File

@ -35,7 +35,7 @@ typedef struct SCacheNode {
uint64_t addedTime; // the added time when this element is added or updated into cache
uint64_t lifespan; // life duration when this element should be remove from cache
int64_t expireTime; // expire time
uint64_t signature;
void* signature;
struct STrashElem *pTNodeHeader; // point to trash node head
uint16_t keyLen : 15; // max key size: 32kb
bool inTrashcan : 1; // denote if it is in trash or not
@ -208,7 +208,7 @@ static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force);
* @param pNode data node
*/
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheNode *pNode) {
if (pNode->signature != (uint64_t)pNode) {
if (pNode->signature != pNode) {
uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
return;
}
@ -226,7 +226,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheNode *
}
static FORCE_INLINE STrashElem *doRemoveElemInTrashcan(SCacheObj *pCacheObj, STrashElem *pElem) {
if (pElem->pData->signature != (uint64_t)pElem->pData) {
if (pElem->pData->signature != pElem->pData) {
uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
return NULL;
}
@ -494,7 +494,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
if (pCacheObj == NULL || data == NULL) return NULL;
SCacheNode *ptNode = (SCacheNode *)((char *)data - sizeof(SCacheNode));
if (ptNode->signature != (uint64_t)ptNode) {
if (ptNode->signature != ptNode) {
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
return NULL;
}
@ -511,7 +511,7 @@ void *taosCacheTransferData(SCacheObj *pCacheObj, void **data) {
if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
SCacheNode *ptNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode));
if (ptNode->signature != (uint64_t)ptNode) {
if (ptNode->signature != ptNode) {
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
return NULL;
}
@ -539,7 +539,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
// It happens when there is only one object in the cache, and two threads which has referenced this object
// start to free the it simultaneously [TD-1569].
SCacheNode *pNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode));
if (pNode->signature != (uint64_t)pNode) {
if (pNode->signature != pNode) {
uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
return;
}
@ -728,7 +728,7 @@ SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pDat
pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
pNewNode->lifespan = duration;
pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
pNewNode->signature = (uint64_t)pNewNode;
pNewNode->signature = pNewNode;
pNewNode->size = (uint32_t)sizeInBytes;
return pNewNode;

View File

@ -24,12 +24,9 @@ class TDSimClient:
self.cfgDict = {
"numOfLogLines": "100000000",
"numOfThreadsPerCore": "2.0",
"locale": "en_US.UTF-8",
"charset": "UTF-8",
"asyncLog": "0",
"anyIp": "0",
"sdbDebugFlag": "135",
"rpcDebugFlag": "135",
"tmrDebugFlag": "131",
"cDebugFlag": "135",

View File

@ -30,7 +30,6 @@ class TDSimClient:
self.path = path
self.cfgDict = {
"numOfLogLines": "100000000",
"numOfThreadsPerCore": "2.0",
"locale": "en_US.UTF-8",
"charset": "UTF-8",
"asyncLog": "0",
@ -40,6 +39,7 @@ class TDSimClient:
"udebugFlag": "143",
"jnidebugFlag": "143",
"qdebugFlag": "143",
"supportVnodes": "1024",
"telemetryReporting": "0",
}
@ -117,8 +117,6 @@ class TDDnode:
self.valgrind = 0
self.remoteIP = ""
self.cfgDict = {
"walLevel": "2",
"fsync": "1000",
"monitor": "0",
"maxShellConns": "30000",
"locale": "en_US.UTF-8",
@ -139,6 +137,7 @@ class TDDnode:
"qdebugFlag": "143",
"numOfLogLines": "100000000",
"statusInterval": "1",
"supportVnodes": "1024",
"telemetryReporting": "0"
}

View File

@ -4,7 +4,7 @@ rm -rf /tmp/udf/libbitand.so /tmp/udf/libsqrsum.so
mkdir -p /tmp/udf
echo "compile udf bit_and and sqr_sum"
gcc -fPIC -shared sh/bit_and.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libbitand.so
gcc -fPIC -shared sh/sqr_sum.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libsqrsum.so
gcc -fPIC -shared sh/l2norm.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libl2norm.so
echo "debug show /tmp/udf/*.so"
ls /tmp/udf/*.so

View File

@ -5,22 +5,22 @@
#include "taosudf.h"
DLL_EXPORT int32_t sqr_sum_init() {
DLL_EXPORT int32_t l2norm_init() {
return 0;
}
DLL_EXPORT int32_t sqr_sum_destroy() {
DLL_EXPORT int32_t l2norm_destroy() {
return 0;
}
DLL_EXPORT int32_t sqr_sum_start(SUdfInterBuf *buf) {
DLL_EXPORT int32_t l2norm_start(SUdfInterBuf *buf) {
*(int64_t*)(buf->buf) = 0;
buf->bufLen = sizeof(double);
buf->numOfResult = 0;
return 0;
}
DLL_EXPORT int32_t sqr_sum(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
DLL_EXPORT int32_t l2norm(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
double sumSquares = *(double*)interBuf->buf;
int8_t numNotNull = 0;
for (int32_t i = 0; i < block->numOfCols; ++i) {
@ -67,7 +67,7 @@ DLL_EXPORT int32_t sqr_sum(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInt
return 0;
}
DLL_EXPORT int32_t sqr_sum_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
DLL_EXPORT int32_t l2norm_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
if (buf->numOfResult == 0) {
resultData->numOfResult = 0;
return 0;

View File

@ -111,16 +111,16 @@ endi
if $data21_db != 1000 then # wal_level fsyncperiod
return -1
endi
if $data22_db != 0 then #
if $data22_db != 172800 then # wal_retention_period
return -1
endi
if $data23_db != 0 then #
if $data23_db != -1 then # wal_retention_size
return -1
endi
if $data24_db != 0 then #
if $data24_db != 86400 then # wal_roll_period
return -1
endi
if $data25_db != 0 then #
if $data25_db != 0 then # wal_segment_size
return -1
endi

View File

@ -24,10 +24,10 @@ if $system_content == Windows_NT then
endi
if $system_content == Windows_NT then
sql create function bit_and as 'C:\\Windows\\Temp\\bitand.dll' outputtype int bufSize 8;
sql create aggregate function sqr_sum as 'C:\\Windows\\Temp\\sqrsum.dll' outputtype double bufSize 8;
sql create aggregate function l2norm as 'C:\\Windows\\Temp\\l2norm.dll' outputtype double bufSize 8;
else
sql create function bit_and as '/tmp/udf/libbitand.so' outputtype int bufSize 8;
sql create aggregate function sqr_sum as '/tmp/udf/libsqrsum.so' outputtype double bufSize 8;
sql create aggregate function l2norm as '/tmp/udf/libl2norm.so' outputtype double bufSize 8;
endi
sql show functions;
if $rows != 2 then
@ -44,7 +44,7 @@ if $data10 != 2 then
return -1
endi
sql select sqr_sum(f) from t;
sql select l2norm(f) from t;
if $rows != 1 then
print expect 1, actual $rows
return -1
@ -66,7 +66,7 @@ if $data10 != 1 then
return -1
endi
sql select sqr_sum(f1, f2) from t2;
sql select l2norm(f1, f2) from t2;
if $rows != 1 then
return -1
endi
@ -95,7 +95,7 @@ if $data30 != NULL then
return -1
endi
sql select sqr_sum(f1, f2) from t2;
sql select l2norm(f1, f2) from t2;
print $rows, $data00
if $rows != 1 then
return -1
@ -105,7 +105,7 @@ if $data00 != 2.645751311 then
endi
sql insert into t2 values(now+4s, 4, 8)(now+5s, 5, 9);
sql select sqr_sum(f1-f2), sqr_sum(f1+f2) from t2;
sql select l2norm(f1-f2), l2norm(f1+f2) from t2;
print $rows , $data00 , $data01
if $rows != 1 then
return -1;
@ -117,7 +117,7 @@ if $data01 != 18.547236991 then
return -1
endi
sql select sqr_sum(bit_and(f2, f1)), sqr_sum(bit_and(f1, f2)) from t2;
sql select l2norm(bit_and(f2, f1)), l2norm(bit_and(f1, f2)) from t2;
print $rows , $data00 , $data01
if $rows != 1 then
return -1
@ -129,7 +129,7 @@ if $data01 != 1.414213562 then
return -1
endi
sql select sqr_sum(f2) from udf.t2 group by 1-bit_and(f1, f2) order by 1-bit_and(f1,f2);
sql select l2norm(f2) from udf.t2 group by 1-bit_and(f1, f2) order by 1-bit_and(f1,f2);
print $rows , $data00 , $data10 , $data20
if $rows != 3 then
return -1
@ -149,10 +149,10 @@ sql show functions;
if $rows != 1 then
return -1
endi
if $data00 != @sqr_sum@ then
if $data00 != @l2norm@ then
return -1
endi
sql drop function sqr_sum;
sql drop function l2norm;
sql show functions;
if $rows != 0 then
return -1

@ -1 +0,0 @@
Subproject commit 3c7dafeea3e558968165b73bee0f51024898e3da