Merge branch '3.0' into fix/TS-4236/assert

This commit is contained in:
xinsheng Ren 2024-08-07 09:32:46 +08:00 committed by GitHub
commit 1599cdc00c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
70 changed files with 846 additions and 179 deletions

View File

@ -622,14 +622,14 @@ For more information, see [Data Subscription](../../develop/tmq).
In addition to the native connection, the Java client library also supports subscribing via websocket.
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoopFull.java:consumer_demo}}
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java:consumer_demo}}
```
</TabItem>
<TabItem value="ws" label="WebSocket connection">
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsWsConsumerLoop.java:consumer_demo}}
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:consumer_demo}}
```
</TabItem>

View File

@ -4,11 +4,7 @@ title: TDengine 产品简介
toc_max_heading_level: 4
---
在 2016 年底,涛思数据创始人陶建辉先生凭借多年的技术研发经验,敏锐地捕捉到时序数据呈指数级增长的态势,发现时序大数据处理领域缺乏一个高效、开放且易于
使用的工具并意识到时序大数据的处理是一个巨大的技术挑战与商业机遇。因此在2017 年,他创办了涛思数据,并亲自主导研发了 TDengine专注于时序数据处理。至今他仍致力于此领域的研究与发展。
TDengine 作为涛思数据的旗舰产品其核心是一个高性能、分布式的时序数据库。通过集成的缓存、数据订阅、流计算和数据清洗与转换等功能TDengine 已经发展成为
一个专为物联网、工业互联网、金融和 IT 运维等关键行业量身定制的时序大数据平台。该平台能够高效地汇聚、存储、分析、计算和分发来自海量数据采集点的大规模数据流,每日处理能力可达 TB 乃至 PB 级别。借助 TDengine企业可以实现实时的业务监控和预警进而发掘出有价值的商业洞察。
TDengine 是一个高性能、分布式的时序数据库。通过集成的缓存、数据订阅、流计算和数据清洗与转换等功能TDengine 已经发展成为一个专为物联网、工业互联网、金融和 IT 运维等关键行业量身定制的时序大数据平台。该平台能够高效地汇聚、存储、分析、计算和分发来自海量数据采集点的大规模数据流,每日处理能力可达 TB 乃至 PB 级别。借助 TDengine企业可以实现实时的业务监控和预警进而发掘出有价值的商业洞察。
自 2019 年 7 月 以 来, 涛 思 数 据 陆 续 将 TDengine 的 不 同 版 本 开 源, 包 括 单 机版2019 年 7 月、集群版2020 年 8 月以及云原生版2022 年 8 月。开源之后TDengine 迅速获得了全球开发者的关注,多次在 GitHub 网站全球趋势排行榜上位居榜首。截至编写本书时TDengine 在 GitHub 网站上已积累近 2.3 万颗星安装实例超过53 万个,覆盖 60 多个国家和地区,广泛应用于电力、石油、化工、新能源、智能制造、汽车、环境监测等行业或领域,赢得了全球开发者的广泛认可
@ -44,7 +40,7 @@ TDengine 经过特别优化,以适应时间序列数据的独特需求,引
9. 编程连接器TDengine 提供不同语言的连接器,包括 C/C++、Java、Go、Node.js、Rust、Python、C#、R、PHP 等。而且 TDengine 支持 REST 接口,应用可以直接通过 HTTP POST 请求 BODY 中包含的 SQL 语句来操作数据库。
10. 数据安全共享TDengine 通过数据库视图功能和权限管理,确保数据访问的安全性。数据订阅功能可将数据实时分发给应用,订阅主题可通过 SQL 定义,实现灵活精细的数据分发控制,保护数据安全和隐私
10. 数据安全共享TDengine 通过数据库视图功能和权限管理,确保数据访问的安全性。结合数据订阅功能实现灵活精细的数据分发控制,保护数据安全和隐私
11. 编程连接器TDengine 提供了丰富的编程语言连接器,包括 C/C++、Java、Go、Node.js、Rust、Python、C#、R、PHP 等,并支持 REST ful 接口方便应用通过HTTP POST 请求操作数据库。

View File

