Merge pull request #22592 from taosdata/fix/m23.0

Fix/m23.0
This commit is contained in:
dapan1121 2023-08-28 17:35:35 +08:00 committed by GitHub
commit 778e50da79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
68 changed files with 699 additions and 1544 deletions

View File

@ -7,7 +7,7 @@ description: This document describes how to query data in TDengine.
## Syntax
```sql
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE()}
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE() | CURRENT_USER() | USER() }
SELECT [hints] [DISTINCT] [TAGS] select_list
from_clause
@ -195,7 +195,7 @@ The following SQL statement returns the number of subtables within the meters su
SELECT COUNT(*) FROM (SELECT DISTINCT TBNAME FROM meters);
```
In the preceding two statements, only tags can be used as filtering conditions in the WHERE clause. For example:
In the preceding two statements, only tags can be used as filtering conditions in the WHERE clause.
**\_QSTART and \_QEND**
@ -245,8 +245,7 @@ You can perform INNER JOIN statements based on the primary key. The following co
3. For supertables, the ON condition must be equivalent to the primary key. In addition, the tag columns of the tables on which the INNER JOIN is performed must have a one-to-one relationship. You cannot specify an OR condition.
4. The tables that are included in a JOIN clause must be of the same type (supertable, standard table, or subtable).
5. You can include subqueries before and after the JOIN keyword.
6. You cannot include more than ten tables in a JOIN clause.
7. You cannot include a FILL clause and a JOIN clause in the same statement.
6. You cannot include a FILL clause and a JOIN clause in the same statement.
## GROUP BY
@ -337,6 +336,12 @@ SELECT TODAY();
SELECT TIMEZONE();
```
### Obtain Current User
```sql
SELECT CURRENT_USER();
```
## Regular Expression
### Syntax
@ -391,7 +396,7 @@ SELECT AVG(CASE WHEN voltage < 200 or voltage > 250 THEN 220 ELSE voltage END) F
## JOIN
TDengine supports `INNER JOIN` based on the timestamp primary key, that is, the `JOIN` condition must contain the timestamp primary key. As long as the requirement of a timestamp-based primary key is met, `INNER JOIN` can be performed freely between normal tables, sub-tables, super tables and sub-queries, and there is no limit on the number of tables.
TDengine supports `INNER JOIN` based on the timestamp primary key, that is, the `JOIN` condition must contain the timestamp primary key. As long as the requirement of a timestamp-based primary key is met, `INNER JOIN` can be performed freely between normal tables, sub-tables, super tables and sub-queries, and there is no limit on the number of tables; the primary key condition and any other conditions must be combined with the `AND` operator.
For standard tables:

View File

@ -1275,6 +1275,14 @@ SELECT SERVER_STATUS();
**Description**: The server status.
### CURRENT_USER
```sql
SELECT CURRENT_USER();
```
**Description**: The current user.
## Geometry Functions

View File

@ -178,7 +178,7 @@ The following list shows all reserved keywords:
- MATCH
- MAX_DELAY
- MAX_SPEED
- BWLIMIT
- MAXROWS
- MERGE
- META

View File

@ -22,6 +22,14 @@ SHOW CLUSTER;
Shows information about the current cluster.
## SHOW CLUSTER ALIVE
```sql
SHOW CLUSTER ALIVE;
```
It is used to check whether the cluster is available. Return values: 0: unavailable; 1: available; 2: partially available (some dnodes are offline, but the other dnodes are still available).
## SHOW CONNECTIONS
```sql

View File

@ -17,7 +17,7 @@ You can use the SHOW CONNECTIONS statement to find the conn_id.
## Terminate a Query
```sql
KILL QUERY kill_id;
KILL QUERY 'kill_id';
```
You can use the SHOW QUERIES statement to find the kill_id.

View File

@ -168,6 +168,12 @@ The base API is used to do things like create database connections and provide a
:::
- `TAOS *taos_connect_auth(const char *host, const char *user, const char *auth, const char *db, uint16_t port)`
This function is the same as taos_connect, except that the pass parameter is replaced by auth; all other parameters are identical (a usage sketch follows below).
- auth: the 32-character lowercase hexadecimal MD5 of the raw password
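The following is a minimal, hedged sketch of connecting with a pre-computed password hash; the host, user, port, and the placeholder hash value are illustrative assumptions and must be replaced with real values:
```c
#include <stdio.h>
#include "taos.h"

int main(void) {
  // Placeholder only: replace with the real 32-character lowercase hex MD5
  // of the account's raw password (computed with any MD5 library).
  const char *auth = "00000000000000000000000000000000";

  // Assumed host/user/port for illustration.
  TAOS *conn = taos_connect_auth("localhost", "root", auth, NULL, 6030);
  if (conn == NULL) {
    fprintf(stderr, "failed to connect with taos_connect_auth\n");
    return 1;
  }
  printf("server version: %s\n", taos_get_server_info(conn));
  taos_close(conn);
  return 0;
}
```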
- `char *taos_get_server_info(TAOS *taos)`
Get server-side version information.
@ -184,6 +190,14 @@ The base API is used to do things like create database connections and provide a
- If len is less than the space required to store the db (including the last '\0'), an error is returned. The truncated data assigned in the database ends with '\0'.
- If len is greater than or equal to the space required to store the db (including the last '\0'), return normal 0, and assign the db name ending with '\0' in the database.
- `int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)`
Set the event callback function (a registration sketch follows the parameter list).
- fp: event callback function pointer. Declaration: `typedef void (*__taos_notify_fn_t)(void *param, void *ext, int type)`. Here `param` is the user-defined parameter, `ext` is an extended parameter (its content depends on the event type; for TAOS_NOTIFY_PASSVER it carries the user password version), and `type` is the event type
- param: user-defined parameter
- type: event type. Value range: 1) TAOS_NOTIFY_PASSVER: User password changed
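Below is a hedged sketch of registering this callback; it assumes `conn` is a live connection returned by `taos_connect()` and that, for TAOS_NOTIFY_PASSVER, `ext` points to a 32-bit integer holding the new password version:
```c
#include <stdint.h>
#include <stdio.h>
#include "taos.h"

static void on_notify(void *param, void *ext, int type) {
  if (type == TAOS_NOTIFY_PASSVER) {
    // Assumption: for TAOS_NOTIFY_PASSVER, ext carries the new password
    // version as a 32-bit integer.
    printf("[%s] password version changed to %d\n",
           (const char *)param, *(int32_t *)ext);
  }
}

void register_passver_notify(TAOS *conn) {
  static const char *tag = "demo";  // user-defined value passed back as `param`
  if (taos_set_notify_cb(conn, on_notify, (void *)tag, TAOS_NOTIFY_PASSVER) != 0) {
    fprintf(stderr, "taos_set_notify_cb failed\n");
  }
}
```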
- `void taos_close(TAOS *taos)`
Closes the connection, where `taos` is the handle returned by `taos_connect()`.
@ -307,21 +321,20 @@ The specific functions related to the interface are as follows (see also the [pr
Parse a SQL command, and bind the parsed result and parameter information to `stmt`. If the parameter length is greater than 0, use this parameter as the length of the SQL command. If it is equal to 0, the length of the SQL command will be determined automatically.
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind)`
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind)`
Not as efficient as `taos_stmt_bind_param_batch()`, but it can support non-INSERT SQL statements.
To bind parameters, `bind` points to an array (representing the row of data to be bound); make sure that the number and order of the elements in this array are exactly the same as the parameters in the SQL statement. TAOS_MULTI_BIND is used similarly to MYSQL_BIND in MySQL and is defined below (a bind sketch follows the definition).
```c
typedef struct TAOS_BIND {
typedef struct TAOS_MULTI_BIND {
int buffer_type;
void * buffer;
uintptr_t buffer_length; // not in use
uintptr_t * length;
int * is_null;
int is_unsigned; // not in use
int * error; // not in use
} TAOS_BIND;
void *buffer;
uintptr_t buffer_length;
uint32_t *length;
char *is_null;
int num;
} TAOS_MULTI_BIND;
```
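As a hedged illustration of this single-row bind path, the sketch below binds one INT parameter to a SELECT statement; the table and column names (`meters`, `voltage`) are assumptions:
```c
#include <stdint.h>
#include <stdio.h>
#include "taos.h"

// Assumes `taos` is a live connection and a table meters(ts, voltage, ...) exists.
int query_by_voltage(TAOS *taos, int32_t threshold) {
  TAOS_STMT *stmt    = taos_stmt_init(taos);
  int32_t    value   = threshold;
  uint32_t   length  = sizeof(value);
  char       is_null = 0;
  TAOS_MULTI_BIND p  = {0};

  if (taos_stmt_prepare(stmt, "SELECT ts, voltage FROM meters WHERE voltage > ?", 0) != 0) {
    taos_stmt_close(stmt);
    return -1;
  }

  p.buffer_type   = TSDB_DATA_TYPE_INT;
  p.buffer        = &value;
  p.buffer_length = sizeof(value);
  p.length        = &length;
  p.is_null       = &is_null;
  p.num           = 1;  // one row of parameters

  if (taos_stmt_bind_param(stmt, &p) != 0 || taos_stmt_execute(stmt) != 0) {
    taos_stmt_close(stmt);
    return -1;
  }

  TAOS_RES *res = taos_stmt_use_result(stmt);
  // ... fetch rows from res with taos_fetch_row(), then release resources:
  taos_free_result(res);
  taos_stmt_close(stmt);
  return 0;
}
```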
- `int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name)`
@ -329,7 +342,7 @@ The specific functions related to the interface are as follows (see also the [pr
(Available in 2.1.1.0 and later versions, only supported for replacing parameter values in INSERT statements)
When the table name in the SQL command uses the `?` placeholder, you can use this function to bind a specific table name.
- `int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags)`
- `int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_MULTI_BIND* tags)`
(Available in 2.1.2.0 and later versions, only supported for replacing parameter values in INSERT statements)
When the table name and TAGS in the SQL command both use the `?` placeholder, you can use this function to bind the specific table name and the specific TAGS values. The most typical usage scenario is an INSERT statement that uses the auto-create-table feature (the current version does not support specifying specific TAGS columns). The number of columns in the TAGS parameter must be exactly the same as the number of TAGS required by the SQL command (see the sketch below).
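For illustration, here is a minimal sketch of binding the subtable name and one INT tag; it assumes the statement was prepared as `INSERT INTO ? USING meters TAGS(?) VALUES(?, ?)` and that the supertable `meters` has a single INT tag:
```c
#include <stdint.h>
#include "taos.h"

// Assumes `stmt` was prepared with: INSERT INTO ? USING meters TAGS(?) VALUES(?, ?)
int bind_subtable_and_tag(TAOS_STMT *stmt) {
  int32_t  group_id = 2;
  uint32_t tag_len  = sizeof(group_id);
  char     tag_null = 0;

  TAOS_MULTI_BIND tag = {0};
  tag.buffer_type   = TSDB_DATA_TYPE_INT;
  tag.buffer        = &group_id;
  tag.buffer_length = sizeof(group_id);
  tag.length        = &tag_len;
  tag.is_null       = &tag_null;
  tag.num           = 1;

  // Bind the concrete subtable name "d1001" together with its tag value.
  return taos_stmt_set_tbname_tags(stmt, "d1001", &tag);
}
```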
@ -358,6 +371,14 @@ The specific functions related to the interface are as follows (see also the [pr
Execute the prepared statement. Currently, a statement can only be executed once.
- `int taos_stmt_affected_rows(TAOS_STMT *stmt)`
Gets the number of rows affected by executing bind statements multiple times.
- `int taos_stmt_affected_rows_once(TAOS_STMT *stmt)`
Gets the number of rows affected by executing a bind statement once.
- `TAOS_RES* taos_stmt_use_result(TAOS_STMT *stmt)`
Gets the result set of a statement. Use the result set in the same way as in the non-parametric call. When finished, `taos_free_result()` should be called on this result set to free resources.

View File

@ -30,6 +30,10 @@ The source code of `TDengine.Connector` is hosted on [GitHub](https://github.com
The supported platforms are the same as those supported by the TDengine client driver.
:::note
Please note that TDengine no longer supports 32-bit Windows.
:::
## Version support
Please refer to [version support list](/reference/connector#version-support)

View File

@ -102,6 +102,8 @@ Usage: taosdump [OPTION...] dbname [tbname ...]
-L, --loose-mode Use loose mode if the table name and column name
use letter and number only. Default is NOT.
-n, --no-escape No escape char '`'. Default is using it.
-Q, --dot-replace Replace dot character with underscore character in
the table name.
-T, --thread-num=THREAD_NUM Number of thread for dump in file. Default is
8.
-C, --cloud=CLOUD_DSN specify a DSN to access TDengine cloud service

View File

@ -74,7 +74,7 @@ grafana-cli plugins install tdengine-datasource
sudo -u grafana grafana-cli plugins install tdengine-datasource
```
You can also download zip files from [GitHub](https://github.com/taosdata/grafanaplugin/releases/tag/latest) or [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) and install manually. The commands are as follows:
You can also download zip files from [GitHub](https://github.com/taosdata/grafanaplugin/releases/latest) or [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) and install manually. The commands are as follows:
```bash
GF_VERSION=3.3.1

View File

@ -51,7 +51,7 @@ void insertData(TAOS *taos) {
int code = taos_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");
// bind table name and tags
TAOS_BIND tags[2];
TAOS_MULTI_BIND tags[2];
char *location = "California.SanFrancisco";
int groupId = 2;
tags[0].buffer_type = TSDB_DATA_TYPE_BINARY;
@ -144,4 +144,4 @@ int main() {
}
// output:
// successfully inserted 2 rows
// successfully inserted 2 rows

View File

@ -58,7 +58,7 @@ void insertData(TAOS *taos) {
int code = taos_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");
// bind table name and tags
TAOS_BIND tags[2];
TAOS_MULTI_BIND tags[2];
char* location = "California.SanFrancisco";
int groupId = 2;
tags[0].buffer_type = TSDB_DATA_TYPE_BINARY;
@ -82,7 +82,7 @@ void insertData(TAOS *taos) {
{1648432611749, 12.6, 218, 0.33},
};
TAOS_BIND values[4];
TAOS_MULTI_BIND values[4];
values[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
values[0].buffer_length = sizeof(int64_t);
values[0].length = &values[0].buffer_length;
@ -138,4 +138,4 @@ int main() {
// output:
// successfully inserted 2 rows
// successfully inserted 2 rows

View File

@ -256,6 +256,12 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
:::
- `TAOS *taos_connect_auth(const char *host, const char *user, const char *auth, const char *db, uint16_t port)`
Same functionality as taos_connect, except that the pass parameter is replaced by auth; all other parameters are the same as for taos_connect.
- auth: the 32-character lowercase MD5 of the raw password
- `char *taos_get_server_info(TAOS *taos)`
Gets the server-side version information.
@ -272,6 +278,14 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
- If len is less than the space required to store the db (including the trailing '\0'), an error is returned; the data assigned in database is truncated and ends with '\0'.
- If len is greater than or equal to the space required to store the db (including the trailing '\0'), 0 is returned and the db name ending with '\0' is assigned in database.
- `int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)`
Sets the event callback function.
- fp: event callback function pointer. Declaration: typedef void (*__taos_notify_fn_t)(void *param, void *ext, int type); where param is the user-defined parameter, ext is an extended parameter (depends on the event type; for TAOS_NOTIFY_PASSVER it returns the user password version), and type is the event type
- param: user-defined parameter
- type: event type. Value range: 1) TAOS_NOTIFY_PASSVER: the user password has changed
- `void taos_close(TAOS *taos)`
Closes the connection, where `taos` is the handle returned by `taos_connect()`.
@ -396,21 +410,20 @@ TDengine's asynchronous APIs all use a non-blocking calling mode. Applications can use multi
Parses an SQL statement and binds the parsing result and parameter information to stmt. If the length parameter is greater than 0, it is used as the length of the SQL statement; if it is 0, the length of the SQL statement is determined automatically.
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind)`
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind)`
Not as efficient as `taos_stmt_bind_param_batch()`, but it can support non-INSERT SQL statements.
To bind parameters, bind points to an array (representing the row of data to be bound); make sure that the number and order of the elements in this array are exactly the same as the parameters in the SQL statement. TAOS_BIND is used similarly to MYSQL_BIND in MySQL and is defined as follows:
To bind parameters, bind points to an array (representing the row of data to be bound); make sure that the number and order of the elements in this array are exactly the same as the parameters in the SQL statement. TAOS_MULTI_BIND is used similarly to MYSQL_BIND in MySQL and is defined as follows:
```c
typedef struct TAOS_BIND {
typedef struct TAOS_MULTI_BIND {
int buffer_type;
void * buffer;
uintptr_t buffer_length; // not in use
uintptr_t * length;
int * is_null;
int is_unsigned; // not in use
int * error; // not in use
} TAOS_BIND;
void *buffer;
uintptr_t buffer_length;
uint32_t *length;
char *is_null;
int num; // the number of columns
} TAOS_MULTI_BIND;
```
- `int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name)`
@ -418,7 +431,7 @@ TDengine's asynchronous APIs all use a non-blocking calling mode. Applications can use multi
(Added in version 2.1.1.0; only supported for replacing parameter values in INSERT statements)
When the table name in the SQL statement uses the `?` placeholder, this function can be used to bind a specific table name.
- `int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags)`
- `int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_MULTI_BIND* tags)`
(Added in version 2.1.2.0; only supported for replacing parameter values in INSERT statements)
When both the table name and the TAGS in the SQL statement use the `?` placeholder, this function can be used to bind the specific table name and the specific TAGS values. The most typical usage scenario is an INSERT statement that uses the auto-create-table feature (the current version does not support specifying specific TAGS columns). The number of columns in the TAGS parameter must be exactly the same as the number of TAGS required by the SQL statement.
@ -428,17 +441,6 @@ TDengine's asynchronous APIs all use a non-blocking calling mode. Applications can use multi
(Added in version 2.1.1.0; only supported for replacing parameter values in INSERT statements)
Passes the data to be bound in a column-wise manner; make sure that the order and number of the data columns passed here are exactly the same as the VALUES parameters in the SQL statement. TAOS_MULTI_BIND is defined as follows:
```c
typedef struct TAOS_MULTI_BIND {
int buffer_type;
void * buffer;
uintptr_t buffer_length;
uintptr_t * length;
char * is_null;
int num; // the number of columns
} TAOS_MULTI_BIND;
```
- `int taos_stmt_add_batch(TAOS_STMT *stmt)`
Adds the currently bound parameters to the batch. After calling this function, `taos_stmt_bind_param()` or `taos_stmt_bind_param_batch()` can be called again to bind new parameters. Note that this function only supports INSERT/IMPORT statements; an error is returned for other SQL statements such as SELECT.
@ -447,6 +449,14 @@ TDengine's asynchronous APIs all use a non-blocking calling mode. Applications can use multi
Executes the prepared statement. Currently, a statement can only be executed once.
- `int taos_stmt_affected_rows(TAOS_STMT *stmt)`
Gets the number of rows affected by executing the bound statement multiple times.
- `int taos_stmt_affected_rows_once(TAOS_STMT *stmt)`
Gets the number of rows affected by executing the bound statement once.
- `TAOS_RES* taos_stmt_use_result(TAOS_STMT *stmt)`
Gets the result set of the statement. The result set is used in the same way as in a non-parameterized call; when finished, `taos_free_result()` should be called on this result set to release resources.

View File

@ -29,6 +29,10 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx"
The supported platforms are the same as those supported by the TDengine client driver.
:::note
Please note that TDengine no longer supports the 32-bit Windows platform.
:::
## Version support
Please refer to the [version support list](../#版本支持)

View File

@ -7,7 +7,7 @@ description: Detailed syntax for querying data
## Query Syntax
```sql
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE()}
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE() | CURRENT_USER() | USER() }
SELECT [hints] [DISTINCT] [TAGS] select_list
from_clause
@ -205,7 +205,7 @@ SELECT table_name, tag_name, tag_type, tag_value FROM information_schema.ins_tag
SELECT COUNT(*) FROM (SELECT DISTINCT TBNAME FROM meters);
```
In the preceding two queries, only filtering conditions on tags (TAGS) can be added in the WHERE clause. For example:
In the preceding two queries, only filtering conditions on tags (TAGS) can be added in the WHERE clause.
**\_QSTART/\_QEND**
@ -247,8 +247,7 @@ TDengine supports INNER JOIN based on the timestamp primary key. The rules are as follows:
3. For supertables, in addition to the equality condition on the timestamp primary key, the ON condition also requires equality conditions on tag columns that correspond one to one; OR conditions are not supported.
4. The tables participating in the JOIN must be of the same type, that is, all supertables, all subtables, or all normal tables.
5. Subqueries are supported on both sides of the JOIN.
6. At most 10 tables can participate in a JOIN.
7. JOIN cannot be used together with a FILL clause.
6. JOIN cannot be used together with a FILL clause.
## GROUP BY
@ -339,6 +338,12 @@ SELECT TODAY();
SELECT TIMEZONE();
```
### Obtain Current User
```sql
SELECT CURRENT_USER();
```
## Regular Expression Filtering
### Syntax
@ -392,7 +397,7 @@ SELECT AVG(CASE WHEN voltage < 200 or voltage > 250 THEN 220 ELSE voltage END) F
## JOIN Clause
TDengine supports inner joins based on the timestamp primary key, that is, the JOIN condition must contain the timestamp primary key. As long as this requirement is met, inner joins can be performed freely among normal tables, subtables, supertables, and subqueries, and there is no limit on the number of tables.
TDengine supports inner joins based on the timestamp primary key, that is, the JOIN condition must contain the timestamp primary key. As long as this requirement is met, inner joins can be performed freely among normal tables, subtables, supertables, and subqueries, and there is no limit on the number of tables; other join conditions must be combined with the primary key condition using the AND operator.
JOIN between normal tables:

View File

@ -1266,6 +1266,14 @@ SELECT SERVER_STATUS();
**Description**: Checks whether all dnodes on the server side are online; if so, success is returned, otherwise an error indicating that a connection cannot be established is returned.
### CURRENT_USER
```sql
SELECT CURRENT_USER();
```
**Description**: Gets the current user.
## Geometry Functions

2
docs/zh/12-taos-sql/12-distinguished.md Normal file → Executable file
View File

@ -31,7 +31,7 @@ select max(current) from meters partition by location interval(10m)
## Windowed Queries
TDengine supports aggregate queries over time-window partitions; for example, a temperature sensor collects data once per second, but the average temperature every 10 minutes is required. In such scenarios, a window clause can be used to obtain the desired result. The window clause partitions the queried data set into query subsets by window and aggregates each window. There are four kinds of windows: time window, status window, session window, and event window. Time windows can further be divided into sliding time windows and tumbling time windows.
TDengine supports aggregate queries over time-window partitions; for example, a temperature sensor collects data once per second, but the average temperature every 10 minutes is required. In such scenarios, a window clause can be used to obtain the desired result. The window clause partitions the queried data set into query subsets by window and aggregates each window. There are four kinds of windows: time window, status window, session window, and event window. Time windows can further be divided into sliding time windows and tumbling time windows.
The window clause syntax is as follows:

View File

@ -178,7 +178,7 @@ description: Detailed list of TDengine reserved keywords
- MATCH
- MAX_DELAY
- MAX_SPEED
- BWLIMIT
- MAXROWS
- MERGE
- META

View File

@ -22,6 +22,14 @@ SHOW CLUSTER;
Shows information about the current cluster.
## SHOW CLUSTER ALIVE
```sql
SHOW CLUSTER ALIVE;
```
Checks whether the current cluster is available. Return values: 0: unavailable; 1: fully available; 2: partially available (some nodes in the cluster are offline, but the other nodes can still be used normally).
## SHOW CONNECTIONS
```sql

View File

@ -17,7 +17,7 @@ conn_id can be obtained via `SHOW CONNECTIONS`.
## Terminate a Query
```sql
KILL QUERY kill_id;
KILL QUERY 'kill_id';
```
kill_id can be obtained via `SHOW QUERIES`.

View File

@ -105,6 +105,8 @@ Usage: taosdump [OPTION...] dbname [tbname ...]
-L, --loose-mode Using loose mode if the table name and column name
use letter and number only. Default is NOT.
-n, --no-escape No escape char '`'. Default is using it.
-Q, --dot-replace Replace dot character with underscore character in
the table name.
-T, --thread-num=THREAD_NUM Number of thread for dump in file. Default is
8.
-C, --cloud=CLOUD_DSN specify a DSN to access TDengine cloud service

View File

@ -133,6 +133,7 @@
<configuration>
<source>8</source>
<target>8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>

View File

@ -8,4 +8,4 @@ java -jar target/taosdemo-2.0.1-jar-with-dependencies.jar -host <hostname> -data
```
If the error Exception in thread "main" java.lang.UnsatisfiedLinkError: no taos in java.library.path occurs,
please check whether the TDengine client package has been installed or TDengine has been compiled and installed. If this error still occurs even though it has definitely been installed, you can add -Djava.library.path=/usr/local/lib after java on the command line to specify the path for locating the shared library.
please check whether the TDengine client package has been installed or TDengine has been compiled and installed. If this error still occurs even though it has definitely been installed, you can add -Djava.library.path=/usr/lib after java on the command line to specify the path for locating the shared library.

View File

@ -113,7 +113,7 @@
#define TK_TABLE_PREFIX 95
#define TK_TABLE_SUFFIX 96
#define TK_NK_COLON 97
#define TK_MAX_SPEED 98
#define TK_BWLIMIT 98
#define TK_START 99
#define TK_TIMESTAMP 100
#define TK_END 101
@ -356,6 +356,7 @@
#define TK_WAL 338
#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601
#define TK_NK_ILLEGAL 602

View File

@ -59,7 +59,7 @@ typedef struct SDataSinkMgtCfg {
uint32_t maxDataBlockNumPerQuery;
} SDataSinkMgtCfg;
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI);
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager);
typedef struct SInputData {
const struct SSDataBlock* pData;
@ -83,7 +83,7 @@ typedef struct SOutputData {
* @param pHandle output
* @return error code
*/
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id);
int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id);
int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat);

View File

@ -114,7 +114,7 @@ pipeline {
sync_source("${BRANCH_NAME}")
sh '''
if [ "${verMode}" = "all" ];then
verMode="community enterprise"
verMode="enterprise"
fi
verModeList=${verMode}
for verModeSin in ${verModeList}
@ -123,18 +123,6 @@ pipeline {
bash testpackage.sh -m ${verModeSin} -f server -l false -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t tar
python3 checkPackageRuning.py
done
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh -m community -f server -l true -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t tar
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh -m community -f server -l false -c x64 -v ${version} -o ${baseVersion} -s ${sourcePath} -t deb
python3 checkPackageRuning.py
'''
}
}

View File

@ -37,6 +37,7 @@ else
${csudo}rm -f ${inc_link_dir}/taos.h || :
${csudo}rm -f ${inc_link_dir}/taosdef.h || :
${csudo}rm -f ${inc_link_dir}/taoserror.h || :
${csudo}rm -f ${inc_link_dir}/tdef.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
[ -f ${inc_link_dir}/taosws.h ] && ${csudo}rm -f ${inc_link_dir}/taosws.h || :
${csudo}rm -f ${lib_link_dir}/libtaos.* || :

View File

@ -98,6 +98,7 @@ cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_pat
cp ${compile_dir}/../include/client/taos.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../include/common/taosdef.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../include/util/taoserror.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../include/util/tdef.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../include/libs/function/taosudf.h ${pkg_dir}${install_home_path}/include
[ -f ${compile_dir}/build/include/taosws.h ] && cp ${compile_dir}/build/include/taosws.h ${pkg_dir}${install_home_path}/include ||:
cp -r ${top_dir}/examples/* ${pkg_dir}${install_home_path}/examples

View File

@ -95,6 +95,7 @@ cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driv
cp %{_compiledir}/../include/client/taos.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/common/taosdef.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/util/taoserror.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/util/tdef.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/libs/function/taosudf.h %{buildroot}%{homepath}/include
[ -f %{_compiledir}/build/include/taosws.h ] && cp %{_compiledir}/build/include/taosws.h %{buildroot}%{homepath}/include ||:
#cp -r %{_compiledir}/../src/connector/python %{buildroot}%{homepath}/connector
@ -217,6 +218,7 @@ if [ $1 -eq 0 ];then
${csudo}rm -f ${inc_link_dir}/taos.h || :
${csudo}rm -f ${inc_link_dir}/taosdef.h || :
${csudo}rm -f ${inc_link_dir}/taoserror.h || :
${csudo}rm -f ${inc_link_dir}/tdef.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
${csudo}rm -f ${lib_link_dir}/libtaos.* || :

View File

@ -345,7 +345,7 @@ function install_jemalloc() {
}
function install_header() {
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/tdef.h ${inc_link_dir}/taosudf.h || :
[ -f ${inc_link_dir}/taosws.h ] && ${csudo}rm -f ${inc_link_dir}/taosws.h || :
@ -353,6 +353,7 @@ function install_header() {
${csudo}ln -sf ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo}ln -sf ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h
${csudo}ln -sf ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
${csudo}ln -sf ${install_main_dir}/include/tdef.h ${inc_link_dir}/tdef.h
${csudo}ln -sf ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h
[ -f ${install_main_dir}/include/taosws.h ] && ${csudo}ln -sf ${install_main_dir}/include/taosws.h ${inc_link_dir}/taosws.h || :
@ -935,7 +936,7 @@ function updateProduct() {
fi
echo
echo -e "\033[44;32;1m${productName2} is updated successfully!${NC}"
echo -e "\033[44;32;1mTo manage ${productName2} instance, view documentation and explorer features, you need to install ${clientName2}Explorer ${NC}"
echo -e "\033[44;32;1mTo manage ${productName2} instance, view documentation or explorer features, please install ${clientName2}Explorer ${NC}"
else
install_bin
install_config
@ -1028,7 +1029,7 @@ function installProduct() {
fi
echo -e "\033[44;32;1m${productName2} is installed successfully!${NC}"
echo -e "\033[44;32;1mTo manage ${productName2} instance, view documentation and explorer features, you need to install ${clientName2}Explorer ${NC}"
echo -e "\033[44;32;1mTo manage ${productName2} instance, view documentation or explorer features, please install ${clientName2}Explorer ${NC}"
echo
else # Only install client
install_bin

View File

@ -180,10 +180,11 @@ function install_lib() {
}
function install_header() {
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/tdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
${csudo}cp -f ${script_dir}/inc/* ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/*
${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h
${csudo}ln -s ${install_main_dir}/include/tdef.h ${inc_link_dir}/tdef.h
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h

View File

@ -158,7 +158,6 @@ function install_bin() {
${csudo}rm -f ${bin_link_dir}/udfd || :
${csudo}rm -f ${bin_link_dir}/taosdemo || :
${csudo}rm -f ${bin_link_dir}/taosdump || :
${csudo}rm -f ${bin_link_dir}/taosx || :
${csudo}rm -f ${bin_link_dir}/${uninstallScript} || :
if [ "$osType" != "Darwin" ]; then
@ -348,9 +347,9 @@ function install_lib() {
function install_header() {
${csudo}mkdir -p ${inc_link_dir}
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/tdef.h ${inc_link_dir}/taosudf.h || :
[ -f ${inc_link_dir}/taosws.h ] && ${csudo}rm -f ${inc_link_dir}/taosws.h ||:
${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/libs/function/taosudf.h \
${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/util/tdef.h ${source_dir}/include/libs/function/taosudf.h \
${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/*
if [ -f ${binary_dir}/build/include/taosws.h ]; then
@ -361,6 +360,7 @@ function install_header() {
${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h > /dev/null 2>&1
${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h > /dev/null 2>&1
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h > /dev/null 2>&1
${csudo}ln -s ${install_main_dir}/include/tdef.h ${inc_link_dir}/tdef.h > /dev/null 2>&1
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h > /dev/null 2>&1
${csudo}chmod 644 ${install_main_dir}/include/*

View File

@ -83,7 +83,7 @@ else
wslib_files="${build_dir}/lib/libtaosws.dylib"
fi
header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/libs/function/taosudf.h"
header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/util/tdef.h ${code_dir}/include/libs/function/taosudf.h"
wsheader_files="${build_dir}/include/taosws.h"
if [ "$dbName" != "taos" ]; then

View File

@ -115,7 +115,7 @@ else
lib_files="${build_dir}/lib/libtaos.so.${version}"
wslib_files="${build_dir}/lib/libtaosws.so"
fi
header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/libs/function/taosudf.h"
header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/util/tdef.h ${code_dir}/include/libs/function/taosudf.h"
wsheader_files="${build_dir}/include/taosws.h"

View File

@ -133,12 +133,13 @@ function kill_taosd() {
function install_include() {
log_print "start install include from ${inc_dir} to ${inc_link_dir}"
${csudo}mkdir -p ${inc_link_dir}
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/tdef.h ${inc_link_dir}/taosudf.h || :
[ -f ${inc_link_dir}/taosws.h ] && ${csudo}rm -f ${inc_link_dir}/taosws.h ||:
${csudo}ln -s ${inc_dir}/taos.h ${inc_link_dir}/taos.h
${csudo}ln -s ${inc_dir}/taosdef.h ${inc_link_dir}/taosdef.h
${csudo}ln -s ${inc_dir}/taoserror.h ${inc_link_dir}/taoserror.h
${csudo}ln -s ${inc_dir}/tdef.h ${inc_link_dir}/tdef.h
${csudo}ln -s ${inc_dir}/taosudf.h ${inc_link_dir}/taosudf.h
[ -f ${inc_dir}/taosws.h ] && ${csudo}ln -sf ${inc_dir}/taosws.h ${inc_link_dir}/taosws.h ||:

View File

@ -143,6 +143,7 @@ ${csudo}rm -f ${cfg_link_dir}/*.new || :
${csudo}rm -f ${inc_link_dir}/taos.h || :
${csudo}rm -f ${inc_link_dir}/taosdef.h || :
${csudo}rm -f ${inc_link_dir}/taoserror.h || :
${csudo}rm -f ${inc_link_dir}/tdef.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
${csudo}rm -f ${lib_link_dir}/libtaos.* || :
${csudo}rm -f ${lib64_link_dir}/libtaos.* || :

View File

@ -155,6 +155,7 @@ function clean_header() {
${csudo}rm -f ${inc_link_dir}/taos.h || :
${csudo}rm -f ${inc_link_dir}/taosdef.h || :
${csudo}rm -f ${inc_link_dir}/taoserror.h || :
${csudo}rm -f ${inc_link_dir}/tdef.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
[ -f ${inc_link_dir}/taosws.h ] && ${csudo}rm -f ${inc_link_dir}/taosws.h || :

View File

@ -73,6 +73,7 @@ function clean_header() {
${csudo}rm -f ${inc_link_dir}/taos.h || :
${csudo}rm -f ${inc_link_dir}/taosdef.h || :
${csudo}rm -f ${inc_link_dir}/taoserror.h || :
${csudo}rm -f ${inc_link_dir}/tdef.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
}

View File

@ -697,14 +697,14 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
SSchema* pSchema = req.schemaRow.pSchema + i;
SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
SField field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
strcpy(field.name, pSchema->name);
taosArrayPush(pReq.pColumns, &field);
}
pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
SSchema* pSchema = req.schemaTag.pSchema + i;
SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
SField field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
strcpy(field.name, pSchema->name);
taosArrayPush(pReq.pTags, &field);
}

View File

@ -218,7 +218,16 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable) {
if (strlen(oneTable->childTableName) == 0) {
SArray *dst = taosArrayDup(oneTable->tags, NULL);
RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName};
ASSERT(oneTable->sTableNameLen < TSDB_TABLE_NAME_LEN);
char superName[TSDB_TABLE_NAME_LEN] = {0};
RandTableName rName = {dst, NULL, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName};
if(tsSmlDot2Underline){
memcpy(superName, oneTable->sTableName, oneTable->sTableNameLen);
smlStrReplace(superName, oneTable->sTableNameLen);
rName.stbFullName = superName;
}else{
rName.stbFullName = oneTable->sTableName;
}
buildChildTableName(&rName);
taosArrayDestroy(dst);
@ -230,6 +239,9 @@ void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tin
char key[TSDB_TABLE_NAME_LEN * 2 + 1] = {0};
size_t nLen = strlen(tinfo->childTableName);
memcpy(key, currElement->measure, currElement->measureLen);
if(tsSmlDot2Underline){
smlStrReplace(key, currElement->measureLen);
}
memcpy(key + currElement->measureLen + 1, tinfo->childTableName, nLen);
void *uid =
taosHashGet(info->tableUids, key,

View File

@ -986,7 +986,7 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
int32_t tmq_unsubscribe(tmq_t* tmq) {
if(tmq == NULL) return TSDB_CODE_INVALID_PARA;
if (tmq->status != TMQ_CONSUMER_STATUS__READY) {
tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
tscInfo("consumer:0x%" PRIx64 " not in ready state, unsubscribe it directly", tmq->consumerId);
return 0;
}
if (tmq->autoCommit) {

View File

@ -1407,8 +1407,9 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
void* pData = colDataGetData(pSrc, rowIdx);
bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL);
void* pData = NULL;
if (!isNull) pData = colDataGetData(pSrc, rowIdx);
colDataSetVal(pDst, 0, pData, isNull);
}

View File

@ -3240,136 +3240,161 @@ static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *p
return numOfRows;
}
#define BUILD_COL_FOR_INFO_DB 1
#define BUILD_COL_FOR_PERF_DB 1 << 1
#define BUILD_COL_FOR_USER_DB 1 << 2
#define BUILD_COL_FOR_ALL_DB (BUILD_COL_FOR_INFO_DB | BUILD_COL_FOR_PERF_DB | BUILD_COL_FOR_USER_DB)
static int32_t buildSysDbColsInfo(SSDataBlock *p, char *db, char *tb) {
static int32_t buildSysDbColsInfo(SSDataBlock *p, int8_t buildWhichDBs, char *tb) {
size_t size = 0;
const SSysTableMeta *pSysDbTableMeta = NULL;
if (db[0] && strncmp(db, TSDB_INFORMATION_SCHEMA_DB, TSDB_DB_FNAME_LEN) != 0 &&
strncmp(db, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_DB_FNAME_LEN) != 0) {
return p->info.rows;
if (buildWhichDBs & BUILD_COL_FOR_INFO_DB) {
getInfosDbMeta(&pSysDbTableMeta, &size);
p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB, tb);
}
getInfosDbMeta(&pSysDbTableMeta, &size);
p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB, tb);
getPerfDbMeta(&pSysDbTableMeta, &size);
p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB, tb);
if (buildWhichDBs & BUILD_COL_FOR_PERF_DB) {
getPerfDbMeta(&pSysDbTableMeta, &size);
p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB, tb);
}
return p->info.rows;
}
static int8_t determineBuildColForWhichDBs(const char* db) {
int8_t buildWhichDBs;
if (!db[0])
buildWhichDBs = BUILD_COL_FOR_ALL_DB;
else {
char *p = strchr(db, '.');
if (p && strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB) == 0) {
buildWhichDBs = BUILD_COL_FOR_INFO_DB;
} else if (p && strcmp(p + 1, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
buildWhichDBs = BUILD_COL_FOR_PERF_DB;
} else {
buildWhichDBs = BUILD_COL_FOR_USER_DB;
}
}
return buildWhichDBs;
}
static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
uint8_t buildWhichDBs;
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SStbObj *pStb = NULL;
int32_t numOfRows = 0;
buildWhichDBs = determineBuildColForWhichDBs(pShow->db);
if (!pShow->sysDbRsp) {
numOfRows = buildSysDbColsInfo(pBlock, pShow->db, pShow->filterTb);
numOfRows = buildSysDbColsInfo(pBlock, buildWhichDBs, pShow->filterTb);
mDebug("mndRetrieveStbCol get system table cols, rows:%d, db:%s", numOfRows, pShow->db);
pShow->sysDbRsp = true;
}
SDbObj *pDb = NULL;
if (strlen(pShow->db) > 0) {
pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return terrno;
}
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(typeName, "SUPER_TABLE");
bool fetch = pShow->restore ? false : true;
pShow->restore = false;
while (numOfRows < rows) {
if (fetch) {
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
} else {
fetch = true;
void *pKey = taosHashGetKey(pShow->pIter, NULL);
pStb = sdbAcquire(pSdb, SDB_STB, pKey);
if (!pStb) continue;
if (buildWhichDBs & BUILD_COL_FOR_USER_DB) {
SDbObj *pDb = NULL;
if (strlen(pShow->db) > 0) {
pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL && TSDB_CODE_MND_DB_NOT_EXIST != terrno && pBlock->info.rows == 0) return terrno;
}
if (pDb != NULL && pStb->dbUid != pDb->uid) {
sdbRelease(pSdb, pStb);
continue;
}
SName name = {0};
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
if (pShow->filterTb[0] && strncmp(pShow->filterTb, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN) != 0) {
sdbRelease(pSdb, pStb);
continue;
}
if ((numOfRows + pStb->numOfColumns) > rows) {
pShow->restore = true;
if (numOfRows == 0) {
mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
rows, pStb->numOfColumns, pStb->name, pStb->db);
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(typeName, "SUPER_TABLE");
bool fetch = pShow->restore ? false : true;
pShow->restore = false;
while (numOfRows < rows) {
if (fetch) {
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
} else {
fetch = true;
void *pKey = taosHashGetKey(pShow->pIter, NULL);
pStb = sdbAcquire(pSdb, SDB_STB, pKey);
if (!pStb) continue;
}
sdbRelease(pSdb, pStb);
break;
}
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&name, varDataVal(db));
varDataSetLen(db, strlen(varDataVal(db)));
for (int i = 0; i < pStb->numOfColumns; i++) {
int32_t cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, typeName, false);
// col name
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(colName, pStb->pColumns[i].name);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, colName, false);
// col type
int8_t colType = pStb->pColumns[i].type;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char colTypeStr[VARSTR_HEADER_SIZE + 32];
int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
if (colType == TSDB_DATA_TYPE_VARCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE));
} else if (colType == TSDB_DATA_TYPE_NCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
if (pDb != NULL && pStb->dbUid != pDb->uid) {
sdbRelease(pSdb, pStb);
continue;
}
varDataSetLen(colTypeStr, colTypeLen);
colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false);
while (cols < pShow->numOfColumns) {
SName name = {0};
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
if (pShow->filterTb[0] && strncmp(pShow->filterTb, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN) != 0) {
sdbRelease(pSdb, pStb);
continue;
}
if ((numOfRows + pStb->numOfColumns) > rows) {
pShow->restore = true;
if (numOfRows == 0) {
mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
rows, pStb->numOfColumns, pStb->name, pStb->db);
}
sdbRelease(pSdb, pStb);
break;
}
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&name, varDataVal(db));
varDataSetLen(db, strlen(varDataVal(db)));
for (int i = 0; i < pStb->numOfColumns; i++) {
int32_t cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows);
colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, typeName, false);
// col name
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(colName, pStb->pColumns[i].name);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, colName, false);
// col type
int8_t colType = pStb->pColumns[i].type;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char colTypeStr[VARSTR_HEADER_SIZE + 32];
int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
if (colType == TSDB_DATA_TYPE_VARCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE));
} else if (colType == TSDB_DATA_TYPE_NCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
varDataSetLen(colTypeStr, colTypeLen);
colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false);
while (cols < pShow->numOfColumns) {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows);
}
numOfRows++;
}
numOfRows++;
sdbRelease(pSdb, pStb);
}
sdbRelease(pSdb, pStb);
}
if (pDb != NULL) {
mndReleaseDb(pMnode, pDb);
if (pDb != NULL) {
mndReleaseDb(pMnode, pDb);
}
}
pShow->numOfRows += numOfRows;

View File

@ -1572,6 +1572,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
}
if (pStream->status != STREAM_STATUS__PAUSE) {
sdbRelease(pMnode->pSdb, pStream);
return 0;
}

View File

@ -883,20 +883,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
// atomic_add_fetch_32(&pHandle->epoch, 1);
atomic_store_32(&pHandle->epoch, 0);
// kill executing task
// if(tqIsHandleExec(pHandle)) {
// qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
// if (pTaskInfo != NULL) {
// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
// }
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
// qStreamCloseTsdbReader(pTaskInfo);
// }
// }
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, pHandle);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
}

View File

@ -305,7 +305,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
}
SWalCont* pCont = &pReader->pHead->head;
int64_t ver = pCont->version;
int64_t ver = pCont->version;
if (ver > maxVer) {
tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
return TSDB_CODE_SUCCESS;

View File

@ -235,36 +235,33 @@ static int32_t tsdbCommitOpenReader(SCommitter2 *committer) {
return 0;
}
ASSERT(TARRAY2_SIZE(committer->ctx->fset->lvlArr) == 1);
SSttLvl *lvl;
TARRAY2_FOREACH(committer->ctx->fset->lvlArr, lvl) {
STFileObj *fobj = NULL;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
SSttFileReader *sttReader;
SSttLvl *lvl = TARRAY2_FIRST(committer->ctx->fset->lvlArr);
SSttFileReaderConfig config = {
.tsdb = committer->tsdb,
.szPage = committer->szPage,
.file = fobj->f[0],
};
ASSERT(lvl->level == 0);
code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
STFileObj *fobj = NULL;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
SSttFileReader *sttReader;
code = TARRAY2_APPEND(committer->sttReaderArray, sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
SSttFileReaderConfig config = {
.tsdb = committer->tsdb,
.szPage = committer->szPage,
.file = fobj->f[0],
};
STFileOp op = {
.optype = TSDB_FOP_REMOVE,
.fid = fobj->f->fid,
.of = fobj->f[0],
};
code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(committer->sttReaderArray, sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
STFileOp op = {
.optype = TSDB_FOP_REMOVE,
.fid = fobj->f->fid,
.of = fobj->f[0],
};
code = TARRAY2_APPEND(committer->fopArray, op);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(committer->fopArray, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
_exit:

View File

@ -120,7 +120,15 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
};
if (dataBlk->hasDup) {
code = tsdbReadDataBlockEx(reader, dataBlk, ctx->blockData);
tBlockDataReset(ctx->blockData);
int16_t aCid = 0;
STSchema tSchema = {0};
TABLEID tbid = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
code = tBlockDataInit(ctx->blockData, &tbid, &tSchema, &aCid, 0);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadDataBlock(reader, dataBlk, ctx->blockData);
TSDB_CHECK_CODE(code, lino, _exit);
record.count = 1;
@ -334,6 +342,8 @@ static int32_t tsdbUpgradeFileSet(STsdb *tsdb, SDFileSet *pDFileSet, TFileSetArr
int32_t code = 0;
int32_t lino = 0;
tsdbInfo("vgId:%d upgrade file set start, fid:%d", TD_VID(tsdb->pVnode), pDFileSet->fid);
SDataFReader *reader;
STFileSet *fset;
@ -366,6 +376,8 @@ static int32_t tsdbUpgradeFileSet(STsdb *tsdb, SDFileSet *pDFileSet, TFileSetArr
code = TARRAY2_APPEND(fileSetArray, fset);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbInfo("vgId:%d upgrade file set end, fid:%d", TD_VID(tsdb->pVnode), pDFileSet->fid);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);

View File

@ -224,6 +224,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
}
taosCloseQueue(pDeleter->pDataBlocks);
taosThreadMutexDestroy(&pDeleter->mutex);
taosMemoryFree(pDeleter->pManager);
return TSDB_CODE_SUCCESS;
}
@ -279,6 +281,8 @@ _end:
if (deleter != NULL) {
destroyDataSinker((SDataSinkHandle*)deleter);
taosMemoryFree(deleter);
} else {
taosMemoryFree(pManager);
}
return code;
}

View File

@ -234,6 +234,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
}
taosCloseQueue(pDispatcher->pDataBlocks);
taosThreadMutexDestroy(&pDispatcher->mutex);
taosMemoryFree(pDispatcher->pManager);
return TSDB_CODE_SUCCESS;
}
@ -248,7 +249,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
if (NULL == dispatcher) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
dispatcher->sink.fPut = putDataBlock;
dispatcher->sink.fEndPut = endPut;
@ -266,8 +267,13 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
if (NULL == dispatcher->pDataBlocks) {
taosMemoryFree(dispatcher);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
*pHandle = dispatcher;
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFree(pManager);
return terrno;
}

View File

@ -395,6 +395,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
taosMemoryFree(pInserter->pParam);
taosHashCleanup(pInserter->pCols);
taosThreadMutexDestroy(&pInserter->mutex);
taosMemoryFree(pInserter->pManager);
return TSDB_CODE_SUCCESS;
}
@ -411,7 +413,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
if (NULL == inserter) {
taosMemoryFree(pParam);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
SQueryInserterNode* pInserterNode = (SQueryInserterNode*)pDataSink;
@ -431,23 +433,18 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
int64_t suid = 0;
int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
if (code) {
destroyDataSinker((SDataSinkHandle*)inserter);
taosMemoryFree(inserter);
return code;
terrno = code;
goto _return;
}
if (pInserterNode->stableId != suid) {
destroyDataSinker((SDataSinkHandle*)inserter);
taosMemoryFree(inserter);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return terrno;
goto _return;
}
inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
taosThreadMutexInit(&inserter->mutex, NULL);
if (NULL == inserter->pDataBlocks) {
destroyDataSinker((SDataSinkHandle*)inserter);
taosMemoryFree(inserter);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -471,4 +468,15 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
*pHandle = inserter;
return TSDB_CODE_SUCCESS;
_return:
if (inserter) {
destroyDataSinker((SDataSinkHandle*)inserter);
taosMemoryFree(inserter);
} else {
taosMemoryFree(pManager);
}
return terrno;
}

View File

@ -18,12 +18,17 @@
#include "planner.h"
#include "tarray.h"
static SDataSinkManager gDataSinkManager = {0};
SDataSinkStat gDataSinkStat = {0};
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI) {
gDataSinkManager.cfg = *cfg;
gDataSinkManager.pAPI = pAPI;
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager) {
SDataSinkManager* pSinkManager = taosMemoryMalloc(sizeof(SDataSinkManager));
if (NULL == pSinkManager) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSinkManager->cfg = *cfg;
pSinkManager->pAPI = pAPI;
*ppSinkManager = pSinkManager;
return 0; // to avoid compiler error
}
@ -33,18 +38,22 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) {
return 0;
}
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) {
int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) {
SDataSinkManager* pManager = pSinkManager;
switch ((int)nodeType(pDataSink)) {
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
return createDataDispatcher(pManager, pDataSink, pHandle);
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam);
return createDataDeleter(pManager, pDataSink, pHandle, pParam);
}
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam);
return createDataInserter(pManager, pDataSink, pHandle, pParam);
}
default:
break;
}
taosMemoryFree(pSinkManager);
qError("invalid input node type:%d, %s", nodeType(pDataSink), id);
return TSDB_CODE_QRY_INVALID_INPUT;
}

View File

@ -531,23 +531,25 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
goto _error;
}
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
goto _error;
}
if (handle) {
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
void* pSinkManager = NULL;
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
goto _error;
}
void* pSinkParam = NULL;
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
taosMemoryFree(pSinkManager);
goto _error;
}
// pSinkParam has been freed during create sinker.
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
code = dsCreateDataSinker(pSinkManager, pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
}
qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);

View File

@ -64,6 +64,7 @@ typedef struct SFillOperatorInfo {
static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order);
static void destroyFillOperatorInfo(void* param);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static void fillResetPrevForNewGroup(SFillInfo* pFillInfo);
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
SResultInfo* pResultInfo, int32_t order) {
@ -84,6 +85,9 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
if (pInfo->pFillInfo->type == TSDB_FILL_PREV || pInfo->pFillInfo->type == TSDB_FILL_LINEAR) {
fillResetPrevForNewGroup(pInfo->pFillInfo);
}
int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
@ -122,6 +126,15 @@ void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int
pInfo->pRes->info.id.groupId = pBlock->info.id.groupId;
}
static void fillResetPrevForNewGroup(SFillInfo* pFillInfo) {
for (int32_t colIdx = 0; colIdx < pFillInfo->numOfCols; ++colIdx) {
if (!pFillInfo->pFillCol[colIdx].notFillCol) {
SGroupKeys* key = taosArrayGet(pFillInfo->prev.pRowVal, colIdx);
key->isNull = true;
}
}
}
// todo refactor: decide the start key according to the query time range.
static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order) {
if (order == TSDB_ORDER_ASC) {
@ -164,6 +177,7 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, i
}
// todo time window chosen problem: t or prev value?
if (t > pInfo->pFillInfo->start) t -= pInterval->sliding;
taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, t);
}
}
@ -825,6 +839,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
(pFillSup->next.key == pFillInfo->nextRowKey && !hasPrevWindow(pFillSup)))) {
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START;
resetFillWindow(&pFillSup->prev);
pFillSup->prev.key = pFillSup->cur.key;
pFillSup->prev.pRowVal = pFillSup->cur.pRowVal;
} else if (hasPrevWindow(pFillSup)) {
@ -838,6 +853,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
if (hasPrevWindow(pFillSup)) {
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END;
resetFillWindow(&pFillSup->next);
pFillSup->next.key = pFillSup->cur.key;
pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
pFillInfo->preRowKey = INT64_MIN;
@ -1216,8 +1232,6 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
SWinKey nextKey = {.groupId = groupId, .ts = ts};
while (pInfo->srcDelRowIndex < pBlock->info.rows) {
void* nextVal = NULL;
int32_t nextLen = 0;
TSKEY delTs = tsStarts[pInfo->srcDelRowIndex];
uint64_t delGroupId = groupIds[pInfo->srcDelRowIndex];
int32_t code = TSDB_CODE_SUCCESS;
@ -1232,7 +1246,7 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
if (delTs == nextKey.ts) {
code = pAPI->stateStore.streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur);
if (code == TSDB_CODE_SUCCESS) {
code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextLen);
code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, NULL, NULL);
}
// ts will be deleted later
if (delTs != ts) {

View File

@ -973,7 +973,8 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId);
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j);
bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL);
char* pSrcData = colDataGetData(pSrcCol, rowIndex);
char* pSrcData = NULL;
if (!isNull) pSrcData = colDataGetData(pSrcCol, rowIndex);
colDataSetVal(pDestCol, pDest->info.rows, pSrcData, isNull);
}
pDest->info.rows++;

View File

@ -1353,7 +1353,8 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
bool isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
char* pSrcData = colDataGetData(pSrcCol, i);
char* pSrcData = NULL;
if (!isNull) pSrcData = colDataGetData(pSrcCol, i);
colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull);
}
pResult->info.rows++;

View File

@ -286,7 +286,7 @@ retention(A) ::= NK_VARIABLE(B) NK_COLON NK_VARIABLE(C).
%type speed_opt { int32_t }
%destructor speed_opt { }
speed_opt(A) ::= . { A = 0; }
speed_opt(A) ::= MAX_SPEED NK_INTEGER(B). { A = taosStr2Int32(B.z, NULL, 10); }
speed_opt(A) ::= BWLIMIT NK_INTEGER(B). { A = taosStr2Int32(B.z, NULL, 10); }
start_opt(A) ::= . { A = NULL; }
start_opt(A) ::= START WITH NK_INTEGER(B). { A = createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
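With this rule the bandwidth limit of `TRIM DATABASE` is spelled `BWLIMIT` instead of `MAX_SPEED`; the keyword table and the parser test below are updated to match. A sketch of the new syntax from the client side, assuming the `taos` Python connector and an existing database named `wxy_db` (both placeholders, the database name is taken from the parser test):

```python
# Sketch only: connection parameters and the database name are placeholders.
import taos

conn = taos.connect(host="localhost", user="root", password="taosdata")
cursor = conn.cursor()
# Formerly `TRIM DATABASE wxy_db MAX_SPEED 100`; the option keyword is now BWLIMIT.
cursor.execute("TRIM DATABASE wxy_db BWLIMIT 100")
cursor.close()
conn.close()
```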

View File

@ -138,7 +138,7 @@ static SKeyword keywordTable[] = {
{"MATCH", TK_MATCH},
{"MAXROWS", TK_MAXROWS},
{"MAX_DELAY", TK_MAX_DELAY},
{"MAX_SPEED", TK_MAX_SPEED},
{"BWLIMIT", TK_BWLIMIT},
{"MERGE", TK_MERGE},
{"META", TK_META},
{"ONLY", TK_ONLY},

File diff suppressed because it is too large

View File

@ -286,7 +286,7 @@ TEST_F(ParserShowToUseTest, trimDatabase) {
run("TRIM DATABASE wxy_db");
setTrimDbReq("wxy_db", 100);
run("TRIM DATABASE wxy_db MAX_SPEED 100");
run("TRIM DATABASE wxy_db BWLIMIT 100");
}
TEST_F(ParserShowToUseTest, useDatabase) {

View File

@ -889,13 +889,16 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
char *data = taosMemoryMalloc(compressSize);
gzFile dstFp = NULL;
TdFilePtr pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM);
TdFilePtr pFile = NULL;
TdFilePtr pSrcFile = NULL;
pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM);
if (pSrcFile == NULL) {
ret = -1;
goto cmp_end;
}
TdFilePtr pFile = taosOpenFile(destFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
pFile = taosOpenFile(destFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
ret = -2;
goto cmp_end;
@ -914,6 +917,9 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
}
cmp_end:
if (pFile) {
taosCloseFile(&pFile);
}
if (pSrcFile) {
taosCloseFile(&pSrcFile);
}
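Declaring both handles before the first `goto cmp_end` and closing both in the cleanup label means an early failure no longer leaks the destination file. For illustration only, the same "compress a file and always release both handles" contract written with Python's standard `gzip`/`shutil` modules rather than the TDengine OS layer:

```python
# Illustrative sketch of the cleanup contract, not the TDengine implementation.
import gzip
import shutil

def compress_file(src_path: str, dst_path: str) -> int:
    src = None
    dst = None
    try:
        src = open(src_path, "rb")
        dst = gzip.open(dst_path, "wb")
        shutil.copyfileobj(src, dst)   # stream the source into the gzip writer
        return 0
    except OSError:
        return -1
    finally:
        # Mirror of the cmp_end label: close whichever handles were opened.
        if dst is not None:
            dst.close()
        if src is not None:
            src.close()
```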

View File

@ -773,6 +773,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/odbc.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/fill_with_group.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py
,,n,system-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/insertMix.py -N 3

View File

@ -215,7 +215,13 @@ class TDTestCase:
for t in range (2):
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"')
tdSql.checkEqual(20470,len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdSql.checkEqual(193, len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult))
def ins_dnodes_check(self):
tdSql.execute('drop database if exists db2')
tdSql.execute('create database if not exists db2 vgroups 1 replica 1')

View File

@ -147,13 +147,13 @@ class VNode :
if self.lastVer != -1 and ret:
# first wal file ignore
if walFile.startVer == self.firstVer:
tdLog.info(f" {walFile.pathFile} can del, but is first. snapVer={self.snapVer} firstVer={self.firstVer}")
tdLog.info(f" can del {walFile.pathFile}, but is first. snapVer={self.snapVer} firstVer={self.firstVer}")
return False
# ver in stay range
smallVer = self.snapVer - self.walStayRange -1
if walFile.startVer >= smallVer:
tdLog.info(f" {walFile.pathFile} can del, but range not arrived. snapVer={self.snapVer} smallVer={smallVer}")
tdLog.info(f" can del {walFile.pathFile}, but range not arrived. snapVer={self.snapVer} smallVer={smallVer}")
return False
return ret
@ -161,9 +161,20 @@ class VNode :
# get log size
def getWalsSize(self):
size = 0
lastSize = 0
max = -1
for walFile in self.walFiles:
size += walFile.fsize
if self.canDelete(walFile) == False:
tdLog.info(f" calc vnode size {walFile.pathFile} size={walFile.fsize} startVer={walFile.startVer}")
size += walFile.fsize
if max < walFile.startVer:
max = walFile.startVer
lastSize = walFile.fsize
if lastSize > 0:
tdLog.info(f" last file size needs to be deducted. lastSize={lastSize}")
size -= lastSize
return size
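The revised accounting sums only the WAL files that cannot yet be deleted and then subtracts the newest file, which is still being appended to, so the comparison against the configured retention size (which the check further down now allows to overshoot by up to 20%) is not skewed. A standalone sketch of that calculation with hypothetical `WalFile` records; `can_delete` stands in for `VNode.canDelete`:

```python
# Standalone sketch of the revised WAL size accounting (hypothetical records).
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class WalFile:
    start_ver: int
    fsize: int

def wal_size(files: List[WalFile], can_delete: Callable[[WalFile], bool]) -> int:
    size = 0
    newest_ver, newest_size = -1, 0
    for f in files:
        if can_delete(f):
            continue                   # files already eligible for deletion do not count
        size += f.fsize
        if f.start_ver > newest_ver:   # remember the newest (still-growing) file
            newest_ver, newest_size = f.start_ver, f.fsize
    return size - newest_size          # the last file is still being written, exclude it

files = [WalFile(0, 100), WalFile(10, 80), WalFile(20, 30)]
print(wal_size(files, can_delete=lambda f: f.start_ver == 0))  # 80
```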
# vnode
@ -183,7 +194,7 @@ class VNode :
delTs = delTsLine.timestamp()
for walFile in self.walFiles:
mt = datetime.fromtimestamp(walFile.mtime)
info = f" {walFile.pathFile} mt={mt} line={delTsLine} start={walFile.startVer} snap={self.snapVer} end= {walFile.endVer}"
info = f" {walFile.pathFile} size={walFile.fsize} mt={mt} line={delTsLine} start={walFile.startVer} snap={self.snapVer} end= {walFile.endVer}"
tdLog.info(info)
if walFile.mtime < delTs and self.canDelete(walFile):
# wait a moment then check file exist
@ -199,25 +210,16 @@ class VNode :
if self.walSize == 0:
return True
time.sleep(2)
vnodeSize = self.getWalsSize()
if vnodeSize < self.walSize:
tdLog.info(f" wal size valid. {self.path} real = {vnodeSize} set = {self.walSize} ")
# allow the measured size to exceed the configured limit by up to 20%
if vnodeSize < self.walSize * 1.2:
tdLog.info(f" wal size valid. {self.path} real = {vnodeSize} set = {self.walSize}. up to 20% overshoot is allowed.")
return True
# check valid
tdLog.info(f" wal size over set. {self.path} real = {vnodeSize} set = {self.walSize} ")
for walFile in self.walFiles:
if self.canDelete(walFile):
# wait a moment then check file exist
time.sleep(1)
if os.path.exists(walFile.pathFile):
tdLog.exit(f" wal file size over .\
\n wal file = {walFile.pathFile}\
\n snapVer = {self.snapVer}\
\n real = {vnodeSize} bytes\
\n set = {self.walSize} bytes")
return False
return True
# check failed: size exceeded
tdLog.exit(f" wal size exceeds the configured limit. {self.path} real = {vnodeSize} set = {self.walSize} ")
return False
# insert by async

View File

@ -0,0 +1,153 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 4
self.ctbNum = 10
self.rowsPerTbl = 10000
self.duraion = '1h'
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to execute {__file__}")
tdSql.init(conn.cursor(), False)
def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'):
if dropFlag == 1:
tsql.execute("drop database if exists %s"%(dbName))
tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration))
tdLog.debug("complete to create database %s"%(dbName))
return
def create_stable(self,tsql, paraDict):
colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
sqlString = f"create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString)
tdLog.debug("%s"%(sqlString))
tsql.execute(sqlString)
return
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
for i in range(ctbNum):
sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \
(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,(i+ctbStartIdx) % 5,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx)
tsql.execute(sqlString)
tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
return
def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
if (i < ctbNum/2):
sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10, j%10, j%10)
else:
sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10)
rowsBatched += 1
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
else:
sql = "insert into "
if sql != pre_insert:
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'test',
'dropFlag': 1,
'vgroups': 2,
'stbName': 'meters',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
'ctbPrefix': 't',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 10000,
'batchNum': 3000,
'startTs': 1537146000000,
'tsStep': 600000}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("create database")
self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion)
tdLog.info("create stb")
self.create_stable(tsql=tdSql, paraDict=paraDict)
tdLog.info("create child tables")
self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \
stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],\
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict["ctbStartIdx"])
self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],\
ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\
rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\
startTs=paraDict["startTs"],tsStep=paraDict["tsStep"])
return
def test_partition_by_with_interval_fill_prev_new_group_fill_error(self):
## every table has 1500 rows after fill, 10 tables, total 15000 rows.
## there is no data from 9-17 08:00:00 ~ 9-17 09:00:00, so the first 60 rows of every group will be NULL because there is no prev value.
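## arithmetic behind those numbers: the query spans 25 hours at a 1-minute interval,
## i.e. 25 * 60 = 1500 windows per table; each table's data starts at 09:00, so the 60
## windows between 08:00 and 09:00 have no data and no previous value to fill from.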
sql = "select _wstart, count(*),tbname from meters where ts > '2018-09-17 08:00:00.000' and ts < '2018-09-18 09:00:00.000' partition by tbname interval(1m) fill(PREV) order by tbname, _wstart"
tdSql.query(sql)
for i in range(0,10):
for j in range(0,60):
tdSql.checkData(i*1500+j, 1, None)
sql = "select _wstart, count(*),tbname from meters where ts > '2018-09-17 08:00:00.000' and ts < '2018-09-18 09:00:00.000' partition by tbname interval(1m) fill(LINEAR) order by tbname, _wstart"
tdSql.query(sql)
for i in range(0,10):
for j in range(0,60):
tdSql.checkData(i*1500+j, 1, None)
def test_fill_with_order_by(self):
sql = "select _wstart, _wend, count(ts), sum(c1) from meters where ts > '2018-11-25 00:00:00.000' and ts < '2018-11-26 00:00:00.00' interval(1d) fill(NULL) order by _wstart"
tdSql.query(sql)
tdSql.checkRows(1)
sql = "select _wstart, _wend, count(ts), sum(c1) from meters where ts > '2018-11-25 00:00:00.000' and ts < '2018-11-26 00:00:00.00' interval(1d) fill(NULL) order by _wstart desc"
tdSql.query(sql)
tdSql.checkRows(1)
def run(self):
self.prepareTestEnv()
self.test_partition_by_with_interval_fill_prev_new_group_fill_error()
self.test_fill_with_order_by()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -67,7 +67,7 @@ class TDTestCase:
tdSql.query(f"select distinct tbname from {dbname}.`sys_if_bytes_out`")
tdSql.checkRows(2)
tdSql.query(f"select * from {dbname}.t_fc70dec6677d4277c5d9799c4da806da order by times")
tdSql.query(f"select * from {dbname}.t_f67972b49aa8adf8bca5d0d54f0d850d order by times")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1.300000000)
tdSql.checkData(1, 1, 13.000000000)

View File

@ -237,7 +237,7 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
if totalConsumeRows < expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")

View File

@ -278,7 +278,7 @@ void shellRunSingleCommandWebsocketImp(char *command) {
}
if (code == TSDB_CODE_WS_SEND_TIMEOUT
|| code == TSDB_CODE_WS_RECV_TIMEOUT) {
fprintf(stderr, "Hint: use -t to increase the timeout in seconds\n");
fprintf(stderr, "Hint: use -T to increase the timeout in seconds\n");
} else if (code == TSDB_CODE_WS_INTERNAL_ERRO
|| code == TSDB_CODE_WS_CLOSED) {
shell.ws_conn = NULL;
@ -373,8 +373,6 @@ void shellRunSingleCommandWebsocketImp(char *command) {
} else {
printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows,
(et - st)/1E6);
printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n",
execute_time, net_time, total_time);
}
}
printf("\n");

View File

@ -1534,6 +1534,7 @@ int sml_ts3724_Test() {
const char *sql[] = {
"stb.2,t1=1 f1=283i32 1632299372000",
"stb_2,t1=1 f1=283i32 1632299372000",
".stb2,t1=1 f1=106i32 1632299378000",
"stb2.,t1=1 f1=106i32 1632299378000",
};
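The two new samples exercise measurement names with a leading or a trailing dot; the assertions added in the hunk below only require that `stb_2` ends up with exactly one row. A sketch of feeding the same line-protocol samples through the Python connector; the `schemaless_insert` API, the `SmlProtocol`/`SmlPrecision` enums, the connection parameters and the `sml_db` database name are assumptions, and the dotted names are expected to be rejected:

```python
# Sketch only: assumes the `taos` Python connector's schemaless_insert API and a
# pre-created database named `sml_db`.
import taos
from taos import SmlProtocol, SmlPrecision

lines = [
    "stb.2,t1=1 f1=283i32 1632299372000",
    "stb_2,t1=1 f1=283i32 1632299372000",
    ".stb2,t1=1 f1=106i32 1632299378000",   # leading dot in the measurement name
    "stb2.,t1=1 f1=106i32 1632299378000",   # trailing dot in the measurement name
]

conn = taos.connect(host="localhost", user="root", password="taosdata", database="sml_db")
try:
    conn.schemaless_insert(lines, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MILLI_SECONDS)
except Exception as err:  # malformed measurement names surface as a connector error
    print("rejected:", err)
finally:
    conn.close()
```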
@ -1548,6 +1549,12 @@ int sml_ts3724_Test() {
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
pRes = taos_query(taos, "select * from stb_2");
TAOS_ROW row = taos_fetch_row(pRes);
int numRows = taos_affected_rows(pRes);
ASSERT(numRows == 1);
taos_free_result(pRes);
taos_close(taos);
return code;