diff --git a/docs/en/02-intro/index.md b/docs/en/02-intro/index.md index 9f0876d297..b4636e54a6 100644 --- a/docs/en/02-intro/index.md +++ b/docs/en/02-intro/index.md @@ -97,14 +97,13 @@ As a high-performance, scalable and SQL supported time-series database, TDengine | **System Maintenance Requirements** | **Not Applicable** | **Might Be Applicable** | **Very Applicable** | **Description** | | ------------------------------------------------- | ------------------ | ----------------------- | ------------------- | ------------------------------------------------------------ | | Native high-reliability | | | √ | TDengine has a very robust, reliable and easily configurable system architecture to simplify routine operation. Human errors and accidents are eliminated to the greatest extent, with a streamlined experience for operators. | -| Minimize learning and maintenance costs | | | √ | In addition to being easily configurable, standard SQL support and the Taos shell for ad hoc queries makes maintenance simpler, allows reuse and reduces learning costs.| +| Minimize learning and maintenance costs | | | √ | In addition to being easily configurable, standard SQL support and the TDengine CLI for ad hoc queries makes maintenance simpler, allows reuse and reduces learning costs.| | Abundant talent supply | √ | | | Given the above, and given the extensive training and professional services provided by TDengine, it is easy to migrate from existing solutions or create a new and lasting solution based on TDengine.| ## Comparison with other databases - [Writing Performance Comparison of TDengine and InfluxDB ](https://tdengine.com/2022/02/23/4975.html) - [Query Performance Comparison of TDengine and InfluxDB](https://tdengine.com/2022/02/24/5120.html) -- [TDengine vs InfluxDB、OpenTSDB、Cassandra、MySQL、ClickHouse](https://www.tdengine.com/downloads/TDengine_Testing_Report_en.pdf) - [TDengine vs OpenTSDB](https://tdengine.com/2019/09/12/710.html) - [TDengine vs Cassandra](https://tdengine.com/2019/09/12/708.html) - [TDengine vs InfluxDB](https://tdengine.com/2019/09/12/706.html) diff --git a/docs/en/05-get-started/01-docker.md b/docs/en/05-get-started/01-docker.md index 32eee6b942..de5b620a77 100644 --- a/docs/en/05-get-started/01-docker.md +++ b/docs/en/05-get-started/01-docker.md @@ -42,7 +42,7 @@ To do so, run the following command: ``` - This command creates the `meters` supertable in the `test` database. In the `meters` supertable, it then creates 10,000 subtables named `d0` to `d9999`. Each table has 10,000 rows and each row has four columns: `ts`, `current`, `voltage`, and `phase`. The timestamps of the data in these columns range from 2017-07-14 10:40:00 000 to 2017-07-14 10:40:09 999. Each table is randomly assigned a `groupId` tag from 1 to ten and a `location` tag of either `California.SanFrancisco` or `California.SanDiego`. +This command creates the `meters` supertable in the `test` database. In the `meters` supertable, it then creates 10,000 subtables named `d0` to `d9999`. Each table has 10,000 rows and each row has four columns: `ts`, `current`, `voltage`, and `phase`. The timestamps of the data in these columns range from 2017-07-14 10:40:00 000 to 2017-07-14 10:40:09 999. Each table is randomly assigned a `groupId` tag from 1 to 10 and a `location` tag of either `Campbell`, `Cupertino`, `Los Angeles`, `Mountain View`, `Palo Alto`, `San Diego`, `San Francisco`, `San Jose`, `Santa Clara` or `Sunnyvale`. The `taosBenchmark` command creates a deployment with 100 million data points that you can use for testing purposes. The time required depends on the hardware specifications of the local system. diff --git a/docs/zh/02-intro.md b/docs/zh/02-intro.md index a6ef2b94b6..f726b4ea92 100644 --- a/docs/zh/02-intro.md +++ b/docs/zh/02-intro.md @@ -22,8 +22,8 @@ TDengine的主要功能如下: 9. 提供[命令行程序](../reference/taos-shell),便于管理集群,检查系统状态,做即席查询 10. 提供多种数据的[导入](../operation/import)、[导出](../operation/export) 11. 支持对[TDengine 集群本身的监控](../operation/monitor) -12. 提供 [C/C++](../reference/connector/cpp), [Java](../reference/connector/java), [Python](../reference/connector/python), [Go](../reference/connector/go), [Rust](../reference/connector/rust), [Node.js](../reference/connector/node) 等多种编程语言的[连接器](../reference/connector/) -13. 支持 [REST 接口](../reference/rest-api/) +12. 提供各种语言的[连接器](../connector): 如 C/C++, Java, Go, Node.JS, Rust, Python, C# 等 +13. 支持 [REST 接口](../connector/rest-api/) 14. 支持与[ Grafana 无缝集成](../third-party/grafana) 15. 支持与 Google Data Studio 无缝集成 16. 支持 [Kubernetes 部署](../deployment/k8s) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 85005b9551..a1c1802d77 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -9,7 +9,7 @@ import PkgListV3 from "/components/PkgListV3"; 您可以[用 Docker 立即体验](../../get-started/docker/) TDengine。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. -TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。目前 taosAdapter 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../reference/rest-api/)。 +TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。目前 taosAdapter 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../connector/rest-api/)。 为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 lite 版本的安装包。 diff --git a/docs/zh/05-get-started/index.md b/docs/zh/05-get-started/index.md index 794081b4e4..20f8235d87 100644 --- a/docs/zh/05-get-started/index.md +++ b/docs/zh/05-get-started/index.md @@ -3,7 +3,7 @@ title: 立即开始 description: '快速设置 TDengine 环境并体验其高效写入和查询' --- -TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](/reference/taosadapter) 提供 [RESTful 接口](/reference/rest-api)。 +TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/taosadapter) 提供 [RESTful 接口](../connector/rest-api)。 本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。 diff --git a/docs/zh/07-develop/01-connect/_connect_java.mdx b/docs/zh/07-develop/01-connect/_connect_java.mdx index f5b8ea1cc2..86c70ef7dc 100644 --- a/docs/zh/07-develop/01-connect/_connect_java.mdx +++ b/docs/zh/07-develop/01-connect/_connect_java.mdx @@ -12,4 +12,4 @@ {{#include docs/examples/java/src/main/java/com/taos/example/WSConnectExample.java:main}} ``` -更多连接参数配置,参考[Java 连接器](/reference/connector/java) +更多连接参数配置,参考[Java 连接器](../../connector/java) diff --git a/docs/zh/07-develop/01-connect/index.md b/docs/zh/07-develop/01-connect/index.md index cc9f2e2a69..3e44e6c5da 100644 --- a/docs/zh/07-develop/01-connect/index.md +++ b/docs/zh/07-develop/01-connect/index.md @@ -14,10 +14,10 @@ import ConnCSNative from "./_connect_cs.mdx"; import ConnC from "./_connect_c.mdx"; import ConnR from "./_connect_r.mdx"; import ConnPHP from "./_connect_php.mdx"; -import InstallOnWindows from "../../14-reference/03-connector/_linux_install.mdx"; -import InstallOnLinux from "../../14-reference/03-connector/_windows_install.mdx"; -import VerifyLinux from "../../14-reference/03-connector/_verify_linux.mdx"; -import VerifyWindows from "../../14-reference/03-connector/_verify_windows.mdx"; +import InstallOnWindows from "../../08-connector/_linux_install.mdx"; +import InstallOnLinux from "../../08-connector/_windows_install.mdx"; +import VerifyLinux from "../../08-connector/_verify_linux.mdx"; +import VerifyWindows from "../../08-connector/_verify_windows.mdx"; TDengine 提供了丰富的应用程序开发接口,为了便于用户快速开发自己的应用,TDengine 支持了多种编程语言的连接器,其中官方连接器包括支持 C/C++、Java、Python、Go、Node.js、C#、Rust、Lua(社区贡献)和 PHP (社区贡献)的连接器。这些连接器支持使用原生接口(taosc)和 REST 接口(部分语言暂不支持)连接 TDengine 集群。社区开发者也贡献了多个非官方连接器,例如 ADO.NET 连接器、Lua 连接器和 PHP 连接器。 @@ -33,7 +33,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速 关键不同点在于: 1. 使用 REST 连接,用户无需安装客户端驱动程序 taosc,具有跨平台易用的优势,但性能要下降 30%左右。 -2. 使用原生连接可以体验 TDengine 的全部功能,如[参数绑定接口](/reference/connector/cpp#参数绑定-api)、[订阅](/reference/connector/cpp#订阅和消费-api)等等。 +2. 使用原生连接可以体验 TDengine 的全部功能,如[参数绑定接口](../../connector/cpp/#参数绑定-api)、[订阅](../../connector/cpp/#订阅和消费-api)等等。 ## 安装客户端驱动 taosc diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index da8bf5e20e..2f5c13d9b0 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -64,7 +64,7 @@ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); ``` -这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。 +这些 API 的文档请见 [C/C++ Connector](../../connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。 diff --git a/docs/zh/07-develop/index.md b/docs/zh/07-develop/index.md index 4d0f3c3cea..20c0170844 100644 --- a/docs/zh/07-develop/index.md +++ b/docs/zh/07-develop/index.md @@ -12,7 +12,7 @@ title: 开发指南 7. 在很多场景下(如车辆管理),应用需要获取每个数据采集点的最新状态,那么建议你采用TDengine的cache功能,而不用单独部署Redis等缓存软件。 8. 如果你发现TDengine的函数无法满足你的要求,那么你可以使用用户自定义函数来解决问题。 -本部分内容就是按照上述的顺序组织的。为便于理解,TDengine为每个功能为每个支持的编程语言都提供了示例代码。如果你希望深入了解SQL的使用,需要查看[SQL手册](/taos-sql/)。如果想更深入地了解各连接器的使用,请阅读[连接器参考指南](/reference/connector/)。如果还希望想将TDengine与第三方系统集成起来,比如Grafana, 请参考[第三方工具](/third-party/)。 +本部分内容就是按照上述的顺序组织的。为便于理解,TDengine为每个功能为每个支持的编程语言都提供了示例代码。如果你希望深入了解SQL的使用,需要查看[SQL手册](/taos-sql/)。如果想更深入地了解各连接器的使用,请阅读[连接器参考指南](../connector/)。如果还希望想将TDengine与第三方系统集成起来,比如Grafana, 请参考[第三方工具](../third-party/)。 如果在开发过程中遇到任何问题,请点击每个页面下方的["反馈问题"](https://github.com/taosdata/TDengine/issues/new/choose), 在GitHub上直接递交issue。 diff --git a/docs/zh/14-reference/02-rest-api/02-rest-api.mdx b/docs/zh/08-connector/02-rest-api.mdx similarity index 100% rename from docs/zh/14-reference/02-rest-api/02-rest-api.mdx rename to docs/zh/08-connector/02-rest-api.mdx diff --git a/docs/zh/14-reference/03-connector/cpp.mdx b/docs/zh/08-connector/03-cpp.mdx similarity index 99% rename from docs/zh/14-reference/03-connector/cpp.mdx rename to docs/zh/08-connector/03-cpp.mdx index bd5776d035..d27eeb7dfb 100644 --- a/docs/zh/14-reference/03-connector/cpp.mdx +++ b/docs/zh/08-connector/03-cpp.mdx @@ -22,7 +22,7 @@ TDengine 客户端驱动的动态库位于: ## 支持的平台 -请参考[支持的平台列表](/reference/connector#支持的平台) +请参考[支持的平台列表](../#支持的平台) ## 支持的版本 @@ -30,7 +30,7 @@ TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一 ## 安装步骤 -TDengine 客户端驱动的安装请参考 [安装指南](/reference/connector#安装步骤) +TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤) ## 建立连接 diff --git a/docs/zh/14-reference/03-connector/java.mdx b/docs/zh/08-connector/04-java.mdx similarity index 99% rename from docs/zh/14-reference/03-connector/java.mdx rename to docs/zh/08-connector/04-java.mdx index 183994313e..20d2e4fabd 100644 --- a/docs/zh/14-reference/03-connector/java.mdx +++ b/docs/zh/08-connector/04-java.mdx @@ -35,7 +35,7 @@ REST 连接支持所有能运行 Java 的平台。 ## 版本支持 -请参考[版本支持列表](/reference/connector#版本支持) +请参考[版本支持列表](../#版本支持) ## TDengine DataType 和 Java DataType @@ -64,7 +64,7 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对 使用 Java Connector 连接数据库前,需要具备以下条件: - 已安装 Java 1.8 或以上版本运行时环境和 Maven 3.6 或以上版本 -- 已安装 TDengine 客户端驱动(使用原生连接必须安装,使用 REST 连接无需安装),具体步骤请参考[安装客户端驱动](/reference/connector#安装客户端驱动) +- 已安装 TDengine 客户端驱动(使用原生连接必须安装,使用 REST 连接无需安装),具体步骤请参考[安装客户端驱动](../#安装客户端驱动) ### 安装连接器 @@ -630,7 +630,7 @@ public void setNString(int columnIndex, ArrayList list, int size) throws ### 无模式写入 -TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../schemaless)。 +TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../reference/schemaless/)。 **注意**: diff --git a/docs/zh/14-reference/03-connector/go.mdx b/docs/zh/08-connector/05-go.mdx similarity index 95% rename from docs/zh/14-reference/03-connector/go.mdx rename to docs/zh/08-connector/05-go.mdx index a87c948d4a..9d30f75190 100644 --- a/docs/zh/14-reference/03-connector/go.mdx +++ b/docs/zh/08-connector/05-go.mdx @@ -9,11 +9,11 @@ import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; import Preparition from "./_preparition.mdx" -import GoInsert from "../../07-develop/03-insert-data/_go_sql.mdx" -import GoInfluxLine from "../../07-develop/03-insert-data/_go_line.mdx" -import GoOpenTSDBTelnet from "../../07-develop/03-insert-data/_go_opts_telnet.mdx" -import GoOpenTSDBJson from "../../07-develop/03-insert-data/_go_opts_json.mdx" -import GoQuery from "../../07-develop/04-query-data/_go.mdx" +import GoInsert from "../07-develop/03-insert-data/_go_sql.mdx" +import GoInfluxLine from "../07-develop/03-insert-data/_go_line.mdx" +import GoOpenTSDBTelnet from "../07-develop/03-insert-data/_go_opts_telnet.mdx" +import GoOpenTSDBJson from "../07-develop/03-insert-data/_go_opts_json.mdx" +import GoQuery from "../07-develop/04-query-data/_go.mdx" `driver-go` 是 TDengine 的官方 Go 语言连接器,实现了 Go 语言[ database/sql ](https://golang.org/pkg/database/sql/) 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。 @@ -30,7 +30,7 @@ REST 连接支持所有能运行 Go 的平台。 ## 版本支持 -请参考[版本支持列表](/reference/connector#版本支持) +请参考[版本支持列表](../#版本支持) ## 支持的功能特性 @@ -56,7 +56,7 @@ REST 连接支持所有能运行 Go 的平台。 ### 安装前准备 * 安装 Go 开发环境(Go 1.14 及以上,GCC 4.8.5 及以上) -* 如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](/reference/connector#安装客户端驱动) +* 如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动) 配置好环境变量,检查命令: diff --git a/docs/zh/14-reference/03-connector/rust.mdx b/docs/zh/08-connector/06-rust.mdx similarity index 97% rename from docs/zh/14-reference/03-connector/rust.mdx rename to docs/zh/08-connector/06-rust.mdx index ae644e1911..187e2f0b33 100644 --- a/docs/zh/14-reference/03-connector/rust.mdx +++ b/docs/zh/08-connector/06-rust.mdx @@ -9,9 +9,9 @@ import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; import Preparition from "./_preparition.mdx" -import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx" -import RustBind from "../../07-develop/03-insert-data/_rust_stmt.mdx" -import RustQuery from "../../07-develop/04-query-data/_rust.mdx" +import RustInsert from "../07-develop/03-insert-data/_rust_sql.mdx" +import RustBind from "../07-develop/03-insert-data/_rust_stmt.mdx" +import RustQuery from "../07-develop/04-query-data/_rust.mdx" [![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos) @@ -28,7 +28,7 @@ Websocket 连接支持所有能运行 Rust 的平台。 ## 版本支持 -请参考[版本支持列表](/reference/connector#版本支持) +请参考[版本支持列表](../#版本支持) Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。 @@ -37,7 +37,7 @@ Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容 ### 安装前准备 * 安装 Rust 开发工具链 -* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](/reference/connector#安装客户端驱动) +* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动) ### 添加 taos 依赖 diff --git a/docs/zh/14-reference/03-connector/python.mdx b/docs/zh/08-connector/07-python.mdx similarity index 96% rename from docs/zh/14-reference/03-connector/python.mdx rename to docs/zh/08-connector/07-python.mdx index 9ce81f9d70..88a5d4f84d 100644 --- a/docs/zh/14-reference/03-connector/python.mdx +++ b/docs/zh/08-connector/07-python.mdx @@ -8,7 +8,7 @@ description: "taospy 是 TDengine 的官方 Python 连接器。taospy 提供了 import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; -`taospy` 是 TDengine 的官方 Python 连接器。`taospy` 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。`taospy` 对 TDengine 的[原生接口](/reference/connector/cpp)和 [REST 接口](/reference/rest-api)都进行了封装, 分别对应 `taospy` 包的 `taos` 模块 和 `taosrest` 模块。 +`taospy` 是 TDengine 的官方 Python 连接器。`taospy` 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。`taospy` 对 TDengine 的[原生接口](../cpp)和 [REST 接口](../rest-api)都进行了封装, 分别对应 `taospy` 包的 `taos` 模块 和 `taosrest` 模块。 除了对原生接口和 REST 接口的封装,`taospy` 还提供了符合 [Python 数据访问规范(PEP 249)](https://peps.python.org/pep-0249/) 的编程接口。这使得 `taospy` 和很多第三方工具集成变得简单,比如 [SQLAlchemy](https://www.sqlalchemy.org/) 和 [pandas](https://pandas.pydata.org/)。 使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口与服务端建立的连接的方式下文中称为“REST 连接”。 @@ -17,7 +17,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con ## 支持的平台 -- 原生连接[支持的平台](/reference/connector/#支持的平台)和 TDengine 客户端支持的平台一致。 +- 原生连接[支持的平台](../#支持的平台)和 TDengine 客户端支持的平台一致。 - REST 连接支持所有能运行 Python 的平台。 ## 版本选择 @@ -275,7 +275,7 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 ##### RestClient 类的使用 -`RestClient` 类是对于 [REST API](/reference/rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。 +`RestClient` 类是对于 [REST API](../rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。 ```python title="RestClient 的使用" {{#include docs/examples/python/rest_client_example.py}} diff --git a/docs/zh/14-reference/03-connector/node.mdx b/docs/zh/08-connector/08-node.mdx similarity index 93% rename from docs/zh/14-reference/03-connector/node.mdx rename to docs/zh/08-connector/08-node.mdx index b089da99d2..63d690e554 100644 --- a/docs/zh/14-reference/03-connector/node.mdx +++ b/docs/zh/08-connector/08-node.mdx @@ -9,11 +9,11 @@ import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; import Preparition from "./_preparition.mdx"; -import NodeInsert from "../../07-develop/03-insert-data/_js_sql.mdx"; -import NodeInfluxLine from "../../07-develop/03-insert-data/_js_line.mdx"; -import NodeOpenTSDBTelnet from "../../07-develop/03-insert-data/_js_opts_telnet.mdx"; -import NodeOpenTSDBJson from "../../07-develop/03-insert-data/_js_opts_json.mdx"; -import NodeQuery from "../../07-develop/04-query-data/_js.mdx"; +import NodeInsert from "../07-develop/03-insert-data/_js_sql.mdx"; +import NodeInfluxLine from "../07-develop/03-insert-data/_js_line.mdx"; +import NodeOpenTSDBTelnet from "../07-develop/03-insert-data/_js_opts_telnet.mdx"; +import NodeOpenTSDBJson from "../07-develop/03-insert-data/_js_opts_json.mdx"; +import NodeQuery from "../07-develop/04-query-data/_js.mdx"; `@tdengine/client` 和 `@tdengine/rest` 是 TDengine 的官方 Node.js 语言连接器。 Node.js 开发人员可以通过它开发可以存取 TDengine 集群数据的应用软件。注意:从 TDengine 3.0 开始 Node.js 原生连接器的包名由 `td2.0-connector` 改名为 `@tdengine/client` 而 rest 连接器的包名由 `td2.0-rest-connector` 改为 `@tdengine/rest`。并且不与 TDengine 2.x 兼容。 @@ -28,7 +28,7 @@ REST 连接器支持所有能运行 Node.js 的平台。 ## 版本支持 -请参考[版本支持列表](/reference/connector#版本支持) +请参考[版本支持列表](../#版本支持) ## 支持的功能特性 @@ -52,7 +52,7 @@ REST 连接器支持所有能运行 Node.js 的平台。 ### 安装前准备 - 安装 Node.js 开发环境 -- 如果使用 REST 连接器,跳过此步。但如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](/reference/connector#安装客户端驱动)。我们使用 [node-gyp](https://github.com/nodejs/node-gyp) 和 TDengine 实例进行交互,还需要根据具体操作系统来安装下文提到的一些依赖工具。 +- 如果使用 REST 连接器,跳过此步。但如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)。我们使用 [node-gyp](https://github.com/nodejs/node-gyp) 和 TDengine 实例进行交互,还需要根据具体操作系统来安装下文提到的一些依赖工具。 diff --git a/docs/zh/14-reference/03-connector/csharp.mdx b/docs/zh/08-connector/09-csharp.mdx similarity index 91% rename from docs/zh/14-reference/03-connector/csharp.mdx rename to docs/zh/08-connector/09-csharp.mdx index 723c12932b..8214717583 100644 --- a/docs/zh/14-reference/03-connector/csharp.mdx +++ b/docs/zh/08-connector/09-csharp.mdx @@ -9,16 +9,16 @@ import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; import Preparition from "./_preparition.mdx" -import CSInsert from "../../07-develop/03-insert-data/_cs_sql.mdx" -import CSInfluxLine from "../../07-develop/03-insert-data/_cs_line.mdx" -import CSOpenTSDBTelnet from "../../07-develop/03-insert-data/_cs_opts_telnet.mdx" -import CSOpenTSDBJson from "../../07-develop/03-insert-data/_cs_opts_json.mdx" -import CSQuery from "../../07-develop/04-query-data/_cs.mdx" -import CSAsyncQuery from "../../07-develop/04-query-data/_cs_async.mdx" +import CSInsert from "../07-develop/03-insert-data/_cs_sql.mdx" +import CSInfluxLine from "../07-develop/03-insert-data/_cs_line.mdx" +import CSOpenTSDBTelnet from "../07-develop/03-insert-data/_cs_opts_telnet.mdx" +import CSOpenTSDBJson from "../07-develop/03-insert-data/_cs_opts_json.mdx" +import CSQuery from "../07-develop/04-query-data/_cs.mdx" +import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx" `TDengine.Connector` 是 TDengine 提供的 C# 语言连接器。C# 开发人员可以通过它开发存取 TDengine 集群数据的 C# 应用软件。 -`TDengine.Connector` 连接器支持通过 TDengine 客户端驱动(taosc)建立与 TDengine 运行实例的连接,提供数据写入、查询、订阅、schemaless 数据写入、参数绑定接口数据写入等功能 `TDengine.Connector` 目前暂未提供 REST 连接方式,用户可以参考 [REST API](/reference/rest-api/) 文档自行编写。 +`TDengine.Connector` 连接器支持通过 TDengine 客户端驱动(taosc)建立与 TDengine 运行实例的连接,提供数据写入、查询、订阅、schemaless 数据写入、参数绑定接口数据写入等功能 `TDengine.Connector` 目前暂未提供 REST 连接方式,用户可以参考 [REST API](../rest-api/) 文档自行编写。 本文介绍如何在 Linux 或 Windows 环境中安装 `TDengine.Connector`,并通过 `TDengine.Connector` 连接 TDengine 集群,进行数据写入、查询等基本操作。 @@ -32,7 +32,7 @@ import CSAsyncQuery from "../../07-develop/04-query-data/_cs_async.mdx" ## 版本支持 -请参考[版本支持列表](/reference/connector#版本支持) +请参考[版本支持列表](../#版本支持) ## 支持的功能特性 @@ -49,7 +49,7 @@ import CSAsyncQuery from "../../07-develop/04-query-data/_cs_async.mdx" * 安装 [.NET SDK](https://dotnet.microsoft.com/download) * [Nuget 客户端](https://docs.microsoft.com/en-us/nuget/install-nuget-client-tools) (可选安装) -* 安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](/reference/connector#安装客户端驱动) +* 安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动) ### 使用 dotnet CLI 安装 diff --git a/docs/zh/14-reference/03-connector/php.mdx b/docs/zh/08-connector/10-php.mdx similarity index 97% rename from docs/zh/14-reference/03-connector/php.mdx rename to docs/zh/08-connector/10-php.mdx index 5617dc6f73..53611c0274 100644 --- a/docs/zh/14-reference/03-connector/php.mdx +++ b/docs/zh/08-connector/10-php.mdx @@ -38,7 +38,7 @@ TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一 ### 安装 TDengine 客户端驱动 -TDengine 客户端驱动的安装请参考 [安装指南](/reference/connector#安装步骤) +TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤) ### 编译安装 php-tdengine diff --git a/docs/zh/14-reference/03-connector/_01-error-code.md b/docs/zh/08-connector/_01-error-code.md similarity index 100% rename from docs/zh/14-reference/03-connector/_01-error-code.md rename to docs/zh/08-connector/_01-error-code.md diff --git a/docs/zh/14-reference/03-connector/_category_.yml b/docs/zh/08-connector/_category_.yml similarity index 100% rename from docs/zh/14-reference/03-connector/_category_.yml rename to docs/zh/08-connector/_category_.yml diff --git a/docs/zh/14-reference/03-connector/_linux_install.mdx b/docs/zh/08-connector/_linux_install.mdx similarity index 100% rename from docs/zh/14-reference/03-connector/_linux_install.mdx rename to docs/zh/08-connector/_linux_install.mdx diff --git a/docs/zh/14-reference/03-connector/_preparition.mdx b/docs/zh/08-connector/_preparition.mdx similarity index 100% rename from docs/zh/14-reference/03-connector/_preparition.mdx rename to docs/zh/08-connector/_preparition.mdx diff --git a/docs/zh/14-reference/03-connector/_verify_linux.mdx b/docs/zh/08-connector/_verify_linux.mdx similarity index 100% rename from docs/zh/14-reference/03-connector/_verify_linux.mdx rename to docs/zh/08-connector/_verify_linux.mdx diff --git a/docs/zh/14-reference/03-connector/_verify_windows.mdx b/docs/zh/08-connector/_verify_windows.mdx similarity index 100% rename from docs/zh/14-reference/03-connector/_verify_windows.mdx rename to docs/zh/08-connector/_verify_windows.mdx diff --git a/docs/zh/14-reference/03-connector/_windows_install.mdx b/docs/zh/08-connector/_windows_install.mdx similarity index 100% rename from docs/zh/14-reference/03-connector/_windows_install.mdx rename to docs/zh/08-connector/_windows_install.mdx diff --git a/docs/zh/14-reference/03-connector/connector.webp b/docs/zh/08-connector/connector.webp similarity index 100% rename from docs/zh/14-reference/03-connector/connector.webp rename to docs/zh/08-connector/connector.webp diff --git a/docs/zh/14-reference/03-connector/03-connector.mdx b/docs/zh/08-connector/index.md similarity index 98% rename from docs/zh/14-reference/03-connector/03-connector.mdx rename to docs/zh/08-connector/index.md index bdad0b7e25..17de8e926c 100644 --- a/docs/zh/14-reference/03-connector/03-connector.mdx +++ b/docs/zh/08-connector/index.md @@ -1,5 +1,7 @@ --- +sidebar_label: 连接器 title: 连接器 +description: 详细介绍各种语言的连接器及 REST API --- TDengine 提供了丰富的应用程序开发接口,为了便于用户快速开发自己的应用,TDengine 支持了多种编程语言的连接器,其中官方连接器包括支持 C/C++、Java、Python、Go、Node.js、C# 和 Rust 的连接器。这些连接器支持使用原生接口(taosc)和 REST 接口(部分语言暂不支持)连接 TDengine 集群。社区开发者也贡献了多个非官方连接器,例如 ADO.NET 连接器、Lua 连接器和 PHP 连接器。 diff --git a/docs/zh/14-reference/03-connector/tdengine-jdbc-connector.webp b/docs/zh/08-connector/tdengine-jdbc-connector.webp similarity index 100% rename from docs/zh/14-reference/03-connector/tdengine-jdbc-connector.webp rename to docs/zh/08-connector/tdengine-jdbc-connector.webp diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index a967299e40..28f52be59a 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -18,7 +18,7 @@ stream_options: { 其中 subquery 是 select 普通查询语法的子集: ```sql -subquery: SELECT [DISTINCT] select_list +subquery: SELECT select_list from_clause [WHERE condition] [PARTITION BY tag_list] @@ -37,7 +37,7 @@ window_clause: { 其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。 -窗口的定义与时序数据特色查询中的定义完全相同。 +窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished) 例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。 diff --git a/docs/zh/14-reference/02-rest-api/_category_.yml b/docs/zh/14-reference/02-rest-api/_category_.yml deleted file mode 100644 index 57a20d8458..0000000000 --- a/docs/zh/14-reference/02-rest-api/_category_.yml +++ /dev/null @@ -1 +0,0 @@ -label: REST API diff --git a/docs/zh/14-reference/04-taosadapter.md b/docs/zh/14-reference/04-taosadapter.md index 6177b52e4c..71bf5f4223 100644 --- a/docs/zh/14-reference/04-taosadapter.md +++ b/docs/zh/14-reference/04-taosadapter.md @@ -156,7 +156,7 @@ AllowWebSockets ## 功能列表 - RESTful 接口 - [https://docs.taosdata.com/reference/rest-api/](https://docs.taosdata.com/reference/rest-api/) + [RESTful API](../../connector/rest-api) - 兼容 InfluxDB v1 写接口 [https://docs.influxdata.com/influxdb/v2.0/reference/api/influxdb-1x/write/](https://docs.influxdata.com/influxdb/v2.0/reference/api/influxdb-1x/write/) - 兼容 OpenTSDB JSON 和 telnet 格式写入 @@ -179,7 +179,7 @@ AllowWebSockets ### TDengine RESTful 接口 -您可以使用任何支持 http 协议的客户端通过访问 RESTful 接口地址 `http://:6041/rest/sql` 来写入数据到 TDengine 或从 TDengine 中查询数据。细节请参考[官方文档](/reference/rest-api/)。 +您可以使用任何支持 http 协议的客户端通过访问 RESTful 接口地址 `http://:6041/rest/sql` 来写入数据到 TDengine 或从 TDengine 中查询数据。细节请参考[官方文档](../../connector/rest-api/)。 ### InfluxDB diff --git a/docs/zh/14-reference/08-taos-shell.md b/docs/zh/14-reference/08-taos-shell.md index 2f3b551502..5804549878 100644 --- a/docs/zh/14-reference/08-taos-shell.md +++ b/docs/zh/14-reference/08-taos-shell.md @@ -8,7 +8,7 @@ TDengine 命令行程序(以下简称 TDengine CLI)是用户操作 TDengine ## 安装 -如果在 TDengine 服务器端执行,无需任何安装,已经自动安装好 TDengine CLI。如果要在非 TDengine 服务器端运行,需要安装 TDengine 客户端驱动安装包,具体安装,请参考 [连接器](/reference/connector/)。 +如果在 TDengine 服务器端执行,无需任何安装,已经自动安装好 TDengine CLI。如果要在非 TDengine 服务器端运行,需要安装 TDengine 客户端驱动安装包,具体安装,请参考 [连接器](../../connector/)。 ## 执行 @@ -18,7 +18,7 @@ TDengine 命令行程序(以下简称 TDengine CLI)是用户操作 TDengine taos ``` -如果连接服务成功,将会打印出欢迎消息和版本信息。如果失败,则会打印错误消息。(请参考 [FAQ](/train-faq/faq) 来解决终端连接服务端失败的问题)。TDengine CLI 的提示符号如下: +如果连接服务成功,将会打印出欢迎消息和版本信息。如果失败,则会打印错误消息。(请参考 [FAQ](../../train-faq/faq) 来解决终端连接服务端失败的问题)。TDengine CLI 的提示符号如下: ```cmd taos> diff --git a/docs/zh/20-third-party/09-emq-broker.md b/docs/zh/20-third-party/09-emq-broker.md index dd98374558..f252e520a7 100644 --- a/docs/zh/20-third-party/09-emq-broker.md +++ b/docs/zh/20-third-party/09-emq-broker.md @@ -90,7 +90,7 @@ http://127.0.0.1:6041/rest/sql ``` Basic cm9vdDp0YW9zZGF0YQ== ``` -相关文档请参考[ TDengine REST API 文档](/reference/rest-api/)。 +相关文档请参考[ TDengine REST API 文档](../../connector/rest-api/)。 在消息体中输入规则引擎替换模板: diff --git a/docs/zh/20-third-party/11-kafka.md b/docs/zh/20-third-party/11-kafka.md index 8369806adc..09bda4664f 100644 --- a/docs/zh/20-third-party/11-kafka.md +++ b/docs/zh/20-third-party/11-kafka.md @@ -184,7 +184,7 @@ echo `cat /tmp/confluent.current`/connect/connect.stdout TDengine Sink Connector 的作用是同步指定 topic 的数据到 TDengine。用户无需提前创建数据库和超级表。可手动指定目标数据库的名字(见配置参数 connection.database), 也可按一定规则生成(见配置参数 connection.database.prefix)。 -TDengine Sink Connector 内部使用 TDengine [无模式写入接口](/reference/connector/cpp#无模式写入-api)写数据到 TDengine,目前支持三种格式的数据:[InfluxDB 行协议格式](/develop/insert-data/influxdb-line)、 [OpenTSDB Telnet 协议格式](/develop/insert-data/opentsdb-telnet) 和 [OpenTSDB JSON 协议格式](/develop/insert-data/opentsdb-json)。 +TDengine Sink Connector 内部使用 TDengine [无模式写入接口](../../connector/cpp#无模式写入-api)写数据到 TDengine,目前支持三种格式的数据:[InfluxDB 行协议格式](/develop/insert-data/influxdb-line)、 [OpenTSDB Telnet 协议格式](/develop/insert-data/opentsdb-telnet) 和 [OpenTSDB JSON 协议格式](/develop/insert-data/opentsdb-json)。 下面的示例将主题 meters 的数据,同步到目标数据库 power。数据格式为 InfluxDB Line 协议格式。 diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 2fcf4dd62c..55556f21a1 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +// clang-format off #include #include #include @@ -94,13 +95,8 @@ int32_t create_stream() { } taos_free_result(pRes); - /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/ - /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ - /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ - /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ pRes = taos_query(pConn, - "create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, " - "count(k) from st1 partition by tbname interval(20s) "); + "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, k from st1 partition by tbname state_window(k)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tglobal.h b/include/common/tglobal.h index cd74ffd477..f872d1dbc2 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -130,6 +130,7 @@ extern int32_t tsMqRebalanceInterval; extern int32_t tsTtlUnit; extern int32_t tsTtlPushInterval; extern int32_t tsGrantHBInterval; +extern int32_t tsUptimeInterval; #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index e2bb3e2ae1..006ba7f21b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -170,6 +170,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index a64815f14f..1ce88905c2 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -29,7 +29,7 @@ typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; -typedef struct SReadHandle { +typedef struct { void* tqReader; void* meta; void* config; @@ -41,6 +41,7 @@ typedef struct SReadHandle { bool initTableReader; bool initTqReader; int32_t numOfVgroups; + void* pStateBackend; } SReadHandle; // in queue mode, data streams are seperated by msg @@ -78,8 +79,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO /** * @brief Cleanup SSDataBlock for StreamScanInfo - * - * @param tinfo + * + * @param tinfo */ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo); @@ -163,7 +164,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); -int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList/*,int32_t* resNum, SExplainExecInfo** pRes*/); +int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/); int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 384c6a289f..16b259cf59 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -263,6 +263,14 @@ typedef struct { SArray* checkpointVer; } SStreamRecoveringState; +// incremental state storage +typedef struct { + SStreamTask* pOwner; + TDB* db; + TTB* pStateDb; + TXN txn; +} SStreamState; + typedef struct SStreamTask { int64_t streamId; int32_t taskId; @@ -312,6 +320,10 @@ typedef struct SStreamTask { // msg handle SMsgCb* pMsgCb; + + // state backend + SStreamState* pState; + } SStreamTask; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -507,7 +519,7 @@ typedef struct SStreamMeta { char* path; TDB* db; TTB* pTaskDb; - TTB* pStateDb; + TTB* pCheckpointDb; SHashObj* pTasks; SHashObj* pRecoverStatus; void* ahandle; @@ -528,6 +540,36 @@ int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaRollBack(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta); +SStreamState* streamStateOpen(char* path, SStreamTask* pTask); +void streamStateClose(SStreamState* pState); +int32_t streamStateBegin(SStreamState* pState); +int32_t streamStateCommit(SStreamState* pState); +int32_t streamStateAbort(SStreamState* pState); + +typedef struct { + TBC* pCur; +} SStreamStateCur; + +#if 1 +int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen); +int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen); +int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen); + +SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen); +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen); +SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen); +void streamStateFreeCur(SStreamStateCur* pCur); + +int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen); + +int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); + +int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); + +#endif + #ifdef __cplusplus } #endif diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index f275ae0885..855dfb15ee 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -96,7 +96,12 @@ typedef struct { typedef struct SQueryExecMetric { int64_t start; // start timestamp, us - int64_t parsed; // start to parse, us + int64_t syntaxStart; // start to parse, us + int64_t syntaxEnd; // end to parse, us + int64_t ctgStart; // start to parse, us + int64_t ctgEnd; // end to parse, us + int64_t semanticEnd; + int64_t execEnd; int64_t send; // start to send to server, us int64_t rsp; // receive response from server, us } SQueryExecMetric; diff --git a/source/client/inc/clientLog.h b/source/client/inc/clientLog.h index d47edcd795..ec0a41a68f 100644 --- a/source/client/inc/clientLog.h +++ b/source/client/inc/clientLog.h @@ -29,6 +29,7 @@ extern "C" { #define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0) #define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0) #define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0) +#define tscPerf(...) do { taosPrintLog("TSC ", 0, cDebugFlag, __VA_ARGS__); } while(0) #ifdef __cplusplus } diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index ff1b9322c9..ae92d2dc7c 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -69,14 +69,25 @@ static void deregisterRequest(SRequestObj *pRequest) { int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); - int64_t duration = taosGetTimestampUs() - pRequest->metric.start; + int64_t nowUs = taosGetTimestampUs(); + int64_t duration = nowUs - pRequest->metric.start; tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 " ms, current:%d, app current:%d", pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst); if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) { + tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 "us, exec:%" PRId64 "us", + duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, + pRequest->metric.ctgEnd - pRequest->metric.ctgStart, + pRequest->metric.semanticEnd - pRequest->metric.ctgEnd, + pRequest->metric.execEnd - pRequest->metric.semanticEnd); atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { + tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 "us, exec:%" PRId64 "us", + duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, + pRequest->metric.ctgEnd - pRequest->metric.ctgStart, + pRequest->metric.semanticEnd - pRequest->metric.ctgEnd, + pRequest->metric.execEnd - pRequest->metric.semanticEnd); atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); } @@ -330,7 +341,6 @@ void doDestroyRequest(void *p) { schedulerFreeJob(&pRequest->body.queryJob, 0); taosMemoryFreeClear(pRequest->msgBuf); - taosMemoryFreeClear(pRequest->sqlstr); taosMemoryFreeClear(pRequest->pDb); doFreeReqResultInfo(&pRequest->body.resInfo); @@ -349,6 +359,7 @@ void doDestroyRequest(void *p) { taosMemoryFree(pRequest->body.param); } + taosMemoryFreeClear(pRequest->sqlstr); taosMemoryFree(pRequest); tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 5f0af55d13..998b9cee5c 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -842,6 +842,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { } schedulerFreeJob(&pRequest->body.queryJob, 0); + + pRequest->metric.execEnd = taosGetTimestampUs(); } taosMemoryFree(pResult); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index f449641f10..31ae443d5b 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -685,6 +685,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { SQuery *pQuery = pWrapper->pQuery; SRequestObj *pRequest = pWrapper->pRequest; + pRequest->metric.ctgEnd = taosGetTimestampUs(); + if (code == TSDB_CODE_SUCCESS) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); pRequest->stableQuery = pQuery->stableQuery; @@ -693,6 +695,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { } } + pRequest->metric.semanticEnd = taosGetTimestampUs(); + if (code == TSDB_CODE_SUCCESS) { if (pQuery->haveResultSet) { setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols); @@ -784,12 +788,16 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { SQuery *pQuery = NULL; + pRequest->metric.syntaxStart = taosGetTimestampUs(); + SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)}; code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq); if (code != TSDB_CODE_SUCCESS) { goto _error; } + pRequest->metric.syntaxEnd = taosGetTimestampUs(); + if (!updateMetaForce) { STscObj *pTscObj = pRequest->pTscObj; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; @@ -816,6 +824,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { .requestObjRefId = pCxt->requestRid, .mgmtEps = pCxt->mgmtEpSet}; + pRequest->metric.ctgStart = taosGetTimestampUs(); + code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob); pCxt = NULL; diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 65041e1f12..68a77a9f33 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -66,8 +66,9 @@ static const SSysDbTableSchema bnodesSchema[] = { }; static const SSysDbTableSchema clusterSchema[] = { - {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "uptime", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, }; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index adc5af1a17..a628f551f4 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -164,6 +164,7 @@ int32_t tsMqRebalanceInterval = 2; int32_t tsTtlUnit = 86400; int32_t tsTtlPushInterval = 86400; int32_t tsGrantHBInterval = 60; +int32_t tsUptimeInterval = 300; // seconds #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { diff --git a/source/dnode/mnode/impl/inc/mndCluster.h b/source/dnode/mnode/impl/inc/mndCluster.h index 0de253fb6a..2cb41edd7c 100644 --- a/source/dnode/mnode/impl/inc/mndCluster.h +++ b/source/dnode/mnode/impl/inc/mndCluster.h @@ -27,6 +27,7 @@ void mndCleanupCluster(SMnode *pMnode); int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len); int64_t mndGetClusterId(SMnode *pMnode); int64_t mndGetClusterCreateTime(SMnode *pMnode); +float mndGetClusterUpTime(SMnode *pMnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 8cff7fe48e..ea05215fe9 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -179,6 +179,7 @@ typedef struct { char name[TSDB_CLUSTER_ID_LEN]; int64_t createdTime; int64_t updateTime; + int32_t upTime; } SClusterObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index a82bf739f5..7d633f90bd 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -19,7 +19,7 @@ #include "mndTrans.h" #define CLUSTER_VER_NUMBE 1 -#define CLUSTER_RESERVE_SIZE 64 +#define CLUSTER_RESERVE_SIZE 60 static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster); static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw); @@ -29,6 +29,7 @@ static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SCl static int32_t mndCreateDefaultCluster(SMnode *pMnode); static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter); +static int32_t mndProcessUptimeTimer(SRpcMsg *pReq); int32_t mndInitCluster(SMnode *pMnode) { SSdbTable table = { @@ -42,8 +43,10 @@ int32_t mndInitCluster(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndClusterActionDelete, }; + mndSetMsgHandle(pMnode, TDMT_MND_UPTIME_TIMER, mndProcessUptimeTimer); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster); + return sdbSetTable(pMnode->pSdb, table); } @@ -62,40 +65,69 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) { return 0; } -int64_t mndGetClusterId(SMnode *pMnode) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - int64_t clusterId = -1; +static SClusterObj *mndAcquireCluster(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; while (1) { SClusterObj *pCluster = NULL; pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster); if (pIter == NULL) break; + return pCluster; + } + + return NULL; +} + +static void mndReleaseCluster(SMnode *pMnode, SClusterObj *pCluster) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pCluster); +} + +int64_t mndGetClusterId(SMnode *pMnode) { + int64_t clusterId = 0; + SClusterObj *pCluster = mndAcquireCluster(pMnode); + if (pCluster != NULL) { clusterId = pCluster->id; - sdbRelease(pSdb, pCluster); + mndReleaseCluster(pMnode, pCluster); } return clusterId; } int64_t mndGetClusterCreateTime(SMnode *pMnode) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - int64_t createTime = INT64_MAX; - - while (1) { - SClusterObj *pCluster = NULL; - pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster); - if (pIter == NULL) break; - + int64_t createTime = 0; + SClusterObj *pCluster = mndAcquireCluster(pMnode); + if (pCluster != NULL) { createTime = pCluster->createdTime; - sdbRelease(pSdb, pCluster); + mndReleaseCluster(pMnode, pCluster); } return createTime; } +static int32_t mndGetClusterUpTimeImp(SClusterObj *pCluster) { +#if 0 + int32_t upTime = taosGetTimestampSec() - pCluster->updateTime / 1000; + upTime = upTime + pCluster->upTime; + return upTime; +#else + return pCluster->upTime; +#endif +} + +float mndGetClusterUpTime(SMnode *pMnode) { + int64_t upTime = 0; + SClusterObj *pCluster = mndAcquireCluster(pMnode); + if (pCluster != NULL) { + upTime = mndGetClusterUpTimeImp(pCluster); + mndReleaseCluster(pMnode, pCluster); + } + + return upTime / 86400.0f; +} + static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -107,6 +139,7 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER) SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER) + SDB_SET_INT32(pRaw, dataPos, pCluster->upTime, _OVER) SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER) terrno = 0; @@ -144,6 +177,7 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER) SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pCluster->upTime, _OVER) SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER) terrno = 0; @@ -162,6 +196,7 @@ _OVER: static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) { mTrace("cluster:%" PRId64 ", perform insert action, row:%p", pCluster->id, pCluster); pSdb->pMnode->clusterId = pCluster->id; + pCluster->updateTime = taosGetTimestampMs(); return 0; } @@ -171,7 +206,10 @@ static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) { } static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj *pNew) { - mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p", pOld->id, pOld, pNew); + mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p, uptime from %d to %d", pOld->id, pOld, + pNew, pOld->upTime, pNew->upTime); + pOld->upTime = pNew->upTime; + pOld->updateTime = taosGetTimestampMs(); return 0; } @@ -242,6 +280,10 @@ static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock * pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, buf, false); + int32_t upTime = mndGetClusterUpTimeImp(pCluster); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&upTime, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pCluster->createdTime, false); @@ -257,3 +299,40 @@ static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SClusterObj clusterObj = {0}; + SClusterObj *pCluster = mndAcquireCluster(pMnode); + if (pCluster != NULL) { + memcpy(&clusterObj, pCluster, sizeof(SClusterObj)); + clusterObj.upTime += tsUptimeInterval; + mndReleaseCluster(pMnode, pCluster); + } + + if (clusterObj.id <= 0) { + mError("can't get cluster info while update uptime"); + return 0; + } + + mTrace("update cluster uptime to %" PRId64, clusterObj.upTime); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq); + if (pTrans == NULL) return -1; + + SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return 0; +} diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 1693ef7d65..65a539bc90 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -100,6 +100,16 @@ static void mndGrantHeartBeat(SMnode *pMnode) { } } +static void mndIncreaseUpTime(SMnode *pMnode) { + int32_t contLen = 0; + void *pReq = mndBuildTimerMsg(&contLen); + if (pReq != NULL) { + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528}; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + } +} + static void *mndThreadFp(void *param) { SMnode *pMnode = param; int64_t lastTime = 0; @@ -129,6 +139,10 @@ static void *mndThreadFp(void *param) { if (lastTime % (tsGrantHBInterval * 10) == 0) { mndGrantHeartBeat(pMnode); } + + if ((lastTime % (tsUptimeInterval * 10)) == ((tsUptimeInterval - 1) * 10)) { + mndIncreaseUpTime(pMnode); + } } return NULL; @@ -556,7 +570,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { } if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || - pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER) { + pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || + pMsg->msgType == TDMT_MND_UPTIME_TIMER) { return -1; } @@ -705,7 +720,8 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr if (pObj->id == pMnode->selfDnodeId) { pClusterInfo->first_ep_dnode_id = pObj->id; tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep)); - pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f); + pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode); + // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f); tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role)); } else { tstrncpy(desc.role, syncStr(pObj->state), sizeof(desc.role)); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 8e8cff853c..b7129cf56e 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -68,7 +68,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM if (pMgmt->errCode != 0) { mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode)); } else { - mInfo("trans:%d, is proposed and post sem", transId, tstrerror(pMgmt->errCode)); + mDebug("trans:%d, is proposed and post sem", transId, tstrerror(pMgmt->errCode)); } pMgmt->transId = 0; taosWUnLockLatch(&pMgmt->lock); @@ -118,7 +118,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM SSyncMgmt *pMgmt = &pMnode->syncMgmt; pMgmt->errCode = cbMeta.code; - mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId, + mDebug("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term); taosWLockLatch(&pMgmt->lock); @@ -126,7 +126,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM if (pMgmt->errCode != 0) { mError("trans:-1, failed to propose sync reconfig since %s, post sem", tstrerror(pMgmt->errCode)); } else { - mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64 " post sem", + mDebug("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64 " post sem", pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term); } pMgmt->transId = 0; @@ -228,7 +228,7 @@ int32_t mndInitSync(SMnode *pMnode) { syncInfo.isStandBy = pMgmt->standby; syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT; - mInfo("start to open mnode sync, standby:%d", pMgmt->standby); + mDebug("start to open mnode sync, standby:%d", pMgmt->standby); if (pMgmt->standby || pMgmt->replica.id > 0) { SSyncCfg *pCfg = &syncInfo.syncCfg; pCfg->replicaNum = 1; @@ -236,7 +236,7 @@ int32_t mndInitSync(SMnode *pMnode) { SNodeInfo *pNode = &pCfg->nodeInfo[0]; tstrncpy(pNode->nodeFqdn, pMgmt->replica.fqdn, sizeof(pNode->nodeFqdn)); pNode->nodePort = pMgmt->replica.port; - mInfo("mnode ep:%s:%u", pNode->nodeFqdn, pNode->nodePort); + mDebug("mnode ep:%s:%u", pNode->nodeFqdn, pNode->nodePort); } tsem_init(&pMgmt->syncSem, 0, 0); diff --git a/source/dnode/mnode/impl/test/sma/CMakeLists.txt b/source/dnode/mnode/impl/test/sma/CMakeLists.txt index 3f9ec123a8..a55b45ca11 100644 --- a/source/dnode/mnode/impl/test/sma/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/sma/CMakeLists.txt @@ -5,7 +5,9 @@ target_link_libraries( PUBLIC sut ) -add_test( - NAME smaTest - COMMAND smaTest -) +if(NOT ${TD_WINDOWS}) + add_test( + NAME smaTest + COMMAND smaTest + ) +endif(NOT ${TD_WINDOWS}) diff --git a/source/dnode/mnode/impl/test/stb/CMakeLists.txt b/source/dnode/mnode/impl/test/stb/CMakeLists.txt index dcfbe658fc..e3a3fc2e79 100644 --- a/source/dnode/mnode/impl/test/stb/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/stb/CMakeLists.txt @@ -5,7 +5,9 @@ target_link_libraries( PUBLIC sut ) -add_test( - NAME stbTest - COMMAND stbTest -) \ No newline at end of file +if(NOT ${TD_WINDOWS}) + add_test( + NAME stbTest + COMMAND stbTest + ) +endif(NOT ${TD_WINDOWS}) \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c6bc8e6e59..1456c6c067 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -79,6 +79,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ASSERT(0); } + if (streamLoadTasks(pTq->pStreamMeta) < 0) { + ASSERT(0); + } + return pTq; } @@ -664,6 +668,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { ASSERT(pTask->exec.executor); } + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + if (pTask->pState == NULL) { + return -1; + } + // sink /*pTask->ahandle = pTq->pVnode;*/ if (pTask->outputType == TASK_OUTPUT__SMA) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index bbef249079..0b027367da 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -182,6 +182,8 @@ static void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIt STsdbReader* pReader, bool* freeTSRow); static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow); +static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader); + static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData); static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, @@ -1414,7 +1416,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf int64_t minKey = 0; if (pReader->order == TSDB_ORDER_ASC) { minKey = INT64_MAX; // chosen the minimum value - if (minKey > tsLast && pLastBlockReader->lastBlockData.nRow > 0) { + if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) { minKey = tsLast; } @@ -1427,7 +1429,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf } } else { minKey = INT64_MIN; - if (minKey < tsLast && pLastBlockReader->lastBlockData.nRow > 0) { + if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) { minKey = tsLast; } @@ -1510,82 +1512,83 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf return TSDB_CODE_SUCCESS; } -#if 0 -static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, - SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { - SRowMerger merge = {0}; - STSRow* pTSRow = NULL; - SBlockData* pBlockData = &pReader->status.fileBlockData; +static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key, + STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - TSDBKEY k = TSDBROW_KEY(pRow); - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - SArray* pDelList = pBlockScanInfo->delSkyline; - bool freeTSRow = false; - uint64_t uid = pBlockScanInfo->uid; + if (pBlockData->nRow > 0) { + // no last block available, only data block exists + if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) { + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); + } + + // row in last file block + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); + ASSERT(ts >= key); + + if (ASCENDING_TRAVERSE(pReader->order)) { + if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); + } else if (key == ts) { + STSRow* pTSRow = NULL; + SRowMerger merge = {0}; - // ascending order traverse - if (ASCENDING_TRAVERSE(pReader->order)) { - if (key < k.ts) { - // imem & mem are all empty, only file exist - if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { - return TSDB_CODE_SUCCESS; - } else { tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); + tRowMergerGetRow(&merge, &pTSRow); - freeTSRow = true; - } - } else if (k.ts < key) { // k.ts < key - doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow); - } else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - tRowMerge(&merge, pRow); - doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); - - tRowMergerGetRow(&merge, &pTSRow); - freeTSRow = true; - } - } else { // descending order scan - if (key < k.ts) { - doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow); - } else if (k.ts < key) { - if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); return TSDB_CODE_SUCCESS; } else { - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - tRowMergerGetRow(&merge, &pTSRow); - freeTSRow = true; + ASSERT(0); + return TSDB_CODE_SUCCESS; } - } else { // descending order: mem rows -----> imem rows ------> file block - STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + } else { // desc order + SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; + TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); - tRowMergerInit(&merge, pRow, pSchema); - doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + STSRow* pTSRow = NULL; + SRowMerger merge = {0}; + tRowMergerInit(&merge, &fRow1, pReader->pSchema); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); - tRowMerge(&merge, &fRow); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + if (ts == key) { + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + } tRowMergerGetRow(&merge, &pTSRow); - freeTSRow = true; + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + return TSDB_CODE_SUCCESS; } - } + } else { // only last block exists + SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; + int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); - tRowMergerClear(&merge); - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid); + STSRow* pTSRow = NULL; + SRowMerger merge = {0}; + + TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); + + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); + tRowMergerGetRow(&merge, &pTSRow); + + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - if (freeTSRow) { taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + return TSDB_CODE_SUCCESS; } - - return TSDB_CODE_SUCCESS; } -#endif - static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SRowMerger merge = {0}; STSRow* pTSRow = NULL; @@ -1987,10 +1990,35 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { return false; } + + ASSERT(pLastBlockReader->lastBlockData.nRow > 0); return true; } -// todo refactor +int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader) { + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + + if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { + return TSDB_CODE_SUCCESS; + } else { + STSRow* pTSRow = NULL; + SRowMerger merge = {0}; + + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + tRowMergerGetRow(&merge, &pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + return TSDB_CODE_SUCCESS; + } + + return TSDB_CODE_SUCCESS; +} + static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -2007,112 +2035,13 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader); } - // mem + file + // mem + file + last block if (pBlockScanInfo->iter.hasVal) { return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader); } - if (pBlockData->nRow > 0) { - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - - // no last block available, only data block exists - if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) { - if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { - return TSDB_CODE_SUCCESS; - } else { - STSRow* pTSRow = NULL; - SRowMerger merge = {0}; - - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - tRowMergerGetRow(&merge, &pTSRow); - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; - } - } - - // row in last file block - int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); - ASSERT(ts >= key); - - if (ASCENDING_TRAVERSE(pReader->order)) { - if (key < ts) { - // imem & mem are all empty, only file exist - if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { - return TSDB_CODE_SUCCESS; - } else { - STSRow* pTSRow = NULL; - SRowMerger merge = {0}; - - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - tRowMergerGetRow(&merge, &pTSRow); - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; - } - } else if (key == ts) { - STSRow* pTSRow = NULL; - SRowMerger merge = {0}; - - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); - - tRowMergerGetRow(&merge, &pTSRow); - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; - } else { - ASSERT(0); - return TSDB_CODE_SUCCESS; - } - } else { // desc order - SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; - TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); - - STSRow* pTSRow = NULL; - SRowMerger merge = {0}; - tRowMergerInit(&merge, &fRow1, pReader->pSchema); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); - - if (ts == key) { - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - } - - tRowMergerGetRow(&merge, &pTSRow); - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; - } - } else { // only last block exists - SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; - int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); - - STSRow* pTSRow = NULL; - SRowMerger merge = {0}; - - TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); - - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); - tRowMergerGetRow(&merge, &pTSRow); - - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; - } + // files data blocks + last block + return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData); } } @@ -2137,9 +2066,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { while (1) { // todo check the validate of row in file block + bool hasBlockData = false; { - bool hasBlockData = false; - while (pBlockData->nRow > 0) { // find the first qualified row in data block if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) { hasBlockData = true; @@ -2154,13 +2082,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { break; } } + } + + bool hasBlockLData = hasDataInLastBlock(pLastBlockReader); - bool hasBlockLData = hasDataInLastBlock(pLastBlockReader); - - // no data in last block and block, no need to proceed. - if ((hasBlockData == false) && (hasBlockLData == false)) { - break; - } + // no data in last block and block, no need to proceed. + if ((hasBlockData == false) && (hasBlockLData == false)) { + break; } buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); @@ -3224,10 +3152,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); - if (ik.ts < k.ts) { // ik.ts < k.ts - doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); - } else if (k.ts < ik.ts) { - doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); + if (ik.ts != k.ts) { + if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts + doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); + } else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) { + doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); + } } else { // ik.ts == k.ts doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); *freeTSRow = true; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 4da4747108..3365960325 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -80,11 +80,9 @@ struct SqlFunctionCtx; size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); -void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void initResultRow(SResultRow* pResultRow); void closeResultRow(SResultRow* pResultRow); -bool isResultRowClosed(SResultRow* pResultRow); struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 601c22a3ba..67f7cb2f6f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -150,6 +150,7 @@ typedef struct { SQueryTableDataCond tableCond; int64_t recoverStartVer; int64_t recoverEndVer; + SStreamState* pState; } SStreamTaskInfo; typedef struct { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 118c061c3a..197d94dcf4 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -31,20 +31,6 @@ void initResultRowInfo(SResultRowInfo* pResultRowInfo) { pResultRowInfo->cur.pageId = -1; } -void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) { - if (pResultRowInfo == NULL) { - return; - } - - for (int32_t i = 0; i < pResultRowInfo->size; ++i) { - // if (pResultRowInfo->pResult[i]) { - // taosMemoryFreeClear(pResultRowInfo->pResult[i]->key); - // } - } -} - -bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); } - void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; } // TODO refactor: use macro @@ -483,6 +469,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)}; code = createResultData(&type, rows, &output); if (code != TSDB_CODE_SUCCESS) { + terrno = code; qError("failed to create result, reason:%s", tstrerror(code)); goto end; } @@ -491,6 +478,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray if(code != TSDB_CODE_SUCCESS){ qError("failed to calculate scalar, reason:%s", tstrerror(code)); terrno = code; + goto end; } // int64_t st2 = taosGetTimestampUs(); // qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1); @@ -777,6 +765,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, } if (pTagCond) { + terrno = TDB_CODE_SUCCESS; SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond); if(terrno != TDB_CODE_SUCCESS){ colDataDestroy(pColInfoData); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 46ef5700fc..98c7c56d72 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -392,7 +392,7 @@ static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { pCtx->input.colDataAggIsSet = pStatus->hasAgg; - pCtx->input.numOfRows = pStatus->numOfRows; + pCtx->input.numOfRows = pStatus->numOfRows; pCtx->input.startRowIndex = pStatus->startOffset; } @@ -3640,7 +3640,6 @@ _error: void cleanupBasicInfo(SOptrBasicInfo* pInfo) { assert(pInfo != NULL); - cleanupResultRowInfo(&pInfo->resultRowInfo); pInfo->pRes = blockDataDestroy(pInfo->pRes); } @@ -3716,7 +3715,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t const char* id, SInterval* pInterval, int32_t fillType, int32_t order) { SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode); - int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey; + int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey; STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey); w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order); @@ -3989,15 +3988,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, bool assignUid = groupbyTbname(group); - size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); + size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - if(assignUid){ + if (assignUid) { for (int32_t i = 0; i < numOfTables; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); info->groupId = info->uid; taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); } - }else{ + } else { int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; @@ -4616,6 +4615,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } + if (pHandle && pHandle->pStateBackend) { + (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend; + } + (*pTaskInfo)->sql = sql; sql = NULL; (*pTaskInfo)->pSubplan = pPlan; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index e9298487e7..3d54b791a8 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2972,7 +2972,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) { taosHashClear(pInfo->aggSup.pResultRowHashTable); clearDiskbasedBuf(pInfo->aggSup.pResultBuf); - cleanupResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo); } @@ -3129,8 +3128,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.watermark); - if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA || - pBlock->info.type == STREAM_INVALID) { + ASSERT(pBlock->info.type != STREAM_INVERT); + if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; } else if (pBlock->info.type == STREAM_CLEAR) { SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); @@ -4264,8 +4263,6 @@ static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) { } } clearDiskbasedBuf(pInfo->streamAggSup.pResultBuf); - cleanupResultRowInfo(&pInfo->binfo.resultRowInfo); - initResultRowInfo(&pInfo->binfo.resultRowInfo); } static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) { diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index ce841ed83c..957fd46ba5 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -283,7 +283,7 @@ typedef struct SSchJob { } SSchJob; typedef struct SSchTaskCtx { - SSchJob *pJob; + int64_t jobRid; SSchTask *pTask; } SSchTaskCtx; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index d16d15c119..9cab39c301 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -821,7 +821,13 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { int32_t schLaunchTaskImpl(void *param) { SSchTaskCtx *pCtx = (SSchTaskCtx *)param; - SSchJob *pJob = pCtx->pJob; + SSchJob *pJob = schAcquireJob(pCtx->jobRid); + if (NULL == pJob) { + taosMemoryFree(param); + qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid); + SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING); + } + SSchTask *pTask = pCtx->pTask; int8_t status = 0; int32_t code = 0; @@ -880,6 +886,8 @@ _return: } } + schReleaseJob(pJob->refId); + SCH_RET(code); } @@ -890,7 +898,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - param->pJob = pJob; + param->jobRid = pJob->refId; param->pTask = pTask; if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 06ca26f029..102bad7426 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) return 0; } -// TODO: handle version int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchCnt = 1; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5ff700546c..20a2f7d332 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -14,7 +14,7 @@ */ #include "executor.h" -#include "tstream.h" +#include "streamInc.h" #include "ttimer.h" SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { @@ -23,17 +23,23 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pMeta->path = strdup(path); + int32_t len = strlen(path) + 20; + char* streamPath = taosMemoryCalloc(1, len); + sprintf(streamPath, "%s/%s", path, "stream"); + pMeta->path = strdup(streamPath); if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { goto _err; } + sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); + mkdir(streamPath, 0755); + taosMemoryFree(streamPath); + if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { goto _err; } - // open state storage backend - if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pStateDb) < 0) { + if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) { goto _err; } @@ -49,16 +55,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - if (streamLoadTasks(pMeta) < 0) { - goto _err; - } return pMeta; _err: if (pMeta->path) taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); - if (pMeta->pStateDb) tdbTbClose(pMeta->pStateDb); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); + if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); taosMemoryFree(pMeta); return NULL; @@ -67,7 +70,7 @@ _err: void streamMetaClose(SStreamMeta* pMeta) { tdbCommit(pMeta->db, &pMeta->txn); tdbTbClose(pMeta->pTaskDb); - tdbTbClose(pMeta->pStateDb); + tdbTbClose(pMeta->pCheckpointDb); tdbClose(pMeta->db); void* pIter = NULL; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 263053778b..0505c3edd6 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -176,6 +176,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea } int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { +#if 0 void* buf = NULL; ASSERT(pTask->taskLevel == TASK_LEVEL__SINK); @@ -224,10 +225,12 @@ int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { FAIL: if (buf) taosMemoryFree(buf); return -1; +#endif return 0; } int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { +#if 0 void* pVal = NULL; int32_t vLen = 0; if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) { @@ -241,7 +244,7 @@ int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { pTask->nextCheckId = aggCheckpoint.checkpointId + 1; pTask->checkpointInfo = aggCheckpoint.checkpointVer; - +#endif return 0; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c new file mode 100644 index 0000000000..6ccc90fa51 --- /dev/null +++ b/source/libs/stream/src/streamState.c @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executor.h" +#include "streamInc.h" +#include "ttimer.h" + +SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { + SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); + if (pState == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + char statePath[200]; + sprintf(statePath, "%s/%d", path, pTask->taskId); + if (tdbOpen(statePath, 16 * 1024, 1, &pState->db) < 0) { + goto _err; + } + + // open state storage backend + if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pState->db, &pState->pStateDb) < 0) { + goto _err; + } + + pState->pOwner = pTask; + + return pState; + +_err: + if (pState->pStateDb) tdbTbClose(pState->pStateDb); + if (pState->db) tdbClose(pState->db); + taosMemoryFree(pState); + return NULL; +} + +void streamStateClose(SStreamState* pState) { + tdbCommit(pState->db, &pState->txn); + tdbTbClose(pState->pStateDb); + tdbClose(pState->db); + + taosMemoryFree(pState); +} + +int32_t streamStateBegin(SStreamState* pState) { + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + + if (tdbBegin(pState->db, &pState->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamStateCommit(SStreamState* pState) { + if (tdbCommit(pState->db, &pState->txn) < 0) { + return -1; + } + memset(&pState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + if (tdbBegin(pState->db, &pState->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamStateAbort(SStreamState* pState) { + if (tdbAbort(pState->db, &pState->txn) < 0) { + return -1; + } + memset(&pState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + if (tdbBegin(pState->db, &pState->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen) { + return tdbTbUpsert(pState->pStateDb, key, kLen, value, vLen, &pState->txn); +} +int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen) { + return tdbTbGet(pState->pStateDb, key, kLen, pVal, pVLen); +} + +int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen) { + return tdbTbDelete(pState->pStateDb, key, kLen, &pState->txn); +} + +SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) return NULL; + tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL); + + int32_t c; + tdbTbcMoveTo(pCur->pCur, key, kLen, &c); + if (c != 0) { + taosMemoryFree(pCur); + return NULL; + } + return 0; +} + +int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen) { + return tdbTbcGet(pCur->pCur, (const void**)pKey, pKLen, (const void**)pVal, pVLen); +} + +int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToFirst(pCur->pCur); +} + +int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToLast(pCur->pCur); +} + +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + + int32_t c; + if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { + taosMemoryFree(pCur); + return NULL; + } + if (c > 0) return pCur; + + if (tdbTbcMoveToNext(pCur->pCur) < 0) { + taosMemoryFree(pCur); + return NULL; + } + + return pCur; +} + +SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + + int32_t c; + if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { + taosMemoryFree(pCur); + return NULL; + } + if (c < 0) return pCur; + + if (tdbTbcMoveToPrev(pCur->pCur) < 0) { + taosMemoryFree(pCur); + return NULL; + } + + return pCur; +} + +int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToNext(pCur->pCur); +} + +int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToPrev(pCur->pCur); +} diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4009a47c65..ce5917de29 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -165,5 +165,8 @@ void tFreeSStreamTask(SStreamTask* pTask) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); } + + if (pTask->pState) streamStateClose(pTask->pState); + taosMemoryFree(pTask); } diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 7cfb188ac9..935f536a90 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -145,7 +145,7 @@ static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t *buf if (nread < 0) { uError("http-report read error:%s", uv_err_name(nread)); } else { - uInfo("http-report succ to read %d bytes, just ignore it", nread); + uTrace("http-report succ to read %d bytes, just ignore it", nread); } uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } @@ -155,7 +155,7 @@ static void clientSentCb(uv_write_t* req, int32_t status) { terrno = TAOS_SYSTEM_ERROR(status); uError("http-report failed to send data %s", uv_strerror(status)); } else { - uInfo("http-report succ to send data"); + uTrace("http-report succ to send data"); } uv_read_start((uv_stream_t *)&cli->tcp, clientAllocBuffCb, clientRecvCb); } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index a8da680910..93ced912f8 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -121,7 +121,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { if (found == NULL) { // file corrupted, no complete log // TODO delete and search in previous files - ASSERT(0); + /*ASSERT(0);*/ terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -221,7 +221,6 @@ int walCheckAndRepairMeta(SWal* pWal) { int code = walSaveMeta(pWal); if (code < 0) { - taosArrayDestroy(actualLog); return -1; } } diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index dd61f7d225..f9f42aa103 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -702,7 +702,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) { taosMsleep(50); } - uInfo("cache:%s will be cleaned up", pCacheObj->name); + uTrace("cache:%s will be cleaned up", pCacheObj->name); doCleanupDataCache(pCacheObj); } diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index e8f1f06ef1..ba877915b1 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -83,8 +83,8 @@ int32_t tsCompressInit() { if (lossyFloat == false && lossyDouble == false) return 0; tdszInit(fPrecision, dPrecision, maxRange, curRange, Compressor); - if (lossyFloat) uInfo("lossy compression float is opened. "); - if (lossyDouble) uInfo("lossy compression double is opened. "); + if (lossyFloat) uTrace("lossy compression float is opened. "); + if (lossyDouble) uTrace("lossy compression double is opened. "); return 1; } // exit call diff --git a/tests/script/tmp/monitor.sim b/tests/script/tmp/monitor.sim index 8eb787e950..c0c1da567c 100644 --- a/tests/script/tmp/monitor.sim +++ b/tests/script/tmp/monitor.sim @@ -21,6 +21,6 @@ sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 bin print =============== create drop qnode 1 sql create qnode on dnode 1 -sql create snode on dnode 1 -sql create bnode on dnode 1 +#sql create snode on dnode 1 +#sql create bnode on dnode 1