enh: add specific threads for rsma task

This commit is contained in:
Cary Xu 2022-08-24 17:36:22 +08:00
parent 3f96e41691
commit a6fa87fb6a
49 changed files with 427 additions and 114 deletions

View File

@ -42,7 +42,7 @@ To do so, run the following command:
``` ```
This command creates the `meters` supertable in the `test` database. In the `meters` supertable, it then creates 10,000 subtables named `d0` to `d9999`. Each table has 10,000 rows and each row has four columns: `ts`, `current`, `voltage`, and `phase`. The timestamps of the data in these columns range from 2017-07-14 10:40:00 000 to 2017-07-14 10:40:09 999. Each table is randomly assigned a `groupId` tag from 1 to ten and a `location` tag of either `California.SanFrancisco` or `California.SanDiego`. This command creates the `meters` supertable in the `test` database. In the `meters` supertable, it then creates 10,000 subtables named `d0` to `d9999`. Each table has 10,000 rows and each row has four columns: `ts`, `current`, `voltage`, and `phase`. The timestamps of the data in these columns range from 2017-07-14 10:40:00 000 to 2017-07-14 10:40:09 999. Each table is randomly assigned a `groupId` tag from 1 to 10 and a `location` tag of either `Campbell`, `Cupertino`, `Los Angeles`, `Mountain View`, `Palo Alto`, `San Diego`, `San Francisco`, `San Jose`, `Santa Clara` or `Sunnyvale`.
The `taosBenchmark` command creates a deployment with 100 million data points that you can use for testing purposes. The time required depends on the hardware specifications of the local system. The `taosBenchmark` command creates a deployment with 100 million data points that you can use for testing purposes. The time required depends on the hardware specifications of the local system.

View File

@ -22,8 +22,8 @@ TDengine的主要功能如下
9. 提供[命令行程序](../reference/taos-shell),便于管理集群,检查系统状态,做即席查询 9. 提供[命令行程序](../reference/taos-shell),便于管理集群,检查系统状态,做即席查询
10. 提供多种数据的[导入](../operation/import)、[导出](../operation/export) 10. 提供多种数据的[导入](../operation/import)、[导出](../operation/export)
11. 支持对[TDengine 集群本身的监控](../operation/monitor) 11. 支持对[TDengine 集群本身的监控](../operation/monitor)
12. 提供 [C/C++](../reference/connector/cpp), [Java](../reference/connector/java), [Python](../reference/connector/python), [Go](../reference/connector/go), [Rust](../reference/connector/rust), [Node.js](../reference/connector/node) 等多种编程语言的[连接器](../reference/connector/) 12. 提供各种语言的[连接器](../connector): 如 C/C++, Java, Go, Node.JS, Rust, Python, C# 等
13. 支持 [REST 接口](../reference/rest-api/) 13. 支持 [REST 接口](../connector/rest-api/)
14. 支持与[ Grafana 无缝集成](../third-party/grafana) 14. 支持与[ Grafana 无缝集成](../third-party/grafana)
15. 支持与 Google Data Studio 无缝集成 15. 支持与 Google Data Studio 无缝集成
16. 支持 [Kubernetes 部署](../deployment/k8s) 16. 支持 [Kubernetes 部署](../deployment/k8s)

View File

@ -9,7 +9,7 @@ import PkgListV3 from "/components/PkgListV3";
您可以[用 Docker 立即体验](../../get-started/docker/) TDengine。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. 您可以[用 Docker 立即体验](../../get-started/docker/) TDengine。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
TDengine 完整的软件包包括服务端taosd、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动taosc、命令行程序 (CLItaos) 和一些工具软件。目前 taosAdapter 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../reference/rest-api/)。 TDengine 完整的软件包包括服务端taosd、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动taosc、命令行程序 (CLItaos) 和一些工具软件。目前 taosAdapter 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../connector/rest-api/)。
为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 lite 版本的安装包。 为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 lite 版本的安装包。

View File

@ -3,7 +3,7 @@ title: 立即开始
description: '快速设置 TDengine 环境并体验其高效写入和查询' description: '快速设置 TDengine 环境并体验其高效写入和查询'
--- ---
TDengine 完整的软件包包括服务端taosd、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动taosc、命令行程序 (CLItaos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](/reference/taosadapter) 提供 [RESTful 接口](/reference/rest-api)。 TDengine 完整的软件包包括服务端taosd、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动taosc、命令行程序 (CLItaos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/taosadapter) 提供 [RESTful 接口](../connector/rest-api)。
本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。 本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。

View File

@ -33,7 +33,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
关键不同点在于: 关键不同点在于:
1. 使用 REST 连接,用户无需安装客户端驱动程序 taosc具有跨平台易用的优势但性能要下降 30%左右。 1. 使用 REST 连接,用户无需安装客户端驱动程序 taosc具有跨平台易用的优势但性能要下降 30%左右。
2. 使用原生连接可以体验 TDengine 的全部功能,如[参数绑定接口](../../connector/cpp#参数绑定-api)、[订阅](../../connector/cpp#订阅和消费-api)等等。 2. 使用原生连接可以体验 TDengine 的全部功能,如[参数绑定接口](../../connector/cpp/#参数绑定-api)、[订阅](../../connector/cpp/#订阅和消费-api)等等。
## 安装客户端驱动 taosc ## 安装客户端驱动 taosc

View File

@ -12,7 +12,7 @@ title: 开发指南
7. 在很多场景下(如车辆管理)应用需要获取每个数据采集点的最新状态那么建议你采用TDengine的cache功能而不用单独部署Redis等缓存软件。 7. 在很多场景下(如车辆管理)应用需要获取每个数据采集点的最新状态那么建议你采用TDengine的cache功能而不用单独部署Redis等缓存软件。
8. 如果你发现TDengine的函数无法满足你的要求那么你可以使用用户自定义函数来解决问题。 8. 如果你发现TDengine的函数无法满足你的要求那么你可以使用用户自定义函数来解决问题。
本部分内容就是按照上述的顺序组织的。为便于理解TDengine为每个功能为每个支持的编程语言都提供了示例代码。如果你希望深入了解SQL的使用需要查看[SQL手册](/taos-sql/)。如果想更深入地了解各连接器的使用,请阅读[连接器参考指南](../reference/connector/)。如果还希望想将TDengine与第三方系统集成起来比如Grafana, 请参考[第三方工具](/third-party/)。 本部分内容就是按照上述的顺序组织的。为便于理解TDengine为每个功能为每个支持的编程语言都提供了示例代码。如果你希望深入了解SQL的使用需要查看[SQL手册](/taos-sql/)。如果想更深入地了解各连接器的使用,请阅读[连接器参考指南](../connector/)。如果还希望想将TDengine与第三方系统集成起来比如Grafana, 请参考[第三方工具](../third-party/)。
如果在开发过程中遇到任何问题,请点击每个页面下方的["反馈问题"](https://github.com/taosdata/TDengine/issues/new/choose), 在GitHub上直接递交issue。 如果在开发过程中遇到任何问题,请点击每个页面下方的["反馈问题"](https://github.com/taosdata/TDengine/issues/new/choose), 在GitHub上直接递交issue。

View File

@ -22,7 +22,7 @@ TDengine 客户端驱动的动态库位于:
## 支持的平台 ## 支持的平台
请参考[支持的平台列表](../connector#支持的平台) 请参考[支持的平台列表](../#支持的平台)
## 支持的版本 ## 支持的版本
@ -30,7 +30,7 @@ TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一
## 安装步骤 ## 安装步骤
TDengine 客户端驱动的安装请参考 [安装指南](../connector#安装步骤) TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤)
## 建立连接 ## 建立连接

View File

@ -35,7 +35,7 @@ REST 连接支持所有能运行 Java 的平台。
## 版本支持 ## 版本支持
请参考[版本支持列表](../connector#版本支持) 请参考[版本支持列表](../#版本支持)
## TDengine DataType 和 Java DataType ## TDengine DataType 和 Java DataType
@ -64,7 +64,7 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对
使用 Java Connector 连接数据库前,需要具备以下条件: 使用 Java Connector 连接数据库前,需要具备以下条件:
- 已安装 Java 1.8 或以上版本运行时环境和 Maven 3.6 或以上版本 - 已安装 Java 1.8 或以上版本运行时环境和 Maven 3.6 或以上版本
- 已安装 TDengine 客户端驱动(使用原生连接必须安装,使用 REST 连接无需安装),具体步骤请参考[安装客户端驱动](../connector#安装客户端驱动) - 已安装 TDengine 客户端驱动(使用原生连接必须安装,使用 REST 连接无需安装),具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
### 安装连接器 ### 安装连接器
@ -630,7 +630,7 @@ public void setNString(int columnIndex, ArrayList<String> list, int size) throws
### 无模式写入 ### 无模式写入
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议Line Protocol、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../schemaless)。 TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议Line Protocol、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../reference/schemaless/)。
**注意** **注意**

View File

@ -30,7 +30,7 @@ REST 连接支持所有能运行 Go 的平台。
## 版本支持 ## 版本支持
请参考[版本支持列表](../connector#版本支持) 请参考[版本支持列表](../#版本支持)
## 支持的功能特性 ## 支持的功能特性
@ -56,7 +56,7 @@ REST 连接支持所有能运行 Go 的平台。
### 安装前准备 ### 安装前准备
* 安装 Go 开发环境Go 1.14 及以上GCC 4.8.5 及以上) * 安装 Go 开发环境Go 1.14 及以上GCC 4.8.5 及以上)
* 如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../connector#安装客户端驱动) * 如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
配置好环境变量,检查命令: 配置好环境变量,检查命令:

View File

@ -28,7 +28,7 @@ Websocket 连接支持所有能运行 Rust 的平台。
## 版本支持 ## 版本支持
请参考[版本支持列表](../connector#版本支持) 请参考[版本支持列表](../#版本支持)
Rust 连接器仍然在快速开发中1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine以避免已知问题。 Rust 连接器仍然在快速开发中1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine以避免已知问题。
@ -37,7 +37,7 @@ Rust 连接器仍然在快速开发中1.0 之前无法保证其向后兼容
### 安装前准备 ### 安装前准备
* 安装 Rust 开发工具链 * 安装 Rust 开发工具链
* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../connector#安装客户端驱动) * 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
### 添加 taos 依赖 ### 添加 taos 依赖

View File

@ -8,7 +8,7 @@ description: "taospy 是 TDengine 的官方 Python 连接器。taospy 提供了
import Tabs from "@theme/Tabs"; import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem"; import TabItem from "@theme/TabItem";
`taospy` 是 TDengine 的官方 Python 连接器。`taospy` 提供了丰富的 API 使得 Python 应用可以很方便地使用 TDengine。`taospy` 对 TDengine 的[原生接口](../connector/cpp)和 [REST 接口](/reference/rest-api)都进行了封装, 分别对应 `taospy` 包的 `taos` 模块 和 `taosrest` 模块。 `taospy` 是 TDengine 的官方 Python 连接器。`taospy` 提供了丰富的 API 使得 Python 应用可以很方便地使用 TDengine。`taospy` 对 TDengine 的[原生接口](../cpp)和 [REST 接口](../rest-api)都进行了封装, 分别对应 `taospy` 包的 `taos` 模块 和 `taosrest` 模块。
除了对原生接口和 REST 接口的封装,`taospy` 还提供了符合 [Python 数据访问规范(PEP 249)](https://peps.python.org/pep-0249/) 的编程接口。这使得 `taospy` 和很多第三方工具集成变得简单,比如 [SQLAlchemy](https://www.sqlalchemy.org/) 和 [pandas](https://pandas.pydata.org/)。 除了对原生接口和 REST 接口的封装,`taospy` 还提供了符合 [Python 数据访问规范(PEP 249)](https://peps.python.org/pep-0249/) 的编程接口。这使得 `taospy` 和很多第三方工具集成变得简单,比如 [SQLAlchemy](https://www.sqlalchemy.org/) 和 [pandas](https://pandas.pydata.org/)。
使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口与服务端建立的连接的方式下文中称为“REST 连接”。 使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口与服务端建立的连接的方式下文中称为“REST 连接”。
@ -17,7 +17,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
## 支持的平台 ## 支持的平台
- 原生连接[支持的平台](../connector/#支持的平台)和 TDengine 客户端支持的平台一致。 - 原生连接[支持的平台](../#支持的平台)和 TDengine 客户端支持的平台一致。
- REST 连接支持所有能运行 Python 的平台。 - REST 连接支持所有能运行 Python 的平台。
## 版本选择 ## 版本选择
@ -275,7 +275,7 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
##### RestClient 类的使用 ##### RestClient 类的使用
`RestClient` 类是对于 [REST API](/reference/rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。 `RestClient` 类是对于 [REST API](../rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。
```python title="RestClient 的使用" ```python title="RestClient 的使用"
{{#include docs/examples/python/rest_client_example.py}} {{#include docs/examples/python/rest_client_example.py}}

View File

@ -28,7 +28,7 @@ REST 连接器支持所有能运行 Node.js 的平台。
## 版本支持 ## 版本支持
请参考[版本支持列表](../connector#版本支持) 请参考[版本支持列表](../#版本支持)
## 支持的功能特性 ## 支持的功能特性
@ -52,7 +52,7 @@ REST 连接器支持所有能运行 Node.js 的平台。
### 安装前准备 ### 安装前准备
- 安装 Node.js 开发环境 - 安装 Node.js 开发环境
- 如果使用 REST 连接器,跳过此步。但如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../connector#安装客户端驱动)。我们使用 [node-gyp](https://github.com/nodejs/node-gyp) 和 TDengine 实例进行交互,还需要根据具体操作系统来安装下文提到的一些依赖工具。 - 如果使用 REST 连接器,跳过此步。但如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)。我们使用 [node-gyp](https://github.com/nodejs/node-gyp) 和 TDengine 实例进行交互,还需要根据具体操作系统来安装下文提到的一些依赖工具。
<Tabs defaultValue="Linux"> <Tabs defaultValue="Linux">
<TabItem value="Linux" label="Linux 系统安装依赖工具"> <TabItem value="Linux" label="Linux 系统安装依赖工具">

View File

@ -18,7 +18,7 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx"
`TDengine.Connector` 是 TDengine 提供的 C# 语言连接器。C# 开发人员可以通过它开发存取 TDengine 集群数据的 C# 应用软件。 `TDengine.Connector` 是 TDengine 提供的 C# 语言连接器。C# 开发人员可以通过它开发存取 TDengine 集群数据的 C# 应用软件。
`TDengine.Connector` 连接器支持通过 TDengine 客户端驱动taosc建立与 TDengine 运行实例的连接提供数据写入、查询、订阅、schemaless 数据写入、参数绑定接口数据写入等功能 `TDengine.Connector` 目前暂未提供 REST 连接方式,用户可以参考 [REST API](/reference/rest-api/) 文档自行编写。 `TDengine.Connector` 连接器支持通过 TDengine 客户端驱动taosc建立与 TDengine 运行实例的连接提供数据写入、查询、订阅、schemaless 数据写入、参数绑定接口数据写入等功能 `TDengine.Connector` 目前暂未提供 REST 连接方式,用户可以参考 [REST API](../rest-api/) 文档自行编写。
本文介绍如何在 Linux 或 Windows 环境中安装 `TDengine.Connector`,并通过 `TDengine.Connector` 连接 TDengine 集群,进行数据写入、查询等基本操作。 本文介绍如何在 Linux 或 Windows 环境中安装 `TDengine.Connector`,并通过 `TDengine.Connector` 连接 TDengine 集群,进行数据写入、查询等基本操作。
@ -32,7 +32,7 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx"
## 版本支持 ## 版本支持
请参考[版本支持列表](../connector#版本支持) 请参考[版本支持列表](../#版本支持)
## 支持的功能特性 ## 支持的功能特性
@ -49,7 +49,7 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx"
* 安装 [.NET SDK](https://dotnet.microsoft.com/download) * 安装 [.NET SDK](https://dotnet.microsoft.com/download)
* [Nuget 客户端](https://docs.microsoft.com/en-us/nuget/install-nuget-client-tools) (可选安装) * [Nuget 客户端](https://docs.microsoft.com/en-us/nuget/install-nuget-client-tools) (可选安装)
* 安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../connector#安装客户端驱动) * 安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
### 使用 dotnet CLI 安装 ### 使用 dotnet CLI 安装

View File

@ -38,7 +38,7 @@ TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一
### 安装 TDengine 客户端驱动 ### 安装 TDengine 客户端驱动
TDengine 客户端驱动的安装请参考 [安装指南](../connector#安装步骤) TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤)
### 编译安装 php-tdengine ### 编译安装 php-tdengine

View File

@ -1,5 +1,7 @@
--- ---
sidebar_label: 连接器
title: 连接器 title: 连接器
description: 详细介绍各种语言的连接器及 REST API
--- ---
TDengine 提供了丰富的应用程序开发接口为了便于用户快速开发自己的应用TDengine 支持了多种编程语言的连接器,其中官方连接器包括支持 C/C++、Java、Python、Go、Node.js、C# 和 Rust 的连接器。这些连接器支持使用原生接口taosc和 REST 接口(部分语言暂不支持)连接 TDengine 集群。社区开发者也贡献了多个非官方连接器,例如 ADO.NET 连接器、Lua 连接器和 PHP 连接器。 TDengine 提供了丰富的应用程序开发接口为了便于用户快速开发自己的应用TDengine 支持了多种编程语言的连接器,其中官方连接器包括支持 C/C++、Java、Python、Go、Node.js、C# 和 Rust 的连接器。这些连接器支持使用原生接口taosc和 REST 接口(部分语言暂不支持)连接 TDengine 集群。社区开发者也贡献了多个非官方连接器,例如 ADO.NET 连接器、Lua 连接器和 PHP 连接器。

View File

@ -18,7 +18,7 @@ stream_options: {
其中 subquery 是 select 普通查询语法的子集: 其中 subquery 是 select 普通查询语法的子集:
```sql ```sql
subquery: SELECT [DISTINCT] select_list subquery: SELECT select_list
from_clause from_clause
[WHERE condition] [WHERE condition]
[PARTITION BY tag_list] [PARTITION BY tag_list]
@ -37,7 +37,7 @@ window_clause: {
其中SESSION 是会话窗口tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val则自动开启下一个窗口。 其中SESSION 是会话窗口tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val则自动开启下一个窗口。
窗口的定义与时序数据特色查询中的定义完全相同 窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished)
例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。 例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。

View File

@ -156,7 +156,7 @@ AllowWebSockets
## 功能列表 ## 功能列表
- RESTful 接口 - RESTful 接口
[https://docs.taosdata.com/reference/rest-api/](https://docs.taosdata.com/reference/rest-api/) [RESTful API](../../connector/rest-api)
- 兼容 InfluxDB v1 写接口 - 兼容 InfluxDB v1 写接口
[https://docs.influxdata.com/influxdb/v2.0/reference/api/influxdb-1x/write/](https://docs.influxdata.com/influxdb/v2.0/reference/api/influxdb-1x/write/) [https://docs.influxdata.com/influxdb/v2.0/reference/api/influxdb-1x/write/](https://docs.influxdata.com/influxdb/v2.0/reference/api/influxdb-1x/write/)
- 兼容 OpenTSDB JSON 和 telnet 格式写入 - 兼容 OpenTSDB JSON 和 telnet 格式写入
@ -179,7 +179,7 @@ AllowWebSockets
### TDengine RESTful 接口 ### TDengine RESTful 接口
您可以使用任何支持 http 协议的客户端通过访问 RESTful 接口地址 `http://<fqdn>:6041/rest/sql` 来写入数据到 TDengine 或从 TDengine 中查询数据。细节请参考[官方文档](/reference/rest-api/)。 您可以使用任何支持 http 协议的客户端通过访问 RESTful 接口地址 `http://<fqdn>:6041/rest/sql` 来写入数据到 TDengine 或从 TDengine 中查询数据。细节请参考[官方文档](../../connector/rest-api/)。
### InfluxDB ### InfluxDB

View File

@ -90,7 +90,7 @@ http://127.0.0.1:6041/rest/sql
``` ```
Basic cm9vdDp0YW9zZGF0YQ== Basic cm9vdDp0YW9zZGF0YQ==
``` ```
相关文档请参考[ TDengine REST API 文档](/reference/rest-api/)。 相关文档请参考[ TDengine REST API 文档](../../connector/rest-api/)。
在消息体中输入规则引擎替换模板: 在消息体中输入规则引擎替换模板:

View File

@ -9,7 +9,7 @@ import Release from "/components/ReleaseV3";
<Release type="tdengine" version="3.0.0.1" /> <Release type="tdengine" version="3.0.0.1" />
## 3.0.0.0 <!-- ## 3.0.0.0
<Release type="tdengine" version="3.0.0.0" /> <Release type="tdengine" version="3.0.0.0" /> -->

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// clang-format off
#include <assert.h> #include <assert.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
@ -94,13 +95,8 @@ int32_t create_stream() {
} }
taos_free_result(pRes); taos_free_result(pRes);
/*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(pConn, pRes = taos_query(pConn,
"create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, " "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, k from st1 partition by tbname state_window(k)");
"count(k) from st1 partition by tbname interval(20s) ");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;

View File

@ -29,7 +29,7 @@ typedef void* DataSinkHandle;
struct SRpcMsg; struct SRpcMsg;
struct SSubplan; struct SSubplan;
typedef struct SReadHandle { typedef struct {
void* tqReader; void* tqReader;
void* meta; void* meta;
void* config; void* config;
@ -41,6 +41,7 @@ typedef struct SReadHandle {
bool initTableReader; bool initTableReader;
bool initTqReader; bool initTqReader;
int32_t numOfVgroups; int32_t numOfVgroups;
void* pStateBackend;
} SReadHandle; } SReadHandle;
// in queue mode, data streams are seperated by msg // in queue mode, data streams are seperated by msg
@ -78,8 +79,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
/** /**
* @brief Cleanup SSDataBlock for StreamScanInfo * @brief Cleanup SSDataBlock for StreamScanInfo
* *
* @param tinfo * @param tinfo
*/ */
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo); void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo);
@ -163,7 +164,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList/*,int32_t* resNum, SExplainExecInfo** pRes*/); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/);
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len); int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);

View File

@ -263,6 +263,14 @@ typedef struct {
SArray* checkpointVer; SArray* checkpointVer;
} SStreamRecoveringState; } SStreamRecoveringState;
// incremental state storage
typedef struct {
SStreamTask* pOwner;
TDB* db;
TTB* pStateDb;
TXN txn;
} SStreamState;
typedef struct SStreamTask { typedef struct SStreamTask {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
@ -312,6 +320,10 @@ typedef struct SStreamTask {
// msg handle // msg handle
SMsgCb* pMsgCb; SMsgCb* pMsgCb;
// state backend
SStreamState* pState;
} SStreamTask; } SStreamTask;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -507,7 +519,7 @@ typedef struct SStreamMeta {
char* path; char* path;
TDB* db; TDB* db;
TTB* pTaskDb; TTB* pTaskDb;
TTB* pStateDb; TTB* pCheckpointDb;
SHashObj* pTasks; SHashObj* pTasks;
SHashObj* pRecoverStatus; SHashObj* pRecoverStatus;
void* ahandle; void* ahandle;
@ -528,6 +540,36 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta); int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta);
SStreamState* streamStateOpen(char* path, SStreamTask* pTask);
void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState);
int32_t streamStateAbort(SStreamState* pState);
typedef struct {
TBC* pCur;
} SStreamStateCur;
#if 1
int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen);
int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen);
SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen);
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen);
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen);
void streamStateFreeCur(SStreamStateCur* pCur);
int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen);
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -96,7 +96,12 @@ typedef struct {
typedef struct SQueryExecMetric { typedef struct SQueryExecMetric {
int64_t start; // start timestamp, us int64_t start; // start timestamp, us
int64_t parsed; // start to parse, us int64_t syntaxStart; // start to parse, us
int64_t syntaxEnd; // end to parse, us
int64_t ctgStart; // start to parse, us
int64_t ctgEnd; // end to parse, us
int64_t semanticEnd;
int64_t execEnd;
int64_t send; // start to send to server, us int64_t send; // start to send to server, us
int64_t rsp; // receive response from server, us int64_t rsp; // receive response from server, us
} SQueryExecMetric; } SQueryExecMetric;

View File

@ -29,6 +29,7 @@ extern "C" {
#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0) #define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0) #define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0) #define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscPerf(...) do { taosPrintLog("TSC ", 0, cDebugFlag, __VA_ARGS__); } while(0)
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -69,14 +69,25 @@ static void deregisterRequest(SRequestObj *pRequest) {
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1); int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
int64_t duration = taosGetTimestampUs() - pRequest->metric.start; int64_t nowUs = taosGetTimestampUs();
int64_t duration = nowUs - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
" ms, current:%d, app current:%d", " ms, current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst); pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst);
if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) { if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) {
tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 "us, exec:%" PRId64 "us",
duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
pRequest->metric.ctgEnd - pRequest->metric.ctgStart,
pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
pRequest->metric.execEnd - pRequest->metric.semanticEnd);
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 "us, exec:%" PRId64 "us",
duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
pRequest->metric.ctgEnd - pRequest->metric.ctgStart,
pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
pRequest->metric.execEnd - pRequest->metric.semanticEnd);
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
} }
@ -330,7 +341,6 @@ void doDestroyRequest(void *p) {
schedulerFreeJob(&pRequest->body.queryJob, 0); schedulerFreeJob(&pRequest->body.queryJob, 0);
taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->msgBuf);
taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFreeClear(pRequest->pDb); taosMemoryFreeClear(pRequest->pDb);
doFreeReqResultInfo(&pRequest->body.resInfo); doFreeReqResultInfo(&pRequest->body.resInfo);
@ -349,6 +359,7 @@ void doDestroyRequest(void *p) {
taosMemoryFree(pRequest->body.param); taosMemoryFree(pRequest->body.param);
} }
taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFree(pRequest); taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
} }

