Merge branch '3.0' into feat/sangshuduo/TD-14141-update-taostools-for3.0

This commit is contained in:
Shuduo Sang 2022-08-24 16:02:23 +08:00
commit 36c7ceebe3
72 changed files with 632 additions and 330 deletions

View File

@ -97,14 +97,13 @@ As a high-performance, scalable and SQL supported time-series database, TDengine
| **System Maintenance Requirements** | **Not Applicable** | **Might Be Applicable** | **Very Applicable** | **Description** |
| ------------------------------------------------- | ------------------ | ----------------------- | ------------------- | ------------------------------------------------------------ |
| Native high-reliability | | | √ | TDengine has a very robust, reliable and easily configurable system architecture to simplify routine operation. Human errors and accidents are eliminated to the greatest extent, with a streamlined experience for operators. |
| Minimize learning and maintenance costs | | | √ | In addition to being easily configurable, standard SQL support and the Taos shell for ad hoc queries makes maintenance simpler, allows reuse and reduces learning costs.|
| Minimize learning and maintenance costs | | | √ | In addition to being easily configurable, standard SQL support and the TDengine CLI for ad hoc queries makes maintenance simpler, allows reuse and reduces learning costs.|
| Abundant talent supply | √ | | | Given the above, and given the extensive training and professional services provided by TDengine, it is easy to migrate from existing solutions or create a new and lasting solution based on TDengine.|
## Comparison with other databases
- [Writing Performance Comparison of TDengine and InfluxDB ](https://tdengine.com/2022/02/23/4975.html)
- [Query Performance Comparison of TDengine and InfluxDB](https://tdengine.com/2022/02/24/5120.html)
- [TDengine vs InfluxDB、OpenTSDB、Cassandra、MySQL、ClickHouse](https://www.tdengine.com/downloads/TDengine_Testing_Report_en.pdf)
- [TDengine vs OpenTSDB](https://tdengine.com/2019/09/12/710.html)
- [TDengine vs Cassandra](https://tdengine.com/2019/09/12/708.html)
- [TDengine vs InfluxDB](https://tdengine.com/2019/09/12/706.html)

View File

@ -42,7 +42,7 @@ To do so, run the following command:
```
This command creates the `meters` supertable in the `test` database. In the `meters` supertable, it then creates 10,000 subtables named `d0` to `d9999`. Each table has 10,000 rows and each row has four columns: `ts`, `current`, `voltage`, and `phase`. The timestamps of the data in these columns range from 2017-07-14 10:40:00 000 to 2017-07-14 10:40:09 999. Each table is randomly assigned a `groupId` tag from 1 to ten and a `location` tag of either `California.SanFrancisco` or `California.SanDiego`.
This command creates the `meters` supertable in the `test` database. In the `meters` supertable, it then creates 10,000 subtables named `d0` to `d9999`. Each table has 10,000 rows and each row has four columns: `ts`, `current`, `voltage`, and `phase`. The timestamps of the data in these columns range from 2017-07-14 10:40:00 000 to 2017-07-14 10:40:09 999. Each table is randomly assigned a `groupId` tag from 1 to 10 and a `location` tag of either `Campbell`, `Cupertino`, `Los Angeles`, `Mountain View`, `Palo Alto`, `San Diego`, `San Francisco`, `San Jose`, `Santa Clara` or `Sunnyvale`.
The `taosBenchmark` command creates a deployment with 100 million data points that you can use for testing purposes. The time required depends on the hardware specifications of the local system.

View File

@ -22,8 +22,8 @@ TDengine的主要功能如下
9. 提供[命令行程序](../reference/taos-shell),便于管理集群,检查系统状态,做即席查询
10. 提供多种数据的[导入](../operation/import)、[导出](../operation/export)
11. 支持对[TDengine 集群本身的监控](../operation/monitor)
12. 提供 [C/C++](../reference/connector/cpp), [Java](../reference/connector/java), [Python](../reference/connector/python), [Go](../reference/connector/go), [Rust](../reference/connector/rust), [Node.js](../reference/connector/node) 等多种编程语言的[连接器](../reference/connector/)
13. 支持 [REST 接口](../reference/rest-api/)
12. 提供各种语言的[连接器](../connector): 如 C/C++, Java, Go, Node.JS, Rust, Python, C# 等
13. 支持 [REST 接口](../connector/rest-api/)
14. 支持与[ Grafana 无缝集成](../third-party/grafana)
15. 支持与 Google Data Studio 无缝集成
16. 支持 [Kubernetes 部署](../deployment/k8s)

View File

@ -9,7 +9,7 @@ import PkgListV3 from "/components/PkgListV3";
您可以[用 Docker 立即体验](../../get-started/docker/) TDengine。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
TDengine 完整的软件包包括服务端taosd、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动taosc、命令行程序 (CLItaos) 和一些工具软件。目前 taosAdapter 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../reference/rest-api/)。
TDengine 完整的软件包包括服务端taosd、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动taosc、命令行程序 (CLItaos) 和一些工具软件。目前 taosAdapter 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../connector/rest-api/)。
为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 lite 版本的安装包。

View File

@ -3,7 +3,7 @@ title: 立即开始
description: '快速设置 TDengine 环境并体验其高效写入和查询'
---
TDengine 完整的软件包包括服务端taosd、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动taosc、命令行程序 (CLItaos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](/reference/taosadapter) 提供 [RESTful 接口](/reference/rest-api)。
TDengine 完整的软件包包括服务端taosd、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动taosc、命令行程序 (CLItaos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/taosadapter) 提供 [RESTful 接口](../connector/rest-api)。
本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。

View File

@ -12,4 +12,4 @@
{{#include docs/examples/java/src/main/java/com/taos/example/WSConnectExample.java:main}}
```
更多连接参数配置,参考[Java 连接器](/reference/connector/java)
更多连接参数配置,参考[Java 连接器](../../connector/java)

View File

@ -14,10 +14,10 @@ import ConnCSNative from "./_connect_cs.mdx";
import ConnC from "./_connect_c.mdx";
import ConnR from "./_connect_r.mdx";
import ConnPHP from "./_connect_php.mdx";
import InstallOnWindows from "../../14-reference/03-connector/_linux_install.mdx";
import InstallOnLinux from "../../14-reference/03-connector/_windows_install.mdx";
import VerifyLinux from "../../14-reference/03-connector/_verify_linux.mdx";
import VerifyWindows from "../../14-reference/03-connector/_verify_windows.mdx";
import InstallOnWindows from "../../08-connector/_linux_install.mdx";
import InstallOnLinux from "../../08-connector/_windows_install.mdx";
import VerifyLinux from "../../08-connector/_verify_linux.mdx";
import VerifyWindows from "../../08-connector/_verify_windows.mdx";
TDengine 提供了丰富的应用程序开发接口为了便于用户快速开发自己的应用TDengine 支持了多种编程语言的连接器,其中官方连接器包括支持 C/C++、Java、Python、Go、Node.js、C#、Rust、Lua社区贡献和 PHP 社区贡献的连接器。这些连接器支持使用原生接口taosc和 REST 接口(部分语言暂不支持)连接 TDengine 集群。社区开发者也贡献了多个非官方连接器,例如 ADO.NET 连接器、Lua 连接器和 PHP 连接器。
@ -33,7 +33,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
关键不同点在于:
1. 使用 REST 连接,用户无需安装客户端驱动程序 taosc具有跨平台易用的优势但性能要下降 30%左右。
2. 使用原生连接可以体验 TDengine 的全部功能,如[参数绑定接口](/reference/connector/cpp#参数绑定-api)、[订阅](/reference/connector/cpp#订阅和消费-api)等等。
2. 使用原生连接可以体验 TDengine 的全部功能,如[参数绑定接口](../../connector/cpp/#参数绑定-api)、[订阅](../../connector/cpp/#订阅和消费-api)等等。
## 安装客户端驱动 taosc

View File

@ -64,7 +64,7 @@ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
```
这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
这些 API 的文档请见 [C/C++ Connector](../../connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
</TabItem>
<TabItem value="java" label="Java">

View File

@ -12,7 +12,7 @@ title: 开发指南
7. 在很多场景下(如车辆管理)应用需要获取每个数据采集点的最新状态那么建议你采用TDengine的cache功能而不用单独部署Redis等缓存软件。
8. 如果你发现TDengine的函数无法满足你的要求那么你可以使用用户自定义函数来解决问题。
本部分内容就是按照上述的顺序组织的。为便于理解TDengine为每个功能为每个支持的编程语言都提供了示例代码。如果你希望深入了解SQL的使用需要查看[SQL手册](/taos-sql/)。如果想更深入地了解各连接器的使用,请阅读[连接器参考指南](/reference/connector/)。如果还希望想将TDengine与第三方系统集成起来比如Grafana, 请参考[第三方工具](/third-party/)。
本部分内容就是按照上述的顺序组织的。为便于理解TDengine为每个功能为每个支持的编程语言都提供了示例代码。如果你希望深入了解SQL的使用需要查看[SQL手册](/taos-sql/)。如果想更深入地了解各连接器的使用,请阅读[连接器参考指南](../connector/)。如果还希望想将TDengine与第三方系统集成起来比如Grafana, 请参考[第三方工具](../third-party/)。
如果在开发过程中遇到任何问题,请点击每个页面下方的["反馈问题"](https://github.com/taosdata/TDengine/issues/new/choose), 在GitHub上直接递交issue。

View File

@ -22,7 +22,7 @@ TDengine 客户端驱动的动态库位于:
## 支持的平台
请参考[支持的平台列表](/reference/connector#支持的平台)
请参考[支持的平台列表](../#支持的平台)
## 支持的版本
@ -30,7 +30,7 @@ TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一
## 安装步骤
TDengine 客户端驱动的安装请参考 [安装指南](/reference/connector#安装步骤)
TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤)
## 建立连接

View File

@ -35,7 +35,7 @@ REST 连接支持所有能运行 Java 的平台。
## 版本支持
请参考[版本支持列表](/reference/connector#版本支持)
请参考[版本支持列表](../#版本支持)
## TDengine DataType 和 Java DataType
@ -64,7 +64,7 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对
使用 Java Connector 连接数据库前,需要具备以下条件:
- 已安装 Java 1.8 或以上版本运行时环境和 Maven 3.6 或以上版本
- 已安装 TDengine 客户端驱动(使用原生连接必须安装,使用 REST 连接无需安装),具体步骤请参考[安装客户端驱动](/reference/connector#安装客户端驱动)
- 已安装 TDengine 客户端驱动(使用原生连接必须安装,使用 REST 连接无需安装),具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
### 安装连接器
@ -630,7 +630,7 @@ public void setNString(int columnIndex, ArrayList<String> list, int size) throws
### 无模式写入
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议Line Protocol、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../schemaless)。
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议Line Protocol、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../reference/schemaless/)。
**注意**

View File

@ -9,11 +9,11 @@ import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
import Preparition from "./_preparition.mdx"
import GoInsert from "../../07-develop/03-insert-data/_go_sql.mdx"
import GoInfluxLine from "../../07-develop/03-insert-data/_go_line.mdx"
import GoOpenTSDBTelnet from "../../07-develop/03-insert-data/_go_opts_telnet.mdx"
import GoOpenTSDBJson from "../../07-develop/03-insert-data/_go_opts_json.mdx"
import GoQuery from "../../07-develop/04-query-data/_go.mdx"
import GoInsert from "../07-develop/03-insert-data/_go_sql.mdx"
import GoInfluxLine from "../07-develop/03-insert-data/_go_line.mdx"
import GoOpenTSDBTelnet from "../07-develop/03-insert-data/_go_opts_telnet.mdx"
import GoOpenTSDBJson from "../07-develop/03-insert-data/_go_opts_json.mdx"
import GoQuery from "../07-develop/04-query-data/_go.mdx"
`driver-go` 是 TDengine 的官方 Go 语言连接器,实现了 Go 语言[ database/sql ](https://golang.org/pkg/database/sql/) 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。
@ -30,7 +30,7 @@ REST 连接支持所有能运行 Go 的平台。
## 版本支持
请参考[版本支持列表](/reference/connector#版本支持)
请参考[版本支持列表](../#版本支持)
## 支持的功能特性
@ -56,7 +56,7 @@ REST 连接支持所有能运行 Go 的平台。
### 安装前准备
* 安装 Go 开发环境Go 1.14 及以上GCC 4.8.5 及以上)
* 如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](/reference/connector#安装客户端驱动)
* 如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
配置好环境变量,检查命令:

View File

@ -9,9 +9,9 @@ import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
import Preparition from "./_preparition.mdx"
import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx"
import RustBind from "../../07-develop/03-insert-data/_rust_stmt.mdx"
import RustQuery from "../../07-develop/04-query-data/_rust.mdx"
import RustInsert from "../07-develop/03-insert-data/_rust_sql.mdx"
import RustBind from "../07-develop/03-insert-data/_rust_stmt.mdx"
import RustQuery from "../07-develop/04-query-data/_rust.mdx"
[![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos)
@ -28,7 +28,7 @@ Websocket 连接支持所有能运行 Rust 的平台。
## 版本支持
请参考[版本支持列表](/reference/connector#版本支持)
请参考[版本支持列表](../#版本支持)
Rust 连接器仍然在快速开发中1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine以避免已知问题。
@ -37,7 +37,7 @@ Rust 连接器仍然在快速开发中1.0 之前无法保证其向后兼容
### 安装前准备
* 安装 Rust 开发工具链
* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](/reference/connector#安装客户端驱动)
* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
### 添加 taos 依赖

View File

@ -8,7 +8,7 @@ description: "taospy 是 TDengine 的官方 Python 连接器。taospy 提供了
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
`taospy` 是 TDengine 的官方 Python 连接器。`taospy` 提供了丰富的 API 使得 Python 应用可以很方便地使用 TDengine。`taospy` 对 TDengine 的[原生接口](/reference/connector/cpp)和 [REST 接口](/reference/rest-api)都进行了封装, 分别对应 `taospy` 包的 `taos` 模块 和 `taosrest` 模块。
`taospy` 是 TDengine 的官方 Python 连接器。`taospy` 提供了丰富的 API 使得 Python 应用可以很方便地使用 TDengine。`taospy` 对 TDengine 的[原生接口](../cpp)和 [REST 接口](../rest-api)都进行了封装, 分别对应 `taospy` 包的 `taos` 模块 和 `taosrest` 模块。
除了对原生接口和 REST 接口的封装,`taospy` 还提供了符合 [Python 数据访问规范(PEP 249)](https://peps.python.org/pep-0249/) 的编程接口。这使得 `taospy` 和很多第三方工具集成变得简单,比如 [SQLAlchemy](https://www.sqlalchemy.org/) 和 [pandas](https://pandas.pydata.org/)。
使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口与服务端建立的连接的方式下文中称为“REST 连接”。
@ -17,7 +17,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
## 支持的平台
- 原生连接[支持的平台](/reference/connector/#支持的平台)和 TDengine 客户端支持的平台一致。
- 原生连接[支持的平台](../#支持的平台)和 TDengine 客户端支持的平台一致。
- REST 连接支持所有能运行 Python 的平台。
## 版本选择
@ -275,7 +275,7 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
##### RestClient 类的使用
`RestClient` 类是对于 [REST API](/reference/rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。
`RestClient` 类是对于 [REST API](../rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。
```python title="RestClient 的使用"
{{#include docs/examples/python/rest_client_example.py}}

View File

@ -9,11 +9,11 @@ import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import Preparition from "./_preparition.mdx";
import NodeInsert from "../../07-develop/03-insert-data/_js_sql.mdx";
import NodeInfluxLine from "../../07-develop/03-insert-data/_js_line.mdx";
import NodeOpenTSDBTelnet from "../../07-develop/03-insert-data/_js_opts_telnet.mdx";
import NodeOpenTSDBJson from "../../07-develop/03-insert-data/_js_opts_json.mdx";
import NodeQuery from "../../07-develop/04-query-data/_js.mdx";
import NodeInsert from "../07-develop/03-insert-data/_js_sql.mdx";
import NodeInfluxLine from "../07-develop/03-insert-data/_js_line.mdx";
import NodeOpenTSDBTelnet from "../07-develop/03-insert-data/_js_opts_telnet.mdx";
import NodeOpenTSDBJson from "../07-develop/03-insert-data/_js_opts_json.mdx";
import NodeQuery from "../07-develop/04-query-data/_js.mdx";
`@tdengine/client` 和 `@tdengine/rest` 是 TDengine 的官方 Node.js 语言连接器。 Node.js 开发人员可以通过它开发可以存取 TDengine 集群数据的应用软件。注意:从 TDengine 3.0 开始 Node.js 原生连接器的包名由 `td2.0-connector` 改名为 `@tdengine/client` 而 rest 连接器的包名由 `td2.0-rest-connector` 改为 `@tdengine/rest`。并且不与 TDengine 2.x 兼容。
@ -28,7 +28,7 @@ REST 连接器支持所有能运行 Node.js 的平台。
## 版本支持
请参考[版本支持列表](/reference/connector#版本支持)
请参考[版本支持列表](../#版本支持)
## 支持的功能特性
@ -52,7 +52,7 @@ REST 连接器支持所有能运行 Node.js 的平台。
### 安装前准备
- 安装 Node.js 开发环境
- 如果使用 REST 连接器,跳过此步。但如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](/reference/connector#安装客户端驱动)。我们使用 [node-gyp](https://github.com/nodejs/node-gyp) 和 TDengine 实例进行交互,还需要根据具体操作系统来安装下文提到的一些依赖工具。
- 如果使用 REST 连接器,跳过此步。但如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)。我们使用 [node-gyp](https://github.com/nodejs/node-gyp) 和 TDengine 实例进行交互,还需要根据具体操作系统来安装下文提到的一些依赖工具。
<Tabs defaultValue="Linux">
<TabItem value="Linux" label="Linux 系统安装依赖工具">

View File

@ -9,16 +9,16 @@ import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
import Preparition from "./_preparition.mdx"
import CSInsert from "../../07-develop/03-insert-data/_cs_sql.mdx"
import CSInfluxLine from "../../07-develop/03-insert-data/_cs_line.mdx"
import CSOpenTSDBTelnet from "../../07-develop/03-insert-data/_cs_opts_telnet.mdx"
import CSOpenTSDBJson from "../../07-develop/03-insert-data/_cs_opts_json.mdx"
import CSQuery from "../../07-develop/04-query-data/_cs.mdx"
import CSAsyncQuery from "../../07-develop/04-query-data/_cs_async.mdx"
import CSInsert from "../07-develop/03-insert-data/_cs_sql.mdx"
import CSInfluxLine from "../07-develop/03-insert-data/_cs_line.mdx"
import CSOpenTSDBTelnet from "../07-develop/03-insert-data/_cs_opts_telnet.mdx"
import CSOpenTSDBJson from "../07-develop/03-insert-data/_cs_opts_json.mdx"
import CSQuery from "../07-develop/04-query-data/_cs.mdx"
import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx"
`TDengine.Connector` 是 TDengine 提供的 C# 语言连接器。C# 开发人员可以通过它开发存取 TDengine 集群数据的 C# 应用软件。
`TDengine.Connector` 连接器支持通过 TDengine 客户端驱动taosc建立与 TDengine 运行实例的连接提供数据写入、查询、订阅、schemaless 数据写入、参数绑定接口数据写入等功能 `TDengine.Connector` 目前暂未提供 REST 连接方式,用户可以参考 [REST API](/reference/rest-api/) 文档自行编写。
`TDengine.Connector` 连接器支持通过 TDengine 客户端驱动taosc建立与 TDengine 运行实例的连接提供数据写入、查询、订阅、schemaless 数据写入、参数绑定接口数据写入等功能 `TDengine.Connector` 目前暂未提供 REST 连接方式,用户可以参考 [REST API](../rest-api/) 文档自行编写。
本文介绍如何在 Linux 或 Windows 环境中安装 `TDengine.Connector`,并通过 `TDengine.Connector` 连接 TDengine 集群,进行数据写入、查询等基本操作。
@ -32,7 +32,7 @@ import CSAsyncQuery from "../../07-develop/04-query-data/_cs_async.mdx"
## 版本支持
请参考[版本支持列表](/reference/connector#版本支持)
请参考[版本支持列表](../#版本支持)
## 支持的功能特性
@ -49,7 +49,7 @@ import CSAsyncQuery from "../../07-develop/04-query-data/_cs_async.mdx"
* 安装 [.NET SDK](https://dotnet.microsoft.com/download)
* [Nuget 客户端](https://docs.microsoft.com/en-us/nuget/install-nuget-client-tools) (可选安装)
* 安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](/reference/connector#安装客户端驱动)
* 安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
### 使用 dotnet CLI 安装

View File

@ -38,7 +38,7 @@ TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一
### 安装 TDengine 客户端驱动
TDengine 客户端驱动的安装请参考 [安装指南](/reference/connector#安装步骤)
TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤)
### 编译安装 php-tdengine

View File

Before

Width:  |  Height:  |  Size: 25 KiB

After

Width:  |  Height:  |  Size: 25 KiB

View File

@ -1,5 +1,7 @@
---
sidebar_label: 连接器
title: 连接器
description: 详细介绍各种语言的连接器及 REST API
---
TDengine 提供了丰富的应用程序开发接口为了便于用户快速开发自己的应用TDengine 支持了多种编程语言的连接器,其中官方连接器包括支持 C/C++、Java、Python、Go、Node.js、C# 和 Rust 的连接器。这些连接器支持使用原生接口taosc和 REST 接口(部分语言暂不支持)连接 TDengine 集群。社区开发者也贡献了多个非官方连接器,例如 ADO.NET 连接器、Lua 连接器和 PHP 连接器。

View File

Before

Width:  |  Height:  |  Size: 81 KiB

After

Width:  |  Height:  |  Size: 81 KiB

View File

@ -18,7 +18,7 @@ stream_options: {
其中 subquery 是 select 普通查询语法的子集:
```sql
subquery: SELECT [DISTINCT] select_list
subquery: SELECT select_list
from_clause
[WHERE condition]
[PARTITION BY tag_list]
@ -37,7 +37,7 @@ window_clause: {
其中SESSION 是会话窗口tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val则自动开启下一个窗口。
窗口的定义与时序数据特色查询中的定义完全相同
窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished)
例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。

View File

@ -1 +0,0 @@
label: REST API

View File

@ -156,7 +156,7 @@ AllowWebSockets
## 功能列表
- RESTful 接口
[https://docs.taosdata.com/reference/rest-api/](https://docs.taosdata.com/reference/rest-api/)
[RESTful API](../../connector/rest-api)
- 兼容 InfluxDB v1 写接口
[https://docs.influxdata.com/influxdb/v2.0/reference/api/influxdb-1x/write/](https://docs.influxdata.com/influxdb/v2.0/reference/api/influxdb-1x/write/)
- 兼容 OpenTSDB JSON 和 telnet 格式写入
@ -179,7 +179,7 @@ AllowWebSockets
### TDengine RESTful 接口
您可以使用任何支持 http 协议的客户端通过访问 RESTful 接口地址 `http://<fqdn>:6041/rest/sql` 来写入数据到 TDengine 或从 TDengine 中查询数据。细节请参考[官方文档](/reference/rest-api/)。
您可以使用任何支持 http 协议的客户端通过访问 RESTful 接口地址 `http://<fqdn>:6041/rest/sql` 来写入数据到 TDengine 或从 TDengine 中查询数据。细节请参考[官方文档](../../connector/rest-api/)。
### InfluxDB

View File

@ -8,7 +8,7 @@ TDengine 命令行程序(以下简称 TDengine CLI是用户操作 TDengine
## 安装
如果在 TDengine 服务器端执行,无需任何安装,已经自动安装好 TDengine CLI。如果要在非 TDengine 服务器端运行,需要安装 TDengine 客户端驱动安装包,具体安装,请参考 [连接器](/reference/connector/)。
如果在 TDengine 服务器端执行,无需任何安装,已经自动安装好 TDengine CLI。如果要在非 TDengine 服务器端运行,需要安装 TDengine 客户端驱动安装包,具体安装,请参考 [连接器](../../connector/)。
## 执行
@ -18,7 +18,7 @@ TDengine 命令行程序(以下简称 TDengine CLI是用户操作 TDengine
taos
```
如果连接服务成功,将会打印出欢迎消息和版本信息。如果失败,则会打印错误消息。(请参考 [FAQ](/train-faq/faq) 来解决终端连接服务端失败的问题。TDengine CLI 的提示符号如下:
如果连接服务成功,将会打印出欢迎消息和版本信息。如果失败,则会打印错误消息。(请参考 [FAQ](../../train-faq/faq) 来解决终端连接服务端失败的问题。TDengine CLI 的提示符号如下:
```cmd
taos>

View File

@ -90,7 +90,7 @@ http://127.0.0.1:6041/rest/sql
```
Basic cm9vdDp0YW9zZGF0YQ==
```
相关文档请参考[ TDengine REST API 文档](/reference/rest-api/)。
相关文档请参考[ TDengine REST API 文档](../../connector/rest-api/)。
在消息体中输入规则引擎替换模板:

View File

@ -184,7 +184,7 @@ echo `cat /tmp/confluent.current`/connect/connect.stdout
TDengine Sink Connector 的作用是同步指定 topic 的数据到 TDengine。用户无需提前创建数据库和超级表。可手动指定目标数据库的名字见配置参数 connection.database 也可按一定规则生成(见配置参数 connection.database.prefix)。
TDengine Sink Connector 内部使用 TDengine [无模式写入接口](/reference/connector/cpp#无模式写入-api)写数据到 TDengine目前支持三种格式的数据[InfluxDB 行协议格式](/develop/insert-data/influxdb-line)、 [OpenTSDB Telnet 协议格式](/develop/insert-data/opentsdb-telnet) 和 [OpenTSDB JSON 协议格式](/develop/insert-data/opentsdb-json)。
TDengine Sink Connector 内部使用 TDengine [无模式写入接口](../../connector/cpp#无模式写入-api)写数据到 TDengine目前支持三种格式的数据[InfluxDB 行协议格式](/develop/insert-data/influxdb-line)、 [OpenTSDB Telnet 协议格式](/develop/insert-data/opentsdb-telnet) 和 [OpenTSDB JSON 协议格式](/develop/insert-data/opentsdb-json)。
下面的示例将主题 meters 的数据,同步到目标数据库 power。数据格式为 InfluxDB Line 协议格式。

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// clang-format off
#include <assert.h>
#include <stdio.h>
#include <string.h>
@ -94,13 +95,8 @@ int32_t create_stream() {
}
taos_free_result(pRes);
/*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(pConn,
"create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, "
"count(k) from st1 partition by tbname interval(20s) ");
"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, k from st1 partition by tbname state_window(k)");
if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1;

View File

@ -130,6 +130,7 @@ extern int32_t tsMqRebalanceInterval;
extern int32_t tsTtlUnit;
extern int32_t tsTtlPushInterval;
extern int32_t tsGrantHBInterval;
extern int32_t tsUptimeInterval;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)

View File

@ -170,6 +170,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG)

View File

@ -29,7 +29,7 @@ typedef void* DataSinkHandle;
struct SRpcMsg;
struct SSubplan;
typedef struct SReadHandle {
typedef struct {
void* tqReader;
void* meta;
void* config;
@ -41,6 +41,7 @@ typedef struct SReadHandle {
bool initTableReader;
bool initTqReader;
int32_t numOfVgroups;
void* pStateBackend;
} SReadHandle;
// in queue mode, data streams are seperated by msg
@ -78,8 +79,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
/**
* @brief Cleanup SSDataBlock for StreamScanInfo
*
* @param tinfo
*
* @param tinfo
*/
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo);
@ -163,7 +164,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList/*,int32_t* resNum, SExplainExecInfo** pRes*/);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/);
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);

View File

@ -263,6 +263,14 @@ typedef struct {
SArray* checkpointVer;
} SStreamRecoveringState;
// incremental state storage
typedef struct {
SStreamTask* pOwner;
TDB* db;
TTB* pStateDb;
TXN txn;
} SStreamState;
typedef struct SStreamTask {
int64_t streamId;
int32_t taskId;
@ -312,6 +320,10 @@ typedef struct SStreamTask {
// msg handle
SMsgCb* pMsgCb;
// state backend
SStreamState* pState;
} SStreamTask;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -507,7 +519,7 @@ typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pStateDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
SHashObj* pRecoverStatus;
void* ahandle;
@ -528,6 +540,36 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta);
SStreamState* streamStateOpen(char* path, SStreamTask* pTask);
void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState);
int32_t streamStateAbort(SStreamState* pState);
typedef struct {
TBC* pCur;
} SStreamStateCur;
#if 1
int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen);
int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen);
SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen);
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen);
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen);
void streamStateFreeCur(SStreamStateCur* pCur);
int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen);
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
#endif
#ifdef __cplusplus
}
#endif

View File

@ -96,7 +96,12 @@ typedef struct {
typedef struct SQueryExecMetric {
int64_t start; // start timestamp, us
int64_t parsed; // start to parse, us
int64_t syntaxStart; // start to parse, us
int64_t syntaxEnd; // end to parse, us
int64_t ctgStart; // start to parse, us
int64_t ctgEnd; // end to parse, us
int64_t semanticEnd;
int64_t execEnd;
int64_t send; // start to send to server, us
int64_t rsp; // receive response from server, us
} SQueryExecMetric;

View File

@ -29,6 +29,7 @@ extern "C" {
#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscPerf(...) do { taosPrintLog("TSC ", 0, cDebugFlag, __VA_ARGS__); } while(0)
#ifdef __cplusplus
}

View File

@ -69,14 +69,25 @@ static void deregisterRequest(SRequestObj *pRequest) {
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
int64_t nowUs = taosGetTimestampUs();
int64_t duration = nowUs - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
" ms, current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst);
if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) {
tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 "us, exec:%" PRId64 "us",
duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
pRequest->metric.ctgEnd - pRequest->metric.ctgStart,
pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
pRequest->metric.execEnd - pRequest->metric.semanticEnd);
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 "us, exec:%" PRId64 "us",
duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
pRequest->metric.ctgEnd - pRequest->metric.ctgStart,
pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
pRequest->metric.execEnd - pRequest->metric.semanticEnd);
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
}
@ -330,7 +341,6 @@ void doDestroyRequest(void *p) {
schedulerFreeJob(&pRequest->body.queryJob, 0);
taosMemoryFreeClear(pRequest->msgBuf);
taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFreeClear(pRequest->pDb);
doFreeReqResultInfo(&pRequest->body.resInfo);
@ -349,6 +359,7 @@ void doDestroyRequest(void *p) {
taosMemoryFree(pRequest->body.param);
}
taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
}

View File

@ -842,6 +842,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
}
schedulerFreeJob(&pRequest->body.queryJob, 0);
pRequest->metric.execEnd = taosGetTimestampUs();
}
taosMemoryFree(pResult);

View File

@ -685,6 +685,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
SQuery *pQuery = pWrapper->pQuery;
SRequestObj *pRequest = pWrapper->pRequest;
pRequest->metric.ctgEnd = taosGetTimestampUs();
if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
pRequest->stableQuery = pQuery->stableQuery;
@ -693,6 +695,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
}
}
pRequest->metric.semanticEnd = taosGetTimestampUs();
if (code == TSDB_CODE_SUCCESS) {
if (pQuery->haveResultSet) {
setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols);
@ -784,12 +788,16 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
SQuery *pQuery = NULL;
pRequest->metric.syntaxStart = taosGetTimestampUs();
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)};
code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pRequest->metric.syntaxEnd = taosGetTimestampUs();
if (!updateMetaForce) {
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
@ -816,6 +824,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
.requestObjRefId = pCxt->requestRid,
.mgmtEps = pCxt->mgmtEpSet};
pRequest->metric.ctgStart = taosGetTimestampUs();
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper,
&pRequest->body.queryJob);
pCxt = NULL;

View File

@ -66,8 +66,9 @@ static const SSysDbTableSchema bnodesSchema[] = {
};
static const SSysDbTableSchema clusterSchema[] = {
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "uptime", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
};

View File

@ -164,6 +164,7 @@ int32_t tsMqRebalanceInterval = 2;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 86400;
int32_t tsGrantHBInterval = 60;
int32_t tsUptimeInterval = 300; // seconds
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {

View File

@ -27,6 +27,7 @@ void mndCleanupCluster(SMnode *pMnode);
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
int64_t mndGetClusterId(SMnode *pMnode);
int64_t mndGetClusterCreateTime(SMnode *pMnode);
float mndGetClusterUpTime(SMnode *pMnode);
#ifdef __cplusplus
}

View File

@ -179,6 +179,7 @@ typedef struct {
char name[TSDB_CLUSTER_ID_LEN];
int64_t createdTime;
int64_t updateTime;
int32_t upTime;
} SClusterObj;
typedef struct {

View File

@ -19,7 +19,7 @@
#include "mndTrans.h"
#define CLUSTER_VER_NUMBE 1
#define CLUSTER_RESERVE_SIZE 64
#define CLUSTER_RESERVE_SIZE 60
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw);
@ -29,6 +29,7 @@ static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SCl
static int32_t mndCreateDefaultCluster(SMnode *pMnode);
static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
static int32_t mndProcessUptimeTimer(SRpcMsg *pReq);
int32_t mndInitCluster(SMnode *pMnode) {
SSdbTable table = {
@ -42,8 +43,10 @@ int32_t mndInitCluster(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndClusterActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_UPTIME_TIMER, mndProcessUptimeTimer);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);
return sdbSetTable(pMnode->pSdb, table);
}
@ -62,40 +65,69 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) {
return 0;
}
int64_t mndGetClusterId(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
int64_t clusterId = -1;
static SClusterObj *mndAcquireCluster(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SClusterObj *pCluster = NULL;
pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster);
if (pIter == NULL) break;
return pCluster;
}
return NULL;
}
static void mndReleaseCluster(SMnode *pMnode, SClusterObj *pCluster) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pCluster);
}
int64_t mndGetClusterId(SMnode *pMnode) {
int64_t clusterId = 0;
SClusterObj *pCluster = mndAcquireCluster(pMnode);
if (pCluster != NULL) {
clusterId = pCluster->id;
sdbRelease(pSdb, pCluster);
mndReleaseCluster(pMnode, pCluster);
}
return clusterId;
}
int64_t mndGetClusterCreateTime(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
int64_t createTime = INT64_MAX;
while (1) {
SClusterObj *pCluster = NULL;
pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster);
if (pIter == NULL) break;
int64_t createTime = 0;
SClusterObj *pCluster = mndAcquireCluster(pMnode);
if (pCluster != NULL) {
createTime = pCluster->createdTime;
sdbRelease(pSdb, pCluster);
mndReleaseCluster(pMnode, pCluster);
}
return createTime;
}
static int32_t mndGetClusterUpTimeImp(SClusterObj *pCluster) {
#if 0
int32_t upTime = taosGetTimestampSec() - pCluster->updateTime / 1000;
upTime = upTime + pCluster->upTime;
return upTime;
#else
return pCluster->upTime;
#endif
}
float mndGetClusterUpTime(SMnode *pMnode) {
int64_t upTime = 0;
SClusterObj *pCluster = mndAcquireCluster(pMnode);
if (pCluster != NULL) {
upTime = mndGetClusterUpTimeImp(pCluster);
mndReleaseCluster(pMnode, pCluster);
}
return upTime / 86400.0f;
}
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -107,6 +139,7 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pCluster->upTime, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
terrno = 0;
@ -144,6 +177,7 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pCluster->upTime, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
terrno = 0;
@ -162,6 +196,7 @@ _OVER:
static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) {
mTrace("cluster:%" PRId64 ", perform insert action, row:%p", pCluster->id, pCluster);
pSdb->pMnode->clusterId = pCluster->id;
pCluster->updateTime = taosGetTimestampMs();
return 0;
}
@ -171,7 +206,10 @@ static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) {
}
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj *pNew) {
mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p, uptime from %d to %d", pOld->id, pOld,
pNew, pOld->upTime, pNew->upTime);
pOld->upTime = pNew->upTime;
pOld->updateTime = taosGetTimestampMs();
return 0;
}
@ -242,6 +280,10 @@ static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, buf, false);
int32_t upTime = mndGetClusterUpTimeImp(pCluster);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&upTime, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pCluster->createdTime, false);
@ -257,3 +299,40 @@ static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SClusterObj clusterObj = {0};
SClusterObj *pCluster = mndAcquireCluster(pMnode);
if (pCluster != NULL) {
memcpy(&clusterObj, pCluster, sizeof(SClusterObj));
clusterObj.upTime += tsUptimeInterval;
mndReleaseCluster(pMnode, pCluster);
}
if (clusterObj.id <= 0) {
mError("can't get cluster info while update uptime");
return 0;
}
mTrace("update cluster uptime to %" PRId64, clusterObj.upTime);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) return -1;
SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
mndTransDrop(pTrans);
return 0;
}

