Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30837

54liuyao 2024-09-30 09:05:46 +08:00
commit d13819bbf4
115 changed files with 1616 additions and 560 deletions

View File

@ -7,7 +7,17 @@ ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo firstEp localhost:6030 > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo fqdn localhost >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo serverPort 6030 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo debugFlag 135 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo asyncLog 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo supportVnodes 1024 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo numOfLogLines 300000000 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo logKeepDays -1 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo checkpointInterval 60 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo snodeAddress 127.0.0.1:873 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo monitor 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg

View File

@ -27,7 +27,7 @@ The PI System is a suite of products for collecting, retrieving, analyzing, delivering, and visualizing data
On the data ingestion page, click the **+ Add Data Source** button to open the Add Data Source page.
![kafka-01.png](./kafka-01.png)
![new.png](./pic/pi-01-new.png)
### Basic Configuration

View File

@ -208,3 +208,15 @@ Each row in the CSV file configures one OPC data point. The rules for rows are as follows
### 8. Creation Complete
Click the **Submit** button to finish creating the OPC UA to TDengine data synchronization task, then return to the **Data Source List** page to check the task's execution status.
## Adding Data Points
While the task is running, click **Edit** and then the **Add Data Points** button to append data points to the CSV file.
![Add data points](./pic/opc-08-add-point.png)
Fill in the data point information in the pop-up form.
![Data point form](./pic/opc-09-add-point.png)
Click the **Confirm** button to finish appending the data points.

View File

@ -182,3 +182,15 @@ Each row in the CSV file configures one OPC data point. The rules for rows are as follows
### 7. Creation Complete
Click the **Submit** button to finish creating the OPC DA to TDengine data synchronization task, then return to the **Data Source List** page to check the task's execution status.
## Adding Data Points
While the task is running, click **Edit** and then the **Add Data Points** button to append data points to the CSV file.
![Add data points](./pic/opc-08-add-point.png)
Fill in the data point information in the pop-up form.
![Data point form](./pic/opc-09-add-point.png)
Click the **Confirm** button to finish appending the data points.

View File

@ -33,13 +33,14 @@ TDengine can use the MQTT connector to subscribe to data from an MQTT broker and write it into TDengine
### 3. Configure Connection and Authentication Information
In **MQTT Address**, enter the address of the MQTT broker, for example: `192.168.1.42:1883`
In **MQTT Address**, enter the address of the MQTT broker, for example: `192.168.1.42`
In **MQTT Port**, enter the port of the MQTT broker, for example: `1883`
In **User**, enter the username for the MQTT broker.
In **Password**, enter the password for the MQTT broker.
Click the **Connectivity Check** button to check whether the data source is available.
![mqtt-03.png](./mqtt-03.png)
@ -64,6 +65,8 @@ TDengine can use the MQTT connector to subscribe to data from an MQTT broker and write it into TDengine
In **Subscribed Topics and QoS Configuration**, enter the names of the topics to consume, using the following format: `topic1::0,topic2::1`
Click the **Check Connectivity** button to check whether the data source is available.
![mqtt-05.png](./mqtt-05.png)
### 6. Configure MQTT Payload Parsing

View File

@ -102,7 +102,7 @@ kcat <topic> \
In **Topics**, enter the names of the topics to consume. Multiple topics can be configured, separated by commas, for example: `tp1,tp2`.
In **Client ID**, enter the client identifier. Once filled in, a client ID with the `taosx` prefix is generated (for example, if the identifier is `foo`, the generated client ID is `taosxfoo`). If the switch at the end is turned on, the current task's ID is inserted between `taosx` and the entered identifier (the generated client ID then looks like `taosx100foo`). All client IDs connecting to the same Kafka cluster must be unique.
In **Client ID**, enter the client identifier. Once filled in, a client ID with the `taosx` prefix is generated (for example, if the identifier is `foo`, the generated client ID is `taosxfoo`). If the switch at the end is turned on, the current task's ID is inserted between `taosx` and the entered identifier (the generated client ID then looks like `taosx100foo`). Note that when multiple taosX instances subscribe to the same topic for load balancing, they must use the same client ID to achieve the balancing effect.
In **Consumer Group ID**, enter the consumer group identifier. Once filled in, a consumer group ID with the `taosx` prefix is generated (for example, if the identifier is `foo`, the generated consumer group ID is `taosxfoo`). If the switch at the end is turned on, the current task's ID is inserted between `taosx` and the entered identifier (the generated consumer group ID then looks like `taosx100foo`).

(A number of binary image files under the documentation directories were added, removed, or updated in this commit; image previews and sizes are not shown here.)
View File

@ -239,40 +239,45 @@ d4,2017-07-14T10:40:00.006+08:00,-2.740636,10,-0.893545,7,California.LosAngles
- `plugins_home`: directory containing the external data source connectors.
- `data_dir`: directory where data files are stored.
- `logs_home`: directory where log files are stored; `taosX` log files use the prefix `taosx.log`, and external data sources have their own log file name prefixes.
- `log_level`: log level; available levels are `error`, `warn`, `info`, `debug`, and `trace`; the default is `info`.
- `log_keep_days`: maximum number of days to keep logs; `taosX` logs are split into separate files by day.
- `instanceId`: instance ID of the current explorer service; if multiple explorer instances are started on the same machine, each instance must have a unique instance ID.
- `logs_home`: directory where log files are stored; `taosX` log files use the prefix `taosx.log`, and external data sources have their own log file name prefixes. Deprecated; use `log.path` instead.
- `log_level`: log level; available levels are `error`, `warn`, `info`, `debug`, and `trace`; the default is `info`. Deprecated; use `log.level` instead.
- `log_keep_days`: maximum number of days to keep logs; `taosX` logs are split into separate files by day. Deprecated; use `log.keepDays` instead.
- `jobs`: maximum number of threads per runtime. In service mode the total number of threads is `jobs*2`; the default number of threads is `number of server cores * 2`.
- `serve.listen`: the `taosX` REST API listen address; the default is `0.0.0.0:6050`.
- `serve.database_url`: address of the `taosX` database, in the form `sqlite:<path>`.
- `serve.request_timeout`: global API request timeout.
- `monitor.fqdn`: FQDN of the `taosKeeper` service; no default value. Leave it empty to disable monitoring.
- `monitor.port`: port of the `taosKeeper` service; default `6043`.
- `monitor.interval`: how often metrics are sent to `taosKeeper`; the default is every 10 seconds, and only values from 1 to 10 are valid.
- `log.path`: directory where log files are stored.
- `log.level`: log level; valid values are "error", "warn", "info", "debug", and "trace".
- `log.compress`: whether to compress log files after rotation.
- `log.rotationCount`: maximum number of log files kept in the log directory; older files beyond this count are deleted.
- `log.rotationSize`: file size (in bytes) that triggers log file rotation; once a log file exceeds this size, a new file is created and subsequent logs are written to it.
- `log.reservedDiskSize`: threshold (in bytes) of remaining space on the log disk at which logging stops; when free disk space drops to this size, no more logs are written.
- `log.keepDays`: number of days log files are kept; log files older than this are deleted.
- `log.watching`: whether to watch for changes to the `log.loggers` configuration and attempt to reload it.
- `log.loggers`: log output levels for specific modules, in the form `"modname" = "level"`. The tracing library syntax is also supported, i.e. `modname[span{field=value}]=level`, where `level` is the log level.
For example:
```toml
# plugins home
#plugins_home = "/usr/local/taos/plugins" # on linux/macOS
#plugins_home = "C:\\TDengine\\plugins" # on windows
# data dir
#data_dir = "/var/lib/taos/taosx" # on linux/macOS
#data_dir = "C:\\TDengine\\data\\taosx" # on windows
# logs home
#logs_home = "/var/log/taos" # on linux/macOS
#logs_home = "C:\\TDengine\\log" # on windows
# log level: off/error/warn/info/debug/trace
#log_level = "info"
# log keep days
#log_keep_days = 30
# number of jobs, default to 0, will use `jobs` number of works for TMQ
# number of threads used for tokio workers, default to 0 (means cores * 2)
#jobs = 0
# enable OpenTelemetry tracing and metrics exporter
#otel = false
# server instance id
#
# The instanceId of each instance is unique on the host
# instanceId = 16
[serve]
# listen to ip:port address
#listen = "0.0.0.0:6050"
@ -280,13 +285,66 @@ d4,2017-07-14T10:40:00.006+08:00,-2.740636,10,-0.893545,7,California.LosAngles
# database url
#database_url = "sqlite:taosx.db"
# default global request timeout which unit is second. This parameter takes effect for certain interfaces that require a timeout setting
#request_timeout = 30
[monitor]
# FQDN of taosKeeper service, no default value
#fqdn = "localhost"
# port of taosKeeper service, default 6043
# Port of taosKeeper service, default 6043
#port = 6043
# how often to send metrics to taosKeeper, default every 10 seconds. Only value from 1 to 10 is valid.
# How often to send metrics to taosKeeper, default every 10 seconds. Only value from 1 to 10 is valid.
#interval = 10
# log configuration
[log]
# All log files are stored in this directory
#
#path = "/var/log/taos" # on linux/macOS
#path = "C:\\TDengine\\log" # on windows
# log filter level
#
#level = "info"
# Compress archived log files or not
#
#compress = false
# The number of log files retained by the current explorer server instance in the `path` directory
#
#rotationCount = 30
# Rotate when the log file reaches this size
#
#rotationSize = "1GB"
# Log downgrade when the remaining disk space reaches this size, only logging `ERROR` level logs
#
#reservedDiskSize = "1GB"
# The number of days log files are retained
#
#keepDays = 30
# Watching the configuration file for log.loggers changes, default to true.
#
#watching = true
# Customize the log output level of modules, and changes will be applied after modifying the file when log.watching is enabled
#
# ## Examples:
#
# crate = "error"
# crate::mod1::mod2 = "info"
# crate::span[field=value] = "warn"
#
[log.loggers]
#"actix_server::accept" = "warn"
#"taos::query" = "warn"
```
### Startup
@ -451,6 +509,16 @@ taosX reports its monitoring metrics to taosKeeper, and these metrics are stored by taosKeep
| written_blocks | Number of raw blocks successfully written by this task in the current run |
| failed_blocks | Number of raw blocks this task failed to write in the current run |
### Kafka Data Source Metrics
| Field | Description |
| ----------------------------- | ---------------------------- |
| kafka_consumers | Number of Kafka consumers in the current task run |
| kafka_total_partitions | Total number of partitions of the Kafka topic |
| kafka_consuming_partitions | Number of partitions being consumed in the current task run |
| kafka_consumed_messages | Number of messages consumed in the current task run |
| total_kafka_consumed_messages | Cumulative total number of messages consumed |
## taosX Data Parsing Plugins
When ingesting data from Kafka/MQTT message brokers, the raw data needs to be parsed. If pattern parsers such as json/regex cannot meet the parsing requirements, and UDT (user-defined parsing scripts) cannot meet the performance requirements either, you can implement a custom data parsing plugin.

View File

@ -12,17 +12,67 @@ sidebar_label: taosX-Agent
- `endpoint`: required; the gRPC service address of `taosX`.
- `token`: required; the token generated when creating the `Agent` in `Explorer`.
- `compression`: optional; can be set to `true` or `false`, default `false`. When set to `true`, communication between the `Agent` and `taosX` is compressed.
- `log_level`: optional; log level, default `info`. Like `taosX`, it supports the five levels `error`, `warn`, `info`, `debug`, and `trace`.
- `log_keep_days`: optional; number of days to keep logs, default `30`.
- `log_level`: optional; log level, default `info`. Like `taosX`, it supports the five levels `error`, `warn`, `info`, `debug`, and `trace`. Deprecated; use `log.level` instead.
- `log_keep_days`: optional; number of days to keep logs, default `30`. Deprecated; use `log.keepDays` instead.
- `log.path`: directory where log files are stored.
- `log.level`: log level; valid values are "error", "warn", "info", "debug", and "trace".
- `log.compress`: whether to compress log files after rotation.
- `log.rotationCount`: maximum number of log files kept in the log directory; older files beyond this count are deleted.
- `log.rotationSize`: file size (in bytes) that triggers log file rotation; once a log file exceeds this size, a new file is created and subsequent logs are written to it.
- `log.reservedDiskSize`: threshold (in bytes) of remaining space on the log disk at which logging stops; when free disk space drops to this size, no more logs are written.
- `log.keepDays`: number of days log files are kept; log files older than this are deleted.
For example:
```TOML
endpoint = "grpc://<taosx-ip>:6055"
token = "<token>"
compression = true
log_level = "info"
log_keep_days = 30
# taosX service endpoint
#
#endpoint = "http://localhost:6055"
# !important!
# Uncomment it and copy-paste the token generated in Explorer.
#
#token = ""
# server instance id
#
# The instanceId of each instance is unique on the host
# instanceId = 64
# enable communication data compression between Agent and taosX
#
#compression = true
# log configuration
[log]
# All log files are stored in this directory
#
#path = "/var/log/taos" # on linux/macOS
#path = "C:\\TDengine\\log" # on windows
# log filter level
#
#level = "info"
# Compress archived log files or not
#
#compress = false
# The number of log files retained by the current explorer server instance in the `path` directory
#
#rotationCount = 30
# Rotate when the log file reaches this size
#
#rotationSize = "1GB"
# Log downgrade when the remaining disk space reaches this size, only logging `ERROR` level logs
#
#reservedDiskSize = "1GB"
# The number of days log files are retained
#
#keepDays = 30
```
You do not need to worry about how to set up the configuration file. Read and follow the prompts shown in `Explorer` when creating an `Agent`, and you can then view, modify, and check the configuration file.

View File

@ -15,36 +15,111 @@ taosExplorer does not need to be installed separately; starting from TDengine 3.3.0.0 it is released together with
Before starting taosExplorer, make sure the contents of the configuration file are correct.
```TOML
# listen port
# This is an automatically generated configuration file for Explorer in [TOML](https://toml.io/) format.
#
# Here is a full list of available options.
# Explorer server port to listen on.
# Default is 6060.
#
port = 6060
# listen address for IPv4
# IPv4 listen address.
# Default is 0.0.0.0
addr = "0.0.0.0"
# listen address for IPv4
#ipv6 = "::1"
# IPv6 listen address.
# log level. Possible: error,warn,info,debug,trace
# ipv6 = "::1"
# explorer server instance id
#
# The instanceId of each instance is unique on the host
# instanceId = 1
# Explorer server log level.
# Default is "info"
#
# Deprecated: use log.level instead
log_level = "info"
# taosAdapter address.
# All data files are stored in this directory
# data_dir = "/var/lib/taos/explorer" # Default for Linux
# data_dir = "C:\\TDengine\\data\\explorer" # Default for Windows
# REST API endpoint to connect to the cluster.
# This configuration is also the target for data migration tasks.
#
# Default is "http://localhost:6041" - the default endpoint for REST API.
#
cluster = "http://localhost:6041"
# taosX gRPC address
# native endpoint to connect to the cluster.
# Default is disabled. To enable it, set it to the native API URL like "taos://localhost:6030" and uncomment it.
# If you enable it, you will get more performance for data migration tasks.
#
# cluster_native = "taos://localhost:6030"
# API endpoint for data replication/backup/data sources. No default option.
# Set it to API URL like "http://localhost:6050".
#
x_api = "http://localhost:6050"
# GRPC endpoint for "Agent"s.
# Default is "http://localhost:6055" - the default endpoint for taosX grpc API.
# You should set it to public IP or FQDN name like:
# "http://192.168.111.111:6055" or "http://node1.company.domain:6055" and
# make sure to add the port to the firewall's exception list if the firewall is enabled.
grpc = "http://localhost:6055"
# CORS configuration switch, it allows cross-origin access
cors = false
cors = true
# Enable ssl: if the following two files exist, enable ssl protocol
# Enable ssl
# If the following two files exist, enable ssl protocol
#
[ssl]
# SSL certificate
#certificate = "/path/to/ca.file"
#
# certificate = "/path/to/ca.file" # on linux/macOS
# certificate = "C:\\path\\to\\ca.file" # on windows
# SSL certificate key
#certificate_key = "/path/to/key.file"
#
# certificate_key = "/path/to/key.file" # on linux/macOS
# certificate_key = "C:\\path\\to\\key.file" # on windows
# log configuration
[log]
# All log files are stored in this directory
#
# path = "/var/log/taos" # on linux/macOS
# path = "C:\\TDengine\\log" # on windows
# log filter level
#
# level = "info"
# Compress archived log files or not
#
# compress = false
# The number of log files retained by the current explorer server instance in the `path` directory
#
# rotationCount = 30
# Rotate when the log file reaches this size
#
# rotationSize = "1GB"
# Log downgrade when the remaining disk space reaches this size, only logging `ERROR` level logs
#
# reservedDiskSize = "1GB"
# The number of days log files are retained
#
# keepDays = 30
```
Notes:
@ -52,13 +127,23 @@ cors = false
- `port`: the port that the taosExplorer service binds to.
- `addr`: the IPv4 address that the taosExplorer service binds to, default `0.0.0.0`. To change it, configure an address other than `localhost` so the service can be accessed externally.
- `ipv6`: the IPv6 address that the taosExplorer service binds to; no IPv6 address is bound by default.
- `log_level`: log level; valid values are "error", "warn", "info", "debug", and "trace".
- `instanceId`: instance ID of the current explorer service; if multiple explorer instances are started on the same machine, each instance must have a unique instance ID.
- `log_level`: log level; valid values are "error", "warn", "info", "debug", and "trace". This parameter is deprecated; use `log.level` instead.
- `cluster`: the taosAdapter address of the TDengine cluster.
- `cluster_native`: the native connection address of the TDengine cluster; disabled by default.
- `x_api`: the gRPC address of taosX.
- `grpc`: the gRPC address that taosX agents use to connect to taosX.
- `cors`: CORS configuration switch, default `false`. When set to `true`, cross-origin access is allowed.
- `ssl.certificate`: SSL certificate (if both certificate and certificate_key are set, the HTTPS service is enabled; otherwise it is not).
- `ssl.certificate_key`: SSL certificate key.
- `log.path`: directory where log files are stored.
- `log.level`: log level; valid values are "error", "warn", "info", "debug", and "trace".
- `log.compress`: whether to compress log files after rotation.
- `log.rotationCount`: maximum number of log files kept in the log directory; older files beyond this count are deleted.
- `log.rotationSize`: file size (in bytes) that triggers log file rotation; once a log file exceeds this size, a new file is created and subsequent logs are written to it.
- `log.reservedDiskSize`: threshold (in bytes) of remaining space on the log disk at which logging stops; when free disk space drops to this size, no more logs are written.
- `log.keepDays`: number of days log files are kept; log files older than this are deleted.
## Starting and Stopping

View File

@ -522,6 +522,6 @@ The Offset struct provides access to the database, topic, and partition that the current message belongs to
## Appendix
- Rust connector documentation: https://docs.rs/taos
- Rust connector repository: https://github.com/taosdata/rust-connector-taos
- Rust connector repository: https://github.com/taosdata/taos-connector-rust
- deadpool connection pool: https://crates.io/crates/deadpool
- r2d2 connection pool: https://crates.io/crates/r2d2

View File

@ -60,6 +60,8 @@ TDengine ODBC supports two ways of connecting to a TDengine database: WebSocket connections and
4.6 [Password]: used only for the connection test in step 5; optional. The database user's password; if left empty, TDengine defaults to taosdata
4.7 [Compatible Software]: provides compatibility adaptation for industrial software such as KingSCADA and Kepware; in most cases the default value General is sufficient
5. Click [Test Connection] to test the connection; on success, the message "Successfully connected to URL" is shown
6. Click [OK] to save the configuration and exit
@ -90,12 +92,449 @@ TDengine ODBC supports two ways of connecting to a TDengine database: WebSocket connections and
4.6 [Password]: used only for the connection test in step 5; optional. The database user's password; if left empty, TDengine defaults to taosdata
4.7 [Compatible Software]: provides compatibility adaptation for industrial software such as KingSCADA and Kepware; in most cases the default value General is sufficient
5. Click [Test Connection] to test the connection; on success, the message "Connection successful" is shown
6. Click [OK] to save the configuration and exit
7. You can also select an already configured data source name in step 2 and use the [Configure] button to open the configuration page and modify the existing configuration
## Supported Platforms
The platforms supported by the native connection method are the same as those supported by the TDengine Windows x64 client driver.
In addition, the WebSocket connection method can also be used by 32-bit applications running on Windows x64 systems.
## Version History
| taos_odbc Version | Major Changes | TDengine Version |
| :----------- | :-------------------------------------------------------------------------------------------------- | :---------------- |
| v1.1.0 | 1. Support for views;<br/>2. Support for the VARBINARY/GEOMETRY data types | 3.3.3.0 and later |
| v1.0.2 | Support for the CP1252 character encoding | 3.2.3.0 and later |
| v1.0.1 | 1. Support for DSN BI mode; in BI mode the TDengine database does not return system databases or supertable subtable information;<br/>2. Refactored the character-set conversion module, improving read and write performance;<br/>3. Changed the default connection method in the ODBC data source configuration dialog to "WebSocket";<br/>4. Added a "Test Connection" control to the ODBC data source configuration dialog;<br/>5. ODBC data source configuration supports a Chinese/English UI | - |
| v1.0.0.0 | Initial release with support for reading and writing TDengine data; see the "API Reference" section for details | 3.2.2.0 and later |
## Data Type Mapping
The following table shows how the ODBC connector maps server data types to the default SQL and C data types.
| TDengine Type | SQL Type | C Type |
|--------------------|-------------------|-------------------|
| TIMESTAMP | SQL_TYPE_TIMESTAMP| SQL_C_TIMESTAMP |
| INT | SQL_INTEGER | SQL_C_SLONG |
| INT UNSIGNED | SQL_INTEGER | SQL_C_ULONG |
| BIGINT | SQL_BIGINT | SQL_C_SBIGINT |
| BIGINT UNSIGNED | SQL_BIGINT | SQL_C_UBIGINT |
| FLOAT | SQL_REAL | SQL_C_FLOAT |
| DOUBLE | SQL_REAL | SQL_C_DOUBLE |
| BINARY | SQL_BINARY | SQL_C_BINARY |
| SMALLINT | SQL_SMALLINT | SQL_C_SSHORT |
| SMALLINT UNSIGNED | SQL_SMALLINT | SQL_C_USHORT |
| TINYINT | SQL_TINYINT | SQL_C_STINYINT |
| TINYINT UNSIGNED | SQL_TINYINT | SQL_C_UTINYINT |
| BOOL | SQL_BIT | SQL_C_BIT |
| NCHAR | SQL_VARCHAR | SQL_C_CHAR |
| JSON | SQL_VARCHAR | SQL_C_CHAR |
| VARCHAR | SQL_VARCHAR | SQL_C_CHAR |
| GEOMETRY | SQL_VARBINARY | SQL_C_BINARY |
| VARBINARY | SQL_VARBINARY | SQL_C_BINARY |
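To give a feel for how the supported connection, execution, and binding functions listed below fit together with the type mapping above, here is a minimal C sketch. It is not from the official samples: the DSN name `TDengine`, the `root`/`taosdata` credentials, and the `SELECT 1` query are assumptions for illustration, and error handling is abbreviated (a real application should check each return code and call SQLGetDiagRec on failure).

```c
// Minimal ODBC sketch: connect through an assumed DSN, run a query, and bind
// an INT result column to SQL_C_SLONG as in the mapping table above.
// On Windows, include <windows.h> before the ODBC headers.
#include <stdio.h>
#include <sql.h>
#include <sqlext.h>

int main(void) {
  SQLHENV env = SQL_NULL_HENV;
  SQLHDBC dbc = SQL_NULL_HDBC;
  SQLHSTMT stmt = SQL_NULL_HSTMT;

  // Allocate the environment handle and request ODBC 3.x behavior.
  SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
  SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, 0);

  // Connect with a connection string; "DSN=TDengine" is an assumed DSN name.
  SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);
  SQLDriverConnect(dbc, NULL, (SQLCHAR *)"DSN=TDengine;UID=root;PWD=taosdata;",
                   SQL_NTS, NULL, 0, NULL, SQL_DRIVER_NOPROMPT);

  // Execute a placeholder query and bind its single INT column.
  SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
  SQLExecDirect(stmt, (SQLCHAR *)"SELECT 1", SQL_NTS);

  SQLINTEGER val = 0;
  SQLLEN ind = 0;
  SQLBindCol(stmt, 1, SQL_C_SLONG, &val, sizeof(val), &ind);
  while (SQLFetch(stmt) == SQL_SUCCESS) {
    printf("value: %d\n", (int)val);
  }

  // Release resources in reverse order of allocation.
  SQLFreeHandle(SQL_HANDLE_STMT, stmt);
  SQLDisconnect(dbc);
  SQLFreeHandle(SQL_HANDLE_DBC, dbc);
  SQLFreeHandle(SQL_HANDLE_ENV, env);
  return 0;
}
```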
## API Reference
This section summarizes the ODBC APIs by functional category. For a complete ODBC API reference, see the ODBC Programmer's Reference at http://msdn.microsoft.com/en-us/library/ms714177.aspx.
### Data Source and Driver Management
- API: ConfigDSN
- **Supported**: Yes
- **Standard**: ODBC
- **Purpose**: Configures a data source
- API: ConfigDriver
- **Supported**: Yes
- **Standard**: ODBC
- **Purpose**: Performs installation and configuration tasks related to a specific driver
- API: ConfigTranslator
- **Supported**: Yes
- **Standard**: ODBC
- **Purpose**: Parses the DSN configuration and translates or converts between the DSN configuration and the actual database driver configuration
### Connecting to a Data Source
- API: SQLAllocHandle
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Allocates an environment, connection, statement, or descriptor handle
- API: SQLConnect
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Connects to a specific driver using a data source name, user ID, and password
- API: SQLDriverConnect
- **Supported**: Yes
- **Standard**: ODBC
- **Purpose**: Connects to a specific driver using a connection string, allowing more connection information to be supplied
- API: SQLBrowseConnect
- **Supported**: No
- **Standard**: ODBC
- **Purpose**: Discovers and enumerates the characteristics and attribute values required to connect to a data source; each call to SQLBrowseConnect returns a successive level of attributes and attribute values
- API: SQLAllocEnv
- **Supported**: No
- **Standard**: Deprecated
- **Purpose**: In ODBC 3.x, the ODBC 2.x function SQLAllocEnv has been replaced by SQLAllocHandle
- API: SQLAllocConnect
- **Supported**: No
- **Standard**: Deprecated
- **Purpose**: In ODBC 3.x, the ODBC 2.x function SQLAllocConnect has been replaced by SQLAllocHandle
### Retrieving Information About Drivers and Data Sources
- API: SQLDataSources
- **Supported**: No
- **Standard**: ISO 92
- **Purpose**: Returns the list of available data sources; handled by the driver manager
- API: SQLDrivers
- **Supported**: No
- **Standard**: ISO 92
- **Purpose**: Returns the list of installed drivers and their attributes; handled by the driver manager
- API: SQLGetInfo
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Returns detailed information about the database environment, such as the database product name, driver name, SQL syntax features, and connection capabilities
- API: SQLGetFunctions
- **Supported**: No
- **Standard**: ISO 92
- **Purpose**: Queries the functions supported by the driver
- API: SQLGetTypeInfo
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Returns information about the supported data types
### Setting and Retrieving Driver Attributes
- API: SQLSetConnectAttr
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Sets connection attributes; setting the SQL_ATTR_AUTOCOMMIT attribute controls auto-commit mode
- API: SQLGetConnectAttr
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Returns the value of a connection attribute
- API: SQLSetConnectOption
- **Supported**: No
- **Standard**: Deprecated
- **Purpose**: In ODBC 3.x, the ODBC 2.0 function SQLSetConnectOption has been replaced by SQLSetConnectAttr
- API: SQLGetConnectOption
- **Supported**: No
- **Standard**: Deprecated
- **Purpose**: In ODBC 3.x, the ODBC 2.0 function SQLGetConnectOption has been replaced by SQLGetConnectAttr
- API: SQLSetEnvAttr
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Sets attributes that control the environment
- API: SQLGetEnvAttr
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Returns the current setting of an environment attribute
- API: SQLSetStmtAttr
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Sets attributes related to a statement
- API: SQLGetStmtAttr
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Returns the current setting of a statement attribute
- API: SQLSetStmtOption
- **Supported**: No
- **Standard**: Deprecated
- **Purpose**: In ODBC 3.x, the ODBC 2.0 function SQLSetStmtOption has been replaced by SQLSetStmtAttr
- API: SQLGetStmtOption
- **Supported**: No
- **Standard**: Deprecated
- **Purpose**: In ODBC 3.x, the ODBC 2.0 function SQLGetStmtOption has been replaced by SQLGetStmtAttr
### Preparing SQL Requests
- API: SQLAllocStmt
- **Supported**: No
- **Standard**: Deprecated
- **Purpose**: In ODBC 3.x, the ODBC 2.x function SQLAllocStmt has been replaced by SQLAllocHandle
- API: SQLPrepare
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Prepares an SQL statement; this is usually a step before SQLExecute
- API: SQLBindCol
- **Supported**: Yes
- **Standard**: ODBC
- **Purpose**: Binds columns in the result set to application buffers
- API: SQLBindParameter
- **Supported**: Yes
- **Standard**: ODBC
- **Purpose**: Binds parameters of an SQL statement to application buffers
- API: SQLGetCursorName
- **Supported**: No
- **Standard**: ISO 92
- **Purpose**: Returns the cursor name associated with the specified statement
- API: SQLSetCursorName
- **Supported**: No
- **Standard**: ISO 92
- **Purpose**: Sets the cursor name, allowing named cursors to be used in queries
- API: SQLSetScrollOptions
- **Supported**: No
- **Standard**: ODBC
- **Purpose**: Sets options that control cursor behavior
### Submitting Requests
- API: SQLExecute
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Executes an SQL statement previously prepared with SQLPrepare
- API: SQLExecDirect
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Executes a string containing an SQL statement
- API: SQLNativeSql
- **Supported**: No
- **Standard**: ODBC
- **Purpose**: Converts an SQL statement provided by the application into the database driver's native SQL syntax
- API: SQLDescribeParam
- **Supported**: Yes
- **Standard**: ODBC
- **Purpose**: Returns the description of a specific parameter in a statement
- API: SQLNumParams
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Returns the number of parameters in a prepared SQL statement
- API: SQLParamData
- **Supported**: No
- **Standard**: ISO 92
- **Purpose**: Gets the next parameter value from the parameter data stream
- API: SQLPutData
- **Supported**: No
- **Standard**: ISO 92
- **Purpose**: When streamed input is used, sends blocks of data to an output parameter
### Retrieving Results and Information About Results
- API: SQLRowCount
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Returns the number of rows affected by an insert or delete request
- API: SQLNumResultCols
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Returns the number of columns in the result set
- API: SQLDescribeCol
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Describes the attributes of a column in the result set, providing information such as the column's data type, name, maximum width, decimal digits, and nullability
- API: SQLColAttribute
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Returns descriptor information for a column in the result set, such as its title and collation
- API: SQLColAttributes
- **Supported**: No
- **Standard**: Deprecated
- **Purpose**: In ODBC 3.x, the ODBC 2.0 function SQLColAttributes has been replaced by SQLColAttribute
- API: SQLGetData
- **Supported**: Yes
- **Standard**: ODBC
- **Purpose**: Retrieves the data of a specific column from the current row of the result set
- API: SQLMoreResults
- **Supported**: Yes
- **Standard**: ODBC
- **Purpose**: After executing an SQL statement that produces multiple result sets (for example, a batch or stored procedure), moves to the next result set
- API: SQLFetch
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Fetches the next row of data from the result set and returns the data for all bound columns
- API: SQLFetchScroll
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Fetches a specified rowset of data from the result set and returns the data for all bound columns
- API: SQLExtendedFetch
- **Supported**: No
- **Standard**: Deprecated
- **Purpose**: In ODBC 3.x, SQLExtendedFetch has been replaced by SQLFetchScroll
- API: SQLSetPos
- **Supported**: Yes
- **Standard**: ODBC
- **Purpose**: Sets the cursor position within a rowset and allows the application to update rows in the data set
- API: SQLBulkOperations
- **Supported**: No
- **Standard**: ODBC
- **Purpose**: Performs bulk insertions and bulk bookmark operations, including update, delete, and fetch by bookmark
### Retrieving Error or Diagnostic Information
- API: SQLError
- **Supported**: No
- **Standard**: Deprecated
- **Purpose**: In ODBC 3.x, the ODBC 2.x function SQLError has been replaced by SQLGetDiagRec
- API: SQLGetDiagField
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Returns additional diagnostic information (a single diagnostic result)
- API: SQLGetDiagRec
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Returns additional diagnostic information (multiple diagnostic results)
### Retrieving Information About the Data Source's System Table Entries
- API: SQLColumnPrivileges
- **Supported**: No
- **Standard**: ODBC
- **Purpose**: Retrieves privilege information for columns in a specified table, such as which users or roles have read, insert, update, or delete privileges on specific columns
- API: SQLColumns
- **Supported**: Yes
- **Standard**: X/Open
- **Purpose**: Returns the list of column names in the specified table
- API: SQLForeignKeys
- **Supported**: No
- **Standard**: ODBC
- **Purpose**: Retrieves detailed information about foreign key relationships
- API: SQLPrimaryKeys
- **Supported**: Yes
- **Standard**: ODBC
- **Purpose**: Returns the list of column names that make up the table's primary key
- API: SQLSpecialColumns
- **Supported**: No
- **Standard**: X/Open
- **Purpose**: Returns information about special columns in the database, such as unique key or index columns
- API: SQLStatistics
- **Supported**: No
- **Standard**: ISO 92
- **Purpose**: Returns statistics about a table, such as row count, column count, and average row width
- API: SQLTablePrivileges
- **Supported**: No
- **Standard**: ODBC
- **Purpose**: Returns the privileges a user has on a specific table, such as SELECT, INSERT, and UPDATE
- API: SQLTables
- **Supported**: Yes
- **Standard**: X/Open
- **Purpose**: Returns information about the tables stored in the current database of the data source
- API: SQLProcedures
- **Supported**: No
- **Standard**: ODBC
- **Purpose**: Returns information about the stored procedures available in the database, including their names and types
- API: SQLProcedureColumns
- **Supported**: No
- **Standard**: ODBC
- **Purpose**: Returns column information for stored procedures, including details of input and output parameters
### Executing Transactions
- API: SQLTransact
- **Supported**: No
- **Standard**: Deprecated
- **Purpose**: In ODBC 3.x, the ODBC 2.x function SQLTransact has been replaced by SQLEndTran
- API: SQLEndTran
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Commits or rolls back a transaction. TDengine does not support transactions, so the rollback operation is not supported
### Terminating a Connection
- API: SQLDisconnect
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Disconnects from the database
- API: SQLFreeHandle
- **Supported**: Yes
- **Standard**: ISO 92
- **Purpose**: Releases the resources associated with a specific environment, connection, statement, or descriptor handle
- API: SQLFreeConnect
- **Supported**: No
- **Standard**: Deprecated
- **Purpose**: In ODBC 3.x, the ODBC 2.0 function SQLFreeConnect has been replaced by SQLFreeHandle
- API: SQLFreeEnv
- **Supported**: No
- **Standard**: Deprecated
- **Purpose**: In ODBC 3.x, the ODBC 2.0 function SQLFreeEnv has been replaced by SQLFreeHandle
- API: SQLFreeStmt
- **Supported**: Yes
- **Standard**: ODBC
- **Purpose**: Ends statement processing, discards pending results, and optionally frees all resources associated with the statement handle
- API: SQLCloseCursor
- **Supported**: Yes
- **Standard**: ODBC
- **Purpose**: Closes the cursor associated with the current statement handle and releases all resources used by the cursor
## Integration with Third Parties
As an example of using the TDengine ODBC driver, you can use Power BI with TDengine to analyze time-series data. For more details, see [Power BI](../../../third-party/bi/powerbi).

(Two binary image files were updated in this commit; previews are not shown here.)

View File

@ -153,7 +153,6 @@ char *tTagValToData(const STagVal *pTagVal, bool isJson);
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
void tTagSetCid(const STag *pTag, int16_t iTag, int16_t cid);
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove
int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, void *pMsgBuf);

View File

@ -393,7 +393,7 @@ typedef struct SStateStore {
int32_t (*streamStateCountWinAddIfNotExist)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount,
void** ppVal, int32_t* pVLen, int32_t* pWinCode);
int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
int32_t (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType,
int32_t pkLen, SUpdateInfo** ppInfo);

View File

@ -93,7 +93,7 @@ void streamStateFreeVal(void* val);
// count window
int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal,
int32_t* pVLen, int32_t* pWinCode);
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key);
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key);

View File

@ -70,6 +70,8 @@ typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;
#define SSTREAM_TASK_NEED_CONVERT_VER 2
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
extern int32_t streamMetaId;
enum {
STREAM_STATUS__NORMAL = 0,
STREAM_STATUS__STOP,
@ -135,11 +137,6 @@ enum {
STREAM_QUEUE__PROCESSING,
};
enum {
STREAM_META_WILL_STOP = 1,
STREAM_META_OK_TO_STOP = 2,
};
typedef enum EStreamTaskEvent {
TASK_EVENT_INIT = 0x1,
TASK_EVENT_INIT_SCANHIST = 0x2,
@ -282,7 +279,6 @@ typedef enum {
} EConsenChkptStatus;
typedef struct SConsenChkptInfo {
// bool alreadySendChkptId;
EConsenChkptStatus status;
int64_t statusTs;
int32_t consenChkptTransId;

View File

@ -121,7 +121,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
// count window
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
int32_t* pVLen, int32_t* pWinCode);
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
// function
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,

View File

@ -25,6 +25,7 @@ extern "C" {
typedef struct SLRUCache SLRUCache;
typedef void (*_taos_lru_deleter_t)(const void *key, size_t keyLen, void *value, void *ud);
typedef void (*_taos_lru_overwriter_t)(const void *key, size_t keyLen, void *value, void *ud);
typedef int (*_taos_lru_functor_t)(const void *key, size_t keyLen, void *value, void *ud);
typedef struct LRUHandle LRUHandle;
@ -42,7 +43,8 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
void taosLRUCacheCleanup(SLRUCache *cache);
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority, void *ud);
_taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle,
LRUPriority priority, void *ud);
LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen);
void taosLRUCacheErase(SLRUCache *cache, const void *key, size_t keyLen);
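The header change above adds an overwriter callback to `taosLRUCacheInsert`. Here is a minimal sketch of how a caller might adapt to the new signature, assuming the LRU cache header shown above is included; `SMyPayload`, `myDeleter`, `myOverwriter`, and `putPayload` are hypothetical names, and the exact moment the cache invokes the overwriter (and whether it receives the old or the new value) is up to the cache implementation. Callers that do not care about overwrites simply pass `NULL`, as the meta-cache call sites later in this change set do.

```c
// Hypothetical payload type carrying a dirty flag, mirroring the SLastCol
// usage that motivated the overwriter callback.
typedef struct {
  int32_t dirty;
} SMyPayload;

// Deleter: release the payload when the entry is evicted or erased.
static void myDeleter(const void *key, size_t keyLen, void *value, void *ud) {
  taosMemoryFree(value);
}

// Overwriter: called when an insert replaces an existing entry; here we just
// clear the dirty flag, similar to tsdbCacheOverWriter further down.
static void myOverwriter(const void *key, size_t keyLen, void *value, void *ud) {
  ((SMyPayload *)value)->dirty = 0;
}

// Insert a payload using the updated call shape of taosLRUCacheInsert.
static LRUStatus putPayload(SLRUCache *pCache, const void *key, size_t keyLen,
                            SMyPayload *pPayload) {
  return taosLRUCacheInsert(pCache, key, keyLen, pPayload, sizeof(*pPayload),
                            myDeleter, myOverwriter, NULL,
                            TAOS_LRU_PRIORITY_LOW, NULL);
}
```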

View File

@ -116,23 +116,14 @@ function install_bin() {
${csudo}cp -r ${script_dir}/bin/* ${install_main_dir}/bin && ${csudo}chmod 0555 ${install_main_dir}/bin/*
#Make link
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} || :
[ -x ${install_main_dir}/bin/${clientName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName2} ${bin_link_dir}/${clientName2} || :
if [ "$osType" != "Darwin" ]; then
[ -x ${install_main_dir}/bin/${demoName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${demoName2} ${bin_link_dir}/${demoName2} || :
fi
[ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/${uninstallScript} || :
[ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then
#Make link
[ -x ${install_main_dir}/bin/${clientName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName2} ${bin_link_dir}/${clientName2} || :
if [ "$osType" != "Darwin" ]; then
[ -x ${install_main_dir}/bin/${demoName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${demoName2} ${bin_link_dir}/${demoName2} || :
[ -x ${install_main_dir}/bin/${benchmarkName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName2} ${bin_link_dir}/${benchmarkName2} || :
[ -x ${install_main_dir}/bin/${dumpName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${dumpName2} ${bin_link_dir}/${dumpName2} || :
fi
[ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo}ln -sf ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/${uninstallScript2} || :
fi
[ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
[ -x ${install_main_dir}/bin/${benchmarkName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName2} ${bin_link_dir}/${benchmarkName2} || :
[ -x ${install_main_dir}/bin/${dumpName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${dumpName2} ${bin_link_dir}/${dumpName2} || :
}
function clean_lib() {

View File

@ -1120,7 +1120,8 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
SHashObj* pVgroupHashmap = NULL;
SArray* pTagList = taosArrayInit(0, POINTER_BYTES);
RAW_NULL_CHECK(pTagList);
RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
@ -1186,6 +1187,14 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
}
pCreateReq->ctb.suid = pTableMeta->uid;
SArray* pTagVals = NULL;
code = tTagToValArray((STag *)pCreateReq->ctb.pTag, &pTagVals);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableMeta);
goto end;
}
bool rebuildTag = false;
for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
if (tName == NULL) {
@ -1195,11 +1204,34 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
SSchema* tag = &pTableMeta->schema[j];
if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
tTagSetCid((STag*)pCreateReq->ctb.pTag, i, tag->colId);
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
if (pTagVal) {
if (pTagVal->cid != tag->colId){
pTagVal->cid = tag->colId;
rebuildTag = true;
}
} else {
uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name, (int)taosArrayGetSize(pTagVals), i, tag->colId);
}
}
}
}
taosMemoryFreeClear(pTableMeta);
if (rebuildTag){
STag* ppTag = NULL;
code = tTagNew(pTagVals, 1, false, &ppTag);
taosArrayDestroy(pTagVals);
pTagVals = NULL;
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
if (NULL == taosArrayPush(pTagList, &ppTag)) {
tTagFree(ppTag);
goto end;
}
pCreateReq->ctb.pTag = (uint8_t*)ppTag;
}
taosArrayDestroy(pTagVals);
}
RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
@ -1251,6 +1283,7 @@ end:
destroyRequest(pRequest);
tDecoderClear(&coder);
qDestroyQuery(pQuery);
taosArrayDestroyP(pTagList, taosMemoryFree);
return code;
}

View File

@ -184,10 +184,16 @@ int32_t stmtBackupQueryFields(STscStmt* pStmt) {
int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
pRes->fields = taosMemoryMalloc(size);
pRes->userFields = taosMemoryMalloc(size);
if (NULL == pRes->fields || NULL == pRes->userFields) {
if (pRes->fields == NULL) {
STMT_ERR_RET(terrno);
}
pRes->userFields = taosMemoryMalloc(size);
if (pRes->userFields == NULL) {
taosMemoryFreeClear(pRes->fields);
STMT_ERR_RET(terrno);
}
(void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
(void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);

View File

@ -867,6 +867,7 @@ int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t r
code = blockDataEnsureCapacity(pDst, rowCount);
if (code) {
blockDataDestroy(pDst);
return code;
}

View File

@ -1771,26 +1771,6 @@ _err:
return code;
}
void tTagSetCid(const STag *pTag, int16_t iTag, int16_t cid) {
uint8_t *p = NULL;
int8_t isLarge = pTag->flags & TD_TAG_LARGE;
int16_t offset = 0;
if (isLarge) {
p = (uint8_t *)&((int16_t *)pTag->idx)[pTag->nTag];
} else {
p = (uint8_t *)&pTag->idx[pTag->nTag];
}
if (isLarge) {
offset = ((int16_t *)pTag->idx)[iTag];
} else {
offset = pTag->idx[iTag];
}
int32_t nt = tPutI16v(p + offset, cid);
}
// STSchema ========================================
STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) {
STSchema *pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * numOfCols);

View File

@ -297,11 +297,22 @@ static void *vmOpenVnodeInThread(void *param) {
SVnodeMgmt *pMgmt = pThread->pMgmt;
char path[TSDB_FILENAME_LEN];
dInfo("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("open-vnodes");
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
SWrapperCfg *pCfg = &pThread->pCfgs[v];
if (pCfg->dropped) {
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
tmsgReportStartup("vnode-destroy", stepDesc);
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
pThread->updateVnodesList = true;
continue;
}
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,

View File

@ -1012,10 +1012,10 @@ _OVER:
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
mndTransSetDbName(pTrans, pDb->name, pStb->name);
TAOS_CHECK_RETURN (mndTransCheckConflict(pMnode, pTrans));
TAOS_CHECK_RETURN (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb));
TAOS_CHECK_RETURN (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb));
TAOS_CHECK_RETURN (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb));
TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
TAOS_CHECK_RETURN(mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb));
TAOS_CHECK_RETURN(mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb));
TAOS_CHECK_RETURN(mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb));
return 0;
}
@ -1051,7 +1051,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
SRpcMsg rpcMsg = {
.msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) {
mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code);
@ -1500,8 +1500,8 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SMqTopicObj *pTopic = NULL;
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
@ -1562,8 +1562,8 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -1616,8 +1616,8 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SSmaObj *pSma = NULL;
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
@ -2233,7 +2233,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bool *schema, bool *sma) {
int32_t code = 0;
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName);
SDbObj *pDb = mndAcquireDb(pMnode, pStbVer->dbFName);
@ -2278,7 +2278,7 @@ static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bo
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
int32_t code = 0;
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
@ -2302,7 +2302,7 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
int32_t code = 0;
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
@ -2656,7 +2656,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
SName name = {0};
SName name = {0};
int32_t ret = 0;
if ((ret = tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
mError("stb:%s, failed to tNameFromString since %s", alterReq.name, tstrerror(ret));
@ -2779,8 +2779,8 @@ _OVER:
static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SMqTopicObj *pTopic = NULL;
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
@ -2839,8 +2839,8 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -2945,7 +2945,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
code = mndDropStb(pMnode, pReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
SName name = {0};
SName name = {0};
int32_t ret = 0;
if ((ret = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
mError("stb:%s, failed to tNameFromString since %s", dropReq.name, tstrerror(ret));
@ -3016,7 +3016,7 @@ _OVER:
mndReleaseUser(pMnode, pUser);
tFreeSTableMetaRsp(&metaRsp);
//TODO change to TAOS_RETURN
// TODO change to TAOS_RETURN
return code;
}
@ -3562,7 +3562,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
SName name = {0};
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -4259,7 +4259,9 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
if (code) goto _OVER;
}
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) code = 0;
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
_OVER:
tFreeSMDropTbsReq(&dropReq);
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
@ -4458,7 +4460,7 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
if (code) goto _end;
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = 0;
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_end:
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
tDecoderClear(&decoder);

View File

@ -165,68 +165,62 @@ void mndCleanupStream(SMnode *pMnode) {
}
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
int32_t code = 0;
int32_t lino = 0;
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
int32_t code = 0;
int32_t lino = 0;
SSdbRow * pRow = NULL;
SStreamObj *pStream = NULL;
void *buf = NULL;
void * buf = NULL;
int8_t sver = 0;
int32_t tlen;
int32_t dataPos = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
goto STREAM_DECODE_OVER;
}
code = sdbGetRawSoftVer(pRaw, &sver);
TSDB_CHECK_CODE(code, lino, _over);
if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
terrno = 0;
mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
goto STREAM_DECODE_OVER;
goto _over;
}
pRow = sdbAllocRow(sizeof(SStreamObj));
if (pRow == NULL) {
goto STREAM_DECODE_OVER;
}
TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);
pStream = sdbGetRowObj(pRow);
if (pStream == NULL) {
goto STREAM_DECODE_OVER;
}
TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);
int32_t tlen;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
buf = taosMemoryMalloc(tlen + 1);
if (buf == NULL) {
goto STREAM_DECODE_OVER;
}
TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);
SDecoder decoder;
tDecoderInit(&decoder, buf, tlen + 1);
if (tDecodeSStreamObj(&decoder, pStream, sver) < 0) {
tDecoderClear(&decoder);
goto STREAM_DECODE_OVER;
}
code = tDecodeSStreamObj(&decoder, pStream, sver);
tDecoderClear(&decoder);
terrno = TSDB_CODE_SUCCESS;
STREAM_DECODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
char *p = (pStream == NULL) ? "null" : pStream->name;
mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, tstrerror(terrno));
taosMemoryFreeClear(pRow);
return NULL;
if (code < 0) {
tFreeStreamObj(pStream);
}
mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
pStream->checkpointId);
return pRow;
_over:
taosMemoryFreeClear(buf);
if (code != TSDB_CODE_SUCCESS) {
char *p = (pStream == NULL) ? "null" : pStream->name;
mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
taosMemoryFreeClear(pRow);
terrno = code;
return NULL;
} else {
mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
pStream->checkpointId);
terrno = 0;
return pRow;
}
}
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {

View File

@ -324,7 +324,9 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
size_t len = 0;
void *pKey = taosHashGetKey(pDb, &len);
tstrncpy(p, pKey, 128);
int cpLen = (127 < len) ? 127 : len;
TAOS_STRNCPY(p, pKey, cpLen);
p[cpLen] = '\0';
int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
if (code) {

View File

@ -646,7 +646,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
}
// add to cache.
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL,
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL, NULL,
TAOS_LRU_PRIORITY_LOW, NULL);
_end:
(void)taosThreadMutexUnlock(pLock);
@ -804,7 +804,7 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
}
// add to cache.
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL,
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL, NULL,
TAOS_LRU_PRIORITY_LOW, NULL);
_end:
(void)taosThreadMutexUnlock(pLock);

View File

@ -39,6 +39,100 @@ static void metaDestroyLock(SMeta *pMeta) { (void)taosThreadRwlockDestroy(&pMeta
static void metaCleanup(SMeta **ppMeta);
static void doScan(SMeta *pMeta) {
TBC *cursor = NULL;
int32_t code;
// open file to write
char path[TSDB_FILENAME_LEN] = {0};
snprintf(path, TSDB_FILENAME_LEN - 1, "%s%s", pMeta->path, TD_DIRSEP "scan.txt");
TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (fp == NULL) {
metaError("failed to open file:%s, reason:%s", path, tstrerror(terrno));
return;
}
code = tdbTbcOpen(pMeta->pTbDb, &cursor, NULL);
if (code) {
if (taosCloseFile(&fp) != 0) {
metaError("failed to close file:%s, reason:%s", path, tstrerror(terrno));
}
metaError("failed to open table.db cursor, reason:%s", tstrerror(terrno));
return;
}
code = tdbTbcMoveToFirst(cursor);
if (code) {
if (taosCloseFile(&fp) != 0) {
metaError("failed to close file:%s, reason:%s", path, tstrerror(terrno));
}
tdbTbcClose(cursor);
metaError("failed to move to first, reason:%s", tstrerror(terrno));
return;
}
for (;;) {
const void *pKey;
int kLen;
const void *pVal;
int vLen;
if (tdbTbcGet(cursor, &pKey, &kLen, &pVal, &vLen) < 0) {
break;
}
// decode entry
SDecoder dc = {0};
SMetaEntry me = {0};
tDecoderInit(&dc, (uint8_t *)pVal, vLen);
if (metaDecodeEntry(&dc, &me) < 0) {
tDecoderClear(&dc);
break;
}
// skip deleted entry
if (tdbTbGet(pMeta->pUidIdx, &me.uid, sizeof(me.uid), NULL, NULL) == 0) {
// print entry
char buf[1024] = {0};
if (me.type == TSDB_SUPER_TABLE) {
snprintf(buf, sizeof(buf) - 1, "type: super table, version:%" PRId64 " uid: %" PRId64 " name: %s\n", me.version,
me.uid, me.name);
} else if (me.type == TSDB_CHILD_TABLE) {
snprintf(buf, sizeof(buf) - 1,
"type: child table, version:%" PRId64 " uid: %" PRId64 " name: %s suid:%" PRId64 "\n", me.version,
me.uid, me.name, me.ctbEntry.suid);
} else {
snprintf(buf, sizeof(buf) - 1, "type: normal table, version:%" PRId64 " uid: %" PRId64 " name: %s\n",
me.version, me.uid, me.name);
}
if (taosWriteFile(fp, buf, strlen(buf)) < 0) {
metaError("failed to write file:%s, reason:%s", path, tstrerror(terrno));
tDecoderClear(&dc);
break;
}
}
tDecoderClear(&dc);
if (tdbTbcMoveToNext(cursor) < 0) {
break;
}
}
tdbTbcClose(cursor);
// close file
if (taosFsyncFile(fp) < 0) {
metaError("failed to fsync file:%s, reason:%s", path, tstrerror(terrno));
}
if (taosCloseFile(&fp) < 0) {
metaError("failed to close file:%s, reason:%s", path, tstrerror(terrno));
}
}
int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
SMeta *pMeta = NULL;
int32_t code = 0;
@ -134,6 +228,11 @@ int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
code = metaInitTbFilterCache(pMeta);
TSDB_CHECK_CODE(code, lino, _exit);
#if 0
// Do NOT remove this code, it is used to do debug stuff
doScan(pMeta);
#endif
_exit:
if (code) {
metaError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, __LINE__, tstrerror(code));

View File

@ -16,8 +16,13 @@
#include "tq.h"
#include "vnd.h"
#define MAX_REPEAT_SCAN_THRESHOLD 3
#define SCAN_WAL_IDLE_DURATION 100
#define MAX_REPEAT_SCAN_THRESHOLD 3
#define SCAN_WAL_IDLE_DURATION 100
typedef struct SBuildScanWalMsgParam {
int64_t metaId;
int32_t numOfTasks;
} SBuildScanWalMsgParam;
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
@ -31,13 +36,12 @@ int32_t tqScanWal(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
int64_t st = taosGetTimestampMs();
int32_t numOfTasks = 0;
bool shouldIdle = true;
tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
// check all tasks
int32_t numOfTasks = 0;
bool shouldIdle = true;
int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle);
if (code) {
tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
@ -68,54 +72,61 @@ int32_t tqScanWal(STQ* pTq) {
return code;
}
typedef struct SBuildScanWalMsgParam {
STQ* pTq;
int32_t numOfTasks;
} SBuildScanWalMsgParam;
static void doStartScanWal(void* param, void* tmrId) {
int32_t vgId = 0;
STQ* pTq = NULL;
int32_t code = 0;
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
STQ* pTq = pParam->pTq;
int32_t vgId = pTq->pStreamMeta->vgId;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, pParam->metaId);
if (pMeta == NULL) {
tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
taosMemoryFree(pParam);
return;
}
vgId = pMeta->vgId;
pTq = pMeta->ahandle;
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
pTq->pVnode->restored);
int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
taosMemoryFree(pParam);
code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
if (code) {
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
}
code = taosReleaseRef(streamMetaId, pParam->metaId);
if (code) {
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
tstrerror(code));
}
taosMemoryFree(pParam);
}
int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
tmr_h pTimer = NULL;
SBuildScanWalMsgParam* pParam = NULL;
SBuildScanWalMsgParam* pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
if (pParam == NULL) {
return terrno;
}
pParam->pTq = pTq;
pParam->metaId = pMeta->rid;
pParam->numOfTasks = numOfTasks;
tmr_h pTimer = NULL;
code = streamTimerGetInstance(&pTimer);
if (code) {
tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
return code;
}
if (pMeta->scanInfo.scanTimer == NULL) {
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTimer);
taosMemoryFree(pParam);
} else {
bool ret = taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer);
if (!ret) {
// tqError("vgId:%d failed to start scan wal in:%dms", vgId, idleDuration);
}
streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut");
}
return code;
@ -124,8 +135,8 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
bool alreadyRestored = pTq->pVnode->restored;
int32_t numOfTasks = 0;
// do not launch the stream tasks, if it is a follower or not restored vnode.
if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
@ -134,7 +145,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
streamMetaWLock(pMeta);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
streamMetaWUnLock(pMeta);
@ -389,7 +400,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
}
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL || code != 0) {
continue;
}
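
The wal-scan timer change above stops handing the callback a raw STQ pointer and instead passes a registered id (SBuildScanWalMsgParam.metaId) that the callback re-acquires and releases, so a stream meta freed before the timer fires is never dereferenced. Below is a minimal, self-contained sketch of that pattern; RefSlot, refAcquire, refRelease and ScanParam are illustrative stand-ins, not the taosAcquireRef/taosReleaseRef API.

/* Sketch only: a one-slot registry resolves an id back to the shared
 * object; the timer callback bails out cleanly if the object is gone. */
#include <stdio.h>
#include <stdlib.h>

typedef struct {
  long long rid;    /* registered id of the shared object           */
  void     *object; /* becomes NULL once the owner has removed it   */
} RefSlot;

static RefSlot g_slot;                    /* one-slot registry for brevity */

static void *refAcquire(long long rid) {  /* NULL if the object is gone    */
  return (rid == g_slot.rid) ? g_slot.object : NULL;
}
static void refRelease(long long rid) { (void)rid; }

typedef struct {
  long long metaId;     /* an id, not a raw pointer, crosses the timer */
  int       numOfTasks;
} ScanParam;

static void onScanTimer(void *param) {
  ScanParam *p = param;
  void      *meta = refAcquire(p->metaId);
  if (meta == NULL) {                     /* owner already gone: clean up only */
    free(p);
    return;
  }
  printf("scan wal for %d task(s)\n", p->numOfTasks);
  refRelease(p->metaId);
  free(p);                                /* the param is freed exactly once   */
}

int main(void) {
  g_slot.rid = 1;
  g_slot.object = (void *)"stream-meta";

  ScanParam *p = malloc(sizeof(*p));
  if (p == NULL) return 1;
  p->metaId = 1;
  p->numOfTasks = 3;
  onScanTimer(p);                         /* a timer would invoke this callback */
  return 0;
}

In the hunk above the same role is played by SBuildScanWalMsgParam.metaId together with taosAcquireRef/taosReleaseRef against streamMetaId.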

View File

@ -597,6 +597,13 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud
taosMemoryFree(value);
}
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
SLastCol *pLastCol = (SLastCol *)value;
pLastCol->dirty = 0;
}
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
int32_t code = 0, lino = 0;
@ -606,27 +613,10 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
SLastCol emptyCol = {
.rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
if (!pLastCol) {
return terrno;
}
size_t charge = 0;
*pLastCol = emptyCol;
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLastCol, &charge));
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
TAOS_LRU_PRIORITY_LOW, pTsdb);
if (status != TAOS_LRU_STATUS_OK) {
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
code = TSDB_CODE_FAILED;
pLastCol = NULL;
}
_exit:
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(pLastCol);
code = tsdbCachePutToLRU(pTsdb, pLastKey, &emptyCol, 1);
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
}
TAOS_RETURN(code);
@ -1071,40 +1061,6 @@ typedef struct {
SLastKey key;
} SIdxKey;
static int32_t tsdbCacheUpdateValue(SValue *pOld, SValue *pNew) {
uint8_t *pFree = NULL;
int nData = 0;
if (IS_VAR_DATA_TYPE(pOld->type)) {
pFree = pOld->pData;
nData = pOld->nData;
}
*pOld = *pNew;
if (IS_VAR_DATA_TYPE(pNew->type)) {
if (nData < pNew->nData) {
pOld->pData = taosMemoryCalloc(1, pNew->nData);
if (!pOld->pData) {
return terrno;
}
} else {
pOld->pData = pFree;
pFree = NULL;
}
if (pNew->nData) {
memcpy(pOld->pData, pNew->pData, pNew->nData);
} else {
pFree = pOld->pData;
pOld->pData = NULL;
}
}
taosMemoryFreeClear(pFree);
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
// update rowkey
pLastCol->rowKey.ts = TSKEY_MIN;
@ -1128,11 +1084,7 @@ static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus ca
}
pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
if (!pLastCol->dirty) {
pLastCol->dirty = 1;
}
pLastCol->dirty = 1;
pLastCol->cacheStatus = cacheStatus;
}
@ -1155,7 +1107,7 @@ static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol
TAOS_RETURN(code);
}
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
int32_t code = 0, lino = 0;
SLastCol *pLRULastCol = taosMemoryCalloc(1, sizeof(SLastCol));
@ -1165,11 +1117,11 @@ static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLa
size_t charge = 0;
*pLRULastCol = *pLastCol;
pLRULastCol->dirty = 1;
pLRULastCol->dirty = dirty;
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
code = TSDB_CODE_FAILED;
@ -1216,8 +1168,9 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
SLastCol newLastCol = {.rowKey = *pRowKey, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
code = tsdbCachePutToLRU(pTsdb, key, &newLastCol);
SLastCol newLastCol = {
.rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
}
}
@ -1296,7 +1249,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
SLastCol *pToFree = pLastCol;
if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol)) != TSDB_CODE_SUCCESS) {
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
tstrerror(code));
taosMemoryFreeClear(pToFree);
@ -1319,14 +1272,14 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
}
if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
tstrerror(code));
taosMemoryFreeClear(pToFree);
break;
}
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
tstrerror(code));
taosMemoryFreeClear(pToFree);
@ -1681,30 +1634,14 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
continue;
}
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
if (!pTmpLastCol) {
TAOS_CHECK_EXIT(terrno);
}
size_t charge = 0;
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
code = tsdbCacheReallocSLastCol(pLastCol, &charge);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(pLastCol);
// store result back to rocks cache
code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
TAOS_CHECK_EXIT(code);
}
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
TAOS_LRU_PRIORITY_LOW, pTsdb);
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
pLastCol = NULL;
TAOS_CHECK_EXIT(TSDB_CODE_FAILED);
}
// store result back to rocks cache
code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
TAOS_CHECK_EXIT(code);
@ -1779,18 +1716,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
SLastCol *pToFree = pLastCol;
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
if (!pTmpLastCol) {
taosMemoryFreeClear(pToFree);
TAOS_CHECK_EXIT(terrno);
}
size_t charge = 0;
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
code = tsdbCacheReallocSLastCol(pLastCol, &charge);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFreeClear(pLastCol);
code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
tstrerror(code));
taosMemoryFreeClear(pToFree);
TAOS_CHECK_EXIT(code);
}
@ -1798,20 +1727,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
SLastCol lastCol = *pLastCol;
code = tsdbCacheReallocSLastCol(&lastCol, NULL);
if (TSDB_CODE_SUCCESS != code) {
tsdbCacheFreeSLastColItem(pLastCol);
taosMemoryFreeClear(pLastCol);
taosMemoryFreeClear(pToFree);
TAOS_CHECK_EXIT(code);
}
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
taosMemoryFreeClear(pToFree);
TAOS_CHECK_EXIT(TSDB_CODE_FAILED);
}
taosArraySet(pLastArray, idxKey->idx, &lastCol);
taosArrayRemove(remainCols, j);
taosArrayRemove(ignoreFromRocks, j);
@ -1999,8 +1918,9 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
.colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
.dirty = 1,
.cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol);
code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
}
if (taosLRUCacheRelease(pTsdb->lruCache, h, false) != TSDB_CODE_SUCCESS) {
tsdbError("vgId:%d, %s release lru cache failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
@ -2065,6 +1985,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
.colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
.dirty = 0,
.cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
@ -2072,7 +1993,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
goto _exit;
}
if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pLastCol);
tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
goto _exit;
@ -3660,7 +3581,7 @@ int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle)
size_t charge = tsS3BlockSize * pFD->szPage;
_taos_lru_deleter_t deleter = deleteBCache;
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL);
taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, NULL, &h, TAOS_LRU_PRIORITY_LOW, NULL);
if (status != TAOS_LRU_STATUS_OK) {
// code = -1;
}
@ -3703,7 +3624,7 @@ void tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *
memcpy(pPg, pPage, charge);
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, NULL, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
if (status != TAOS_LRU_STATUS_OK) {
// ignore cache updating if not ok
// code = TSDB_CODE_OUT_OF_MEMORY;
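
The cache hunks above add an overwrite callback (tsdbCacheOverWriter, which clears the dirty flag of the entry being replaced) next to the deleter in every taosLRUCacheInsert call, and make the dirty state an explicit argument of tsdbCachePutToLRU. The toy cache below mirrors only the shape of that callback pair; none of these names are the taosLRUCache API.

/* Illustrative one-slot "cache": on replacement the overwriter runs on the
 * old entry before the deleter releases it. */
#include <stdio.h>
#include <stdlib.h>

typedef struct {
  long long key;
  int       dirty;   /* 1 = not yet flushed to the backing store */
} Entry;

typedef void (*entry_cb)(void *value);

typedef struct {
  Entry   *slot;        /* NULL when empty                             */
  entry_cb deleter;     /* releases an entry that leaves the cache     */
  entry_cb overwriter;  /* runs on the old entry when it is replaced   */
} TinyCache;

static void entryOverwriter(void *value) { ((Entry *)value)->dirty = 0; }
static void entryDeleter(void *value)    { free(value); }

static int cacheInsert(TinyCache *c, Entry *v) {
  if (c->slot != NULL) {
    if (c->overwriter) c->overwriter(c->slot);  /* e.g. drop the dirty mark */
    if (c->deleter)    c->deleter(c->slot);     /* then release the old one */
  }
  c->slot = v;
  return 0;
}

int main(void) {
  TinyCache cache = {.slot = NULL, .deleter = entryDeleter, .overwriter = entryOverwriter};

  Entry *a = malloc(sizeof(*a));
  Entry *b = malloc(sizeof(*b));
  if (a == NULL || b == NULL) return 1;
  *a = (Entry){.key = 1, .dirty = 1};
  *b = (Entry){.key = 1, .dirty = 1};

  cacheInsert(&cache, a);   /* first insert: nothing replaced             */
  cacheInsert(&cache, b);   /* a is overwritten (dirty cleared) and freed */

  printf("cached key=%lld dirty=%d\n", cache.slot->key, cache.slot->dirty);
  free(cache.slot);
  return 0;
}

In the diff this shows up as the extra tsdbCacheOverWriter/NULL argument in the taosLRUCacheInsert calls and the new dirty parameter of tsdbCachePutToLRU.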

View File

@ -71,6 +71,9 @@ static int32_t tsdbSttLvlInitRef(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lv
}
code = TARRAY2_APPEND(lvl[0]->fobjArr, fobj1);
if (code) {
if (tsdbTFileObjUnref(fobj1) != 0) {
tsdbError("failed to unref file obj, fobj:%p", fobj1);
}
tsdbSttLvlClear(lvl);
return code;
}

View File

@ -812,6 +812,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
if (!COL_VAL_IS_NONE(pColVal)) {
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (!pTColVal) return terrno;
if (!COL_VAL_IS_NULL(pColVal)) {
code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
if (code) return code;

View File

@ -73,7 +73,7 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
if (rsp.pCont == NULL) {
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else {
if (tSerializeSEpSet(rsp.pCont, contLen, &newEpSet) != 0) {
if (tSerializeSEpSet(rsp.pCont, contLen, &newEpSet) < 0) {
vError("vgId:%d, failed to serialize ep set", pVnode->config.vgId);
}
rsp.contLen = contLen;

View File

@ -1583,7 +1583,7 @@ int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
SCtgTSMACache *pCtgCache = NULL;
(void)tNameGetFullDbName(pName, dbFName);
CTG_ERR_JRET(ctgGetDBCache(pCtg, dbFName, &pDbCache));
CTG_ERR_JRET(ctgAcquireDBCache(pCtg, dbFName, &pDbCache));
if (NULL == pDbCache || !pDbCache->tsmaCache) {
goto _return;
}
@ -1608,11 +1608,14 @@ int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
code = createDropAllTbTsmaCtgCacheOp(pCtg, pCache, syncOp, &pOp);
}
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
pCtgCache = NULL;
ctgReleaseDBCache(pCtg, pDbCache);
pDbCache = NULL;
CTG_ERR_JRET(code);
CTG_ERR_JRET(ctgEnqueue(pCtg, pOp));
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
return TSDB_CODE_SUCCESS;
@ -1621,6 +1624,9 @@ _return:
if (pCtgCache) {
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
}
if (pDbCache) {
ctgReleaseDBCache(pCtg, pDbCache);
}
if (pOp) {
taosMemoryFree(pOp->data);
taosMemoryFree(pOp);
@ -3996,17 +4002,20 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
if (pCache->retryFetch || hasOutOfDateTSMACache(pCache->pTsmas)) {
CTG_UNLOCK(CTG_READ, &pCache->tsmaLock);
taosHashRelease(dbCache->tsmaCache, pCache);
ctgDebug("tsma for tb: %s.%s not in cache", tsmaSourceTbName.tname, dbFName);
CTG_ERR_JRET(ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TB_TSMA, &tsmaSourceTbName));
if (NULL == taosArrayPush(pCtx->pResList, &(SMetaRes){0})) {
taosHashRelease(dbCache->tsmaCache, pCache);
CTG_ERR_JRET(terrno);
}
CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1);
CTG_LOCK(CTG_WRITE, &pCache->tsmaLock);
pCache->retryFetch = false;
CTG_UNLOCK(CTG_WRITE, &pCache->tsmaLock);
taosHashRelease(dbCache->tsmaCache, pCache);
continue;
}
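
The catalog hunks above pair ctgAcquireDBCache with ctgReleaseDBCache and taosHashRelease on every exit path, including the error path, instead of releasing only on success. A generic sketch of that single-cleanup-path discipline follows; the resource functions are stand-ins, not the catalog API.

/* Sketch only: everything acquired on the way in is released exactly once
 * on every way out. */
#include <stdio.h>

typedef struct { int acquired; } Handle;

static int  acquireA(Handle *h) { h->acquired = 1; return 0; }
static int  acquireB(Handle *h) { h->acquired = 1; return 0; }
static void release(Handle *h)  { if (h->acquired) h->acquired = 0; }

static int doWork(int fail) { return fail ? -1 : 0; }

static int operation(int fail) {
  Handle a = {0}, b = {0};
  int    code = acquireA(&a);
  if (code) goto _return;
  code = acquireB(&b);
  if (code) goto _return;

  code = doWork(fail);          /* may fail after both acquisitions */

_return:
  release(&b);                  /* releases are unconditional and idempotent */
  release(&a);
  return code;
}

int main(void) {
  printf("ok path:   %d\n", operation(0));
  printf("fail path: %d\n", operation(1));
  return 0;
}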

View File

@ -75,6 +75,8 @@ void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup);
int winPosCmprImpl(const void* pKey1, const void* pKey2);
void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
#ifdef __cplusplus
}
#endif

View File

@ -798,7 +798,6 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();
int32_t lino = 0;
int64_t curOwner = 0;
*pRes = NULL;
@ -846,7 +845,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
if (code) {
pTaskInfo->code = code;
qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
}
blockDataCheck(*pRes, false);

View File

@ -687,10 +687,10 @@ int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResu
code = blockDataEnsureCapacity(pBlock, pBlock->info.rows + pCtx[j].resultInfo->numOfRes);
QUERY_CHECK_CODE(code, lino, _end);
int32_t winCode = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(winCode)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(winCode));
QUERY_CHECK_CODE(winCode, lino, _end);
code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TSDB_CODE_SUCCESS != code) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
QUERY_CHECK_CODE(code, lino, _end);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing
@ -1301,10 +1301,17 @@ FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOpera
freeOperatorParam(pOperator->pDownstreamGetParams[idx], OP_GET_PARAM);
pOperator->pDownstreamGetParams[idx] = NULL;
}
if (code) {
qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, __LINE__, tstrerror(code));
}
return code;
}
code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock);
if (code) {
qError("failed to get next data block from upstream at %s, %d code:%s", __func__, __LINE__, tstrerror(code));
}
return code;
}

View File

@ -86,11 +86,13 @@ static void destroyGroupOperatorInfo(void* param) {
taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
cleanupExprSupp(&pInfo->scalarSup);
if (pInfo->pOperator) {
if (pInfo->pOperator != NULL) {
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
pInfo->pOperator = NULL;
}
cleanupGroupResInfo(&pInfo->groupResInfo);
cleanupAggSup(&pInfo->aggSup);
taosMemoryFreeClear(param);

View File

@ -67,6 +67,9 @@ int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
if (code) {
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
}
return code;
}

View File

@ -737,7 +737,7 @@ _end:
if (NULL != pVal) {
insertRet = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
sizeof(STableCachedVal), freeCachedMetaItem, NULL, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
if (insertRet != TAOS_LRU_STATUS_OK) {
qWarn("failed to put meta into lru cache, code:%d, %s", insertRet, idStr);
}
@ -890,7 +890,7 @@ void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) {
} else {
int32_t code = taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId));
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
qDebug("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
}
}
@ -1380,8 +1380,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
if (code != TSDB_CODE_SUCCESS) {
taosRUnLockLatch(&pTaskInfo->lock);
lino = __LINE__;
goto _end;
TSDB_CHECK_CODE(code, lino, _end);
}
if (pInfo->currentTable >= numOfTables) {
@ -1393,11 +1392,11 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
if (!tmp) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
taosRUnLockLatch(&pTaskInfo->lock);
(*ppRes) = NULL;
return terrno;
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
tInfo = *tmp;
taosRUnLockLatch(&pTaskInfo->lock);
@ -1412,11 +1411,12 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
}
} else { // scan table group by group sequentially
code = groupSeqTableScan(pOperator, ppRes);
QUERY_CHECK_CODE(code, lino, _end);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
@ -4492,13 +4492,13 @@ _error:
return code;
}
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr,
SStorageAPI* pAPI) {
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, SSDataBlock* pRes, SMetaReader* mr, SStorageAPI* pAPI) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
int32_t count = pRes->info.rows;
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
if (!item) {
@ -4558,6 +4558,8 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
} else {
pRes->info.rows++;
}
return code;
@ -4913,26 +4915,23 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
return code;
}
int32_t count = 0;
SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
pRes->info.rows = 0;
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
while (pInfo->curPos < size && pRes->info.rows < pOperator->resultInfo.capacity) {
code = doTagScanOneTable(pOperator, pRes, &mr, &pTaskInfo->storageAPI);
if (code != TSDB_CODE_OUT_OF_MEMORY) {
// ignore other error
code = TSDB_CODE_SUCCESS;
}
QUERY_CHECK_CODE(code, lino, _end);
++count;
if (++pInfo->curPos >= size) {
setOperatorCompleted(pOperator);
}
}
pRes->info.rows = count;
pAPI->metaReaderFn.clearReader(&mr);
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
if (bLimitReached) {
@ -6033,9 +6032,10 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STupleHandle* pTupleHandle = NULL;
blockDataCleanup(pResBlock);
STupleHandle* pTupleHandle = NULL;
while (1) {
pTupleHandle = NULL;
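
Several executor hunks above replace ad-hoc early returns with the code/lino convention (QUERY_CHECK_CODE, TSDB_CHECK_CODE, QUERY_CHECK_NULL): record the failing line, jump to a single _end label, log once, and propagate the code. A self-contained sketch of that convention with a local macro, not the TDengine headers:

/* Sketch only: a local CHECK_CODE macro standing in for QUERY_CHECK_CODE. */
#include <stdio.h>

#define CHECK_CODE(c, lino, label) \
  do {                             \
    if ((c) != 0) {                \
      (lino) = __LINE__;           \
      goto label;                  \
    }                              \
  } while (0)

static int step(int fail) { return fail ? -1 : 0; }

static int runPipeline(int failAt) {
  int code = 0, lino = 0;

  code = step(failAt == 1);
  CHECK_CODE(code, lino, _end);

  code = step(failAt == 2);
  CHECK_CODE(code, lino, _end);

_end:
  if (code != 0) {
    fprintf(stderr, "%s failed at line %d, code:%d\n", __func__, lino, code);
  }
  return code;
}

int main(void) {
  runPipeline(0);   /* clean path: no log                */
  runPipeline(2);   /* second step fails: logged at _end */
  return 0;
}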

View File

@ -204,15 +204,18 @@ int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle)
* @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys
* @param [in, out] pBlock the output block, the group id will be saved in it
* @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple
* @retval NULL if no more tuples
*/
static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) {
int32_t code = 0;
static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock,
STupleHandle** pTupleHandle) {
QRY_PARAM_CHECK(pTupleHandle);
int32_t code = 0;
STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
if (!retTuple) {
code = tsortNextTuple(pHandle, &retTuple);
if (code) {
return NULL;
qError("failed to get next tuple, code:%s", tstrerror(code));
return code;
}
}
@ -225,7 +228,8 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf,
&pInfo->pGroupIdCalc->lastKeysLen, retTuple);
}
bool emptyBlock = pBlock->info.rows == 0;
bool emptyBlock = (pBlock->info.rows == 0);
if (newGroup) {
if (!emptyBlock) {
// new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return
@ -247,17 +251,20 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
}
}
return retTuple;
*pTupleHandle = retTuple;
return code;
}
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
QRY_PARAM_CHECK(pResBlock);
blockDataCleanup(pDataBlock);
int32_t lino = 0;
int32_t code = 0;
SSDataBlock* p = NULL;
int32_t lino = 0;
int32_t code = 0;
STupleHandle* pTupleHandle = NULL;
SSDataBlock* p = NULL;
code = tsortGetSortedDataBlock(pHandle, &p);
if (p == NULL || (code != 0)) {
return code;
@ -266,16 +273,15 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
code = blockDataEnsureCapacity(p, capacity);
QUERY_CHECK_CODE(code, lino, _error);
STupleHandle* pTupleHandle;
while (1) {
if (pInfo->pGroupIdCalc) {
pTupleHandle = nextTupleWithGroupId(pHandle, pInfo, p);
code = nextTupleWithGroupId(pHandle, pInfo, p, &pTupleHandle);
} else {
code = tsortNextTuple(pHandle, &pTupleHandle);
}
if (pTupleHandle == NULL || code != 0) {
lino = __LINE__;
TSDB_CHECK_CODE(code, lino, _error);
if (pTupleHandle == NULL) {
break;
}
@ -320,7 +326,7 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
return code;
_error:
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
blockDataDestroy(p);
return code;
@ -330,6 +336,9 @@ int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
if (code) {
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
}
return code;
}
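
nextTupleWithGroupId above now returns an error code and hands the tuple back through an out parameter, so callers can tell "no more tuples" (code 0, NULL out) from a real failure. A minimal sketch of that convention with invented names:

/* Sketch only: return-code plus out-parameter iteration. */
#include <stdio.h>

typedef struct { int value; } Tuple;

static int nextTuple(int *cursor, int limit, Tuple **out) {
  *out = NULL;                     /* always initialize the out parameter    */
  if (cursor == NULL) return -1;   /* failure path, reported as a code       */
  if (*cursor >= limit) return 0;  /* clean end of data: code 0, NULL out    */
  static Tuple t;                  /* static storage keeps the sketch short  */
  t.value = (*cursor)++;
  *out = &t;
  return 0;
}

int main(void) {
  int cur = 0;
  while (1) {
    Tuple *t = NULL;
    int    code = nextTuple(&cur, 3, &t);
    if (code != 0) { fprintf(stderr, "error %d\n", code); return 1; }
    if (t == NULL) break;          /* distinguishable from the error case    */
    printf("tuple %d\n", t->value);
  }
  return 0;
}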

View File

@ -90,7 +90,7 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group
if (isSlidingCountWindow(pAggSup)) {
if (pBuffInfo->winBuffOp == CREATE_NEW_WINDOW) {
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
(void**)&pCurWin->winInfo.pStatePos, &size);
QUERY_CHECK_CODE(code, lino, _end);
@ -101,9 +101,11 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group
winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
(void**)&pCurWin->winInfo.pStatePos, &size);
if (winCode == TSDB_CODE_FAILED) {
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
(void**)&pCurWin->winInfo.pStatePos, &size);
QUERY_CHECK_CODE(code, lino, _end);
} else {
reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
}
} else {
pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin,
@ -111,9 +113,11 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group
winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
(void**)&pCurWin->winInfo.pStatePos, &size);
if (winCode == TSDB_CODE_FAILED) {
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
(void**)&pCurWin->winInfo.pStatePos, &size);
QUERY_CHECK_CODE(code, lino, _end);
} else {
reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
}
}
if (ts < pCurWin->winInfo.sessionWin.win.ekey) {

View File

@ -131,9 +131,6 @@ const SSTabFltFuncDef filterDict[] = {
static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta,
size_t size, const char* dbName, int64_t* pRows);
static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time", "columns",
"ttl", "stable_name", "vgroup_id', 'uid", "type"};
static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"};
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
@ -2828,12 +2825,6 @@ _end:
return code;
}
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doBlockInfoScanNext(pOperator, &pRes);
return pRes;
}
static void destroyBlockDistScanOperatorInfo(void* param) {
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
blockDataDestroy(pDistInfo->pResBlock);
@ -2852,6 +2843,8 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t));
if (pCond->colList == NULL || pCond->pSlotList == NULL) {
taosMemoryFree(pCond->colList);
taosMemoryFree(pCond->pSlotList);
return terrno;
}

View File

@ -278,7 +278,7 @@ bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bo
}
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo) {
SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo, bool genAfterBlock) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t rows = pResBlock->info.rows;
@ -427,7 +427,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
break;
}
if (start.key == INT64_MIN || end.key == INT64_MIN) {
if (start.key == INT64_MIN || end.key == INT64_MIN || genAfterBlock) {
colDataSetNULL(pDst, rows);
break;
}
@ -463,8 +463,13 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
break;
}
if (genAfterBlock && rows == 0) {
hasInterp = false;
break;
}
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot);
if (pkey->isNull == false) {
if (pkey->isNull == false && !genAfterBlock) {
code = colDataSetVal(pDst, rows, pkey->pData, false);
QUERY_CHECK_CODE(code, lino, _end);
} else {
@ -836,7 +841,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false, pTaskInfo) &&
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false, pTaskInfo, false) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
break;
} else {
@ -864,7 +869,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
doKeepLinearInfo(pSliceInfo, pBlock, i);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true, pTaskInfo) &&
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true, pTaskInfo, false) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
break;
} else {
@ -909,13 +914,12 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato
SSDataBlock* pResBlock = pSliceInfo->pRes;
SInterval* pInterval = &pSliceInfo->interval;
if (pSliceInfo->fillType == TSDB_FILL_NEXT || pSliceInfo->fillType == TSDB_FILL_LINEAR ||
pSliceInfo->pPrevGroupKey == NULL) {
if (pSliceInfo->pPrevGroupKey == NULL) {
return;
}
while (pSliceInfo->current <= pSliceInfo->win.ekey) {
(void)genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false, pOperator->pTaskInfo);
(void)genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false, pOperator->pTaskInfo, true);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
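
The genAfterBlock flag threaded through genInterpolationResult above makes rows generated after the last input block come out as NULL instead of reusing the cached next-row value, and genInterpAfterDataBlock no longer skips FILL(NEXT)/FILL(LINEAR) outright. A toy version of that distinction, not the executor's data structures:

/* Sketch only: fill inside the data range copies the next value; fill past
 * the last block stays NULL. */
#include <stdbool.h>
#include <stdio.h>

typedef struct { long long ts; double val; bool isNull; } Row;

static Row fillAt(long long ts, const Row *next, bool afterLastBlock) {
  Row out = {.ts = ts, .val = 0.0, .isNull = true};
  if (!afterLastBlock && next != NULL && !next->isNull) {
    out.val = next->val;        /* normal FILL(NEXT): copy the next value */
    out.isNull = false;
  }                             /* after the last block: stay NULL        */
  return out;
}

int main(void) {
  Row next = {.ts = 100, .val = 4.2, .isNull = false};
  Row inRange  = fillAt(90, &next, false);   /* inside the data range   */
  Row afterEnd = fillAt(110, NULL, true);    /* beyond the last block   */

  if (inRange.isNull)  printf("90: NULL\n");  else printf("90: %.1f\n", inRange.val);
  if (afterEnd.isNull) printf("110: NULL\n"); else printf("110: %.1f\n", afterEnd.val);
  return 0;
}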

View File

@ -1229,11 +1229,13 @@ static void destroyStateWindowOperatorInfo(void* param) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->stateKey.pData);
if (pInfo->pOperator) {
if (pInfo->pOperator != NULL) {
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
pInfo->pOperator = NULL;
}
cleanupExprSupp(&pInfo->scalarSup);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupAggSup(&pInfo->aggSup);
@ -1251,13 +1253,17 @@ void destroyIntervalOperatorInfo(void* param) {
if (param == NULL) {
return;
}
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
if (pInfo->pOperator) {
if (pInfo->pOperator != NULL) {
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
pInfo->pOperator = NULL;
}
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSupp);
@ -1265,6 +1271,7 @@ void destroyIntervalOperatorInfo(void* param) {
taosArrayDestroy(pInfo->pInterpCols);
pInfo->pInterpCols = NULL;
taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
pInfo->pPrevValues = NULL;
@ -1358,6 +1365,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = terrno;
lino = __LINE__;
goto _error;
}
@ -1465,8 +1473,10 @@ _error:
if (pInfo != NULL) {
destroyIntervalOperatorInfo(pInfo);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
return code;
}
@ -1754,11 +1764,13 @@ void destroySWindowOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
if (pInfo->pOperator) {
if (pInfo->pOperator != NULL) {
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
pInfo->pOperator = NULL;
}
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSupp);

View File

@ -771,7 +771,7 @@ static int32_t getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam*
code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
if (code != TSDB_CODE_SUCCESS) {
return terrno = code;
return code;
}
if (pHandle->pDataBlock->info.rows >= capacity) {
@ -2391,25 +2391,31 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
return code;
}
static void freeSortSource(SSortSource* pSource) {
if (NULL == pSource) {
static void freeSortSource(void* p) {
SSortSource** pSource = (SSortSource**)p;
if (NULL == pSource || NULL == *pSource) {
return;
}
if (!pSource->onlyRef && pSource->param) {
taosMemoryFree(pSource->param);
if ((*pSource)->pageIdList) {
taosArrayDestroy((*pSource)->pageIdList);
}
if (!pSource->onlyRef && pSource->src.pBlock) {
blockDataDestroy(pSource->src.pBlock);
pSource->src.pBlock = NULL;
if (!(*pSource)->onlyRef) {
if ((*pSource)->param) {
taosMemoryFree((*pSource)->param);
}
if ((*pSource)->src.pBlock) {
blockDataDestroy((*pSource)->src.pBlock);
}
}
taosMemoryFree(pSource);
taosMemoryFreeClear(*pSource);
}
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
int32_t code = 0;
int32_t lino = 0;
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
SSortSource** p = taosArrayGet(pHandle->pOrderedSource, 0);
if (p == NULL) {
@ -2417,17 +2423,12 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
}
SSortSource* pSource = *p;
taosArrayRemove(pHandle->pOrderedSource, 0);
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
size_t origSourceCount = taosArrayGetSize(pHandle->pOrderedSource);
while (1) {
SSDataBlock* pBlock = NULL;
code = pHandle->fetchfp(pSource->param, &pBlock);
if (code != 0) {
freeSortSource(pSource);
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
if (pBlock == NULL) {
break;
@ -2441,10 +2442,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
pHandle->numOfPages = 1024;
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
if (code) {
freeSortSource(pSource);
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
}
if (pHandle->beforeFp != NULL) {
@ -2452,43 +2450,30 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
}
code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != TSDB_CODE_SUCCESS) {
freeSortSource(pSource);
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
size_t size = blockDataGetSize(pHandle->pDataBlock);
if (size > sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t st = taosGetTimestampUs();
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
if (code != 0) {
freeSortSource(pSource);
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
pHandle->sortElapsed += (taosGetTimestampUs() - st);
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
code = doAddToBuf(pHandle->pDataBlock, pHandle);
if (code != TSDB_CODE_SUCCESS) {
freeSortSource(pSource);
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
}
}
freeSortSource(pSource);
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
size_t size = blockDataGetSize(pHandle->pDataBlock);
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t st = taosGetTimestampUs();
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
if (code != 0) {
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
pHandle->sortElapsed += (taosGetTimestampUs() - st);
@ -2501,12 +2486,16 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
pHandle->loops = 1;
pHandle->tupleHandle.rowIndex = -1;
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
return 0;
} else {
code = doAddToBuf(pHandle->pDataBlock, pHandle);
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
taosArrayRemoveBatch(pHandle->pOrderedSource, 0, origSourceCount, freeSortSource);
return code;
}
@ -2880,6 +2869,7 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle
pHandle->tupleHandle.pBlock = NULL;
return code;
}
pHandle->tupleHandle.pBlock = pBlock;
pHandle->tupleHandle.rowIndex = 0;
}
@ -2895,8 +2885,7 @@ int32_t tsortOpen(SSortHandle* pHandle) {
}
if (pHandle == NULL || pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
code = TSDB_CODE_INVALID_PARA;
return code;
return TSDB_CODE_INVALID_PARA;
}
pHandle->opened = true;
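
freeSortSource above was reshaped into a void* callback that dereferences the array slot (SSortSource**) and clears it, so it can be handed to taosArrayRemoveBatch, which passes each destructor a pointer to the slot rather than the element. A small stand-alone sketch of that slot-destructor convention; the array helper is a stand-in, not the taosArray API.

/* Sketch only: the destructor receives &slot, frees *slot, and NULLs it. */
#include <stdio.h>
#include <stdlib.h>

typedef struct { int id; } Source;

static void freeSourceSlot(void *p) {
  Source **slot = (Source **)p;        /* p points at the slot, not the object */
  if (slot == NULL || *slot == NULL) return;
  printf("freeing source %d\n", (*slot)->id);
  free(*slot);
  *slot = NULL;                        /* leave no dangling pointer behind     */
}

static void removeAll(Source **arr, size_t n, void (*dtor)(void *)) {
  for (size_t i = 0; i < n; i++) dtor(&arr[i]);   /* pass &slot to the dtor */
}

int main(void) {
  Source *arr[2];
  for (int i = 0; i < 2; i++) {
    arr[i] = malloc(sizeof(Source));
    if (arr[i] == NULL) return 1;
    arr[i]->id = i;
  }
  removeAll(arr, 2, freeSourceSlot);
  return 0;
}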

View File

@ -148,7 +148,7 @@ static int64_t idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_
memcpy(buf + total, blk->buf + blkOffset, nread);
LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
TAOS_LRU_PRIORITY_LOW, NULL);
NULL, TAOS_LRU_PRIORITY_LOW, NULL);
if (s != TAOS_LRU_STATUS_OK) {
return -1;
}

View File

@ -28,12 +28,12 @@
} \
} while (0)
#define CHECK_OUT_OF_MEM(p) \
do { \
if (NULL == (p)) { \
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \
goto _err; \
} \
#define CHECK_OUT_OF_MEM(p) \
do { \
if (NULL == (p)) { \
pCxt->errCode = terrno; \
goto _err; \
} \
} while (0)
#define CHECK_PARSER_STATUS(pCxt) \

View File

@ -47,6 +47,7 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) {
SAstCreateContext cxt;
initAstCreateContext(pParseCxt, &cxt);
void* pParser = ParseAlloc((FMalloc)taosMemoryMalloc);
if (!pParser) return terrno;
int32_t i = 0;
while (1) {
SToken t0 = {0};

View File

@ -61,6 +61,7 @@ typedef enum {
#define SCH_MAX_TASK_TIMEOUT_USEC 300000000
#define SCH_DEFAULT_MAX_RETRY_NUM 6
#define SCH_MIN_AYSNC_EXEC_NUM 3
#define SCH_DEFAULT_RETRY_TOTAL_ROUND 3
typedef struct SSchDebug {
bool lockEnable;

View File

@ -366,9 +366,11 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet,
if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (pEpSet) {
pCtx->roundTotal = pEpSet->numOfEps;
} else {
} else if (pTask->candidateAddrs && taosArrayGetSize(pTask->candidateAddrs) > 0) {
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
pCtx->roundTotal = pAddr->epSet.numOfEps;
} else {
pCtx->roundTotal = SCH_DEFAULT_RETRY_TOTAL_ROUND;
}
} else {
pCtx->roundTotal = 1;

View File

@ -164,7 +164,6 @@ extern void* streamTimer;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
extern int32_t taskDbWrapperId;
extern int32_t streamMetaId;
int32_t streamTimerInit();
void streamTimerCleanUp();

View File

@ -3658,6 +3658,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
if (pCur->iter == NULL) {
streamStateFreeCur(pCur);
return NULL;
}
char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
@ -3680,6 +3684,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
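
The stream-state hunk above adds the missing guards around the iterator: if it cannot be created, or turns invalid before or after stepping backwards, the cursor is freed and NULL is returned instead of continuing. A generic sketch of that defensive cursor handling; plain C stand-ins, not the rocksdb or stream-state APIs.

/* Sketch only: every bail-out path releases the cursor before returning. */
#include <stdio.h>
#include <stdlib.h>

typedef struct { int pos; int size; } Iter;
typedef struct { Iter *iter; } Cursor;

static Iter *iterCreate(int size) {
  if (size <= 0) return NULL;           /* creation can fail */
  Iter *it = malloc(sizeof(*it));
  if (it) { it->pos = size - 1; it->size = size; }
  return it;
}
static int  iterValid(const Iter *it) { return it->pos >= 0 && it->pos < it->size; }
static void iterPrev(Iter *it)        { it->pos--; }
static void cursorFree(Cursor *c)     { if (c) { free(c->iter); free(c); } }

static Cursor *seekPrev(int size) {
  Cursor *c = calloc(1, sizeof(*c));
  if (c == NULL) return NULL;
  c->iter = iterCreate(size);
  if (c->iter == NULL) {        /* creation failed: release and bail   */
    cursorFree(c);
    return NULL;
  }
  if (!iterValid(c->iter)) {    /* invalid before stepping: bail too   */
    cursorFree(c);
    return NULL;
  }
  iterPrev(c->iter);
  if (!iterValid(c->iter)) {    /* invalid after stepping: bail again  */
    cursorFree(c);
    return NULL;
  }
  return c;
}

int main(void) {
  Cursor *ok  = seekPrev(2);    /* valid path                          */
  Cursor *bad = seekPrev(0);    /* iterator creation fails -> NULL     */
  printf("ok cursor:  %s\n", ok  ? "available" : "unavailable");
  printf("bad cursor: %s\n", bad ? "available" : "unavailable");
  cursorFree(ok);
  return 0;
}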

Some files were not shown because too many files have changed in this diff.