View File

@ -842,6 +842,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
} }
schedulerFreeJob(&pRequest->body.queryJob, 0); schedulerFreeJob(&pRequest->body.queryJob, 0);
pRequest->metric.execEnd = taosGetTimestampUs();
} }
taosMemoryFree(pResult); taosMemoryFree(pResult);

View File

@ -685,6 +685,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
SQuery *pQuery = pWrapper->pQuery; SQuery *pQuery = pWrapper->pQuery;
SRequestObj *pRequest = pWrapper->pRequest; SRequestObj *pRequest = pWrapper->pRequest;
pRequest->metric.ctgEnd = taosGetTimestampUs();
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
pRequest->stableQuery = pQuery->stableQuery; pRequest->stableQuery = pQuery->stableQuery;
@ -693,6 +695,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
} }
} }
pRequest->metric.semanticEnd = taosGetTimestampUs();
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
if (pQuery->haveResultSet) { if (pQuery->haveResultSet) {
setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols); setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols);
@ -784,12 +788,16 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
SQuery *pQuery = NULL; SQuery *pQuery = NULL;
pRequest->metric.syntaxStart = taosGetTimestampUs();
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)}; SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)};
code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq); code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
pRequest->metric.syntaxEnd = taosGetTimestampUs();
if (!updateMetaForce) { if (!updateMetaForce) {
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
@ -816,6 +824,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
.requestObjRefId = pCxt->requestRid, .requestObjRefId = pCxt->requestRid,
.mgmtEps = pCxt->mgmtEpSet}; .mgmtEps = pCxt->mgmtEpSet};
pRequest->metric.ctgStart = taosGetTimestampUs();
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper, code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper,
&pRequest->body.queryJob); &pRequest->body.queryJob);
pCxt = NULL; pCxt = NULL;

