Merge branch '3.0' of github.com:taosdata/TDengine into fix/TD-31288

This commit is contained in:
54liuyao 2024-08-07 16:59:45 +08:00
commit 3a7c4ab5a1
30 changed files with 580 additions and 274 deletions


@@ -10,7 +10,7 @@ sidebar_label: Cluster Maintenance

## Data Compaction

TDengine serves a wide range of write scenarios, and in many of them storage amplification or holes in data files can build up, which reduces both storage efficiency and query performance. To address this, TDengine Enterprise provides a data compaction feature, DATA COMPACT, which reorganizes stored data files, removes file holes and invalid data, and improves data organization, thereby improving storage and query efficiency. Data compaction was first released in version 3.0.3.0 and has been refined over several iterations since, so using the latest version is recommended.
### Syntax
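For illustration, a typical compaction session looks like the sketch below (the database name and the compact id are placeholders):

```SQL
COMPACT DATABASE mydb;   -- reorganize the data files of mydb
SHOW COMPACTS;           -- list running compaction tasks and their ids
KILL COMPACT 1;          -- cancel the compaction task with id 1
```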
@@ -39,7 +39,7 @@ KILL COMPACT compact_id
## Vgroup Leader Rebalancing

After one or more nodes in a multi-replica cluster restart because of an upgrade or for other reasons, the load across the cluster's dnodes may become unbalanced; in the extreme case, the leaders of all vgroups end up on the same dnode. To fix this, you can use the command below, which was first released in version 3.0.4.0; using the latest version whenever possible is recommended.
```SQL
balance vgroup leader;  # rebalance the leaders of all vgroups
```
@@ -73,7 +73,7 @@ restore qnode on dnode <dnode_id>  # restore the qnode on a dnode
## Splitting a Vgroup

When a vgroup's CPU or disk load is too high because it contains too many child tables, you can add a dnode and then use the `split vgroup` command to split that vgroup into two. After the split, the two newly created vgroups take over the read and write services previously provided by the single vgroup. This command was first released in version 3.0.6.0; using the latest version whenever possible is recommended.
```sql
split vgroup <vgroup_id>
```
@@ -97,7 +97,7 @@ split vgroup <vgroup_id>
## Dual Replica

Dual replica is a special high-availability database configuration; this section describes how to use and maintain it. The feature was first released in version 3.3.0.0; using the latest version whenever possible is recommended.

### Checking Vgroup Status
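A minimal sketch of checking the status (the database name is a placeholder):

```SQL
USE mydb;
SHOW VGROUPS;  -- shows each vgroup with the role (leader/follower) and status of its vnodes
```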


@@ -4,14 +4,13 @@ title: Multi-Tier Storage and Object Storage
toc_max_heading_level: 4
---

This section describes the multi-tier storage feature unique to TDengine Enterprise, which stores recent, hot data on fast media and old, rarely accessed data on low-cost media, achieving the following goals:
- Lower storage cost -- with tiered storage, huge volumes of very cold data can be kept on cheap media, which brings significant savings
- Higher write performance -- each storage tier supports multiple mount points, and the WAL also supports parallel writes across multiple level-0 mount points, which greatly increases write throughput (sustained ingestion of more than 300 million measurement points per second has been measured in real scenarios) and delivers very high disk I/O throughput even on spinning disks (up to 2 GB/s measured)
- Easy maintenance -- once the mount points of each tier are configured, tasks such as data migration need no manual intervention, and storage expansion is more flexible and convenient
- Transparent to SQL -- a single SQL statement can return all data regardless of whether a query crosses tiers; simple and efficient

All storage tiers involved in multi-tier storage are local storage devices. Besides local devices, TDengine Enterprise also supports object storage (S3), keeping the coldest batch of data on the cheapest media to further lower storage cost, while still allowing that data to be queried when necessary, with its location remaining transparent to SQL. Object storage support was first released in version 3.3.0.0; using the latest version is recommended.

## Multi-Tier Storage
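As a sketch of how tiers are typically declared through `dataDir` entries in `taos.cfg` (each entry takes a mount-point path, a tier level 0-2, and a primary flag; the paths here are placeholders):

```
dataDir /mnt/ssd1/taos  0 1   # tier 0, primary mount point (hot data)
dataDir /mnt/ssd2/taos  0 0   # additional tier-0 mount point
dataDir /mnt/hdd1/taos  1 0   # tier 1 (warm data)
dataDir /mnt/cold1/taos 2 0   # tier 2 (cold data)
```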


@@ -4,7 +4,7 @@ title: User and Permission Management
toc_max_heading_level: 4
---

TDengine is preconfigured with only a root user, which has the highest privileges. TDengine supports access control over system resources, databases, tables, views, and topics, and the root user can grant each user different access permissions on different resources. This section describes user and permission management in TDengine, a feature unique to TDengine Enterprise.
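A quick sketch of the basic operations (user name, password, and table names are placeholders):

```SQL
CREATE USER test_user PASS 'Ab1!test2024';     -- root creates a new user
GRANT READ ON power.meters TO test_user;       -- grant read access on one table
REVOKE READ ON power.meters FROM test_user;    -- revoke it again
DROP USER test_user;
```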
## User Management


@@ -4,7 +4,7 @@ title: More Security Policies
toc_max_heading_level: 4
---

Beyond conventional user and permission management, TDengine provides additional security policies such as the IP whitelist, audit logs, and data encryption, all of which are TDengine Enterprise features: the IP whitelist was first released in version 3.2.0.0, audit logs in 3.1.1.0, and database encryption in 3.3.0.0. Using the latest version is recommended.
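As one example, database encryption is enabled when a database is created; a sketch, assuming the SM4 algorithm and a placeholder key (check the release documentation of your version for the exact clauses):

```SQL
CREATE ENCRYPT_KEY '1234567890';                 -- set the cluster-wide encryption key first (placeholder key)
CREATE DATABASE power ENCRYPT_ALGORITHM 'sm4';   -- data files of this database are encrypted
```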
## IP Whitelist


@@ -6,7 +6,7 @@ toc_max_heading_level: 4

## Introduction

1. Some users can only deploy two servers because of constraints in their environment, yet they still want some degree of service high availability and data reliability. This document mainly describes the behavior of the TDengine active-active system, which is built on two key techniques, data replication and client failover, covering its architecture, configuration, and operations. TDengine active-active suits both the resource-constrained environment described above and disaster-recovery scenarios between two TDengine clusters with no resource limits. Active-active is a TDengine Enterprise feature, first released in version 3.3.0.0; using the latest version is recommended.
2. An active-active system is defined as follows: the business system contains exactly two servers, each running one set of services; from the business layer's point of view, the two machines and two services form one complete system whose internal details it does not need to know. The two nodes in an active-active setup are usually called Master-Slave, in the sense of "primary/secondary" or "primary/standby", and this document may use these terms interchangeably.


@@ -41,7 +41,7 @@ TDengine provides a rich set of application development interfaces; to help users quickly
3. With a WebSocket connection, users likewise do not need to install the client driver taosc.
4. To connect to a cloud service instance, you must use a REST connection or a WebSocket connection.

**The WebSocket connection is recommended.**

## Installing the Client Driver taosc


@@ -11,7 +11,9 @@ import TabItem from "@theme/TabItem";
- Reduced parsing time: with parameter binding, the structure of the SQL statement is fixed at the first execution, and later executions only substitute parameter values, avoiding syntax parsing on every run and thus reducing parsing time.
- Precompilation: with parameter binding, the SQL statement can be precompiled and cached; later executions with different parameter values reuse the precompiled version for better execution efficiency.
- Less network overhead: parameter binding also reduces the amount of data sent to the database, because only the parameter values need to be sent rather than the full SQL statement; the difference is especially noticeable for large numbers of similar insert or update operations.

**Tip: parameter binding is the recommended way to write data.**

Continuing with the smart-meters example, this section shows how the connectors for each language use parameter binding for efficient writes; a condensed C sketch follows the steps below:
1. Prepare a parameterized SQL INSERT statement for writing data into the supertable `meters`. This statement allows the subtable name, tags, and column values to be specified dynamically.
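A condensed sketch of these steps using the native C connector's `taos_stmt_*` API; the connection parameters and a pre-created `power` database with the usual `meters` supertable are assumptions for illustration:

```c
#include <stdio.h>
#include <string.h>
#include "taos.h"

int main(void) {
  TAOS *conn = taos_connect("localhost", "root", "taosdata", "power", 6030);
  if (conn == NULL) return 1;

  TAOS_STMT *stmt = taos_stmt_init(conn);
  // Parsed and planned once; every later execution only swaps in new values.
  taos_stmt_prepare(stmt, "INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)", 0);

  // Bind the subtable name and tag values.
  int32_t group_id = 2;
  char    location[] = "California.SanFrancisco";
  int32_t loc_len = (int32_t)strlen(location);
  TAOS_MULTI_BIND tags[2] = {0};
  tags[0].buffer_type = TSDB_DATA_TYPE_INT;    tags[0].buffer = &group_id; tags[0].buffer_length = sizeof(group_id); tags[0].num = 1;
  tags[1].buffer_type = TSDB_DATA_TYPE_BINARY; tags[1].buffer = location;  tags[1].buffer_length = loc_len; tags[1].length = &loc_len; tags[1].num = 1;
  taos_stmt_set_tbname_tags(stmt, "d1001", tags);

  // Bind one row of column values; only these values travel over the wire.
  int64_t ts = 1699000000000LL;
  float   current = 10.3f;
  int32_t voltage = 219;
  float   phase = 0.31f;
  TAOS_MULTI_BIND cols[4] = {0};
  cols[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; cols[0].buffer = &ts;      cols[0].buffer_length = sizeof(ts);      cols[0].num = 1;
  cols[1].buffer_type = TSDB_DATA_TYPE_FLOAT;     cols[1].buffer = &current; cols[1].buffer_length = sizeof(current); cols[1].num = 1;
  cols[2].buffer_type = TSDB_DATA_TYPE_INT;       cols[2].buffer = &voltage; cols[2].buffer_length = sizeof(voltage); cols[2].num = 1;
  cols[3].buffer_type = TSDB_DATA_TYPE_FLOAT;     cols[3].buffer = &phase;   cols[3].buffer_length = sizeof(phase);   cols[3].num = 1;
  taos_stmt_bind_param_batch(stmt, cols);
  taos_stmt_add_batch(stmt);

  if (taos_stmt_execute(stmt) != 0) {
    fprintf(stderr, "execute failed: %s\n", taos_stmt_errstr(stmt));
  }

  taos_stmt_close(stmt);
  taos_close(conn);
  return 0;
}
```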


@@ -3,7 +3,7 @@ title: taosX Reference Manual
sidebar_label: taosX
---

taosX is a core component of TDengine Enterprise that provides zero-code data ingestion. taosX supports two run modes: service mode and command-line mode. This section describes how to use taosX in both modes. Before using taosX, the TDengine Enterprise package must be installed.

## Command-Line Mode
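A sketch of the command-line form (the DSNs are placeholders; `taosx --help` prints the authoritative options):

```shell
# replicate data from one TDengine instance to another without writing any code
taosx run -f "taos://root:taosdata@src-host:6030/db_from" \
          -t "taos://root:taosdata@dst-host:6030/db_to"
```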


@@ -3,7 +3,7 @@ title: taosX-Agent Reference Manual
sidebar_label: taosX-Agent
---

This section describes how to deploy the `Agent` (for `taosX`); the TDengine Enterprise package must be installed before use. taosX-Agent serves data-ingestion scenarios such as Pi, OPC UA, and OPC DA, where access to the data source is restricted or the network environment is special. In such cases, taosX-Agent can be deployed close to the data source, or even on the same server as the data source, and it reads data from the source and sends it to taosX.

## Configuration
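A minimal sketch of what the agent configuration might contain (the keys below are assumptions for illustration; consult the sample configuration shipped with the package for the authoritative fields):

```toml
# endpoint of the taosX instance this agent reports to (assumed key)
endpoint = "taosx-host:6055"
# token issued by taosX for this agent (assumed key)
token = "<token issued by taosX>"
```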


@@ -14,17 +14,102 @@ taosKeeper is the tool that exports TDengine 3.0 monitoring metrics; with a simple

## Installation

taosKeeper can be installed in the following ways:
- taosKeeper is installed automatically together with the official TDengine installation package; for details, see [TDengine installation](../../../get-started/).
- Build and install taosKeeper separately; for details, see the [taosKeeper](https://github.com/taosdata/taoskeeper) repository.

## Configuration

taosKeeper runs from the operating-system terminal and supports three configuration methods: command-line arguments, environment variables, and a configuration file, with that order of precedence. In general, the configuration file is recommended.

### Command-Line Arguments and Environment Variables

The command-line arguments and environment variables are described in the output of `taoskeeper --help`. Here is an example:
```shell
Usage of taosKeeper v3.3.2.0:
--debug enable debug mode. Env "TAOS_KEEPER_DEBUG"
-P, --port int http port. Env "TAOS_KEEPER_PORT" (default 6043)
--logLevel string log level (panic fatal error warn warning info debug trace). Env "TAOS_KEEPER_LOG_LEVEL" (default "info")
--gopoolsize int coroutine size. Env "TAOS_KEEPER_POOL_SIZE" (default 50000)
-R, --RotationInterval string interval for refresh metrics, such as "300ms", Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Env "TAOS_KEEPER_ROTATION_INTERVAL" (default "15s")
--tdengine.host string TDengine server's ip. Env "TAOS_KEEPER_TDENGINE_HOST" (default "127.0.0.1")
--tdengine.port int TDengine REST server(taosAdapter)'s port. Env "TAOS_KEEPER_TDENGINE_PORT" (default 6041)
--tdengine.username string TDengine server's username. Env "TAOS_KEEPER_TDENGINE_USERNAME" (default "root")
--tdengine.password string TDengine server's password. Env "TAOS_KEEPER_TDENGINE_PASSWORD" (default "taosdata")
--tdengine.usessl TDengine server use ssl or not. Env "TAOS_KEEPER_TDENGINE_USESSL"
--metrics.prefix string prefix in metrics names. Env "TAOS_KEEPER_METRICS_PREFIX"
--metrics.database.name string database for storing metrics data. Env "TAOS_KEEPER_METRICS_DATABASE" (default "log")
--metrics.tables stringArray export some tables that are not super table, multiple values split with white space. Env "TAOS_KEEPER_METRICS_TABLES"
--environment.incgroup whether running in cgroup. Env "TAOS_KEEPER_ENVIRONMENT_INCGROUP"
--log.path string log path. Env "TAOS_KEEPER_LOG_PATH" (default "/var/log/taos")
--log.rotationCount uint log rotation count. Env "TAOS_KEEPER_LOG_ROTATION_COUNT" (default 5)
--log.rotationTime duration log rotation time. Env "TAOS_KEEPER_LOG_ROTATION_TIME" (default 24h0m0s)
--log.rotationSize string log rotation size(KB MB GB), must be a positive integer. Env "TAOS_KEEPER_LOG_ROTATION_SIZE" (default "100000000")
-c, --config string config path default /etc/taos/taoskeeper.toml
-V, --version Print the version and exit
-h, --help Print this help message and exit
```
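Since every flag above has a matching environment variable, the same setting can be supplied either way, for example (values are placeholders):

```shell
# equivalent ways to point taosKeeper at a remote taosAdapter
taoskeeper --tdengine.host 192.168.1.10 --tdengine.port 6041
TAOS_KEEPER_TDENGINE_HOST=192.168.1.10 TAOS_KEEPER_TDENGINE_PORT=6041 taoskeeper
```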
### Configuration File

taosKeeper can be pointed at a configuration file with `taoskeeper -c <keeper config file>`.
If no configuration file is specified, taosKeeper uses the default one at `/etc/taos/taoskeeper.toml`.
If no configuration file is specified and `/etc/taos/taoskeeper.toml` does not exist either, the default configuration is used.

**Below is an example configuration file:**
```toml
# Start with debug middleware for gin
debug = false
# Listen port, default is 6043
port = 6043
# log level
loglevel = "info"
# go pool size
gopoolsize = 50000
# interval for metrics
RotationInterval = "15s"
[tdengine]
host = "127.0.0.1"
port = 6041
username = "root"
password = "taosdata"
usessl = false
[metrics]
# metrics prefix in metrics names.
prefix = "taos"
# export some tables that are not super table
tables = []
# database for storing metrics data
[metrics.database]
name = "log"
# database options for db storing metrics data
[metrics.database.options]
vgroups = 1
buffer = 64
KEEP = 90
cachemodel = "both"
[environment]
# Whether running in cgroup.
incgroup = false
[log]
rotationCount = 5
rotationTime = "24h"
rotationSize = 100000000
```
## Startup

**Before running taosKeeper, make sure the TDengine cluster and taosAdapter are running correctly.** TDengine must also have the monitoring service enabled: at a minimum, `monitor` and `monitorFqdn` must be set in the TDengine configuration file `taos.cfg`.
@@ -36,8 +121,6 @@ monitorFqdn localhost # FQDN of the taoskeeper service
For TDengine monitoring configuration, see [TDengine monitoring configuration](../../../operation/monitor).
<Tabs>
<TabItem label="Linux" value="linux">
@@ -79,8 +162,8 @@ Active: inactive (dead)
- The `systemctl` command requires _root_ privileges; if you are not the _root_ user, prepend `sudo` to the command.
- If the system does not support `systemd`, you can start the taoskeeper service manually by running `/usr/local/taos/bin/taoskeeper`.
- Troubleshooting: if the service misbehaves, check the logs for more information. Log files are located under `/var/log/taos` by default.
:::

</TabItem>
@@ -100,8 +183,7 @@ Active: inactive (dead)
- The `launchctl` command that manages `com.tdengine.taoskeeper` requires administrator privileges; always prepend `sudo` for safety.
- The first column returned by `sudo launchctl list | grep taoskeeper` is the PID of the `taoskeeper` process; if it is `-`, the taoskeeper service is not running.
- Troubleshooting: if the service misbehaves, check the logs for more information. Log files are located under `/var/log/taos` by default.
:::

@@ -109,88 +191,82 @@ Active: inactive (dead)

</Tabs>
## Health Check

You can call taosKeeper's `check_health` endpoint to determine whether the service is alive; if the service is healthy, it returns HTTP status code 200:

```
$ curl -i http://127.0.0.1:6043/check_health
```

Response:

```
HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Wed, 07 Aug 2024 06:19:50 GMT
Content-Length: 21

{"version":"3.3.2.3"}
```

## Data Collection and Monitoring

As the exporter of TDengine monitoring metrics, taosKeeper records the monitoring data produced by TDengine in a designated database (`log` by default); this data can then be used to set up TDengine monitoring.

### Viewing Monitoring Data

You can list the supertables in the `log` database; each supertable corresponds to one group of monitoring metrics, which are not described in detail here.

```shell
taos> use log;
Database changed.

taos> show stables;
          stable_name           |
=================================
 taosd_dnodes_status            |
 taosd_vnodes_info              |
 keeper_monitor                 |
 taosd_vgroups_info             |
 taos_sql_req                   |
 taos_slow_sql                  |
 taosd_mnodes_info              |
 taosd_cluster_info             |
 taosd_sql_req                  |
 taosd_dnodes_info              |
 adapter_requests               |
 taosd_cluster_basic            |
 taosd_dnodes_data_dirs         |
 taosd_dnodes_log_dirs          |
Query OK, 14 row(s) in set (0.006542s)
```

You can also look at the most recent record reported to a supertable, for example:

``` shell
taos> select last_row(*) from taosd_dnodes_info;
      last_row(_ts)      |   last_row(disk_engine)   |  last_row(system_net_in)  |   last_row(vnodes_num)    | last_row(system_net_out) |     last_row(uptime)      |    last_row(has_mnode)    |  last_row(io_read_disk)   | last_row(error_log_count) |     last_row(io_read)     |    last_row(cpu_cores)    |    last_row(has_qnode)    |    last_row(has_snode)    |   last_row(disk_total)    |   last_row(mem_engine)    | last_row(info_log_count)  |   last_row(cpu_engine)    |  last_row(io_write_disk)  | last_row(debug_log_count) |    last_row(disk_used)    |    last_row(mem_total)    |    last_row(io_write)     |     last_row(masters)     |   last_row(cpu_system)    | last_row(trace_log_count) |    last_row(mem_free)     |
======================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================
 2024-08-07 14:54:09.174 |         0.000000000000000 |      3379.093240947399863 |        37.000000000000000 |     5265.998201139278535 |     64402.000000000000000 |         1.000000000000000 |      8323.261934108399146 |         6.000000000000000 |     40547.386655118425551 |        16.000000000000000 |         0.000000000000000 |         0.000000000000000 |     5.272955781120000e+11 |   2443032.000000000000000 |       423.000000000000000 |         0.556269622200215 |    677731.836503547732718 |    356380.000000000000000 |     4.997186764800000e+10 |  65557284.000000000000000 |    714177.054532129666768 |        37.000000000000000 |         2.642280705451021 |         0.000000000000000 |  11604276.000000000000000 |
Query OK, 1 row(s) in set (0.003168s)
```

### Configuring Monitoring with TDInsight

Once monitoring data is collected, you can use TDInsight to configure monitoring for TDengine; see the [TDinsight reference manual](../tdinsight/) for details.

## Prometheus Integration

taoskeeper provides a `/metrics` endpoint that returns monitoring data in Prometheus format. Prometheus can scrape this endpoint, which makes it possible to monitor TDengine through Prometheus.

### Exporting Monitoring Metrics

The following `curl` command shows the data format returned by the `/metrics` endpoint:

```shell
$ curl http://127.0.0.1:6043/metrics
@@ -219,28 +295,8 @@ taos_cluster_info_first_ep{cluster_id="554014120921134497",value="tdengine:6030"
taos_cluster_info_first_ep_dnode_id{cluster_id="554014120921134497"} 1
```
### Scrape Configuration

Prometheus uses `scrape_configs` to define how monitoring data is scraped from an endpoint; usually you only need to set the targets in `static_configs` to the taoskeeper endpoint address. For more options, see the [Prometheus configuration documentation](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config).
@@ -255,7 +311,7 @@ scrape_configs:
      - targets: ["localhost:6043"]
```
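A complete minimal stanza might therefore look like this sketch (job name and scrape interval are placeholders):

```yaml
scrape_configs:
  - job_name: "taoskeeper"
    scrape_interval: 15s
    static_configs:
      - targets: ["localhost:6043"]
```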
### Dashboard

We provide the `TaosKeeper Prometheus Dashboard for 3.x`, which offers a monitoring dashboard similar to TDinsight.


@@ -28,13 +28,14 @@ TDengine provides a rich set of application development interfaces; to help users quickly
TDengine version updates often add new features; the connector versions listed here are the best-matched versions for each connector.

| **TDengine Version**   | **Java**          | **Python**                                          | **Go**          | **C#**          | **Node.js**     | **Rust**        |
| ---------------------- | ----------------- | --------------------------------------------------- | --------------- | --------------- | --------------- | --------------- |
| **3.3.0.0 and above**  | 3.3.2.0 and above | taospy 2.7.15 and above, taos-ws-py 0.3.2 and above | 3.5.5 and above | 3.1.3 and above | 3.1.0 and above | current version |
| **3.0.0.0 and above**  | 3.0.2 and above   | current version                                     | 3.0 branch      | 3.0.0           | 3.1.0           | current version |
| **2.4.0.14 and above** | 2.0.38            | current version                                     | develop branch  | 1.0.2 - 1.0.6   | 2.0.10 - 2.0.12 | current version |
| **2.4.0.4 - 2.4.0.13** | 2.0.37            | current version                                     | develop branch  | 1.0.2 - 1.0.6   | 2.0.10 - 2.0.12 | current version |
| **2.2.x.x**            | 2.0.36            | current version                                     | master branch   | n/a             | 2.0.7 - 2.0.9   | current version |
| **2.0.x.x**            | 2.0.34            | current version                                     | master branch   | n/a             | 2.0.1 - 2.0.6   | current version |
## Features


@@ -25,9 +25,9 @@ extern "C" {
#include "tarray.h"
#include "thash.h"
#include "tlog.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tsimplehash.h"

typedef enum {
  JOB_TASK_STATUS_NULL = 0,
@@ -69,16 +69,16 @@ typedef enum {
#define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0)
#define QUERY_MSG_MASK_AUDIT() (1 << 1)
#define QUERY_MSG_MASK_VIEW() (1 << 2)
#define TEST_SHOW_REWRITE_MASK(m) (((m)&QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
#define TEST_AUDIT_MASK(m) (((m)&QUERY_MSG_MASK_AUDIT()) != 0)
#define TEST_VIEW_MASK(m) (((m)&QUERY_MSG_MASK_VIEW()) != 0)
typedef struct STableComInfo {
  uint8_t  numOfTags;     // the number of tags in schema
  uint8_t  precision;     // the number of precision
  col_id_t numOfColumns;  // the number of columns
  int16_t  numOfPKs;
  int32_t  rowSize;  // row size of the schema
} STableComInfo;

typedef struct SIndexMeta {
@@ -119,8 +119,9 @@ typedef struct STableMeta {
  int32_t       sversion;
  int32_t       tversion;
  STableComInfo tableInfo;
  SSchemaExt*   schemaExt;  // There is no additional memory allocation, and the pointer is fixed to the next address of
                            // the schema content.
  SSchema       schema[];
} STableMeta;

#pragma pack(pop)
@@ -196,9 +197,9 @@ typedef struct SBoundColInfo {
} SBoundColInfo;

typedef struct STableColsData {
  char    tbName[TSDB_TABLE_NAME_LEN];
  SArray* aCol;
  bool    getFromHash;
} STableColsData;

typedef struct STableVgUid {
@@ -207,15 +208,14 @@ typedef struct STableVgUid {
} STableVgUid;

typedef struct STableBufInfo {
  void*   pCurBuff;
  SArray* pBufList;
  int64_t buffUnit;
  int64_t buffSize;
  int64_t buffIdx;
  int64_t buffOffset;
} STableBufInfo;

typedef struct STableDataCxt {
  STableMeta* pMeta;
  STSchema*   pSchema;
@@ -237,23 +237,22 @@ typedef struct SStbInterlaceInfo {
  void*          pRequest;
  uint64_t       requestId;
  int64_t        requestSelf;
  bool           tbFromHash;
  SHashObj*      pVgroupHash;
  SArray*        pVgroupList;
  SSHashObj*     pTableHash;
  int64_t        tbRemainNum;
  STableBufInfo  tbBuf;
  char           firstName[TSDB_TABLE_NAME_LEN];
  STSchema*      pTSchema;
  STableDataCxt* pDataCtx;
  void*          boundTags;
  bool           tableColsReady;
  SArray*        pTableCols;
  int32_t        pTableColsIdx;
} SStbInterlaceInfo;

typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
@@ -308,6 +307,8 @@ void destroyAhandle(void* ahandle);
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
                                bool persistHandle, void* ctx);
int32_t asyncFreeConnById(void* pTransporter, int64_t pid);
/**
 * Asynchronously send message to server; after the response is received, the callback will be invoked.
 *
@@ -325,7 +326,7 @@ void initQueryModuleMsgHandle();
const SSchema* tGetTbnameColumnSchema();
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t getAsofJoinReverseOp(EOperatorType op);
int32_t queryCreateCTableMetaFromMsg(STableMetaRsp* msg, SCTableMeta* pMeta);
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);
@@ -384,7 +385,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_CLIENT_RM_TBLMETA_REQ(_type)                                                                  \
  ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
   (_type) == TDMT_MND_DROP_STB || (_type) == TDMT_MND_CREATE_VIEW || (_type) == TDMT_MND_DROP_VIEW ||     \
   (_type) == TDMT_MND_CREATE_TSMA || (_type) == TDMT_MND_DROP_TSMA || (_type) == TDMT_MND_DROP_TB_WITH_TSMA)

#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \


@@ -125,6 +125,7 @@ typedef struct SRpcInit {
  int32_t timeToGetConn;
  int8_t  supportBatch;  // 0: no batch, 1. batch
  int32_t batchSize;
  int8_t  notWaitAvaliableConn;  // 1: do not wait for an available conn, 0: wait

  void *parent;
} SRpcInit;
@@ -158,18 +159,21 @@ void *rpcReallocCont(void *ptr, int64_t contLen);
// Because taosd supports multi-process mode
// These functions should not be used on the server side
// Please use tmsg<xx> functions, which are defined in tmsgcb.h
int32_t rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
int32_t rpcSendResponse(const SRpcMsg *pMsg);
int32_t rpcRegisterBrokenLinkArg(SRpcMsg *msg);
int32_t rpcReleaseHandle(void *handle, int8_t type);  // just release conn to rpc instance, no close sock

// These functions will not be called in the child process
int32_t rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
int32_t rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int32_t rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated,
                               int32_t timeoutMs);

int32_t rpcFreeConnById(void *shandle, int64_t connId);

int32_t rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
int32_t rpcAllocHandle(int64_t *refId);
int32_t rpcSetIpWhite(void *thandl, void *arg);

int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf);


@@ -387,6 +387,7 @@ int32_t dmInitClient(SDnode *pDnode) {
  rpcInit.supportBatch = 1;
  rpcInit.batchSize = 8 * 1024;
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
  rpcInit.notWaitAvaliableConn = 1;

  (void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));


@@ -202,6 +202,7 @@ typedef struct SExchangeInfo {
  SLimitInfo limitInfo;
  int64_t    openedTs;  // start exec time stamp, todo: move to SLoadRemoteDataInfo
  char*      pTaskId;
  SArray*    pFetchRpcHandles;
} SExchangeInfo;

typedef struct SScanInfo {


@@ -298,13 +298,13 @@ _end:
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = NULL;
  return code;
}

static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
  SSDataBlock* pRes = NULL;
  int32_t      code = loadRemoteDataNext(pOperator, &pRes);
  return pRes;
}
@@ -346,6 +346,14 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }

  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
  if (!pInfo->pFetchRpcHandles) {
    return terrno;
  }
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
  if (!ret) {
    return terrno;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
@@ -384,6 +392,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
  pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);

  return initDataSource(numOfSources, pInfo, id);
}
@@ -391,7 +400,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo
                                   SOperatorInfo** pOptrInfo) {
  QRY_OPTR_CHECK(pOptrInfo);

  int32_t        code = 0;
  int32_t        lino = 0;
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@@ -468,6 +477,14 @@ void freeSourceDataInfo(void* p) {
void doDestroyExchangeOperatorInfo(void* param) {
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;

  for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
    int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
    if (*pRpcHandle > 0) {
      SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
      (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
    }
  }
  taosArrayDestroy(pExInfo->pFetchRpcHandles);

  taosArrayDestroy(pExInfo->pSources);
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
@@ -495,6 +512,8 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  }

  int32_t  index = pWrapper->sourceIndex;
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
  *pRpcHandle = -1;
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
  if (!pSourceDataInfo) {
    return terrno;
  }
@@ -668,6 +687,8 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
    int64_t transporterId = 0;
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
    QUERY_CHECK_CODE(code, lino, _end);
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
    *pRpcHandle = transporterId;
  }

_end:
@@ -691,7 +712,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
  SSDataBlock* pBlock = NULL;
  if (pColList == NULL) {  // data from other sources
    blockDataCleanup(pRes);
    code = blockDecode(pRes, pData, (const char**)pNextStart);
    if (code) {
      return code;
    }


@@ -88,20 +88,26 @@ static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  }
}

static int32_t overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order, bool* overlap) {
  int32_t     code = TSDB_CODE_SUCCESS;
  STimeWindow w = {0};

  // 0 by default, which means it is not an interval operator of the upstream operator.
  if (pInterval->interval == 0) {
    *overlap = false;
    return code;
  }

  if (order == TSDB_ORDER_ASC) {
    w = getAlignQueryTimeWindow(pInterval, pBlockInfo->window.skey);
    if (w.ekey < pBlockInfo->window.skey) {
      qError("w.ekey:%" PRId64 " < pBlockInfo->window.skey:%" PRId64, w.ekey, pBlockInfo->window.skey);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    if (w.ekey < pBlockInfo->window.ekey) {
      *overlap = true;
      return code;
    }

    while (1) {
@@ -110,17 +116,25 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockIn
        break;
      }

      if (w.ekey <= pBlockInfo->window.ekey) {
        qError("w.ekey:%" PRId64 " <= pBlockInfo->window.ekey:%" PRId64, w.ekey, pBlockInfo->window.ekey);
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }

      if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
        *overlap = true;
        return code;
      }
    }
  } else {
    w = getAlignQueryTimeWindow(pInterval, pBlockInfo->window.ekey);
    if (w.skey > pBlockInfo->window.ekey) {
      qError("w.skey:%" PRId64 " > pBlockInfo->window.ekey:%" PRId64, w.skey, pBlockInfo->window.ekey);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    if (w.skey > pBlockInfo->window.skey) {
      *overlap = true;
      return code;
    }

    while (1) {
@@ -129,14 +143,19 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockIn
        break;
      }

      if (w.skey >= pBlockInfo->window.skey) {
        qError("w.skey:%" PRId64 " >= pBlockInfo->window.skey:%" PRId64, w.skey, pBlockInfo->window.skey);
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }

      if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
        *overlap = true;
        return code;
      }
    }
  }

  *overlap = false;
  return code;
}

// this function is for table scanner to extract temporary results of upstream aggregate results.
@@ -319,9 +338,18 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
  bool loadSMA = false;
  *status = pTableScanInfo->dataBlockLoadFlag;
  if (pOperator->exprSupp.pFilterInfo != NULL) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  } else {
    bool overlap = false;
    int  ret =
        overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order, &overlap);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
    if (overlap) {
      (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
@@ -358,7 +386,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
    }
  }

  if (*status != FUNC_DATA_REQUIRED_DATA_LOAD) {
    qError("[loadDataBlock] invalid status:%d", *status);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  // try to filter data block according to sma info
  if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
@@ -413,7 +444,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
    return code;
  }

  if (p != pBlock) {
    qError("[loadDataBlock] p != pBlock");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
  // restore the previous value


@@ -156,6 +156,10 @@ _exit:
int32_t executeGeomFromTextFunc(SColumnInfoData *pInputData, int32_t i, SColumnInfoData *pOutputData) {
  int32_t code = TSDB_CODE_FAILED;

  if (!IS_VAR_DATA_TYPE((pInputData)->info.type)) {
    return TSDB_CODE_FUNC_FUNTION_PARA_VALUE;
  }

  char          *input = colDataGetData(pInputData, i);
  unsigned char *output = NULL;


@@ -147,7 +147,17 @@ static int32_t initWktRegex(pcre2_code **ppRegex, pcre2_match_data **ppMatchData
      "*)(([-+]?[0-9]+\\.?[0-9]*)|([-+]?[0-9]*\\.?[0-9]+))(e[-+]?[0-9]+)?){1,3}( *))*( *)\\)))( *))*( *)\\)))( *))*( "
      "*)\\)))|(GEOCOLLECTION\\((?R)(( *)(,)( *)(?R))*( *)\\))( *)$");

  pcre2_code       *pRegex = NULL;
  pcre2_match_data *pMatchData = NULL;
  code = doRegComp(&pRegex, &pMatchData, wktPatternWithSpace);
  if (code < 0) {
    taosMemoryFree(wktPatternWithSpace);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  *ppRegex = pRegex;
  *ppMatchData = pMatchData;
  taosMemoryFree(wktPatternWithSpace);
  return code;
}


@@ -225,6 +225,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
      .code = 0
  };
  TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);

  int code = rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
  if (code) {
    destroySendMsgInfo(pInfo);
@@ -235,6 +236,9 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL);
}

int32_t asyncFreeConnById(void* pTransporter, int64_t pid) {
  return rpcFreeConnById(pTransporter, pid);
}

char* jobTaskStatusStr(int32_t status) {
  switch (status) {
@@ -448,13 +452,13 @@ void parseTagDatatoJson(void* p, char** jsonStr) {
      if (value == NULL) {
        goto end;
      }
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
        goto end;
      }
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
      cJSON* value = NULL;
      if (pTagVal->nData > 0) {
        char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
        if (tagJsonValue == NULL) {
          goto end;
        }
@@ -479,7 +483,7 @@ void parseTagDatatoJson(void* p, char** jsonStr) {
        goto end;
      }
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
        goto end;
      }
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
@@ -488,7 +492,7 @@ void parseTagDatatoJson(void* p, char** jsonStr) {
      if (value == NULL) {
        goto end;
      }
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
        goto end;
      }
    } else if (type == TSDB_DATA_TYPE_BOOL) {
@@ -497,7 +501,7 @@ void parseTagDatatoJson(void* p, char** jsonStr) {
      if (value == NULL) {
        goto end;
      }
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
        goto end;
      }
    } else {


@@ -464,7 +464,7 @@ struct SFilterInfo {
    (colInfo).type = RANGE_TYPE_UNIT;                            \
    (colInfo).dataType = FILTER_UNIT_DATA_TYPE(u);               \
    if (taosArrayPush((SArray *)((colInfo).info), &u) == NULL) { \
      FLT_ERR_RET(terrno);                                       \
    }                                                            \
  } while (0)

#define FILTER_PUSH_VAR_HASH(colInfo, ha) \
@@ -481,6 +481,9 @@ struct SFilterInfo {
#define FILTER_COPY_IDX(dst, src, n)                 \
  do {                                               \
    *(dst) = taosMemoryMalloc(sizeof(uint32_t) * n); \
    if (NULL == *(dst)) {                            \
      FLT_ERR_JRET(terrno);                          \
    }                                                \
    (void)memcpy(*(dst), src, sizeof(uint32_t) * n); \
  } while (0)


@@ -17,11 +17,11 @@
#include "command.h"
#include "query.h"
#include "schInt.h"
#include "tglobal.h"
#include "tmisce.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"

// clang-format off
int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
@@ -975,11 +975,13 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
  SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));

  if (isHb && persistHandle && trans->pHandle == 0) {
    int64_t refId = 0;
    code = rpcAllocHandle(&refId);
    if (code != 0) {
      SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code);
      SCH_ERR_JRET(code);
    }
    trans->pHandle = (void *)refId;
  }

  if (pJob && pTask) {
@@ -1200,7 +1202,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
      }

      persistHandle = true;
      int64_t refId = 0;
      code = rpcAllocHandle(&refId);
      if (code != 0) {
        SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code);
        SCH_ERR_JRET(code);
      }

      SCH_SET_TASK_HANDLE(pTask, (void *)refId);
      break;
    }
    case TDMT_SCH_FETCH:


@@ -148,7 +148,6 @@ typedef struct {
  STransSyncMsg* pSyncMsg;  // for synchronous with timeout API
  int64_t        syncMsgRef;
  SCvtAddr       cvtAddr;

  int32_t retryMinInterval;
  int32_t retryMaxInterval;
@@ -207,7 +206,7 @@ typedef struct {
#pragma pack(pop)

typedef enum { Normal, Quit, Release, Register, Update, FreeById } STransMsgType;
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;

#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
@@ -304,10 +303,10 @@ int32_t transClearBuffer(SConnBuffer* buf);
int32_t transDestroyBuffer(SConnBuffer* buf);
int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
bool    transReadComplete(SConnBuffer* connBuf);
int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
int32_t transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf);
int32_t transSetConnOption(uv_tcp_t* stream, int keepalive);

void transRefSrvHandle(void* handle);
void transUnrefSrvHandle(void* handle);
@@ -315,21 +314,24 @@ void transUnrefSrvHandle(void* handle);
void transRefCliHandle(void* handle);
void transUnrefCliHandle(void* handle);

int32_t transReleaseCliHandle(void* handle);
int32_t transReleaseSrvHandle(void* handle);

int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated,
                                 int32_t timeoutMs);
int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId);
int32_t transFreeConnById(void* shandle, int64_t transpointId);

int32_t transSendResponse(const STransMsg* msg);
int32_t transRegisterMsg(const STransMsg* msg);
int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
int32_t transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func);

int32_t transSockInfo2Str(struct sockaddr* sockname, char* dst);
int32_t transAllocHandle(int64_t* refId);

void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);


@@ -58,6 +58,8 @@ typedef struct {
  int32_t failFastThreshold;
  int32_t failFastInterval;

  int8_t notWaitAvaliableConn;  // 1: no delay, 0: delay

  void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
  bool (*retry)(int32_t code, tmsg_t msgType);
  bool (*startTimer)(int32_t code, tmsg_t msgType);


@@ -102,6 +102,8 @@ void* rpcOpen(const SRpcInit* pInit) {
  if (pRpc->timeToGetConn == 0) {
    pRpc->timeToGetConn = 10 * 1000;
  }
  pRpc->notWaitAvaliableConn = pInit->notWaitAvaliableConn;

  pRpc->tcphandle =
      (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
@@ -163,38 +165,48 @@ void* rpcReallocCont(void* ptr, int64_t contLen) {
  return st + TRANS_MSG_OVERHEAD;
}

int32_t rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
  return transSendRequest(shandle, pEpSet, pMsg, NULL);
}

int32_t rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
  if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) {
    return transSendRequest(shandle, pEpSet, pMsg, pCtx);
  } else {
    return transSendRequestWithId(shandle, pEpSet, pMsg, pRid);
  }
}

int32_t rpcSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) {
  return transSendRequestWithId(shandle, pEpSet, pReq, transpointId);
}

int32_t rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
  return transSendRecv(shandle, pEpSet, pMsg, pRsp);
}

int32_t rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated,
                               int32_t timeoutMs) {
  return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
}

int32_t rpcFreeConnById(void* shandle, int64_t connId) { return transFreeConnById(shandle, connId); }

int32_t rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }

void rpcRefHandle(void* handle, int8_t type) { (*taosRefHandle[type])(handle); }
void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); }

int32_t rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); }
int32_t rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); }

// client only
int32_t rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
  // later
  return transSetDefaultAddr(thandle, ip, fqdn);
}

// server only
int32_t rpcSetIpWhite(void* thandle, void* arg) { return transSetIpWhiteList(thandle, arg, NULL); }

int32_t rpcAllocHandle(int64_t* refId) { return transAllocHandle(refId); }

int32_t rpcUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) { return transUtilSIpRangeToStr(pRange, buf); }

int32_t rpcUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf) {

@@ -213,8 +213,10 @@ static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd);

static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
                                                                   NULL, cliHandleUpdate, cliHandleFreeById};
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
/// NULL,cliHandleUpdate};
@ -660,7 +662,9 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
  if (QUEUE_IS_EMPTY(&plist->conns)) {
    if (plist->list->numOfConn >= pTranInst->connLimitNum) {
      *exceed = true;
      return NULL;
    }
    plist->list->numOfConn++;
    return NULL;
  }
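The added `numOfConn++` keeps the counter equal to pooled plus in-flight connections, so `connLimitNum` acts as a hard cap: a NULL return with `*exceed` unset is the signal that the caller may create a fresh connection, and that slot is reserved here. A sketch of the caller-side contract, assuming the surrounding helpers (`cliCreateConn` lives elsewhere in this file; `acquireConn` is hypothetical):

```c
static SCliConn* acquireConn(SCliThrd* pThrd, char* key) {
  bool      exceed = false;
  SCliConn* conn = getConnFromPool(pThrd, key, &exceed);
  if (conn != NULL) return conn;  // reuse a pooled connection
  if (exceed) return NULL;        // cap reached: fail fast or park the message
  return cliCreateConn(pThrd);    // under the cap: the slot was counted above
}
```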
@ -704,7 +708,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
  SMsgList* list = plist->list;
  if ((list)->numOfConn >= pTransInst->connLimitNum) {
    STraceId* trace = &(*pMsg)->msg.info.traceId;
    if (pTransInst->notWaitAvaliableConn || (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType))) {
      tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType),
             tstrerror(TSDB_CODE_RPC_NETWORK_BUSY));
      doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);
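The new `notWaitAvaliableConn` flag (spelling as in the source) widens the fail-fast path: previously only message types whitelisted by `noDelayFp` were rejected when the pool hit its limit; with the flag set, every message gets an immediate `TSDB_CODE_RPC_NETWORK_BUSY` instead of waiting on the `SMsgList`. A hedged configuration sketch; whether the flag is surfaced through `SRpcInit` exactly like this is an assumption:

```c
SRpcInit rpcInit = {0};
rpcInit.label = "DEMO";
rpcInit.notWaitAvaliableConn = 1;  // assumed knob: reject rather than queue when saturated
void* clientRpc = rpcOpen(&rpcInit);
```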
@ -899,10 +903,12 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
  exh->handle = conn;
  exh->pThrd = conn->hostThrd;
  conn->refId = exh->refId;
  taosWUnLockLatch(&exh->latch);

  tDebug("conn %p specified by %" PRId64 "", conn, handle);

  (void)transReleaseExHandle(transGetRefMgt(), handle);
  return 0;
}
@ -1035,7 +1041,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
      list->size--;
    }
  }
  conn->list = NULL;

  (void)transReleaseExHandle(transGetRefMgt(), conn->refId);
@ -1075,8 +1080,11 @@ static void cliDestroy(uv_handle_t* handle) {
  (void)atomic_sub_fetch_32(&pThrd->connCount, 1);

  if (conn->refId > 0) {
    (void)transReleaseExHandle(transGetRefMgt(), conn->refId);
    (void)transRemoveExHandle(transGetRefMgt(), conn->refId);
  }

  taosMemoryFree(conn->dstAddr);
  taosMemoryFree(conn->stream);
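`cliDestroy` now guards the ref-manager calls with `refId > 0`, so connections that never registered an `SExHandle` are neither released nor removed from the registry they were never in. The same only-release-what-was-acquired pattern as a standalone sketch (`releaseConnRef` is hypothetical):

```c
static void releaseConnRef(int64_t refId) {
  if (refId <= 0) return;                               // never registered
  (void)transReleaseExHandle(transGetRefMgt(), refId);  // drop the acquired ref
  (void)transRemoveExHandle(transGetRefMgt(), refId);   // delete the entry itself
}
```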
@ -1589,6 +1597,40 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
  pThrd->cvtAddr = pCtx->cvtAddr;
  destroyCmsg(pMsg);
}
static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd) {
  int32_t code = 0;
  int64_t refId = (int64_t)(pMsg->msg.info.handle);

  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
  if (exh == NULL) {
    tDebug("id %" PRId64 " already released", refId);
    destroyCmsg(pMsg);
    return;
  }

  taosRLockLatch(&exh->latch);
  SCliConn* conn = exh->handle;
  taosRUnLockLatch(&exh->latch);

  if (conn == NULL || conn->refId != refId) {
    TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception);
  }
  tDebug("do free conn %p by id %" PRId64 "", conn, refId);

  int32_t size = transQueueSize(&conn->cliMsgs);
  if (size == 0) {
    // already recv, and notify upper layer
    TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception);
    return;
  } else {
    while (T_REF_VAL_GET(conn) >= 1) transUnrefCliHandle(conn);
  }
  return;

_exception:
  tDebug("already free conn %p by id %" PRId64 "", conn, refId);
  (void)transReleaseExHandle(transGetRefMgt(), refId);
  destroyCmsg(pMsg);
}
SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
  STransConnCtx* pCtx = (*pMsg)->ctx;
@ -2759,7 +2801,7 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
  SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle);
  return pThrd;
}
int32_t transReleaseCliHandle(void* handle) {
  int32_t   code = 0;
  SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
  if (pThrd == NULL) {
@ -2823,25 +2865,25 @@ static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq
  cliMsg->type = Normal;
  cliMsg->refId = (int64_t)shandle;
  QUEUE_INIT(&cliMsg->seqq);

  *pCliMsg = cliMsg;

  return 0;
}
int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return TSDB_CODE_RPC_MODULE_QUIT;
  }

  int32_t code = 0;
  int64_t handle = (int64_t)pReq->info.handle;

  SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle);
  if (pThrd == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception);
  }

  if (handle != 0) {
    SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
    if (exh != NULL) {
@ -2849,26 +2891,27 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
      if (exh->handle == NULL && exh->inited != 0) {
        SCliMsg* pCliMsg = NULL;
        code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg);
        if (code != 0) {
          taosWUnLockLatch(&exh->latch);
          (void)transReleaseExHandle(transGetRefMgt(), handle);
          TAOS_CHECK_GOTO(code, NULL, _exception);
        }
        QUEUE_PUSH(&exh->q, &pCliMsg->seqq);
        taosWUnLockLatch(&exh->latch);
        tDebug("msg refId: %" PRId64 "", handle);
        (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
        return 0;
      }
      exh->inited = 1;
      taosWUnLockLatch(&exh->latch);
      (void)transReleaseExHandle(transGetRefMgt(), handle);
    }
  }
  SCliMsg* pCliMsg = NULL;
  TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg), NULL, _exception);
  STraceId* trace = &pReq->info.traceId;
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
@ -2880,13 +2923,63 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
  }
  (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
  return 0;
_exception:
  transFreeMsg(pReq->pCont);
  (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
  return code;
}
int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) {
  if (transpointId == NULL) {
    ASSERT(0);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;

  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception);
  }

  TAOS_CHECK_GOTO(transAllocHandle(transpointId), NULL, _exception);

  SCliThrd* pThrd = transGetWorkThrd(pTransInst, *transpointId);
  if (pThrd == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception);
  }

  SExHandle* exh = transAcquireExHandle(transGetRefMgt(), *transpointId);
  if (exh == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception);
  }

  pReq->info.handle = (void*)(*transpointId);

  SCliMsg* pCliMsg = NULL;
  TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, NULL, &pCliMsg), NULL, _exception);

  STraceId* trace = &pReq->info.traceId;
  tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
          EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle);

  if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) {
    destroyCmsg(pCliMsg);
    (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
    return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
  }
  (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
  return 0;

_exception:
  transFreeMsg(pReq->pCont);
  (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
  return code;
}
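`transSendRequestWithId` bundles handle allocation and send: the transport allocates the connection refId, binds the request to it, and hands the id back so the caller can release it later. A caller-side sketch, assuming `inst`, `epSet`, and `req` are already set up:

```c
int64_t connId = 0;
int32_t code = transSendRequestWithId(inst, &epSet, &req, &connId);
if (code == 0) {
  // ... the response arrives through the registered callback ...
  code = transFreeConnById(inst, connId);  // release the pinned connection later
}
```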
int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return TSDB_CODE_RPC_MODULE_QUIT;
  }

  int32_t code = 0;
@ -2908,8 +3001,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
  code = tsem_init(sem, 0, 0);
  if (code != 0) {
    taosMemoryFree(sem);
    TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _RETURN1);
  }

  TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
@ -3003,13 +3095,13 @@ _EXIT:
  taosMemoryFree(pSyncMsg);
  return code;
}

int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
                                 int32_t timeoutMs) {
  int32_t code = 0;
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    transFreeMsg(pReq->pCont);
    return TSDB_CODE_RPC_MODULE_QUIT;
  }

  STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg));
@ -3096,22 +3188,21 @@ _RETURN2:
/*
 *
 **/
int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
  if (ip == NULL || fqdn == NULL) return TSDB_CODE_INVALID_PARA;

  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    return TSDB_CODE_RPC_MODULE_QUIT;
  }

  SCvtAddr cvtAddr = {0};
  tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
  tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn));
  cvtAddr.cvt = true;

  int32_t code = 0;
  for (int8_t i = 0; i < pTransInst->numOfThreads; i++) {
    STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
    if (pCtx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
@ -3136,7 +3227,9 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
    if ((code = transAsyncSend(thrd->asyncPool, &(cliMsg->q))) != 0) {
      destroyCmsg(cliMsg);
      if (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT) {
        code = TSDB_CODE_RPC_MODULE_QUIT;
      }
      break;
    }
  }
@ -3145,7 +3238,7 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
  return code;
}
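`transSetDefaultAddr` now rejects NULL arguments up front with `TSDB_CODE_INVALID_PARA`, then broadcasts the new address to every client thread through its async queue. An illustrative call (endpoint values assumed):

```c
int32_t code = transSetDefaultAddr(inst, "192.168.0.1", "td-new.example.com");
if (code == TSDB_CODE_RPC_MODULE_QUIT) {
  // the transport is shutting down; the update did not reach every thread
}
```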
int32_t transAllocHandle(int64_t* refId) {
  SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
  if (exh == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
@ -3166,5 +3259,43 @@ int64_t transAllocHandle() {
  QUEUE_INIT(&exh->q);
  taosInitRWLatch(&exh->latch);
  tDebug("pre alloc refId %" PRId64 "", exh->refId);
  *refId = exh->refId;
return 0;
}
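The old `transAllocHandle` returned the refId directly, leaving no way to tell an id from an error code; the new shape returns a status and writes the id through an out-parameter. The resulting call pattern:

```c
int64_t refId = 0;
int32_t code = transAllocHandle(&refId);
if (code != 0) {
  // allocation failed; refId was not written
}
```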
int32_t transFreeConnById(void* shandle, int64_t transpointId) {
  int32_t code = 0;
  STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
  if (pTransInst == NULL) {
    return TSDB_CODE_RPC_MODULE_QUIT;
  }
  if (transpointId == 0) {
    tDebug("not free by refId:%" PRId64 "", transpointId);
    TAOS_CHECK_GOTO(0, NULL, _exception);
  }

  SCliThrd* pThrd = transGetWorkThrdFromHandle(pTransInst, transpointId);
  if (pThrd == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception);
  }

  SCliMsg* pCli = taosMemoryCalloc(1, sizeof(SCliMsg));
  if (pCli == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
  }
  pCli->type = FreeById;

  tDebug("release conn id %" PRId64 "", transpointId);
  STransMsg msg = {.info.handle = (void*)transpointId};
  pCli->msg = msg;

  code = transAsyncSend(pThrd->asyncPool, &pCli->q);
  if (code != 0) {
    taosMemoryFree(pCli);
    TAOS_CHECK_GOTO(code, NULL, _exception);
  }

_exception:
  transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
  return code;
}
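Note that `transFreeConnById` never touches the connection itself; it marshals a `FreeById` message onto the `asyncPool` of the thread that owns the connection, so all conn-state mutation stays on one uv loop and needs no extra locking. The same post-to-owner pattern as a minimal sketch (`postToOwner` is hypothetical):

```c
static int32_t postToOwner(SCliThrd* pThrd, int8_t type, void* handle) {
  SCliMsg* pCli = taosMemoryCalloc(1, sizeof(SCliMsg));
  if (pCli == NULL) return TSDB_CODE_OUT_OF_MEMORY;
  pCli->type = type;               // e.g. FreeById
  pCli->msg.info.handle = handle;  // payload: the connection refId
  int32_t code = transAsyncSend(pThrd->asyncPool, &pCli->q);
  if (code != 0) taosMemoryFree(pCli);
  return code;
}
```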

View File

@ -234,7 +234,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
  return (p->left == 0 || p->invalid) ? true : false;
}

int32_t transSetConnOption(uv_tcp_t* stream, int keepalive) {
#if defined(WINDOWS) || defined(DARWIN)
#else
  return uv_tcp_keepalive(stream, 1, keepalive);
@ -745,8 +745,7 @@ int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
  return taosRemoveRef(refMgt, refId);
}

void* transAcquireExHandle(int32_t refMgt, int64_t refId) {  // acquire extern handle
  return (void*)taosAcquireRef(refMgt, refId);
}

View File

@ -1707,7 +1707,7 @@ void transUnrefSrvHandle(void* handle) {
  }
}

int32_t transReleaseSrvHandle(void* handle) {
  int32_t code = 0;
  SRpcHandleInfo* info = handle;
  SExHandle* exh = info->handle;
@ -1747,7 +1747,7 @@ _return2:
  return code;
}

int32_t transSendResponse(const STransMsg* msg) {
  int32_t code = 0;

  if (msg->info.noResp) {
@ -1800,7 +1800,7 @@ _return2:
  rpcFreeCont(msg->pCont);
  return code;
}

int32_t transRegisterMsg(const STransMsg* msg) {
  int32_t code = 0;

  SExHandle* exh = msg->info.handle;
@ -1891,4 +1891,4 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
  return code;
}

int32_t transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }

View File

@ -5,14 +5,24 @@ int32_t doRegComp(pcre2_code** ppRegex, pcre2_match_data** ppMatchData, const ch
  int        errorcode;
  PCRE2_SIZE erroroffset;

  pcre2_code*       pRegex = NULL;
  pcre2_match_data* pMatchData = NULL;

  pRegex = pcre2_compile((PCRE2_SPTR8)pattern, PCRE2_ZERO_TERMINATED, options, &errorcode, &erroroffset, NULL);
  if (pRegex == NULL) {
    PCRE2_UCHAR buffer[256];
    (void)pcre2_get_error_message(errorcode, buffer, sizeof(buffer));
    return -1;
  }

  pMatchData = pcre2_match_data_create_from_pattern(pRegex, NULL);
  if (pMatchData == NULL) {
    pcre2_code_free(pRegex);
    return -1;
  }

  *ppRegex = pRegex;
  *ppMatchData = pMatchData;
  return 0;
}
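`doRegComp` now builds both pcre2 objects into locals and publishes them through the out-parameters only on full success, so a failed match-data allocation no longer leaks the compiled pattern or leaves `*ppRegex` dangling. A hedged usage sketch against the real pcre2 API; the full `doRegComp` parameter list is truncated in the hunk header above, so the three-argument call below is an assumption:

```c
#define PCRE2_CODE_UNIT_WIDTH 8
#include <pcre2.h>

pcre2_code*       pRegex = NULL;
pcre2_match_data* pMatchData = NULL;
if (doRegComp(&pRegex, &pMatchData, "^td_[0-9]+$") == 0) {
  PCRE2_SPTR subject = (PCRE2_SPTR) "td_42";
  int rc = pcre2_match(pRegex, subject, PCRE2_ZERO_TERMINATED, 0, 0, pMatchData, NULL);
  // rc > 0 means a match; PCRE2_ERROR_NOMATCH means none; both objects stay valid
  pcre2_match_data_free(pMatchData);
  pcre2_code_free(pRegex);
}
```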

View File

@ -122,7 +122,7 @@ def scan_files_path(source_file_path):
        for file in files:
            if any(item in root for item in scan_dir_list):
                file_path = os.path.join(root, file)
                if (file_path.endswith(".c") or file_path.endswith(".h") or file_path.endswith(".cpp")) and all(item not in file_path for item in scan_skip_file_list):
                    all_file_path.append(file_path)
    logger.info("Found %s files" % len(all_file_path))
@ -134,7 +134,7 @@ def input_files(change_files):
        for line in file:
            file_name = line.strip()
            if any(dir_name in file_name for dir_name in scan_dir_list):
                if (file_name.endswith(".c") or file_name.endswith(".cpp")) and all(dir_name not in file_name for dir_name in scan_skip_file_list):
                    if "enterprise" in file_name:
                        file_name = os.path.join(TD_project_path, file_name)
                    else: