Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

This commit is contained in:
Minghao Li 2022-07-28 15:53:06 +08:00
commit a75745de44
21 changed files with 419 additions and 1306 deletions

View File

@ -111,7 +111,7 @@ TDengine REST API 详情请参考[官方文档](/reference/rest-api/)。
这条命令很快完成 1 亿条记录的插入。具体时间取决于硬件性能。
taosBenchmark 命令本身带有很多选项,配置表的数目、记录条数等等,您可以设置不同参数进行体验,请执行 `taosBenchmark --help` 详细列出。taosBenchmark 详细使用方法请参照 [taosBenchmark 参考手册](../reference/taosbenchmark)。
taosBenchmark 命令本身带有很多选项,配置表的数目、记录条数等等,您可以设置不同参数进行体验,请执行 `taosBenchmark --help` 详细列出。taosBenchmark 详细使用方法请参照 [taosBenchmark 参考手册](../../reference/taosbenchmark)。
## 体验查询

View File

@ -245,7 +245,7 @@ select * from t;
Query OK, 2 row(s) in set (0.003128s)
```
除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../reference/taos-shell/)
除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../../reference/taos-shell/)
## 使用 taosBenchmark 体验写入速度

View File

@ -218,7 +218,7 @@ GROUP BY 子句中的表达式可以包含表或视图中的任何列,这些
PARTITION BY 子句是 TDengine 特色语法,按 part_list 对数据进行切分,在每个切分的分片中进行计算。
详见 [TDengine 特色查询](./distinguished)
详见 [TDengine 特色查询](../distinguished)
## ORDER BY

View File

@ -16,15 +16,15 @@ toc_max_heading_level: 4
SELECT ABS(field_name) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:获得指定列的绝对值
**功能说明**:获得指定字段的绝对值。
**返回结果类型**如果输入值为整数,输出值是 UBIGINT 类型。如果输入值是 FLOAT/DOUBLE 数据类型,输出值是 DOUBLE 数据类型
**返回结果类型**与指定字段的原始数据类型一致
**适用数据类型**:数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
**使用说明**只能与普通列选择Selection、投影Projection函数一起使用不能与聚合Aggregation函数一起使用。
@ -34,15 +34,15 @@ SELECT ABS(field_name) FROM { tb_name | stb_name } [WHERE clause]
SELECT ACOS(field_name) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:获得指定列的反余弦结果
**功能说明**:获得指定字段的反余弦结果。
**返回结果类型**DOUBLE。如果输入值为 NULL输出值也为 NULL
**返回结果类型**DOUBLE。
**适用数据类型**:数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
**使用说明**只能与普通列选择Selection、投影Projection函数一起使用不能与聚合Aggregation函数一起使用。
@ -52,15 +52,15 @@ SELECT ACOS(field_name) FROM { tb_name | stb_name } [WHERE clause]
SELECT ASIN(field_name) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:获得指定列的反正弦结果
**功能说明**:获得指定字段的反正弦结果。
**返回结果类型**DOUBLE。如果输入值为 NULL输出值也为 NULL
**返回结果类型**DOUBLE。
**适用数据类型**:数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
**使用说明**只能与普通列选择Selection、投影Projection函数一起使用不能与聚合Aggregation函数一起使用。
@ -71,15 +71,15 @@ SELECT ASIN(field_name) FROM { tb_name | stb_name } [WHERE clause]
SELECT ATAN(field_name) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:获得指定列的反正切结果
**功能说明**:获得指定字段的反正切结果。
**返回结果类型**DOUBLE。如果输入值为 NULL输出值也为 NULL
**返回结果类型**DOUBLE。
**适用数据类型**:数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
**使用说明**只能与普通列选择Selection、投影Projection函数一起使用不能与聚合Aggregation函数一起使用。
@ -90,20 +90,17 @@ SELECT ATAN(field_name) FROM { tb_name | stb_name } [WHERE clause]
SELECT CEIL(field_name) FROM { tb_name | stb_name } [WHERE clause];
```
**功能说明**:获得指定的向上取整数的结果。
**功能说明**:获得指定字段的向上取整数的结果。
**返回结果类型**:与指定列的原始数据类型一致。例如,如果指定列的原始数据类型为 Float那么返回的数据类型也为 Float如果指定列的原始数据类型为 Double那么返回的数据类型也为 Double
**返回结果类型**:与指定字段的原始数据类型一致
**适用数据类型**:数值类型。
**适用于**: 普通表、超级表。
**适用于**: 表和超级表。
**嵌套子查询支持**:适用于内层查询和外层查询。
**使用说明**:
- 支持 +、-、\*、/ 运算,如 ceil(col1) + ceil(col2)。
- 只能与普通列选择Selection、投影Projection函数一起使用不能与聚合Aggregation函数一起使用。
**使用说明**: 只能与普通列选择Selection、投影Projection函数一起使用不能与聚合Aggregation函数一起使用。
#### COS
@ -111,15 +108,15 @@ SELECT CEIL(field_name) FROM { tb_name | stb_name } [WHERE clause];
SELECT COS(field_name) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:获得指定列的余弦结果
**功能说明**:获得指定字段的余弦结果。
**返回结果类型**DOUBLE。如果输入值为 NULL输出值也为 NULL
**返回结果类型**DOUBLE。
**适用数据类型**:数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
**使用说明**只能与普通列选择Selection、投影Projection函数一起使用不能与聚合Aggregation函数一起使用。
@ -129,24 +126,24 @@ SELECT COS(field_name) FROM { tb_name | stb_name } [WHERE clause]
SELECT FLOOR(field_name) FROM { tb_name | stb_name } [WHERE clause];
```
**功能说明**:获得指定的向下取整数的结果。
**功能说明**:获得指定字段的向下取整数的结果。
其他使用说明参见 CEIL 函数描述。
#### LOG
```sql
SELECT LOG(field_name, base) FROM { tb_name | stb_name } [WHERE clause]
SELECT LOG(field_name[, base]) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:获得指定列对于底数 base 的对数
**功能说明**:获得指定字段对于底数 base 的对数。如果 base 参数省略,则返回指定字段的自然对数值。
**返回结果类型**DOUBLE。如果输入值为 NULL输出值也为 NULL
**返回结果类型**DOUBLE。
**适用数据类型**:数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
**使用说明**只能与普通列选择Selection、投影Projection函数一起使用不能与聚合Aggregation函数一起使用。
@ -157,15 +154,15 @@ SELECT LOG(field_name, base) FROM { tb_name | stb_name } [WHERE clause]
SELECT POW(field_name, power) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:获得指定列的指数为 power 的幂
**功能说明**:获得指定字段的指数为 power 的幂。
**返回结果类型**DOUBLE。如果输入值为 NULL输出值也为 NULL
**返回结果类型**DOUBLE。
**适用数据类型**:数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
**使用说明**只能与普通列选择Selection、投影Projection函数一起使用不能与聚合Aggregation函数一起使用。
@ -176,7 +173,7 @@ SELECT POW(field_name, power) FROM { tb_name | stb_name } [WHERE clause]
SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
```
**功能说明**:获得指定的四舍五入的结果。
**功能说明**:获得指定字段的四舍五入的结果。
其他使用说明参见 CEIL 函数描述。
@ -186,15 +183,15 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
SELECT SIN(field_name) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:获得指定列的正弦结果
**功能说明**:获得指定字段的正弦结果。
**返回结果类型**DOUBLE。如果输入值为 NULL输出值也为 NULL
**返回结果类型**DOUBLE。
**适用数据类型**:数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
**使用说明**只能与普通列选择Selection、投影Projection函数一起使用不能与聚合Aggregation函数一起使用。
@ -204,15 +201,15 @@ SELECT SIN(field_name) FROM { tb_name | stb_name } [WHERE clause]
SELECT SQRT(field_name) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:获得指定列的平方根
**功能说明**:获得指定字段的平方根。
**返回结果类型**DOUBLE。如果输入值为 NULL输出值也为 NULL
**返回结果类型**DOUBLE。
**适用数据类型**:数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
**使用说明**只能与普通列选择Selection、投影Projection函数一起使用不能与聚合Aggregation函数一起使用。
@ -222,15 +219,15 @@ SELECT SQRT(field_name) FROM { tb_name | stb_name } [WHERE clause]
SELECT TAN(field_name) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:获得指定列的正切结果
**功能说明**:获得指定字段的正切结果。
**返回结果类型**DOUBLE。如果输入值为 NULL输出值也为 NULL
**返回结果类型**DOUBLE。
**适用数据类型**:数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
**使用说明**只能与普通列选择Selection、投影Projection函数一起使用不能与聚合Aggregation函数一起使用。
@ -246,13 +243,13 @@ SELECT CHAR_LENGTH(str|column) FROM { tb_name | stb_name } [WHERE clause]
**功能说明**:以字符计数的字符串长度。
**返回结果类型**INT。如果输入值为NULL输出值为NULL。
**返回结果类型**BIGINT。
**适用数据类型**VARCHAR, NCHAR
**适用数据类型**VARCHAR, NCHAR
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
#### CONCAT
@ -262,13 +259,13 @@ SELECT CONCAT(str1|column1, str2|column2, ...) FROM { tb_name | stb_name } [WHER
**功能说明**:字符串连接函数。
**返回结果类型**:如果所有参数均为 VARCHAR 类型,则结果类型为 VARCHAR。如果参数包含NCHAR类型则结果类型为NCHAR。如果输入值为NULL输出值为NULL。
**返回结果类型**:如果所有参数均为 VARCHAR 类型,则结果类型为 VARCHAR。如果参数包含NCHAR类型则结果类型为NCHAR。如果参数包含NULL值输出值为NULL。
**适用数据类型**VARCHAR, NCHAR。 该函数最小参数个数为2个最大参数个数为8个。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
#### CONCAT_WS
@ -279,13 +276,13 @@ SELECT CONCAT_WS(separator, str1|column1, str2|column2, ...) FROM { tb_name | st
**功能说明**:带分隔符的字符串连接函数。
**返回结果类型**如果所有参数均为VARCHAR类型则结果类型为VARCHAR。如果参数包含NCHAR类型则结果类型为NCHAR。如果输入值为NULL输出值为NULL。如果separator值不为NULL其他输入为NULL输出为空串
**返回结果类型**如果所有参数均为VARCHAR类型则结果类型为VARCHAR。如果参数包含NCHAR类型则结果类型为NCHAR。如果参数包含NULL值则输出值为NULL
**适用数据类型**VARCHAR, NCHAR。 该函数最小参数个数为3个最大参数个数为9个。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
#### LENGTH
@ -296,13 +293,13 @@ SELECT LENGTH(str|column) FROM { tb_name | stb_name } [WHERE clause]
**功能说明**:以字节计数的字符串长度。
**返回结果类型**INT。
**返回结果类型**BIGINT。
**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
#### LOWER
@ -313,13 +310,13 @@ SELECT LOWER(str|column) FROM { tb_name | stb_name } [WHERE clause]
**功能说明**:将字符串参数值转换为全小写字母。
**返回结果类型**同输入类型。如果输入值为NULL输出值为NULL
**返回结果类型**与输入字段的原始类型相同
**适用数据类型**输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列
**适用数据类型**VARCHAR, NCHAR
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
#### LTRIM
@ -330,13 +327,13 @@ SELECT LTRIM(str|column) FROM { tb_name | stb_name } [WHERE clause]
**功能说明**:返回清除左边空格后的字符串。
**返回结果类型**同输入类型。如果输入值为NULL输出值为NULL
**返回结果类型**与输入字段的原始类型相同
**适用数据类型**输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列
**适用数据类型**VARCHAR, NCHAR
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
#### RTRIM
@ -347,13 +344,13 @@ SELECT LTRIM(str|column) FROM { tb_name | stb_name } [WHERE clause]
**功能说明**:返回清除右边空格后的字符串。
**返回结果类型**同输入类型。如果输入值为NULL输出值为NULL
**返回结果类型**与输入字段的原始类型相同
**适用数据类型**输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列
**适用数据类型**VARCHAR, NCHAR
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
#### SUBSTR
@ -362,15 +359,15 @@ SELECT LTRIM(str|column) FROM { tb_name | stb_name } [WHERE clause]
SELECT SUBSTR(str,pos[,len]) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:从源字符串 str 中的指定位置 pos 开始取一个长度为 len 的子串并返回。
**功能说明**:从源字符串 str 中的指定位置 pos 开始取一个长度为 len 的子串并返回。如果输入参数 len 被忽略,返回的子串包含从 pos 开始的整个字串。
**返回结果类型**同输入类型。如果输入值为NULL输出值为NULL
**返回结果类型**与输入字段的原始类型相同
**适用数据类型**输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。输入参数pos可以为正数也可以为负数。如果pos是正数表示开始位置从字符串开头正数计算。如果pos为负数表示开始位置从字符串结尾倒数计算。如果输入参数len被忽略返回的子串包含从pos开始的整个字串。
**适用数据类型**VARCHAR, NCHAR。输入参数 pos 可以为正数,也可以为负数。如果 pos 是正数,表示开始位置从字符串开头正数计算。如果 pos 为负数,表示开始位置从字符串结尾倒数计算。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
#### UPPER
@ -381,13 +378,13 @@ SELECT UPPER(str|column) FROM { tb_name | stb_name } [WHERE clause]
**功能说明**:将字符串参数值转换为全大写字母。
**返回结果类型**同输入类型。如果输入值为NULL输出值为NULL
**返回结果类型**与输入字段的原始类型相同
**适用数据类型**输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列
**适用数据类型**VARCHAR, NCHAR
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表
**适用于**: 表和超级表
### 转换函数
@ -400,16 +397,19 @@ SELECT UPPER(str|column) FROM { tb_name | stb_name } [WHERE clause]
SELECT CAST(expression AS type_name) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:数据类型转换函数,输入参数 expression 支持普通列、常量、标量函数及它们之间的四则运算,只适用于 select 子句中。
**功能说明**:数据类型转换函数,返回 expression 转换为 type_name 指定的类型后的结果。只适用于 select 子句中。
**返回结果类型**CAST 中指定的类型type_name),可以是 BIGINT、BIGINT UNSIGNED、BINARY、VARCHAR、NCHAR和TIMESTAMP
**返回结果类型**CAST 中指定的类型type_name)
**适用数据类型**:输入参数 expression 的类型可以是BLOB、MEDIUMBLOB和JSON外的所有类型
**适用数据类型**:输入参数 expression 的类型可以是BLOB、MEDIUMBLOB和JSON外的所有类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表。
**使用说明**
- 对于不能支持的类型转换会直接报错。
- 如果输入值为NULL则输出值也为NULL。
- 对于类型支持但某些值无法正确转换的情况对应的转换后的值以转换函数输出为准。目前可能遇到的几种情况:
1字符串类型转换数值类型时可能出现的无效字符情况例如"a"可能转为0但不会报错。
2转换到数值类型时数值大于type_name可表示的范围时则会溢出但不会报错。
@ -418,20 +418,23 @@ SELECT CAST(expression AS type_name) FROM { tb_name | stb_name } [WHERE clause]
#### TO_ISO8601
```sql
SELECT TO_ISO8601(ts_val | ts_col) FROM { tb_name | stb_name } [WHERE clause];
SELECT TO_ISO8601(ts[, timezone]) FROM { tb_name | stb_name } [WHERE clause];
```
**功能说明**:将 UNIX 时间戳转换成为 ISO8601 标准的日期时间格式,并附加客户端时区信息。
**功能说明**:将 UNIX 时间戳转换成为 ISO8601 标准的日期时间格式,并附加时区信息。timezone 参数允许用户为输出结果指定附带任意时区信息。如果 timezone 参数省略,输出结果附带当前客户端的系统时区信息。
**返回结果数据类型**VARCHAR 类型。
**适用数据类型**UNIX 时间戳常量或是 TIMESTAMP 类型的列
**适用数据类型**INTEGER, TIMESTAMP。
**适用于**:表、超级表。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表。
**使用说明**
- 如果输入是 UNIX 时间戳常量,返回格式精度由时间戳的位数决定;
- timezone 参数允许输入的时区格式为: [z/Z, +/-hhmm, +/-hh, +/-hh:mm]。例如TO_ISO8601(1, "+00:00")。
- 如果输入是表示 UNIX 时间戳的整形,返回格式精度由时间戳的位数决定;
- 如果输入是 TIMSTAMP 类型的列,返回格式的时间戳精度与当前 DATABASE 设置的时间精度一致。
@ -443,32 +446,34 @@ SELECT TO_JSON(str_literal) FROM { tb_name | stb_name } [WHERE clause];
**功能说明**: 将字符串常量转换为 JSON 类型。
**返回结果数据类型**: JSON
**返回结果数据类型**: JSON
**适用数据类型**: JSON 字符串,形如 '{ "literal" : literal }'。'{}'表示空值。键必须为字符串字面量值可以为数值字面量、字符串字面量、布尔字面量或空值字面量。str_literal中不支持转义符。
**适用于**: 表和超级表
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**: 表和超级表。
#### TO_UNIXTIMESTAMP
```sql
SELECT TO_UNIXTIMESTAMP(datetime_string | ts_col) FROM { tb_name | stb_name } [WHERE clause];
SELECT TO_UNIXTIMESTAMP(datetime_string) FROM { tb_name | stb_name } [WHERE clause];
```
**功能说明**:将日期时间格式的字符串转换成为 UNIX 时间戳。
**返回结果数据类型**长整型 INT64
**返回结果数据类型**BIGINT
**应用字段**字符串常量或是 VARCHAR/NCHAR 类型的列
**应用字段**VARCHAR, NCHAR
**适用于**:表、超级表。
**嵌套子查询支持**:适用于内层查询和外层查询。
**适用于**:表和超级表。
**使用说明**
- 输入的日期时间字符串须符合 ISO8601/RFC3339 标准,无法转换的字符串格式将返回 0
- 输入的日期时间字符串须符合 ISO8601/RFC3339 标准,无法转换的字符串格式将返回 NULL
- 返回的时间戳精度与当前 DATABASE 设置的时间精度一致。
@ -488,11 +493,13 @@ INSERT INTO tb_name VALUES (NOW(), ...);
**功能说明**:返回客户端当前系统时间。
**返回结果数据类型**TIMESTAMP 时间戳类型
**返回结果数据类型**TIMESTAMP。
**应用字段**:在 WHERE 或 INSERT 语句中使用时只能作用于 TIMESTAMP 类型的字段。
**适用于**:表、超级表。
**适用于**:表和超级表。
**嵌套子查询支持**:适用于内层查询和外层查询。
**使用说明**
@ -504,40 +511,42 @@ INSERT INTO tb_name VALUES (NOW(), ...);
#### TIMEDIFF
```sql
SELECT TIMEDIFF(ts_val1 | datetime_string1 | ts_col1, ts_val2 | datetime_string2 | ts_col2 [, time_unit]) FROM { tb_name | stb_name } [WHERE clause];
SELECT TIMEDIFF(ts | datetime_string1, ts | datetime_string2 [, time_unit]) FROM { tb_name | stb_name } [WHERE clause];
```
**功能说明**:计算两个时间戳之间的差值,并近似到时间单位 time_unit 指定的精度。
**返回结果数据类型**长整型 INT64
**返回结果数据类型**BIGINT。输入包含不符合时间日期格式字符串则返回 NULL
**应用字段**UNIX 时间戳,日期时间格式的字符串,或者 TIMESTAMP 类型的列
**应用字段**表示 UNIX 时间戳的 BIGINT, TIMESTAMP 类型,或符合日期时间格式的 VARCHAR, NCHAR 类型
**适用于**:表、超级表。
**嵌套子查询支持**:适用于内层查询和外层查询。
**使用说明**
- 支持的时间单位 time_unit 如下:
1u(微秒)1a(毫秒)1s(秒)1m(分)1h(小时)1d(天)。
1b(纳秒), 1u(微秒)1a(毫秒)1s(秒)1m(分)1h(小时)1d(天), 1w(周)。
- 如果时间单位 time_unit 未指定, 返回的时间差值精度与当前 DATABASE 设置的时间精度一致。
#### TIMETRUNCATE
```sql
SELECT TIMETRUNCATE(ts_val | datetime_string | ts_col, time_unit) FROM { tb_name | stb_name } [WHERE clause];
SELECT TIMETRUNCATE(ts | datetime_string , time_unit) FROM { tb_name | stb_name } [WHERE clause];
```
**功能说明**:将时间戳按照指定时间单位 time_unit 进行截断。
**返回结果数据类型**TIMESTAMP 时间戳类型
**返回结果数据类型**TIMESTAMP。
**应用字段**UNIX 时间戳,日期时间格式的字符串,或者 TIMESTAMP 类型的列
**应用字段**表示 UNIX 时间戳的 BIGINT, TIMESTAMP 类型,或符合日期时间格式的 VARCHAR, NCHAR 类型
**适用于**:表、超级表。
**使用说明**
- 支持的时间单位 time_unit 如下:
1u(微秒)1a(毫秒)1s(秒)1m(分)1h(小时)1d(天)。
1b(纳秒), 1u(微秒)1a(毫秒)1s(秒)1m(分)1h(小时)1d(天), 1w(周)。
- 返回的时间戳精度与当前 DATABASE 设置的时间精度一致。
@ -549,7 +558,7 @@ SELECT TIMEZONE() FROM { tb_name | stb_name } [WHERE clause];
**功能说明**:返回客户端当前时区信息。
**返回结果数据类型**VARCHAR 类型
**返回结果数据类型**VARCHAR。
**应用字段**:无
@ -566,7 +575,7 @@ INSERT INTO tb_name VALUES (TODAY(), ...);
**功能说明**:返回客户端当日零时的系统时间。
**返回结果数据类型**TIMESTAMP 时间戳类型
**返回结果数据类型**TIMESTAMP。
**应用字段**:在 WHERE 或 INSERT 语句中使用时只能作用于 TIMESTAMP 类型的字段。

View File

@ -19,7 +19,7 @@ library_path包含UDF函数实现的动态链接库的绝对路径是在
OUTPUTTYPE标识此函数的返回类型。
BUFSIZE中间结果的缓冲区大小单位是字节。不设置则默认为0。最大不可超过512字节。
关于如何开发自定义函数,请参考 [UDF使用说明](../develop/udf)。
关于如何开发自定义函数,请参考 [UDF使用说明](../../develop/udf)。
## 删除自定义函数

View File

@ -80,21 +80,16 @@ taos --dump-config
| 补充说明 | RESTful 服务在 2.4.0.0 之前(不含)由 taosd 提供,默认端口为 6041; 在 2.4.0.0 及后续版本由 taosAdapter默认端口为 6041 |
:::note
确保集群中所有主机在端口 6030-6042 上的 TCP/UDP 协议能够互通。(详细的端口情况请参见下表)
确保集群中所有主机在端口 6030 上的 TCP 协议能够互通。(详细的端口情况请参见下表)
:::
| 协议 | 默认端口 | 用途说明 | 修改方法 |
| :--- | :-------- | :---------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------- |
| TCP | 6030 | 客户端与服务端之间通讯。 | 由配置文件设置 serverPort 决定。 |
| TCP | 6035 | 多节点集群的节点间通讯。 | 随 serverPort 端口变化。 |
| TCP | 6040 | 多节点集群的节点间数据同步。 | 随 serverPort 端口变化。 |
| TCP | 6030 | 客户端与服务端之间通讯,多节点集群的节点间通讯。 | 由配置文件设置 serverPort 决定。 |
| TCP | 6041 | 客户端与服务端之间的 RESTful 通讯。 | 随 serverPort 端口变化。注意 taosAdapter 配置或有不同,请参考相应[文档](/reference/taosadapter/)。 |
| TCP | 6042 | Arbitrator 的服务端口。 | 随 Arbitrator 启动参数设置变化。 |
| TCP | 6043 | TaosKeeper 监控服务端口。 | 随 TaosKeeper 启动参数设置变化。 |
| TCP | 6044 | 支持 StatsD 的数据接入端口。 | 随 taosAdapter 启动参数设置变化2.3.0.1+以上版本)。 |
| UDP | 6045 | 支持 collectd 数据接入端口。 | 随 taosAdapter 启动参数设置变化2.3.0.1+以上版本)。 |
| TCP | 6060 | 企业版内 Monitor 服务的网络端口。 | |
| UDP | 6030-6034 | 客户端与服务端之间通讯。 | 随 serverPort 端口变化。 |
| UDP | 6035-6039 | 多节点集群的节点间通讯。 | 随 serverPort 端口变化。
### maxShellConns
@ -105,26 +100,6 @@ taos --dump-config
| 取值范围 | 10-50000000 |
| 缺省值 | 5000 |
### maxConnections
| 属性 | 说明 |
| -------- | ------------------------------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 一个数据库连接所容许的 dnode 连接数 |
| 取值范围 | 1-100000 |
| 缺省值 | 5000 |
| 补充说明 | 实际测试下来,如果默认没有配,选 50 个 worker thread 会产生 Network unavailable |
### rpcForceTcp
| 属性 | 说明 |
| -------- | --------------------------------------------------- |
| 适用范围 | 服务端和客户端均适用 |
| 含义 | 强制使用 TCP 传输 |
| 取值范围 | 0: 不开启 1: 开启 |
| 缺省值 | 0 |
| 补充说明 | 在网络比较差的环境中,建议开启。<br/>2.0 版本新增。 |
## 监控相关
### monitor
@ -132,10 +107,26 @@ taos --dump-config
| 属性 | 说明 |
| -------- | ---------------------------------------------------------------------------------------------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 服务器内部的系统监控开关。监控主要负责收集物理节点的负载状况,包括 CPU、内存、硬盘、网络带宽、HTTP 请求量的监控记录,记录信息存储在`LOG`库中。 |
| 含义 | 服务器内部的系统监控开关。监控主要负责收集物理节点的负载状况,包括 CPU、内存、硬盘、网络带宽的监控记录,监控信息将通过 HTTP 协议发送给由 `monitorFqdn``monitorProt` 指定的 TaosKeeper 监控服务 |
| 取值范围 | 0关闭监控服务 1激活监控服务。 |
| 缺省值 | 1 |
### monitorFqdn
| 属性 | 说明 |
| -------- | -------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | TaosKeeper 监控服务的 FQDN |
| 缺省值 | 无 |
### monitorPort
| 属性 | 说明 |
| -------- | -------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | TaosKeeper 监控服务的端口号 |
| 缺省值 | 6043 |
### monitorInterval
| 属性 | 说明 |
@ -143,9 +134,10 @@ taos --dump-config
| 适用范围 | 仅服务端适用 |
| 含义 | 监控数据库记录系统参数CPU/内存)的时间间隔 |
| 单位 | 秒 |
| 取值范围 | 1-600 |
| 取值范围 | 1-200000 |
| 缺省值 | 30 |
### telemetryReporting
| 属性 | 说明 |
@ -167,19 +159,10 @@ taos --dump-config
| 缺省值 | 无 |
| 补充说明 | 计算规则可以根据实际应用可能的最大并发数和表的数字相乘,再乘 170 。<br/>2.0.15 以前的版本中,此参数的单位是字节) |
### ratioOfQueryCores
| 属性 | 说明 |
| -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 设置查询线程的最大数量。 |
| 缺省值 | 1 |
| 补充说明 | 最小值 0 表示只有 1 个查询线程 <br/> 最大值 2 表示最大建立 2 倍 CPU 核数的查询线程。<br/>默认为 1表示最大和 CPU 核数相等的查询线程。<br/>该值可以为小数,即 0.5 表示最大建立 CPU 核数一半的查询线程。 |
### maxNumOfDistinctRes
| 属性 | 说明 |
| -------- | -------------------------------- | --- |
| -------- | -------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 允许返回的 distinct 结果最大行数 |
| 取值范围 | 默认值为 10 万,最大值 1 亿 |
@ -301,96 +284,6 @@ charset 的有效值是 UTF-8。
| 含义 | 数据文件目录,所有的数据文件都将写入该目录 |
| 缺省值 | /var/lib/taos |
### cache
| 属性 | 说明 |
| -------- | ------------ |
| 适用范围 | 仅服务端适用 |
| 含义 | 内存块的大小 |
| 单位 | MB |
| 缺省值 | 16 |
### blocks
| 属性 | 说明 |
| -------- | ----------------------------------------------------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 每个 vnodetsdb中有多少 cache 大小的内存块。因此一个 vnode 的用的内存大小粗略为cache \* blocks |
| 缺省值 | 6 |
### days
| 属性 | 说明 |
| -------- | -------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 数据文件存储数据的时间跨度 |
| 单位 | 天 |
| 缺省值 | 10 |
### keep
| 属性 | 说明 |
| -------- | -------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 数据保留的天数 |
| 单位 | 天 |
| 缺省值 | 3650 |
### minRows
| 属性 | 说明 |
| -------- | ---------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 文件块中记录的最小条数 |
| 缺省值 | 100 |
### maxRows
| 属性 | 说明 |
| -------- | ---------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 文件块中记录的最大条数 |
| 缺省值 | 4096 |
### walLevel
| 属性 | 说明 |
| -------- | --------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | WAL 级别 |
| 取值范围 | 0: 不写WAL; <br/> 1写 WAL, 但不执行 fsync <br/> 2写 WAL, 而且执行 fsync |
| 缺省值 | 1 |
### fsync
| 属性 | 说明 |
| -------- | -------------------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 当 WAL 设置为 2 时,执行 fsync 的周期 |
| 单位 | 毫秒 |
| 取值范围 | 最小为 0表示每次写入立即执行 fsync <br/> 最大为 180000三分钟 |
| 缺省值 | 3000 |
### update
| 属性 | 说明 |
| -------- | ---------------------------------------------------------------------------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 允许更新已存在的数据行 |
| 取值范围 | 0不允许更新 <br/> 1允许整行更新 <br/> 2允许部分列更新。2.1.7.0 版本开始此参数支持设为 2在此之前取值只能是 [0, 1] |
| 缺省值 | 0 |
| 补充说明 | 2.0.8.0 版本之前,不支持此参数。 |
### cacheLast
| 属性 | 说明 |
| -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| 适用范围 | 仅服务端适用 |
| 含义 | 是否在内存中缓存子表的最近数据 |
| 取值范围 | 0关闭 <br/> 1缓存子表最近一行数据 <br/> 2缓存子表每一列的最近的非 NULL 值 <br/> 3同时打开缓存最近行和列功能。2.1.2.0 版本开始此参数支持 0 3 的取值范围,在此之前取值只能是 [0, 1] |
| 缺省值 | 0 |
| 补充说明 | 2.1.2.0 版本之前、2.0.20.7 版本之前在 taos.cfg 文件中不支持此参数。 |
### minimalTmpDirGB
| 属性 | 说明 |
@ -409,110 +302,19 @@ charset 的有效值是 UTF-8。
| 单位 | GB |
| 缺省值 | 2.0 |
### vnodeBak
| 属性 | 说明 |
| -------- | -------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 删除 vnode 时是否备份 vnode 目录 |
| 取值范围 | 01是 |
| 缺省值 | 1 |
## 集群相关
### numOfMnodes
| 属性 | 说明 |
| -------- | ------------------ |
| 适用范围 | 仅服务端适用 |
| 含义 | 系统中管理节点个数 |
| 缺省值 | 3 |
### replica
| 属性 | 说明 |
| -------- | ------------ |
| 适用范围 | 仅服务端适用 |
| 含义 | 副本个数 |
| 取值范围 | 1-3 |
| 缺省值 | 1 |
### quorum
| 属性 | 说明 |
| -------- | -------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 多副本环境下指令执行的确认数要求 |
| 取值范围 | 1,2 |
| 缺省值 | 1 |
### role
### supportVnodes
| 属性 | 说明 |
| -------- | ----------------------------------------------------------------------------------------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | dnode 的可选角色 |
| 取值范围 | 0any既可作为 mnode也可分配 vnode <br/> 1mgmt只能作为 mnode不能分配 vnode <br/> 2dnode不能作为 mnode只能分配 vnode |
| 缺省值 | 0 |
### balance
| 属性 | 说明 |
| -------- | ---------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 是否启动负载均衡 |
| 取值范围 | 01 |
| 缺省值 | 1 |
### balanceInterval
| 属性 | 说明 |
| -------- | ------------------------------------------------ |
| 适用范围 | 仅服务端适用 |
| 含义 | 管理节点在正常运行状态下,检查负载均衡的时间间隔 |
| 单位 | 秒 |
| 取值范围 | 1-30000 |
| 缺省值 | 300 |
### arbitrator
| 属性 | 说明 |
| -------- | ------------------------------------------ |
| 适用范围 | 仅服务端适用 |
| 含义 | 系统中裁决器的 endpoint其格式如 firstEp |
| 缺省值 | 空 |
| 含义 | dnode 支持的最大 vnode 数目 |
| 取值范围 | 0-4096 |
| 缺省值 | 256 |
## 时间相关
### precision
| 属性 | 说明 |
| -------- | ------------------------------------------------- |
| 适用范围 | 仅服务端 |
| 含义 | 创建数据库时使用的时间精度 |
| 取值范围 | ms: millisecond; us: microsecond ; ns: nanosecond |
| 缺省值 | ms |
### rpcTimer
| 属性 | 说明 |
| -------- | -------------------- |
| 适用范围 | 服务端和客户端均适用 |
| 含义 | rpc 重试时长 |
| 单位 | 毫秒 |
| 取值范围 | 100-3000 |
| 缺省值 | 300 |
### rpcMaxTime
| 属性 | 说明 |
| -------- | -------------------- |
| 适用范围 | 服务端和客户端均适用 |
| 含义 | rpc 等待应答最大时长 |
| 单位 | 秒 |
| 取值范围 | 100-7200 |
| 缺省值 | 600 |
### statusInterval
| 属性 | 说明 |
@ -533,105 +335,8 @@ charset 的有效值是 UTF-8。
| 取值范围 | 1-120 |
| 缺省值 | 3 |
### tableMetaKeepTimer
| 属性 | 说明 |
| -------- | --------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 表的元数据 cache 时长 |
| 单位 | 秒 |
| 取值范围 | 1-8640000 |
| 缺省值 | 7200 |
### maxTmrCtrl
| 属性 | 说明 |
| -------- | -------------------- |
| 适用范围 | 服务端和客户端均适用 |
| 含义 | 定时器个数 |
| 单位 | 个 |
| 取值范围 | 8-2048 |
| 缺省值 | 512 |
### offlineThreshold
| 属性 | 说明 |
| -------- | ------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | dnode 离线阈值,超过该时间将导致 dnode 离线 |
| 单位 | 秒 |
| 取值范围 | 5-7200000 |
| 缺省值 | 86400\*1010 天) |
## 性能调优
### numOfThreadsPerCore
| 属性 | 说明 |
| -------- | ----------------------------------- |
| 适用范围 | 服务端和客户端均适用 |
| 含义 | 每个 CPU 核生成的队列消费者线程数量 |
| 缺省值 | 1.0 |
### ratioOfQueryThreads
| 属性 | 说明 |
| -------- | ------------------------------------------------------------------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 设置查询线程的最大数量 |
| 取值范围 | 0表示只有 1 个查询线程 <br/> 1表示最大和 CPU 核数相等的查询线程 <br/> 2表示最大建立 2 倍 CPU 核数的查询线程。 |
| 缺省值 | 1 |
| 补充说明 | 该值可以为小数,即 0.5 表示最大建立 CPU 核数一半的查询线程。 |
### maxVgroupsPerDb
| 属性 | 说明 |
| -------- | ------------------------------------ |
| 适用范围 | 仅服务端适用 |
| 含义 | 每个 DB 中 能够使用的最大 vnode 个数 |
| 取值范围 | 0-8192 |
| 缺省值 | 0 |
### maxTablesPerVnode
| 属性 | 说明 |
| -------- | --------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 每个 vnode 中能够创建的最大表个数 |
| 缺省值 | 1000000 |
### minTablesPerVnode
| 属性 | 说明 |
| -------- | --------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 每个 vnode 中必须创建表的最小数量 |
| 缺省值 | 1000 |
### tableIncStepPerVnode
| 属性 | 说明 |
| -------- | ------------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 每个 vnode 中超过最小表数i.e. minTablesPerVnode, 后递增步长 |
| 缺省值 | 1000 |
### maxNumOfOrderedRes
| 属性 | 说明 |
| -------- | -------------------------------------- |
| 适用范围 | 服务端和客户端均适用 |
| 含义 | 支持超级表时间排序允许的最多记录数限制 |
| 缺省值 | 10 万 |
### mnodeEqualVnodeNum
| 属性 | 说明 |
| -------- | ------------------------------------ |
| 适用范围 | 仅服务端适用 |
| 含义 | 将一个 mnode 等同于 vnode 消耗的个数 |
| 缺省值 | 4 |
### numOfCommitThreads
| 属性 | 说明 |
@ -642,23 +347,6 @@ charset 的有效值是 UTF-8。
## 压缩相关
### comp
| 属性 | 说明 |
| -------- | ----------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 文件压缩标志位 |
| 取值范围 | 0关闭1:一阶段压缩2:两阶段压缩 |
| 缺省值 | 2 |
### tsdbMetaCompactRatio
| 属性 | 说明 |
| -------- | -------------------------------------------------------------- |
| 含义 | tsdb meta 文件中冗余数据超过多少阈值,开启 meta 文件的压缩功能 |
| 取值范围 | 0不开启[1-100]:冗余数据比例 |
| 缺省值 | 0 |
### compressMsgSize
| 属性 | 说明 |
@ -710,135 +398,6 @@ charset 的有效值是 UTF-8。
| 缺省值 | 0.0000000000000001 |
| 补充说明 | 小于此值的浮点数尾数部分将被截取 |
## 连续查询相关
### stream
| 属性 | 说明 |
| -------- | ------------------------------ |
| 适用范围 | 仅服务端适用 |
| 含义 | 是否启用连续查询(流计算功能) |
| 取值范围 | 0不允许 <br/> 1允许 |
| 缺省值 | 1 |
### minSlidingTime
| 属性 | 说明 |
| -------- | ----------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 最小滑动窗口时长 |
| 单位 | 毫秒 |
| 取值范围 | 10-1000000 |
| 缺省值 | 10 |
| 补充说明 | 支持 us 补值后,这个值就是 1us 了。 |
### minIntervalTime
| 属性 | 说明 |
| -------- | -------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 时间窗口最小值 |
| 单位 | 毫秒 |
| 取值范围 | 1-1000000 |
| 缺省值 | 10 |
### maxStreamCompDelay
| 属性 | 说明 |
| -------- | -------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 连续查询启动最大延迟 |
| 单位 | 毫秒 |
| 取值范围 | 10-1000000000 |
| 缺省值 | 20000 |
### maxFirstStreamCompDelay
| 属性 | 说明 |
| -------- | -------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 第一次连续查询启动最大延迟 |
| 单位 | 毫秒 |
| 取值范围 | 10-1000000000 |
| 缺省值 | 10000 |
### retryStreamCompDelay
| 属性 | 说明 |
| -------- | -------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 连续查询重试等待间隔 |
| 单位 | 毫秒 |
| 取值范围 | 10-1000000000 |
| 缺省值 | 10 |
### streamCompDelayRatio
| 属性 | 说明 |
| -------- | ---------------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 连续查询的延迟时间计算系数,实际延迟时间为本参数乘以计算时间窗口 |
| 取值范围 | 0.1-0.9 |
| 缺省值 | 0.1 |
:::info
为避免多个 stream 同时执行占用太多系统资源,程序中对 stream 的执行时间人为增加了一些随机的延时。<br/>maxFirstStreamCompDelay 是 stream 第一次执行前最少要等待的时间。<br/>streamCompDelayRatio 是延迟时间的计算系数,它乘以查询的 interval 后为延迟时间基准。<br/>maxStreamCompDelay 是延迟时间基准的上限。<br/>实际延迟时间为一个不超过延迟时间基准的随机值。<br/>stream 某次计算失败后需要重试retryStreamCompDelay 是重试的等待时间基准。<br/>实际重试等待时间为不超过等待时间基准的随机值。
:::
## HTTP 相关
:::note
HTTP 服务在 2.4.0.0(不含)以前的版本中由 taosd 提供,在 2.4.0.0 以后(含)由 taosAdapter 提供。
本节的配置参数仅在 2.4.0.0(不含)以前的版本中生效。如果您使用的是 2.4.0.0(含)及以后的版本请参考[文档](/reference/taosadapter/)。
:::
### http
| 属性 | 说明 |
| -------- | --------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 服务器内部的 http 服务开关。 |
| 取值范围 | 0关闭 http 服务, 1激活 http 服务。 |
| 缺省值 | 1 |
### httpEnableRecordSql
| 属性 | 说明 |
| -------- | --------------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 记录通过 RESTFul 接口,产生的 SQL 调用。 |
| 缺省值 | 0 |
| 补充说明 | 生成的文件httpnote.0/httpnote.1),与服务端日志所在目录相同。 |
### httpMaxThreads
| 属性 | 说明 |
| -------- | ------------------------------------------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | RESTFul 接口的线程数。taosAdapter 配置或有不同,请参考相应[文档](/reference/taosadapter/)。 |
| 缺省值 | 2 |
### restfulRowLimit
| 属性 | 说明 |
| -------- | ----------------------------------------------------------------------------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | RESTFul 接口单次返回的记录条数。taosAdapter 配置或有不同,请参考相应[文档](/reference/taosadapter/)。 |
| 缺省值 | 10240 |
| 补充说明 | 最大 10,000,000 |
### httpDBNameMandatory
| 属性 | 说明 |
| -------- | ---------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | 是否在 URL 中输入 数据库名称 |
| 取值范围 | 0不开启1开启 |
| 缺省值 | 0 |
| 补充说明 | 2.3 版本新增。 |
## 日志相关
### logDir

View File

@ -10,7 +10,7 @@ import TabItem from "@theme/TabItem";
## 安装
关于安装,请参考 [使用安装包立即开始](../get-started/package)
关于安装,请参考 [使用安装包立即开始](../../get-started/package)

View File

@ -16,7 +16,7 @@ title: 容量规划
- pagesize
- cachesize
关于这些参数的详细说明请参考 [数据库管理](../taos-sql/database)。
关于这些参数的详细说明请参考 [数据库管理](../../taos-sql/database)。
一个数据库所需要的内存大小等于
@ -24,7 +24,7 @@ title: 容量规划
vgroups * replica * (buffer + pages * pagesize + cachesize)
```
但要注意的是这些内存并不需要由单一服务器提供,而是由整个集群中所有数据节点共同负担,相当于由这些数据节点所在的服务器共同负担。如果集群中有不止一个数据库,则所需内存要累加,并由集群中所有服务器共同负担。更复杂的场景是如果集群中的数据节点并非在最初就一次性全部建立,而是随着使用中系统负载的增加逐步增加服务器并增加数据节点,则新创建的数据库会导致新旧数据节点上的负载并不均衡,此时简单的理论计算并不能直接使用,要结合各数据节点的负载情况。
但要注意的是这些内存并不需要由单一服务器提供,而是由整个集群中所有数据节点共同负担,相当于由这些数据节点所在的服务器共同负担。如果集群中有不止一个数据库,则所需内存要累加。更复杂的场景是如果集群中的数据节点并非在最初就一次性全部建立,而是随着使用中系统负载的增加逐步增加服务器并增加数据节点,则新创建的数据库会导致新旧数据节点上的负载并不均衡,此时简单的理论计算并不能直接使用,要结合各数据节点的负载情况。
## 客户端内存需求
@ -51,7 +51,7 @@ CPU 的需求取决于如下两方面:
- **数据插入** TDengine 单核每秒能至少处理一万个插入请求。每个插入请求可以带多条记录,一次插入一条记录与插入 10 条记录,消耗的计算资源差别很小。因此每次插入,条数越大,插入效率越高。如果一个插入请求带 200 条以上记录,单核就能达到每秒插入 100 万条记录的速度。但对前端数据采集的要求越高,因为需要缓存记录,然后一批插入。
- **查询需求** TDengine 提供高效的查询,但是每个场景的查询差异很大,查询频次变化也很大,难以给出客观数字。需要用户针对自己的场景,写一些查询语句,才能确定。
因此仅对数据插入而言CPU 是可以估算出来的,但查询所耗的计算资源无法估算。在实际运过程中,不建议 CPU 使用率超过 50%,超过后,需要增加新的节点,以获得更多计算资源。
因此仅对数据插入而言CPU 是可以估算出来的,但查询所耗的计算资源无法估算。在实际运过程中,不建议 CPU 使用率超过 50%,超过后,需要增加新的节点,以获得更多计算资源。
## 存储需求

View File

@ -27,4 +27,4 @@ TDengine 集群的节点数必须大于等于副本数,否则创建表时将
当 TDengine 集群中的节点部署在不同的物理机上并设置多个副本数时就实现了系统的高可靠性无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。
另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../reference/taosX)
另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosX)

View File

@ -102,11 +102,6 @@ extern int32_t tsQuerySmaOptimize;
// client
extern int32_t tsMinSlidingTime;
extern int32_t tsMinIntervalTime;
extern int32_t tsMaxStreamComputDelay;
extern int32_t tsStreamCompStartDelay;
extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int64_t tsMaxRetentWindow;
// build info
extern char version[];

View File

@ -77,11 +77,11 @@ typedef struct {
} SWalSyncInfo;
typedef struct {
int8_t protoVer;
int64_t version;
int16_t msgType;
int64_t ingestTs;
int32_t bodyLen;
int64_t ingestTs; // not implemented
int16_t msgType;
int8_t protoVer;
// sync meta
SWalSyncInfo syncMeta;

View File

@ -118,20 +118,6 @@ int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
// 1 database precision unit for interval time range, changed accordingly
int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly
int32_t tsMaxStreamComputDelay = 20000;
// 10sec, the first stream computing delay time after system launched successfully, changed accordingly
int32_t tsStreamCompStartDelay = 10000;
// the stream computing delay time after executing failed, change accordingly
int32_t tsRetryStreamCompDelay = 10 * 1000;
// The delayed computing ration. 10% of the whole computing time window by default.
float tsStreamComputDelayRatio = 0.1f;
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
// 0 no query allowed, queries are disabled
@ -330,7 +316,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "fqdn", defaultFqdn, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1;
if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalTempDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalTmpDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
@ -383,10 +369,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxStreamCompDelay", tsMaxStreamComputDelay, 10, 1000000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxFirstStreamCompDelay", tsStreamCompStartDelay, 1000, 1000000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "retryStreamCompDelay", tsRetryStreamCompDelay, 10, 1000000000, 0) != 0) return -1;
if (cfgAddFloat(pCfg, "streamCompDelayRatio", tsStreamComputDelayRatio, 0.1, 0.9, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "retrieveBlockingModel", tsRetrieveBlockingModel, 0) != 0) return -1;
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
@ -532,7 +514,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval;
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
if (taosMulMkDir(tsTempDir) != 0) {
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
return -1;
@ -579,10 +561,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
tsStreamComputDelayRatio = cfgGetItem(pCfg, "streamCompDelayRatio")->fval;
tsQueryBufferSize = cfgGetItem(pCfg, "queryBufferSize")->i32;
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
@ -758,10 +736,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32;
} else if (strcasecmp("maxNumOfDistinctRes", name) == 0) {
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
} else if (strcasecmp("maxStreamCompDelay", name) == 0) {
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
} else if (strcasecmp("maxFirstStreamCompDelay", name) == 0) {
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
}
break;
}
@ -772,8 +746,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break;
}
case 'i': {
if (strcasecmp("minimalTempDirGB", name) == 0) {
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval;
if (strcasecmp("minimalTmpDirGB", name) == 0) {
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
} else if (strcasecmp("minimalDataDirGB", name) == 0) {
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
} else if (strcasecmp("minSlidingTime", name) == 0) {
@ -883,9 +857,7 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break;
}
case 'r': {
if (strcasecmp("retryStreamCompDelay", name) == 0) {
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
} else if (strcasecmp("retrieveBlockingModel", name) == 0) {
if (strcasecmp("retrieveBlockingModel", name) == 0) {
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
} else if (strcasecmp("rpcQueueMemoryAllowed", name) == 0) {
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
@ -913,8 +885,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32;
} else if (strcasecmp("statusInterval", name) == 0) {
tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32;
} else if (strcasecmp("streamCompDelayRatio", name) == 0) {
tsStreamComputDelayRatio = cfgGetItem(pCfg, "streamCompDelayRatio")->fval;
} else if (strcasecmp("slaveQuery", name) == 0) {
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
} else if (strcasecmp("snodeShmSize", name) == 0) {

View File

@ -46,10 +46,10 @@ void tsdbCloseCache(SLRUCache *pCache) {
}
}
static void getTableCacheKeyS(tb_uid_t uid, const char *cacheType, char *key, int *len) {
snprintf(key, 30, "%" PRIi64 "%s", uid, cacheType);
*len = strlen(key);
}
/* static void getTableCacheKeyS(tb_uid_t uid, const char *cacheType, char *key, int *len) { */
/* snprintf(key, 30, "%" PRIi64 "%s", uid, cacheType); */
/* *len = strlen(key); */
/* } */
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
if (cacheType == 0) { // last_row
@ -245,8 +245,6 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb
char key[32] = {0};
int keyLen = 0;
// ((void)(row));
// getTableCacheKey(uid, "l", key, &keyLen);
getTableCacheKey(uid, 1, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
@ -323,26 +321,10 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
int32_t code = 0;
// SMapData delDataMap;
// SDelData delData;
if (pDelIdx) {
// tMapDataReset(&delDataMap);
// code = tsdbReadDelData(pDelReader, pDelIdx, &delDataMap, NULL);
code = tsdbReadDelData(pDelReader, pDelIdx, aDelData, NULL);
if (code) goto _err;
/*
for (int32_t iDelData = 0; iDelData < delDataMap.nItem; ++iDelData) {
code = tMapDataGetItemByIdx(&delDataMap, iDelData, &delData, tGetDelData);
if (code) goto _err;
taosArrayPush(aDelData, &delData);
}
*/
}
_err:
return code;
}
@ -444,18 +426,16 @@ typedef struct SFSNextRowIter {
SArray *aDFileSet;
SDataFReader *pDataFReader;
SArray *aBlockIdx;
// SMapData blockIdxMap;
// SBlockIdx blockIdx;
SBlockIdx *pBlockIdx;
SMapData blockMap;
int32_t nBlock;
int32_t iBlock;
SBlock block;
SBlockData blockData;
SBlockData *pBlockData;
int32_t nRow;
int32_t iRow;
TSDBROW row;
SBlockIdx *pBlockIdx;
SMapData blockMap;
int32_t nBlock;
int32_t iBlock;
SBlock block;
SBlockData blockData;
SBlockData *pBlockData;
int32_t nRow;
int32_t iRow;
TSDBROW row;
} SFSNextRowIter;
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
@ -629,41 +609,8 @@ typedef struct SMemNextRowIter {
} SMemNextRowIter;
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow) {
// static int32_t getNextRowFromMem(void *iter, SArray *pRowArray) {
SMemNextRowIter *state = (SMemNextRowIter *)iter;
int32_t code = 0;
/*
if (!state->iterOpened) {
if (state->pMem != NULL) {
tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
state->iterOpened = true;
TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
if (pMemRow) {
state->curRow = pMemRow;
} else {
return code;
}
} else {
return code;
}
}
taosArrayPush(pRowArray, state->curRow);
while (tsdbTbDataIterNext(&state->iter)) {
TSDBROW *row = tsdbTbDataIterGet(&state->iter);
if (TSDBROW_TS(row) < TSDBROW_TS(state->curRow)) {
state->curRow = row;
break;
} else {
taosArrayPush(pRowArray, row);
}
}
return code;
*/
switch (state->state) {
case SMEMNEXTROW_ENTER: {
if (state->pMem != NULL) {
@ -768,10 +715,8 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
}
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow);
// typedef int32_t (*_next_row_fn_t)(void *iter, SArray *pRowArray);
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
// typedef struct TsdbNextRowState {
typedef struct {
TSDBROW *pRow;
bool stop;
@ -782,7 +727,6 @@ typedef struct {
} TsdbNextRowState;
typedef struct {
// STsdb *pTsdb;
SArray *pSkyline;
int64_t iSkyline;
@ -793,10 +737,8 @@ typedef struct {
TSDBROW memRow, imemRow, fsRow;
TsdbNextRowState input[3];
// SMemTable *pMemTable;
// SMemTable *pIMemTable;
STsdbReadSnap *pReadSnap;
STsdb *pTsdb;
STsdbReadSnap *pReadSnap;
STsdb *pTsdb;
} CacheNextRowIter;
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb) {
@ -967,7 +909,7 @@ _err:
return code;
}
static int32_t mergeLastRow2(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
int32_t code = 0;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
@ -978,8 +920,6 @@ static int32_t mergeLastRow2(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppR
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
SColVal *pColVal = &(SColVal){0};
// tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
@ -1066,7 +1006,7 @@ _err:
return code;
}
static int32_t mergeLast2(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
int32_t code = 0;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
@ -1077,8 +1017,6 @@ static int32_t mergeLast2(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
SArray *pColArray = taosArrayInit(nCol, sizeof(SLastCol));
SColVal *pColVal = &(SColVal){0};
// tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
@ -1124,12 +1062,7 @@ static int32_t mergeLast2(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
continue;
}
}
/*
if ((TSDBROW_TS(pRow) < lastRowTs)) {
// goto build the result ts row
break;
}
*/
// merge into pColArray
setNoneCol = false;
for (iCol = noneCol; iCol < nCol; ++iCol) {
@ -1139,7 +1072,6 @@ static int32_t mergeLast2(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if ((tColVal->isNone || tColVal->isNull) && (!pColVal->isNone && !pColVal->isNull)) {
taosArraySet(pColArray, iCol, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
//} else if (tColVal->isNone && pColVal->isNone && !setNoneCol) {
} else if ((tColVal->isNone || tColVal->isNull) && (pColVal->isNone || pColVal->isNull) && !setNoneCol) {
noneCol = iCol;
setNoneCol = true;
@ -1148,521 +1080,36 @@ static int32_t mergeLast2(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
} while (setNoneCol);
// build the result ts row here
//*dup = false;
if (taosArrayGetSize(pColArray) <= 0) {
*ppLastArray = NULL;
taosArrayDestroy(pColArray);
} else {
*ppLastArray = pColArray;
}
/* if (taosArrayGetSize(pColArray) == nCol) {
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
if (code) goto _err;
} else {
*ppRow = NULL;
}*/
nextRowIterClose(&iter);
// taosArrayDestroy(pColArray);
taosMemoryFreeClear(pTSchema);
return code;
_err:
nextRowIterClose(&iter);
// taosArrayDestroy(pColArray);
taosMemoryFreeClear(pTSchema);
return code;
}
// static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
// int32_t code = 0;
// SArray *pSkyline = NULL;
// STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
// int16_t nCol = pTSchema->numOfCols;
// SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
// tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
// STbData *pMem = NULL;
// if (pTsdb->mem) {
// tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
// }
// STbData *pIMem = NULL;
// if (pTsdb->imem) {
// tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
// }
// *ppRow = NULL;
// pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
// SDelIdx delIdx;
// SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState);
// if (pDelFile) {
// SDelFReader *pDelFReader;
// code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
// if (code) goto _err;
// code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
// if (code) goto _err;
// code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline);
// if (code) goto _err;
// tsdbDelFReaderClose(&pDelFReader);
// } else {
// code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline);
// if (code) goto _err;
// }
// int64_t iSkyline = taosArrayGetSize(pSkyline) - 1;
// SBlockIdx idx = {.suid = suid, .uid = uid};
// SFSNextRowIter fsState = {0};
// fsState.state = SFSNEXTROW_FS;
// fsState.pTsdb = pTsdb;
// fsState.pBlockIdxExp = &idx;
// SMemNextRowIter memState = {0};
// SMemNextRowIter imemState = {0};
// TSDBROW memRow, imemRow, fsRow;
// TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem, NULL},
// {&imemRow, true, false, &imemState, getNextRowFromMem, NULL},
// {&fsRow, false, true, &fsState, getNextRowFromFS, clearNextRowFromFS}};
// if (pMem) {
// memState.pMem = pMem;
// memState.state = SMEMNEXTROW_ENTER;
// input[0].stop = false;
// input[0].next = true;
// }
// if (pIMem) {
// imemState.pMem = pIMem;
// imemState.state = SMEMNEXTROW_ENTER;
// input[1].stop = false;
// input[1].next = true;
// }
// int16_t nilColCount = nCol - 1; // count of null & none cols
// int iCol = 0; // index of first nil col index from left to right
// bool setICol = false;
// do {
// for (int i = 0; i < 3; ++i) {
// if (input[i].next && !input[i].stop) {
// if (input[i].pRow == NULL) {
// code = input[i].nextRowFn(input[i].iter, &input[i].pRow);
// if (code) goto _err;
// if (input[i].pRow == NULL) {
// input[i].stop = true;
// input[i].next = false;
// }
// }
// }
// }
// if (input[0].stop && input[1].stop && input[2].stop) {
// break;
// }
// // select maxpoint(s) from mem, imem, fs
// TSDBROW *max[3] = {0};
// int iMax[3] = {-1, -1, -1};
// int nMax = 0;
// TSKEY maxKey = TSKEY_MIN;
// for (int i = 0; i < 3; ++i) {
// if (!input[i].stop && input[i].pRow != NULL) {
// TSDBKEY key = TSDBROW_KEY(input[i].pRow);
// // merging & deduplicating on client side
// if (maxKey <= key.ts) {
// if (maxKey < key.ts) {
// nMax = 0;
// maxKey = key.ts;
// }
// iMax[nMax] = i;
// max[nMax++] = input[i].pRow;
// }
// }
// }
// // delete detection
// TSDBROW *merge[3] = {0};
// int iMerge[3] = {-1, -1, -1};
// int nMerge = 0;
// for (int i = 0; i < nMax; ++i) {
// TSDBKEY maxKey = TSDBROW_KEY(max[i]);
// bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
// if (!deleted) {
// iMerge[nMerge] = i;
// merge[nMerge++] = max[i];
// }
// input[iMax[i]].next = deleted;
// }
// // merge if nMerge > 1
// if (nMerge > 0) {
// *dup = false;
// if (nMerge == 1) {
// code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow);
// if (code) goto _err;
// } else {
// // merge 2 or 3 rows
// SRowMerger merger = {0};
// tRowMergerInit(&merger, merge[0], pTSchema);
// for (int i = 1; i < nMerge; ++i) {
// tRowMerge(&merger, merge[i]);
// }
// tRowMergerGetRow(&merger, ppRow);
// tRowMergerClear(&merger);
// }
// }
// } while (1);
// for (int i = 0; i < 3; ++i) {
// if (input[i].nextRowClearFn) {
// input[i].nextRowClearFn(input[i].iter);
// }
// }
// if (pSkyline) {
// taosArrayDestroy(pSkyline);
// }
// taosMemoryFreeClear(pTSchema);
// return code;
// _err:
// for (int i = 0; i < 3; ++i) {
// if (input[i].nextRowClearFn) {
// input[i].nextRowClearFn(input[i].iter);
// }
// }
// if (pSkyline) {
// taosArrayDestroy(pSkyline);
// }
// taosMemoryFreeClear(pTSchema);
// tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
// return code;
// }
// static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
// static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
// int32_t code = 0;
// SArray *pSkyline = NULL;
// STSRow *pRow = NULL;
// STSRow **ppRow = &pRow;
// STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
// int16_t nCol = pTSchema->numOfCols;
// // SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
// SArray *pColArray = taosArrayInit(nCol, sizeof(SLastCol));
// tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
// STbData *pMem = NULL;
// if (pTsdb->mem) {
// tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
// }
// STbData *pIMem = NULL;
// if (pTsdb->imem) {
// tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
// }
// *ppLastArray = NULL;
// pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
// SDelIdx delIdx;
// SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState);
// if (pDelFile) {
// SDelFReader *pDelFReader;
// code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
// if (code) goto _err;
// code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
// if (code) goto _err;
// code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline);
// if (code) goto _err;
// tsdbDelFReaderClose(&pDelFReader);
// } else {
// code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline);
// if (code) goto _err;
// }
// int64_t iSkyline = taosArrayGetSize(pSkyline) - 1;
// SBlockIdx idx = {.suid = suid, .uid = uid};
// SFSNextRowIter fsState = {0};
// fsState.state = SFSNEXTROW_FS;
// fsState.pTsdb = pTsdb;
// fsState.pBlockIdxExp = &idx;
// SMemNextRowIter memState = {0};
// SMemNextRowIter imemState = {0};
// TSDBROW memRow, imemRow, fsRow;
// TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem, NULL},
// {&imemRow, true, false, &imemState, getNextRowFromMem, NULL},
// {&fsRow, false, true, &fsState, getNextRowFromFS, clearNextRowFromFS}};
// if (pMem) {
// memState.pMem = pMem;
// memState.state = SMEMNEXTROW_ENTER;
// input[0].stop = false;
// input[0].next = true;
// }
// if (pIMem) {
// imemState.pMem = pIMem;
// imemState.state = SMEMNEXTROW_ENTER;
// input[1].stop = false;
// input[1].next = true;
// }
// int16_t nilColCount = nCol - 1; // count of null & none cols
// int iCol = 0; // index of first nil col index from left to right
// bool setICol = false;
// do {
// for (int i = 0; i < 3; ++i) {
// if (input[i].next && !input[i].stop) {
// code = input[i].nextRowFn(input[i].iter, &input[i].pRow);
// if (code) goto _err;
// if (input[i].pRow == NULL) {
// input[i].stop = true;
// input[i].next = false;
// }
// }
// }
// if (input[0].stop && input[1].stop && input[2].stop) {
// break;
// }
// // select maxpoint(s) from mem, imem, fs
// TSDBROW *max[3] = {0};
// int iMax[3] = {-1, -1, -1};
// int nMax = 0;
// TSKEY maxKey = TSKEY_MIN;
// for (int i = 0; i < 3; ++i) {
// if (!input[i].stop && input[i].pRow != NULL) {
// TSDBKEY key = TSDBROW_KEY(input[i].pRow);
// // merging & deduplicating on client side
// if (maxKey <= key.ts) {
// if (maxKey < key.ts) {
// nMax = 0;
// maxKey = key.ts;
// }
// iMax[nMax] = i;
// max[nMax++] = input[i].pRow;
// }
// }
// }
// // delete detection
// TSDBROW *merge[3] = {0};
// int iMerge[3] = {-1, -1, -1};
// int nMerge = 0;
// for (int i = 0; i < nMax; ++i) {
// TSDBKEY maxKey = TSDBROW_KEY(max[i]);
// bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
// if (!deleted) {
// iMerge[nMerge] = iMax[i];
// merge[nMerge++] = max[i];
// }
// input[iMax[i]].next = deleted;
// }
// // merge if nMerge > 1
// if (nMerge > 0) {
// if (nMerge == 1) {
// code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow);
// if (code) goto _err;
// } else {
// // merge 2 or 3 rows
// SRowMerger merger = {0};
// tRowMergerInit(&merger, merge[0], pTSchema);
// for (int i = 1; i < nMerge; ++i) {
// tRowMerge(&merger, merge[i]);
// }
// tRowMergerGetRow(&merger, ppRow);
// tRowMergerClear(&merger);
// }
// } else {
// /* *ppRow = NULL; */
// /* return code; */
// continue;
// }
// if (iCol == 0) {
// STColumn *pTColumn = &pTSchema->columns[0];
// SColVal *pColVal = &(SColVal){0};
// *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = maxKey});
// // if (taosArrayPush(pColArray, pColVal) == NULL) {
// if (taosArrayPush(pColArray, &(SLastCol){.ts = maxKey, .colVal = *pColVal}) == NULL) {
// code = TSDB_CODE_OUT_OF_MEMORY;
// goto _err;
// }
// ++iCol;
// setICol = false;
// for (int16_t i = iCol; i < nCol; ++i) {
// // tsdbRowGetColVal(*ppRow, pTSchema, i, pColVal);
// tTSRowGetVal(*ppRow, pTSchema, i, pColVal);
// // if (taosArrayPush(pColArray, pColVal) == NULL) {
// if (taosArrayPush(pColArray, &(SLastCol){.ts = maxKey, .colVal = *pColVal}) == NULL) {
// code = TSDB_CODE_OUT_OF_MEMORY;
// goto _err;
// }
// if (pColVal->isNull || pColVal->isNone) {
// for (int j = 0; j < nMerge; ++j) {
// SColVal jColVal = {0};
// tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal);
// if (jColVal.isNull || jColVal.isNone) {
// input[iMerge[j]].next = true;
// }
// }
// if (!setICol) {
// iCol = i;
// setICol = true;
// }
// } else {
// --nilColCount;
// }
// }
// if (*ppRow) {
// taosMemoryFreeClear(*ppRow);
// }
// continue;
// }
// setICol = false;
// for (int16_t i = iCol; i < nCol; ++i) {
// SColVal colVal = {0};
// tTSRowGetVal(*ppRow, pTSchema, i, &colVal);
// TSKEY rowTs = (*ppRow)->ts;
// // SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, i);
// SLastCol *tTsVal = (SLastCol *)taosArrayGet(pColArray, i);
// SColVal *tColVal = &tTsVal->colVal;
// if (!colVal.isNone && !colVal.isNull) {
// if (tColVal->isNull || tColVal->isNone) {
// // taosArraySet(pColArray, i, &colVal);
// taosArraySet(pColArray, i, &(SLastCol){.ts = rowTs, .colVal = colVal});
// --nilColCount;
// }
// } else {
// if ((tColVal->isNull || tColVal->isNone) && !setICol) {
// iCol = i;
// setICol = true;
// for (int j = 0; j < nMerge; ++j) {
// SColVal jColVal = {0};
// tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal);
// if (jColVal.isNull || jColVal.isNone) {
// input[iMerge[j]].next = true;
// }
// }
// }
// }
// }
// if (*ppRow) {
// taosMemoryFreeClear(*ppRow);
// }
// } while (nilColCount > 0);
// // if () new ts row from pColArray if non empty
// /* if (taosArrayGetSize(pColArray) == nCol) { */
// /* code = tdSTSRowNew(pColArray, pTSchema, ppRow); */
// /* if (code) goto _err; */
// /* } */
// /* taosArrayDestroy(pColArray); */
// if (taosArrayGetSize(pColArray) <= 0) {
// *ppLastArray = NULL;
// taosArrayDestroy(pColArray);
// } else {
// *ppLastArray = pColArray;
// }
// if (*ppRow) {
// taosMemoryFreeClear(*ppRow);
// }
// for (int i = 0; i < 3; ++i) {
// if (input[i].nextRowClearFn) {
// input[i].nextRowClearFn(input[i].iter);
// }
// }
// if (pSkyline) {
// taosArrayDestroy(pSkyline);
// }
// taosMemoryFreeClear(pTSchema);
// return code;
// _err:
// taosArrayDestroy(pColArray);
// if (*ppRow) {
// taosMemoryFreeClear(*ppRow);
// }
// for (int i = 0; i < 3; ++i) {
// if (input[i].nextRowClearFn) {
// input[i].nextRowClearFn(input[i].iter);
// }
// }
// if (pSkyline) {
// taosArrayDestroy(pSkyline);
// }
// taosMemoryFreeClear(pTSchema);
// tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
// return code;
// }
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
// getTableCacheKey(uid, "lr", key, &keyLen);
// getTableCacheKeyS(uid, "lr", key, &keyLen);
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
} else {
STSRow *pRow = NULL;
bool dup = false; // which is always false for now
code = mergeLastRow2(uid, pTsdb, &dup, &pRow);
code = mergeLastRow(uid, pTsdb, &dup, &pRow);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
if (!dup && pRow) {
@ -1680,9 +1127,7 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
code = -1;
}
// tsdbCacheInsertLastrow(pCache, pTsdb, uid, pRow, dup);
h = taosLRUCacheLookup(pCache, key, keyLen);
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
}
*handle = h;
@ -1719,18 +1164,13 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
char key[32] = {0};
int keyLen = 0;
// getTableCacheKey(uid, "l", key, &keyLen);
// getTableCacheKeyS(uid, "l", key, &keyLen);
getTableCacheKey(uid, 1, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
} else {
// STSRow *pRow = NULL;
// code = mergeLast(uid, pTsdb, &pRow);
SArray *pLastArray = NULL;
// code = mergeLast(uid, pTsdb, &pLastArray);
code = mergeLast2(uid, pTsdb, &pLastArray);
code = mergeLast(uid, pTsdb, &pLastArray);
// if table's empty or error, return code of -1
// if (code < 0 || pRow == NULL) {
if (code < 0 || pLastArray == NULL) {
@ -1746,7 +1186,6 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
}
h = taosLRUCacheLookup(pCache, key, keyLen);
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
}
*handle = h;

View File

@ -579,11 +579,13 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
if (ctx->noExec == false) {
for (int32_t m = 0; m < node->pParameterList->length; m++) {
// add impl later
if (node->condType == LOGIC_COND_TYPE_AND) {
taosArrayAddAll(output->result, params[m].result);
// taosArrayDestroy(params[m].result);
// params[m].result = NULL;
} else if (node->condType == LOGIC_COND_TYPE_OR) {
taosArrayAddAll(output->result, params[m].result);
// params[m].result = NULL;
} else if (node->condType == LOGIC_COND_TYPE_NOT) {
// taosArrayAddAll(output->result, params[m].result);
}
@ -593,6 +595,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
} else {
for (int32_t m = 0; m < node->pParameterList->length; m++) {
output->status = sifMergeCond(node->condType, output->status, params[m].status);
taosArrayDestroy(params[m].result);
params[m].result = NULL;
}
}
_return:
@ -607,6 +611,7 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) {
SIFCtx *ctx = context;
ctx->code = sifExecFunction(node, ctx, &output);
if (ctx->code != TSDB_CODE_SUCCESS) {
sifFreeParam(&output);
return DEAL_RES_ERROR;
}
@ -624,6 +629,7 @@ static EDealRes sifWalkLogic(SNode *pNode, void *context) {
SIFCtx *ctx = context;
ctx->code = sifExecLogic(node, ctx, &output);
if (ctx->code) {
sifFreeParam(&output);
return DEAL_RES_ERROR;
}
@ -640,6 +646,7 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) {
SIFCtx *ctx = context;
ctx->code = sifExecOper(node, ctx, &output);
if (ctx->code) {
sifFreeParam(&output);
return DEAL_RES_ERROR;
}
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
@ -698,7 +705,11 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
}
nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx);
SIF_ERR_RET(ctx.code);
if (ctx.code != 0) {
sifFreeRes(ctx.pRes);
return ctx.code;
}
if (pDst) {
SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
@ -714,8 +725,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
}
sifFreeRes(ctx.pRes);
SIF_RET(code);
return code;
}
static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
@ -732,8 +742,10 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
}
nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx);
SIF_ERR_RET(ctx.code);
if (ctx.code != 0) {
sifFreeRes(ctx.pRes);
return ctx.code;
}
SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
if (res == NULL) {
@ -745,8 +757,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
sifFreeParam(res);
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
taosHashCleanup(ctx.pRes);
SIF_RET(code);
return code;
}
int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status) {
@ -760,7 +771,11 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result,
SArray *output = taosArrayInit(8, sizeof(uint64_t));
SIFParam param = {.arg = *metaArg, .result = output};
SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, &param));
int32_t code = sifCalculate((SNode *)pFilterNode, &param);
if (code != 0) {
sifFreeParam(&param);
return code;
}
taosArrayAddAll(result, param.result);
sifFreeParam(&param);

View File

@ -1,5 +1,4 @@
/** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
@ -16,6 +15,10 @@
#ifdef USE_UV
#include "transComm.h"
typedef struct SConnList {
queue conn;
} SConnList;
typedef struct SCliConn {
T_REF_DECLARE()
uv_connect_t connReq;
@ -26,7 +29,9 @@ typedef struct SCliConn {
SConnBuffer readBuf;
STransQueue cliMsgs;
queue q;
queue q;
SConnList* list;
STransCtx ctx;
bool broken; // link broken or not
@ -56,13 +61,14 @@ typedef struct SCliMsg {
} SCliMsg;
typedef struct SCliThrd {
TdThread thread; // tid
int64_t pid; // pid
uv_loop_t* loop;
SAsyncPool* asyncPool;
uv_idle_t* idle;
uv_timer_t timer;
void* pool; // conn pool
TdThread thread; // tid
int64_t pid; // pid
uv_loop_t* loop;
SAsyncPool* asyncPool;
uv_idle_t* idle;
uv_prepare_t* prepare;
uv_timer_t timer;
void* pool; // conn pool
// msg queue
queue msg;
@ -86,10 +92,6 @@ typedef struct SCliObj {
SCliThrd** pThreadObj;
} SCliObj;
typedef struct SConnList {
queue conn;
} SConnList;
// conn pool
// add expire timeout and capacity limit
static void* createConnPool(int size);
@ -101,7 +103,7 @@ static void doCloseIdleConn(void* param);
static int sockDebugInfo(struct sockaddr* sockname, char* dst) {
struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
char buf[20] = {0};
char buf[16] = {0};
int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
return r;
@ -118,6 +120,9 @@ static void cliSendCb(uv_write_t* req, int status);
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
static void cliIdleCb(uv_idle_t* handle);
static void cliPrepareCb(uv_prepare_t* handle);
static int32_t allocConnRef(SCliConn* conn, bool update);
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
@ -198,7 +203,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
pThrd = (SCliThrd*)(exh)->pThrd; \
} \
} while (0)
#define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : (para))
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \
@ -499,9 +504,8 @@ void* destroyConnPool(void* pool) {
}
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
char key[128] = {0};
char key[32] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
SHashObj* pPool = pool;
SConnList* plist = taosHashGet(pPool, key, strlen(key));
if (plist == NULL) {
@ -519,13 +523,44 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
conn->status = ConnNormal;
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
assert(h == &conn->q);
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL;
return conn;
}
static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) {
return;
}
SCliThrd* thrd = conn->hostThrd;
CONN_HANDLE_THREAD_QUIT(thrd);
allocConnRef(conn, true);
STrans* pTransInst = thrd->pTransInst;
cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs);
transCtxCleanup(&conn->ctx);
conn->status = ConnInPool;
if (conn->list == NULL) {
char key[32] = {0};
CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
conn->list = taosHashGet((SHashObj*)pool, key, strlen(key));
}
assert(conn->list != NULL);
QUEUE_INIT(&conn->q);
QUEUE_PUSH(&conn->list->conn, &conn->q);
assert(!QUEUE_IS_EMPTY(&conn->list->conn));
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = thrd;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
}
static int32_t allocConnRef(SCliConn* conn, bool update) {
if (update) {
transRemoveExHandle(transGetRefMgt(), conn->refId);
@ -556,38 +591,6 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
return 0;
}
static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) {
return;
}
SCliThrd* thrd = conn->hostThrd;
CONN_HANDLE_THREAD_QUIT(thrd);
allocConnRef(conn, true);
STrans* pTransInst = thrd->pTransInst;
cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs);
transCtxCleanup(&conn->ctx);
conn->status = ConnInPool;
char key[128] = {0};
CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
// list already create before
assert(plist != NULL);
QUEUE_INIT(&conn->q);
QUEUE_PUSH(&plist->conn, &conn->q);
assert(!QUEUE_IS_EMPTY(&plist->conn));
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = thrd;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
}
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
@ -602,11 +605,9 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) {
pBuf->len += nread;
if (transReadComplete(pBuf)) {
while (transReadComplete(pBuf)) {
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
cliHandleResp(conn);
} else {
tTrace("%s conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn);
}
return;
}
@ -967,6 +968,62 @@ static void cliAsyncCb(uv_async_t* handle) {
static void cliIdleCb(uv_idle_t* handle) {
SCliThrd* thrd = handle->data;
tTrace("do idle work");
SAsyncPool* pool = thrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg == NULL) {
continue;
}
(*cliAsyncHandle[pMsg->type])(pMsg, thrd);
count++;
}
}
tTrace("prepare work end");
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
}
static void cliPrepareCb(uv_prepare_t* handle) {
SCliThrd* thrd = handle->data;
tTrace("prepare work start");
SAsyncPool* pool = thrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg == NULL) {
continue;
}
(*cliAsyncHandle[pMsg->type])(pMsg, thrd);
count++;
}
}
tTrace("prepare work end");
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
}
static void* cliWorkThread(void* arg) {
@ -1033,7 +1090,12 @@ static SCliThrd* createThrdObj() {
// pThrd->idle = taosMemoryCalloc(1, sizeof(uv_idle_t));
// uv_idle_init(pThrd->loop, pThrd->idle);
// pThrd->idle->data = pThrd;
// uv_idle_start(pThrd->idle, cliIdleCb);
// uv_idle_start(pThrd->idle, cliIdleCb);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);
pThrd->prepare->data = pThrd;
uv_prepare_start(pThrd->prepare, cliPrepareCb);
pThrd->pool = createConnPool(4);
transDQCreate(pThrd->loop, &pThrd->delayQueue);
@ -1058,6 +1120,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transDQDestroy(pThrd->timeoutQueue, NULL);
taosMemoryFree(pThrd->idle);
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
}

View File

@ -120,8 +120,9 @@ int transInitBuffer(SConnBuffer* buf) {
buf->total = 0;
return 0;
}
int transDestroyBuffer(SConnBuffer* buf) {
taosMemoryFree(buf->buf);
int transDestroyBuffer(SConnBuffer* p) {
taosMemoryFree(p->buf);
p->buf = NULL;
return 0;
}

View File

@ -73,6 +73,7 @@ typedef struct SWorkThrd {
uv_os_fd_t fd;
uv_loop_t* loop;
SAsyncPool* asyncPool;
uv_prepare_t* prepare;
queue msg;
TdThreadMutex msgMtx;
@ -112,6 +113,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf)
static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvShutDownCb(uv_shutdown_t* req, int status);
static void uvPrepareCb(uv_prepare_t* handle);
/*
* time-consuming task throwed into BG work thread
@ -238,8 +240,6 @@ static void uvHandleReq(SSvrConn* pConn) {
transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code;
// transClearBuffer(&pConn->readBuf);
pConn->inType = pHead->msgType;
if (pConn->status == ConnNormal) {
if (pHead->persist == 1) {
@ -546,6 +546,52 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
uv_close((uv_handle_t*)req->handle, uvDestroyConn);
taosMemoryFree(req);
}
static void uvPrepareCb(uv_prepare_t* handle) {
// prepare callback
SWorkThrd* pThrd = handle->data;
SAsyncPool* pool = pThrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
while (!QUEUE_IS_EMPTY(&wq)) {
queue* head = QUEUE_HEAD(&wq);
QUEUE_REMOVE(head);
SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q);
if (msg == NULL) {
tError("unexcept occurred, continue");
continue;
}
// release handle to rpc init
if (msg->type == Quit) {
(*transAsyncHandle[msg->type])(msg, pThrd);
continue;
} else {
STransMsg transMsg = msg->msg;
SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId;
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId);
if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1);
transReleaseExHandle(transGetRefMgt(), refId);
destroySmsg(msg);
continue;
}
msg->pConn = exh1->handle;
transReleaseExHandle(transGetRefMgt(), refId);
(*transAsyncHandle[msg->type])(msg, pThrd);
}
}
}
}
static void uvWorkDoTask(uv_work_t* req) {
// doing time-consumeing task
@ -695,13 +741,17 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
}
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
// int r = uv_pipe_open(pThrd->pipe, pThrd->fd);
pThrd->pipe->data = pThrd;
QUEUE_INIT(&pThrd->msg);
taosThreadMutexInit(&pThrd->msgMtx, NULL);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);
uv_prepare_start(pThrd->prepare, uvPrepareCb);
pThrd->prepare->data = pThrd;
// conn set
QUEUE_INIT(&pThrd->conn);
@ -986,6 +1036,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
transAsyncPoolDestroy(pThrd->asyncPool);
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
}

View File

@ -139,7 +139,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
const char* idxPattern = "^[0-9]+.idx$";
regex_t logRegPattern;
regex_t idxRegPattern;
SArray* pLogInfoArray = taosArrayInit(8, sizeof(SWalFileInfo));
SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
@ -159,7 +159,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
SWalFileInfo fileInfo;
memset(&fileInfo, -1, sizeof(SWalFileInfo));
sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
taosArrayPush(pLogInfoArray, &fileInfo);
taosArrayPush(actualLog, &fileInfo);
}
}
@ -167,10 +167,10 @@ int walCheckAndRepairMeta(SWal* pWal) {
regfree(&logRegPattern);
regfree(&idxRegPattern);
taosArraySort(pLogInfoArray, compareWalFileInfo);
taosArraySort(actualLog, compareWalFileInfo);
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
int actualFileNum = taosArrayGetSize(pLogInfoArray);
int actualFileNum = taosArrayGetSize(actualLog);
#if 0
for (int32_t fileNo = actualFileNum - 1; fileNo >= 0; fileNo--) {
@ -196,11 +196,11 @@ int walCheckAndRepairMeta(SWal* pWal) {
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
} else if (metaFileNum < actualFileNum) {
for (int i = metaFileNum; i < actualFileNum; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
SWalFileInfo* pFileInfo = taosArrayGet(actualLog, i);
taosArrayPush(pWal->fileInfoSet, pFileInfo);
}
}
taosArrayDestroy(pLogInfoArray);
taosArrayDestroy(actualLog);
pWal->writeCur = actualFileNum - 1;
if (actualFileNum > 0) {
@ -221,7 +221,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
int code = walSaveMeta(pWal);
if (code < 0) {
taosArrayDestroy(pLogInfoArray);
taosArrayDestroy(actualLog);
return -1;
}
}

View File

@ -423,37 +423,38 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
return 0;
}
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
wDebug("vgId:%d wal start to read ver %ld", pRead->pWal->cfg.vgId, ver);
int32_t walReadVer(SWalReader *pReader, int64_t ver) {
wDebug("vgId:%d wal start to read ver %ld", pReader->pWal->cfg.vgId, ver);
int64_t contLen;
int32_t code;
bool seeked = false;
if (pRead->pWal->vers.firstVer == -1) {
if (pReader->pWal->vers.firstVer == -1) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer);
if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
if (pRead->curInvalid || pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
if (pReader->curInvalid || pReader->curVersion != ver) {
if (walReadSeekVer(pReader, ver) < 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
return -1;
}
seeked = true;
}
while (1) {
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
contLen = taosReadFile(pReader->pLogFile, pReader->pHead, sizeof(SWalCkHead));
if (contLen == sizeof(SWalCkHead)) {
break;
} else if (contLen == 0 && !seeked) {
walReadSeekVerImpl(pRead, ver);
walReadSeekVerImpl(pReader, ver);
seeked = true;
continue;
} else {
@ -467,26 +468,26 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
}
}
contLen = walValidHeadCksum(pRead->pHead);
if (contLen != 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId,
code = walValidHeadCksum(pReader->pHead);
if (code != 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
if (pRead->capacity < pRead->pHead->head.bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pRead->pHead->head.bodyLen);
if (pReader->capacity < pReader->pHead->head.bodyLen) {
void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
pRead->pHead = ptr;
pRead->capacity = pRead->pHead->head.bodyLen;
pReader->pHead = ptr;
pReader->capacity = pReader->pHead->head.bodyLen;
}
if ((contLen = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
pRead->pHead->head.bodyLen) {
if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, pReader->pHead->head.bodyLen)) !=
pReader->pHead->head.bodyLen) {
if (contLen < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
else {
@ -496,25 +497,28 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
return -1;
}
if (pRead->pHead->head.version != ver) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pRead->pHead->head.version, ver);
pRead->curInvalid = 1;
if (pReader->pHead->head.version != ver) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
pReader->pHead->head.version, ver);
pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
contLen = walValidBodyCksum(pRead->pHead);
if (contLen != 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
code = walValidBodyCksum(pReader->pHead);
if (code != 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId,
ver);
pRead->curInvalid = 1;
uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, pReader->pHead->head.bodyLen);
uint32_t logCkSum = pReader->pHead->cksumBody;
wError("checksum written into log: %u, checksum calculated: %u", logCkSum, readCkSum);
pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
pRead->curVersion++;
pReader->curVersion++;
return 0;
}

View File

@ -289,18 +289,25 @@ int32_t walEndSnapshot(SWal *pWal) {
newTotSize -= iter->fileSize;
}
}
char fnameStr[WAL_FILE_LEN];
int32_t actualDelete = 0;
char fnameStr[WAL_FILE_LEN];
// remove file
for (int i = 0; i < deleteCnt; i++) {
pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
goto UPDATE_META;
}
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
ASSERT(0);
}
actualDelete++;
}
UPDATE_META:
// make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
taosArrayPopFrontBatch(pWal->fileInfoSet, actualDelete);
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->writeCur = -1;
pWal->vers.firstVer = -1;

View File

@ -166,7 +166,7 @@
# ---- query ----
./test.sh -f tsim/query/charScalarFunction.sim
# ./test.sh -f tsim/query/explain.sim
./test.sh -f tsim/query/explain.sim
./test.sh -f tsim/query/interval-offset.sim
./test.sh -f tsim/query/interval.sim
./test.sh -f tsim/query/scalarFunction.sim