View File

@ -100,6 +100,16 @@ static void mndGrantHeartBeat(SMnode *pMnode) {
}
}
static void mndIncreaseUpTime(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
static void *mndThreadFp(void *param) {
SMnode *pMnode = param;
int64_t lastTime = 0;
@ -129,6 +139,10 @@ static void *mndThreadFp(void *param) {
if (lastTime % (tsGrantHBInterval * 10) == 0) {
mndGrantHeartBeat(pMnode);
}
if ((lastTime % (tsUptimeInterval * 10)) == ((tsUptimeInterval - 1) * 10)) {
mndIncreaseUpTime(pMnode);
}
}
return NULL;
@ -556,7 +570,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
}
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER) {
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
return -1;
}
@ -705,7 +720,8 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
if (pObj->id == pMnode->selfDnodeId) {
pClusterInfo->first_ep_dnode_id = pObj->id;
tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
// pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
} else {
tstrncpy(desc.role, syncStr(pObj->state), sizeof(desc.role));

View File

@ -68,7 +68,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
} else {
mInfo("trans:%d, is proposed and post sem", transId, tstrerror(pMgmt->errCode));
mDebug("trans:%d, is proposed and post sem", transId, tstrerror(pMgmt->errCode));
}
pMgmt->transId = 0;
taosWUnLockLatch(&pMgmt->lock);
@ -118,7 +118,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = cbMeta.code;
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
mDebug("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
cbMeta.code, cbMeta.index, cbMeta.term);
taosWLockLatch(&pMgmt->lock);
@ -126,7 +126,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
if (pMgmt->errCode != 0) {
mError("trans:-1, failed to propose sync reconfig since %s, post sem", tstrerror(pMgmt->errCode));
} else {
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64 " post sem",
mDebug("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64 " post sem",
pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term);
}
pMgmt->transId = 0;
@ -228,7 +228,7 @@ int32_t mndInitSync(SMnode *pMnode) {
syncInfo.isStandBy = pMgmt->standby;
syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT;
mInfo("start to open mnode sync, standby:%d", pMgmt->standby);
mDebug("start to open mnode sync, standby:%d", pMgmt->standby);
if (pMgmt->standby || pMgmt->replica.id > 0) {
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = 1;
@ -236,7 +236,7 @@ int32_t mndInitSync(SMnode *pMnode) {
SNodeInfo *pNode = &pCfg->nodeInfo[0];
tstrncpy(pNode->nodeFqdn, pMgmt->replica.fqdn, sizeof(pNode->nodeFqdn));
pNode->nodePort = pMgmt->replica.port;
mInfo("mnode ep:%s:%u", pNode->nodeFqdn, pNode->nodePort);
mDebug("mnode ep:%s:%u", pNode->nodeFqdn, pNode->nodePort);
}
tsem_init(&pMgmt->syncSem, 0, 0);

View File

@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut
)
add_test(
NAME smaTest
COMMAND smaTest
)
if(NOT ${TD_WINDOWS})
add_test(
NAME smaTest
COMMAND smaTest
)
endif(NOT ${TD_WINDOWS})

View File

@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut
)
add_test(
NAME stbTest
COMMAND stbTest
)
if(NOT ${TD_WINDOWS})
add_test(
NAME stbTest
COMMAND stbTest
)
endif(NOT ${TD_WINDOWS})

