diff --git a/docs/en/08-client-libraries/04-java.mdx b/docs/en/08-client-libraries/04-java.mdx index 7b9aa35e4a..a59098b90c 100644 --- a/docs/en/08-client-libraries/04-java.mdx +++ b/docs/en/08-client-libraries/04-java.mdx @@ -622,14 +622,14 @@ For more information, see [Data Subscription](../../develop/tmq). In addition to the native connection, the Java client library also supports subscribing via websocket. ```java -{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoopFull.java:consumer_demo}} +{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java:consumer_demo}} ``` ```java -{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsWsConsumerLoop.java:consumer_demo}} +{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:consumer_demo}} ``` diff --git a/docs/zh/03-intro.md b/docs/zh/03-intro.md index 519c60cf66..096b18757a 100644 --- a/docs/zh/03-intro.md +++ b/docs/zh/03-intro.md @@ -4,11 +4,7 @@ title: TDengine 产品简介 toc_max_heading_level: 4 --- -在 2016 年底,涛思数据创始人陶建辉先生凭借多年的技术研发经验,敏锐地捕捉到时序数据呈指数级增长的态势,发现时序大数据处理领域缺乏一个高效、开放且易于 -使用的工具,并意识到时序大数据的处理是一个巨大的技术挑战与商业机遇。因此,在2017 年,他创办了涛思数据,并亲自主导研发了 TDengine,专注于时序数据处理。至今他仍致力于此领域的研究与发展。 - -TDengine 作为涛思数据的旗舰产品,其核心是一个高性能、分布式的时序数据库。通过集成的缓存、数据订阅、流计算和数据清洗与转换等功能,TDengine 已经发展成为 -一个专为物联网、工业互联网、金融和 IT 运维等关键行业量身定制的时序大数据平台。该平台能够高效地汇聚、存储、分析、计算和分发来自海量数据采集点的大规模数据流,每日处理能力可达 TB 乃至 PB 级别。借助 TDengine,企业可以实现实时的业务监控和预警,进而发掘出有价值的商业洞察。 +TDengine 是一个高性能、分布式的时序数据库。通过集成的缓存、数据订阅、流计算和数据清洗与转换等功能,TDengine 已经发展成为一个专为物联网、工业互联网、金融和 IT 运维等关键行业量身定制的时序大数据平台。该平台能够高效地汇聚、存储、分析、计算和分发来自海量数据采集点的大规模数据流,每日处理能力可达 TB 乃至 PB 级别。借助 TDengine,企业可以实现实时的业务监控和预警,进而发掘出有价值的商业洞察。 自 2019 年 7 月 以 来, 涛 思 数 据 陆 续 将 TDengine 的 不 同 版 本 开 源, 包 括 单 机版(2019 年 7 月)、集群版(2020 年 8 月)以及云原生版(2022 年 8 月)。开源之后,TDengine 迅速获得了全球开发者的关注,多次在 GitHub 网站全球趋势排行榜上位居榜首。截至编写本书时,TDengine 在 GitHub 网站上已积累近 2.3 万颗星,安装实例超过53 万个,覆盖 60 多个国家和地区,广泛应用于电力、石油、化工、新能源、智能制造、汽车、环境监测等行业或领域,赢得了全球开发者的广泛认可 @@ -44,7 +40,7 @@ TDengine 经过特别优化,以适应时间序列数据的独特需求,引 9. 编程连接器:TDengine 提供不同语言的连接器,包括 C/C++、Java、Go、Node.js、Rust、Python、C#、R、PHP 等。而且 TDengine 支持 REST 接口,应用可以直接通过 HTTP POST 请求 BODY 中包含的 SQL 语句来操作数据库。 -10. 数据安全共享:TDengine 通过数据库视图功能和权限管理,确保数据访问的安全性。数据订阅功能可将数据实时分发给应用,订阅主题可通过 SQL 定义,实现灵活精细的数据分发控制,保护数据安全和隐私 +10. 数据安全共享:TDengine 通过数据库视图功能和权限管理,确保数据访问的安全性。结合数据订阅功能实现灵活精细的数据分发控制,保护数据安全和隐私 11. 编程连接器:TDengine 提供了丰富的编程语言连接器,包括 C/C++、Java、Go、Node.js、Rust、Python、C#、R、PHP 等,并支持 REST ful 接口,方便应用通过HTTP POST 请求操作数据库。 diff --git a/docs/zh/04-get-started/01-docker.md b/docs/zh/04-get-started/01-docker.md new file mode 100644 index 0000000000..d0a40e8e28 --- /dev/null +++ b/docs/zh/04-get-started/01-docker.md @@ -0,0 +1,74 @@ +--- +sidebar_label: Docker +title: 通过 Docker 快速体验 TDengine +description: 使用 Docker 快速体验 TDengine 的高效写入和查询 +--- + +本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用[安装包的方式快速体验](../../get-started/package/)。如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考 [TDengine GitHub 主页](https://github.com/taosdata/TDengine)下载源码构建和安装。 + +## 启动 TDengine + +如果已经安装了 Docker,首先拉取最新的 TDengine 容器镜像: + +```shell +docker pull tdengine/tdengine:latest +``` + +或者指定版本的容器镜像: + +```shell +docker pull tdengine/tdengine:3.0.1.4 +``` + +然后只需执行下面的命令: + +```shell +docker run -d -p 6030:6030 -p 6041:6041 -p 6043-6060:6043-6060 -p 6043-6060:6043-6060/udp tdengine/tdengine +``` + +注意:TDengine 3.0 服务端仅使用 6030 TCP 端口。6041 为 taosAdapter 所使用提供 REST 服务端口。6043-6049 为 taosAdapter 提供第三方应用接入所使用端口,可根据需要选择是否打开。 + +如果需要将数据持久化到本机的某一个文件夹,则执行下边的命令: + +```shell +docker run -d -v ~/data/taos/dnode/data:/var/lib/taos \ + -v ~/data/taos/dnode/log:/var/log/taos \ + -p 6030:6030 -p 6041:6041 -p 6043-6060:6043-6060 -p 6043-6060:6043-6060/udp tdengine/tdengine +``` + +:::note + +- /var/lib/taos: TDengine 默认数据文件目录。可通过[配置文件]修改位置。你可以修改~/data/taos/dnode/data为你自己的数据目录 +- /var/log/taos: TDengine 默认日志文件目录。可通过[配置文件]修改位置。你可以修改~/data/taos/dnode/log为你自己的日志目录 + +::: + +确定该容器已经启动并且在正常运行。 + +```shell +docker ps +``` + +进入该容器并执行 `bash` + +```shell +docker exec -it bash +``` + +然后就可以执行相关的 Linux 命令操作和访问 TDengine。 + +注:Docker 工具自身的下载和使用请参考 [Docker 官网文档](https://docs.docker.com/get-docker/)。 + +## TDengine 命令行界面 + +进入容器,执行 `taos`: + +``` +$ taos + +taos> +``` + +## 快速体验 + +想要快速体验 TDengine 的写入和查询能力,请参考[快速体验](../use) \ No newline at end of file diff --git a/docs/zh/04-get-started/03-package.md b/docs/zh/04-get-started/03-package.md new file mode 100644 index 0000000000..93804f31dc --- /dev/null +++ b/docs/zh/04-get-started/03-package.md @@ -0,0 +1,270 @@ +--- +sidebar_label: 安装包 +title: 使用安装包立即开始 +description: 使用安装包快速体验 TDengine +--- + +import Tabs from "@theme/Tabs"; +import TabItem from "@theme/TabItem"; +import PkgListV3 from "/components/PkgListV3"; + +您可以[用 Docker 立即体验](../../get-started/docker/) TDengine。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. + +TDengine 完整的软件包包括服务端(taosd)、应用驱动(taosc)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、命令行程序(CLI,taos)和一些工具软件。目前 TDinsight 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/components/taosadapter/) 提供 [RESTful 接口](../../reference/connector/rest-api/)。 + +为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 Lite 版本的安装包。 + +在 Linux 系统上,TDengine 社区版提供 Deb 和 RPM 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 Deb 支持 Debian/Ubuntu 及其衍生系统,RPM 支持 CentOS/RHEL/SUSE 及其衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。需要注意的是,RPM 和 Deb 包不含 `taosdump` 和 TDinsight 安装脚本,这些工具需要通过安装 taosTools 包获得。TDengine 也提供 Windows x64 平台和 macOS x64/m1 平台的安装包。 + +## 运行环境要求 +在linux系统中,运行环境最低要求如下: + +linux 内核版本 - 3.10.0-1160.83.1.el7.x86_64; + +glibc 版本 - 2.17; + +如果通过clone源码进行编译安装,还需要满足: + +cmake版本 - 3.26.4或以上; + +gcc 版本 - 9.3.1或以上; + + +## 安装 + +**注意** + +从TDengine 3.0.6.0 开始,不再提供单独的 taosTools 安装包,原 taosTools 安装包中包含的工具都在 TDengine-server 安装包中,如果需要请直接下载 TDengine -server 安装包。 + + + + +1. 从列表中下载获得 Deb 安装包; + +2. 进入到安装包所在目录,执行如下的安装命令: + +> 请将 `` 替换为下载的安装包版本 + +```bash +sudo dpkg -i TDengine-server--Linux-x64.deb +``` + + + + + +1. 从列表中下载获得 RPM 安装包; + +2. 进入到安装包所在目录,执行如下的安装命令: + +> 请将 `` 替换为下载的安装包版本 + +```bash +sudo rpm -ivh TDengine-server--Linux-x64.rpm +``` + + + + + +1. 从列表中下载获得 tar.gz 安装包; + +2. 进入到安装包所在目录,使用 `tar` 解压安装包; +3. 进入到安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本。 + +> 请将 `` 替换为下载的安装包版本 + +```bash +tar -zxvf TDengine-server--Linux-x64.tar.gz +``` + +解压文件后,进入相应子目录,执行其中的 `install.sh` 安装脚本: + +```bash +sudo ./install.sh +``` + +:::info +install.sh 安装脚本在执行过程中,会通过命令行交互界面询问一些配置信息。如果希望采取无交互安装方式,那么可以运行 `./install.sh -e no`。运行 `./install.sh -h` 指令可以查看所有参数的详细说明信息。 +::: + + + + + +可以使用 `apt-get` 工具从官方仓库安装。 + +**配置包仓库** + +```bash +wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add - +echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list +``` + +如果安装 Beta 版需要安装包仓库: + +```bash +wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add - +echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list +``` + +**使用 `apt-get` 命令安装** + +```bash +sudo apt-get update +apt-cache policy tdengine +sudo apt-get install tdengine +``` + +:::tip +apt-get 方式只适用于 Debian 或 Ubuntu 系统。 +::: + + + + +**注意** +- 目前 TDengine 在 Windows 平台上只支持 Windows Server 2016/2019 和 Windows 10/11。 +- 从 TDengine 3.1.0.0 开始,只提供 Windows 客户端安装包。如果需要 Windows 服务端安装包,请联系 TDengine 销售团队升级为企业版。 +- Windows 上需要安装 VC 运行时库,可在此下载安装 [VC运行时库](https://learn.microsoft.com/zh-cn/cpp/windows/latest-supported-vc-redist?view=msvc-170), 如果已经安装此运行库可忽略。 + +按照以下步骤安装: + +1. 从列表中下载获得 exe 安装程序; + +2. 运行可执行程序来安装 TDengine。 +Note: 从 3.0.1.7 开始,只提供 TDengine 客户端的 Windows 客户端的下载。想要使用TDengine 服务端的 Windows 版本,请联系销售升级为企业版本。 + + + + +1. 从列表中下载获得 pkg 安装程序; + +2. 运行可执行程序来安装 TDengine。如果安装被阻止,可以右键或者按 Ctrl 点击安装包,选择 `打开`。 + + + + +:::info +下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases/tdengine)。 +::: + +:::note +当安装第一个节点时,出现 `Enter FQDN:` 提示的时候,不需要输入任何内容。只有当安装第二个或以后更多的节点时,才需要输入已有集群中任何一个可用节点的 FQDN,支持该新节点加入集群。当然也可以不输入,而是在新节点启动前,配置到新节点的配置文件中。 + +::: + +## 启动 + + + + +安装后,请使用 `systemctl` 命令来启动 TDengine 的服务进程。 + +```bash +systemctl start taosd +systemctl start taosadapter +systemctl start taoskeeper +systemctl start taos-explorer +``` + +你也可以直接运行 start-all.sh 脚本来启动上面的所有服务 +```bash +start-all.sh +``` + +可以使用 systemctl 来单独管理上面的每一个服务 + +```bash +systemctl start taosd +systemctl stop taosd +systemctl restart taosd +systemctl status taosd +``` + +:::info + +- `systemctl` 命令需要 _root_ 权限来运行,如果您非 _root_ 用户,请在命令前添加 `sudo`。 +- `systemctl stop taosd` 指令在执行后并不会马上停止 TDengine 服务,而是会等待系统中必要的落盘工作正常完成。在数据量很大的情况下,这可能会消耗较长时间。 +- 如果系统中不支持 `systemd`,也可以用手动运行 `/usr/local/taos/bin/taosd` 方式启动 TDengine 服务。 + +::: + + + + + +安装后,可以在拥有管理员权限的 cmd 窗口执行 `sc start taosd` 或在 `C:\TDengine` 目录下,运行 `taosd.exe` 来启动 TDengine 服务进程。如需使用 http/REST 服务,请执行 `sc start taosadapter` 或运行 `taosadapter.exe` 来启动 taosAdapter 服务进程。 + + + + + +安装后,在应用程序目录下,双击 TDengine 图标来启动程序,也可以运行 `sudo launchctl start ` 来启动 TDengine 服务进程。 + + +```bash +sudo launchctl start com.tdengine.taosd +sudo launchctl start com.tdengine.taosadapter +sudo launchctl start com.tdengine.taoskeeper +sudo launchctl start com.tdengine.taos-explorer +``` + +你也可以直接运行 start-all.sh 脚本来启动上面的所有服务 +```bash +start-all.sh +``` + +可以使用 `launchctl` 命令管理上面提到的每个 TDengine 服务,以下示例使用 `taosd` : + +```bash +sudo launchctl start com.tdengine.taosd +sudo launchctl stop com.tdengine.taosd +sudo launchctl list | grep taosd +sudo launchctl print system/com.tdengine.taosd +``` + +:::info + +- `launchctl` 命令管理`com.tdengine.taosd`需要管理员权限,务必在前面加 `sudo` 来增强安全性。 +- `sudo launchctl list | grep taosd` 指令返回的第一列是 `taosd` 程序的 PID,若为 `-` 则说明 TDengine 服务未运行。 +- 故障排查: +- 如果服务异常请查看系统日志 `launchd.log` 或者 `/var/log/taos` 目录下 `taosdlog` 日志获取更多信息。 + +::: + + + + + +## TDengine 命令行(CLI) + +为便于检查 TDengine 的状态,执行数据库(Database)的各种即席(Ad Hoc)查询,TDengine 提供一命令行应用程序(以下简称为 TDengine CLI)taos。要进入 TDengine 命令行,您只要在终端执行 `taos` (Linux/Mac) 或 `taos.exe` (Windows) 即可。 TDengine CLI 的提示符号如下: + +```cmd +taos> +``` + +在 TDengine CLI 中,用户可以通过 SQL 命令来创建/删除数据库、表等,并进行数据库(Database)插入查询操作。在终端中运行的 SQL 语句需要以分号(;)结束来运行。示例: + +```sql +CREATE DATABASE demo; +USE demo; +CREATE TABLE t (ts TIMESTAMP, speed INT); +INSERT INTO t VALUES ('2019-07-15 00:00:00', 10); +INSERT INTO t VALUES ('2019-07-15 01:00:00', 20); +SELECT * FROM t; + + ts | speed | +======================================== + 2019-07-15 00:00:00.000 | 10 | + 2019-07-15 01:00:00.000 | 20 | + +Query OK, 2 row(s) in set (0.003128s) +``` + +除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在机器上运行,更多细节请参考 [TDengine 命令行](../../reference/components/taos-cli/)。 + +## 快速体验 + +想要快速体验 TDengine 的写入和查询能力,请参考[快速体验](../use) \ No newline at end of file diff --git a/docs/zh/04-get-started/08-cloud.md b/docs/zh/04-get-started/05-cloud.md similarity index 100% rename from docs/zh/04-get-started/08-cloud.md rename to docs/zh/04-get-started/05-cloud.md diff --git a/docs/zh/04-get-started/10-use.md b/docs/zh/04-get-started/07-use.md similarity index 68% rename from docs/zh/04-get-started/10-use.md rename to docs/zh/04-get-started/07-use.md index aad93cc551..d206ed4102 100644 --- a/docs/zh/04-get-started/10-use.md +++ b/docs/zh/04-get-started/07-use.md @@ -8,11 +8,22 @@ toc_max_heading_level: 4 taosBenchmark 是一个专为测试 TDengine 性能而设计的工具,它能够全面评估TDengine 在写入、查询和订阅等方面的功能表现。该工具能够模拟大量设备产生的数据,并允许用户灵活控制数据库、超级表、标签列的数量和类型、数据列的数量和类型、子表数量、每张子表的数据量、写入数据的时间间隔、工作线程数量以及是否写入乱序数据等策略。 -启动 TDengine 的服务,在终端中执行 taosBenchmark -y 命令,系统将自动在数据库 test 下创建一张名为 meters的超级表。这张超级表将包含 10 000 张子表,表名从 d0 到 d9999,每张表包含 10,000条记录。每条记录包含 ts(时间戳)、current(电流)、voltage(电压)和 phase(相位)4个字段。时间戳范围从“2017-07-14 10:40:00 000”到“2017-07-14 10:40:09 999”。每张表还带有 location 和 groupId 两个标签,其中,groupId 设置为 1 到 10,而 location 则设置为 California.Campbell、California.Cupertino 等城市信息。 +启动 TDengine 的服务,在终端中执行如下命令 + +```shell +taosBenchmark -y +``` + +系统将自动在数据库 test 下创建一张名为 meters的超级表。这张超级表将包含 10 000 张子表,表名从 d0 到 d9999,每张表包含 10,000条记录。每条记录包含 ts(时间戳)、current(电流)、voltage(电压)和 phase(相位)4个字段。时间戳范围从“2017-07-14 10:40:00 000”到“2017-07-14 10:40:09 999”。每张表还带有 location 和 groupId 两个标签,其中,groupId 设置为 1 到 10,而 location 则设置为 California.Campbell、California.Cupertino 等城市信息。 执行该命令后,系统将迅速完成 1 亿条记录的写入过程。实际所需时间取决于硬件性能,但即便在普通 PC 服务器上,这个过程通常也只需要十几秒。 -taosBenchmark 提供了丰富的选项,允许用户自定义测试参数,如表的数目、记录条数等。要查看详细的参数列表,请在终端中输入 taosBenchmark --help 命令。有关taosBenchmark 的详细使用方法,请参考 TDengine 的官方文档 +taosBenchmark 提供了丰富的选项,允许用户自定义测试参数,如表的数目、记录条数等。要查看详细的参数列表,请在终端中输入如下命令 +```shell +taosBenchmark --help +``` + +有关taosBenchmark 的详细使用方法,请参考[taosBenchmark 参考手册](../../reference/components/taosbenchmark) ## 体验查询 diff --git a/docs/zh/04-get-started/01-install.md b/docs/zh/04-get-started/_01-install.md similarity index 100% rename from docs/zh/04-get-started/01-install.md rename to docs/zh/04-get-started/_01-install.md diff --git a/docs/zh/04-get-started/03-docker.md b/docs/zh/04-get-started/_03-docker.md similarity index 100% rename from docs/zh/04-get-started/03-docker.md rename to docs/zh/04-get-started/_03-docker.md diff --git a/docs/zh/04-get-started/_apt_get_install.mdx b/docs/zh/04-get-started/_apt_get_install.mdx new file mode 100644 index 0000000000..b1bc4a1351 --- /dev/null +++ b/docs/zh/04-get-started/_apt_get_install.mdx @@ -0,0 +1,26 @@ +可以使用 apt-get 工具从官方仓库安装。 + +**安装包仓库** + +``` +wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add - +echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list +``` + +如果安装 Beta 版需要安装包仓库 + +``` +echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list +``` + +**使用 apt-get 命令安装** + +``` +sudo apt-get update +apt-cache policy tdengine +sudo apt-get install tdengine +``` + +:::tip +apt-get 方式只适用于 Debian 或 Ubuntu 系统 +:::: diff --git a/docs/zh/04-get-started/_category_.yml b/docs/zh/04-get-started/_category_.yml new file mode 100644 index 0000000000..b2348fade6 --- /dev/null +++ b/docs/zh/04-get-started/_category_.yml @@ -0,0 +1 @@ +label: 立即开始 diff --git a/docs/zh/04-get-started/channel.webp b/docs/zh/04-get-started/channel.webp new file mode 100644 index 0000000000..8dba93d411 Binary files /dev/null and b/docs/zh/04-get-started/channel.webp differ diff --git a/docs/zh/04-get-started/index.md b/docs/zh/04-get-started/index.md index 2b7db0ad4c..57bcf9ace3 100644 --- a/docs/zh/04-get-started/index.md +++ b/docs/zh/04-get-started/index.md @@ -1,15 +1,37 @@ ---- -sidebar_label: 立即开始 -title: 快速体验 TDengine -toc_max_heading_level: 4 ---- - -TDengine 的安装包含括服务端(taosd)、应用驱动(taosc)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、命令行程序(CLI,taos)和一些工具软件。 - -为了适应不同用户的操作系统偏好,TDengine 在 Linux 系统上提供 tar.gz 、 Deb 和 RPM 格式安装包。此外,还支持 apt-get 方式安装,这种方式简便快捷,适合熟悉 Linux 包管理的用户。 - -除了 Linux 平台以外,TDengine 还支持在 Windows X64 平台和 macOS X64/M1 平台上安装,扩大了其适用性,满足了跨平台的需求。 - -对于希望进行虚拟化安装的用户,TDengine 同样提供了 Docker 镜像,使得用户可以快速搭建和体验 TDengine 环境,不需要烦琐的手动配置过程。 - -本节将详细指导如何在 Linux 操作系统中高效地安装和启动 TDengine 3.3.0.0 版本。同时,为了迎合不同用户的多样化需求,本节还将介绍 TDengine 在 Docker 容器中的安装和启动步骤,为用户提供更多灵活性和便利性选项。 \ No newline at end of file +--- +title: 立即开始 +description: '快速设置 TDengine 环境并体验其高效写入和查询' +--- + +import xiaot from './xiaot.webp' +import xiaot_new from './xiaot-20231007.png' +import channel from './channel.webp' +import official_account from './official-account.webp' + +TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/components/taosadapter) 提供 [RESTful 接口](../reference/connector/rest-api)。 + +本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。 + +```mdx-code-block +import DocCardList from '@theme/DocCardList'; +import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; + + +``` + +## 加入 TDengine 官方社区 + +微信扫描以下二维码,学习了解 TDengine 的最新技术,与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。 + + + + + + + + + + + + +
小 T 的二维码TDengine 微信视频号TDengine 微信公众号
加入 TDengine 微信群
了解学习最新物联网技术
关注 TDengine 视频号
收看技术直播与教学视频
关注 TDengine 公众号
阅读技术文章与行业案例
diff --git a/docs/zh/04-get-started/official-account.webp b/docs/zh/04-get-started/official-account.webp new file mode 100644 index 0000000000..fcbc3107fc Binary files /dev/null and b/docs/zh/04-get-started/official-account.webp differ diff --git a/docs/zh/04-get-started/xiaot-03.webp b/docs/zh/04-get-started/xiaot-03.webp new file mode 100644 index 0000000000..c115346e9e Binary files /dev/null and b/docs/zh/04-get-started/xiaot-03.webp differ diff --git a/docs/zh/04-get-started/xiaot-20231007.png b/docs/zh/04-get-started/xiaot-20231007.png new file mode 100644 index 0000000000..553bcbd090 Binary files /dev/null and b/docs/zh/04-get-started/xiaot-20231007.png differ diff --git a/docs/zh/04-get-started/xiaot-new.webp b/docs/zh/04-get-started/xiaot-new.webp new file mode 100644 index 0000000000..483b54d2ef Binary files /dev/null and b/docs/zh/04-get-started/xiaot-new.webp differ diff --git a/docs/zh/04-get-started/xiaot.webp b/docs/zh/04-get-started/xiaot.webp new file mode 100644 index 0000000000..91b3b3ef68 Binary files /dev/null and b/docs/zh/04-get-started/xiaot.webp differ diff --git a/docs/zh/05-basic/index.md b/docs/zh/05-basic/index.md index 1da5d95b3e..18a456da1f 100644 --- a/docs/zh/05-basic/index.md +++ b/docs/zh/05-basic/index.md @@ -2,4 +2,13 @@ sidebar_label: 快速入门 title: TDengine 入门 toc_max_heading_level: 4 ---- \ No newline at end of file +--- + +本章主要介绍 TDengine 的数据模型以及基本的写入和查询功能。 + +```mdx-code-block +import DocCardList from '@theme/DocCardList'; +import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; + + +``` \ No newline at end of file diff --git a/docs/zh/06-advanced/index.md b/docs/zh/06-advanced/index.md index f9e9b4a483..d8a18244ef 100644 --- a/docs/zh/06-advanced/index.md +++ b/docs/zh/06-advanced/index.md @@ -6,4 +6,13 @@ toc_max_heading_level: 4 TDengine 不仅是一个高性能、分布式的时序数据库核心产品,而且集成了专为时序数据量身定制的一系列功能,包括数据订阅、缓存、流计算和 ETL 等。这些功能共同构成了一个完整的时序数据处理解决方案。因此,当你选择使用 TDengine 时,你的应用程序无须额外集成 Kafka、Redis、Spark 或 Flink 等第三方工具,从而极大地简化应用程序的设计复杂度,并显著降低运维成本。下图直观地展示了传统大数据平台架构与TDengine 架构之间的异同点,突显了 TDengine 在时序数据处理领域的独特优势。 -![传统大数据平台架构与 TDengine 架构的对比](./architecture-compare.png) \ No newline at end of file +![传统大数据平台架构与 TDengine 架构的对比](./architecture-compare.png) + +本章主要介绍 TDengine 的一些高级功能,如数据订阅、缓存、流计算、边云协同和数据接入等。 + +```mdx-code-block +import DocCardList from '@theme/DocCardList'; +import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; + + +``` \ No newline at end of file diff --git a/docs/zh/07-operation/10-disaster.md b/docs/zh/07-operation/10-disaster.md index 9fb2cce8ca..71589cf07e 100644 --- a/docs/zh/07-operation/10-disaster.md +++ b/docs/zh/07-operation/10-disaster.md @@ -8,7 +8,7 @@ toc_max_heading_level: 4 ## 容错 -TDengine 支持 WAL 机制,实现数据的容错能力,保证数据的高可用。TDengine 接收到应用程序的请求数据包时,会先将请求的原始数据包写入数据库日志文件,等数据成功写入数据库数据文件后,再删除相应的 WAL。这样保证了 TDengine 能够在断电等因素导致的服务重启时,从数据库日志文件中恢复数据,避免数据丢失。涉及的配置参数有如下两个: +TDengine 支持 WAL 机制,实现数据的容错能力,保证数据的高可靠。TDengine 接收到应用程序的请求数据包时,会先将请求的原始数据包写入数据库日志文件,等数据成功写入数据库数据文件后,再删除相应的 WAL。这样保证了 TDengine 能够在断电等因素导致的服务重启时,从数据库日志文件中恢复数据,避免数据丢失。涉及的配置参数有如下两个: - wal_level :WAL 级别,1 表示写 WAL,但不执行 fsync ; 2 表示写 WAL,而且执行 fsync。默认值为 1。 - wal_fsync_period:当 wal_level 设置为 2 时,执行 fsync 的周期;当 wal-level 设置为 0 时,表示每次写入,立即执行 fsync。 diff --git a/docs/zh/07-operation/index.md b/docs/zh/07-operation/index.md index bfcb47787e..6474bad226 100644 --- a/docs/zh/07-operation/index.md +++ b/docs/zh/07-operation/index.md @@ -2,4 +2,13 @@ sidebar_label: 运维管理 title: TDengine 运维管理 toc_max_heading_level: 4 ---- \ No newline at end of file +--- + +本章主要介绍如何规划、建设、维护以及监控 TDengine 集群。 + +```mdx-code-block +import DocCardList from '@theme/DocCardList'; +import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; + + +``` \ No newline at end of file diff --git a/docs/zh/10-tdinternal/index.md b/docs/zh/10-tdinternal/index.md index 21f106edc9..4a71d639ae 100644 --- a/docs/zh/10-tdinternal/index.md +++ b/docs/zh/10-tdinternal/index.md @@ -3,6 +3,8 @@ title: 技术内幕 description: TDengine 的内部设计 --- +本章简要说明 TDengine 的一些内部设计。 + ```mdx-code-block import DocCardList from '@theme/DocCardList'; import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; diff --git a/docs/zh/14-reference/05-connector/02-rest-api.mdx b/docs/zh/14-reference/05-connector/60-rest-api.mdx similarity index 100% rename from docs/zh/14-reference/05-connector/02-rest-api.mdx rename to docs/zh/14-reference/05-connector/60-rest-api.mdx diff --git a/docs/zh/14-reference/05-connector/index.md b/docs/zh/14-reference/05-connector/index.md index 21bb9396e9..6c73b14309 100644 --- a/docs/zh/14-reference/05-connector/index.md +++ b/docs/zh/14-reference/05-connector/index.md @@ -4,7 +4,7 @@ title: 连接器 description: 详细介绍各种语言的连接器及 REST API --- -TDengine 提供了丰富的应用程序开发接口,为了便于用户快速开发自己的应用,TDengine 支持了多种编程语言的连接器,其中官方连接器包括支持 C/C++、Java、Python、Go、Node.js、C# 和 Rust 的连接器。这些连接器支持使用原生接口(taosc)和 REST 接口(部分语言暂不支持)连接 TDengine 集群。社区开发者也贡献了多个非官方连接器,例如 ADO.NET 连接器、Lua 连接器和 PHP 连接器。 +TDengine 提供了丰富的应用程序开发接口,为了便于用户快速开发自己的应用,TDengine 支持了多种编程语言的连接器,其中官方连接器包括支持 C/C++、Java、Python、Go、Node.js、C# 和 Rust 的连接器。这些连接器支持使用原生接口(taosc)和 WebSocket 接口连接 TDengine 集群。社区开发者也贡献了多个非官方连接器,例如 ADO.NET 连接器、Lua 连接器和 PHP 连接器。 ![TDengine Database connector architecture](./connector.webp) diff --git a/docs/zh/25-application/index.md b/docs/zh/25-application/index.md index f49e6efae0..3c0eea9de8 100644 --- a/docs/zh/25-application/index.md +++ b/docs/zh/25-application/index.md @@ -2,4 +2,13 @@ title: 实践案例 sidebar_label: 实践案例 toc_max_heading_level: 4 ---- \ No newline at end of file +--- + +本章介绍了 TDengine 在各行业应用的一些典型案例。 + +```mdx-code-block +import DocCardList from '@theme/DocCardList'; +import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; + + +``` \ No newline at end of file diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f50a624ea7..f8ae16a05c 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1261,13 +1261,16 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB return TSDB_CODE_SUCCESS; } -static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { +static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) { + *ppCols = NULL; + + int32_t code = 0; int32_t rows = pDataBlock->info.capacity; size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData)); if (pCols == NULL) { - return NULL; + return terrno; } for (int32_t i = 0; i < numOfCols; ++i) { @@ -1280,8 +1283,11 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { if (IS_VAR_DATA_TYPE(pCols[i].info.type)) { pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t)); pCols[i].pData = taosMemoryCalloc(1, pColInfoData->varmeta.length); - if (pCols[i].varmeta.offset == NULL) { - return NULL; + if (pCols[i].varmeta.offset == NULL || pCols[i].pData == NULL) { + code = terrno; + taosMemoryFree(pCols[i].varmeta.offset); + taosMemoryFree(pCols[i].pData); + goto _error; } pCols[i].varmeta.length = pColInfoData->varmeta.length; @@ -1290,12 +1296,20 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { pCols[i].nullbitmap = taosMemoryCalloc(1, BitmapLen(rows)); pCols[i].pData = taosMemoryCalloc(rows, pCols[i].info.bytes); if (pCols[i].nullbitmap == NULL || pCols[i].pData == NULL) { - return NULL; + code = terrno; + taosMemoryFree(pCols[i].nullbitmap); + taosMemoryFree(pCols[i].pData); + goto _error; } } } - return pCols; + *ppCols = pCols; + return code; + + _error: + taosMemoryFree(pCols); + return code; } static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) { @@ -1423,16 +1437,15 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { int64_t p1 = taosGetTimestampUs(); - SColumnInfoData* pCols = createHelpColInfoData(pDataBlock); - if (pCols == NULL) { + SColumnInfoData* pCols = NULL; + int32_t code = createHelpColInfoData(pDataBlock, &pCols); + if (code != 0) { destroyTupleIndex(index); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + return code; } int64_t p2 = taosGetTimestampUs(); - - int32_t code = blockDataAssign(pCols, pDataBlock, index); + code = blockDataAssign(pCols, pDataBlock, index); if (code) { return code; } diff --git a/source/dnode/mnode/impl/inc/mndCompact.h b/source/dnode/mnode/impl/inc/mndCompact.h index 43cf78a90e..43996705d2 100644 --- a/source/dnode/mnode/impl/inc/mndCompact.h +++ b/source/dnode/mnode/impl/inc/mndCompact.h @@ -47,6 +47,7 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq); SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId); void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact); +int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len); void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/inc/mndDb.h b/source/dnode/mnode/impl/inc/mndDb.h index 8704286826..b72d1386c1 100644 --- a/source/dnode/mnode/impl/inc/mndDb.h +++ b/source/dnode/mnode/impl/inc/mndDb.h @@ -30,6 +30,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs, int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq); bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb); void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList); +bool mndDbIsExist(SMnode *pMnode, const char *db); SSdbRaw *mndDbActionEncode(SDbObj *pDb); const char *mndGetDbStr(const char *src); diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c index 8b45b13dd1..aae3a262f4 100644 --- a/source/dnode/mnode/impl/src/mndCompact.c +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -224,6 +224,21 @@ SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) { void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) { SSdb *pSdb = pMnode->pSdb; sdbRelease(pSdb, pCompact); + pCompact = NULL; +} + +int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len) { + int32_t code = 0; + SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId); + if (pCompact == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + + (void)strncpy(dbname, pCompact->dbname, len); + mndReleaseCompact(pMnode, pCompact); + TAOS_RETURN(code); } // compact db @@ -488,7 +503,7 @@ _OVER: } tFreeSKillCompactReq(&killCompactReq); - sdbRelease(pMnode->pSdb, pCompact); + mndReleaseCompact(pMnode, pCompact); TAOS_RETURN(code); } @@ -640,16 +655,12 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { sdbRelease(pMnode->pSdb, pDetail); } - SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId); - if (pCompact == NULL) TAOS_RETURN(code); + char dbname[TSDB_TABLE_FNAME_LEN] = {0}; + TAOS_CHECK_RETURN(mndCompactGetDbName(pMnode, compactId, dbname, TSDB_TABLE_FNAME_LEN)); - SDbObj *pDb = mndAcquireDb(pMnode, pCompact->dbname); - if (pDb == NULL) { + if (!mndDbIsExist(pMnode, dbname)) { needSave = true; - mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, pCompact->dbname); - } else { - mndReleaseDb(pMnode, pDb); - pDb = NULL; + mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, dbname); } if (!needSave) { @@ -666,7 +677,7 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { } mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id); - mndTransSetDbName(pTrans, pCompact->dbname, NULL); + mndTransSetDbName(pTrans, dbname, NULL); pIter = NULL; while (1) { @@ -734,24 +745,20 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { sdbRelease(pMnode->pSdb, pDetail); } - pDb = mndAcquireDb(pMnode, pCompact->dbname); - if (pDb == NULL) { + if (!mndDbIsExist(pMnode, dbname)) { allFinished = true; - mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, pCompact->dbname); - } else { - mndReleaseDb(pMnode, pDb); - pDb = NULL; + mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, dbname); } if (allFinished) { - mInfo("compact:%d, all finished", pCompact->compactId); + mInfo("compact:%d, all finished", compactId); pIter = NULL; while (1) { SCompactDetailObj *pDetail = NULL; pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); if (pIter == NULL) break; - if (pDetail->compactId == pCompact->compactId) { + if (pDetail->compactId == compactId) { SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail); if (pCommitRaw == NULL) { mndTransDrop(pTrans); @@ -774,7 +781,15 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { sdbRelease(pMnode->pSdb, pDetail); } + SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId); + if (pCompact == NULL) { + mndTransDrop(pTrans); + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact); + mndReleaseCompact(pMnode, pCompact); if (pCommitRaw == NULL) { mndTransDrop(pTrans); code = TSDB_CODE_MND_RETURN_VALUE_NULL; @@ -793,11 +808,9 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { if ((code = mndTransPrepare(pMnode, pTrans)) != 0) { mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr()); mndTransDrop(pTrans); - sdbRelease(pMnode->pSdb, pCompact); TAOS_RETURN(code); } - sdbRelease(pMnode->pSdb, pCompact); mndTransDrop(pTrans); return 0; } @@ -827,8 +840,8 @@ void mndCompactPullup(SMnode *pMnode) { if ((code = mndSaveCompactProgress(pMnode, pCompact->compactId)) != 0) { mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code)); } + mndReleaseCompact(pMnode, pCompact); } - mndReleaseCompact(pMnode, pCompact); } taosArrayDestroy(pArray); } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 5a7831ac0e..dd3f89c9d0 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -398,6 +398,17 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) { sdbRelease(pSdb, pDb); } +bool mndDbIsExist(SMnode *pMnode, const char *db) { + SDbObj *pDb = mndAcquireDb(pMnode, db); + if (pDb == NULL) { + return false; + } else { + mndReleaseDb(pMnode, pDb); + pDb = NULL; + return true; + } +} + static int32_t mndCheckDbName(const char *dbName, SUserObj *pUser) { char *pos = strstr(dbName, TS_PATH_DELIMITER); if (pos == NULL) { diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b8ebb2254b..4b206fc04f 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -349,6 +349,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { streamMetaWLock(pStreamMeta); pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL); streamMetaWUnLock(pStreamMeta); + if (pTaskList == NULL) { + return terrno; + } tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 38a2dd3ab2..af5b3523e8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -355,7 +355,7 @@ void tsdbCacherowsReaderClose(void* pReader) { return; } - if (p->pSchema != NULL) { + if (p->pSchema != NULL && p->transferBuf != NULL) { for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) { taosMemoryFreeClear(p->transferBuf[i]); } @@ -450,23 +450,27 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 return TSDB_CODE_INVALID_PARA; } + int32_t code = TSDB_CODE_SUCCESS; + bool hasRes = false; + SArray* pRow = NULL; + void** pRes = NULL; SCacheRowsReader* pr = pReader; + int32_t pkBufLen = 0; - int32_t code = TSDB_CODE_SUCCESS; - bool hasRes = false; - SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol)); + pr->pReadSnap = NULL; + pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol)); if (pRow == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _end; } - void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); + pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); if (pRes == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _end; } - int32_t pkBufLen = (pr->rowKey.numOfPKs > 0) ? pr->pkColumn.bytes : 0; + pkBufLen = (pr->rowKey.numOfPKs > 0) ? pr->pkColumn.bytes : 0; for (int32_t j = 0; j < pr->numOfCols; ++j) { int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes; @@ -690,6 +694,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 _end: tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true); + pr->pReadSnap = NULL; + if (pr->pCurFileSet) { pr->pCurFileSet = NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 65be287125..09ca1cdc84 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2434,21 +2434,25 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))}; if (info.pKeyRangeList == NULL) { + pReader->code = terrno; return false; } int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); if (code != TSDB_CODE_SUCCESS) { + pReader->code = code; return false; } code = initMemDataIterator(pScanInfo, pReader); if (code != TSDB_CODE_SUCCESS) { + pReader->code = code; return false; } code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); if (code != TSDB_CODE_SUCCESS) { + pReader->code = code; return code; } @@ -2461,7 +2465,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan for (int32_t i = 0; i < taosArrayGetSize(info.pKeyRangeList); ++i) { SSttKeyRange* pKeyRange = taosArrayGet(info.pKeyRangeList, i); if (pKeyRange == NULL) { - return TSDB_CODE_INVALID_PARA; + continue; } if (pkCompEx(&pScanInfo->sttRange.skey, &pKeyRange->skey) > 0) { @@ -2766,6 +2770,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { SBlockData* pBlockData = &pReader->status.fileBlockData; (void) initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader); + if (pReader->code != 0) { + code = pReader->code; + goto _end; + } while (1) { bool hasBlockData = false; @@ -3180,6 +3188,10 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { } bool hasDataInSttFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + if (pReader->code != TSDB_CODE_SUCCESS) { + return pReader->code; + } + if (!hasDataInSttFile) { bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { @@ -3273,6 +3285,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) { (void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + if (pReader->code != 0) { + return pReader->code; + } } TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); @@ -3314,6 +3329,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files (void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + if (pReader->code != 0) { + return pReader->code; + } // no data in stt block, no need to proceed. while (hasDataInSttBlock(pScanInfo)) { @@ -3414,6 +3432,10 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en } STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter; + if (pBlockScanInfo == NULL || *pBlockScanInfo == NULL) { + return TSDB_CODE_SUCCESS; + } + if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) { bool hasNexTable = moveToNextTable(pUidList, pStatus); @@ -4791,6 +4813,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { void* p = pReader->pReadSnap; if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) { tsdbUntakeReadSnap2(pReader, p, true); + pReader->pReadSnap = NULL; } (void) tsem_destroy(&pReader->resumeAfterSuspend); @@ -4873,6 +4896,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { void* p = pReader->pReadSnap; if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) { tsdbUntakeReadSnap2(pReader, p, false); + pReader->pReadSnap = NULL; } if (pReader->bFilesetDelimited) { diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index afadc8d8c2..f70cfff71d 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -1141,6 +1141,7 @@ uint64_t ctgGetClusterCacheSize(SCatalog *pCtg); void ctgClearHandleMeta(SCatalog* pCtg, int64_t *pClearedSize, int64_t *pCleardNum, bool *roundDone); void ctgClearAllHandleMeta(int64_t *clearedSize, int64_t *clearedNum, bool *roundDone); void ctgProcessTimerEvent(void *param, void *tmrId); +int32_t ctgBuildUseDbOutput(SUseDbOutput** ppOut, SDBVgInfo* vgInfo); int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta); int32_t ctgGetCachedStbNameFromSuid(SCatalog* pCtg, char* dbFName, uint64_t suid, char **stbName); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 7f57a09460..4caa66445a 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -2881,6 +2881,11 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask* pTask) { CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache)); if (NULL != dbCache) { + if (pTask->subTask) { + pMsgCtx->reqType = TDMT_MND_USE_DB; + CTG_ERR_JRET(ctgBuildUseDbOutput((SUseDbOutput**)&pMsgCtx->out, dbCache->vgCache.vgInfo)); + } + CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgCache.vgInfo->vgHash, (SArray**)&pTask->res)); ctgReleaseVgInfoToCache(pCtg, dbCache); diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index c46ded17b8..fa4df70f59 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -2590,6 +2590,22 @@ int32_t dupViewMetaFromRsp(SViewMetaRsp* pRsp, SViewMeta* pViewMeta) { return TSDB_CODE_SUCCESS; } + +int32_t ctgBuildUseDbOutput(SUseDbOutput** ppOut, SDBVgInfo* vgInfo) { + *ppOut = taosMemoryCalloc(1, sizeof(SUseDbOutput)); + if (NULL == *ppOut) { + CTG_ERR_RET(terrno); + } + + int32_t code = cloneDbVgInfo(vgInfo, &(*ppOut)->dbVgroup); + if (code) { + taosMemoryFreeClear(*ppOut); + CTG_RET(code); + } + + return TSDB_CODE_SUCCESS; +} + uint64_t ctgGetTbTSMACacheSize(STableTSMAInfo* pTsmaInfo) { if (!pTsmaInfo) { return 0; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index ca79f3f285..46fc618f76 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -26,7 +26,7 @@ #define T_LONG_JMP(_obj, _c) \ do { \ - ASSERT((_c) != -1); \ + ASSERT(1); \ longjmp((_obj), (_c)); \ } while (0) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 78b18e24cc..f0aadd12de 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -147,12 +147,7 @@ _error: if (pInfo != NULL) { destroyAggOperatorInfo(pInfo); } - - if (pOperator != NULL) { - cleanupExprSupp(&pOperator->exprSupp); - } - - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -180,11 +175,11 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; - if (pOperator->blocking && pAggInfo->hasValidBlock) return false; - - SExprSupp* pSup = &pOperator->exprSupp; - SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (pOperator->blocking && pAggInfo->hasValidBlock) { + return false; + } + SExprSupp* pSup = &pOperator->exprSupp; int64_t st = taosGetTimestampUs(); int32_t order = pAggInfo->binfo.inputTsOrder; SSDataBlock* pBlock = pAggInfo->pNewGroupBlock; @@ -458,7 +453,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin * not assign result buffer yet, add new result buffer * all group belong to one result set, and each group result has different group id so set the id to be one */ - if (pResultRow->pageId == -1) { + if (pResultRow == NULL || pResultRow->pageId == -1) { int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); if (ret != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, terrno); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index d9e413b144..652ebc0f9a 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -187,6 +187,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull); STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); + if (totalTables) QUERY_CHECK_NULL(pList, code, lino, _error, terrno); uint64_t suid = tableListGetSuid(pTableListInfo); code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, @@ -246,7 +247,7 @@ _error: } pInfo->pTableList = NULL; destroyCacheScanOperator(pInfo); - taosMemoryFree(pOperator); + destroyOperator(pOperator); return code; } @@ -439,8 +440,8 @@ void destroyCacheScanOperator(void* param) { SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param; blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pBufferedRes); - taosMemoryFree(pInfo->pSlotIds); - taosMemoryFree(pInfo->pDstSlotIds); + taosMemoryFreeClear(pInfo->pSlotIds); + taosMemoryFreeClear(pInfo->pDstSlotIds); taosArrayDestroy(pInfo->pCidList); taosArrayDestroy(pInfo->pFuncTypeList); taosArrayDestroy(pInfo->pUidList); @@ -462,13 +463,13 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask *pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t)); if (*pSlotIds == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } *pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t)); if (*pDstSlotIds == NULL) { - taosMemoryFree(*pSlotIds); - return TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(*pSlotIds); + return terrno; } SSchemaInfo* pSchemaInfo = taosArrayGetLast(pTaskInfo->schemaInfos); diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 02b4a61bcb..63c0c5fe87 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -82,9 +82,15 @@ static int32_t setCountWindowOutputBuff(SExprSupp* pExprSup, SCountWindowSupp* p int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SCountWindowResult* pBuff = getCountWinStateInfo(pCountSup); + QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno); (*pResult) = &pBuff->row; code = setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); (*ppResBuff) = pBuff; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } return code; } @@ -335,7 +341,7 @@ _error: destroyCountWindowOperatorInfo(pInfo); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 3f98583fbe..f9ae8be84f 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -145,7 +145,7 @@ _error: destroyEWindowOperatorInfo(pInfo); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 2688f5698b..21b1c2838b 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -443,7 +443,7 @@ _error: doDestroyExchangeOperatorInfo(pInfo); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -989,6 +989,10 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic dataInfo.taskId = pExchangeInfo->pTaskId; dataInfo.index = pIdx->srcIdx; dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL); + if (dataInfo.pSrcUidList == NULL) { + return terrno; + } + dataInfo.srcOpType = pBasicParam->srcOpType; dataInfo.tableSeq = pBasicParam->tableSeq; @@ -1007,6 +1011,10 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic pDataInfo->status = EX_SOURCE_DATA_NOT_READY; } pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL); + if (pDataInfo->pSrcUidList == NULL) { + return terrno; + } + pDataInfo->srcOpType = pBasicParam->srcOpType; pDataInfo->tableSeq = pBasicParam->tableSeq; } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 9f11b65461..c2297d9fba 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -826,6 +826,8 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf if (tsTagFilterCache) { tableList = taosArrayDup(pTableListInfo->pTableList, NULL); + QUERY_CHECK_NULL(tableList, code, lino, end, terrno); + code = pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), tableList, taosArrayGetSize(tableList) * sizeof(STableKeyInfo)); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9029f000c5..dc0baebd66 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1293,6 +1293,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (uid == 0) { if (numOfTables != 0) { STableKeyInfo* tmp = tableListGetInfo(pTableListInfo, 0); + if (!tmp) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + taosRUnLockLatch(&pTaskInfo->lock); + return terrno; + } if (tmp) uid = tmp->uid; ts = INT64_MIN; pScanInfo->currentTable = 0; @@ -1422,6 +1427,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); + if (!pList) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + tDeleteSchemaWrapper(mtInfo.schema); + return code; + } int32_t size = tableListGetSize(pTableListInfo); code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, @@ -1506,6 +1516,7 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) { int32_t numOfTables = tableListGetSize(pTableListInfo); for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); + QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno); void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } @@ -1520,7 +1531,7 @@ _end: return pUidList; } -static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) { +static int32_t extractTableList(SArray* pList, const SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -1528,23 +1539,25 @@ static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) { if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamScanInfo* pScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info; - void* tmp = taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo); + + void* tmp = taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = pOperator->info; - void* tmp = taosArrayPush(pList, &pScanInfo->base.pTableListInfo); + + void* tmp = taosArrayPush(pList, &pScanInfo->base.pTableListInfo); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } else { if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) { - extractTableList(pList, pOperator->pDownstream[0]); + code = extractTableList(pList, pOperator->pDownstream[0]); } } _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - T_LONG_JMP(pTaskInfo->env, code); + qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code)); } + return code; } int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) { @@ -1552,16 +1565,17 @@ int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) { return TSDB_CODE_INVALID_PARA; } + *pList = NULL; SArray* pArray = taosArrayInit(0, POINTER_BYTES); if (pArray == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } - SOperatorInfo* pOperator = pTaskInfo->pRoot; - extractTableList(pArray, pOperator); - - *pList = pArray; - return TSDB_CODE_SUCCESS; + int32_t code = extractTableList(pArray, pTaskInfo->pRoot); + if (code == 0) { + *pList = pArray; + } + return code; } int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) { diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index dc2dd84f37..cbb124b9e0 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -158,8 +158,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR if (isIntervalQuery) { if (p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists. pResult = getResultRowByPos(pResultBuf, p1, true); - if (NULL == pResult) { - T_LONG_JMP(pTaskInfo->env, terrno); + if (pResult == NULL) { + pTaskInfo->code = terrno; + return NULL; } ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); @@ -171,7 +172,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // todo pResult = getResultRowByPos(pResultBuf, p1, true); if (NULL == pResult) { - T_LONG_JMP(pTaskInfo->env, terrno); + pTaskInfo->code = terrno; + return NULL; } ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); @@ -184,7 +186,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); if (pPage == NULL) { qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); - T_LONG_JMP(pTaskInfo->env, terrno); + pTaskInfo->code = terrno; + return NULL; } releaseBufPage(pResultBuf, pPage); } @@ -193,7 +196,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR if (pResult == NULL) { pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize); if (pResult == NULL) { - T_LONG_JMP(pTaskInfo->env, terrno); + pTaskInfo->code = terrno; + return NULL; } // add a new result set for a new group @@ -202,7 +206,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR sizeof(SResultRowPosition)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - T_LONG_JMP(pTaskInfo->env, code); + pTaskInfo->code = code; + return NULL; } } @@ -212,7 +217,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // too many time window in query if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); + pTaskInfo->code = TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW; + return NULL; } return pResult; @@ -1107,6 +1113,11 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i); + if (!pTable) { + taosArrayDestroy(pDeleterParam->pUidList); + taosMemoryFree(pDeleterParam); + return TSDB_CODE_OUT_OF_MEMORY; + } void* tmp = taosArrayPush(pDeleterParam->pUidList, &pTable->uid); if (!tmp) { taosArrayDestroy(pDeleterParam->pUidList); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 64d7c0484a..882a0dc4b6 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -567,7 +567,7 @@ _error: } pTaskInfo->code = code; - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); return code; } diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index c753f0fd9b..f2e24f160c 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -1504,7 +1504,7 @@ _error: destroyGroupCacheOperator(pInfo); } - taosMemoryFree(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 96abfdb180..309fb4b52f 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -615,7 +615,12 @@ _error: if (pInfo != NULL) { destroyGroupOperatorInfo(pInfo); } - destroyOperator(pOperator); + + if (pOperator) { + pOperator->info = NULL; + destroyOperator(pOperator); + } + return code; } @@ -1241,7 +1246,7 @@ _error: destroyPartitionOperatorInfo(pInfo); } pTaskInfo->code = code; - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); TAOS_RETURN(code); } @@ -1253,6 +1258,9 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, SResultRow* pResultRow = doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup, false); + if (pResultRow == NULL || pTaskInfo->code != 0) { + return pTaskInfo->code; + } return setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset); } @@ -1783,8 +1791,8 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart _error: pTaskInfo->code = code; - destroyStreamPartitionOperatorInfo(pInfo); - taosMemoryFreeClear(pOperator); + if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo); + destroyOperator(pOperator); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 3f86291d6a..7c9f242a63 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -180,7 +180,7 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* _error: destroyProjectOperatorInfo(pInfo); - taosMemoryFree(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -529,7 +529,7 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _error: destroyIndefinitOperatorInfo(pInfo); - taosMemoryFree(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -703,6 +703,9 @@ int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, int64_t groupId = 0; SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup, true); + if (pRow == NULL || pTaskInfo->code != 0) { + return pTaskInfo->code; + } for (int32_t i = 0; i < numOfExprs; ++i) { struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset); diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 5c4d3fe009..7100e10276 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -155,6 +155,11 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo schemaInfo.tablename = taosStrdup(mr.me.name); schemaInfo.dbname = taosStrdup(dbName); + if (schemaInfo.tablename == NULL || schemaInfo.dbname == NULL) { + pAPI->metaReaderFn.clearReader(&mr); + cleanupQueriedTableScanInfo(&schemaInfo); + return terrno; + } if (mr.me.type == TSDB_SUPER_TABLE) { schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); @@ -166,8 +171,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, suid); if (code != TSDB_CODE_SUCCESS) { pAPI->metaReaderFn.clearReader(&mr); - taosMemoryFree(schemaInfo.tablename); - taosMemoryFree(schemaInfo.dbname); + cleanupQueriedTableScanInfo(&schemaInfo); return code; } @@ -177,18 +181,26 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); } + pAPI->metaReaderFn.clearReader(&mr); + if (schemaInfo.sw == NULL) { + cleanupQueriedTableScanInfo(&schemaInfo); return terrno; } - pAPI->metaReaderFn.clearReader(&mr); schemaInfo.qsw = extractQueriedColumnSchema(pScanNode); if (schemaInfo.qsw == NULL) { + cleanupQueriedTableScanInfo(&schemaInfo); return terrno; } void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo); - return (p != NULL)? TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY; + if (p == NULL) { + cleanupQueriedTableScanInfo(&schemaInfo); + return terrno; + } + + return code; } SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 90a88539a2..1b7ae670fe 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1295,8 +1295,14 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { (*ppRes) = NULL; return code; } - - tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); + STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); + if (!tmp) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + taosRUnLockLatch(&pTaskInfo->lock); + (*ppRes) = NULL; + return terrno; + } + tInfo = *tmp; taosRUnLockLatch(&pTaskInfo->lock); code = pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1); @@ -1460,7 +1466,7 @@ _error: destroyTableScanOperatorInfo(pInfo); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -3551,6 +3557,7 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray** size_t size = tableListGetSize(pTableListInfo); for (int32_t i = 0; i < size; ++i) { STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i); + QUERY_CHECK_NULL(pkeyInfo, code, lino, _end, terrno); void* tmp = taosArrayPush(tableIdList, &pkeyInfo->uid); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } @@ -4140,7 +4147,7 @@ _error: destroyStreamScanOperatorInfo(pInfo); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -4154,6 +4161,14 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos); + if (!item) { + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), + GET_TASKID(pTaskInfo)); + tDecoderClear(&(*mr).coder); + pAPI->metaReaderFn.clearReader(mr); + T_LONG_JMP(pTaskInfo->env, terrno); + } + code = pAPI->metaReaderFn.getTableEntryByUid(mr, item->uid); tDecoderClear(&(*mr).coder); if (code != TSDB_CODE_SUCCESS) { @@ -4688,8 +4703,8 @@ _error: pInfo->pTableListInfo = NULL; } - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); + if (pInfo != NULL) destroyTagScanOperatorInfo(pInfo); + destroyOperator(pOperator); return code; } @@ -4839,18 +4854,23 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu return TSDB_CODE_SUCCESS; } -static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) { +static int32_t setGroupStartEndIndex(STableMergeScanInfo* pInfo) { pInfo->bGroupProcessed = false; size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); int32_t i = pInfo->tableStartIndex + 1; for (; i < numOfTables; ++i) { STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); + if (!tableKeyInfo) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return terrno; + } if (tableKeyInfo->groupId != pInfo->groupId) { break; } } pInfo->tableEndIndex = i - 1; + return TSDB_CODE_SUCCESS; } static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { @@ -4879,7 +4899,11 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { } static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) { - setGroupStartEndIndex(pInfo); + int32_t code = setGroupStartEndIndex(pInfo); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + return code; + } STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo)); if (pSubTblsInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -4906,7 +4930,7 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) { } int32_t bufPageSize = pInfo->bufPageSize; int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize; - int32_t code = + code = createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pSubTblsInfo->aInputs); @@ -5184,7 +5208,9 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock* return code; } pInfo->tableStartIndex = 0; - pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId; + STableKeyInfo* pTmpGpId = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno); + pInfo->groupId = pTmpGpId->groupId; code = startSubTablesTableMergeScan(pOperator); QUERY_CHECK_CODE(code, lino, _end); } @@ -5198,6 +5224,7 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock* pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity); if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) { STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno); pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo); } if (pBlock != NULL) { @@ -5214,7 +5241,10 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock* } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId; + STableKeyInfo* pTmpGpId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno); + + pInfo->groupId = pTmpGpId->groupId; code = startSubTablesTableMergeScan(pOperator); QUERY_CHECK_CODE(code, lino, _end); resetLimitInfoForNextGroup(&pInfo->limitInfo); @@ -5544,6 +5574,7 @@ void startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t i = pInfo->tableStartIndex + 1; for (; i < numOfTables; ++i) { STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); + QUERY_CHECK_NULL(tableKeyInfo, code, lino, _end, terrno); if (tableKeyInfo->groupId != pInfo->groupId) { break; } @@ -5669,7 +5700,9 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { return code; } pInfo->tableStartIndex = 0; - pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId; + STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + pInfo->groupId = tmp->groupId; startGroupTableMergeScan(pOperator); } @@ -5683,6 +5716,7 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pOperator); if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) { STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno); pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo); } if (pBlock != NULL) { @@ -5704,7 +5738,9 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId; + STableKeyInfo* tmp = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + pInfo->groupId = tmp->groupId; startGroupTableMergeScan(pOperator); resetLimitInfoForNextGroup(&pInfo->limitInfo); } @@ -5912,8 +5948,8 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR _error: pTaskInfo->code = code; pInfo->base.pTableListInfo = NULL; - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); + if (pInfo != NULL) destroyTableMergeScanOperatorInfo(pInfo); + destroyOperator(pOperator); return code; } @@ -6070,7 +6106,7 @@ _error: if (pInfo != NULL) { destoryTableCountScanOperator(pInfo); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 8fb0646495..2a1b4f3e23 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -130,7 +130,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN pGroupIdCalc->lastKeysLen = 0; pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen); if (!pGroupIdCalc->keyBuf) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; } } } @@ -164,7 +164,7 @@ _error: destroySortOperatorInfo(pInfo); } - taosMemoryFree(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -370,8 +370,13 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + if (ps == NULL) { + return terrno; + } + ps->param = pOperator->pDownstream[0]; ps->onlyRef = true; + code = tsortAddSource(pInfo->pSortHandle, ps); if (code) { taosMemoryFree(ps); @@ -464,6 +469,9 @@ void destroySortOperatorInfo(void* param) { int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); + if (pInfo == NULL) { + return terrno; + } SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info; @@ -638,6 +646,10 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam)); + if (ps == NULL || param == NULL) { + T_LONG_JMP(pTaskInfo->env, terrno); + } + param->childOpInfo = pOperator->pDownstream[0]; param->grpSortOpInfo = pInfo; ps->param = param; @@ -824,6 +836,6 @@ _error: if (pInfo != NULL) { destroyGroupSortOperatorInfo(pInfo); } - taosMemoryFree(pOperator); + destroyOperator(pOperator); return code; } diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 499c08f89d..39fe78502d 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -926,7 +926,7 @@ _error: destroyStreamCountAggOperatorInfo(pInfo); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 201b19d2cb..57e31cfebe 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -980,8 +980,8 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* return code; _error: - destroyStreamEventOperatorInfo(pInfo); - taosMemoryFreeClear(pOperator); + if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo); + destroyOperator(pOperator); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index b7b5b1a38c..b8686d0f19 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1458,8 +1458,8 @@ _error: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } - destroyStreamFillOperatorInfo(pInfo); - taosMemoryFreeClear(pOperator); + if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 563b78144f..08b644b6ec 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2012,8 +2012,8 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN return code; _error: - destroyStreamFinalIntervalOperatorInfo(pInfo); - taosMemoryFreeClear(pOperator); + if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -3832,7 +3832,7 @@ _error: destroyStreamSessionAggOperatorInfo(pInfo); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; @@ -4088,7 +4088,7 @@ _error: if (pInfo != NULL) { destroyStreamSessionAggOperatorInfo(pInfo); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); @@ -4977,8 +4977,8 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* return code; _error: - destroyStreamStateOperatorInfo(pInfo); - taosMemoryFreeClear(pOperator); + if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo); + destroyOperator(pOperator); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; @@ -5312,8 +5312,8 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* return code; _error: - destroyStreamFinalIntervalOperatorInfo(pInfo); - taosMemoryFreeClear(pOperator); + if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 2ba9152952..cdd22c2adc 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2813,8 +2813,8 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP _error: if (pInfo) { pInfo->pTableListInfo = NULL; + destroyBlockDistScanOperatorInfo(pInfo); } - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); return code; } diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index a4bf2dce72..47ba8e560a 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1207,8 +1207,8 @@ _error: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); + if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 2383c09dac..a367afeed4 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -75,9 +75,9 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId, pTaskInfo, true, pAggSup, true); - if (pResultRow == NULL) { + if (pResultRow == NULL || pTaskInfo->code != 0) { *pResult = NULL; - return TSDB_CODE_SUCCESS; + return pTaskInfo->code; } // set time window for current result @@ -1402,7 +1402,7 @@ _error: if (pInfo != NULL) { destroyIntervalOperatorInfo(pInfo); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -1678,7 +1678,7 @@ _error: destroyStateWindowOperatorInfo(pInfo); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -1770,8 +1770,8 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh return code; _error: - destroySWindowOperatorInfo(pInfo); - taosMemoryFreeClear(pOperator); + if (pInfo != NULL) destroySWindowOperatorInfo(pInfo); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -2083,7 +2083,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge _error: destroyMAIOperatorInfo(miaInfo); - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -2415,7 +2415,7 @@ _error: destroyMergeIntervalOperatorInfo(pMergeIntervalInfo); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 755eef4431..9e902140f4 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -285,6 +285,10 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, pSortHandle->pageSize = pageSize; pSortHandle->numOfPages = numOfPages; pSortHandle->pSortInfo = taosArrayDup(pSortInfo, NULL); + if (pSortHandle->pSortInfo == NULL) { + return terrno; + } + pSortHandle->loops = 0; pSortHandle->pqMaxTupleLength = pqMaxTupleLength; @@ -1708,6 +1712,9 @@ int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) { pHandle->extRowBytes = blockDataGetRowSize(pHandle->pDataBlock) + taosArrayGetSize(pHandle->pDataBlock->pDataBlock) + sizeof(int32_t); pHandle->extRowsMemSize = extRowsMemSize; pHandle->aExtRowsOrders = taosArrayDup(pHandle->pSortInfo, NULL); + if (pHandle->aExtRowsOrders == NULL) { + return terrno; + } int32_t code = initRowIdSort(pHandle); if (code) { diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 40e0407a54..4eefd150f3 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -305,7 +305,7 @@ int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, dou return TSDB_CODE_NO_DISKSPACE; } - int32_t ret = createDiskbasedBuf(&(*pBucket)->pBuffer, (*pBucket)->bufPageSize, (*pBucket)->bufPageSize * 1024, "1", tsTempDir); + int32_t ret = createDiskbasedBuf(&(*pBucket)->pBuffer, (*pBucket)->bufPageSize, (*pBucket)->bufPageSize * DEFAULT_NUM_OF_SLOT * 4, "1", tsTempDir); if (ret != 0) { tMemBucketDestroy(*pBucket); return ret; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index bce8cda125..9b61f81939 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -581,6 +581,8 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { return TSDB_CODE_OUT_OF_MEMORY; } memcpy(*pDst, pSrc, sizeof(*pSrc)); + (*pDst)->vgArray = NULL; + if (pSrc->vgHash) { (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 23cc7324f0..cd67b89208 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -401,8 +401,8 @@ static int32_t concatCopyHelper(const char *input, char *output, bool hasNchar, taosMemoryFree(newBuf); return TSDB_CODE_SCALAR_CONVERT_ERROR; } - (void)memcpy(varDataVal(output) + *dataLen, newBuf, varDataLen(input) * TSDB_NCHAR_SIZE); - *dataLen += varDataLen(input) * TSDB_NCHAR_SIZE; + (void)memcpy(varDataVal(output) + *dataLen, newBuf, len); + *dataLen += len; taosMemoryFree(newBuf); } else { (void)memcpy(varDataVal(output) + *dataLen, varDataVal(input), varDataLen(input)); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 832bcb5015..a0c6317126 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -819,10 +819,21 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) { uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); + if (pHandle == NULL) { + goto _EXIT; + } + pHandle->list = tdListNew(sizeof(SCfComparator)); + if (pHandle->list == NULL) { + goto _EXIT; + } + (void)taosThreadMutexInit(&pHandle->mutex, NULL); (void)taosThreadMutexInit(&pHandle->cfMutex, NULL); pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (pHandle->cfInst == NULL) { + goto _EXIT; + } rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 87293c59ec..321027c293 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1277,10 +1277,13 @@ static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64 if (pMeta->closeFlag) { streamMetaWUnLock(pMeta); stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId); - return -1; + return TSDB_CODE_FAILED; } *pList = taosArrayDup(pMeta->pTaskList, NULL); + if (*pList == NULL) { + return terrno; + } taosHashClear(pMeta->startInfo.pReadyTaskSet); taosHashClear(pMeta->startInfo.pFailedTaskSet); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 5e538c1e42..752101afbd 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -235,7 +235,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte void* newRet = NULL; int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet); if (newRet == NULL) { - if (code) { + if (code != -1) { stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, tstrerror(code)); } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index a83a0e4cc8..6506d449a6 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -48,7 +48,7 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType, - terrstr(terrno)); + terrstr()); return terrno; } diff --git a/source/os/src/osThread.c b/source/os/src/osThread.c index cf1fef71c2..2f418d5a01 100644 --- a/source/os/src/osThread.c +++ b/source/os/src/osThread.c @@ -445,9 +445,8 @@ int32_t taosThreadMutexTryLock(TdThreadMutex *mutex) { return EBUSY; #else int32_t code = pthread_mutex_trylock(mutex); - if (code) { - terrno = TAOS_SYSTEM_ERROR(code); - return terrno; + if (code && code != EBUSY) { + code = TAOS_SYSTEM_ERROR(code); } return code; #endif @@ -816,9 +815,8 @@ int32_t taosThreadSpinTrylock(TdThreadSpinlock *lock) { return pthread_mutex_trylock((pthread_mutex_t *)lock); #else int32_t code = pthread_spin_trylock((pthread_spinlock_t *)lock); - if (code) { - terrno = TAOS_SYSTEM_ERROR(code); - return code; + if (code && code != EBUSY) { + code = TAOS_SYSTEM_ERROR(code); } return code; #endif diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 2daedfdf32..5c1909b16e 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -624,6 +624,9 @@ void taosHashTableResize(SHashObj *pHashObj) { size_t inc = newCapacity - pHashObj->capacity; void *p = taosMemoryCalloc(inc, sizeof(SHashEntry)); + if (p == NULL) { + return; + } for (int32_t i = 0; i < inc; ++i) { pHashObj->hashList[i + pHashObj->capacity] = (void *)((char *)p + i * sizeof(SHashEntry)); diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 4dc386d500..a1fbe3db2a 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -367,6 +367,9 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem pPBuf->fileSize = 0; pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem)); pPBuf->freePgList = tdListNew(POINTER_BYTES); + if (pPBuf->pFree == NULL || pPBuf->freePgList == NULL) { + goto _error; + } // at least more than 2 pages must be in memory if (inMemBufSize < pagesize * 2) {