Merge branch '3.0' of github.com:taosdata/TDengine into docs/td-32339
|
@ -7,7 +7,17 @@ ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
|
|||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
|
||||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/
|
||||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
|
||||
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo firstEp localhost:6030 > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo fqdn localhost >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo serverPort 6030 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo debugFlag 135 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo asyncLog 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo supportVnodes 1024 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo numOfLogLines 300000000 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo logKeepDays -1 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo checkpointInterval 60 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo snodeAddress 127.0.0.1:873 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo monitor 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
|
|
|
@ -27,7 +27,7 @@ PI 系统是一套用于数据收集、查找、分析、传递和可视化的
|
|||
|
||||
在数据写入页面中,点击 **+新增数据源** 按钮,进入新增数据源页面。
|
||||
|
||||

|
||||

|
||||
|
||||
### 基本配置
|
||||
|
||||
|
|
|
@ -208,3 +208,15 @@ CSV 文件中的每个 Row 配置一个 OPC 数据点位。Row 的规则如下
|
|||
### 8. 创建完成
|
||||
|
||||
点击 **提交** 按钮,完成创建 OPC UA 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
|
||||
|
||||
## 增加数据点位
|
||||
|
||||
在任务运行中,点击 **编辑**,点击 **增加数据点位** 按钮,追加数据点位到 CSV 文件中。
|
||||
|
||||

|
||||
|
||||
在弹出的表单中,填写数据点位的信息。
|
||||
|
||||

|
||||
|
||||
点击 **确定** 按钮,完成数据点位的追加。
|
|
@ -182,3 +182,15 @@ CSV 文件中的每个 Row 配置一个 OPC 数据点位。Row 的规则如下
|
|||
### 7. 创建完成
|
||||
|
||||
点击 **提交** 按钮,完成创建 OPC DA 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
|
||||
|
||||
## 增加数据点位
|
||||
|
||||
在任务运行中,点击 **编辑**,点击 **增加数据点位** 按钮,追加数据点位到 CSV 文件中。
|
||||
|
||||

|
||||
|
||||
在弹出的表单中,填写数据点位的信息。
|
||||
|
||||

|
||||
|
||||
点击 **确定** 按钮,完成数据点位的追加。
|
|
@ -33,13 +33,14 @@ TDengine 可以通过 MQTT 连接器从 MQTT 代理订阅数据并将其写入 T
|
|||
|
||||
### 3. 配置连接和认证信息
|
||||
|
||||
在 **MQTT地址** 中填写 MQTT 代理的地址,例如:`192.168.1.42:1883`
|
||||
在 **MQTT 地址** 中填写 MQTT 代理的地址,例如:`192.168.1.42`
|
||||
|
||||
在 **MQTT 端口** 中填写 MQTT 代理的端口,例如:`1883`
|
||||
|
||||
在 **用户** 中填写 MQTT 代理的用户名。
|
||||
|
||||
在 **密码** 中填写 MQTT 代理的密码。
|
||||
|
||||
点击 **连通性检查** 按钮,检查数据源是否可用。
|
||||
|
||||

|
||||
|
||||
|
@ -64,6 +65,8 @@ TDengine 可以通过 MQTT 连接器从 MQTT 代理订阅数据并将其写入 T
|
|||
|
||||
在 **订阅主题及 QoS 配置** 中填写要消费的 Topic 名称。使用如下格式设置: `topic1::0,topic2::1`。
|
||||
|
||||
点击 **检查连通性** 按钮,检查数据源是否可用。
|
||||
|
||||

|
||||
|
||||
### 6. 配置 MQTT Payload 解析
|
||||
|
|
|
@ -102,7 +102,7 @@ kcat <topic> \
|
|||
|
||||
在 **主题** 中填写要消费的 Topic 名称。可以配置多个 Topic , Topic 之间用逗号分隔。例如:`tp1,tp2`。
|
||||
|
||||
在 **Client ID** 中填写客户端标识,填写后会生成带有 `taosx` 前缀的客户端 ID (例如,如果填写的标识为 `foo`,则生成的客户端 ID 为 `taosxfoo`)。如果打开末尾处的开关,则会把当前任务的任务 ID 拼接到 `taosx` 之后,输入的标识之前(生成的客户端 ID 形如 `taosx100foo`)。连接到同一个 Kafka 集群的所有客户端 ID 必须保证唯一。
|
||||
在 **Client ID** 中填写客户端标识,填写后会生成带有 `taosx` 前缀的客户端 ID (例如,如果填写的标识为 `foo`,则生成的客户端 ID 为 `taosxfoo`)。如果打开末尾处的开关,则会把当前任务的任务 ID 拼接到 `taosx` 之后,输入的标识之前(生成的客户端 ID 形如 `taosx100foo`)。需要注意的是,当使用多个 taosX 订阅同一 Topic 需要进行负载均衡时,必须填写一致的客户端 ID 才能达到均衡效果。
|
||||
|
||||
在 **消费者组 ID** 中填写消费者组标识,填写后会生成带有 `taosx` 前缀的消费者组 ID (例如,如果填写的标识为 `foo`,则生成的消费者组 ID 为 `taosxfoo`)。如果打开末尾处的开关,则会把当前任务的任务 ID 拼接到 `taosx` 之后,输入的标识之前(生成的消费者组 ID 形如 `taosx100foo`)。
|
||||
|
||||
|
|
Before Width: | Height: | Size: 68 KiB After Width: | Height: | Size: 79 KiB |
Before Width: | Height: | Size: 56 KiB After Width: | Height: | Size: 72 KiB |
Before Width: | Height: | Size: 70 KiB After Width: | Height: | Size: 75 KiB |
Before Width: | Height: | Size: 58 KiB After Width: | Height: | Size: 33 KiB |
Before Width: | Height: | Size: 42 KiB After Width: | Height: | Size: 30 KiB |
Before Width: | Height: | Size: 52 KiB After Width: | Height: | Size: 30 KiB |
Before Width: | Height: | Size: 29 KiB After Width: | Height: | Size: 24 KiB |
Before Width: | Height: | Size: 37 KiB After Width: | Height: | Size: 104 KiB |
Before Width: | Height: | Size: 74 KiB After Width: | Height: | Size: 40 KiB |
Before Width: | Height: | Size: 53 KiB After Width: | Height: | Size: 68 KiB |
Before Width: | Height: | Size: 26 KiB After Width: | Height: | Size: 65 KiB |
Before Width: | Height: | Size: 23 KiB After Width: | Height: | Size: 72 KiB |
Before Width: | Height: | Size: 43 KiB After Width: | Height: | Size: 99 KiB |
Before Width: | Height: | Size: 73 KiB After Width: | Height: | Size: 148 KiB |
Before Width: | Height: | Size: 35 KiB After Width: | Height: | Size: 98 KiB |
Before Width: | Height: | Size: 54 KiB After Width: | Height: | Size: 70 KiB |
Before Width: | Height: | Size: 27 KiB After Width: | Height: | Size: 49 KiB |
Before Width: | Height: | Size: 49 KiB After Width: | Height: | Size: 128 KiB |
Before Width: | Height: | Size: 35 KiB After Width: | Height: | Size: 97 KiB |
Before Width: | Height: | Size: 46 KiB After Width: | Height: | Size: 71 KiB |
Before Width: | Height: | Size: 52 KiB After Width: | Height: | Size: 92 KiB |
Before Width: | Height: | Size: 93 KiB After Width: | Height: | Size: 120 KiB |
Before Width: | Height: | Size: 115 KiB After Width: | Height: | Size: 142 KiB |
Before Width: | Height: | Size: 54 KiB After Width: | Height: | Size: 74 KiB |
Before Width: | Height: | Size: 73 KiB After Width: | Height: | Size: 124 KiB |
After Width: | Height: | Size: 175 KiB |
After Width: | Height: | Size: 67 KiB |
Before Width: | Height: | Size: 22 KiB After Width: | Height: | Size: 33 KiB |
Before Width: | Height: | Size: 18 KiB After Width: | Height: | Size: 16 KiB |
Before Width: | Height: | Size: 21 KiB |
Before Width: | Height: | Size: 35 KiB |
Before Width: | Height: | Size: 22 KiB After Width: | Height: | Size: 31 KiB |
Before Width: | Height: | Size: 57 KiB After Width: | Height: | Size: 42 KiB |
Before Width: | Height: | Size: 23 KiB |
Before Width: | Height: | Size: 11 KiB After Width: | Height: | Size: 31 KiB |
Before Width: | Height: | Size: 44 KiB |
Before Width: | Height: | Size: 92 KiB After Width: | Height: | Size: 51 KiB |
Before Width: | Height: | Size: 32 KiB After Width: | Height: | Size: 31 KiB |
After Width: | Height: | Size: 17 KiB |
Before Width: | Height: | Size: 27 KiB After Width: | Height: | Size: 19 KiB |
Before Width: | Height: | Size: 42 KiB After Width: | Height: | Size: 32 KiB |
Before Width: | Height: | Size: 37 KiB After Width: | Height: | Size: 47 KiB |
Before Width: | Height: | Size: 66 KiB After Width: | Height: | Size: 61 KiB |
Before Width: | Height: | Size: 22 KiB After Width: | Height: | Size: 41 KiB |
Before Width: | Height: | Size: 48 KiB After Width: | Height: | Size: 66 KiB |
|
@ -239,40 +239,45 @@ d4,2017-07-14T10:40:00.006+08:00,-2.740636,10,-0.893545,7,California.LosAngles
|
|||
|
||||
- `plugins_home`:外部数据源连接器所在目录。
|
||||
- `data_dir`:数据文件存放目录。
|
||||
- `logs_home`:日志文件存放目录,`taosX` 日志文件的前缀为 `taosx.log`,外部数据源有自己的日志文件名前缀。
|
||||
- `log_level`:日志等级,可选级别包括 `error`、`warn`、`info`、`debug`、`trace`,默认值为 `info`。
|
||||
- `log_keep_days`:日志的最大存储天数,`taosX` 日志将按天划分为不同的文件。
|
||||
- `instanceId`:当前 explorer 服务的实例 ID,如果同一台机器上启动了多个 explorer 实例,必须保证各个实例的实例 ID 互不相同。
|
||||
- `logs_home`:日志文件存放目录,`taosX` 日志文件的前缀为 `taosx.log`,外部数据源有自己的日志文件名前缀。已弃用,请使用 `log.path` 代替。
|
||||
- `log_level`:日志等级,可选级别包括 `error`、`warn`、`info`、`debug`、`trace`,默认值为 `info`。已弃用,请使用 `log.level` 代替。
|
||||
- `log_keep_days`:日志的最大存储天数,`taosX` 日志将按天划分为不同的文件。已弃用,请使用 `log.keepDays` 代替。
|
||||
- `jobs`:每个运行时的最大线程数。在服务模式下,线程总数为 `jobs*2`,默认线程数为`当前服务器内核*2`。
|
||||
- `serve.listen`:是 `taosX` REST API 监听地址,默认值为 `0.0.0.0:6050`。
|
||||
- `serve.database_url`:`taosX` 数据库的地址,格式为 `sqlite:<path>`。
|
||||
- `serve.request_timeout`:全局接口 API 超时时间。
|
||||
- `monitor.fqdn`:`taosKeeper` 服务的 FQDN,没有默认值,置空则关闭监控功能。
|
||||
- `monitor.port`:`taosKeeper` 服务的端口,默认`6043`。
|
||||
- `monitor.interval`:向 `taosKeeper` 发送指标的频率,默认为每 10 秒一次,只有 1 到 10 之间的值才有效。
|
||||
- `log.path`:日志文件存放的目录。
|
||||
- `log.level`:日志级别,可选值为 "error", "warn", "info", "debug", "trace"。
|
||||
- `log.compress`:日志文件滚动后的文件是否进行压缩。
|
||||
- `log.rotationCount`:日志文件目录下最多保留的文件数,超出数量的旧文件被删除。
|
||||
- `log.rotationSize`:触发日志文件滚动的文件大小(单位为字节),当日志文件超出此大小后会生成一个新文件,新的日志会写入新文件。
|
||||
- `log.reservedDiskSize`:日志所在磁盘停止写入日志的阈值(单位为字节),当磁盘剩余空间达到此大小后停止写入日志。
|
||||
- `log.keepDays`:日志文件保存的天数,超过此天数的旧日志文件会被删除。
|
||||
- `log.watching`:是否对日志文件中 `log.loggers` 配置内容的变更进行监听并尝试重载。
|
||||
- `log.loggers`:指定模块的日志输出级别,格式为 `"modname" = "level"`,同时适配 tracing 库语法,可以根据 `modname[span{field=value}]=level`,其中 `level` 为日志级别。
|
||||
|
||||
如下所示:
|
||||
|
||||
```toml
|
||||
# plugins home
|
||||
#plugins_home = "/usr/local/taos/plugins" # on linux/macOS
|
||||
#plugins_home = "C:\\TDengine\\plugins" # on windows
|
||||
|
||||
# data dir
|
||||
#data_dir = "/var/lib/taos/taosx" # on linux/macOS
|
||||
#data_dir = "C:\\TDengine\\data\\taosx" # on windows
|
||||
|
||||
# logs home
|
||||
#logs_home = "/var/log/taos" # on linux/macOS
|
||||
#logs_home = "C:\\TDengine\\log" # on windows
|
||||
|
||||
# log level: off/error/warn/info/debug/trace
|
||||
#log_level = "info"
|
||||
|
||||
# log keep days
|
||||
#log_keep_days = 30
|
||||
|
||||
# number of jobs, default to 0, will use `jobs` number of works for TMQ
|
||||
# number of threads used for tokio workers, default to 0 (means cores * 2)
|
||||
#jobs = 0
|
||||
|
||||
# enable OpenTelemetry tracing and metrics exporter
|
||||
#otel = false
|
||||
|
||||
# server instance id
|
||||
#
|
||||
# The instanceId of each instance is unique on the host
|
||||
# instanceId = 16
|
||||
|
||||
[serve]
|
||||
# listen to ip:port address
|
||||
#listen = "0.0.0.0:6050"
|
||||
|
@ -280,13 +285,66 @@ d4,2017-07-14T10:40:00.006+08:00,-2.740636,10,-0.893545,7,California.LosAngles
|
|||
# database url
|
||||
#database_url = "sqlite:taosx.db"
|
||||
|
||||
# default global request timeout which unit is second. This parameter takes effect for certain interfaces that require a timeout setting
|
||||
#request_timeout = 30
|
||||
|
||||
[monitor]
|
||||
# FQDN of taosKeeper service, no default value
|
||||
#fqdn = "localhost"
|
||||
# port of taosKeeper service, default 6043
|
||||
|
||||
# Port of taosKeeper service, default 6043
|
||||
#port = 6043
|
||||
# how often to send metrics to taosKeeper, default every 10 seconds. Only value from 1 to 10 is valid.
|
||||
|
||||
# How often to send metrics to taosKeeper, default every 10 seconds. Only value from 1 to 10 is valid.
|
||||
#interval = 10
|
||||
|
||||
|
||||
# log configuration
|
||||
[log]
|
||||
# All log files are stored in this directory
|
||||
#
|
||||
#path = "/var/log/taos" # on linux/macOS
|
||||
#path = "C:\\TDengine\\log" # on windows
|
||||
|
||||
# log filter level
|
||||
#
|
||||
#level = "info"
|
||||
|
||||
# Compress archived log files or not
|
||||
#
|
||||
#compress = false
|
||||
|
||||
# The number of log files retained by the current explorer server instance in the `path` directory
|
||||
#
|
||||
#rotationCount = 30
|
||||
|
||||
# Rotate when the log file reaches this size
|
||||
#
|
||||
#rotationSize = "1GB"
|
||||
|
||||
# Log downgrade when the remaining disk space reaches this size, only logging `ERROR` level logs
|
||||
#
|
||||
#reservedDiskSize = "1GB"
|
||||
|
||||
# The number of days log files are retained
|
||||
#
|
||||
#keepDays = 30
|
||||
|
||||
# Watching the configuration file for log.loggers changes, default to true.
|
||||
#
|
||||
#watching = true
|
||||
|
||||
# Customize the log output level of modules, and changes will be applied after modifying the file when log.watching is enabled
|
||||
#
|
||||
# ## Examples:
|
||||
#
|
||||
# crate = "error"
|
||||
# crate::mod1::mod2 = "info"
|
||||
# crate::span[field=value] = "warn"
|
||||
#
|
||||
[log.loggers]
|
||||
#"actix_server::accept" = "warn"
|
||||
#"taos::query" = "warn"
|
||||
```
|
||||
|
||||
### 启动
|
||||
|
@ -451,6 +509,16 @@ taosX 会将监控指标上报给 taosKeeper,这些监控指标会被 taosKeep
|
|||
| written_blocks | 本次运行此任务写人成功的 raw block 数 |
|
||||
| failed_blocks | 本次运行此任务写入失败的 raw block 数 |
|
||||
|
||||
### Kafka 数据源相关指标
|
||||
|
||||
| 字段 | 描述 |
|
||||
| ----------------------------- | ---------------------------- |
|
||||
| kafka_consumers | 本次运行任务 Kafka 消费者数 |
|
||||
| kafka_total_partitions | Kafka 主题总分区数 |
|
||||
| kafka_consuming_partitions | 本次运行任务正在消费的分区数 |
|
||||
| kafka_consumed_messages | 本次运行任务已经消费的消息数 |
|
||||
| total_kafka_consumed_messages | 累计消费的消息总数 |
|
||||
|
||||
## taosX 数据解析插件
|
||||
|
||||
接入 kafka / mqtt 消息中间件时,需要对原始数据进行解析,如果使用 json/regex 等模式解析器无法满足解析需求,同时 UDT(自定义解析脚本) 也无法满足性能要求时,可以自定义数据解析插件。
|
||||
|
|
|
@ -12,17 +12,67 @@ sidebar_label: taosX-Agent
|
|||
- `endpoint`: 必填,`taosX` 的 GRPC 服务地址。
|
||||
- `token`: 必填,在 `Explorer` 上创建 `Agent` 时,产生的 Token。
|
||||
- `compression`: 非必填,可配置为 `ture` 或 `false`, 默认为 `false`。配置为`true`, 则开启 `Agent` 和 `taosX` 通信数据压缩。
|
||||
- `log_level`: 非必填,日志级别,默认为 `info`, 同 `taosX` 一样,支持 `error`,`warn`,`info`,`debug`,`trace` 五级。
|
||||
- `log_keep_days`:非必填,日志保存天数,默认为 `30` 天。
|
||||
- `log_level`: 非必填,日志级别,默认为 `info`, 同 `taosX` 一样,支持 `error`,`warn`,`info`,`debug`,`trace` 五级。已弃用,请使用 `log.level` 代替。
|
||||
- `log_keep_days`:非必填,日志保存天数,默认为 `30` 天。已弃用,请使用 `log.keepDays` 代替。
|
||||
- `log.path`:日志文件存放的目录。
|
||||
- `log.level`:日志级别,可选值为 "error", "warn", "info", "debug", "trace"。
|
||||
- `log.compress`:日志文件滚动后的文件是否进行压缩。
|
||||
- `log.rotationCount`:日志文件目录下最多保留的文件数,超出数量的旧文件被删除。
|
||||
- `log.rotationSize`:触发日志文件滚动的文件大小(单位为字节),当日志文件超出此大小后会生成一个新文件,新的日志会写入新文件。
|
||||
- `log.reservedDiskSize`:日志所在磁盘停止写入日志的阈值(单位为字节),当磁盘剩余空间达到此大小后停止写入日志。
|
||||
- `log.keepDays`:日志文件保存的天数,超过此天数的旧日志文件会被删除。
|
||||
|
||||
如下所示:
|
||||
|
||||
```TOML
|
||||
endpoint = "grpc://<taosx-ip>:6055"
|
||||
token = "<token>"
|
||||
compression = true
|
||||
log_level = "info"
|
||||
log_keep_days = 30
|
||||
# taosX service endpoint
|
||||
#
|
||||
#endpoint = "http://localhost:6055"
|
||||
|
||||
# !important!
|
||||
# Uncomment it and copy-paste the token generated in Explorer.
|
||||
#
|
||||
#token = ""
|
||||
|
||||
# server instance id
|
||||
#
|
||||
# The instanceId of each instance is unique on the host
|
||||
# instanceId = 64
|
||||
|
||||
# enable communication data compression between Agent and taosX
|
||||
#
|
||||
#compression = true
|
||||
|
||||
# log configuration
|
||||
[log]
|
||||
# All log files are stored in this directory
|
||||
#
|
||||
#path = "/var/log/taos" # on linux/macOS
|
||||
#path = "C:\\TDengine\\log" # on windows
|
||||
|
||||
# log filter level
|
||||
#
|
||||
#level = "info"
|
||||
|
||||
# Compress archived log files or not
|
||||
#
|
||||
#compress = false
|
||||
|
||||
# The number of log files retained by the current explorer server instance in the `path` directory
|
||||
#
|
||||
#rotationCount = 30
|
||||
|
||||
# Rotate when the log file reaches this size
|
||||
#
|
||||
#rotationSize = "1GB"
|
||||
|
||||
# Log downgrade when the remaining disk space reaches this size, only logging `ERROR` level logs
|
||||
#
|
||||
#reservedDiskSize = "1GB"
|
||||
|
||||
# The number of days log files are retained
|
||||
#
|
||||
#keepDays = 30
|
||||
```
|
||||
|
||||
您不必对配置文件如何设置感到疑惑,阅读并跟随 `Explorer` 中创建 `Agent` 的提示进行操作,您可以对配置文件进行查看、修改和检查。
|
||||
|
|
|
@ -15,36 +15,111 @@ taosEexplorer 无需单独安装,从 TDengine 3.3.0.0 版本开始,它随着
|
|||
在启动 taosExplorer 之前,请确保配置文件中的内容正确。
|
||||
|
||||
```TOML
|
||||
# listen port
|
||||
# This is a automacically generated configuration file for Explorer in [TOML](https://toml.io/) format.
|
||||
#
|
||||
# Here is a full list of available options.
|
||||
|
||||
# Explorer server port to listen on.
|
||||
# Default is 6060.
|
||||
#
|
||||
port = 6060
|
||||
|
||||
# listen address for IPv4
|
||||
# IPv4 listen address.
|
||||
# Default is 0.0.0.0
|
||||
addr = "0.0.0.0"
|
||||
|
||||
# listen address for IPv4
|
||||
#ipv6 = "::1"
|
||||
# IPv6 listen address.
|
||||
|
||||
# log level. Possible: error,warn,info,debug,trace
|
||||
# ipv6 = "::1"
|
||||
|
||||
# explorer server instance id
|
||||
#
|
||||
# The instanceId of each instance is unique on the host
|
||||
# instanceId = 1
|
||||
|
||||
# Explorer server log level.
|
||||
# Default is "info"
|
||||
#
|
||||
# Deprecated: use log.level instead
|
||||
log_level = "info"
|
||||
|
||||
# taosAdapter address.
|
||||
# All data files are stored in this directory
|
||||
# data_dir = "/var/lib/taos/explorer" # Default for Linux
|
||||
# data_dir = "C:\\TDengine\\data\\explorer" # Default for Windows
|
||||
|
||||
# REST API endpoint to connect to the cluster.
|
||||
# This configuration is also the target for data migration tasks.
|
||||
#
|
||||
# Default is "http://localhost:6041" - the default endpoint for REST API.
|
||||
#
|
||||
cluster = "http://localhost:6041"
|
||||
|
||||
# taosX gRPC address
|
||||
# native endpoint to connect to the cluster.
|
||||
# Default is disabled. To enable it, set it to the native API URL like "taos://localhost:6030" and uncomment it.
|
||||
# If you enable it, you will get more performance for data migration tasks.
|
||||
#
|
||||
# cluster_native = "taos://localhost:6030"
|
||||
|
||||
# API endpoint for data replication/backup/data sources. No default option.
|
||||
# Set it to API URL like "http://localhost:6050".
|
||||
#
|
||||
x_api = "http://localhost:6050"
|
||||
|
||||
# GRPC endpoint for "Agent"s.
|
||||
# Default is "http://localhost:6055" - the default endpoint for taosX grpc API.
|
||||
# You should set it to public IP or FQDN name like:
|
||||
# "http://192.168.111.111:6055" or "http://node1.company.domain:6055" and
|
||||
# ensure to add the port to the exception list of the firewall if it enabled.
|
||||
grpc = "http://localhost:6055"
|
||||
|
||||
# CORS configuration switch, it allows cross-origin access
|
||||
cors = false
|
||||
cors = true
|
||||
|
||||
# Enable ssl: if the following two files exist, enable ssl protocol
|
||||
# Enable ssl
|
||||
# If the following two files exist, enable ssl protocol
|
||||
#
|
||||
[ssl]
|
||||
|
||||
# SSL certificate
|
||||
#certificate = "/path/to/ca.file"
|
||||
#
|
||||
# certificate = "/path/to/ca.file" # on linux/macOS
|
||||
# certificate = "C:\\path\\to\\ca.file" # on windows
|
||||
|
||||
# SSL certificate key
|
||||
#certificate_key = "/path/to/key.file"
|
||||
#
|
||||
# certificate_key = "/path/to/key.file" # on linux/macOS
|
||||
# certificate_key = "C:\\path\\to\\key.file" # on windows
|
||||
|
||||
# log configuration
|
||||
[log]
|
||||
# All log files are stored in this directory
|
||||
#
|
||||
# path = "/var/log/taos" # on linux/macOS
|
||||
# path = "C:\\TDengine\\log" # on windows
|
||||
|
||||
# log filter level
|
||||
#
|
||||
# level = "info"
|
||||
|
||||
# Compress archived log files or not
|
||||
#
|
||||
# compress = false
|
||||
|
||||
# The number of log files retained by the current explorer server instance in the `path` directory
|
||||
#
|
||||
# rotationCount = 30
|
||||
|
||||
# Rotate when the log file reaches this size
|
||||
#
|
||||
# rotationSize = "1GB"
|
||||
|
||||
# Log downgrade when the remaining disk space reaches this size, only logging `ERROR` level logs
|
||||
#
|
||||
# reservedDiskSize = "1GB"
|
||||
|
||||
# The number of days log files are retained
|
||||
#
|
||||
# keepDays = 30
|
||||
```
|
||||
|
||||
说明:
|
||||
|
@ -52,13 +127,23 @@ cors = false
|
|||
- `port`:taosExplorer 服务绑定的端口。
|
||||
- `addr`:taosExplorer 服务绑定的 IPv4 地址,默认为 `0.0.0.0`。如需修改,请配置为 `localhost` 之外的地址以对外提供服务。
|
||||
- `ipv6`:taosExplorer 服务绑定的 IPv6 地址,默认不绑定 IPv6 地址。
|
||||
- `log_level`:日志级别,可选值为 "error", "warn", "info", "debug", "trace"。
|
||||
- `instanceId`:当前 explorer 服务的实例 ID,如果同一台机器上启动了多个 explorer 实例,必须保证各个实例的实例 ID 互不相同。
|
||||
- `log_level`:日志级别,可选值为 "error", "warn", "info", "debug", "trace"。此参数已弃用,请使用 `log.level` 代替。
|
||||
- `cluster`:TDengine 集群的 taosAdapter 地址。
|
||||
- `cluster_native`:TDengine 集群的原生连接地址,默认关闭。
|
||||
- `x_api`:taosX 的 gRPC 地址。
|
||||
- `grpc`: taosX 代理向 taosX 建立连接的 gRPC 地址.
|
||||
- `grpc`:taosX 代理向 taosX 建立连接的 gRPC 地址.
|
||||
- `cors`:CORS 配置开关,默认为 `false`。当为 `true` 时,允许跨域访问。
|
||||
- `ssl.certificate`: SSL 证书(如果同时设置了 certificate 与 certificate_key 两个参数,则启用 HTTPS 服务,否则不启用)。
|
||||
- `ssl.certificate_key`: SSL 证书密钥。
|
||||
- `ssl.certificate`:SSL 证书(如果同时设置了 certificate 与 certificate_key 两个参数,则启用 HTTPS 服务,否则不启用)。
|
||||
- `ssl.certificate_key`:SSL 证书密钥。
|
||||
- `log.path`:日志文件存放的目录。
|
||||
- `log.level`:日志级别,可选值为 "error", "warn", "info", "debug", "trace"。
|
||||
- `log.compress`:日志文件滚动后的文件是否进行压缩。
|
||||
- `log.rotationCount`:日志文件目录下最多保留的文件数,超出数量的旧文件被删除。
|
||||
- `log.rotationSize`:触发日志文件滚动的文件大小(单位为字节),当日志文件超出此大小后会生成一个新文件,新的日志会写入新文件。
|
||||
- `log.reservedDiskSize`:日志所在磁盘停止写入日志的阈值(单位为字节),当磁盘剩余空间达到此大小后停止写入日志。
|
||||
- `log.keepDays`:日志文件保存的天数,超过此天数的旧日志文件会被删除。
|
||||
|
||||
|
||||
## 启动停止
|
||||
|
||||
|
|
|
@ -522,6 +522,6 @@ Offset 结构体提供了获取当前消息所属的数据库,主题和分区
|
|||
|
||||
## 附录
|
||||
- Rust 连接器文档:https://docs.rs/taos
|
||||
- Rust 连接器项目地址: https://github.com/taosdata/rust-connector-taos
|
||||
- Rust 连接器项目地址: https://github.com/taosdata/taos-connector-rust
|
||||
- deadpool 连接池: https://crates.io/crates/deadpool
|
||||
- r2d2 连接池: https://crates.io/crates/r2d2
|
||||
|
|
|
@ -382,7 +382,7 @@ typedef struct SStateStore {
|
|||
|
||||
int32_t (*streamStateCountWinAddIfNotExist)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount,
|
||||
void** ppVal, int32_t* pVLen, int32_t* pWinCode);
|
||||
int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||
int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
|
||||
|
||||
int32_t (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType,
|
||||
int32_t pkLen, SUpdateInfo** ppInfo);
|
||||
|
|
|
@ -87,7 +87,7 @@ void streamStateFreeVal(void* val);
|
|||
// count window
|
||||
int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal,
|
||||
int32_t* pVLen, int32_t* pWinCode);
|
||||
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
|
||||
|
||||
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key);
|
||||
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key);
|
||||
|
|
|
@ -70,6 +70,8 @@ typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;
|
|||
#define SSTREAM_TASK_NEED_CONVERT_VER 2
|
||||
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
|
||||
|
||||
extern int32_t streamMetaId;
|
||||
|
||||
enum {
|
||||
STREAM_STATUS__NORMAL = 0,
|
||||
STREAM_STATUS__STOP,
|
||||
|
@ -135,11 +137,6 @@ enum {
|
|||
STREAM_QUEUE__PROCESSING,
|
||||
};
|
||||
|
||||
enum {
|
||||
STREAM_META_WILL_STOP = 1,
|
||||
STREAM_META_OK_TO_STOP = 2,
|
||||
};
|
||||
|
||||
typedef enum EStreamTaskEvent {
|
||||
TASK_EVENT_INIT = 0x1,
|
||||
TASK_EVENT_INIT_SCANHIST = 0x2,
|
||||
|
@ -282,7 +279,6 @@ typedef enum {
|
|||
} EConsenChkptStatus;
|
||||
|
||||
typedef struct SConsenChkptInfo {
|
||||
// bool alreadySendChkptId;
|
||||
EConsenChkptStatus status;
|
||||
int64_t statusTs;
|
||||
int32_t consenChkptTransId;
|
||||
|
|
|
@ -110,7 +110,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
|
|||
// count window
|
||||
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
|
||||
int32_t* pVLen, int32_t* pWinCode);
|
||||
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
|
||||
|
||||
// function
|
||||
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
|
||||
|
|
|
@ -51,7 +51,12 @@ typedef enum { M2C = 0, C2M } ConvType;
|
|||
#define strtod STR_TO_LD_FUNC_TAOS_FORBID
|
||||
#define strtold STR_TO_D_FUNC_TAOS_FORBID
|
||||
#define strtof STR_TO_F_FUNC_TAOS_FORBID
|
||||
|
||||
#ifdef strndup
|
||||
#undef strndup
|
||||
#endif
|
||||
#define strndup STR_TO_F_FUNC_TAOS_FORBID
|
||||
|
||||
#endif
|
||||
|
||||
#define tstrncpy(dst, src, size) \
|
||||
|
|
|
@ -25,6 +25,7 @@ extern "C" {
|
|||
typedef struct SLRUCache SLRUCache;
|
||||
|
||||
typedef void (*_taos_lru_deleter_t)(const void *key, size_t keyLen, void *value, void *ud);
|
||||
typedef void (*_taos_lru_overwriter_t)(const void *key, size_t keyLen, void *value, void *ud);
|
||||
typedef int (*_taos_lru_functor_t)(const void *key, size_t keyLen, void *value, void *ud);
|
||||
|
||||
typedef struct LRUHandle LRUHandle;
|
||||
|
@ -42,7 +43,8 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
|
|||
void taosLRUCacheCleanup(SLRUCache *cache);
|
||||
|
||||
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
|
||||
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority, void *ud);
|
||||
_taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle,
|
||||
LRUPriority priority, void *ud);
|
||||
LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen);
|
||||
void taosLRUCacheErase(SLRUCache *cache, const void *key, size_t keyLen);
|
||||
|
||||
|
|
|
@ -867,6 +867,7 @@ int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t r
|
|||
|
||||
code = blockDataEnsureCapacity(pDst, rowCount);
|
||||
if (code) {
|
||||
blockDataDestroy(pDst);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -297,11 +297,22 @@ static void *vmOpenVnodeInThread(void *param) {
|
|||
SVnodeMgmt *pMgmt = pThread->pMgmt;
|
||||
char path[TSDB_FILENAME_LEN];
|
||||
|
||||
dInfo("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
|
||||
dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
|
||||
setThreadName("open-vnodes");
|
||||
|
||||
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
|
||||
SWrapperCfg *pCfg = &pThread->pCfgs[v];
|
||||
if (pCfg->dropped) {
|
||||
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
||||
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
|
||||
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
||||
tmsgReportStartup("vnode-destroy", stepDesc);
|
||||
|
||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
|
||||
vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
|
||||
pThread->updateVnodesList = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
||||
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
|
||||
|
|
|
@ -1012,10 +1012,10 @@ _OVER:
|
|||
|
||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
||||
TAOS_CHECK_RETURN (mndTransCheckConflict(pMnode, pTrans));
|
||||
TAOS_CHECK_RETURN (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb));
|
||||
TAOS_CHECK_RETURN (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb));
|
||||
TAOS_CHECK_RETURN (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb));
|
||||
TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
|
||||
TAOS_CHECK_RETURN(mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb));
|
||||
TAOS_CHECK_RETURN(mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb));
|
||||
TAOS_CHECK_RETURN(mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -3016,7 +3016,7 @@ _OVER:
|
|||
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
tFreeSTableMetaRsp(&metaRsp);
|
||||
//TODO change to TAOS_RETURN
|
||||
// TODO change to TAOS_RETURN
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -4259,7 +4259,9 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
|
|||
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
|
||||
if (code) goto _OVER;
|
||||
}
|
||||
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) code = 0;
|
||||
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) {
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
_OVER:
|
||||
tFreeSMDropTbsReq(&dropReq);
|
||||
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
||||
|
@ -4458,7 +4460,7 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
|
|||
|
||||
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
|
||||
if (code) goto _end;
|
||||
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = 0;
|
||||
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
_end:
|
||||
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
||||
tDecoderClear(&decoder);
|
||||
|
|
|
@ -167,66 +167,60 @@ void mndCleanupStream(SMnode *pMnode) {
|
|||
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
SSdbRow *pRow = NULL;
|
||||
SSdbRow * pRow = NULL;
|
||||
SStreamObj *pStream = NULL;
|
||||
void *buf = NULL;
|
||||
void * buf = NULL;
|
||||
int8_t sver = 0;
|
||||
int32_t tlen;
|
||||
int32_t dataPos = 0;
|
||||
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
|
||||
goto STREAM_DECODE_OVER;
|
||||
}
|
||||
code = sdbGetRawSoftVer(pRaw, &sver);
|
||||
TSDB_CHECK_CODE(code, lino, _over);
|
||||
|
||||
if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
|
||||
terrno = 0;
|
||||
mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
|
||||
goto STREAM_DECODE_OVER;
|
||||
goto _over;
|
||||
}
|
||||
|
||||
pRow = sdbAllocRow(sizeof(SStreamObj));
|
||||
if (pRow == NULL) {
|
||||
goto STREAM_DECODE_OVER;
|
||||
}
|
||||
TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);
|
||||
|
||||
pStream = sdbGetRowObj(pRow);
|
||||
if (pStream == NULL) {
|
||||
goto STREAM_DECODE_OVER;
|
||||
}
|
||||
TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);
|
||||
|
||||
int32_t tlen;
|
||||
int32_t dataPos = 0;
|
||||
SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER);
|
||||
SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
|
||||
|
||||
buf = taosMemoryMalloc(tlen + 1);
|
||||
if (buf == NULL) {
|
||||
goto STREAM_DECODE_OVER;
|
||||
}
|
||||
TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
|
||||
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, buf, tlen + 1);
|
||||
if (tDecodeSStreamObj(&decoder, pStream, sver) < 0) {
|
||||
code = tDecodeSStreamObj(&decoder, pStream, sver);
|
||||
tDecoderClear(&decoder);
|
||||
goto STREAM_DECODE_OVER;
|
||||
|
||||
if (code < 0) {
|
||||
tFreeStreamObj(pStream);
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
STREAM_DECODE_OVER:
|
||||
_over:
|
||||
taosMemoryFreeClear(buf);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
char *p = (pStream == NULL) ? "null" : pStream->name;
|
||||
mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, tstrerror(terrno));
|
||||
taosMemoryFreeClear(pRow);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
char *p = (pStream == NULL) ? "null" : pStream->name;
|
||||
mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
|
||||
taosMemoryFreeClear(pRow);
|
||||
|
||||
terrno = code;
|
||||
return NULL;
|
||||
} else {
|
||||
mTrace("stream:%s, decode from raw:%p, row:%p, checkpoint:%" PRId64, pStream->name, pRaw, pStream,
|
||||
pStream->checkpointId);
|
||||
|
||||
terrno = 0;
|
||||
return pRow;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
|
||||
|
|
|
@ -324,7 +324,9 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
|
|||
|
||||
size_t len = 0;
|
||||
void *pKey = taosHashGetKey(pDb, &len);
|
||||
tstrncpy(p, pKey, 128);
|
||||
int cpLen = (127 < len) ? 127 : len;
|
||||
TAOS_STRNCPY(p, pKey, cpLen);
|
||||
p[cpLen] = '\0';
|
||||
|
||||
int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
|
||||
if (code) {
|
||||
|
|
|
@ -646,7 +646,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
|
|||
}
|
||||
|
||||
// add to cache.
|
||||
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL,
|
||||
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL, NULL,
|
||||
TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
_end:
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
|
@ -804,7 +804,7 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
|
|||
}
|
||||
|
||||
// add to cache.
|
||||
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL,
|
||||
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL, NULL,
|
||||
TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
_end:
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
|
|
|
@ -39,6 +39,100 @@ static void metaDestroyLock(SMeta *pMeta) { (void)taosThreadRwlockDestroy(&pMeta
|
|||
|
||||
static void metaCleanup(SMeta **ppMeta);
|
||||
|
||||
static void doScan(SMeta *pMeta) {
|
||||
TBC *cursor = NULL;
|
||||
int32_t code;
|
||||
|
||||
// open file to write
|
||||
char path[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(path, TSDB_FILENAME_LEN - 1, "%s%s", pMeta->path, TD_DIRSEP "scan.txt");
|
||||
TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (fp == NULL) {
|
||||
metaError("failed to open file:%s, reason:%s", path, tstrerror(terrno));
|
||||
return;
|
||||
}
|
||||
|
||||
code = tdbTbcOpen(pMeta->pTbDb, &cursor, NULL);
|
||||
if (code) {
|
||||
if (taosCloseFile(&fp) != 0) {
|
||||
metaError("failed to close file:%s, reason:%s", path, tstrerror(terrno));
|
||||
}
|
||||
metaError("failed to open table.db cursor, reason:%s", tstrerror(terrno));
|
||||
return;
|
||||
}
|
||||
|
||||
code = tdbTbcMoveToFirst(cursor);
|
||||
if (code) {
|
||||
if (taosCloseFile(&fp) != 0) {
|
||||
metaError("failed to close file:%s, reason:%s", path, tstrerror(terrno));
|
||||
}
|
||||
tdbTbcClose(cursor);
|
||||
metaError("failed to move to first, reason:%s", tstrerror(terrno));
|
||||
return;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
const void *pKey;
|
||||
int kLen;
|
||||
const void *pVal;
|
||||
int vLen;
|
||||
if (tdbTbcGet(cursor, &pKey, &kLen, &pVal, &vLen) < 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
// decode entry
|
||||
SDecoder dc = {0};
|
||||
SMetaEntry me = {0};
|
||||
|
||||
tDecoderInit(&dc, (uint8_t *)pVal, vLen);
|
||||
|
||||
if (metaDecodeEntry(&dc, &me) < 0) {
|
||||
tDecoderClear(&dc);
|
||||
break;
|
||||
}
|
||||
|
||||
// skip deleted entry
|
||||
if (tdbTbGet(pMeta->pUidIdx, &me.uid, sizeof(me.uid), NULL, NULL) == 0) {
|
||||
// print entry
|
||||
char buf[1024] = {0};
|
||||
if (me.type == TSDB_SUPER_TABLE) {
|
||||
snprintf(buf, sizeof(buf) - 1, "type: super table, version:%" PRId64 " uid: %" PRId64 " name: %s\n", me.version,
|
||||
me.uid, me.name);
|
||||
|
||||
} else if (me.type == TSDB_CHILD_TABLE) {
|
||||
snprintf(buf, sizeof(buf) - 1,
|
||||
"type: child table, version:%" PRId64 " uid: %" PRId64 " name: %s suid:%" PRId64 "\n", me.version,
|
||||
me.uid, me.name, me.ctbEntry.suid);
|
||||
} else {
|
||||
snprintf(buf, sizeof(buf) - 1, "type: normal table, version:%" PRId64 " uid: %" PRId64 " name: %s\n",
|
||||
me.version, me.uid, me.name);
|
||||
}
|
||||
|
||||
if (taosWriteFile(fp, buf, strlen(buf)) < 0) {
|
||||
metaError("failed to write file:%s, reason:%s", path, tstrerror(terrno));
|
||||
tDecoderClear(&dc);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
tDecoderClear(&dc);
|
||||
|
||||
if (tdbTbcMoveToNext(cursor) < 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
tdbTbcClose(cursor);
|
||||
|
||||
// close file
|
||||
if (taosFsyncFile(fp) < 0) {
|
||||
metaError("failed to fsync file:%s, reason:%s", path, tstrerror(terrno));
|
||||
}
|
||||
if (taosCloseFile(&fp) < 0) {
|
||||
metaError("failed to close file:%s, reason:%s", path, tstrerror(terrno));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||
SMeta *pMeta = NULL;
|
||||
int32_t code = 0;
|
||||
|
@ -134,6 +228,11 @@ int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
|||
code = metaInitTbFilterCache(pMeta);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
#if 0
|
||||
// Do NOT remove this code, it is used to do debug stuff
|
||||
doScan(pMeta);
|
||||
#endif
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
metaError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, __LINE__, tstrerror(code));
|
||||
|
|
|
@ -19,6 +19,11 @@
|
|||
#define MAX_REPEAT_SCAN_THRESHOLD 3
|
||||
#define SCAN_WAL_IDLE_DURATION 100
|
||||
|
||||
typedef struct SBuildScanWalMsgParam {
|
||||
int64_t metaId;
|
||||
int32_t numOfTasks;
|
||||
} SBuildScanWalMsgParam;
|
||||
|
||||
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
||||
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
|
||||
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
|
||||
|
@ -31,13 +36,12 @@ int32_t tqScanWal(STQ* pTq) {
|
|||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int64_t st = taosGetTimestampMs();
|
||||
int32_t numOfTasks = 0;
|
||||
bool shouldIdle = true;
|
||||
|
||||
tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
|
||||
|
||||
// check all tasks
|
||||
int32_t numOfTasks = 0;
|
||||
bool shouldIdle = true;
|
||||
|
||||
int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
|
||||
|
@ -68,54 +72,61 @@ int32_t tqScanWal(STQ* pTq) {
|
|||
return code;
|
||||
}
|
||||
|
||||
typedef struct SBuildScanWalMsgParam {
|
||||
STQ* pTq;
|
||||
int32_t numOfTasks;
|
||||
} SBuildScanWalMsgParam;
|
||||
|
||||
static void doStartScanWal(void* param, void* tmrId) {
|
||||
int32_t vgId = 0;
|
||||
STQ* pTq = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
|
||||
|
||||
STQ* pTq = pParam->pTq;
|
||||
int32_t vgId = pTq->pStreamMeta->vgId;
|
||||
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, pParam->metaId);
|
||||
if (pMeta == NULL) {
|
||||
tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
|
||||
taosMemoryFree(pParam);
|
||||
return;
|
||||
}
|
||||
|
||||
vgId = pMeta->vgId;
|
||||
pTq = pMeta->ahandle;
|
||||
|
||||
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
|
||||
pTq->pVnode->restored);
|
||||
|
||||
int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||
taosMemoryFree(pParam);
|
||||
|
||||
code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
|
||||
}
|
||||
|
||||
code = taosReleaseRef(streamMetaId, pParam->metaId);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
|
||||
tstrerror(code));
|
||||
}
|
||||
|
||||
taosMemoryFree(pParam);
|
||||
}
|
||||
|
||||
int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
|
||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
int32_t code = 0;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
tmr_h pTimer = NULL;
|
||||
SBuildScanWalMsgParam* pParam = NULL;
|
||||
|
||||
SBuildScanWalMsgParam* pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
|
||||
pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
|
||||
if (pParam == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pParam->pTq = pTq;
|
||||
pParam->metaId = pMeta->rid;
|
||||
pParam->numOfTasks = numOfTasks;
|
||||
|
||||
tmr_h pTimer = NULL;
|
||||
code = streamTimerGetInstance(&pTimer);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pMeta->scanInfo.scanTimer == NULL) {
|
||||
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTimer);
|
||||
taosMemoryFree(pParam);
|
||||
} else {
|
||||
bool ret = taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer);
|
||||
if (!ret) {
|
||||
// tqError("vgId:%d failed to start scan wal in:%dms", vgId, idleDuration);
|
||||
}
|
||||
streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut");
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -124,8 +135,8 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
|
|||
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
|
||||
bool alreadyRestored = pTq->pVnode->restored;
|
||||
int32_t numOfTasks = 0;
|
||||
|
||||
// do not launch the stream tasks, if it is a follower or not restored vnode.
|
||||
if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
|
||||
|
@ -134,7 +145,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
|||
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
if (numOfTasks == 0) {
|
||||
tqDebug("vgId:%d no stream tasks existed to run", vgId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
|
|
@ -597,6 +597,13 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud
|
|||
taosMemoryFree(value);
|
||||
}
|
||||
|
||||
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
|
||||
SLastCol *pLastCol = (SLastCol *)value;
|
||||
pLastCol->dirty = 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
|
||||
|
||||
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
|
||||
int32_t code = 0, lino = 0;
|
||||
|
||||
|
@ -606,27 +613,10 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
|
|||
SLastCol emptyCol = {
|
||||
.rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||
|
||||
SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
if (!pLastCol) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
size_t charge = 0;
|
||||
*pLastCol = emptyCol;
|
||||
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLastCol, &charge));
|
||||
|
||||
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
|
||||
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
||||
TAOS_LRU_PRIORITY_LOW, pTsdb);
|
||||
if (status != TAOS_LRU_STATUS_OK) {
|
||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
||||
code = TSDB_CODE_FAILED;
|
||||
pLastCol = NULL;
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosMemoryFree(pLastCol);
|
||||
code = tsdbCachePutToLRU(pTsdb, pLastKey, &emptyCol, 1);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
|
||||
TAOS_RETURN(code);
|
||||
|
@ -1071,40 +1061,6 @@ typedef struct {
|
|||
SLastKey key;
|
||||
} SIdxKey;
|
||||
|
||||
static int32_t tsdbCacheUpdateValue(SValue *pOld, SValue *pNew) {
|
||||
uint8_t *pFree = NULL;
|
||||
int nData = 0;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pOld->type)) {
|
||||
pFree = pOld->pData;
|
||||
nData = pOld->nData;
|
||||
}
|
||||
|
||||
*pOld = *pNew;
|
||||
if (IS_VAR_DATA_TYPE(pNew->type)) {
|
||||
if (nData < pNew->nData) {
|
||||
pOld->pData = taosMemoryCalloc(1, pNew->nData);
|
||||
if (!pOld->pData) {
|
||||
return terrno;
|
||||
}
|
||||
} else {
|
||||
pOld->pData = pFree;
|
||||
pFree = NULL;
|
||||
}
|
||||
|
||||
if (pNew->nData) {
|
||||
memcpy(pOld->pData, pNew->pData, pNew->nData);
|
||||
} else {
|
||||
pFree = pOld->pData;
|
||||
pOld->pData = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pFree);
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
|
||||
// update rowkey
|
||||
pLastCol->rowKey.ts = TSKEY_MIN;
|
||||
|
@ -1128,11 +1084,7 @@ static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus ca
|
|||
}
|
||||
|
||||
pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
|
||||
|
||||
if (!pLastCol->dirty) {
|
||||
pLastCol->dirty = 1;
|
||||
}
|
||||
|
||||
pLastCol->cacheStatus = cacheStatus;
|
||||
}
|
||||
|
||||
|
@ -1155,7 +1107,7 @@ static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
|
||||
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
|
||||
int32_t code = 0, lino = 0;
|
||||
|
||||
SLastCol *pLRULastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
|
@ -1165,11 +1117,11 @@ static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLa
|
|||
|
||||
size_t charge = 0;
|
||||
*pLRULastCol = *pLastCol;
|
||||
pLRULastCol->dirty = 1;
|
||||
pLRULastCol->dirty = dirty;
|
||||
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
|
||||
|
||||
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
|
||||
NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
|
||||
tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
|
||||
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
|
||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
||||
code = TSDB_CODE_FAILED;
|
||||
|
@ -1216,8 +1168,9 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
|||
if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
||||
int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
|
||||
if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
|
||||
SLastCol newLastCol = {.rowKey = *pRowKey, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||
code = tsdbCachePutToLRU(pTsdb, key, &newLastCol);
|
||||
SLastCol newLastCol = {
|
||||
.rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||
code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1296,7 +1249,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
|||
SLastCol *pToFree = pLastCol;
|
||||
|
||||
if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
|
||||
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol)) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
|
||||
tstrerror(code));
|
||||
taosMemoryFreeClear(pToFree);
|
||||
|
@ -1319,14 +1272,14 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
|||
}
|
||||
|
||||
if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
|
||||
SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||
SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||
if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
|
||||
tstrerror(code));
|
||||
taosMemoryFreeClear(pToFree);
|
||||
break;
|
||||
}
|
||||
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
|
||||
tstrerror(code));
|
||||
taosMemoryFreeClear(pToFree);
|
||||
|
@ -1681,30 +1634,14 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
continue;
|
||||
}
|
||||
|
||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
if (!pTmpLastCol) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
|
||||
size_t charge = 0;
|
||||
*pTmpLastCol = *pLastCol;
|
||||
pLastCol = pTmpLastCol;
|
||||
code = tsdbCacheReallocSLastCol(pLastCol, &charge);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosMemoryFree(pLastCol);
|
||||
// store result back to rocks cache
|
||||
code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
TAOS_CHECK_EXIT(code);
|
||||
}
|
||||
|
||||
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
||||
TAOS_LRU_PRIORITY_LOW, pTsdb);
|
||||
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
|
||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
||||
pLastCol = NULL;
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_FAILED);
|
||||
}
|
||||
|
||||
// store result back to rocks cache
|
||||
code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
|
||||
code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
TAOS_CHECK_EXIT(code);
|
||||
|
@ -1779,18 +1716,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
|||
SLastCol *pToFree = pLastCol;
|
||||
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
|
||||
if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
if (!pTmpLastCol) {
|
||||
taosMemoryFreeClear(pToFree);
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
|
||||
size_t charge = 0;
|
||||
*pTmpLastCol = *pLastCol;
|
||||
pLastCol = pTmpLastCol;
|
||||
code = tsdbCacheReallocSLastCol(pLastCol, &charge);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
taosMemoryFreeClear(pToFree);
|
||||
TAOS_CHECK_EXIT(code);
|
||||
}
|
||||
|
@ -1798,20 +1727,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
|||
SLastCol lastCol = *pLastCol;
|
||||
code = tsdbCacheReallocSLastCol(&lastCol, NULL);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
tsdbCacheFreeSLastColItem(pLastCol);
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
taosMemoryFreeClear(pToFree);
|
||||
TAOS_CHECK_EXIT(code);
|
||||
}
|
||||
|
||||
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
|
||||
NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
|
||||
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
|
||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
||||
taosMemoryFreeClear(pToFree);
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_FAILED);
|
||||
}
|
||||
|
||||
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
||||
taosArrayRemove(remainCols, j);
|
||||
taosArrayRemove(ignoreFromRocks, j);
|
||||
|
@ -1999,8 +1918,9 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
|
||||
SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
|
||||
.colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
|
||||
.dirty = 1,
|
||||
.cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
|
||||
code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol);
|
||||
code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
|
||||
}
|
||||
if (taosLRUCacheRelease(pTsdb->lruCache, h, false) != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s release lru cache failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
|
||||
|
@ -2065,6 +1985,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
|
||||
SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
|
||||
.colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
|
||||
.dirty = 0,
|
||||
.cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
|
||||
|
||||
if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2072,7 +1993,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
goto _exit;
|
||||
|
@ -3660,7 +3581,7 @@ int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle)
|
|||
size_t charge = tsS3BlockSize * pFD->szPage;
|
||||
_taos_lru_deleter_t deleter = deleteBCache;
|
||||
LRUStatus status =
|
||||
taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, NULL, &h, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
if (status != TAOS_LRU_STATUS_OK) {
|
||||
// code = -1;
|
||||
}
|
||||
|
@ -3703,7 +3624,7 @@ void tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *
|
|||
memcpy(pPg, pPage, charge);
|
||||
|
||||
LRUStatus status =
|
||||
taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, NULL, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
if (status != TAOS_LRU_STATUS_OK) {
|
||||
// ignore cache updating if not ok
|
||||
// code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -71,6 +71,9 @@ static int32_t tsdbSttLvlInitRef(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lv
|
|||
}
|
||||
code = TARRAY2_APPEND(lvl[0]->fobjArr, fobj1);
|
||||
if (code) {
|
||||
if (tsdbTFileObjUnref(fobj1) != 0) {
|
||||
tsdbError("failed to unref file obj, fobj:%p", fobj1);
|
||||
}
|
||||
tsdbSttLvlClear(lvl);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -812,6 +812,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
|
|||
if (!COL_VAL_IS_NONE(pColVal)) {
|
||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
||||
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
|
||||
if (!pTColVal) return terrno;
|
||||
if (!COL_VAL_IS_NULL(pColVal)) {
|
||||
code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
|
||||
if (code) return code;
|
||||
|
|
|
@ -73,7 +73,7 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
|
|||
if (rsp.pCont == NULL) {
|
||||
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
if (tSerializeSEpSet(rsp.pCont, contLen, &newEpSet) != 0) {
|
||||
if (tSerializeSEpSet(rsp.pCont, contLen, &newEpSet) < 0) {
|
||||
vError("vgId:%d, failed to serialize ep set", pVnode->config.vgId);
|
||||
}
|
||||
rsp.contLen = contLen;
|
||||
|
|
|
@ -1608,12 +1608,14 @@ int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
|
|||
code = createDropAllTbTsmaCtgCacheOp(pCtg, pCache, syncOp, &pOp);
|
||||
}
|
||||
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
|
||||
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
|
||||
pCtgCache = NULL;
|
||||
ctgReleaseDBCache(pCtg, pDbCache);
|
||||
pDbCache = NULL;
|
||||
|
||||
CTG_ERR_JRET(code);
|
||||
|
||||
CTG_ERR_JRET(ctgEnqueue(pCtg, pOp));
|
||||
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
|
||||
ctgReleaseDBCache(pCtg, pDbCache);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
|
|
@ -26,6 +26,8 @@ void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type);
|
|||
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo);
|
||||
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo);
|
||||
|
||||
void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -798,7 +798,6 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
|
|||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
int64_t threadId = taosGetSelfPthreadId();
|
||||
int32_t lino = 0;
|
||||
int64_t curOwner = 0;
|
||||
|
||||
*pRes = NULL;
|
||||
|
@ -846,7 +845,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
|||
int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
|
||||
blockDataCheck(*pRes, false);
|
||||
|
|
|
@ -687,10 +687,10 @@ int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResu
|
|||
code = blockDataEnsureCapacity(pBlock, pBlock->info.rows + pCtx[j].resultInfo->numOfRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
int32_t winCode = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||
if (TAOS_FAILED(winCode)) {
|
||||
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(winCode));
|
||||
QUERY_CHECK_CODE(winCode, lino, _end);
|
||||
code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
||||
// do nothing
|
||||
|
@ -1301,10 +1301,17 @@ FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOpera
|
|||
freeOperatorParam(pOperator->pDownstreamGetParams[idx], OP_GET_PARAM);
|
||||
pOperator->pDownstreamGetParams[idx] = NULL;
|
||||
}
|
||||
|
||||
if (code) {
|
||||
qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock);
|
||||
if (code) {
|
||||
qError("failed to get next data block from upstream at %s, %d code:%s", __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -86,11 +86,13 @@ static void destroyGroupOperatorInfo(void* param) {
|
|||
taosArrayDestroy(pInfo->pGroupCols);
|
||||
taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
|
||||
cleanupExprSupp(&pInfo->scalarSup);
|
||||
if (pInfo->pOperator) {
|
||||
|
||||
if (pInfo->pOperator != NULL) {
|
||||
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
||||
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
|
||||
pInfo->pOperator = NULL;
|
||||
}
|
||||
|
||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
taosMemoryFreeClear(param);
|
||||
|
|
|
@ -67,6 +67,9 @@ int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
|||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
|
||||
blockDataCheck(*ppBlock, false);
|
||||
if (code) {
|
||||
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -737,7 +737,7 @@ _end:
|
|||
|
||||
if (NULL != pVal) {
|
||||
insertRet = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
|
||||
sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
sizeof(STableCachedVal), freeCachedMetaItem, NULL, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
if (insertRet != TAOS_LRU_STATUS_OK) {
|
||||
qWarn("failed to put meta into lru cache, code:%d, %s", insertRet, idStr);
|
||||
}
|
||||
|
@ -1380,8 +1380,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
lino = __LINE__;
|
||||
goto _end;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
if (pInfo->currentTable >= numOfTables) {
|
||||
|
@ -1393,11 +1392,11 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
|
||||
STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
|
||||
if (!tmp) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
(*ppRes) = NULL;
|
||||
return terrno;
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
}
|
||||
|
||||
tInfo = *tmp;
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
|
||||
|
@ -1412,11 +1411,12 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
}
|
||||
} else { // scan table group by group sequentially
|
||||
code = groupSeqTableScan(pOperator, ppRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -5834,9 +5834,10 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
|
|||
SOperatorInfo* pOperator) {
|
||||
STableMergeScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
|
||||
while (1) {
|
||||
while (1) {
|
||||
pTupleHandle = NULL;
|
||||
|
|
|
@ -204,15 +204,18 @@ int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle)
|
|||
* @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys
|
||||
* @param [in, out] pBlock the output block, the group id will be saved in it
|
||||
* @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple
|
||||
* @retval NULL if no more tuples
|
||||
*/
|
||||
static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||
static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock,
|
||||
STupleHandle** pTupleHandle) {
|
||||
QRY_PARAM_CHECK(pTupleHandle);
|
||||
|
||||
int32_t code = 0;
|
||||
STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
|
||||
if (!retTuple) {
|
||||
code = tsortNextTuple(pHandle, &retTuple);
|
||||
if (code) {
|
||||
return NULL;
|
||||
qError("failed to get next tuple, code:%s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -225,7 +228,8 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
|
|||
newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf,
|
||||
&pInfo->pGroupIdCalc->lastKeysLen, retTuple);
|
||||
}
|
||||
bool emptyBlock = pBlock->info.rows == 0;
|
||||
|
||||
bool emptyBlock = (pBlock->info.rows == 0);
|
||||
if (newGroup) {
|
||||
if (!emptyBlock) {
|
||||
// new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return
|
||||
|
@ -247,17 +251,20 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
|
|||
}
|
||||
}
|
||||
|
||||
return retTuple;
|
||||
*pTupleHandle = retTuple;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
||||
SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
blockDataCleanup(pDataBlock);
|
||||
|
||||
int32_t lino = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
SSDataBlock* p = NULL;
|
||||
|
||||
code = tsortGetSortedDataBlock(pHandle, &p);
|
||||
if (p == NULL || (code != 0)) {
|
||||
return code;
|
||||
|
@ -266,16 +273,15 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
|
|||
code = blockDataEnsureCapacity(p, capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
STupleHandle* pTupleHandle;
|
||||
while (1) {
|
||||
if (pInfo->pGroupIdCalc) {
|
||||
pTupleHandle = nextTupleWithGroupId(pHandle, pInfo, p);
|
||||
code = nextTupleWithGroupId(pHandle, pInfo, p, &pTupleHandle);
|
||||
} else {
|
||||
code = tsortNextTuple(pHandle, &pTupleHandle);
|
||||
}
|
||||
|
||||
if (pTupleHandle == NULL || code != 0) {
|
||||
lino = __LINE__;
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -320,7 +326,7 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
|
|||
return code;
|
||||
|
||||
_error:
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
|
||||
blockDataDestroy(p);
|
||||
return code;
|
||||
|
@ -330,6 +336,9 @@ int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
|||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
|
||||
blockDataCheck(*ppBlock, false);
|
||||
if (code) {
|
||||
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -90,7 +90,7 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group
|
|||
|
||||
if (isSlidingCountWindow(pAggSup)) {
|
||||
if (pBuffInfo->winBuffOp == CREATE_NEW_WINDOW) {
|
||||
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
|
||||
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
|
||||
(void**)&pCurWin->winInfo.pStatePos, &size);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -101,9 +101,11 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group
|
|||
winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
|
||||
(void**)&pCurWin->winInfo.pStatePos, &size);
|
||||
if (winCode == TSDB_CODE_FAILED) {
|
||||
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
|
||||
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
|
||||
(void**)&pCurWin->winInfo.pStatePos, &size);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
} else {
|
||||
reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
|
||||
}
|
||||
} else {
|
||||
pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin,
|
||||
|
@ -111,9 +113,11 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group
|
|||
winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
|
||||
(void**)&pCurWin->winInfo.pStatePos, &size);
|
||||
if (winCode == TSDB_CODE_FAILED) {
|
||||
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
|
||||
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
|
||||
(void**)&pCurWin->winInfo.pStatePos, &size);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
} else {
|
||||
reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
|
||||
}
|
||||
}
|
||||
if (ts < pCurWin->winInfo.sessionWin.win.ekey) {
|
||||
|
|
|
@ -1229,11 +1229,13 @@ static void destroyStateWindowOperatorInfo(void* param) {
|
|||
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
|
||||
cleanupBasicInfo(&pInfo->binfo);
|
||||
taosMemoryFreeClear(pInfo->stateKey.pData);
|
||||
if (pInfo->pOperator) {
|
||||
|
||||
if (pInfo->pOperator != NULL) {
|
||||
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
||||
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
|
||||
pInfo->pOperator = NULL;
|
||||
}
|
||||
|
||||
cleanupExprSupp(&pInfo->scalarSup);
|
||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
|
@ -1251,13 +1253,17 @@ void destroyIntervalOperatorInfo(void* param) {
|
|||
if (param == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
|
||||
|
||||
cleanupBasicInfo(&pInfo->binfo);
|
||||
if (pInfo->pOperator) {
|
||||
|
||||
if (pInfo->pOperator != NULL) {
|
||||
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
||||
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
|
||||
pInfo->pOperator = NULL;
|
||||
}
|
||||
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
cleanupExprSupp(&pInfo->scalarSupp);
|
||||
|
||||
|
@ -1265,6 +1271,7 @@ void destroyIntervalOperatorInfo(void* param) {
|
|||
|
||||
taosArrayDestroy(pInfo->pInterpCols);
|
||||
pInfo->pInterpCols = NULL;
|
||||
|
||||
taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
|
||||
pInfo->pPrevValues = NULL;
|
||||
|
||||
|
@ -1358,6 +1365,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
|
|||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
code = terrno;
|
||||
lino = __LINE__;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -1465,8 +1473,10 @@ _error:
|
|||
if (pInfo != NULL) {
|
||||
destroyIntervalOperatorInfo(pInfo);
|
||||
}
|
||||
|
||||
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
|
||||
pTaskInfo->code = code;
|
||||
qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1754,11 +1764,13 @@ void destroySWindowOperatorInfo(void* param) {
|
|||
|
||||
cleanupBasicInfo(&pInfo->binfo);
|
||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||
if (pInfo->pOperator) {
|
||||
|
||||
if (pInfo->pOperator != NULL) {
|
||||
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
||||
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
|
||||
pInfo->pOperator = NULL;
|
||||
}
|
||||
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
cleanupExprSupp(&pInfo->scalarSupp);
|
||||
|
||||
|
|
|
@ -771,7 +771,7 @@ static int32_t getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam*
|
|||
|
||||
code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pHandle->pDataBlock->info.rows >= capacity) {
|
||||
|
@ -2391,25 +2391,31 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static void freeSortSource(SSortSource* pSource) {
|
||||
if (NULL == pSource) {
|
||||
static void freeSortSource(void* p) {
|
||||
SSortSource** pSource = (SSortSource**)p;
|
||||
if (NULL == pSource || NULL == *pSource) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!pSource->onlyRef && pSource->param) {
|
||||
taosMemoryFree(pSource->param);
|
||||
if ((*pSource)->pageIdList) {
|
||||
taosArrayDestroy((*pSource)->pageIdList);
|
||||
}
|
||||
|
||||
if (!pSource->onlyRef && pSource->src.pBlock) {
|
||||
blockDataDestroy(pSource->src.pBlock);
|
||||
pSource->src.pBlock = NULL;
|
||||
if (!(*pSource)->onlyRef) {
|
||||
if ((*pSource)->param) {
|
||||
taosMemoryFree((*pSource)->param);
|
||||
}
|
||||
if ((*pSource)->src.pBlock) {
|
||||
blockDataDestroy((*pSource)->src.pBlock);
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pSource);
|
||||
taosMemoryFreeClear(*pSource);
|
||||
}
|
||||
|
||||
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||
SSortSource** p = taosArrayGet(pHandle->pOrderedSource, 0);
|
||||
if (p == NULL) {
|
||||
|
@ -2417,17 +2423,12 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
SSortSource* pSource = *p;
|
||||
|
||||
taosArrayRemove(pHandle->pOrderedSource, 0);
|
||||
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
|
||||
size_t origSourceCount = taosArrayGetSize(pHandle->pOrderedSource);
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = NULL;
|
||||
code = pHandle->fetchfp(pSource->param, &pBlock);
|
||||
if (code != 0) {
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
|
@ -2441,10 +2442,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
|||
pHandle->numOfPages = 1024;
|
||||
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||
code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
|
||||
if (code) {
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
if (pHandle->beforeFp != NULL) {
|
||||
|
@ -2452,33 +2450,22 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||
if (size > sortBufSize) {
|
||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
int64_t st = taosGetTimestampUs();
|
||||
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
if (code != 0) {
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pHandle->sortElapsed += (taosGetTimestampUs() - st);
|
||||
|
||||
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
|
||||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
freeSortSource(pSource);
|
||||
return code;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
freeSortSource(pSource);
|
||||
|
||||
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
|
||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||
|
@ -2486,9 +2473,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
|||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
int64_t st = taosGetTimestampUs();
|
||||
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
|
||||
pHandle->sortElapsed += (taosGetTimestampUs() - st);
|
||||
|
@ -2501,12 +2486,16 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
|||
pHandle->loops = 1;
|
||||
pHandle->tupleHandle.rowIndex = -1;
|
||||
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
|
||||
return 0;
|
||||
} else {
|
||||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
taosArrayRemoveBatch(pHandle->pOrderedSource, 0, origSourceCount, freeSortSource);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -2880,6 +2869,7 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle
|
|||
pHandle->tupleHandle.pBlock = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
pHandle->tupleHandle.pBlock = pBlock;
|
||||
pHandle->tupleHandle.rowIndex = 0;
|
||||
}
|
||||
|
@ -2895,8 +2885,7 @@ int32_t tsortOpen(SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
if (pHandle == NULL || pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
return code;
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
pHandle->opened = true;
|
||||
|
|
|
@ -148,7 +148,7 @@ static int64_t idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_
|
|||
memcpy(buf + total, blk->buf + blkOffset, nread);
|
||||
|
||||
LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
|
||||
TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
NULL, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
if (s != TAOS_LRU_STATUS_OK) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -61,6 +61,7 @@ typedef enum {
|
|||
#define SCH_MAX_TASK_TIMEOUT_USEC 300000000
|
||||
#define SCH_DEFAULT_MAX_RETRY_NUM 6
|
||||
#define SCH_MIN_AYSNC_EXEC_NUM 3
|
||||
#define SCH_DEFAULT_RETRY_TOTAL_ROUND 3
|
||||
|
||||
typedef struct SSchDebug {
|
||||
bool lockEnable;
|
||||
|
|
|
@ -366,9 +366,11 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet,
|
|||
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
||||
if (pEpSet) {
|
||||
pCtx->roundTotal = pEpSet->numOfEps;
|
||||
} else {
|
||||
} else if (pTask->candidateAddrs && taosArrayGetSize(pTask->candidateAddrs) > 0) {
|
||||
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
|
||||
pCtx->roundTotal = pAddr->epSet.numOfEps;
|
||||
} else {
|
||||
pCtx->roundTotal = SCH_DEFAULT_RETRY_TOTAL_ROUND;
|
||||
}
|
||||
} else {
|
||||
pCtx->roundTotal = 1;
|
||||
|
|
|
@ -164,7 +164,6 @@ extern void* streamTimer;
|
|||
extern int32_t streamBackendId;
|
||||
extern int32_t streamBackendCfWrapperId;
|
||||
extern int32_t taskDbWrapperId;
|
||||
extern int32_t streamMetaId;
|
||||
|
||||
int32_t streamTimerInit();
|
||||
void streamTimerCleanUp();
|
||||
|
|
|
@ -3657,6 +3657,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
|
|||
pCur->db = wrapper->db;
|
||||
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||
if (pCur->iter == NULL) {
|
||||
streamStateFreeCur(pCur);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
char buf[128] = {0};
|
||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||
|
@ -3679,6 +3683,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
|
|||
TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));
|
||||
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
|
||||
|
||||
if (!rocksdb_iter_valid(pCur->iter)) {
|
||||
streamStateFreeCur(pCur);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
rocksdb_iter_prev(pCur->iter);
|
||||
if (!rocksdb_iter_valid(pCur->iter)) {
|
||||
streamStateFreeCur(pCur);
|
||||
|
|
|
@ -24,7 +24,8 @@
|
|||
#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec
|
||||
|
||||
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
|
||||
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks);
|
||||
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
|
||||
int32_t* totalBlocks);
|
||||
|
||||
bool streamTaskShouldStop(const SStreamTask* pTask) {
|
||||
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
||||
|
@ -95,17 +96,53 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock,
|
||||
SArray* pRes) {
|
||||
SSDataBlock block = {0};
|
||||
int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
|
||||
if (num != 1) {
|
||||
stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
void* p = taosArrayGet(pRetrieveBlock->blocks, 0);
|
||||
int32_t code = assignOneDataBlock(&block, p);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to assign retrieve block, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
block.info.type = STREAM_PULL_OVER;
|
||||
block.info.childId = pTask->info.selfChildId;
|
||||
|
||||
p = taosArrayPush(pRes, &block);
|
||||
if (p != NULL) {
|
||||
(*pNumOfBlocks) += 1;
|
||||
stDebug("s-task:%s(child %d) retrieve res from upstream completed, QID:0x%" PRIx64, pTask->id.idStr,
|
||||
pTask->info.selfChildId, pRetrieveBlock->reqId);
|
||||
} else {
|
||||
code = terrno;
|
||||
stError("s-task:%s failed to append pull over block for retrieve data, QID:0x%" PRIx64" code:%s", pTask->id.idStr,
|
||||
pRetrieveBlock->reqId, tstrerror(code));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
void* pExecutor = pTask->exec.pExecutor;
|
||||
int32_t size = 0;
|
||||
int32_t numOfBlocks = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
void* pExecutor = pTask->exec.pExecutor;
|
||||
SArray* pRes = NULL;
|
||||
|
||||
*totalBlocks = 0;
|
||||
*totalSize = 0;
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* output = NULL;
|
||||
uint64_t ts = 0;
|
||||
|
||||
if (pRes == NULL) {
|
||||
pRes = taosArrayInit(4, sizeof(SSDataBlock));
|
||||
}
|
||||
|
@ -115,8 +152,6 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
|
|||
return code;
|
||||
}
|
||||
|
||||
SSDataBlock* output = NULL;
|
||||
uint64_t ts = 0;
|
||||
if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
|
||||
if (code == TSDB_CODE_QRY_IN_EXEC) {
|
||||
resetTaskInfo(pExecutor);
|
||||
|
@ -124,6 +159,7 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
|
|||
|
||||
if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) {
|
||||
stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code));
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
return code;
|
||||
} else {
|
||||
qResetTaskCode(pExecutor);
|
||||
|
@ -133,33 +169,11 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
|
|||
|
||||
if (output == NULL) {
|
||||
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||
SSDataBlock block = {0};
|
||||
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
|
||||
|
||||
int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
|
||||
if (num != 1) {
|
||||
stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
|
||||
continue;
|
||||
}
|
||||
|
||||
code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
|
||||
code = doAppendPullOverBlock(pTask, &numOfBlocks, (SStreamDataBlock*) pItem, pRes);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
continue;
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
return code;
|
||||
}
|
||||
|
||||
block.info.type = STREAM_PULL_OVER;
|
||||
block.info.childId = pTask->info.selfChildId;
|
||||
|
||||
void* p = taosArrayPush(pRes, &block);
|
||||
if (p != NULL) {
|
||||
numOfBlocks += 1;
|
||||
} else {
|
||||
stError("s-task:%s failed to add retrieve block", pTask->id.idStr);
|
||||
}
|
||||
|
||||
stDebug("s-task:%s(child %d) retrieve process completed,QID:0x%" PRIx64 " dump results", pTask->id.idStr,
|
||||
pTask->info.selfChildId, pRetrieveBlock->reqId);
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -189,10 +203,10 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
|
|||
void* p = taosArrayPush(pRes, &block);
|
||||
if (p == NULL) {
|
||||
stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
|
||||
}
|
||||
|
||||
} else {
|
||||
stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
|
||||
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size));
|
||||
}
|
||||
|
||||
// current output should be dispatched to down stream nodes
|
||||
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
|
||||
|
@ -303,7 +317,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
|
|||
bool finished = false;
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
if(pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
||||
stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr);
|
||||
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
|
||||
}
|
||||
|
@ -718,7 +732,7 @@ int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpoi
|
|||
|
||||
// 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
|
||||
int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
|
||||
if(code) {
|
||||
if (code) {
|
||||
stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code));
|
||||
}
|
||||
|
||||
|
|
|
@ -1060,7 +1060,7 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
|
||||
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) {
|
||||
SSessionKey* pWinKey = pKey;
|
||||
const TSKEY gap = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -1082,21 +1082,27 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey
|
|||
int32_t size = taosArrayGetSize(pWinStates);
|
||||
if (size == 0) {
|
||||
void* pFileStore = getStateFileStore(pFileState);
|
||||
void* p = NULL;
|
||||
void* pRockVal = NULL;
|
||||
|
||||
int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &p, pVLen);
|
||||
int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
|
||||
if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) {
|
||||
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
|
||||
int32_t valSize = *pVLen;
|
||||
COUNT_TYPE* pWinStateCount = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
|
||||
if ((*pWinStateCount) == winCount) {
|
||||
code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
} else {
|
||||
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
|
||||
if (!(*pVal)) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
|
||||
pWinKey->win.ekey, code_file);
|
||||
}
|
||||
} else {
|
||||
code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
|
||||
taosMemoryFree(p);
|
||||
taosMemoryFree(pRockVal);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -545,6 +545,6 @@ int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey
|
|||
return getCountWinResultBuff(pState->pFileState, pKey, winCount, ppVal, pVLen, pWinCode);
|
||||
}
|
||||
|
||||
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
|
||||
return createCountWinResultBuff(pState->pFileState, pKey, pVal, pVLen);
|
||||
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) {
|
||||
return createCountWinResultBuff(pState->pFileState, pKey, winCount, pVal, pVLen);
|
||||
}
|
||||
|
|
|
@ -445,7 +445,9 @@ _end:
|
|||
}
|
||||
|
||||
int32_t clearRowBuff(SStreamFileState* pFileState) {
|
||||
if (pFileState->deleteMark != INT64_MAX) {
|
||||
clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
|
||||
}
|
||||
if (isListEmpty(pFileState->freeBuffs)) {
|
||||
return flushRowBuff(pFileState);
|
||||
}
|
||||
|
|
|
@ -677,7 +677,7 @@ static void httpHandleReq(SHttpMsg* msg) {
|
|||
tError("http-report failed to connect to http-server,dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reson:%s",
|
||||
cli->addr, cli->port, chanId, cli->seq, uv_strerror(ret));
|
||||
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
|
||||
destroyHttpClient(cli);
|
||||
uv_close((uv_handle_t*)&cli->tcp, httpDestroyClientCb);
|
||||
}
|
||||
TAOS_UNUSED(taosReleaseRef(httpRefMgt, chanId));
|
||||
return;
|
||||
|
|
|
@ -127,13 +127,9 @@ void rpcClose(void* arg) {
|
|||
if (arg == NULL) {
|
||||
return;
|
||||
}
|
||||
if (transRemoveExHandle(transGetInstMgt(), (int64_t)arg) != 0) {
|
||||
tError("failed to remove rpc handle");
|
||||
}
|
||||
TAOS_UNUSED(transRemoveExHandle(transGetInstMgt(), (int64_t)arg));
|
||||
TAOS_UNUSED(transReleaseExHandle(transGetInstMgt(), (int64_t)arg));
|
||||
|
||||
if (transReleaseExHandle(transGetInstMgt(), (int64_t)arg) != 0) {
|
||||
tError("failed to release rpc handle");
|
||||
}
|
||||
tInfo("end to close rpc");
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -3401,16 +3401,11 @@ int32_t transFreeConnById(void* shandle, int64_t transpointId) {
|
|||
}
|
||||
|
||||
_exception:
|
||||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
TAOS_UNUSED(transReleaseExHandle(transGetInstMgt(), (int64_t)shandle));
|
||||
if (code != 0) {
|
||||
if (transpointId != 0) {
|
||||
if (transReleaseExHandle(transGetRefMgt(), transpointId) != 0) {
|
||||
tError("failed to release refId %" PRId64 "", transpointId);
|
||||
}
|
||||
|
||||
if (transRemoveExHandle(transGetRefMgt(), transpointId) != 0) {
|
||||
tError("failed to remove refId %" PRId64 "", transpointId);
|
||||
}
|
||||
TAOS_UNUSED(transReleaseExHandle(transGetRefMgt(), transpointId));
|
||||
TAOS_UNUSED(transRemoveExHandle(transGetRefMgt(), transpointId));
|
||||
}
|
||||
taosMemoryFree(pCli);
|
||||
}
|
||||
|
|
|
@ -937,6 +937,7 @@ static int walFindCurMetaVer(SWal* pWal) {
|
|||
TdDirPtr pDir = taosOpenDir(pWal->path);
|
||||
if (pDir == NULL) {
|
||||
wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, tstrerror(terrno));
|
||||
regfree(&walMetaRegexPattern);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
@ -956,6 +957,7 @@ static int walFindCurMetaVer(SWal* pWal) {
|
|||
}
|
||||
if (taosCloseDir(&pDir) != 0) {
|
||||
wError("failed to close dir, ret:%s", tstrerror(terrno));
|
||||
regfree(&walMetaRegexPattern);
|
||||
return terrno;
|
||||
}
|
||||
regfree(&walMetaRegexPattern);
|
||||
|
|
|
@ -40,6 +40,7 @@ enum {
|
|||
struct SLRUEntry {
|
||||
void *value;
|
||||
_taos_lru_deleter_t deleter;
|
||||
_taos_lru_overwriter_t overwriter;
|
||||
void *ud;
|
||||
SLRUEntry *nextHash;
|
||||
SLRUEntry *next;
|
||||
|
@ -403,6 +404,10 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
|
|||
if (old != NULL) {
|
||||
status = TAOS_LRU_STATUS_OK_OVERWRITTEN;
|
||||
|
||||
if (old->overwriter) {
|
||||
(*old->overwriter)(old->keyData, old->keyLength, old->value, old->ud);
|
||||
}
|
||||
|
||||
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
|
||||
if (!TAOS_LRU_ENTRY_HAS_REFS(old)) {
|
||||
taosLRUCacheShardLRURemove(shard, old);
|
||||
|
@ -440,8 +445,9 @@ _exit:
|
|||
}
|
||||
|
||||
static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash,
|
||||
void *value, size_t charge, _taos_lru_deleter_t deleter, LRUHandle **handle,
|
||||
LRUPriority priority, void *ud) {
|
||||
void *value, size_t charge, _taos_lru_deleter_t deleter,
|
||||
_taos_lru_overwriter_t overwriter, LRUHandle **handle, LRUPriority priority,
|
||||
void *ud) {
|
||||
SLRUEntry *e = taosMemoryCalloc(1, sizeof(SLRUEntry) - 1 + keyLen);
|
||||
if (!e) {
|
||||
if (deleter) {
|
||||
|
@ -453,6 +459,7 @@ static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key,
|
|||
e->value = value;
|
||||
e->flags = 0;
|
||||
e->deleter = deleter;
|
||||
e->overwriter = overwriter;
|
||||
e->ud = ud;
|
||||
e->keyLength = keyLen;
|
||||
e->hash = hash;
|
||||
|
@ -726,12 +733,12 @@ void taosLRUCacheCleanup(SLRUCache *cache) {
|
|||
}
|
||||
|
||||
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
|
||||
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority, void *ud) {
|
||||
_taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle, LRUPriority priority, void *ud) {
|
||||
uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen);
|
||||
uint32_t shardIndex = hash & cache->shardedCache.shardMask;
|
||||
|
||||
return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, handle,
|
||||
priority, ud);
|
||||
return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, overwriter,
|
||||
handle, priority, ud);
|
||||
}
|
||||
|
||||
LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen) {
|
||||
|
|