View File

@ -79,6 +79,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
ASSERT(0);
}
if (streamLoadTasks(pTq->pStreamMeta) < 0) {
ASSERT(0);
}
return pTq;
}
@ -664,6 +668,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
ASSERT(pTask->exec.executor);
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
if (pTask->pState == NULL) {
return -1;
}
// sink
/*pTask->ahandle = pTq->pVnode;*/
if (pTask->outputType == TASK_OUTPUT__SMA) {

View File

@ -182,6 +182,8 @@ static void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIt
STsdbReader* pReader, bool* freeTSRow);
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
STSRow** pTSRow);
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
STbData* piMemTbData);
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
@ -1414,7 +1416,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
int64_t minKey = 0;
if (pReader->order == TSDB_ORDER_ASC) {
minKey = INT64_MAX; // chosen the minimum value
if (minKey > tsLast && pLastBlockReader->lastBlockData.nRow > 0) {
if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
minKey = tsLast;
}
@ -1427,7 +1429,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
}
} else {
minKey = INT64_MIN;
if (minKey < tsLast && pLastBlockReader->lastBlockData.nRow > 0) {
if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
minKey = tsLast;
}
@ -1510,82 +1512,83 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
return TSDB_CODE_SUCCESS;
}
#if 0
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
SBlockData* pBlockData = &pReader->status.fileBlockData;
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SArray* pDelList = pBlockScanInfo->delSkyline;
bool freeTSRow = false;
uint64_t uid = pBlockScanInfo->uid;
if (pBlockData->nRow > 0) {
// no last block available, only data block exists
if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
}
// row in last file block
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
ASSERT(ts >= key);
if (ASCENDING_TRAVERSE(pReader->order)) {
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key == ts) {
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
// ascending order traverse
if (ASCENDING_TRAVERSE(pReader->order)) {
if (key < k.ts) {
// imem & mem are all empty, only file exist
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
return TSDB_CODE_SUCCESS;
} else {
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
tRowMergerGetRow(&merge, &pTSRow);
freeTSRow = true;
}
} else if (k.ts < key) { // k.ts < key
doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
} else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
tRowMerge(&merge, pRow);
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
freeTSRow = true;
}
} else { // descending order scan
if (key < k.ts) {
doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
} else if (k.ts < key) {
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
} else {
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
freeTSRow = true;
ASSERT(0);
return TSDB_CODE_SUCCESS;
}
} else { // descending order: mem rows -----> imem rows ------> file block
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
} else { // desc order
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, pRow, pSchema);
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
tRowMerge(&merge, &fRow);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
if (ts == key) {
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
tRowMergerGetRow(&merge, &pTSRow);
freeTSRow = true;
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
}
}
} else { // only last block exists
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
tRowMergerClear(&merge);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
if (freeTSRow) {
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_SUCCESS;
}
#endif
static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
@ -1987,10 +1990,35 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) {
return false;
}
ASSERT(pLastBlockReader->lastBlockData.nRow > 0);
return true;
}
// todo refactor
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
return TSDB_CODE_SUCCESS;
} else {
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
@ -2007,112 +2035,13 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
}
// mem + file
// mem + file + last block
if (pBlockScanInfo->iter.hasVal) {
return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
}
if (pBlockData->nRow > 0) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
// no last block available, only data block exists
if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) {
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
return TSDB_CODE_SUCCESS;
} else {
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
}
}
// row in last file block
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
ASSERT(ts >= key);
if (ASCENDING_TRAVERSE(pReader->order)) {
if (key < ts) {
// imem & mem are all empty, only file exist
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
return TSDB_CODE_SUCCESS;
} else {
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
}
} else if (key == ts) {
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
} else {
ASSERT(0);
return TSDB_CODE_SUCCESS;
}
} else { // desc order
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
if (ts == key) {
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
}
} else { // only last block exists
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
}
// files data blocks + last block
return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}
}
@ -2137,9 +2066,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
while (1) {
// todo check the validate of row in file block
bool hasBlockData = false;
{
bool hasBlockData = false;
while (pBlockData->nRow > 0) { // find the first qualified row in data block
if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
hasBlockData = true;
@ -2154,13 +2082,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
break;
}
}
}
bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);
bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);
// no data in last block and block, no need to proceed.
if ((hasBlockData == false) && (hasBlockLData == false)) {
break;
}
// no data in last block and block, no need to proceed.
if ((hasBlockData == false) && (hasBlockLData == false)) {
break;
}
buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
@ -3224,10 +3152,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
if (ik.ts < k.ts) { // ik.ts < k.ts
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
} else if (k.ts < ik.ts) {
doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
if (ik.ts != k.ts) {
if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
} else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) {
doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
}
} else { // ik.ts == k.ts
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
*freeTSRow = true;

View File

@ -80,11 +80,9 @@ struct SqlFunctionCtx;
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void initResultRowInfo(SResultRowInfo* pResultRowInfo);
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
void initResultRow(SResultRow* pResultRow);
void closeResultRow(SResultRow* pResultRow);
bool isResultRowClosed(SResultRow* pResultRow);
struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset);