View File

@ -77,7 +77,7 @@ bool tsMonitorComp = false;
// telem // telem
bool tsEnableTelem = true; bool tsEnableTelem = true;
int32_t tsTelemInterval = 86400; int32_t tsTelemInterval = 43200;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com";
uint16_t tsTelemPort = 80; uint16_t tsTelemPort = 80;

View File

@ -131,7 +131,9 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) {
char* pCont = mndBuildTelemetryReport(pMnode); char* pCont = mndBuildTelemetryReport(pMnode);
if (pCont != NULL) { if (pCont != NULL) {
if (taosSendHttpReport(tsTelemServer, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) { if (taosSendHttpReport(tsTelemServer, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) {
mError("failed to send telemetry msg"); mError("failed to send telemetry report");
} else {
mTrace("succeed to send telemetry report");
} }
taosMemoryFree(pCont); taosMemoryFree(pCont);
} }

View File

@ -1308,7 +1308,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
if (pTrans->policy == TRN_POLICY_ROLLBACK) { if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (pTrans->lastAction != 0) { if (pTrans->lastAction != 0) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->lastAction); STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->lastAction);
if (pAction->retryCode != 0 && pAction->retryCode != pAction->errCode) { if (pAction->retryCode != 0 && pAction->retryCode == pAction->errCode) {
if (pTrans->failedTimes < 6) { if (pTrans->failedTimes < 6) {
mError("trans:%d, stage keep on redoAction since action:%d code:0x%x not 0x%x, failedTimes:%d", pTrans->id, mError("trans:%d, stage keep on redoAction since action:%d code:0x%x not 0x%x, failedTimes:%d", pTrans->id,
pTrans->lastAction, pTrans->code, pAction->retryCode, pTrans->failedTimes); pTrans->lastAction, pTrans->code, pAction->retryCode, pTrans->failedTimes);

View File

@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut PUBLIC sut
) )
add_test( if(NOT ${TD_WINDOWS})
NAME smaTest add_test(
COMMAND smaTest NAME smaTest
) COMMAND smaTest
)
endif(NOT ${TD_WINDOWS})

View File

@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut PUBLIC sut
) )
add_test( if(NOT ${TD_WINDOWS})
NAME stbTest add_test(
COMMAND stbTest NAME stbTest
) COMMAND stbTest
)
endif(NOT ${TD_WINDOWS})

View File

@ -734,10 +734,12 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
tsem_post(&(pRSmaStat->notEmpty));
int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1); int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1);
if (atomic_load_8(&pInfo->assigned) == 0) {
tsem_post(&(pRSmaStat->notEmpty));
}
// smoothing consume // smoothing consume
int32_t n = nItems / RSMA_QTASKEXEC_SMOOTH_SIZE; int32_t n = nItems / RSMA_QTASKEXEC_SMOOTH_SIZE;
if (n > 1) { if (n > 1) {
@ -1526,7 +1528,9 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
ASSERT(qItem->level == pItem->level); ASSERT(qItem->level == pItem->level);
ASSERT(qItem->fetchLevel == pItem->fetchLevel); ASSERT(qItem->fetchLevel == pItem->fetchLevel);
#endif #endif
tsem_post(&(pStat->notEmpty)); if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
tsem_post(&(pStat->notEmpty));
}
smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level, smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level,
pRSmaInfo->suid); pRSmaInfo->suid);
} break; } break;
@ -1691,38 +1695,48 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
while (true) { while (true) {
// step 1: rsma exec - consume data in buffer queue for all suids // step 1: rsma exec - consume data in buffer queue for all suids
if (type == RSMA_EXEC_OVERFLOW || type == RSMA_EXEC_COMMIT) { if (type == RSMA_EXEC_OVERFLOW || type == RSMA_EXEC_COMMIT) {
void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock void *pIter = NULL;
while (pIter) { while ((pIter = taosHashIterate(infoHash, pIter))) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
int64_t itemSize = 0; if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) {
if ((itemSize = taosQueueItemSize(pInfo->queue)) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel ||
RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { int32_t batchCnt = -1;
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock int32_t batchMax = taosHashGetSize(infoHash) / tsNumOfVnodeRsmaThreads;
int32_t qallItemSize = taosQallItemSize(pInfo->qall); bool occupied = (batchMax <= 1);
if (qallItemSize > 0) { if (batchMax > 1) {
tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); batchMax = 100 / batchMax;
smaDebug("vgId:%d, qitemSize:%" PRIi64 ", batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), itemSize,
qallItemSize, type);
} }
while (occupied || (++batchCnt > batchMax)) { // greedy mode
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
int32_t qallItemSize = taosQallItemSize(pInfo->qall);
if (qallItemSize > 0) {
tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type);
smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type);
}
if (type == RSMA_EXEC_OVERFLOW) { if (type == RSMA_EXEC_OVERFLOW) {
tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr);
} }
if (qallItemSize > 0) { if (qallItemSize > 0) {
// subtract the item size after the task finished, commit should wait for all items be consumed atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); continue;
} else if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
continue;
}
break;
} }
ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
} }
ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
} }
pIter = taosHashIterate(infoHash, pIter);
} }
if (type == RSMA_EXEC_COMMIT) { if (type == RSMA_EXEC_COMMIT) {
if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) { if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
break; break;
} else { } else {
// commit should wait for all items be consumed
continue; continue;
} }
} }
@ -1761,15 +1775,16 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if (pEnv->flag & SMA_ENV_FLG_CLOSE) { if (pEnv->flag & SMA_ENV_FLG_CLOSE) {
break; break;
} }
tsem_wait(&pRSmaStat->notEmpty);
if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
smaInfo("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
atomic_load_64(&pRSmaStat->nBufItems));
break;
}
} }
tsem_wait(&pRSmaStat->notEmpty);
if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
smaInfo("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
atomic_load_64(&pRSmaStat->nBufItems));
break;
}
} // end of while(true) } // end of while(true)
_end: _end:

View File

@ -79,6 +79,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
ASSERT(0); ASSERT(0);
} }
if (streamLoadTasks(pTq->pStreamMeta) < 0) {
ASSERT(0);
}
return pTq; return pTq;
} }
@ -664,6 +668,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
ASSERT(pTask->exec.executor); ASSERT(pTask->exec.executor);
} }
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
if (pTask->pState == NULL) {
return -1;
}
// sink // sink
/*pTask->ahandle = pTq->pVnode;*/ /*pTask->ahandle = pTq->pVnode;*/
if (pTask->outputType == TASK_OUTPUT__SMA) { if (pTask->outputType == TASK_OUTPUT__SMA) {

View File

@ -1416,7 +1416,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
int64_t minKey = 0; int64_t minKey = 0;
if (pReader->order == TSDB_ORDER_ASC) { if (pReader->order == TSDB_ORDER_ASC) {
minKey = INT64_MAX; // chosen the minimum value minKey = INT64_MAX; // chosen the minimum value
if (minKey > tsLast && pLastBlockReader->lastBlockData.nRow > 0) { if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
minKey = tsLast; minKey = tsLast;
} }
@ -1429,7 +1429,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
} }
} else { } else {
minKey = INT64_MIN; minKey = INT64_MIN;
if (minKey < tsLast && pLastBlockReader->lastBlockData.nRow > 0) { if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
minKey = tsLast; minKey = tsLast;
} }

