merge: from main to 3.0 branch (#30296)

* chore(ci): statistics ci report

* chore(ci): statistics ci report never fail

* chore(ci): statistics ci report sort by size desc

* Update .github/workflows/taosd-ci-build.yml

* docs: upgrade the node.js connector version (#30264)

* feat: change the max value of minFreeDiskSize to 2TB (#30250)

* feat: change the max value of minFreeDiskSize to 2TB

* fix: CI test case change

* enh: remove useless comment

* docs(query): enhance time window clause descriptions in the documentation

* docs: fix image position in DST document (#30266)

* fix node.js example error (#30287)

* test: node example case

* fix: node.js example error

* fix: restore test case validation

* fix(stream): fix race condition in send msg. (#30277)

Co-authored-by: 54liuyao <54liuyao@163.com>
Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com>
Co-authored-by: wangmm0220 <wangmm0220@gmail.com>
Co-authored-by: yihaoDeng <luomoxyz@126.com>

* add c stmt2 example to doc (#30286)

* add: stmt2 insert example

* fix: free all heap

* format code

* add c stmt2 example

* update en doc desc

* mod makefile

---------

Co-authored-by: pengrongkun94@qq.com <pengrongkun94@qq.com>

---------

Co-authored-by: freemine <freemine@yeah.net>
Co-authored-by: WANG Xu <feici02@outlook.com>
Co-authored-by: kevin men <men_shi_bin@163.com>
Co-authored-by: Hongze Cheng <hzcheng@taosdata.com>
Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com>
Co-authored-by: Linhe Huo <linhehuo@gmail.com>
Co-authored-by: Haojun Liao <hjxilinx@users.noreply.github.com>
Co-authored-by: 54liuyao <54liuyao@163.com>
Co-authored-by: wangmm0220 <wangmm0220@gmail.com>
Co-authored-by: yihaoDeng <luomoxyz@126.com>
Co-authored-by: She Yanjie <57549981+sheyanjie-qq@users.noreply.github.com>
Co-authored-by: pengrongkun94@qq.com <pengrongkun94@qq.com>
Simon Guan 2025-03-20 16:54:39 +08:00 committed by GitHub
commit 9b7434d0ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 639 additions and 372 deletions


@ -90,6 +90,16 @@ jobs:
which taosadapter
which taoskeeper
- name: Statistics ldd
run: |
find ${{ github.workspace }}/debug/build/lib -type f -name "*.so" -print0 | xargs -0 ldd || true
find ${{ github.workspace }}/debug/build/bin -type f -print0 | xargs -0 ldd || true
- name: Statistics size
run: |
find ${{ github.workspace }}/debug/build/lib -type f -print0 | xargs -0 ls -lhrS
find ${{ github.workspace }}/debug/build/bin -type f -print0 | xargs -0 ls -lhrS
- name: Start taosd
run: |
cp /etc/taos/taos.cfg ./


@ -191,7 +191,7 @@ INTERVAL(interval_val [, interval_offset])
The time window clause includes 3 sub-clauses:
-- INTERVAL clause: used to generate windows of equal time periods, where interval_val specifies the size of each time window, and interval_offset specifies;
+- INTERVAL clause: used to generate windows of equal time periods, where interval_val specifies the size of each time window, and interval_offset specifies its starting offset. By default, windows begin at Unix time 0 (1970-01-01 00:00:00 UTC). If interval_offset is specified, the windows start from "Unix time 0 + interval_offset";
- SLIDING clause: used to specify the time the window slides forward;
- FILL: used to specify the filling mode of data in case of missing data in the window interval.
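The alignment rule described above can be sketched in a few lines (a minimal illustration, not TDengine code; the helper name `window_start` is hypothetical):

```python
def window_start(ts: int, interval: int, offset: int = 0) -> int:
    """Start of the time window containing ts (all values in the same unit,
    e.g. milliseconds). Windows are aligned to Unix time 0 plus the optional
    interval_offset, as described above."""
    return (ts - offset) // interval * interval + offset

# With a 10s window and a 3s offset, windows are [3s, 13s), [13s, 23s), ...
print(window_start(12_000, 10_000, 3_000))  # 3000
print(window_start(25_000, 10_000))         # 20000
```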


@ -146,9 +146,19 @@ Not supported
```
</TabItem>
<TabItem label="C" value="c">
The example code for binding parameters with stmt2 (TDengine v3.3.5.0 or higher is required) is as follows:
```c
{{#include docs/examples/c/stmt2_insert_demo.c}}
```
The example code for binding parameters with stmt is as follows:
```c
{{#include docs/examples/c/stmt_insert_demo.c}}
```
</TabItem>
<TabItem label="REST API" value="rest">
Not supported


@ -55,7 +55,7 @@ When network I/O and other processing resources are not bottlenecks, by optimizi
Generally, when TDengine needs to select a mount point from the same level to create a new data file, it uses a round-robin strategy for selection. However, in reality, each disk may have different capacities, or the same capacity but different amounts of data written, leading to an imbalance in available space on each disk. In practice, this may result in selecting a disk with very little remaining space.
-To address this issue, starting from 3.1.1.0, a new configuration minDiskFreeSize was introduced. When the available space on a disk is less than or equal to this threshold, that disk will no longer be selected for generating new data files. The unit of this configuration item is bytes, and its value should be greater than 2GB, i.e., mount points with less than 2GB of available space will be skipped.
+To address this issue, starting from 3.1.1.0, a new configuration minDiskFreeSize was introduced. When the available space on a disk is less than or equal to this threshold, that disk will no longer be selected for generating new data files. The unit of this configuration item is bytes. If its value is set to 2GB, mount points with less than 2GB of available space will be skipped.
Starting from version 3.3.2.0, a new configuration `disable_create_new_file` has been introduced to control the prohibition of generating new files on a certain mount point. The default value is `false`, which means new files can be generated on each mount point by default.
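The selection rule described above can be sketched as follows (illustrative only; the function and variable names are hypothetical, not TDengine's implementation):

```python
MIN_DISK_FREE_SIZE = 50 * 1024 * 1024  # 50MB, the default threshold

def pick_mount(free_bytes: list, last_idx: int, min_free: int = MIN_DISK_FREE_SIZE):
    """Round-robin over same-level mount points, skipping any whose available
    space is at or below the threshold; returns an index, or None if all mount
    points are below the threshold."""
    n = len(free_bytes)
    for step in range(1, n + 1):
        idx = (last_idx + step) % n
        if free_bytes[idx] > min_free:
            return idx
    return None

# The 1MB mount point is skipped; round-robin continues from index 0 to index 2.
print(pick_mount([10 * 2**30, 1 * 2**20, 5 * 2**30], 0))  # 2
```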


@ -170,7 +170,7 @@ The effective value of charset is UTF-8.
|tempDir | |Not supported |Specifies the directory for generating temporary files during system operation, default value /tmp|
|minimalDataDirGB | |Not supported |Minimum space to be reserved in the time-series data storage directory specified by dataDir, in GB, default value 2|
|minimalTmpDirGB | |Not supported |Minimum space to be reserved in the temporary file directory specified by tempDir, in GB, default value 1|
-|minDiskFreeSize |After 3.1.1.0|Supported, effective immediately |When the available space on a disk is less than or equal to this threshold, the disk will no longer be selected for generating new data files, unit is bytes, range 52428800-1073741824, default value 52428800; Enterprise parameter|
+|minDiskFreeSize |After 3.1.1.0|Supported, effective immediately |When the available space on a disk is less than or equal to this threshold, the disk will no longer be selected for generating new data files, unit is bytes, range 52428800-2199023255552, default value 52428800; Enterprise parameter|
|s3MigrateIntervalSec|After 3.3.4.3|Supported, effective immediately |Trigger cycle for automatic upload of local data files to S3, in seconds. Minimum: 600; Maximum: 100000. Default value 3600; Enterprise parameter|
|s3MigrateEnabled |After 3.3.4.3|Supported, effective immediately |Whether to automatically perform S3 migration, default value is 0, which means auto S3 migration is off, can be set to 1; Enterprise parameter|
|s3Accesskey |After 3.3.4.3|Supported, effective after restart|Colon-separated user SecretId:SecretKey, for example AKIDsQmwsfKxTo2A6nGVXZN0UlofKn6JRRSJ:lIdoy99ygEacU7iHfogaN2Xq0yumSm1E; Enterprise parameter|


@ -112,7 +112,7 @@ The differences between NULL, NULL_F, VALUE, VALUE_F filling modes for different
Time windows can be divided into sliding time windows and tumbling time windows.
-The INTERVAL clause is used to generate windows of equal time periods, and SLIDING is used to specify the time the window slides forward. Each executed query is a time window, and the time window slides forward as time flows. When defining continuous queries, it is necessary to specify the size of the time window (time window) and the forward sliding times for each execution. As shown, [t0s, t0e], [t1s, t1e], [t2s, t2e] are the time window ranges for three continuous queries, and the sliding time range is indicated by sliding time. Query filtering, aggregation, and other operations are performed independently for each time window. When SLIDING is equal to INTERVAL, the sliding window becomes a tumbling window.
+The INTERVAL clause is used to generate windows of equal time periods, and SLIDING is used to specify the time the window slides forward. Each executed query is a time window, and the time window slides forward as time flows. When defining continuous queries, it is necessary to specify the size of the time window (time window) and the forward sliding times for each execution. As shown, [t0s, t0e], [t1s, t1e], [t2s, t2e] are the time window ranges for three continuous queries, and the sliding time range is indicated by sliding time. Query filtering, aggregation, and other operations are performed independently for each time window. When SLIDING is equal to INTERVAL, the sliding window becomes a tumbling window. By default, windows begin at Unix time 0 (1970-01-01 00:00:00 UTC). If interval_offset is specified, the windows start from "Unix time 0 + interval_offset".
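How the window ranges [t0s, t0e], [t1s, t1e], [t2s, t2e] advance can be sketched as follows (an illustrative sketch of the concept only, not TDengine code):

```python
def time_windows(start: int, end: int, interval: int, sliding: int):
    """Enumerate [s, e] window ranges covering [start, end); each window is
    `interval` wide and the start advances by `sliding`. When sliding equals
    interval, the windows tumble (no overlap)."""
    out, s = [], start
    while s < end:
        out.append((s, s + interval - 1))
        s += sliding
    return out

print(time_windows(0, 30, 10, 5))   # overlapping sliding windows
print(time_windows(0, 30, 10, 10))  # tumbling windows, no overlap
```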
<figure>
<Image img={imgStep01} alt=""/>


@ -25,6 +25,7 @@ Support all platforms that can run Node.js.
| Node.js Connector Version | Major Changes | TDengine Version |
| ------------------------- | ------------------------------------------------------------------------ | --------------------------- |
| 3.1.5 | Password supports special characters. | - |
| 3.1.4 | Modified the readme.| - |
| 3.1.3 | Upgraded the es5-ext version to address vulnerabilities in the lower version. | - |
| 3.1.2 | Optimized data protocol and parsing, significantly improved performance. | - |


@ -9,6 +9,7 @@ TARGETS = connect_example \
with_reqid_demo \
sml_insert_demo \
stmt_insert_demo \
stmt2_insert_demo \
tmq_demo
SOURCES = connect_example.c \
@ -18,6 +19,7 @@ SOURCES = connect_example.c \
with_reqid_demo.c \
sml_insert_demo.c \
stmt_insert_demo.c \
stmt2_insert_demo.c \
tmq_demo.c
LIBS = -ltaos -lpthread


@ -0,0 +1,204 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o stmt2_insert_demo stmt2_insert_demo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include "taos.h"
#define NUM_OF_SUB_TABLES 10
#define NUM_OF_ROWS 10
/**
* @brief Executes an SQL query and checks for errors.
*
* @param taos Pointer to TAOS connection.
* @param sql SQL query string.
*/
void executeSQL(TAOS *taos, const char *sql) {
TAOS_RES *res = taos_query(taos, sql);
int code = taos_errno(res);
if (code != 0) {
fprintf(stderr, "Error: %s\n", taos_errstr(res));
taos_free_result(res);
taos_close(taos);
exit(EXIT_FAILURE);
}
taos_free_result(res);
}
/**
* @brief Checks return status and exits if an error occurs.
*
* @param stmt2 Pointer to TAOS_STMT2.
* @param code Error code.
* @param msg Error message prefix.
*/
void checkErrorCode(TAOS_STMT2 *stmt2, int code, const char *msg) {
if (code != 0) {
fprintf(stderr, "%s. Code: %d, Error: %s\n", msg, code, taos_stmt2_error(stmt2));
taos_stmt2_close(stmt2);
exit(EXIT_FAILURE);
}
}
/**
* @brief Prepares data bindings for batch insertion.
*
* @param table_name Pointer to store allocated table names.
* @param tags Pointer to store allocated tag bindings.
* @param params Pointer to store allocated parameter bindings.
*/
void prepareBindData(char ***table_name, TAOS_STMT2_BIND ***tags, TAOS_STMT2_BIND ***params) {
*table_name = (char **)malloc(NUM_OF_SUB_TABLES * sizeof(char *));
*tags = (TAOS_STMT2_BIND **)malloc(NUM_OF_SUB_TABLES * sizeof(TAOS_STMT2_BIND *));
*params = (TAOS_STMT2_BIND **)malloc(NUM_OF_SUB_TABLES * sizeof(TAOS_STMT2_BIND *));
for (int i = 0; i < NUM_OF_SUB_TABLES; i++) {
// Allocate and assign table name
(*table_name)[i] = (char *)malloc(20 * sizeof(char));
sprintf((*table_name)[i], "d_bind_%d", i);
// Allocate memory for tags data
int *gid = (int *)malloc(sizeof(int));
int *gid_len = (int *)malloc(sizeof(int));
*gid = i;
*gid_len = sizeof(int);
char *location = (char *)malloc(20 * sizeof(char));
int *location_len = (int *)malloc(sizeof(int));
*location_len = sprintf(location, "location_%d", i);
(*tags)[i] = (TAOS_STMT2_BIND *)malloc(2 * sizeof(TAOS_STMT2_BIND));
(*tags)[i][0] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_INT, gid, gid_len, NULL, 1};
(*tags)[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_BINARY, location, location_len, NULL, 1};
// Allocate memory for columns data
(*params)[i] = (TAOS_STMT2_BIND *)malloc(4 * sizeof(TAOS_STMT2_BIND));
int64_t *ts = (int64_t *)malloc(NUM_OF_ROWS * sizeof(int64_t));
float *current = (float *)malloc(NUM_OF_ROWS * sizeof(float));
int *voltage = (int *)malloc(NUM_OF_ROWS * sizeof(int));
float *phase = (float *)malloc(NUM_OF_ROWS * sizeof(float));
int32_t *ts_len = (int32_t *)malloc(NUM_OF_ROWS * sizeof(int32_t));
int32_t *current_len = (int32_t *)malloc(NUM_OF_ROWS * sizeof(int32_t));
int32_t *voltage_len = (int32_t *)malloc(NUM_OF_ROWS * sizeof(int32_t));
int32_t *phase_len = (int32_t *)malloc(NUM_OF_ROWS * sizeof(int32_t));
(*params)[i][0] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_TIMESTAMP, ts, ts_len, NULL, NUM_OF_ROWS};
(*params)[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_FLOAT, current, current_len, NULL, NUM_OF_ROWS};
(*params)[i][2] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_INT, voltage, voltage_len, NULL, NUM_OF_ROWS};
(*params)[i][3] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_FLOAT, phase, phase_len, NULL, NUM_OF_ROWS};
for (int j = 0; j < NUM_OF_ROWS; j++) {
struct timeval tv;
gettimeofday(&tv, NULL);
ts[j] = tv.tv_sec * 1000LL + tv.tv_usec / 1000 + j;
current[j] = (float)rand() / RAND_MAX * 30;
voltage[j] = rand() % 300;
phase[j] = (float)rand() / RAND_MAX;
ts_len[j] = sizeof(int64_t);
current_len[j] = sizeof(float);
voltage_len[j] = sizeof(int);
phase_len[j] = sizeof(float);
}
}
}
/**
* @brief Frees allocated memory for binding data.
*
* @param table_name Pointer to allocated table names.
* @param tags Pointer to allocated tag bindings.
* @param params Pointer to allocated parameter bindings.
*/
void freeBindData(char ***table_name, TAOS_STMT2_BIND ***tags, TAOS_STMT2_BIND ***params) {
for (int i = 0; i < NUM_OF_SUB_TABLES; i++) {
free((*table_name)[i]);
for (int j = 0; j < 2; j++) {
free((*tags)[i][j].buffer);
free((*tags)[i][j].length);
}
free((*tags)[i]);
for (int j = 0; j < 4; j++) {
free((*params)[i][j].buffer);
free((*params)[i][j].length);
}
free((*params)[i]);
}
free(*table_name);
free(*tags);
free(*params);
}
/**
* @brief Inserts data using the TAOS stmt2 API.
*
* @param taos Pointer to TAOS connection.
*/
void insertData(TAOS *taos) {
TAOS_STMT2_OPTION option = {0, false, false, NULL, NULL};
TAOS_STMT2 *stmt2 = taos_stmt2_init(taos, &option);
if (!stmt2) {
fprintf(stderr, "Failed to initialize TAOS statement.\n");
exit(EXIT_FAILURE);
}
// stmt2 prepare sql
checkErrorCode(stmt2, taos_stmt2_prepare(stmt2, "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)", 0),
"Statement preparation failed");
char **table_name;
TAOS_STMT2_BIND **tags, **params;
prepareBindData(&table_name, &tags, &params);
// stmt2 bind batch
TAOS_STMT2_BINDV bindv = {NUM_OF_SUB_TABLES, table_name, tags, params};
checkErrorCode(stmt2, taos_stmt2_bind_param(stmt2, &bindv, -1), "Parameter binding failed");
// stmt2 exec batch
int affected;
checkErrorCode(stmt2, taos_stmt2_exec(stmt2, &affected), "Execution failed");
printf("Successfully inserted %d rows.\n", affected);
// free and close
freeBindData(&table_name, &tags, &params);
taos_stmt2_close(stmt2);
}
int main() {
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL));
taos_cleanup();
exit(EXIT_FAILURE);
}
// create database and table
executeSQL(taos, "CREATE DATABASE IF NOT EXISTS power");
executeSQL(taos, "USE power");
executeSQL(taos,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
insertData(taos);
taos_close(taos);
taos_cleanup();
}


@ -4,7 +4,7 @@
"main": "index.js",
"license": "MIT",
"dependencies": {
-"@tdengine/websocket": "^3.1.2"
+"@tdengine/websocket": "^3.1.5"
},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"


@ -1,4 +1,3 @@
const { sleep } = require("@tdengine/websocket");
const taos = require("@tdengine/websocket");
// ANCHOR: create_consumer
@ -52,6 +51,12 @@ async function prepare() {
await wsSql.close();
}
const delay = function(ms) {
return new Promise(function(resolve) {
setTimeout(resolve, ms);
});
};
async function insert() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root');
@ -60,7 +65,7 @@ async function insert() {
let wsSql = await taos.sqlConnect(conf);
for (let i = 0; i < 50; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
await sleep(100);
await delay(100);
}
await wsSql.close();
}


@ -182,7 +182,7 @@ INTERVAL(interval_val [, interval_offset])
```
The time window clause includes 3 sub-clauses:
-- INTERVAL clause: used to generate windows of equal time periods, where interval_val specifies the size of each time window, and interval_offset specifies the window offset;
+- INTERVAL clause: used to generate windows of equal time periods, where interval_val specifies the size of each time window, and interval_offset specifies the window offset. By default, windows begin at Unix time 0 (1970-01-01 00:00:00 UTC); if interval_offset is specified, the windows start from "Unix time 0 + interval_offset";
- SLIDING clause: used to specify the time the window slides forward;
- FILL: used to specify the filling mode of data in case of missing data in the window interval.


@ -141,9 +141,20 @@ The example code for binding parameters with stmt is as follows:
```
</TabItem>
<TabItem label="C" value="c">
The example code for binding parameters with stmt2 (TDengine v3.3.5.0 or higher is required) is as follows:
```c
{{#include docs/examples/c/stmt2_insert_demo.c}}
```
The example code for binding parameters with stmt is as follows:
```c
{{#include docs/examples/c/stmt_insert_demo.c}}
```
</TabItem>
<TabItem label="REST API" value="rest">
Not supported


@ -56,7 +56,7 @@ dataDir /mnt/data6 2 0
Generally, when TDengine needs to select a mount point from the same level to create a new data file, it uses a round-robin strategy. However, in reality, each disk may have a different capacity, or the same capacity but a different amount of data written, which leads to an imbalance in available space across disks; in practice, a disk with very little remaining space may end up being selected.
-To address this issue, starting from 3.1.1.0, a new configuration minDiskFreeSize was introduced. When the available space on a disk is less than or equal to this threshold, that disk will no longer be selected for generating new data files. The unit of this configuration item is bytes, and its value should be greater than 2GB, i.e., mount points with less than 2GB of available space will be skipped.
+To address this issue, starting from 3.1.1.0, a new configuration minDiskFreeSize was introduced. When the available space on a disk is less than or equal to this threshold, that disk will no longer be selected for generating new data files. The unit of this configuration item is bytes. If the configured value is greater than 2GB, mount points with less than 2GB of available space will be skipped.
Starting from version 3.3.2.0, a new configuration disable_create_new_file has been introduced to control whether generating new files on a certain mount point is prohibited. The default value is false, meaning new files can be generated on each mount point by default.


@ -582,9 +582,9 @@ The effective value of charset is UTF-8.
- Description: When the available space on a disk is less than or equal to this threshold, the disk will no longer be selected for generating new data files. **`Enterprise parameter`**
- Type: integer
- Unit: byte
-- Default value: 52428800
-- Minimum value: 52428800
-- Maximum value: 1073741824
+- Default value: 52428800 (50MB)
+- Minimum value: 52428800 (50MB)
+- Maximum value: 2199023255552 (2TB)
- Dynamic modification: supported, can be changed via SQL, effective immediately.
- Supported versions: introduced since v3.1.0.0


@ -104,7 +104,7 @@ The differences between NULL, NULL_F, VALUE, VALUE_F filling modes for different scenarios
Time windows can be divided into sliding time windows and tumbling time windows.
-The INTERVAL clause is used to generate windows of equal time periods, and SLIDING is used to specify the time the window slides forward. Each executed query is a time window, and the time window slides forward as time flows. When defining continuous queries, it is necessary to specify the size of the time window (time window) and the forward sliding time (forward sliding times) for each execution. As shown, [t0s, t0e], [t1s, t1e], [t2s, t2e] are the time window ranges of three continuous queries, and the forward sliding time range of the window is indicated by sliding time. Query filtering, aggregation, and other operations are performed independently for each time window. When SLIDING is equal to INTERVAL, the sliding window becomes a tumbling window.
+The INTERVAL clause is used to generate windows of equal time periods, and SLIDING is used to specify the time the window slides forward. Each executed query is a time window, and the time window slides forward as time flows. When defining continuous queries, it is necessary to specify the size of the time window (time window) and the forward sliding time (forward sliding times) for each execution. As shown, [t0s, t0e], [t1s, t1e], [t2s, t2e] are the time window ranges of three continuous queries, and the forward sliding time range of the window is indicated by sliding time. Query filtering, aggregation, and other operations are performed independently for each time window. When SLIDING is equal to INTERVAL, the sliding window becomes a tumbling window. By default, windows begin at Unix time 0 (1970-01-01 00:00:00 UTC); if interval_offset is specified, the windows start from "Unix time 0 + interval_offset".
![TDengine Database time window diagram](./timewindow-1.webp)


@ -24,6 +24,7 @@ The Node.js connector source code is hosted on [GitHub](https://github.com/taosdata/taos-conne
| Node.js Connector Version | Major Changes | TDengine Version |
| ------------------| ----------------------| ----------------|
| 3.1.5 | Password supports special characters. | - |
| 3.1.4 | Modified the readme. | - |
| 3.1.3 | Upgraded the es5-ext version to address vulnerabilities in the lower version. | - |
| 3.1.2 | Optimized data protocol and parsing, significantly improved performance. | - |


@ -43,6 +43,8 @@ TDengine supports the use of IANA time zones in all components (except Windows taos.cfg
Daylight Saving Time (DST) is a system that advances clocks by one hour to make better use of daylight and save energy. It usually starts in spring and ends in autumn, and its exact start and end times vary by region. The following uses Berlin time as an example to explain DST and its effects.
![DST Berlin](./02-dst/dst-berlin.png)
According to this rule, we can see:
- Berlin local times on 2024-03-31 between 02:00:00 and 03:00:00 (excluding 03:00:00) do not exist (the clock jumps forward)
@ -92,7 +94,7 @@ select * from t1 where ts >= '2024-10-27T01:59:59.000Z';
The following table shows the impact of DST on writing and querying.
![DST Berlin](./02-dst/dst-berlin.png)
![DST Table](./02-dst/dst-table.png)
### Table Description


@ -606,7 +606,8 @@ typedef enum ELogicConditionType {
#define TFS_MAX_LEVEL (TFS_MAX_TIERS - 1)
#define TFS_PRIMARY_LEVEL 0
#define TFS_PRIMARY_ID 0
-#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
+#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024 // 50MB
+#define TFS_MIN_DISK_FREE_SIZE_MAX (2ULL * 1024 * 1024 * 1024 * 1024) // 2TB
enum { TRANS_STAT_INIT = 0, TRANS_STAT_EXECUTING, TRANS_STAT_EXECUTED, TRANS_STAT_ROLLBACKING, TRANS_STAT_ROLLBACKED };
enum { TRANS_OPER_INIT = 0, TRANS_OPER_EXECUTE, TRANS_OPER_ROLLBACK };


@ -2692,6 +2692,7 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf
taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
goto _exit;
if (len >= size - 1) {
goto _exit;
}


@ -1000,8 +1000,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "s3PageCacheSize", tsS3PageCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER_LAZY,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 1, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL));
// min free disk space used to check if the disk is full [50MB, 1GB]
-TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_LOCAL));
+TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, TFS_MIN_DISK_FREE_SIZE_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableWhiteList", tsEnableWhiteList, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "streamNotifyMessageSize", tsStreamNotifyMessageSize, 8, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL));


@ -600,7 +600,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool;
pStreamCtrlPool->name = "vnode-stream-ctrl";
-pStreamCtrlPool->max = 1;
+pStreamCtrlPool->max = 4;
if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code;
SWWorkerPool *pStreamChkPool = &pMgmt->streamChkPool;


@ -862,7 +862,7 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
int32_t vgId = TD_VID(pVnode);
int64_t suid = pTask->outputInfo.tbSink.stbUid;
const char* id = pTask->id.idStr;
-int32_t timeout = 300; // 5min
+int32_t timeout = 60; // 1min
int64_t start = taosGetTimestampSec();
while (pTableSinkInfo->uid == 0) {
@ -985,6 +985,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
if (code) {
tqDebug("s-task:%s failed to build auto create table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
return code;
} else {
tqDebug("s-task:%s no table name given, generated sub-table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId);
}
} else {
if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&


@ -3049,7 +3049,7 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr
qInfo("%s===stream===%s: Block is Empty. block type %d", taskIdStr, flag, pBlock->info.type);
return;
}
-if (qDebugFlag & DEBUG_DEBUG) {
+if (qDebugFlag & DEBUG_INFO) {
char* pBuf = NULL;
int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr);
if (code == 0) {
@ -3069,13 +3069,13 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr
pBlock->info.version);
return;
}
-if (qDebugFlag & DEBUG_DEBUG) {
+if (qDebugFlag & DEBUG_INFO) {
char* pBuf = NULL;
char flagBuf[64];
snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
int32_t code = dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr);
if (code == 0) {
-qDebug("%s", pBuf);
+qInfo("%s", pBuf);
taosMemoryFree(pBuf);
}
}


@ -949,7 +949,7 @@ void streamBackendCleanup(void* arg) {
streamMutexDestroy(&pHandle->mutex);
streamMutexDestroy(&pHandle->cfMutex);
-stDebug("vgId:%d destroy stream backend:%p", (int32_t) pHandle->vgId, pHandle);
+stDebug("vgId:%d destroy stream backend:%p", (int32_t)pHandle->vgId, pHandle);
taosMemoryFree(pHandle);
}
@ -3119,12 +3119,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
break; \
} \
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
if (pState->pTdbState->recalc) { \
wrapper = pState->pTdbState->pOwner->pRecalBackend; \
} \
TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1)); \
char toString[128] = {0}; \
-if (stDebugFlag & DEBUG_TRACE) TAOS_UNUSED((ginitDict[i].toStrFunc((void*)key, toString))); \
+TAOS_UNUSED((ginitDict[i].toStrFunc((void*)key, toString))); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
rocksdb_writeoptions_t* opts = wrapper->writeOpt; \
@ -3137,8 +3134,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
taosMemoryFree(err); \
code = TSDB_CODE_THIRDPARTY_ERROR; \
} else { \
-stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \
-ttlVLen, wrapper); \
+stInfo("[InternalERR] write streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, \
+funcname, vLen, ttlVLen, wrapper); \
} \
taosMemoryFree(ttlV); \
} while (0);
@ -4261,22 +4258,16 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
int32_t res = 0;
SSessionKey tmpKey = *key;
int32_t valSize = *pVLen;
void* tmp = taosMemoryMalloc(valSize);
if (!tmp) {
return -1;
}
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen);
if (code == 0) {
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
memcpy(tmp, *pVal, valSize);
goto _end;
}
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize);
goto _end;
}
@ -4291,7 +4282,6 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
if (code == 0) {
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize);
goto _end;
}
}
@ -4299,11 +4289,11 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
*key = tmpKey;
res = 1;
memset(tmp, 0, valSize);
_end:
taosMemoryFreeClear(*pVal);
*pVal = tmp;
if (res == 0 && valSize > *pVLen){
stError("[InternalERR] [skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],valSize:%d bigger than get rocksdb len:%d", key->win.skey, key->win.ekey, key->groupId, valSize, *pVLen);
}
streamStateFreeCur(pCur);
return res;
}
@ -4587,6 +4577,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
void* val, int32_t vlen, int64_t ttl, void* tmpBuf) {
int32_t code = 0;
char buf[128] = {0};
char toString[128] = {0};
char* dst = NULL;
size_t size = 0;
@ -4600,6 +4591,10 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
}
}
int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf);
ginitDict[cfIdx].toStrFunc((void*)key, toString);
qInfo("[InternalERR] write cfIdx:%d key:%s vlen:%d", cfIdx, toString, vlen);
char* ttlV = tmpBuf;
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(dst, size, ttl, &ttlV);


@ -133,6 +133,7 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
code = tmsgSendReq(&pEpInfo->epSet, &rpcMsg);
if (code != 0) {
rpcFreeCont(buf);
stError("s-task:%s (child %d) failed to send retrieve req to task:0x%x (vgId:%d) QID:0x%" PRIx64 " code:%s",
pTask->id.idStr, pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId, tstrerror(code));
} else {
@ -245,12 +246,13 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) {
void clearBufferedDispatchMsg(SStreamTask* pTask) {
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
+streamMutexLock(&pMsgInfo->lock);
if (pMsgInfo->pData != NULL) {
destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask));
}
-streamMutexLock(&pMsgInfo->lock);
pMsgInfo->checkpointId = -1;
pMsgInfo->transId = -1;
pMsgInfo->pData = NULL;


@ -1019,11 +1019,13 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
double el = (taosGetTimestampMs() - st) / 1000.0;
if (el > 2.0) { // elapsed more than 5 sec, not occupy the CPU anymore
-stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id);
+stDebug("s-task:%s occupy more than 2.0s, release the exec threads and idle for 500ms", id);
streamTaskSetIdleInfo(pTask, 500);
return code;
}
}
}
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not


@ -1394,13 +1394,23 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;
if (!isLeader) {
streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
} else { // wait for nodeep update if become leader from follower
if (prevStage == NODE_ROLE_FOLLOWER) {
pMeta->startInfo.tasksWillRestart = 1;
}
}
streamMetaWUnLock(pMeta);
if (isLeader) {
if (prevStage == NODE_ROLE_FOLLOWER) {
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64
" restart after nodeEp being updated",
pMeta->vgId, stage, prevStage, isLeader, pMeta->rid);
} else {
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64,
pMeta->vgId, stage, prevStage, isLeader, pMeta->rid);
}
streamMetaStartHb(pMeta);
} else {
stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,


@ -151,10 +151,16 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
pNewPos->needFree = true;
pNewPos->beFlushed = true;
if (p) {
memcpy(pNewPos->pRowBuff, p, *pVLen);
} else {
int32_t len = getRowStateRowSize(pFileState);
if (p) {
if (*pVLen > len){
qError("[InternalERR] read key:[skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],session window buffer is too small, *pVLen:%d, len:%d", pKey->win.skey, pKey->win.ekey, pKey->groupId, *pVLen, len);
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}else{
memcpy(pNewPos->pRowBuff, p, *pVLen);
}
} else {
memset(pNewPos->pRowBuff, 0, len);
}


@ -465,7 +465,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
}
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
-streamMetaRLock(pMeta);
+streamMetaWLock(pMeta);
SArray* pTaskList = NULL;
int32_t num = taosArrayGetSize(pMeta->pTaskList);
@ -473,7 +473,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
if (num == 0) {
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
-streamMetaRUnLock(pMeta);
+streamMetaWUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}
@ -482,7 +482,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
// send hb msg to mnode before closing all tasks.
int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
if (code != TSDB_CODE_SUCCESS) {
-streamMetaRUnLock(pMeta);
+streamMetaWUnLock(pMeta);
return code;
}
@ -509,7 +509,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);
-streamMetaRUnLock(pMeta);
+streamMetaWUnLock(pMeta);
return code;
}


@ -1097,6 +1097,7 @@ int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) {
if (vlen != pFileState->rowSize) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
qError("[InternalERR] read key:[skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],vlen:%d, rowSize:%d", key.win.skey, key.win.ekey, key.groupId, vlen, pFileState->rowSize);
QUERY_CHECK_CODE(code, lino, _end);
}


@ -34,6 +34,7 @@ declare -a TEST_EXES=(
"query_data_demo"
"with_reqid_demo"
"stmt_insert_demo"
"stmt2_insert_demo"
"tmq_demo"
"sml_insert_demo"
)
@ -46,6 +47,7 @@ declare -a NEED_CLEAN=(
"false"
"false"
"false"
"false"
"true"
)


@ -102,7 +102,7 @@ class TDTestCase:
"alias": "tsMinDiskFreeSize",
"values": ["51200K", "100M", "1G"],
"check_values": ["52428800", "104857600", "1073741824"],
-"except_values": ["1024K", "1.1G", "1T"]
+"except_values": ["1024K", "2049G", "3T"]
},
{
"name": "tmqMaxTopicNum",