View File

@ -150,6 +150,7 @@ typedef struct {
SQueryTableDataCond tableCond;
int64_t recoverStartVer;
int64_t recoverEndVer;
SStreamState* pState;
} SStreamTaskInfo;
typedef struct {

View File

@ -31,20 +31,6 @@ void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
pResultRowInfo->cur.pageId = -1;
}
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) {
if (pResultRowInfo == NULL) {
return;
}
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
// if (pResultRowInfo->pResult[i]) {
// taosMemoryFreeClear(pResultRowInfo->pResult[i]->key);
// }
}
}
bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); }
void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
// TODO refactor: use macro
@ -483,6 +469,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
code = createResultData(&type, rows, &output);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
qError("failed to create result, reason:%s", tstrerror(code));
goto end;
}
@ -491,6 +478,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray
if(code != TSDB_CODE_SUCCESS){
qError("failed to calculate scalar, reason:%s", tstrerror(code));
terrno = code;
goto end;
}
// int64_t st2 = taosGetTimestampUs();
// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
@ -777,6 +765,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
}
if (pTagCond) {
terrno = TDB_CODE_SUCCESS;
SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond);
if(terrno != TDB_CODE_SUCCESS){
colDataDestroy(pColInfoData);

View File

@ -392,7 +392,7 @@ static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
pCtx->input.colDataAggIsSet = pStatus->hasAgg;
pCtx->input.numOfRows = pStatus->numOfRows;
pCtx->input.numOfRows = pStatus->numOfRows;
pCtx->input.startRowIndex = pStatus->startOffset;
}
@ -3640,7 +3640,6 @@ _error:
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
assert(pInfo != NULL);
cleanupResultRowInfo(&pInfo->resultRowInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
@ -3716,7 +3715,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);
@ -3989,15 +3988,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
bool assignUid = groupbyTbname(group);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
if(assignUid){
if (assignUid) {
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = info->uid;
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
}
}else{
} else {
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -4616,6 +4615,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete;
}
if (pHandle && pHandle->pStateBackend) {
(*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
}
(*pTaskInfo)->sql = sql;
sql = NULL;
(*pTaskInfo)->pSubplan = pPlan;

View File

@ -2972,7 +2972,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) {
taosHashClear(pInfo->aggSup.pResultRowHashTable);
clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
cleanupResultRowInfo(&pInfo->binfo.resultRowInfo);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
}
@ -3129,8 +3128,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
maxTs = TMAX(maxTs, pBlock->info.watermark);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA ||
pBlock->info.type == STREAM_INVALID) {
ASSERT(pBlock->info.type != STREAM_INVERT);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_CLEAR) {
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));
@ -4264,8 +4263,6 @@ static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
}
}
clearDiskbasedBuf(pInfo->streamAggSup.pResultBuf);
cleanupResultRowInfo(&pInfo->binfo.resultRowInfo);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
}
static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) {

View File

@ -283,7 +283,7 @@ typedef struct SSchJob {
} SSchJob;
typedef struct SSchTaskCtx {
SSchJob *pJob;
int64_t jobRid;
SSchTask *pTask;
} SSchTaskCtx;

View File

@ -821,7 +821,13 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
int32_t schLaunchTaskImpl(void *param) {
SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
SSchJob *pJob = pCtx->pJob;
SSchJob *pJob = schAcquireJob(pCtx->jobRid);
if (NULL == pJob) {
taosMemoryFree(param);
qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
}
SSchTask *pTask = pCtx->pTask;
int8_t status = 0;
int32_t code = 0;
@ -880,6 +886,8 @@ _return:
}
}
schReleaseJob(pJob->refId);
SCH_RET(code);
}
@ -890,7 +898,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
param->pJob = pJob;
param->jobRid = pJob->refId;
param->pTask = pTask;
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {

View File

@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
return 0;
}
// TODO: handle version
int32_t streamExecForAll(SStreamTask* pTask) {
while (1) {
int32_t batchCnt = 1;

View File

@ -14,7 +14,7 @@
*/
#include "executor.h"
#include "tstream.h"
#include "streamInc.h"
#include "ttimer.h"
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
@ -23,17 +23,23 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pMeta->path = strdup(path);
int32_t len = strlen(path) + 20;
char* streamPath = taosMemoryCalloc(1, len);
sprintf(streamPath, "%s/%s", path, "stream");
pMeta->path = strdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
goto _err;
}
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
mkdir(streamPath, 0755);
taosMemoryFree(streamPath);
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) {
goto _err;
}
// open state storage backend
if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pStateDb) < 0) {
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) {
goto _err;
}
@ -49,16 +55,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
if (streamLoadTasks(pMeta) < 0) {
goto _err;
}
return pMeta;
_err:
if (pMeta->path) taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pStateDb) tdbTbClose(pMeta->pStateDb);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db);
taosMemoryFree(pMeta);
return NULL;
@ -67,7 +70,7 @@ _err:
void streamMetaClose(SStreamMeta* pMeta) {
tdbCommit(pMeta->db, &pMeta->txn);
tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pStateDb);
tdbTbClose(pMeta->pCheckpointDb);
tdbClose(pMeta->db);
void* pIter = NULL;