View File

@ -150,6 +150,7 @@ typedef struct {
SQueryTableDataCond tableCond; SQueryTableDataCond tableCond;
int64_t recoverStartVer; int64_t recoverStartVer;
int64_t recoverEndVer; int64_t recoverEndVer;
SStreamState* pState;
} SStreamTaskInfo; } SStreamTaskInfo;
typedef struct { typedef struct {

View File

@ -469,6 +469,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)}; SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
code = createResultData(&type, rows, &output); code = createResultData(&type, rows, &output);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code;
qError("failed to create result, reason:%s", tstrerror(code)); qError("failed to create result, reason:%s", tstrerror(code));
goto end; goto end;
} }
@ -477,6 +478,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray
if(code != TSDB_CODE_SUCCESS){ if(code != TSDB_CODE_SUCCESS){
qError("failed to calculate scalar, reason:%s", tstrerror(code)); qError("failed to calculate scalar, reason:%s", tstrerror(code));
terrno = code; terrno = code;
goto end;
} }
// int64_t st2 = taosGetTimestampUs(); // int64_t st2 = taosGetTimestampUs();
// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1); // qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
@ -763,6 +765,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
} }
if (pTagCond) { if (pTagCond) {
terrno = TDB_CODE_SUCCESS;
SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond); SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond);
if(terrno != TDB_CODE_SUCCESS){ if(terrno != TDB_CODE_SUCCESS){
colDataDestroy(pColInfoData); colDataDestroy(pColInfoData);

View File

@ -392,7 +392,7 @@ static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
pCtx->input.colDataAggIsSet = pStatus->hasAgg; pCtx->input.colDataAggIsSet = pStatus->hasAgg;
pCtx->input.numOfRows = pStatus->numOfRows; pCtx->input.numOfRows = pStatus->numOfRows;
pCtx->input.startRowIndex = pStatus->startOffset; pCtx->input.startRowIndex = pStatus->startOffset;
} }
@ -3715,7 +3715,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
const char* id, SInterval* pInterval, int32_t fillType, int32_t order) { const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode); SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey; int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey); STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order); w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);
@ -3988,15 +3988,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
bool assignUid = groupbyTbname(group); bool assignUid = groupbyTbname(group);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
if(assignUid){ if (assignUid) {
for (int32_t i = 0; i < numOfTables; i++) { for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = info->uid; info->groupId = info->uid;
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
} }
}else{ } else {
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
@ -4615,6 +4615,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete; goto _complete;
} }
if (pHandle && pHandle->pStateBackend) {
(*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
}
(*pTaskInfo)->sql = sql; (*pTaskInfo)->sql = sql;
sql = NULL; sql = NULL;
(*pTaskInfo)->pSubplan = pPlan; (*pTaskInfo)->pSubplan = pPlan;

View File

@ -3128,8 +3128,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
maxTs = TMAX(maxTs, pBlock->info.watermark); maxTs = TMAX(maxTs, pBlock->info.watermark);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA || ASSERT(pBlock->info.type != STREAM_INVERT);
pBlock->info.type == STREAM_INVALID) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type; pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_CLEAR) { } else if (pBlock->info.type == STREAM_CLEAR) {
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));