@ -0,0 +1,74 @@
---
sidebar_label: Docker
title: 通过 Docker 快速体验 TDengine
description: 使用 Docker 快速体验 TDengine 的高效写入和查询
---
本节首先介绍如何通过 Docker 快速体验 TDengine然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker请使用[安装包的方式快速体验](../../get-started/package/)。如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考 [TDengine GitHub 主页](https://github.com/taosdata/TDengine)下载源码构建和安装。
## 启动 TDengine
如果已经安装了 Docker首先拉取最新的 TDengine 容器镜像:
```shell
docker pull tdengine/tdengine:latest
```
或者指定版本的容器镜像:
```shell
docker pull tdengine/tdengine:3.0.1.4
```
然后只需执行下面的命令:
```shell
docker run -d -p 6030:6030 -p 6041:6041 -p 6043-6060:6043-6060 -p 6043-6060:6043-6060/udp tdengine/tdengine
```
注意TDengine 3.0 服务端仅使用 6030 TCP 端口。6041 为 taosAdapter 所使用提供 REST 服务端口。6043-6049 为 taosAdapter 提供第三方应用接入所使用端口,可根据需要选择是否打开。
如果需要将数据持久化到本机的某一个文件夹,则执行下边的命令:
```shell
docker run -d -v ~/data/taos/dnode/data:/var/lib/taos \
-v ~/data/taos/dnode/log:/var/log/taos \
-p 6030:6030 -p 6041:6041 -p 6043-6060:6043-6060 -p 6043-6060:6043-6060/udp tdengine/tdengine
```
:::note
- /var/lib/taos: TDengine 默认数据文件目录。可通过[配置文件]修改位置。你可以修改~/data/taos/dnode/data为你自己的数据目录
- /var/log/taos: TDengine 默认日志文件目录。可通过[配置文件]修改位置。你可以修改~/data/taos/dnode/log为你自己的日志目录
:::
确定该容器已经启动并且在正常运行。
```shell
docker ps
```
进入该容器并执行 `bash`
```shell
docker exec -it <container name> bash
```
然后就可以执行相关的 Linux 命令操作和访问 TDengine。
Docker 工具自身的下载和使用请参考 [Docker 官网文档](https://docs.docker.com/get-docker/)。
## TDengine 命令行界面
进入容器,执行 `taos`
```
$ taos
taos>
```
## 快速体验
想要快速体验 TDengine 的写入和查询能力,请参考[快速体验](../use)

View File

@ -0,0 +1,270 @@
---
sidebar_label: 安装包
title: 使用安装包立即开始
description: 使用安装包快速体验 TDengine
---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import PkgListV3 from "/components/PkgListV3";
您可以[用 Docker 立即体验](../../get-started/docker/) TDengine。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
TDengine 完整的软件包包括服务端taosd、应用驱动taosc、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、命令行程序CLItaos和一些工具软件。目前 TDinsight 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/components/taosadapter/) 提供 [RESTful 接口](../../reference/connector/rest-api/)。
为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 Lite 版本的安装包。
在 Linux 系统上TDengine 社区版提供 Deb 和 RPM 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 Deb 支持 Debian/Ubuntu 及其衍生系统RPM 支持 CentOS/RHEL/SUSE 及其衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。需要注意的是RPM 和 Deb 包不含 `taosdump` 和 TDinsight 安装脚本,这些工具需要通过安装 taosTools 包获得。TDengine 也提供 Windows x64 平台和 macOS x64/m1 平台的安装包。
## 运行环境要求
在linux系统中运行环境最低要求如下:
linux 内核版本 - 3.10.0-1160.83.1.el7.x86_64;
glibc 版本 - 2.17;
如果通过clone源码进行编译安装还需要满足:
cmake版本 - 3.26.4或以上;
gcc 版本 - 9.3.1或以上;
## 安装
**注意**
从TDengine 3.0.6.0 开始,不再提供单独的 taosTools 安装包,原 taosTools 安装包中包含的工具都在 TDengine-server 安装包中,如果需要请直接下载 TDengine -server 安装包。
<Tabs>
<TabItem label="Deb 安装" value="debinst">
1. 从列表中下载获得 Deb 安装包;
<PkgListV3 type={6}/>
2. 进入到安装包所在目录,执行如下的安装命令:
> 请将 `<version>` 替换为下载的安装包版本
```bash
sudo dpkg -i TDengine-server-<version>-Linux-x64.deb
```
</TabItem>
<TabItem label="RPM 安装" value="rpminst">
1. 从列表中下载获得 RPM 安装包;
<PkgListV3 type={5}/>
2. 进入到安装包所在目录,执行如下的安装命令:
> 请将 `<version>` 替换为下载的安装包版本
```bash
sudo rpm -ivh TDengine-server-<version>-Linux-x64.rpm
```
</TabItem>
<TabItem label="tar.gz 安装" value="tarinst">
1. 从列表中下载获得 tar.gz 安装包;
<PkgListV3 type={0}/>
2. 进入到安装包所在目录,使用 `tar` 解压安装包;
3. 进入到安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本。
> 请将 `<version>` 替换为下载的安装包版本
```bash
tar -zxvf TDengine-server-<version>-Linux-x64.tar.gz
```
解压文件后,进入相应子目录,执行其中的 `install.sh` 安装脚本:
```bash
sudo ./install.sh
```
:::info
install.sh 安装脚本在执行过程中,会通过命令行交互界面询问一些配置信息。如果希望采取无交互安装方式,那么可以运行 `./install.sh -e no`。运行 `./install.sh -h` 指令可以查看所有参数的详细说明信息。
:::
</TabItem>
<TabItem value="apt-get" label="apt-get">
可以使用 `apt-get` 工具从官方仓库安装。
**配置包仓库**
```bash
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list
```
如果安装 Beta 版需要安装包仓库:
```bash
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list
```
**使用 `apt-get` 命令安装**
```bash
sudo apt-get update
apt-cache policy tdengine
sudo apt-get install tdengine
```
:::tip
apt-get 方式只适用于 Debian 或 Ubuntu 系统。
:::
</TabItem>
<TabItem label="Windows 安装" value="windows">
**注意**
- 目前 TDengine 在 Windows 平台上只支持 Windows Server 2016/2019 和 Windows 10/11。
- 从 TDengine 3.1.0.0 开始,只提供 Windows 客户端安装包。如果需要 Windows 服务端安装包,请联系 TDengine 销售团队升级为企业版。
- Windows 上需要安装 VC 运行时库,可在此下载安装 [VC运行时库](https://learn.microsoft.com/zh-cn/cpp/windows/latest-supported-vc-redist?view=msvc-170), 如果已经安装此运行库可忽略。
按照以下步骤安装:
1. 从列表中下载获得 exe 安装程序;
<PkgListV3 type={3}/>
2. 运行可执行程序来安装 TDengine。
Note: 从 3.0.1.7 开始,只提供 TDengine 客户端的 Windows 客户端的下载。想要使用TDengine 服务端的 Windows 版本,请联系销售升级为企业版本。
</TabItem>
<TabItem label="macOS 安装" value="macos">
1. 从列表中下载获得 pkg 安装程序;
<PkgListV3 type={7}/>
2. 运行可执行程序来安装 TDengine。如果安装被阻止可以右键或者按 Ctrl 点击安装包,选择 `打开`
</TabItem>
</Tabs>
:::info
下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases/tdengine)。
:::
:::note
当安装第一个节点时,出现 `Enter FQDN:` 提示的时候,不需要输入任何内容。只有当安装第二个或以后更多的节点时,才需要输入已有集群中任何一个可用节点的 FQDN支持该新节点加入集群。当然也可以不输入而是在新节点启动前配置到新节点的配置文件中。
:::
## 启动
<Tabs>
<TabItem label="Linux 系统" value="linux">
安装后,请使用 `systemctl` 命令来启动 TDengine 的服务进程。
```bash
systemctl start taosd
systemctl start taosadapter
systemctl start taoskeeper
systemctl start taos-explorer
```
你也可以直接运行 start-all.sh 脚本来启动上面的所有服务
```bash
start-all.sh
```
可以使用 systemctl 来单独管理上面的每一个服务
```bash
systemctl start taosd
systemctl stop taosd
systemctl restart taosd
systemctl status taosd
```
:::info
- `systemctl` 命令需要 _root_ 权限来运行,如果您非 _root_ 用户,请在命令前添加 `sudo`
- `systemctl stop taosd` 指令在执行后并不会马上停止 TDengine 服务,而是会等待系统中必要的落盘工作正常完成。在数据量很大的情况下,这可能会消耗较长时间。
- 如果系统中不支持 `systemd`,也可以用手动运行 `/usr/local/taos/bin/taosd` 方式启动 TDengine 服务。
:::
</TabItem>
<TabItem label="Windows 系统" value="windows">
安装后,可以在拥有管理员权限的 cmd 窗口执行 `sc start taosd` 或在 `C:\TDengine` 目录下,运行 `taosd.exe` 来启动 TDengine 服务进程。如需使用 http/REST 服务,请执行 `sc start taosadapter` 或运行 `taosadapter.exe` 来启动 taosAdapter 服务进程。
</TabItem>
<TabItem label="macOS 系统" value="macos">
安装后,在应用程序目录下,双击 TDengine 图标来启动程序,也可以运行 `sudo launchctl start ` 来启动 TDengine 服务进程。
```bash
sudo launchctl start com.tdengine.taosd
sudo launchctl start com.tdengine.taosadapter
sudo launchctl start com.tdengine.taoskeeper
sudo launchctl start com.tdengine.taos-explorer
```
你也可以直接运行 start-all.sh 脚本来启动上面的所有服务
```bash
start-all.sh
```
可以使用 `launchctl` 命令管理上面提到的每个 TDengine 服务,以下示例使用 `taosd`
```bash
sudo launchctl start com.tdengine.taosd
sudo launchctl stop com.tdengine.taosd
sudo launchctl list | grep taosd
sudo launchctl print system/com.tdengine.taosd
```
:::info
- `launchctl` 命令管理`com.tdengine.taosd`需要管理员权限,务必在前面加 `sudo` 来增强安全性。
- `sudo launchctl list | grep taosd` 指令返回的第一列是 `taosd` 程序的 PID若为 `-` 则说明 TDengine 服务未运行。
- 故障排查:
- 如果服务异常请查看系统日志 `launchd.log` 或者 `/var/log/taos` 目录下 `taosdlog` 日志获取更多信息。
:::
</TabItem>
</Tabs>
## TDengine 命令行CLI
为便于检查 TDengine 的状态执行数据库Database的各种即席Ad Hoc查询TDengine 提供一命令行应用程序(以下简称为 TDengine CLItaos。要进入 TDengine 命令行,您只要在终端执行 `taos` (Linux/Mac) 或 `taos.exe` (Windows) 即可。 TDengine CLI 的提示符号如下:
```cmd
taos>
```
在 TDengine CLI 中,用户可以通过 SQL 命令来创建/删除数据库、表等并进行数据库Database插入查询操作。在终端中运行的 SQL 语句需要以分号(;)结束来运行。示例:
```sql
CREATE DATABASE demo;
USE demo;
CREATE TABLE t (ts TIMESTAMP, speed INT);
INSERT INTO t VALUES ('2019-07-15 00:00:00', 10);
INSERT INTO t VALUES ('2019-07-15 01:00:00', 20);
SELECT * FROM t;
ts | speed |
========================================
2019-07-15 00:00:00.000 | 10 |
2019-07-15 01:00:00.000 | 20 |
Query OK, 2 row(s) in set (0.003128s)
```
除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在机器上运行,更多细节请参考 [TDengine 命令行](../../reference/components/taos-cli/)。
## 快速体验
想要快速体验 TDengine 的写入和查询能力,请参考[快速体验](../use)

View File

@ -8,11 +8,22 @@ toc_max_heading_level: 4
taosBenchmark 是一个专为测试 TDengine 性能而设计的工具它能够全面评估TDengine 在写入、查询和订阅等方面的功能表现。该工具能够模拟大量设备产生的数据,并允许用户灵活控制数据库、超级表、标签列的数量和类型、数据列的数量和类型、子表数量、每张子表的数据量、写入数据的时间间隔、工作线程数量以及是否写入乱序数据等策略。
启动 TDengine 的服务,在终端中执行 taosBenchmark -y 命令,系统将自动在数据库 test 下创建一张名为 meters的超级表。这张超级表将包含 10 000 张子表,表名从 d0 到 d9999每张表包含 10,000条记录。每条记录包含 ts时间戳、current电流、voltage电压和 phase相位4个字段。时间戳范围从“2017-07-14 10:40:00 000”到“2017-07-14 10:40:09 999”。每张表还带有 location 和 groupId 两个标签其中groupId 设置为 1 到 10而 location 则设置为 California.Campbell、California.Cupertino 等城市信息。
启动 TDengine 的服务,在终端中执行如下命令
```shell
taosBenchmark -y
```
系统将自动在数据库 test 下创建一张名为 meters的超级表。这张超级表将包含 10 000 张子表,表名从 d0 到 d9999每张表包含 10,000条记录。每条记录包含 ts时间戳、current电流、voltage电压和 phase相位4个字段。时间戳范围从“2017-07-14 10:40:00 000”到“2017-07-14 10:40:09 999”。每张表还带有 location 和 groupId 两个标签其中groupId 设置为 1 到 10而 location 则设置为 California.Campbell、California.Cupertino 等城市信息。
执行该命令后,系统将迅速完成 1 亿条记录的写入过程。实际所需时间取决于硬件性能,但即便在普通 PC 服务器上,这个过程通常也只需要十几秒。
taosBenchmark 提供了丰富的选项,允许用户自定义测试参数,如表的数目、记录条数等。要查看详细的参数列表,请在终端中输入 taosBenchmark --help 命令。有关taosBenchmark 的详细使用方法,请参考 TDengine 的官方文档
taosBenchmark 提供了丰富的选项,允许用户自定义测试参数,如表的数目、记录条数等。要查看详细的参数列表,请在终端中输入如下命令
```shell
taosBenchmark --help
```
有关taosBenchmark 的详细使用方法,请参考[taosBenchmark 参考手册](../../reference/components/taosbenchmark)
## 体验查询

View File

@ -0,0 +1,26 @@
可以使用 apt-get 工具从官方仓库安装。
**安装包仓库**
```
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list
```
如果安装 Beta 版需要安装包仓库
```
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list
```
**使用 apt-get 命令安装**
```
sudo apt-get update
apt-cache policy tdengine
sudo apt-get install tdengine
```
:::tip
apt-get 方式只适用于 Debian 或 Ubuntu 系统
::::

View File

@ -0,0 +1 @@
label: 立即开始

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

View File

@ -1,15 +1,37 @@
---
sidebar_label: 立即开始
title: 快速体验 TDengine
toc_max_heading_level: 4
---
TDengine 的安装包含括服务端taosd、应用驱动taosc、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、命令行程序CLItaos和一些工具软件。
为了适应不同用户的操作系统偏好TDengine 在 Linux 系统上提供 tar.gz 、 Deb 和 RPM 格式安装包。此外,还支持 apt-get 方式安装,这种方式简便快捷,适合熟悉 Linux 包管理的用户。
除了 Linux 平台以外TDengine 还支持在 Windows X64 平台和 macOS X64/M1 平台上安装,扩大了其适用性,满足了跨平台的需求。
对于希望进行虚拟化安装的用户TDengine 同样提供了 Docker 镜像,使得用户可以快速搭建和体验 TDengine 环境,不需要烦琐的手动配置过程。
本节将详细指导如何在 Linux 操作系统中高效地安装和启动 TDengine 3.3.0.0 版本。同时,为了迎合不同用户的多样化需求,本节还将介绍 TDengine 在 Docker 容器中的安装和启动步骤,为用户提供更多灵活性和便利性选项。
---
title: 立即开始
description: '快速设置 TDengine 环境并体验其高效写入和查询'
---
import xiaot from './xiaot.webp'
import xiaot_new from './xiaot-20231007.png'
import channel from './channel.webp'
import official_account from './official-account.webp'
TDengine 完整的软件包包括服务端taosd、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动taosc、命令行程序 (CLItaos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/components/taosadapter) 提供 [RESTful 接口](../reference/connector/rest-api)。
本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。
```mdx-code-block
import DocCardList from '@theme/DocCardList';
import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
<DocCardList items={useCurrentSidebarCategory().items}/>
```
## 加入 TDengine 官方社区
微信扫描以下二维码,学习了解 TDengine 的最新技术与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。
<table width="100%">
<tr align="center">
<td style={{padding:'1em 3em',border:0}}><img src={xiaot_new} alt="小 T 的二维码" width="200" /></td>
<td style={{padding:'1em 3em',border:0}}><img src={channel} alt="TDengine 微信视频号" width="200" /></td>
<td style={{padding:'1em 3em',border:0}}><img src={official_account} alt="TDengine 微信公众号" width="200" /></td>
</tr>
<tr align="center">
<td style={{padding:'1em 3em',border:0}}>加入 TDengine 微信群<br/>了解学习最新物联网技术</td>
<td style={{padding:'1em 3em',border:0}}>关注 TDengine 视频号<br/>收看技术直播与教学视频</td>
<td style={{padding:'1em 3em',border:0}}>关注 TDengine 公众号<br/>阅读技术文章与行业案例</td>
</tr>
</table>

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 54 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 112 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 112 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

View File

@ -2,4 +2,13 @@
sidebar_label: 快速入门
title: TDengine 入门
toc_max_heading_level: 4
---
---
本章主要介绍 TDengine 的数据模型以及基本的写入和查询功能。
```mdx-code-block
import DocCardList from '@theme/DocCardList';
import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
<DocCardList items={useCurrentSidebarCategory().items}/>
```

View File

@ -6,4 +6,13 @@ toc_max_heading_level: 4
TDengine 不仅是一个高性能、分布式的时序数据库核心产品,而且集成了专为时序数据量身定制的一系列功能,包括数据订阅、缓存、流计算和 ETL 等。这些功能共同构成了一个完整的时序数据处理解决方案。因此,当你选择使用 TDengine 时,你的应用程序无须额外集成 Kafka、Redis、Spark 或 Flink 等第三方工具从而极大地简化应用程序的设计复杂度并显著降低运维成本。下图直观地展示了传统大数据平台架构与TDengine 架构之间的异同点,突显了 TDengine 在时序数据处理领域的独特优势。
![传统大数据平台架构与 TDengine 架构的对比](./architecture-compare.png)
![传统大数据平台架构与 TDengine 架构的对比](./architecture-compare.png)
本章主要介绍 TDengine 的一些高级功能,如数据订阅、缓存、流计算、边云协同和数据接入等。
```mdx-code-block
import DocCardList from '@theme/DocCardList';
import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
<DocCardList items={useCurrentSidebarCategory().items}/>
```

View File

@ -8,7 +8,7 @@ toc_max_heading_level: 4
## 容错
TDengine 支持 WAL 机制,实现数据的容错能力,保证数据的高可。TDengine 接收到应用程序的请求数据包时,会先将请求的原始数据包写入数据库日志文件,等数据成功写入数据库数据文件后,再删除相应的 WAL。这样保证了 TDengine 能够在断电等因素导致的服务重启时,从数据库日志文件中恢复数据,避免数据丢失。涉及的配置参数有如下两个:
TDengine 支持 WAL 机制,实现数据的容错能力,保证数据的高可。TDengine 接收到应用程序的请求数据包时,会先将请求的原始数据包写入数据库日志文件,等数据成功写入数据库数据文件后,再删除相应的 WAL。这样保证了 TDengine 能够在断电等因素导致的服务重启时,从数据库日志文件中恢复数据,避免数据丢失。涉及的配置参数有如下两个:
- wal_level WAL 级别1 表示写 WAL但不执行 fsync 2 表示写 WAL而且执行 fsync。默认值为 1。
- wal_fsync_period当 wal_level 设置为 2 时,执行 fsync 的周期;当 wal-level 设置为 0 时,表示每次写入,立即执行 fsync。

View File

@ -2,4 +2,13 @@
sidebar_label: 运维管理
title: TDengine 运维管理
toc_max_heading_level: 4
---
---
本章主要介绍如何规划、建设、维护以及监控 TDengine 集群。
```mdx-code-block
import DocCardList from '@theme/DocCardList';
import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
<DocCardList items={useCurrentSidebarCategory().items}/>
```

View File

@ -3,6 +3,8 @@ title: 技术内幕
description: TDengine 的内部设计
---
本章简要说明 TDengine 的一些内部设计。
```mdx-code-block
import DocCardList from '@theme/DocCardList';
import {useCurrentSidebarCategory} from '@docusaurus/theme-common';

View File

@ -4,7 +4,7 @@ title: 连接器
description: 详细介绍各种语言的连接器及 REST API
---
TDengine 提供了丰富的应用程序开发接口为了便于用户快速开发自己的应用TDengine 支持了多种编程语言的连接器,其中官方连接器包括支持 C/C++、Java、Python、Go、Node.js、C# 和 Rust 的连接器。这些连接器支持使用原生接口taoscREST 接口(部分语言暂不支持)连接 TDengine 集群。社区开发者也贡献了多个非官方连接器,例如 ADO.NET 连接器、Lua 连接器和 PHP 连接器。
TDengine 提供了丰富的应用程序开发接口为了便于用户快速开发自己的应用TDengine 支持了多种编程语言的连接器,其中官方连接器包括支持 C/C++、Java、Python、Go、Node.js、C# 和 Rust 的连接器。这些连接器支持使用原生接口taoscWebSocket 接口连接 TDengine 集群。社区开发者也贡献了多个非官方连接器,例如 ADO.NET 连接器、Lua 连接器和 PHP 连接器。
![TDengine Database connector architecture](./connector.webp)

View File

@ -2,4 +2,13 @@
title: 实践案例
sidebar_label: 实践案例
toc_max_heading_level: 4
---
---
本章介绍了 TDengine 在各行业应用的一些典型案例。
```mdx-code-block
import DocCardList from '@theme/DocCardList';
import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
<DocCardList items={useCurrentSidebarCategory().items}/>
```

View File

@ -1261,13 +1261,16 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB
return TSDB_CODE_SUCCESS;
}
static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) {
*ppCols = NULL;
int32_t code = 0;
int32_t rows = pDataBlock->info.capacity;
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData));
if (pCols == NULL) {
return NULL;
return terrno;
}
for (int32_t i = 0; i < numOfCols; ++i) {
@ -1280,8 +1283,11 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
if (IS_VAR_DATA_TYPE(pCols[i].info.type)) {
pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t));
pCols[i].pData = taosMemoryCalloc(1, pColInfoData->varmeta.length);
if (pCols[i].varmeta.offset == NULL) {
return NULL;
if (pCols[i].varmeta.offset == NULL || pCols[i].pData == NULL) {
code = terrno;
taosMemoryFree(pCols[i].varmeta.offset);
taosMemoryFree(pCols[i].pData);
goto _error;
}
pCols[i].varmeta.length = pColInfoData->varmeta.length;
@ -1290,12 +1296,20 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
pCols[i].nullbitmap = taosMemoryCalloc(1, BitmapLen(rows));
pCols[i].pData = taosMemoryCalloc(rows, pCols[i].info.bytes);
if (pCols[i].nullbitmap == NULL || pCols[i].pData == NULL) {
return NULL;
code = terrno;
taosMemoryFree(pCols[i].nullbitmap);
taosMemoryFree(pCols[i].pData);
goto _error;
}
}
}
return pCols;
*ppCols = pCols;
return code;
_error:
taosMemoryFree(pCols);
return code;
}
static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
@ -1423,16 +1437,15 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
int64_t p1 = taosGetTimestampUs();
SColumnInfoData* pCols = createHelpColInfoData(pDataBlock);
if (pCols == NULL) {
SColumnInfoData* pCols = NULL;
int32_t code = createHelpColInfoData(pDataBlock, &pCols);
if (code != 0) {
destroyTupleIndex(index);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
return code;
}
int64_t p2 = taosGetTimestampUs();
int32_t code = blockDataAssign(pCols, pDataBlock, index);
code = blockDataAssign(pCols, pDataBlock, index);
if (code) {
return code;
}

View File

@ -47,6 +47,7 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq);
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId);
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact);
int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len);
void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact);
#ifdef __cplusplus

View File

@ -30,6 +30,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs,
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq);
bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList);
bool mndDbIsExist(SMnode *pMnode, const char *db);
SSdbRaw *mndDbActionEncode(SDbObj *pDb);
const char *mndGetDbStr(const char *src);

View File

@ -224,6 +224,21 @@ SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) {
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pCompact);
pCompact = NULL;
}
int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len) {
int32_t code = 0;
SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
if (pCompact == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
(void)strncpy(dbname, pCompact->dbname, len);
mndReleaseCompact(pMnode, pCompact);
TAOS_RETURN(code);
}
// compact db
@ -488,7 +503,7 @@ _OVER:
}
tFreeSKillCompactReq(&killCompactReq);
sdbRelease(pMnode->pSdb, pCompact);
mndReleaseCompact(pMnode, pCompact);
TAOS_RETURN(code);
}
@ -640,16 +655,12 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
sdbRelease(pMnode->pSdb, pDetail);
}
SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
if (pCompact == NULL) TAOS_RETURN(code);
char dbname[TSDB_TABLE_FNAME_LEN] = {0};
TAOS_CHECK_RETURN(mndCompactGetDbName(pMnode, compactId, dbname, TSDB_TABLE_FNAME_LEN));
SDbObj *pDb = mndAcquireDb(pMnode, pCompact->dbname);
if (pDb == NULL) {
if (!mndDbIsExist(pMnode, dbname)) {
needSave = true;
mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, pCompact->dbname);
} else {
mndReleaseDb(pMnode, pDb);
pDb = NULL;
mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, dbname);
}
if (!needSave) {
@ -666,7 +677,7 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
}
mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);
mndTransSetDbName(pTrans, pCompact->dbname, NULL);
mndTransSetDbName(pTrans, dbname, NULL);
pIter = NULL;
while (1) {
@ -734,24 +745,20 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
sdbRelease(pMnode->pSdb, pDetail);
}
pDb = mndAcquireDb(pMnode, pCompact->dbname);
if (pDb == NULL) {
if (!mndDbIsExist(pMnode, dbname)) {
allFinished = true;
mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, pCompact->dbname);
} else {
mndReleaseDb(pMnode, pDb);
pDb = NULL;
mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, dbname);
}
if (allFinished) {
mInfo("compact:%d, all finished", pCompact->compactId);
mInfo("compact:%d, all finished", compactId);
pIter = NULL;
while (1) {
SCompactDetailObj *pDetail = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
if (pIter == NULL) break;
if (pDetail->compactId == pCompact->compactId) {
if (pDetail->compactId == compactId) {
SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
if (pCommitRaw == NULL) {
mndTransDrop(pTrans);
@ -774,7 +781,15 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
sdbRelease(pMnode->pSdb, pDetail);
}
SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
if (pCompact == NULL) {
mndTransDrop(pTrans);
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
mndReleaseCompact(pMnode, pCompact);
if (pCommitRaw == NULL) {
mndTransDrop(pTrans);
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
@ -793,11 +808,9 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr());
mndTransDrop(pTrans);
sdbRelease(pMnode->pSdb, pCompact);
TAOS_RETURN(code);
}
sdbRelease(pMnode->pSdb, pCompact);
mndTransDrop(pTrans);
return 0;
}
@ -827,8 +840,8 @@ void mndCompactPullup(SMnode *pMnode) {
if ((code = mndSaveCompactProgress(pMnode, pCompact->compactId)) != 0) {
mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code));
}
mndReleaseCompact(pMnode, pCompact);
}
mndReleaseCompact(pMnode, pCompact);
}
taosArrayDestroy(pArray);
}

View File

@ -398,6 +398,17 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
sdbRelease(pSdb, pDb);
}
bool mndDbIsExist(SMnode *pMnode, const char *db) {
SDbObj *pDb = mndAcquireDb(pMnode, db);
if (pDb == NULL) {
return false;
} else {
mndReleaseDb(pMnode, pDb);
pDb = NULL;
return true;
}
}
static int32_t mndCheckDbName(const char *dbName, SUserObj *pUser) {
char *pos = strstr(dbName, TS_PATH_DELIMITER);
if (pos == NULL) {

View File

@ -349,6 +349,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
streamMetaWLock(pStreamMeta);
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
streamMetaWUnLock(pStreamMeta);
if (pTaskList == NULL) {
return terrno;
}
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);

View File

@ -355,7 +355,7 @@ void tsdbCacherowsReaderClose(void* pReader) {
return;
}
if (p->pSchema != NULL) {
if (p->pSchema != NULL && p->transferBuf != NULL) {
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
taosMemoryFreeClear(p->transferBuf[i]);
}
@ -450,23 +450,27 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TSDB_CODE_SUCCESS;
bool hasRes = false;
SArray* pRow = NULL;
void** pRes = NULL;
SCacheRowsReader* pr = pReader;
int32_t pkBufLen = 0;
int32_t code = TSDB_CODE_SUCCESS;
bool hasRes = false;
SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
pr->pReadSnap = NULL;
pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
if (pRow == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
if (pRes == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
int32_t pkBufLen = (pr->rowKey.numOfPKs > 0) ? pr->pkColumn.bytes : 0;
pkBufLen = (pr->rowKey.numOfPKs > 0) ? pr->pkColumn.bytes : 0;
for (int32_t j = 0; j < pr->numOfCols; ++j) {
int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes;
@ -690,6 +694,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
_end:
tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true);
pr->pReadSnap = NULL;
if (pr->pCurFileSet) {
pr->pCurFileSet = NULL;
}

View File

@ -2434,21 +2434,25 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))};
if (info.pKeyRangeList == NULL) {
pReader->code = terrno;
return false;
}
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
if (code != TSDB_CODE_SUCCESS) {
pReader->code = code;
return false;
}
code = initMemDataIterator(pScanInfo, pReader);
if (code != TSDB_CODE_SUCCESS) {
pReader->code = code;
return false;
}
code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
if (code != TSDB_CODE_SUCCESS) {
pReader->code = code;
return code;
}
@ -2461,7 +2465,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
for (int32_t i = 0; i < taosArrayGetSize(info.pKeyRangeList); ++i) {
SSttKeyRange* pKeyRange = taosArrayGet(info.pKeyRangeList, i);
if (pKeyRange == NULL) {
return TSDB_CODE_INVALID_PARA;
continue;
}
if (pkCompEx(&pScanInfo->sttRange.skey, &pKeyRange->skey) > 0) {
@ -2766,6 +2770,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SBlockData* pBlockData = &pReader->status.fileBlockData;
(void) initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader);
if (pReader->code != 0) {
code = pReader->code;
goto _end;
}
while (1) {
bool hasBlockData = false;
@ -3180,6 +3188,10 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
}
bool hasDataInSttFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
if (pReader->code != TSDB_CODE_SUCCESS) {
return pReader->code;
}
if (!hasDataInSttFile) {
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
@ -3273,6 +3285,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
(void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
if (pReader->code != 0) {
return pReader->code;
}
}
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
@ -3314,6 +3329,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files
(void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
if (pReader->code != 0) {
return pReader->code;
}
// no data in stt block, no need to proceed.
while (hasDataInSttBlock(pScanInfo)) {
@ -3414,6 +3432,10 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en
}
STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
if (pBlockScanInfo == NULL || *pBlockScanInfo == NULL) {
return TSDB_CODE_SUCCESS;
}
if (pReader->pIgnoreTables &&
taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) {
bool hasNexTable = moveToNextTable(pUidList, pStatus);
@ -4791,6 +4813,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
void* p = pReader->pReadSnap;
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
tsdbUntakeReadSnap2(pReader, p, true);
pReader->pReadSnap = NULL;
}
(void) tsem_destroy(&pReader->resumeAfterSuspend);
@ -4873,6 +4896,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
void* p = pReader->pReadSnap;
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
tsdbUntakeReadSnap2(pReader, p, false);
pReader->pReadSnap = NULL;
}
if (pReader->bFilesetDelimited) {

View File

@ -1141,6 +1141,7 @@ uint64_t ctgGetClusterCacheSize(SCatalog *pCtg);
void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone);
void ctgClearAllHandleMeta(int64_t *clearedSize, int64_t *clearedNum, bool *roundDone);
void ctgProcessTimerEvent(void *param, void *tmrId);
int32_t ctgBuildUseDbOutput(SUseDbOutput** ppOut, SDBVgInfo* vgInfo);
int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgGetCachedStbNameFromSuid(SCatalog* pCtg, char* dbFName, uint64_t suid, char **stbName);

View File

@ -2881,6 +2881,11 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask* pTask) {
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
if (NULL != dbCache) {
if (pTask->subTask) {
pMsgCtx->reqType = TDMT_MND_USE_DB;
CTG_ERR_JRET(ctgBuildUseDbOutput((SUseDbOutput**)&pMsgCtx->out, dbCache->vgCache.vgInfo));
}
CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgCache.vgInfo->vgHash, (SArray**)&pTask->res));
ctgReleaseVgInfoToCache(pCtg, dbCache);

View File

@ -2590,6 +2590,22 @@ int32_t dupViewMetaFromRsp(SViewMetaRsp* pRsp, SViewMeta* pViewMeta) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgBuildUseDbOutput(SUseDbOutput** ppOut, SDBVgInfo* vgInfo) {
*ppOut = taosMemoryCalloc(1, sizeof(SUseDbOutput));
if (NULL == *ppOut) {
CTG_ERR_RET(terrno);
}
int32_t code = cloneDbVgInfo(vgInfo, &(*ppOut)->dbVgroup);
if (code) {
taosMemoryFreeClear(*ppOut);
CTG_RET(code);
}
return TSDB_CODE_SUCCESS;
}
uint64_t ctgGetTbTSMACacheSize(STableTSMAInfo* pTsmaInfo) {
if (!pTsmaInfo) {
return 0;

View File

@ -26,7 +26,7 @@
#define T_LONG_JMP(_obj, _c) \
do { \
ASSERT((_c) != -1); \
ASSERT(1); \
longjmp((_obj), (_c)); \
} while (0)

View File

@ -147,12 +147,7 @@ _error:
if (pInfo != NULL) {
destroyAggOperatorInfo(pInfo);
}
if (pOperator != NULL) {
cleanupExprSupp(&pOperator->exprSupp);
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}
@ -180,11 +175,11 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info;
if (pOperator->blocking && pAggInfo->hasValidBlock) return false;
SExprSupp* pSup = &pOperator->exprSupp;
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (pOperator->blocking && pAggInfo->hasValidBlock) {
return false;
}
SExprSupp* pSup = &pOperator->exprSupp;
int64_t st = taosGetTimestampUs();
int32_t order = pAggInfo->binfo.inputTsOrder;
SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
@ -458,7 +453,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
* not assign result buffer yet, add new result buffer
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pResultRow->pageId == -1) {
if (pResultRow == NULL || pResultRow->pageId == -1) {
int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);

View File

@ -187,6 +187,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull);
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
if (totalTables) QUERY_CHECK_NULL(pList, code, lino, _error, terrno);
uint64_t suid = tableListGetSuid(pTableListInfo);
code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
@ -246,7 +247,7 @@ _error:
}
pInfo->pTableList = NULL;
destroyCacheScanOperator(pInfo);
taosMemoryFree(pOperator);
destroyOperator(pOperator);
return code;
}
@ -439,8 +440,8 @@ void destroyCacheScanOperator(void* param) {
SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
blockDataDestroy(pInfo->pRes);
blockDataDestroy(pInfo->pBufferedRes);
taosMemoryFree(pInfo->pSlotIds);
taosMemoryFree(pInfo->pDstSlotIds);
taosMemoryFreeClear(pInfo->pSlotIds);
taosMemoryFreeClear(pInfo->pDstSlotIds);
taosArrayDestroy(pInfo->pCidList);
taosArrayDestroy(pInfo->pFuncTypeList);
taosArrayDestroy(pInfo->pUidList);
@ -462,13 +463,13 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
*pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
if (*pSlotIds == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
*pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
if (*pDstSlotIds == NULL) {
taosMemoryFree(*pSlotIds);
return TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(*pSlotIds);
return terrno;
}
SSchemaInfo* pSchemaInfo = taosArrayGetLast(pTaskInfo->schemaInfos);

View File

@ -82,9 +82,15 @@ static int32_t setCountWindowOutputBuff(SExprSupp* pExprSup, SCountWindowSupp* p
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SCountWindowResult* pBuff = getCountWinStateInfo(pCountSup);
QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);
(*pResult) = &pBuff->row;
code = setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
(*ppResBuff) = pBuff;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
@ -335,7 +341,7 @@ _error:
destroyCountWindowOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}

View File

@ -145,7 +145,7 @@ _error:
destroyEWindowOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}

View File

@ -443,7 +443,7 @@ _error:
doDestroyExchangeOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}
@ -989,6 +989,10 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic
dataInfo.taskId = pExchangeInfo->pTaskId;
dataInfo.index = pIdx->srcIdx;
dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
if (dataInfo.pSrcUidList == NULL) {
return terrno;
}
dataInfo.srcOpType = pBasicParam->srcOpType;
dataInfo.tableSeq = pBasicParam->tableSeq;
@ -1007,6 +1011,10 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
}
pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
if (pDataInfo->pSrcUidList == NULL) {
return terrno;
}
pDataInfo->srcOpType = pBasicParam->srcOpType;
pDataInfo->tableSeq = pBasicParam->tableSeq;
}

View File

@ -826,6 +826,8 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
if (tsTagFilterCache) {
tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
QUERY_CHECK_NULL(tableList, code, lino, end, terrno);
code = pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest,
tListLen(context.digest), tableList,
taosArrayGetSize(tableList) * sizeof(STableKeyInfo));

View File

@ -1293,6 +1293,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (uid == 0) {
if (numOfTables != 0) {
STableKeyInfo* tmp = tableListGetInfo(pTableListInfo, 0);
if (!tmp) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
taosRUnLockLatch(&pTaskInfo->lock);
return terrno;
}
if (tmp) uid = tmp->uid;
ts = INT64_MIN;
pScanInfo->currentTable = 0;
@ -1422,6 +1427,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
if (!pList) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
tDeleteSchemaWrapper(mtInfo.schema);
return code;
}
int32_t size = tableListGetSize(pTableListInfo);
code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size,
@ -1506,6 +1516,7 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
int32_t numOfTables = tableListGetSize(pTableListInfo);
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);
void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
@ -1520,7 +1531,7 @@ _end:
return pUidList;
}
static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
static int32_t extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -1528,23 +1539,25 @@ static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pScanInfo = pOperator->info;
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
void* tmp = taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo);
void* tmp = taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pScanInfo = pOperator->info;
void* tmp = taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
void* tmp = taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
} else {
if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
extractTableList(pList, pOperator->pDownstream[0]);
code = extractTableList(pList, pOperator->pDownstream[0]);
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
}
return code;
}
int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
@ -1552,16 +1565,17 @@ int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
return TSDB_CODE_INVALID_PARA;
}
*pList = NULL;
SArray* pArray = taosArrayInit(0, POINTER_BYTES);
if (pArray == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
SOperatorInfo* pOperator = pTaskInfo->pRoot;
extractTableList(pArray, pOperator);
*pList = pArray;
return TSDB_CODE_SUCCESS;
int32_t code = extractTableList(pArray, pTaskInfo->pRoot);
if (code == 0) {
*pList = pArray;
}
return code;
}
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {

View File

@ -158,8 +158,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if (isIntervalQuery) {
if (p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists.
pResult = getResultRowByPos(pResultBuf, p1, true);
if (NULL == pResult) {
T_LONG_JMP(pTaskInfo->env, terrno);
if (pResult == NULL) {
pTaskInfo->code = terrno;
return NULL;
}
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
@ -171,7 +172,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// todo
pResult = getResultRowByPos(pResultBuf, p1, true);
if (NULL == pResult) {
T_LONG_JMP(pTaskInfo->env, terrno);
pTaskInfo->code = terrno;
return NULL;
}
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
@ -184,7 +186,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
if (pPage == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
pTaskInfo->code = terrno;
return NULL;
}
releaseBufPage(pResultBuf, pPage);
}
@ -193,7 +196,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if (pResult == NULL) {
pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
if (pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
pTaskInfo->code = terrno;
return NULL;
}
// add a new result set for a new group
@ -202,7 +206,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
sizeof(SResultRowPosition));
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
pTaskInfo->code = code;
return NULL;
}
}
@ -212,7 +217,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// too many time window in query
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
pTaskInfo->code = TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW;
return NULL;
}
return pResult;
@ -1107,6 +1113,11 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i);
if (!pTable) {
taosArrayDestroy(pDeleterParam->pUidList);
taosMemoryFree(pDeleterParam);
return TSDB_CODE_OUT_OF_MEMORY;
}
void* tmp = taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
if (!tmp) {
taosArrayDestroy(pDeleterParam->pUidList);

View File

@ -567,7 +567,7 @@ _error:
}
pTaskInfo->code = code;
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
return code;
}

View File

@ -1504,7 +1504,7 @@ _error:
destroyGroupCacheOperator(pInfo);
}
taosMemoryFree(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}

View File

@ -615,7 +615,12 @@ _error:
if (pInfo != NULL) {
destroyGroupOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
return code;
}
@ -1241,7 +1246,7 @@ _error:
destroyPartitionOperatorInfo(pInfo);
}
pTaskInfo->code = code;
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
TAOS_RETURN(code);
}
@ -1253,6 +1258,9 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo,
SResultRow* pResultRow = doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo,
false, pAggSup, false);
if (pResultRow == NULL || pTaskInfo->code != 0) {
return pTaskInfo->code;
}
return setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
}
@ -1783,8 +1791,8 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
_error:
pTaskInfo->code = code;
destroyStreamPartitionOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo);
destroyOperator(pOperator);
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}

View File

@ -180,7 +180,7 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
_error:
destroyProjectOperatorInfo(pInfo);
taosMemoryFree(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}
@ -529,7 +529,7 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
_error:
destroyIndefinitOperatorInfo(pInfo);
taosMemoryFree(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}
@ -703,6 +703,9 @@ int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
int64_t groupId = 0;
SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
pTaskInfo, false, pSup, true);
if (pRow == NULL || pTaskInfo->code != 0) {
return pTaskInfo->code;
}
for (int32_t i = 0; i < numOfExprs; ++i) {
struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);

View File

@ -155,6 +155,11 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
schemaInfo.tablename = taosStrdup(mr.me.name);
schemaInfo.dbname = taosStrdup(dbName);
if (schemaInfo.tablename == NULL || schemaInfo.dbname == NULL) {
pAPI->metaReaderFn.clearReader(&mr);
cleanupQueriedTableScanInfo(&schemaInfo);
return terrno;
}
if (mr.me.type == TSDB_SUPER_TABLE) {
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
@ -166,8 +171,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, suid);
if (code != TSDB_CODE_SUCCESS) {
pAPI->metaReaderFn.clearReader(&mr);
taosMemoryFree(schemaInfo.tablename);
taosMemoryFree(schemaInfo.dbname);
cleanupQueriedTableScanInfo(&schemaInfo);
return code;
}
@ -177,18 +181,26 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
}
pAPI->metaReaderFn.clearReader(&mr);
if (schemaInfo.sw == NULL) {
cleanupQueriedTableScanInfo(&schemaInfo);
return terrno;
}
pAPI->metaReaderFn.clearReader(&mr);
schemaInfo.qsw = extractQueriedColumnSchema(pScanNode);
if (schemaInfo.qsw == NULL) {
cleanupQueriedTableScanInfo(&schemaInfo);
return terrno;
}
void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo);
return (p != NULL)? TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY;
if (p == NULL) {
cleanupQueriedTableScanInfo(&schemaInfo);
return terrno;
}
return code;
}
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {

View File

@ -1295,8 +1295,14 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
(*ppRes) = NULL;
return code;
}
tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
if (!tmp) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
taosRUnLockLatch(&pTaskInfo->lock);
(*ppRes) = NULL;
return terrno;
}
tInfo = *tmp;
taosRUnLockLatch(&pTaskInfo->lock);
code = pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1);
@ -1460,7 +1466,7 @@ _error:
destroyTableScanOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}
@ -3551,6 +3557,7 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray**
size_t size = tableListGetSize(pTableListInfo);
for (int32_t i = 0; i < size; ++i) {
STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
QUERY_CHECK_NULL(pkeyInfo, code, lino, _end, terrno);
void* tmp = taosArrayPush(tableIdList, &pkeyInfo->uid);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
@ -4140,7 +4147,7 @@ _error:
destroyStreamScanOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}
@ -4154,6 +4161,14 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
if (!item) {
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
GET_TASKID(pTaskInfo));
tDecoderClear(&(*mr).coder);
pAPI->metaReaderFn.clearReader(mr);
T_LONG_JMP(pTaskInfo->env, terrno);
}
code = pAPI->metaReaderFn.getTableEntryByUid(mr, item->uid);
tDecoderClear(&(*mr).coder);
if (code != TSDB_CODE_SUCCESS) {
@ -4688,8 +4703,8 @@ _error:
pInfo->pTableListInfo = NULL;
}
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
if (pInfo != NULL) destroyTagScanOperatorInfo(pInfo);
destroyOperator(pOperator);
return code;
}
@ -4839,18 +4854,23 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
return TSDB_CODE_SUCCESS;
}
static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
static int32_t setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
pInfo->bGroupProcessed = false;
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
if (!tableKeyInfo) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
if (tableKeyInfo->groupId != pInfo->groupId) {
break;
}
}
pInfo->tableEndIndex = i - 1;
return TSDB_CODE_SUCCESS;
}
static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
@ -4879,7 +4899,11 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
}
static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
setGroupStartEndIndex(pInfo);
int32_t code = setGroupStartEndIndex(pInfo);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return code;
}
STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo));
if (pSubTblsInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -4906,7 +4930,7 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
}
int32_t bufPageSize = pInfo->bufPageSize;
int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize;
int32_t code =
code =
createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pSubTblsInfo->aInputs);
@ -5184,7 +5208,9 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
return code;
}
pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
STableKeyInfo* pTmpGpId = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno);
pInfo->groupId = pTmpGpId->groupId;
code = startSubTablesTableMergeScan(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
}
@ -5198,6 +5224,7 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity);
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno);
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
}
if (pBlock != NULL) {
@ -5214,7 +5241,10 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
}
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
STableKeyInfo* pTmpGpId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno);
pInfo->groupId = pTmpGpId->groupId;
code = startSubTablesTableMergeScan(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
resetLimitInfoForNextGroup(&pInfo->limitInfo);
@ -5544,6 +5574,7 @@ void startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
QUERY_CHECK_NULL(tableKeyInfo, code, lino, _end, terrno);
if (tableKeyInfo->groupId != pInfo->groupId) {
break;
}
@ -5669,7 +5700,9 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
return code;
}
pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
pInfo->groupId = tmp->groupId;
startGroupTableMergeScan(pOperator);
}
@ -5683,6 +5716,7 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
pOperator);
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno);
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
}
if (pBlock != NULL) {
@ -5704,7 +5738,9 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
}
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
STableKeyInfo* tmp = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
pInfo->groupId = tmp->groupId;
startGroupTableMergeScan(pOperator);
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
@ -5912,8 +5948,8 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
_error:
pTaskInfo->code = code;
pInfo->base.pTableListInfo = NULL;
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
if (pInfo != NULL) destroyTableMergeScanOperatorInfo(pInfo);
destroyOperator(pOperator);
return code;
}
@ -6070,7 +6106,7 @@ _error:
if (pInfo != NULL) {
destoryTableCountScanOperator(pInfo);
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}

View File

@ -130,7 +130,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
pGroupIdCalc->lastKeysLen = 0;
pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen);
if (!pGroupIdCalc->keyBuf) {
code = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
}
}
}
@ -164,7 +164,7 @@ _error:
destroySortOperatorInfo(pInfo);
}
taosMemoryFree(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}
@ -370,8 +370,13 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
if (ps == NULL) {
return terrno;
}
ps->param = pOperator->pDownstream[0];
ps->onlyRef = true;
code = tsortAddSource(pInfo->pSortHandle, ps);
if (code) {
taosMemoryFree(ps);
@ -464,6 +469,9 @@ void destroySortOperatorInfo(void* param) {
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
if (pInfo == NULL) {
return terrno;
}
SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info;
@ -638,6 +646,10 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
if (ps == NULL || param == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
param->childOpInfo = pOperator->pDownstream[0];
param->grpSortOpInfo = pInfo;
ps->param = param;
@ -824,6 +836,6 @@ _error:
if (pInfo != NULL) {
destroyGroupSortOperatorInfo(pInfo);
}
taosMemoryFree(pOperator);
destroyOperator(pOperator);
return code;
}

View File

@ -926,7 +926,7 @@ _error:
destroyStreamCountAggOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;

View File

@ -980,8 +980,8 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
return code;
_error:
destroyStreamEventOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo);
destroyOperator(pOperator);
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;

View File

@ -1458,8 +1458,8 @@ _error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
}
destroyStreamFillOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}

View File

@ -2012,8 +2012,8 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
return code;
_error:
destroyStreamFinalIntervalOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}
@ -3832,7 +3832,7 @@ _error:
destroyStreamSessionAggOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
@ -4088,7 +4088,7 @@ _error:
if (pInfo != NULL) {
destroyStreamSessionAggOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
@ -4977,8 +4977,8 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
return code;
_error:
destroyStreamStateOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo);
destroyOperator(pOperator);
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
@ -5312,8 +5312,8 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
return code;
_error:
destroyStreamFinalIntervalOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}

View File

@ -2813,8 +2813,8 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
_error:
if (pInfo) {
pInfo->pTableListInfo = NULL;
destroyBlockDistScanOperatorInfo(pInfo);
}
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
return code;
}

View File

@ -1207,8 +1207,8 @@ _error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}

View File

@ -75,9 +75,9 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
if (pResultRow == NULL) {
if (pResultRow == NULL || pTaskInfo->code != 0) {
*pResult = NULL;
return TSDB_CODE_SUCCESS;
return pTaskInfo->code;
}
// set time window for current result
@ -1402,7 +1402,7 @@ _error:
if (pInfo != NULL) {
destroyIntervalOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}
@ -1678,7 +1678,7 @@ _error:
destroyStateWindowOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}
@ -1770,8 +1770,8 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
return code;
_error:
destroySWindowOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}
@ -2083,7 +2083,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
_error:
destroyMAIOperatorInfo(miaInfo);
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}
@ -2415,7 +2415,7 @@ _error:
destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
}
taosMemoryFreeClear(pOperator);
destroyOperator(pOperator);
pTaskInfo->code = code;
return code;
}

View File

@ -285,6 +285,10 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize,
pSortHandle->pageSize = pageSize;
pSortHandle->numOfPages = numOfPages;
pSortHandle->pSortInfo = taosArrayDup(pSortInfo, NULL);
if (pSortHandle->pSortInfo == NULL) {
return terrno;
}
pSortHandle->loops = 0;
pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
@ -1708,6 +1712,9 @@ int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) {
pHandle->extRowBytes = blockDataGetRowSize(pHandle->pDataBlock) + taosArrayGetSize(pHandle->pDataBlock->pDataBlock) + sizeof(int32_t);
pHandle->extRowsMemSize = extRowsMemSize;
pHandle->aExtRowsOrders = taosArrayDup(pHandle->pSortInfo, NULL);
if (pHandle->aExtRowsOrders == NULL) {
return terrno;
}
int32_t code = initRowIdSort(pHandle);
if (code) {

View File

@ -305,7 +305,7 @@ int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, dou
return TSDB_CODE_NO_DISKSPACE;
}
int32_t ret = createDiskbasedBuf(&(*pBucket)->pBuffer, (*pBucket)->bufPageSize, (*pBucket)->bufPageSize * 1024, "1", tsTempDir);
int32_t ret = createDiskbasedBuf(&(*pBucket)->pBuffer, (*pBucket)->bufPageSize, (*pBucket)->bufPageSize * DEFAULT_NUM_OF_SLOT * 4, "1", tsTempDir);
if (ret != 0) {
tMemBucketDestroy(*pBucket);
return ret;

View File

@ -581,6 +581,8 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(*pDst, pSrc, sizeof(*pSrc));
(*pDst)->vgArray = NULL;
if (pSrc->vgHash) {
(*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
HASH_ENTRY_LOCK);

View File

@ -401,8 +401,8 @@ static int32_t concatCopyHelper(const char *input, char *output, bool hasNchar,
taosMemoryFree(newBuf);
return TSDB_CODE_SCALAR_CONVERT_ERROR;
}
(void)memcpy(varDataVal(output) + *dataLen, newBuf, varDataLen(input) * TSDB_NCHAR_SIZE);
*dataLen += varDataLen(input) * TSDB_NCHAR_SIZE;
(void)memcpy(varDataVal(output) + *dataLen, newBuf, len);
*dataLen += len;
taosMemoryFree(newBuf);
} else {
(void)memcpy(varDataVal(output) + *dataLen, varDataVal(input), varDataLen(input));

View File

@ -819,10 +819,21 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
if (pHandle == NULL) {
goto _EXIT;
}
pHandle->list = tdListNew(sizeof(SCfComparator));
if (pHandle->list == NULL) {
goto _EXIT;
}
(void)taosThreadMutexInit(&pHandle->mutex, NULL);
(void)taosThreadMutexInit(&pHandle->cfMutex, NULL);
pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (pHandle->cfInst == NULL) {
goto _EXIT;
}
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();

View File

@ -1277,10 +1277,13 @@ static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64
if (pMeta->closeFlag) {
streamMetaWUnLock(pMeta);
stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
return -1;
return TSDB_CODE_FAILED;
}
*pList = taosArrayDup(pMeta->pTaskList, NULL);
if (*pList == NULL) {
return terrno;
}
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);

View File

@ -235,7 +235,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
void* newRet = NULL;
int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
if (newRet == NULL) {
if (code) {
if (code != -1) {
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
tstrerror(code));
}

View File

@ -48,7 +48,7 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType,
terrstr(terrno));
terrstr());
return terrno;
}

View File

@ -445,9 +445,8 @@ int32_t taosThreadMutexTryLock(TdThreadMutex *mutex) {
return EBUSY;
#else
int32_t code = pthread_mutex_trylock(mutex);
if (code) {
terrno = TAOS_SYSTEM_ERROR(code);
return terrno;
if (code && code != EBUSY) {
code = TAOS_SYSTEM_ERROR(code);
}
return code;
#endif
@ -816,9 +815,8 @@ int32_t taosThreadSpinTrylock(TdThreadSpinlock *lock) {
return pthread_mutex_trylock((pthread_mutex_t *)lock);
#else
int32_t code = pthread_spin_trylock((pthread_spinlock_t *)lock);
if (code) {
terrno = TAOS_SYSTEM_ERROR(code);
return code;
if (code && code != EBUSY) {
code = TAOS_SYSTEM_ERROR(code);
}
return code;
#endif

View File

@ -624,6 +624,9 @@ void taosHashTableResize(SHashObj *pHashObj) {
size_t inc = newCapacity - pHashObj->capacity;
void *p = taosMemoryCalloc(inc, sizeof(SHashEntry));
if (p == NULL) {
return;
}
for (int32_t i = 0; i < inc; ++i) {
pHashObj->hashList[i + pHashObj->capacity] = (void *)((char *)p + i * sizeof(SHashEntry));

View File

@ -367,6 +367,9 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
pPBuf->fileSize = 0;
pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem));
pPBuf->freePgList = tdListNew(POINTER_BYTES);
if (pPBuf->pFree == NULL || pPBuf->freePgList == NULL) {
goto _error;
}
// at least more than 2 pages must be in memory
if (inMemBufSize < pagesize * 2) {