Merge remote-tracking branch 'origin/main' into merge/m23

commit dfb3feb5dc
dapan1121 committed 2023-11-15 11:39:18 +08:00
43 changed files with 288 additions and 141 deletions

View File

@@ -59,4 +59,4 @@ Query OK, 9 row(s) affected (0.004763s)
 ## Import using taosdump
-A convenient tool for importing and exporting data is provided by TDengine, `taosdump`, which can be used to export data from one TDengine cluster and import into another one. For the details of using `taosdump` please refer to [Tool for exporting and importing data: taosdump](/reference/taosdump).
+A convenient tool for importing and exporting data is provided by TDengine, `taosdump`, which can be used to export data from one TDengine cluster and import into another one. For the details of using `taosdump` please refer to the taosdump documentation.
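As a quick orientation (not part of this diff), a minimal export/import round trip with `taosdump` might look like the sketch below; the host names, database name, and dump path are placeholders, and the flags follow the tool's help output (`-D` database, `-o` output directory, `-i` input directory):

```bash
# Export database `power` from the source cluster into a local directory
taosdump -h src-cluster-host -D power -o /tmp/power_dump

# Import the dumped data into the target cluster
taosdump -h dst-cluster-host -i /tmp/power_dump
```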

View File

@@ -19,4 +19,4 @@ The data of table or STable specified by `tb_name` will be exported into a file
 ## Export Using taosdump
-With `taosdump`, you can choose to export the data of all databases, a database, a table or a STable, you can also choose to export the data within a time range, or even only export the schema definition of a table. For the details of using `taosdump` please refer to [Tool for exporting and importing data: taosdump](/reference/taosdump).
+With `taosdump`, you can choose to export the data of all databases, a database, a table or a STable, you can also choose to export the data within a time range, or even only export the schema definition of a table. For the details of using `taosdump` please refer to the taosdump documentation.
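To illustrate the selective-export choices mentioned above, a hedged sketch (timestamps, names, and paths are placeholders; `-S`/`-E` bound the time range and `-s` exports schema only, per the tool's help output):

```bash
# Export only rows of database `power` within a time range
taosdump -D power -S '2023-01-01 00:00:00.000' -E '2023-06-30 23:59:59.999' -o /tmp/power_h1

# Export only the schema definitions, without row data
taosdump -D power -s -o /tmp/power_schema
```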

View File

@@ -11,8 +11,6 @@ The collection of the monitoring information is enabled by default, but can be d
 TDinsight is a complete solution which uses the monitoring database `log` mentioned previously, and Grafana, to monitor a TDengine cluster.
-
-Please refer to [TDinsight Grafana Dashboard](../../reference/tdinsight) to learn more details about using TDinsight to monitor TDengine.
 A script `TDinsight.sh` is provided to deploy TDinsight automatically.
 Download `TDinsight.sh` with the below command:

View File

@@ -36,6 +36,7 @@ REST connection supports all platforms that can run Java.
 | taos-jdbcdriver version | major changes | TDengine version |
 | :---------------------: | :-----------: | :--------------: |
+| 3.2.7 | Support VARBINARY and GEOMETRY types, and add time zone support for native connections. Support websocket auto reconnection | 3.2.0.0 or later |
 | 3.2.5 | Subscription add committed() and assignment() method | 3.1.0.3 or later |
 | 3.2.4 | Subscription add the enable.auto.commit parameter and the unsubscribe() method in the WebSocket connection | - |
 | 3.2.3 | Fixed resultSet data parsing failure in some cases | - |
@@ -178,7 +179,7 @@ Add following dependency in the `pom.xml` file of your Maven project:
 <dependency>
   <groupId>com.taosdata.jdbc</groupId>
   <artifactId>taos-jdbcdriver</artifactId>
-  <version>3.2.2</version>
+  <version>3.2.7</version>
 </dependency>
 ```
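For readers tracking the version bump, a minimal sketch of using this dependency might look as follows; the host, port, and the default `root`/`taosdata` credentials are assumptions for a local deployment, and the REST URL form `jdbc:TAOS-RS://host:6041/...` can be substituted for the native one:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ConnectSketch {
    public static void main(String[] args) throws Exception {
        // Native connection on port 6030; assumes a locally running TDengine
        String url = "jdbc:TAOS://localhost:6030/?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT SERVER_VERSION()")) {
            while (rs.next()) {
                System.out.println("server version: " + rs.getString(1));
            }
        }
    }
}
```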

View File

@@ -218,7 +218,7 @@ The example to query the average system memory usage for the specified interval
 ### Importing the Dashboard
-You can install TDinsight dashboard in data source configuration page (like `http://localhost:3000/datasources/edit/1/dashboards`) as a monitoring visualization tool for TDengine cluster. Ensure that you use TDinsight for 3.x. Please note TDinsight for 3.x needs to configure and run taoskeeper correctly. Check the [TDinsight User Manual](/reference/tdinsight/) for the details.
+You can install TDinsight dashboard in data source configuration page (like `http://localhost:3000/datasources/edit/1/dashboards`) as a monitoring visualization tool for TDengine cluster. Ensure that you use TDinsight for 3.x. Please note TDinsight for 3.x needs to configure and run taoskeeper correctly.
 ![TDengine Database Grafana plugine import dashboard](./import_dashboard.webp)

View File

@@ -36,6 +36,7 @@ REST connections are supported on all platforms that can run Java.
 | taos-jdbcdriver version | major changes | TDengine version |
 | :------------------: | :-----------: | :----------------: |
+| 3.2.7 | Support for the VARBINARY and GEOMETRY types, time zone setting for native connections, and automatic websocket reconnection | 3.2.0.0 and later |
 | 3.2.5 | Data subscription adds the committed() and assignment() methods | 3.1.0.3 and later |
 | 3.2.4 | Data subscription adds the enable.auto.commit parameter and the unsubscribe() method under WebSocket connections | - |
 | 3.2.3 | Fixed ResultSet data parsing failures in some cases | - |
@@ -177,7 +178,7 @@ In a Maven project, add the following dependency in pom.xml:
 <dependency>
   <groupId>com.taosdata.jdbc</groupId>
   <artifactId>taos-jdbcdriver</artifactId>
-  <version>3.2.2</version>
+  <version>3.2.7</version>
 </dependency>
 ```
@@ -1097,7 +1098,6 @@ TaosConsumer consumer = new TaosConsumer<>(config);
 - httpPoolSize: the maximum number of parallel requests on a single connection. Effective only for WebSocket connections.
 For other parameters, see the [Consumer parameter list](../../develop/tmq#创建-consumer-以及consumer-group). Note: starting from TDengine server 3.2.0.0, the default value of auto.offset.reset in message subscription has changed.
 #### Subscribe to and consume data
 ```java

View File

@@ -105,7 +105,7 @@ spec:
   # TZ for timezone settings, we recommend to always set it.
   - name: TZ
     value: "Asia/Shanghai"
-  # TAOS_ prefix will configured in taos.cfg, strip prefix and camelCase.
+  # Environment variables with prefix TAOS_ will be parsed and converted into corresponding parameter in taos.cfg. For example, serverPort in taos.cfg should be configured by TAOS_SERVER_PORT when using K8S to deploy
   - name: TAOS_SERVER_PORT
     value: "6030"
   # Must set if you want a cluster.
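As a further illustration of the TAOS_ convention described in the new comment (a hypothetical sketch; the parameter choice and value are illustrative and not part of this diff):

```yaml
# TAOS_MONITOR_FQDN is stripped of its prefix and converted to camelCase,
# yielding the taos.cfg parameter monitorFqdn
- name: TAOS_MONITOR_FQDN
  value: "taosd-0.taosd.default.svc.cluster.local"
```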

View File

@@ -53,7 +53,7 @@ database_option: {
 - 1: one-stage compression.
 - 2: two-stage compression.
 - DURATION: the time span of the data stored in one data file. It can be written with a unit, e.g. DURATION 100h or DURATION 10d; the supported units are m (minutes), h (hours), and d (days). Without a unit the default is days, so DURATION 50 means 50 days.
-- WAL_FSYNC_PERIOD: the period for flushing to disk when the WAL parameter is set to 2. The default is 3000, in milliseconds. The minimum is 0, meaning every write is flushed immediately; the maximum is 180000, i.e. three minutes.
+- WAL_FSYNC_PERIOD: sets the period for flushing to disk when the WAL_LEVEL parameter is set to 2. The default is 3000, in milliseconds. The minimum is 0, meaning every write is flushed immediately; the maximum is 180000, i.e. three minutes.
 - MAXROWS: the maximum number of rows in a file block, 4096 by default.
 - MINROWS: the minimum number of rows in a file block, 100 by default.
 - KEEP: the number of days data files are kept, 3650 by default, with a range of [1, 365000]; it must be at least 3 times the DURATION value. The database automatically deletes data kept longer than KEEP. KEEP can be written with a unit, e.g. KEEP 100h or KEEP 10d; the supported units are m (minutes), h (hours), and d (days). It can also be written without a unit, e.g. KEEP 50, in which case the default unit is days. The enterprise edition supports [multi-level storage](https://docs.taosdata.com/tdinternal/arch/#%E5%A4%9A%E7%BA%A7%E5%AD%98%E5%82%A8), so multiple keep values can be set (comma-separated, at most 3, satisfying keep 0 <= keep 1 <= keep 2, e.g. KEEP 100h,100d,3650d); the community edition does not support multi-level storage (even if multiple keep values are configured they do not take effect, and KEEP uses the largest of them).
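Tying several of the options above together, a `CREATE DATABASE` statement might read as follows (the database name and all values are illustrative only):

```sql
-- 10-day data files, WAL flushed once per second, data kept for one year
CREATE DATABASE power DURATION 10d WAL_LEVEL 2 WAL_FSYNC_PERIOD 1000 MAXROWS 4096 MINROWS 100 KEEP 365d;
```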

View File

@@ -59,4 +59,4 @@ Query OK, 9 row(s) affected (0.004763s)
 ## Import with taosdump
-TDengine provides taosdump, a convenient tool for importing and exporting database data. Data exported from one system with taosdump can be imported into another. For details on how to use it, see: [TDengine data backup tool: taosdump](/reference/taosdump)
+TDengine provides taosdump, a convenient tool for importing and exporting database data. Data exported from one system with taosdump can be imported into another. For details on how to use it, refer to the taosdump documentation.

View File

@@ -17,5 +17,4 @@ select * from <tb_name> >> data.csv;
 ## Export data with taosdump
-With taosdump, you can choose to export all databases, a single database, or a single table in a database, export all data or only the data of a time range, or even just the table definitions, as needed. For details on how to use it, see
-[TDengine data backup tool: taosdump](/reference/taosdump).
+With taosdump, you can choose to export all databases, a single database, or a single table in a database, export all data or only the data of a time range, or even just the table definitions, as needed. For details on how to use it, refer to the taosdump documentation.

View File

@@ -218,7 +218,7 @@ docker run -d \
 ### Import the Dashboard
-On the data source configuration page, you can import the TDinsight dashboard for this data source as a monitoring visualization tool for the TDengine cluster. If the TDengine server is version 3.0, select `TDinsight for 3.x` for the import. Note that TDinsight for 3.x requires running and configuring taoskeeper; see the [TDinsight user manual](/reference/tdinsight/) for usage instructions.
+On the data source configuration page, you can import the TDinsight dashboard for this data source as a monitoring visualization tool for the TDengine cluster. If the TDengine server is version 3.0, select `TDinsight for 3.x` for the import. Note that TDinsight for 3.x requires running and configuring taoskeeper.
 ![TDengine Database Grafana plugine import dashboard](./import_dashboard.webp)

View File

@ -0,0 +1,69 @@
---
title: TSZ 压缩算法
description: TDengine 对浮点数进行高效压缩的算法
---
TSZ 压缩算法是 TDengine 为浮点数据类型提供的可选压缩算法,可以实现浮点数有损至无损全状态压缩,相比默认压缩算法, TSZ 压缩算法压缩率更高,即使切至无损状态,压缩率也会比默认压缩高一倍。
## 适合场景
- TSZ 压缩算法是通过数据预测技术完成的压缩,所以更适合有规律变化的数据
- TSZ 压缩时间会更长一些,如果您的服务器 CPU 空闲多,存储空间小的情况下适合选用
## 使用步骤
- TDengine 支持版本为 3.2.0.0 或以上
- 开启选项
在 taos.cfg 配置中增加以下内容,即可开启 TSZ 压缩算法,功能打开后,会替换默认算法。
以下表示字段类型是 float 及 double 类型都使用此压缩算法,也可以单独只配置一个
```sql
lossyColumns float|double
```
- 配置需重启服务生效
- Taosd 日志输出以下内容,表明功能已生效:
```sql
02/22 10:49:27.607990 00002933 UTL lossyColumns float|double
```
## 配置参数
### fPrecision
FLOAT 类型精度控制:
| 属性 | 说明 |
| -------- | -------------------------------- |
| 适用范围 | 服务器端 |
| 含义 | 设置 float 类型浮点数压缩精度 |
| 取值范围 | 0.1 ~ 0.00000001 |
| 缺省值 | 0.00000001 |
| 补充说明 | 小于此值的浮点数尾数部分将被截取 |
### dPrecision
DOUBLE 类型精度控制:
| 属性 | 说明 |
| -------- | -------------------------------- |
| 适用范围 | 服务器端 |
| 含义 | 设置 double 类型浮点数压缩精度 |
| 取值范围 | 0.1 ~ 0.0000000000000001 |
| 缺省值 | 0.0000000000000001 |
| 补充说明 | 小于此值的浮点数尾数部分将被截取 |
### ifAdtFse
TSZ 压缩中可选择的算法 FSE默认为 HUFFMAN
| 属性 | 说明 |
| -------- | -------------------------------- |
| 适用范围 | 服务器端 |
| 含义 | 使用 FSE 算法替换 HUFFMAN 算法, FSE 算法压缩速度更快,但解压稍慢,追求压缩速度可选用此算法 |
| 取值范围 | 0关闭 1打开 |
| 缺省值 | 0关闭 |
## 注意事项
- 打开 TSZ 后生成的存储数据格式,回退至 3.2.0.0 之前的版本,数据将不能被识别
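Combining the parameters documented above, a taos.cfg sketch might read as below; that all four keys are set in taos.cfg with exactly these spellings is an assumption based on the tables in this page, and the precision values are illustrative:

```
lossyColumns float|double   # enable TSZ for both float and double columns
fPrecision   0.0001         # float compression precision (example value)
dPrecision   0.0000001      # double compression precision (example value)
ifAdtFse     1              # use FSE instead of HUFFMAN inside TSZ
```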

View File

@@ -67,7 +67,7 @@
   <dependency>
     <groupId>com.taosdata.jdbc</groupId>
     <artifactId>taos-jdbcdriver</artifactId>
-    <version>3.0.0</version>
+    <version>3.2.7</version>
     <!-- <scope>system</scope>-->
     <!-- <systemPath>${project.basedir}/src/main/resources/lib/taos-jdbcdriver-2.0.15-dist.jar</systemPath>-->
   </dependency>

View File

@@ -1140,3 +1140,5 @@ elif [ "$verType" == "client" ]; then
 else
   echo "please input correct verType"
 fi

View File

@@ -281,6 +281,11 @@ fi
 chmod a+x ${install_dir}/install.sh
 
 if [[ $dbName == "taos" ]]; then
+  cp ${top_dir}/../enterprise/packaging/start-all.sh ${install_dir}
+  cp ${top_dir}/../enterprise/packaging/stop-all.sh ${install_dir}
+  cp ${top_dir}/../enterprise/packaging/README.md ${install_dir}
+  chmod a+x ${install_dir}/start-all.sh
+  chmod a+x ${install_dir}/stop-all.sh
   # Copy example code
   mkdir -p ${install_dir}/examples
   examples_dir="${top_dir}/examples"
@@ -360,12 +365,6 @@ if [ "$verMode" == "cluster" ]; then
     git clone --depth 1 https://github.com/taosdata/taos-connector-rust ${install_dir}/connector/rust
     rm -rf ${install_dir}/connector/rust/.git ||:
-    cp ${top_dir}/../enterprise/packaging/start-all.sh ${install_dir}
-    cp ${top_dir}/../enterprise/packaging/stop-all.sh ${install_dir}
-    cp ${top_dir}/../enterprise/packaging/README.md ${install_dir}
-    chmod a+x ${install_dir}/start-all.sh
-    chmod a+x ${install_dir}/stop-all.sh
-
     # copy taosx
     if [ -d ${top_dir}/../enterprise/src/plugins/taosx/release/taosx ]; then
       cp -r ${top_dir}/../enterprise/src/plugins/taosx/release/taosx ${install_dir}

View File

@@ -507,8 +507,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
   tsNumOfTaskQueueThreads = tsNumOfCores / 2;
   tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
-  if (tsNumOfTaskQueueThreads >= 10) {
-    tsNumOfTaskQueueThreads = 10;
+  if (tsNumOfTaskQueueThreads >= 50) {
+    tsNumOfTaskQueueThreads = 50;
   }
   if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0)
     return -1;

View File

@@ -882,7 +882,6 @@ static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCrea
   if (mndAcquireGlobalIdx(pMnode, fullIdxName, SDB_IDX, &idx) == 0 && idx.pIdx != NULL) {
     terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
     mndReleaseIdx(pMnode, idx.pIdx);
-
     goto _OVER;
   }

View File

@@ -678,7 +678,7 @@ _OVER:
   return -1;
 }
-static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
+static int32_t mndPersistTaskDropReq(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) {
   SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
   if (pReq == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -690,7 +690,12 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
   pReq->streamId = pTask->id.streamId;
   STransAction action = {0};
-  initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &pTask->info.epSet, 0);
+  SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
+  SEpSet  epset = mndGetVgroupEpset(pMnode, pVgObj);
+  mndReleaseVgroup(pMnode, pVgObj);
+
+  // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
+  initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
   if (mndTransAppendRedoAction(pTrans, &action) != 0) {
     taosMemoryFree(pReq);
     return -1;
@@ -706,7 +711,7 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
   int32_t sz = taosArrayGetSize(pTasks);
   for (int32_t j = 0; j < sz; j++) {
     SStreamTask *pTask = taosArrayGetP(pTasks, j);
-    if (mndPersistTaskDropReq(pTrans, pTask) < 0) {
+    if (mndPersistTaskDropReq(pMnode, pTrans, pTask) < 0) {
       return -1;
     }
   }
@@ -1085,9 +1090,10 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
     STransAction action = {0};
     SEpSet       epset = mndGetVgroupEpset(pMnode, pVgObj);
+    mndReleaseVgroup(pMnode, pVgObj);
     initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
                     TSDB_CODE_SYN_PROPOSE_NOT_READY);
-    mndReleaseVgroup(pMnode, pVgObj);
     if (mndTransAppendRedoAction(pTrans, &action) != 0) {
       taosMemoryFree(buf);
@@ -1283,12 +1289,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
   if (pStream == NULL) {
     if (dropReq.igNotExists) {
-      mInfo("stream:%s, not exist, ignore not exist is set", dropReq.name);
+      mInfo("stream:%s not exist, ignore not exist is set", dropReq.name);
       sdbRelease(pMnode->pSdb, pStream);
       tFreeSMDropStreamReq(&dropReq);
       return 0;
     } else {
       terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
+      mError("stream:%s not exist failed to drop", dropReq.name);
       tFreeSMDropStreamReq(&dropReq);
       return -1;
     }
@@ -1660,7 +1667,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
   sdbCancelFetch(pSdb, pIter);
 }
-static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
+static int32_t mndPauseStreamTask(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) {
   SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
   if (pReq == NULL) {
     mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
@@ -1673,8 +1680,12 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
   pReq->taskId = pTask->id.taskId;
   pReq->streamId = pTask->id.streamId;
+  SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
+  SEpSet  epset = mndGetVgroupEpset(pMnode, pVgObj);
+  mndReleaseVgroup(pMnode, pVgObj);
+
   STransAction action = {0};
-  initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &pTask->info.epSet, 0);
+  initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
   if (mndTransAppendRedoAction(pTrans, &action) != 0) {
     taosMemoryFree(pReq);
     return -1;
@@ -1682,7 +1693,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
   return 0;
 }
-int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
+int32_t mndPauseAllStreamTasks(SMnode* pMnode, STrans *pTrans, SStreamObj *pStream) {
   SArray *tasks = pStream->tasks;
   int32_t size = taosArrayGetSize(tasks);
@@ -1691,7 +1702,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
     int32_t sz = taosArrayGetSize(pTasks);
     for (int32_t j = 0; j < sz; j++) {
       SStreamTask *pTask = taosArrayGetP(pTasks, j);
-      if (mndPauseStreamTask(pTrans, pTask) < 0) {
+      if (mndPauseStreamTask(pMnode, pTrans, pTask) < 0) {
         return -1;
       }
@@ -1768,7 +1779,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
   }
   // pause all tasks
-  if (mndPauseAllStreamTasks(pTrans, pStream) < 0) {
+  if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) {
     mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
     sdbRelease(pMnode->pSdb, pStream);
     mndTransDrop(pTrans);
@@ -1795,7 +1806,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
   return TSDB_CODE_ACTION_IN_PROGRESS;
 }
-static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) {
+static int32_t mndResumeStreamTask(STrans *pTrans, SMnode* pMnode, SStreamTask *pTask, int8_t igUntreated) {
   SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
   if (pReq == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -1806,8 +1817,12 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
   pReq->streamId = pTask->id.streamId;
   pReq->igUntreated = igUntreated;
+  SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
+  SEpSet  epset = mndGetVgroupEpset(pMnode, pVgObj);
+  mndReleaseVgroup(pMnode, pVgObj);
+
   STransAction action = {0};
-  initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &pTask->info.epSet, 0);
+  initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
   if (mndTransAppendRedoAction(pTrans, &action) != 0) {
     taosMemoryFree(pReq);
     return -1;
@@ -1815,14 +1830,14 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
   return 0;
 }
-int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUntreated) {
+int32_t mndResumeAllStreamTasks(STrans *pTrans, SMnode* pMnode, SStreamObj *pStream, int8_t igUntreated) {
   int32_t size = taosArrayGetSize(pStream->tasks);
   for (int32_t i = 0; i < size; i++) {
     SArray *pTasks = taosArrayGetP(pStream->tasks, i);
     int32_t sz = taosArrayGetSize(pTasks);
     for (int32_t j = 0; j < sz; j++) {
       SStreamTask *pTask = taosArrayGetP(pTasks, j);
-      if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
+      if (mndResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) {
         return -1;
       }
@@ -1831,7 +1846,6 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
       }
     }
   }
-  // pStream->pHTasksList is null
   return 0;
 }
@@ -1884,7 +1898,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
   }
   // resume all tasks
-  if (mndResumeAllStreamTasks(pTrans, pStream, pauseReq.igUntreated) < 0) {
+  if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
     mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
     sdbRelease(pMnode->pSdb, pStream);
     mndTransDrop(pTrans);
@@ -2534,8 +2548,12 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
     pReq->taskId = pTask->id.taskId;
     pReq->streamId = pTask->id.streamId;
+    SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
+    SEpSet  epset = mndGetVgroupEpset(pMnode, pVgObj);
+    mndReleaseVgroup(pMnode, pVgObj);
+
     STransAction action = {0};
-    initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &pTask->info.epSet, 0);
+    initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
     if (mndTransAppendRedoAction(pTrans, &action) != 0) {
       taosMemoryFree(pReq);
       taosWUnLockLatch(&pStream->lock);

View File

@@ -831,7 +831,7 @@ struct SDiskDataBuilder {
   SBlkInfo bi;
 };
-typedef struct SLDataIter {
+struct SLDataIter {
   SRBTreeNode node;
   SSttBlk    *pSttBlk;
   int64_t     cid;  // for debug purpose
@@ -845,7 +845,7 @@ typedef struct SLDataIter {
   SSttBlockLoadInfo     *pBlockLoadInfo;
   bool                   ignoreEarlierTs;
   struct SSttFileReader *pReader;
-} SLDataIter;
+};
 #define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row))

View File

@@ -1105,10 +1105,6 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
   }
 }
-static void ddxx() {
-
-}
-
 // this function should be executed by only one thread, so we set an sentinel to protect this function
 int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
@@ -1155,7 +1151,9 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs",
             id, pTask->execInfo.step1Start, pTask->execInfo.step1El);
   } else {
-    tqDebug("s-task:%s already in step2, no need to scan-history data, step2 starTs:%"PRId64, id, pTask->execInfo.step2Start);
+    tqDebug("s-task:%s already in step2, no need to scan-history data, step2 startTs:%" PRId64, id,
+            pTask->execInfo.step2Start);
     atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
     streamMetaReleaseTask(pMeta, pTask);
     return 0;

View File

@@ -155,7 +155,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
   streamMetaInitBackend(pMeta);
   int64_t el = taosGetTimestampMs() - st;
-  tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.);
+  tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
   code = streamMetaLoadAllTasks(pTq->pStreamMeta);
   if (code != TSDB_CODE_SUCCESS) {
@@ -168,12 +168,15 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
   if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
     tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
     tqResetStreamTaskStatus(pTq);
+    streamMetaWUnLock(pMeta);
     tqStartStreamTasks(pTq);
   } else {
+    streamMetaResetStartInfo(&pMeta->startInfo);
+    streamMetaWUnLock(pMeta);
     tqInfo("vgId:%d, follower node not start stream tasks", vgId);
   }
-  streamMetaWUnLock(pMeta);
   code = terrno;
   return code;
 }

View File

@@ -212,9 +212,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
       taosMemoryFree(pWriter);
       goto _err;
     }
-
   taosWUnLockLatch(&pTq->pStreamMeta->lock);
-
   taosMemoryFree(pWriter);
   return code;

View File

@@ -524,7 +524,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
   int64_t st = taosGetTimestampUs();
   // clear info for the new file
-  cleanupInfoFoxNextFileset(pReader->status.pTableMap);
+  cleanupInfoForNextFileset(pReader->status.pTableMap);
   int32_t k = 0;
   int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
@@ -1444,7 +1444,9 @@ static bool nextRowFromSttBlocks(SLastBlockReader* pLastBlockReader, STableBlock
   }
 }
-static void doPinSttBlock(SLastBlockReader* pLastBlockReader) { tMergeTreePinSttBlock(&pLastBlockReader->mergeTree); }
+static void doPinSttBlock(SLastBlockReader* pLastBlockReader) {
+  tMergeTreePinSttBlock(&pLastBlockReader->mergeTree);
+}
 static void doUnpinSttBlock(SLastBlockReader* pLastBlockReader) {
   tMergeTreeUnpinSttBlock(&pLastBlockReader->mergeTree);
@@ -1454,7 +1456,6 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
                                            STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader,
                                            bool* copied) {
   int32_t code = TSDB_CODE_SUCCESS;
-
   *copied = false;
   // avoid the fetch next row replace the referenced stt block in buffer
@@ -1540,12 +1541,6 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
     if (ps == NULL) {
       return terrno;
     }
-
-    int32_t code = tsdbRowMergerInit(pMerger, ps);
-    if (code != TSDB_CODE_SUCCESS) {
-      tsdbError("failed to init row merger, code:%s", tstrerror(code));
-      return code;
-    }
   }
   int64_t minKey = 0;
@@ -1688,7 +1683,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
   tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pLastBlockReader->uid,
             fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr);
-  // only last block exists
+  // only stt block exists
   if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
     code = tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied);
     if (code) {
@@ -1767,12 +1762,6 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
     if (ps == NULL) {
      return terrno;
     }
-
-    int32_t code = tsdbRowMergerInit(pMerger, ps);
-    if (code != TSDB_CODE_SUCCESS) {
-      tsdbError("failed to init row merger, code:%s", tstrerror(code));
-      return code;
-    }
   }
   if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
@@ -1874,12 +1863,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
     if (ps == NULL) {
       return terrno;
     }
-
-    code = tsdbRowMergerInit(pMerger, ps);
-    if (code != TSDB_CODE_SUCCESS) {
-      tsdbError("failed to init row merger, code:%s", tstrerror(code));
-      return code;
-    }
   }
   int64_t minKey = 0;
@@ -2202,12 +2185,6 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
     if (ps == NULL) {
       return terrno;
     }
-
-    code = tsdbRowMergerInit(pMerger, ps);
-    if (code != TSDB_CODE_SUCCESS) {
-      tsdbError("failed to init row merger, code:%s", tstrerror(code));
-      return code;
-    }
   }
   if (copied) {

View File

@@ -245,7 +245,7 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
   pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
 }
-void cleanupInfoFoxNextFileset(SSHashObj* pTableMap) {
+void cleanupInfoForNextFileset(SSHashObj* pTableMap) {
   STableBlockScanInfo** p = NULL;
   int32_t iter = 0;

View File

@@ -248,7 +248,7 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
 void clearBlockScanInfo(STableBlockScanInfo* p);
 void destroyAllBlockScanInfo(SSHashObj* pTableMap);
 void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step);
-void cleanupInfoFoxNextFileset(SSHashObj* pTableMap);
+void cleanupInfoForNextFileset(SSHashObj* pTableMap);
 int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables);
 void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf);
 void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index);

View File

@@ -21,6 +21,8 @@
 #include "cos.h"
 #include "vnode.h"
 #include "vnodeInt.h"
+#include "audit.h"
+#include "tstrbuild.h"
 static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
 static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);

View File

@@ -1106,7 +1106,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
     taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
     taosThreadMutexUnlock(&pTask->lock);
-    stError("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch data", id,
+    stWarn("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id,
            pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
   } else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
     stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
@@ -1147,8 +1147,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
   }
   int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
-    stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d",
-            pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
+    stDebug("s-task:%s failed to dispatch msg to downstream, add into timer to retry in %dms, ref:%d",
+            pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref);
     streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
   } else {  // this message has been sent successfully, let's try next one.

View File

@@ -134,7 +134,7 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration)
   pTask->schedHistoryInfo.numOfTicks = numOfTicks;
   int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
-  stDebug("s-task:%s scan-history start in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref);
+  stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref);
   if (pTask->schedHistoryInfo.pTimer == NULL) {
     pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer);

View File

@@ -321,6 +321,10 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin
   while ((pNode = tdListNext(&iter)) != NULL && i < max) {
     SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
     if (pPos->beUsed == used) {
+      if (used && !pPos->pRowBuff) {
+        ASSERT(pPos->needFree == true);
+        continue;
+      }
       tdListAppend(pFlushList, &pPos);
       pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
       pFileState->stateBuffRemoveByPosFn(pFileState, pPos);

View File

@@ -293,6 +293,15 @@ int32_t httpSendQuit() {
 static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
                                       EHttpCompFlag flag) {
+  if (server == NULL || uri == NULL) {
+    tError("http-report failed to report to invalid addr");
+    return -1;
+  }
+
+  if (pCont == NULL || contLen == 0) {
+    tError("http-report failed to report empty packet");
+    return -1;
+  }
   SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
   if (load == NULL) {
     tError("http-report already released");

View File

@@ -23,6 +23,8 @@ sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 i
 sql create stream stream_t2 trigger at_once watermark 1d IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s);
 sleep 1000
+sleep 1000
 sql insert into t1 values(1648791210000,1,2,3,1.0);
 sql insert into t1 values(1648791216000,2,2,3,1.1);
 sql insert into t1 values(1648791220000,3,2,3,2.1);
@@ -314,6 +316,8 @@ sql create stream streams11 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 i
 sql create stream streams12 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s);
 sleep 1000
+sleep 1000
 sql insert into t1 values(1648791213000,1,2,3,1.0);
 sql insert into t1 values(1648791223001,2,2,3,1.1);
 sql insert into t1 values(1648791233002,3,2,3,2.1);
@@ -448,6 +452,8 @@ sql create stream streams21 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 i
 sql create stream streams22 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt22 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s);
 sleep 1000
+sleep 1000
 sql insert into t1 values(1648791213000,1,1,1,1.0);
 sql insert into t1 values(1648791223001,2,2,2,1.1);
 sql insert into t1 values(1648791233002,3,3,3,2.1);
@@ -712,6 +718,8 @@ sql create table t2 using st tags(2,2,2);
 sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart as ts, count(*),min(a) c1 from st interval(10s) sliding(5s);
 sleep 1000
+sleep 1000
 sql insert into t1 values(1648791213000,1,1,1,1.0);
 sql insert into t1 values(1648791243000,2,1,1,1.0);

View File

@@ -338,11 +338,38 @@ class TDTestCase:
         tdLog.printNoPrefix("======== test case 1 end ...... ")
+    #
+    def checkVGroups(self):
+        # db2
+        tdSql.execute("create database db2 vgroups 2;")
+        tdSql.execute("use db2;")
+        tdSql.execute("create table st(ts timestamp, age int) tags(area int);")
+        tdSql.execute("create table t1 using st tags(1);")
+        tdSql.query("select distinct(tbname) from st limit 1 offset 100;")
+        tdSql.checkRows(0)
+        tdLog.info("check db2 vgroups 2 limit 1 offset 100 successfully!")
+
+        # db1
+        tdSql.execute("create database db1 vgroups 1;")
+        tdSql.execute("use db1;")
+        tdSql.execute("create table st(ts timestamp, age int) tags(area int);")
+        tdSql.execute("create table t1 using st tags(1);")
+        tdSql.query("select distinct(tbname) from st limit 1 offset 100;")
+        tdSql.checkRows(0)
+        tdLog.info("check db1 vgroups 1 limit 1 offset 100 successfully!")
+
     def run(self):
         # tdSql.prepare()
         self.prepareTestEnv()
         self.tmqCase1()
+
+        # one vgroup diff more than one vgroup check
+        self.checkVGroups()
     def stop(self):
         tdSql.close()
         tdLog.success(f"{__file__} successfully executed")

View File

@@ -80,6 +80,7 @@ typedef struct {
 #ifdef WEBSOCKET
   bool    restful;
   bool    cloud;
+  bool    local;
   char*   dsn;
   int32_t timeout;
 #endif

View File

@@ -410,7 +410,7 @@ int32_t shellParseArgs(int32_t argc, char *argv[]) {
   shellInitArgs(argc, argv);
   shell.info.clientVersion =
       "Welcome to the %s Command Line Interface, Client Version:%s\r\n"
-      "Copyright (c) 2022 by %s, all rights reserved.\r\n\r\n";
+      "Copyright (c) 2023 by %s, all rights reserved.\r\n\r\n";
 #ifdef CUS_NAME
   strcpy(shell.info.cusName, CUS_NAME);
 #else

View File

@@ -83,6 +83,11 @@ SWords shellCommands[] = {
     {"alter local \"asynclog\" \"1\";", 0, 0, NULL},
     {"alter topic", 0, 0, NULL},
     {"alter user <user_name> <user_actions> <anyword> ;", 0, 0, NULL},
+#ifdef TD_ENTERPRISE
+    {"balance vgroup;", 0, 0, NULL},
+    {"balance vgroup leader <vgroup_id>", 0, 0, NULL},
+#endif
     // 20
     {"create table <anyword> using <stb_name> tags(", 0, 0, NULL},
     {"create database <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> "
@@ -127,9 +132,12 @@ SWords shellCommands[] = {
     {"kill query ", 0, 0, NULL},
     {"kill transaction ", 0, 0, NULL},
 #ifdef TD_ENTERPRISE
-    {"merge vgroup ", 0, 0, NULL},
+    {"merge vgroup <vgroup_id> <vgroup_id>", 0, 0, NULL},
 #endif
     {"pause stream <stream_name> ;", 0, 0, NULL},
+#ifdef TD_ENTERPRISE
+    {"redistribute vgroup <vgroup_id> dnode <dnode_id> ;", 0, 0, NULL},
+#endif
     {"resume stream <stream_name> ;", 0, 0, NULL},
     {"reset query cache;", 0, 0, NULL},
     {"restore dnode <dnode_id> ;", 0, 0, NULL},
@@ -187,7 +195,7 @@ SWords shellCommands[] = {
     {"show consumers;", 0, 0, NULL},
     {"show grants;", 0, 0, NULL},
 #ifdef TD_ENTERPRISE
-    {"split vgroup ", 0, 0, NULL},
+    {"split vgroup <vgroup_id>", 0, 0, NULL},
 #endif
     {"insert into <tb_name> values(", 0, 0, NULL},
     {"insert into <tb_name> using <stb_name> tags(", 0, 0, NULL},
@@ -268,7 +276,9 @@ char* db_options[] = {"keep ",
                       "wal_retention_size ",
                       "wal_segment_size "};
-char* alter_db_options[] = {"cachemodel ", "replica ", "keep ", "cachesize ", "wal_fsync_period ", "wal_level "};
+char* alter_db_options[] = {"cachemodel ", "replica ", "keep ", "stt_trigger ",
+                            "wal_retention_period ", "wal_retention_size ", "cachesize ",
+                            "wal_fsync_period ", "buffer ", "pages " ,"wal_level "};
 char* data_types[] = {"timestamp", "int",
                       "int unsigned", "varchar(16)",
@@ -312,26 +322,27 @@ bool waitAutoFill = false;
 #define WT_VAR_TOPIC 5
 #define WT_VAR_STREAM 6
 #define WT_VAR_UDFNAME 7
+#define WT_VAR_VGROUPID 8
-#define WT_FROM_DB_MAX 7  // max get content from db
+#define WT_FROM_DB_MAX 8  // max get content from db
 #define WT_FROM_DB_CNT (WT_FROM_DB_MAX + 1)
-#define WT_VAR_ALLTABLE 8
-#define WT_VAR_FUNC 9
-#define WT_VAR_KEYWORD 10
-#define WT_VAR_TBACTION 11
-#define WT_VAR_DBOPTION 12
-#define WT_VAR_ALTER_DBOPTION 13
-#define WT_VAR_DATATYPE 14
-#define WT_VAR_KEYTAGS 15
-#define WT_VAR_ANYWORD 16
-#define WT_VAR_TBOPTION 17
-#define WT_VAR_USERACTION 18
-#define WT_VAR_KEYSELECT 19
-#define WT_VAR_SYSTABLE 20
-#define WT_VAR_LANGUAGE 21
-#define WT_VAR_CNT 22
+#define WT_VAR_ALLTABLE 9
+#define WT_VAR_FUNC 10
+#define WT_VAR_KEYWORD 11
+#define WT_VAR_TBACTION 12
+#define WT_VAR_DBOPTION 13
+#define WT_VAR_ALTER_DBOPTION 14
+#define WT_VAR_DATATYPE 15
+#define WT_VAR_KEYTAGS 16
+#define WT_VAR_ANYWORD 17
+#define WT_VAR_TBOPTION 18
+#define WT_VAR_USERACTION 19
+#define WT_VAR_KEYSELECT 20
+#define WT_VAR_SYSTABLE 21
+#define WT_VAR_LANGUAGE 22
+#define WT_VAR_CNT 23
 #define WT_TEXT 0xFF
@@ -345,11 +356,11 @@ TdThread* threads[WT_FROM_DB_CNT];
 // obtain var name with sql from server
 char varTypes[WT_VAR_CNT][64] = {
     "<db_name>", "<stb_name>", "<tb_name>", "<dnode_id>", "<user_name>", "<topic_name>", "<stream_name>",
-    "<udf_name>", "<all_table>", "<function>", "<keyword>", "<tb_actions>", "<db_options>", "<alter_db_options>",
+    "<udf_name>", "<vgroup_id>", "<all_table>", "<function>", "<keyword>", "<tb_actions>", "<db_options>", "<alter_db_options>",
     "<data_types>", "<key_tags>", "<anyword>", "<tb_options>", "<user_actions>", "<key_select>", "<sys_table>", "<udf_language>"};
 char varSqls[WT_FROM_DB_CNT][64] = {"show databases;", "show stables;", "show tables;", "show dnodes;",
-                                    "show users;", "show topics;", "show streams;", "show functions;"};
+                                    "show users;", "show topics;", "show streams;", "show functions;", "show vgroups;"};
 // var words current cursor, if user press any one key except tab, cursorVar can be reset to -1
 int cursorVar = -1;
@@ -520,7 +531,10 @@ void showHelp() {
   printf(
       "\n\n\
 ----- special commands on enterpise version ----- \n\
+    balance vgroup; \n\
+    balance vgroup leader <vgroup_id> \n\
     compact database <db_name>; \n\
+    redistribute vgroup <vgroup_id> dnode <dnode_id> ;\n\
     split vgroup <vgroup_id>;");
 #endif
@@ -675,9 +689,9 @@ bool shellAutoInit() {
   // generate varType
   GenerateVarType(WT_VAR_FUNC, functions, sizeof(functions) / sizeof(char*));
   GenerateVarType(WT_VAR_KEYWORD, keywords, sizeof(keywords) / sizeof(char*));
-  GenerateVarType(WT_VAR_TBACTION, tb_actions, sizeof(tb_actions) / sizeof(char*));
   GenerateVarType(WT_VAR_DBOPTION, db_options, sizeof(db_options) / sizeof(char*));
   GenerateVarType(WT_VAR_ALTER_DBOPTION, alter_db_options, sizeof(alter_db_options) / sizeof(char*));
+  GenerateVarType(WT_VAR_TBACTION, tb_actions, sizeof(tb_actions) / sizeof(char*));
   GenerateVarType(WT_VAR_DATATYPE, data_types, sizeof(data_types) / sizeof(char*));
   GenerateVarType(WT_VAR_KEYTAGS, key_tags, sizeof(key_tags) / sizeof(char*));
   GenerateVarType(WT_VAR_TBOPTION, tb_options, sizeof(tb_options) / sizeof(char*));

View File

@@ -47,6 +47,7 @@ int main(int argc, char *argv[]) {
 #ifdef WEBSOCKET
   shell.args.timeout = SHELL_WS_TIMEOUT;
   shell.args.cloud = true;
+  shell.args.local = false;
 #endif
 #if !defined(WINDOWS)

View File

@@ -130,11 +130,21 @@ void shellCheckConnectMode() {
   }
   if (shell.args.cloud) {
     shell.args.dsn = getenv("TDENGINE_CLOUD_DSN");
-    if (shell.args.dsn) {
+    if (shell.args.dsn && strlen(shell.args.dsn) > 4) {
       shell.args.cloud = true;
+      shell.args.local = false;
       shell.args.restful = false;
       return;
     }
+
+    shell.args.dsn = getenv("TDENGINE_DSN");
+    if (shell.args.dsn && strlen(shell.args.dsn) > 4) {
+      shell.args.cloud = true;
+      shell.args.local = true;
+      shell.args.restful = false;
+      return;
+    }
+
     if (shell.args.restful) {
       if (!shell.args.host) {
         shell.args.host = "localhost";

View File

@@ -59,8 +59,18 @@ int shell_conn_ws_server(bool first) {
     fprintf(stdout, "successfully connected to %s\n\n",
             shell.args.dsn);
   } else if (first && shell.args.cloud) {
+    if (shell.args.local) {
+      const char* host = strstr(shell.args.dsn, "@");
+      if (host) {
+        host += 1;
+      } else {
+        host = shell.args.dsn;
+      }
+      fprintf(stdout, "successfully connected to %s\n", host);
+    } else {
       fprintf(stdout, "successfully connected to cloud service\n");
     }
+  }
   fflush(stdout);
 
   // switch to current database if have