View File

@ -283,7 +283,7 @@ typedef struct SSchJob {
} SSchJob; } SSchJob;
typedef struct SSchTaskCtx { typedef struct SSchTaskCtx {
SSchJob *pJob; int64_t jobRid;
SSchTask *pTask; SSchTask *pTask;
} SSchTaskCtx; } SSchTaskCtx;

View File

@ -821,7 +821,13 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
int32_t schLaunchTaskImpl(void *param) { int32_t schLaunchTaskImpl(void *param) {
SSchTaskCtx *pCtx = (SSchTaskCtx *)param; SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
SSchJob *pJob = pCtx->pJob; SSchJob *pJob = schAcquireJob(pCtx->jobRid);
if (NULL == pJob) {
taosMemoryFree(param);
qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
}
SSchTask *pTask = pCtx->pTask; SSchTask *pTask = pCtx->pTask;
int8_t status = 0; int8_t status = 0;
int32_t code = 0; int32_t code = 0;
@ -880,6 +886,8 @@ _return:
} }
} }
schReleaseJob(pJob->refId);
SCH_RET(code); SCH_RET(code);
} }
@ -890,7 +898,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
param->pJob = pJob; param->jobRid = pJob->refId;
param->pTask = pTask; param->pTask = pTask;
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) { if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {

View File

@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
return 0; return 0;
} }
// TODO: handle version
int32_t streamExecForAll(SStreamTask* pTask) { int32_t streamExecForAll(SStreamTask* pTask) {
while (1) { while (1) {
int32_t batchCnt = 1; int32_t batchCnt = 1;

View File

@ -14,7 +14,7 @@
*/ */
#include "executor.h" #include "executor.h"
#include "tstream.h" #include "streamInc.h"
#include "ttimer.h" #include "ttimer.h"
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
@ -23,17 +23,23 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pMeta->path = strdup(path); int32_t len = strlen(path) + 20;
char* streamPath = taosMemoryCalloc(1, len);
sprintf(streamPath, "%s/%s", path, "stream");
pMeta->path = strdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
goto _err; goto _err;
} }
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
mkdir(streamPath, 0755);
taosMemoryFree(streamPath);
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) {
goto _err; goto _err;
} }
// open state storage backend if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) {
if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pStateDb) < 0) {
goto _err; goto _err;
} }
@ -49,16 +55,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->ahandle = ahandle; pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc; pMeta->expandFunc = expandFunc;
if (streamLoadTasks(pMeta) < 0) {
goto _err;
}
return pMeta; return pMeta;
_err: _err:
if (pMeta->path) taosMemoryFree(pMeta->path); if (pMeta->path) taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pStateDb) tdbTbClose(pMeta->pStateDb);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db); if (pMeta->db) tdbClose(pMeta->db);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
return NULL; return NULL;
@ -67,7 +70,7 @@ _err:
void streamMetaClose(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) {
tdbCommit(pMeta->db, &pMeta->txn); tdbCommit(pMeta->db, &pMeta->txn);
tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pStateDb); tdbTbClose(pMeta->pCheckpointDb);
tdbClose(pMeta->db); tdbClose(pMeta->db);
void* pIter = NULL; void* pIter = NULL;