View File

@ -176,6 +176,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea
}
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
#if 0
void* buf = NULL;
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
@ -224,10 +225,12 @@ int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
FAIL:
if (buf) taosMemoryFree(buf);
return -1;
#endif
return 0;
}
int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
#if 0
void* pVal = NULL;
int32_t vLen = 0;
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
@ -241,7 +244,7 @@ int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
pTask->nextCheckId = aggCheckpoint.checkpointId + 1;
pTask->checkpointInfo = aggCheckpoint.checkpointVer;
#endif
return 0;
}

View File

@ -0,0 +1,187 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "streamInc.h"
#include "ttimer.h"
SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (pState == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
char statePath[200];
sprintf(statePath, "%s/%d", path, pTask->taskId);
if (tdbOpen(statePath, 16 * 1024, 1, &pState->db) < 0) {
goto _err;
}
// open state storage backend
if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pState->db, &pState->pStateDb) < 0) {
goto _err;
}
pState->pOwner = pTask;
return pState;
_err:
if (pState->pStateDb) tdbTbClose(pState->pStateDb);
if (pState->db) tdbClose(pState->db);
taosMemoryFree(pState);
return NULL;
}
void streamStateClose(SStreamState* pState) {
tdbCommit(pState->db, &pState->txn);
tdbTbClose(pState->pStateDb);
tdbClose(pState->db);
taosMemoryFree(pState);
}
int32_t streamStateBegin(SStreamState* pState) {
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamStateCommit(SStreamState* pState) {
if (tdbCommit(pState->db, &pState->txn) < 0) {
return -1;
}
memset(&pState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamStateAbort(SStreamState* pState) {
if (tdbAbort(pState->db, &pState->txn) < 0) {
return -1;
}
memset(&pState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen) {
return tdbTbUpsert(pState->pStateDb, key, kLen, value, vLen, &pState->txn);
}
int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen) {
return tdbTbGet(pState->pStateDb, key, kLen, pVal, pVLen);
}
int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen) {
return tdbTbDelete(pState->pStateDb, key, kLen, &pState->txn);
}
SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL);
int32_t c;
tdbTbcMoveTo(pCur->pCur, key, kLen, &c);
if (c != 0) {
taosMemoryFree(pCur);
return NULL;
}
return 0;
}
int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen) {
return tdbTbcGet(pCur->pCur, (const void**)pKey, pKLen, (const void**)pVal, pVLen);
}
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
//
return tdbTbcMoveToFirst(pCur->pCur);
}
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
//
return tdbTbcMoveToLast(pCur->pCur);
}
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) {
taosMemoryFree(pCur);
return NULL;
}
if (c > 0) return pCur;
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
taosMemoryFree(pCur);
return NULL;
}
return pCur;
}
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) {
taosMemoryFree(pCur);
return NULL;
}
if (c < 0) return pCur;
if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
taosMemoryFree(pCur);
return NULL;
}
return pCur;
}
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
//
return tdbTbcMoveToNext(pCur->pCur);
}
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
//
return tdbTbcMoveToPrev(pCur->pCur);
}