View File

@ -176,6 +176,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea
} }
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
#if 0
void* buf = NULL; void* buf = NULL;
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK); ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
@ -224,10 +225,12 @@ int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
FAIL: FAIL:
if (buf) taosMemoryFree(buf); if (buf) taosMemoryFree(buf);
return -1; return -1;
#endif
return 0; return 0;
} }
int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
#if 0
void* pVal = NULL; void* pVal = NULL;
int32_t vLen = 0; int32_t vLen = 0;
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) { if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
@ -241,7 +244,7 @@ int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
pTask->nextCheckId = aggCheckpoint.checkpointId + 1; pTask->nextCheckId = aggCheckpoint.checkpointId + 1;
pTask->checkpointInfo = aggCheckpoint.checkpointVer; pTask->checkpointInfo = aggCheckpoint.checkpointVer;
#endif
return 0; return 0;
} }

View File

@ -0,0 +1,187 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "streamInc.h"
#include "ttimer.h"
SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (pState == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
char statePath[200];
sprintf(statePath, "%s/%d", path, pTask->taskId);
if (tdbOpen(statePath, 16 * 1024, 1, &pState->db) < 0) {
goto _err;
}
// open state storage backend
if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pState->db, &pState->pStateDb) < 0) {
goto _err;
}
pState->pOwner = pTask;
return pState;
_err:
if (pState->pStateDb) tdbTbClose(pState->pStateDb);
if (pState->db) tdbClose(pState->db);
taosMemoryFree(pState);
return NULL;
}
void streamStateClose(SStreamState* pState) {
tdbCommit(pState->db, &pState->txn);
tdbTbClose(pState->pStateDb);
tdbClose(pState->db);
taosMemoryFree(pState);
}
int32_t streamStateBegin(SStreamState* pState) {
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamStateCommit(SStreamState* pState) {
if (tdbCommit(pState->db, &pState->txn) < 0) {
return -1;
}
memset(&pState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamStateAbort(SStreamState* pState) {
if (tdbAbort(pState->db, &pState->txn) < 0) {
return -1;
}
memset(&pState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen) {
return tdbTbUpsert(pState->pStateDb, key, kLen, value, vLen, &pState->txn);
}
int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen) {
return tdbTbGet(pState->pStateDb, key, kLen, pVal, pVLen);
}
int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen) {
return tdbTbDelete(pState->pStateDb, key, kLen, &pState->txn);
}
SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL);
int32_t c;
tdbTbcMoveTo(pCur->pCur, key, kLen, &c);
if (c != 0) {
taosMemoryFree(pCur);
return NULL;
}
return 0;
}
int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen) {
return tdbTbcGet(pCur->pCur, (const void**)pKey, pKLen, (const void**)pVal, pVLen);
}
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
//
return tdbTbcMoveToFirst(pCur->pCur);
}
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
//
return tdbTbcMoveToLast(pCur->pCur);
}
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) {
taosMemoryFree(pCur);
return NULL;
}
if (c > 0) return pCur;
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
taosMemoryFree(pCur);
return NULL;
}
return pCur;
}
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) {
taosMemoryFree(pCur);
return NULL;
}
if (c < 0) return pCur;
if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
taosMemoryFree(pCur);
return NULL;
}
return pCur;
}
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
//
return tdbTbcMoveToNext(pCur->pCur);
}
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
//
return tdbTbcMoveToPrev(pCur->pCur);
}

View File

@ -165,5 +165,8 @@ void tFreeSStreamTask(SStreamTask* pTask) {
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
} }
if (pTask->pState) streamStateClose(pTask->pState);
taosMemoryFree(pTask); taosMemoryFree(pTask);
} }

View File

@ -276,7 +276,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
while (transReadComplete(pBuf)) { while (transReadComplete(pBuf)) {
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
if (true == pBuf->invalid || false == uvHandleReq(conn)) { if (true == pBuf->invalid || false == uvHandleReq(conn)) {
tError("%s conn %p read invalid packet", transLabel(pTransInst), conn); tError("%s conn %p read invalid packet, dst: %s, srv: %s", transLabel(pTransInst), conn, conn->dst, conn->src);
destroyConn(conn, true); destroyConn(conn, true);
return; return;
} }

View File

@ -121,7 +121,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
if (found == NULL) { if (found == NULL) {
// file corrupted, no complete log // file corrupted, no complete log
// TODO delete and search in previous files // TODO delete and search in previous files
ASSERT(0); /*ASSERT(0);*/
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
@ -221,7 +221,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
int code = walSaveMeta(pWal); int code = walSaveMeta(pWal);
if (code < 0) { if (code < 0) {
taosArrayDestroy(actualLog);
return -1; return -1;
} }
} }

View File

@ -4,7 +4,7 @@ system sh/exec.sh -n dnode1 -s start
sql connect sql connect
print =============== conflict stb print =============== conflict stb
sql create database db vgroups 1; sql create database db vgroups 4;
sql use db; sql use db;
sql create table stb (ts timestamp, i int) tags (j int); sql create table stb (ts timestamp, i int) tags (j int);
sql_error create table stb using stb tags (1); sql_error create table stb using stb tags (1);
@ -16,6 +16,9 @@ sql_error create table ctb (ts timestamp, i int) tags (j int);
sql create table ntb (ts timestamp, i int); sql create table ntb (ts timestamp, i int);
sql_error create table ntb (ts timestamp, i int) tags (j int); sql_error create table ntb (ts timestamp, i int) tags (j int);
sql drop table ntb
sql create table ntb (ts timestamp, i int) tags (j int);
sql drop database db sql drop database db
print =============== create database d1 print =============== create database d1