View File

@ -165,5 +165,8 @@ void tFreeSStreamTask(SStreamTask* pTask) {
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
}
if (pTask->pState) streamStateClose(pTask->pState);
taosMemoryFree(pTask);
}

View File

@ -145,7 +145,7 @@ static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t *buf
if (nread < 0) {
uError("http-report read error:%s", uv_err_name(nread));
} else {
uInfo("http-report succ to read %d bytes, just ignore it", nread);
uTrace("http-report succ to read %d bytes, just ignore it", nread);
}
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
@ -155,7 +155,7 @@ static void clientSentCb(uv_write_t* req, int32_t status) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("http-report failed to send data %s", uv_strerror(status));
} else {
uInfo("http-report succ to send data");
uTrace("http-report succ to send data");
}
uv_read_start((uv_stream_t *)&cli->tcp, clientAllocBuffCb, clientRecvCb);
}

View File

@ -121,7 +121,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
if (found == NULL) {
// file corrupted, no complete log
// TODO delete and search in previous files
ASSERT(0);
/*ASSERT(0);*/
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
@ -221,7 +221,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
int code = walSaveMeta(pWal);
if (code < 0) {
taosArrayDestroy(actualLog);
return -1;
}
}

View File

@ -702,7 +702,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
taosMsleep(50);
}
uInfo("cache:%s will be cleaned up", pCacheObj->name);
uTrace("cache:%s will be cleaned up", pCacheObj->name);
doCleanupDataCache(pCacheObj);
}

View File

@ -83,8 +83,8 @@ int32_t tsCompressInit() {
if (lossyFloat == false && lossyDouble == false) return 0;
tdszInit(fPrecision, dPrecision, maxRange, curRange, Compressor);
if (lossyFloat) uInfo("lossy compression float is opened. ");
if (lossyDouble) uInfo("lossy compression double is opened. ");
if (lossyFloat) uTrace("lossy compression float is opened. ");
if (lossyDouble) uTrace("lossy compression double is opened. ");
return 1;
}
// exit call

View File

@ -21,6 +21,6 @@ sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 bin
print =============== create drop qnode 1
sql create qnode on dnode 1
sql create snode on dnode 1
sql create bnode on dnode 1
#sql create snode on dnode 1
#sql create bnode on dnode 1