Merge branch 'enh/dv3' into enh/triggerCheckpoint

This commit is contained in:
yihaoDeng 2023-06-08 07:20:41 +00:00
commit 8b91617145
34 changed files with 553 additions and 541 deletions

View File

@ -274,7 +274,7 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_TOOLS "" OFF) option(WITH_TOOLS "" OFF)
option(WITH_LIBURING "" OFF) option(WITH_LIBURING "" OFF)
IF (TD_LINUX) IF (TD_LINUX)
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" ON) option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
ELSE() ELSE()
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
ENDIF() ENDIF()

View File

@ -16,165 +16,79 @@ TDengine Source Connector is used to read data from TDengine in real-time and se
![TDengine Database Kafka Connector -- streaming integration with kafka connect](kafka/streaming-integration-with-kafka-connect.webp) ![TDengine Database Kafka Connector -- streaming integration with kafka connect](kafka/streaming-integration-with-kafka-connect.webp)
## What is Confluent?
[Confluent](https://www.confluent.io/) adds many extensions to Kafka, including:
1. Schema Registry
2. REST Proxy
3. Non-Java Clients
4. Many packaged Kafka Connect plugins
5. GUI for managing and monitoring Kafka - Confluent Control Center
Some of these extensions are available in the community version of Confluent. Some are only available in the enterprise version.
![TDengine Database Kafka Connector -- Confluent platform](kafka/confluentPlatform.webp)
Confluent Enterprise Edition provides the `confluent` command-line tool to manage various components.
## Prerequisites ## Prerequisites
1. Linux operating system 1. Linux operating system
2. Java 8 and Maven installed 2. Java 8 and Maven installed
3. Git is installed 3. Git, curl, and vi are installed
4. TDengine is installed and started. If not, please refer to [Installation and Uninstallation](/operation/pkg-install) 4. TDengine is installed and started. If not, please refer to [Installation and Uninstallation](/operation/pkg-install)
## Install Confluent ## Install Kafka
Confluent provides two installation methods: Docker and binary packages. This article only introduces binary package installation.
Execute in any directory: Execute in any directory:
```` ````
curl -O http://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz curl -O https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar xzf confluent-7.1.1.tar.gz -C /opt/ tar xzf kafka_2.13-3.4.0.tgz -C /opt/
ln -s /opt/kafka_2.13-3.4.0 /opt/kafka
```` ````
Then you need to add the `$CONFLUENT_HOME/bin` directory to the PATH. Then you need to add the `$KAFKA_HOME/bin` directory to the PATH.
```title=".profile" ```title=".profile"
export CONFLUENT_HOME=/opt/confluent-7.1.1 export KAFKA_HOME=/opt/kafka
export PATH=$CONFLUENT_HOME/bin:$PATH export PATH=$PATH:$KAFKA_HOME/bin
``` ```
Users can append the above script to the current user's profile file (~/.profile or ~/.bash_profile).
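To apply the change in the current shell and confirm that the Kafka scripts are reachable, a quick check along these lines can help (a minimal sketch; it assumes the profile file edited above and a Bourne-compatible shell):

```shell
# Reload the profile and confirm the Kafka command-line scripts resolve.
source ~/.profile
command -v kafka-topics.sh && kafka-topics.sh --version
```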
After the installation is complete, you can enter `confluent version` for simple verification:
```
# confluent version
confluent - Confluent CLI
Version: v2.6.1
Git Ref: 6d920590
Build Date: 2022-02-18T06:14:21Z
Go Version: go1.17.6 (linux/amd64)
Development: false
```
## Install TDengine Connector plugin ## Install TDengine Connector plugin
### Install from source code ### Install from source code
``` ```shell
git clone --branch 3.0 https://github.com/taosdata/kafka-connect-tdengine.git git clone --branch 3.0 https://github.com/taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine cd kafka-connect-tdengine
mvn clean package mvn clean package -Dmaven.test.skip=true
unzip -d $CONFLUENT_HOME/share/java/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip unzip -d $KAFKA_HOME/components/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip
``` ```
The above script first clones the project source code and then compiles and packages it with Maven. After the package is complete, the zip package of the plugin is generated in the `target/components/packages/` directory. Unzip this zip package to the plugin path. We used `$CONFLUENT_HOME/share/java/` above because it's a built-in plugin path. The above script first clones the project source code and then compiles and packages it with Maven. After the package is complete, the zip package of the plugin is generated in the `target/components/packages/` directory. Unzip this zip package to the plugin path. We used `$KAFKA_HOME/components/` above because it's a built-in plugin path.
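As a quick sanity check (a sketch, assuming the unzip command above completed without errors), you can list the target directory; a folder named after the packaged connector should be present:

```shell
ls $KAFKA_HOME/components/
# Expect a taosdata-kafka-connect-tdengine-* directory (the exact name depends on the packaged version).
```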
### Install with confluent-hub ### Add configuration file
[Confluent Hub](https://www.confluent.io/hub) provides a service to download Kafka Connect plugins. After TDengine Kafka Connector is published to Confluent Hub, it can be installed using the command tool `confluent-hub`. add kafka-connect-tdengine plugin path to `plugin.path` in `$KAFKA_HOME/config/connect-distributed.properties`.
**TDengine Kafka Connector is currently not officially released and cannot be installed in this way**.
## Start Confluent ```properties
plugin.path=/usr/share/java,/opt/kafka/components
```
confluent local services start
``` ```
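If `plugin.path` is not already set in the worker configuration, one minimal way to add it is shown below (a sketch; it assumes Kafka was unpacked to /opt/kafka as above and that no existing `plugin.path` entry needs to be preserved):

```shell
# Append the plugin path and confirm the effective setting.
echo 'plugin.path=/usr/share/java,/opt/kafka/components' >> $KAFKA_HOME/config/connect-distributed.properties
grep '^plugin.path' $KAFKA_HOME/config/connect-distributed.properties
```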
:::note ## Start Kafka Services
Be sure to install the plugin before starting Confluent. Otherwise, Kafka Connect will fail to discover the plugins.
:::
:::tip Use the command below to start all services:
If a component fails to start, try clearing the data and restarting. The data directory will be printed to the console at startup, e.g.:
```title="Console output log" {1} ```shell
Using CONFLUENT_CURRENT: /tmp/confluent.106668 zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
Starting ZooKeeper
ZooKeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting ksqlDB Server
ksqlDB Server is [UP]
Starting Control Center
Control Center is [UP]
```
To clear data, execute `rm -rf /tmp/confluent.106668`. kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
:::
### Check Confluent Services Status connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
Use the command below to check the status of all services:
```
confluent local services status
```
The expected output is:
```
Connect is [UP]
Control Center is [UP]
Kafka is [UP]
Kafka REST is [UP]
ksqlDB Server is [UP]
Schema Registry is [UP]
ZooKeeper is [UP]
``` ```
### Check Successfully Loaded Plugin ### Check Successfully Loaded Plugin
After Kafka Connect has started completely, you can use the command below to check whether the plugins were installed successfully:
```
confluent local services connect plugin list ```shell
curl http://localhost:8083/connectors
``` ```
The output should contain `TDengineSinkConnector` and `TDengineSourceConnector` as below: The output is as below:
```txt
[]
``` ```
Available Connect Plugins:
[
{
"class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector",
"type": "sink",
"version": "1.0.0"
},
{
"class": "com.taosdata.kafka.connect.source.TDengineSourceConnector",
"type": "source",
"version": "1.0.0"
},
......
```
If not, please check the log file of Kafka Connect. To view the log file path, please execute:
```
echo `cat /tmp/confluent.current`/connect/connect.stdout
```
It should produce a path like:`/tmp/confluent.104086/connect/connect.stdout`
Besides the log file `connect.stdout`, there is a file named `connect.properties` in the same directory. At the end of this file you can see the effective `plugin.path`, which is a series of paths joined by commas. If Kafka Connect did not find the plugins, it is probably because the installation path is not included in `plugin.path`.
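With a standalone Kafka Connect worker, the REST API's `/connector-plugins` endpoint gives a more direct check that the plugin classes were discovered (a sketch, assuming the worker listens on the default port 8083):

```shell
curl -s http://localhost:8083/connector-plugins
# The response should list com.taosdata.kafka.connect.sink.TDengineSinkConnector
# and com.taosdata.kafka.connect.source.TDengineSourceConnector.
```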
## The use of TDengine Sink Connector ## The use of TDengine Sink Connector
@ -184,40 +98,47 @@ TDengine Sink Connector internally uses TDengine [modeless write interface](/ref
The following example synchronizes the data of the topic meters to the target database power. The data format is the InfluxDB Line protocol format. The following example synchronizes the data of the topic meters to the target database power. The data format is the InfluxDB Line protocol format.
### Add configuration file ### Add Sink Connector configuration file
``` ```shell
mkdir ~/test mkdir ~/test
cd ~/test cd ~/test
vi sink-demo.properties vi sink-demo.json
``` ```
The content of sink-demo.properties is as follows: The content of sink-demo.json is as follows:
```ini title="sink-demo.properties" ```json title="sink-demo.json"
name=TDengineSinkConnector {
connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector "name": "TDengineSinkConnector",
tasks.max=1 "config": {
topics=meters "connector.class":"com.taosdata.kafka.connect.sink.TDengineSinkConnector",
connection.url=jdbc:TAOS://127.0.0.1:6030 "tasks.max": "1",
connection.user=root "topics": "meters",
connection.password=taosdata "connection.url": "jdbc:TAOS://127.0.0.1:6030",
connection.database=power "connection.user": "root",
db.schemaless=line "connection.password": "taosdata",
data.precision=ns "connection.database": "power",
key.converter=org.apache.kafka.connect.storage.StringConverter "db.schemaless": "line",
value.converter=org.apache.kafka.connect.storage.StringConverter "data.precision": "ns",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dead_letter_topic",
"errors.deadletterqueue.topic.replication.factor": 1
}
}
``` ```
Key configuration instructions: Key configuration instructions:
1. `topics=meters` and `connection.database=power` means to subscribe to the data of the topic meters and write to the database power. 1. `"topics": "meters"` and `"connection.database": "power"` means to subscribe to the data of the topic meters and write to the database power.
2. `db.schemaless=line` means the data in the InfluxDB Line protocol format. 2. `"db.schemaless": "line"` means the data in the InfluxDB Line protocol format.
### Create Connector instance ### Create Sink Connector instance
```` ````shell
confluent local services connect connector load TDengineSinkConnector --config ./sink-demo.properties curl -X POST -d @sink-demo.json http://localhost:8083/connectors -H "Content-Type: application/json"
```` ````
If the above command is executed successfully, the output is as follows: If the above command is executed successfully, the output is as follows:
@ -237,7 +158,10 @@ If the above command is executed successfully, the output is as follows:
"tasks.max": "1", "tasks.max": "1",
"topics": "meters", "topics": "meters",
"value.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter",
"name": "TDengineSinkConnector" "name": "TDengineSinkConnector",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dead_letter_topic",
"errors.deadletterqueue.topic.replication.factor": "1",
}, },
"tasks": [], "tasks": [],
"type": "sink" "type": "sink"
@ -258,7 +182,7 @@ meters,location=California.LoSangeles,groupid=3 current=11.3,voltage=221,phase=0
Use kafka-console-producer to write test data to the topic `meters`. Use kafka-console-producer to write test data to the topic `meters`.
``` ```
cat test-data.txt | kafka-console-producer --broker-list localhost:9092 --topic meters cat test-data.txt | kafka-console-producer.sh --broker-list localhost:9092 --topic meters
``` ```
:::note :::note
@ -269,12 +193,12 @@ TDengine Sink Connector will automatically create the database if the target dat
Use the TDengine CLI to verify that the sync was successful. Use the TDengine CLI to verify that the sync was successful.
``` ```sql
taos> use power; taos> use power;
Database changed. Database changed.
taos> select * from meters; taos> select * from meters;
ts | current | voltage | phase | groupid | location | _ts | current | voltage | phase | groupid | location |
=============================================================================================================================================================== ===============================================================================================================================================================
2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles | 2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles |
2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles | 2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles |
@ -293,29 +217,34 @@ TDengine Source Connector will convert the data in TDengine data table into [Inf
The following sample program synchronizes the data in the database test to the topic tdengine-source-test. The following sample program synchronizes the data in the database test to the topic tdengine-source-test.
### Add configuration file ### Add Source Connector configuration file
``` ```shell
vi source-demo.properties vi source-demo.json
``` ```
Enter the following content:
```ini title="source-demo.properties" ```json title="source-demo.json"
name=TDengineSourceConnector {
connector.class=com.taosdata.kafka.connect.source.TDengineSourceConnector "name":"TDengineSourceConnector",
tasks.max=1 "config":{
connection.url=jdbc:TAOS://127.0.0.1:6030 "connector.class": "com.taosdata.kafka.connect.source.TDengineSourceConnector",
connection.username=root "tasks.max": 1,
connection.password=taosdata "connection.url": "jdbc:TAOS://127.0.0.1:6030",
connection.database=test "connection.username": "root",
connection.attempts=3 "connection.password": "taosdata",
connection.backoff.ms=5000 "connection.database": "test",
topic.prefix=tdengine-source- "connection.attempts": 3,
poll.interval.ms=1000 "connection.backoff.ms": 5000,
fetch.max.rows=100 "topic.prefix": "tdengine-source",
key.converter=org.apache.kafka.connect.storage.StringConverter "poll.interval.ms": 1000,
value.converter=org.apache.kafka.connect.storage.StringConverter "fetch.max.rows": 100,
"topic.per.stable": true,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
``` ```
### Prepare test data ### Prepare test data
@ -340,40 +269,40 @@ INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-1
Use the TDengine CLI to execute the SQL script.
``` ```shell
taos -f prepare-source-data.sql taos -f prepare-source-data.sql
``` ```
### Create Connector instance ### Create Connector instance
```` ```shell
confluent local services connect connector load TDengineSourceConnector --config source-demo.properties curl -X POST -d @source-demo.json http://localhost:8083/connectors -H "Content-Type: application/json"
```` ```
### View topic data ### View topic data
Use the kafka-console-consumer command-line tool to monitor data in the topic tdengine-source-test. In the beginning, all historical data will be output. After two new rows are inserted into TDengine, kafka-console-consumer immediately outputs them. The output is in InfluxDB line protocol format.
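If you are unsure of the exact topic name the Source Connector produced (it depends on `topic.prefix`, `connection.database` and `topic.per.stable`), listing the topics first can help; a sketch assuming the broker listens on localhost:9092:

```shell
kafka-topics.sh --bootstrap-server localhost:9092 --list
# Look for a topic such as tdengine-source-test-meters.
```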
```` ````shell
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic tdengine-source-test kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic tdengine-source-test-meters
```` ````
output: output:
```` ```txt
...... ......
meters,location="California.SanFrancisco",groupid=2i32 current=10.3f32,voltage=219i32,phase=0.31f32 1538548685000000000 meters,location="California.SanFrancisco",groupid=2i32 current=10.3f32,voltage=219i32,phase=0.31f32 1538548685000000000
meters,location="California.SanFrancisco",groupid=2i32 current=12.6f32,voltage=218i32,phase=0.33f32 1538548695000000000 meters,location="California.SanFrancisco",groupid=2i32 current=12.6f32,voltage=218i32,phase=0.33f32 1538548695000000000
...... ......
```` ```
All historical data is displayed. Switch to the TDengine CLI and insert two new pieces of data: All historical data is displayed. Switch to the TDengine CLI and insert two new pieces of data:
```` ```sql
USE test; USE test;
INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38); INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38);
INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22); INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22);
```` ```
Switch back to kafka-console-consumer, and the command line window has printed out the two pieces of data just inserted. Switch back to kafka-console-consumer, and the command line window has printed out the two pieces of data just inserted.
@ -383,16 +312,16 @@ After testing, use the unload command to stop the loaded connector.
View currently active connectors: View currently active connectors:
```` ```shell
confluent local services connect connector status curl http://localhost:8083/connectors
```` ```
You should now have two active connectors if you followed the previous steps. Use the following command to unload: You should now have two active connectors if you followed the previous steps. Use the following command to unload:
```` ```shell
confluent local services connect connector unload TDengineSinkConnector curl -X DELETE http://localhost:8083/connectors/TDengineSinkConnector
confluent local services connect connector unload TDengineSourceConnector curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
```` ```
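To confirm the connectors were actually removed, you can query the Connect REST API again (a sketch, assuming the default port 8083); an empty JSON array is expected:

```shell
curl -s http://localhost:8083/connectors
# Expected output: []
```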
## Configuration reference ## Configuration reference
@ -430,19 +359,14 @@ The following configuration items apply to TDengine Sink Connector and TDengine
6. `query.interval.ms`: The time range of data read from TDengine in each query, in milliseconds. It should be adjusted according to the data inflow rate; the default value is 1000.
7. `topic.per.stable`: If it's set to true, it means one super table in TDengine corresponds to a topic in Kafka, the topic naming rule is `<topic.prefix>-<connection.database>-<stable.name>`; if it's set to false, it means the whole DB corresponds to a topic in Kafka, the topic naming rule is `<topic.prefix>-<connection.database>`. 7. `topic.per.stable`: If it's set to true, it means one super table in TDengine corresponds to a topic in Kafka, the topic naming rule is `<topic.prefix>-<connection.database>-<stable.name>`; if it's set to false, it means the whole DB corresponds to a topic in Kafka, the topic naming rule is `<topic.prefix>-<connection.database>`.
## Other notes ## Other notes
1. To install plugin to a customized location, refer to https://docs.confluent.io/home/connect/self-managed/install.html#install-connector-manually. 1. To use Kafka Connect, refer to <https://kafka.apache.org/documentation/#connect>.
2. To use Kafka Connect without confluent, refer to https://kafka.apache.org/documentation/#connect.
## Feedback ## Feedback
https://github.com/taosdata/kafka-connect-tdengine/issues <https://github.com/taosdata/kafka-connect-tdengine/issues>
## Reference ## Reference
1. https://www.confluent.io/what-is-apache-kafka 1. For more information, see <https://kafka.apache.org/documentation/>
2. https://developer.confluent.io/learn-kafka/kafka-connect/intro
3. https://docs.confluent.io/platform/current/platform.html

View File

@ -16,169 +16,78 @@ TDengine Source Connector is used to read data from TDengine in real time and send
![TDengine Database Kafka Connector -- streaming integration with kafka connect](kafka/streaming-integration-with-kafka-connect.webp) ![TDengine Database Kafka Connector -- streaming integration with kafka connect](kafka/streaming-integration-with-kafka-connect.webp)
## What is Confluent?
[Confluent](https://www.confluent.io/) adds many extensions on top of Kafka, including:
1. Schema Registry
2. REST Proxy
3. Non-Java clients
4. Many packaged Kafka Connect plugins
5. A GUI for managing and monitoring Kafka: Confluent Control Center
Some of these extensions are included in the community edition of Confluent; others are only available in the enterprise edition.
![TDengine Database Kafka Connector -- Confluent introduction](kafka/confluentPlatform.webp)
Confluent Enterprise Edition provides the `confluent` command-line tool to manage the components.
## Prerequisites
Prerequisites for running the examples in this tutorial.
1. Linux operating system
2. Java 8 and Maven installed
3. Git installed 3. Git, curl, and vi installed
4. TDengine installed and started. If not, refer to [Installation and Uninstallation](/operation/pkg-install)
## Install Confluent ## Install Kafka
Confluent offers two installation methods: Docker and binary packages. This article only covers installation from binary packages.
Execute in any directory:
``` ```shell
curl -O http://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz curl -O https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar xzf confluent-7.1.1.tar.gz -C /opt/ tar xzf kafka_2.13-3.4.0.tgz -C /opt/
ln -s /opt/kafka_2.13-3.4.0 /opt/kafka
``` ```
Then add the `$CONFLUENT_HOME/bin` directory to the PATH. Then add the `$KAFKA_HOME/bin` directory to the PATH.
```title=".profile" ```title=".profile"
export CONFLUENT_HOME=/opt/confluent-7.1.1 export KAFKA_HOME=/opt/kafka
export PATH=$CONFLUENT_HOME/bin:$PATH export PATH=$PATH:$KAFKA_HOME/bin
``` ```
The above script can be appended to the current user's profile file (~/.profile or ~/.bash_profile).
After installation, you can run `confluent version` for a quick verification:
```
# confluent version
confluent - Confluent CLI
Version: v2.6.1
Git Ref: 6d920590
Build Date: 2022-02-18T06:14:21Z
Go Version: go1.17.6 (linux/amd64)
Development: false
```
## Install the TDengine Connector plugin
### Install from source ### Build the plugin
``` ```shell
git clone --branch 3.0 https://github.com/taosdata/kafka-connect-tdengine.git git clone --branch 3.0 https://github.com/taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine cd kafka-connect-tdengine
mvn clean package mvn clean package -Dmaven.test.skip=true
unzip -d $CONFLUENT_HOME/share/java/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip unzip -d $KAFKA_HOME/components/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip
``` ```
The above script first clones the project source code and then builds and packages it with Maven. After packaging, the zip package of the plugin is generated in the `target/components/packages/` directory. Unzip this zip package to the plugin installation path; the example above uses the built-in plugin installation path `$CONFLUENT_HOME/share/java/`. The above script first clones the project source code and then builds and packages it with Maven. After packaging, the zip package of the plugin is generated in the `target/components/packages/` directory. Unzip this zip package to the plugin installation path; the example above uses the built-in plugin installation path `$KAFKA_HOME/components/`.
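As a quick check before unpacking (a sketch, assuming the Maven build above succeeded), you can confirm the plugin zip was generated:

```shell
ls target/components/packages/
# Expect a taosdata-kafka-connect-tdengine-*.zip file.
```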
### Install with confluent-hub ### Configure the plugin
[Confluent Hub](https://www.confluent.io/hub) provides a service for downloading Kafka Connect plugins. After TDengine Kafka Connector is published to Confluent Hub, it can be installed with the `confluent-hub` command-line tool. Add the kafka-connect-tdengine plugin to `plugin.path` in the `$KAFKA_HOME/config/connect-distributed.properties` configuration file.
**TDengine Kafka Connector has not been officially released yet and cannot be installed this way**.
## Start Confluent ```properties
plugin.path=/usr/share/java,/opt/kafka/components
```
confluent local services start
``` ```
:::note ## Start Kafka
Be sure to install the plugin before starting Confluent; otherwise, plugin loading will fail.
:::
:::tip ```shell
If a component fails to start, you can try clearing its data and restarting. The data directory is printed to the console at startup, for example: zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
```title="Console output log" {1} kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
Using CONFLUENT_CURRENT: /tmp/confluent.106668
Starting ZooKeeper connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
ZooKeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting ksqlDB Server
ksqlDB Server is [UP]
Starting Control Center
Control Center is [UP]
``` ```
To clear the data, run `rm -rf /tmp/confluent.106668`. ### Verify that Kafka Connect started successfully
:::
### Verify that each component started successfully
Enter the command:
``` ```shell
confluent local services status curl http://localhost:8083/connectors
``` ```
If all components started successfully, you will get the following output:
```txt
[]
``` ```
Connect is [UP]
Control Center is [UP]
Kafka is [UP]
Kafka REST is [UP]
ksqlDB Server is [UP]
Schema Registry is [UP]
ZooKeeper is [UP]
```
### Verify that the plugin was installed successfully
After the Kafka Connect component is fully started, you can list the successfully loaded plugins with the following command:
```
confluent local services connect plugin list
```
If the installation succeeded, the output is as follows:
```txt {4,9}
Available Connect Plugins:
[
{
"class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector",
"type": "sink",
"version": "1.0.0"
},
{
"class": "com.taosdata.kafka.connect.source.TDengineSourceConnector",
"type": "source",
"version": "1.0.0"
},
......
```
If the plugin installation failed, check the Kafka Connect startup log for error messages; print the log file path with the following command:
```
echo `cat /tmp/confluent.current`/connect/connect.stdout
```
The output of this command is similar to: `/tmp/confluent.104086/connect/connect.stdout`
In the same directory as the log file `connect.stdout` there is another file named `connect.properties`. At the end of this file you can see the effective `plugin.path`, which is a comma-separated list of paths. If the plugin installation failed, it is most likely because the actual installation path is not included in `plugin.path`.
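A quick way to inspect that effective setting (a sketch, reusing the Confluent data directory path printed at startup, as referenced above):

```shell
grep 'plugin.path' `cat /tmp/confluent.current`/connect/connect.properties
```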
## Using the TDengine Sink Connector
@ -188,40 +97,47 @@ TDengine Sink Connector internally uses the TDengine [schemaless write interface](../../conn
The following example synchronizes data from the topic meters to the target database power. The data format is the InfluxDB Line protocol format.
### Add a configuration file ### Add a Sink Connector configuration file
``` ```shell
mkdir ~/test mkdir ~/test
cd ~/test cd ~/test
vi sink-demo.properties vi sink-demo.json
``` ```
The content of sink-demo.properties is as follows: The content of sink-demo.json is as follows:
```ini title="sink-demo.properties" ```json title="sink-demo.json"
name=TDengineSinkConnector {
connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector "name": "TDengineSinkConnector",
tasks.max=1 "config": {
topics=meters "connector.class":"com.taosdata.kafka.connect.sink.TDengineSinkConnector",
connection.url=jdbc:TAOS://127.0.0.1:6030 "tasks.max": "1",
connection.user=root "topics": "meters",
connection.password=taosdata "connection.url": "jdbc:TAOS://127.0.0.1:6030",
connection.database=power "connection.user": "root",
db.schemaless=line "connection.password": "taosdata",
data.precision=ns "connection.database": "power",
key.converter=org.apache.kafka.connect.storage.StringConverter "db.schemaless": "line",
value.converter=org.apache.kafka.connect.storage.StringConverter "data.precision": "ns",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dead_letter_topic",
"errors.deadletterqueue.topic.replication.factor": 1
}
}
``` ```
Key configuration notes:
1. `topics=meters` and `connection.database=power` mean that data from the topic meters is subscribed and written to the database power. 1. `"topics": "meters"` and `"connection.database": "power"` mean that data from the topic meters is subscribed and written to the database power.
2. `db.schemaless=line` means the data is in InfluxDB Line protocol format. 2. `"db.schemaless": "line"` means the data is in InfluxDB Line protocol format.
### Create a Connector instance ### Create a Sink Connector instance
``` ```shell
confluent local services connect connector load TDengineSinkConnector --config ./sink-demo.properties curl -X POST -d @sink-demo.json http://localhost:8083/connectors -H "Content-Type: application/json"
``` ```
If the above command executes successfully, the output is as follows:
@ -241,7 +157,10 @@ confluent local services connect connector load TDengineSinkConnector --config .
"tasks.max": "1", "tasks.max": "1",
"topics": "meters", "topics": "meters",
"value.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter",
"name": "TDengineSinkConnector" "name": "TDengineSinkConnector",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dead_letter_topic",
"errors.deadletterqueue.topic.replication.factor": "1",
}, },
"tasks": [], "tasks": [],
"type": "sink" "type": "sink"
@ -261,8 +180,8 @@ meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0
Use kafka-console-producer to add test data to the topic meters.
``` ```shell
cat test-data.txt | kafka-console-producer --broker-list localhost:9092 --topic meters cat test-data.txt | kafka-console-producer.sh --broker-list localhost:9092 --topic meters
``` ```
:::note :::note
@ -273,12 +192,12 @@ cat test-data.txt | kafka-console-producer --broker-list localhost:9092 --topic
Use the TDengine CLI to verify that the synchronization succeeded.
``` ```sql
taos> use power; taos> use power;
Database changed. Database changed.
taos> select * from meters; taos> select * from meters;
ts | current | voltage | phase | groupid | location | _ts | current | voltage | phase | groupid | location |
=============================================================================================================================================================== ===============================================================================================================================================================
2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles | 2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles |
2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles | 2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles |
@ -297,29 +216,34 @@ TDengine Source Connector converts the data in TDengine tables into [Influx
The following example synchronizes data from the database test to the topic tdengine-source-test.
### Add a configuration file ### Add a Source Connector configuration file
``` ```shell
vi source-demo.properties vi source-demo.json
``` ```
Enter the following content:
```ini title="source-demo.properties" ```json title="source-demo.json"
name=TDengineSourceConnector {
connector.class=com.taosdata.kafka.connect.source.TDengineSourceConnector "name":"TDengineSourceConnector",
tasks.max=1 "config":{
connection.url=jdbc:TAOS://127.0.0.1:6030 "connector.class": "com.taosdata.kafka.connect.source.TDengineSourceConnector",
connection.username=root "tasks.max": 1,
connection.password=taosdata "connection.url": "jdbc:TAOS://127.0.0.1:6030",
connection.database=test "connection.username": "root",
connection.attempts=3 "connection.password": "taosdata",
connection.backoff.ms=5000 "connection.database": "test",
topic.prefix=tdengine-source- "connection.attempts": 3,
poll.interval.ms=1000 "connection.backoff.ms": 5000,
fetch.max.rows=100 "topic.prefix": "tdengine-source",
key.converter=org.apache.kafka.connect.storage.StringConverter "poll.interval.ms": 1000,
value.converter=org.apache.kafka.connect.storage.StringConverter "fetch.max.rows": 100,
"topic.per.stable": true,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
``` ```
### Prepare test data
@ -344,27 +268,27 @@ INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-1
Use the TDengine CLI to execute the SQL file.
``` ```shell
taos -f prepare-source-data.sql taos -f prepare-source-data.sql
``` ```
### Create a Connector instance ### Create a Source Connector instance
``` ```shell
confluent local services connect connector load TDengineSourceConnector --config source-demo.properties curl -X POST -d @source-demo.json http://localhost:8083/connectors -H "Content-Type: application/json"
``` ```
### View topic data
Use the kafka-console-consumer command-line tool to monitor the data in the topic tdengine-source-test. At first, all historical data is output; after two new rows are inserted into TDengine, kafka-console-consumer immediately outputs them as well. The output is in InfluxDB line protocol format.
``` ```shell
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic tdengine-source-test kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic tdengine-source-test-meters
``` ```
Output:
``` ```txt
...... ......
meters,location="California.SanFrancisco",groupid=2i32 current=10.3f32,voltage=219i32,phase=0.31f32 1538548685000000000 meters,location="California.SanFrancisco",groupid=2i32 current=10.3f32,voltage=219i32,phase=0.31f32 1538548685000000000
meters,location="California.SanFrancisco",groupid=2i32 current=12.6f32,voltage=218i32,phase=0.33f32 1538548695000000000 meters,location="California.SanFrancisco",groupid=2i32 current=12.6f32,voltage=218i32,phase=0.33f32 1538548695000000000
@ -373,7 +297,7 @@ meters,location="California.SanFrancisco",groupid=2i32 current=12.6f32,voltage=2
All historical data is displayed at this point. Switch to the TDengine CLI and insert two new rows:
``` ```sql
USE test; USE test;
INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38); INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38);
INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22); INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22);
@ -387,15 +311,15 @@ INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22);
View the currently active connectors:
``` ```shell
confluent local services connect connector status curl http://localhost:8083/connectors
``` ```
If you followed the previous steps, there should now be two active connectors. Use the following commands to unload them:
``` ```shell
confluent local services connect connector unload TDengineSinkConnector curl -X DELETE http://localhost:8083/connectors/TDengineSinkConnector
confluent local services connect connector unload TDengineSourceConnector curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
``` ```
## Configuration reference
@ -442,15 +366,12 @@ confluent local services connect connector unload TDengineSourceConnector
## Other notes
1. The plugin installation location can be customized; see the official documentation: https://docs.confluent.io/home/connect/self-managed/install.html#install-connector-manually. 1. For how to use the Kafka Connect plugin in a standalone Kafka installation, see the official documentation: <https://kafka.apache.org/documentation/#connect>
2. The examples in this tutorial use the Confluent platform, but TDengine Kafka Connector itself also works with a standalone Kafka installation, with the same configuration method. For how to use the Kafka Connect plugin in a standalone Kafka installation, see the official documentation: https://kafka.apache.org/documentation/#connect.
## Feedback
Whatever problems you encounter, you are welcome to report them in this project's GitHub repository: <https://github.com/taosdata/kafka-connect-tdengine/issues>
## Reference
1. https://www.confluent.io/what-is-apache-kafka 1. <https://kafka.apache.org/documentation/>
2. https://developer.confluent.io/learn-kafka/kafka-connect/intro
3. https://docs.confluent.io/platform/current/platform.html

View File

@ -42,27 +42,27 @@ IF (TD_LINUX)
) )
target_link_libraries(tmq target_link_libraries(tmq
taos_static taos
) )
target_link_libraries(stream_demo target_link_libraries(stream_demo
taos_static taos
) )
target_link_libraries(schemaless target_link_libraries(schemaless
taos_static taos
) )
target_link_libraries(prepare target_link_libraries(prepare
taos_static taos
) )
target_link_libraries(demo target_link_libraries(demo
taos_static taos
) )
target_link_libraries(asyncdemo target_link_libraries(asyncdemo
taos_static taos
) )
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq) SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)

View File

@ -248,6 +248,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
tb_uid_t suid); tb_uid_t suid);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock); return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);

View File

@ -163,6 +163,7 @@ typedef struct {
int64_t checkPointId; int64_t checkPointId;
int32_t taskId; int32_t taskId;
int64_t streamId; int64_t streamId;
int64_t streamBackendRid;
} SStreamState; } SStreamState;
typedef struct SFunctionStateStore { typedef struct SFunctionStateStore {

View File

@ -327,6 +327,7 @@ struct SStreamTask {
int64_t checkpointingId; int64_t checkpointingId;
int32_t checkpointAlignCnt; int32_t checkpointAlignCnt;
struct SStreamMeta* pMeta; struct SStreamMeta* pMeta;
SSHashObj* pNameMap;
}; };
// meta // meta
@ -344,7 +345,6 @@ typedef struct SStreamMeta {
SRWLatch lock; SRWLatch lock;
int32_t walScanCounter; int32_t walScanCounter;
void* streamBackend; void* streamBackend;
int32_t streamBackendId;
int64_t streamBackendRid; int64_t streamBackendRid;
} SStreamMeta; } SStreamMeta;

View File

@ -22,21 +22,20 @@ extern "C" {
// If the error is in a third-party library, place this header file under the third-party library header file. // If the error is in a third-party library, place this header file under the third-party library header file.
// When you want to use this feature, you should find or add the same function in the following sectio // When you want to use this feature, you should find or add the same function in the following sectio
// #if !defined(WINDOWS) #if !defined(WINDOWS)
// #ifndef ALLOW_FORBID_FUNC #ifndef ALLOW_FORBID_FUNC
// #define malloc MALLOC_FUNC_TAOS_FORBID #define malloc MALLOC_FUNC_TAOS_FORBID
// #define calloc CALLOC_FUNC_TAOS_FORBID #define calloc CALLOC_FUNC_TAOS_FORBID
// #define realloc REALLOC_FUNC_TAOS_FORBID #define realloc REALLOC_FUNC_TAOS_FORBID
// #define free FREE_FUNC_TAOS_FORBID #define free FREE_FUNC_TAOS_FORBID
// #ifdef strdup #ifdef strdup
// #undef strdup #undef strdup
// #define strdup STRDUP_FUNC_TAOS_FORBID #define strdup STRDUP_FUNC_TAOS_FORBID
// #endif #endif
// #endif // ifndef ALLOW_FORBID_FUNC #endif // ifndef ALLOW_FORBID_FUNC
// #endif // if !defined(WINDOWS) #endif // if !defined(WINDOWS)
// // #define taosMemoryFree malloc
// #define taosMemoryMalloc malloc // #define taosMemoryMalloc malloc
// #define taosMemoryCalloc calloc // #define taosMemoryCalloc calloc
// #define taosMemoryRealloc realloc // #define taosMemoryRealloc realloc

View File

@ -80,5 +80,4 @@ fi
# there can not libtaos.so*, otherwise ln -s error # there can not libtaos.so*, otherwise ln -s error
${csudo}rm -f ${install_main_dir}/driver/libtaos.* || : ${csudo}rm -f ${install_main_dir}/driver/libtaos.* || :
[ -f ${install_main_dir}/driver/librocksdb.* ] && ${csudo}rm -f ${install_main_dir}/driver/librocksdb.* || :
[ -f ${install_main_dir}/driver/libtaosws.so ] && ${csudo}rm -f ${install_main_dir}/driver/libtaosws.so || : [ -f ${install_main_dir}/driver/libtaosws.so ] && ${csudo}rm -f ${install_main_dir}/driver/libtaosws.so || :

View File

@ -40,7 +40,6 @@ else
${csudo}rm -f ${inc_link_dir}/taosudf.h || : ${csudo}rm -f ${inc_link_dir}/taosudf.h || :
[ -f ${inc_link_dir}/taosws.h ] && ${csudo}rm -f ${inc_link_dir}/taosws.h || : [ -f ${inc_link_dir}/taosws.h ] && ${csudo}rm -f ${inc_link_dir}/taosws.h || :
${csudo}rm -f ${lib_link_dir}/libtaos.* || : ${csudo}rm -f ${lib_link_dir}/libtaos.* || :
[ -f ${lib_link_dir}/librocksdb.* ] && ${csudo}rm -f ${lib_link_dir}/librocksdb.* || :
[ -f ${lib_link_dir}/libtaosws.so ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.so || : [ -f ${lib_link_dir}/libtaosws.so ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.so || :
${csudo}rm -f ${log_link_dir} || : ${csudo}rm -f ${log_link_dir} || :

View File

@ -31,7 +31,6 @@ cd ${pkg_dir}
libfile="libtaos.so.${tdengine_ver}" libfile="libtaos.so.${tdengine_ver}"
wslibfile="libtaosws.so" wslibfile="libtaosws.so"
rocksdblib="librocksdb.so.8"
# create install dir # create install dir
install_home_path="/usr/local/taos" install_home_path="/usr/local/taos"
@ -95,7 +94,6 @@ fi
cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver
[ -f ${compile_dir}/build/lib/${rocksdblib} ] && cp ${compile_dir}/build/lib/${rocksdblib} ${pkg_dir}${install_home_path}/driver ||:
[ -f ${compile_dir}/build/lib/${wslibfile} ] && cp ${compile_dir}/build/lib/${wslibfile} ${pkg_dir}${install_home_path}/driver ||: [ -f ${compile_dir}/build/lib/${wslibfile} ] && cp ${compile_dir}/build/lib/${wslibfile} ${pkg_dir}${install_home_path}/driver ||:
cp ${compile_dir}/../include/client/taos.h ${pkg_dir}${install_home_path}/include cp ${compile_dir}/../include/client/taos.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../include/common/taosdef.h ${pkg_dir}${install_home_path}/include cp ${compile_dir}/../include/common/taosdef.h ${pkg_dir}${install_home_path}/include

View File

@ -45,7 +45,6 @@ echo buildroot: %{buildroot}
libfile="libtaos.so.%{_version}" libfile="libtaos.so.%{_version}"
wslibfile="libtaosws.so" wslibfile="libtaosws.so"
rocksdblib="librocksdb.so.8"
# create install path, and cp file # create install path, and cp file
mkdir -p %{buildroot}%{homepath}/bin mkdir -p %{buildroot}%{homepath}/bin
@ -93,7 +92,6 @@ if [ -f %{_compiledir}/build/bin/taosadapter ]; then
fi fi
cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver
[ -f %{_compiledir}/build/lib/${wslibfile} ] && cp %{_compiledir}/build/lib/${wslibfile} %{buildroot}%{homepath}/driver ||: [ -f %{_compiledir}/build/lib/${wslibfile} ] && cp %{_compiledir}/build/lib/${wslibfile} %{buildroot}%{homepath}/driver ||:
[ -f %{_compiledir}/build/lib/${rocksdblib} ] && cp %{_compiledir}/build/lib/${rocksdblib} %{buildroot}%{homepath}/driver ||:
cp %{_compiledir}/../include/client/taos.h %{buildroot}%{homepath}/include cp %{_compiledir}/../include/client/taos.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/common/taosdef.h %{buildroot}%{homepath}/include cp %{_compiledir}/../include/common/taosdef.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/util/taoserror.h %{buildroot}%{homepath}/include cp %{_compiledir}/../include/util/taoserror.h %{buildroot}%{homepath}/include
@ -176,7 +174,6 @@ fi
# there can not libtaos.so*, otherwise ln -s error # there can not libtaos.so*, otherwise ln -s error
${csudo}rm -f %{homepath}/driver/libtaos* || : ${csudo}rm -f %{homepath}/driver/libtaos* || :
${csudo}rm -f %{homepath}/driver/librocksdb* || :
#Scripts executed after installation #Scripts executed after installation
%post %post
@ -222,7 +219,6 @@ if [ $1 -eq 0 ];then
${csudo}rm -f ${inc_link_dir}/taoserror.h || : ${csudo}rm -f ${inc_link_dir}/taoserror.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || : ${csudo}rm -f ${inc_link_dir}/taosudf.h || :
${csudo}rm -f ${lib_link_dir}/libtaos.* || : ${csudo}rm -f ${lib_link_dir}/libtaos.* || :
${csudo}rm -f ${lib_link_dir}/librocksdb.* || :
${csudo}rm -f ${log_link_dir} || : ${csudo}rm -f ${log_link_dir} || :
${csudo}rm -f ${data_link_dir} || : ${csudo}rm -f ${data_link_dir} || :

View File

@ -250,30 +250,18 @@ function install_lib() {
# Remove links # Remove links
${csudo}rm -f ${lib_link_dir}/libtaos.* || : ${csudo}rm -f ${lib_link_dir}/libtaos.* || :
${csudo}rm -f ${lib64_link_dir}/libtaos.* || : ${csudo}rm -f ${lib64_link_dir}/libtaos.* || :
${csudo}rm -f ${lib_link_dir}/librocksdb.* || :
${csudo}rm -f ${lib64_link_dir}/librocksdb.* || :
#${csudo}rm -rf ${v15_java_app_dir} || : #${csudo}rm -rf ${v15_java_app_dir} || :
${csudo}cp -rf ${script_dir}/driver/* ${install_main_dir}/driver && ${csudo}chmod 777 ${install_main_dir}/driver/* ${csudo}cp -rf ${script_dir}/driver/* ${install_main_dir}/driver && ${csudo}chmod 777 ${install_main_dir}/driver/*
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1 ${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1
${csudo}ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so ${csudo}ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
${csudo}ln -sf ${install_main_dir}/driver/librocksdb.* ${lib_link_dir}/librocksdb.so.8
${csudo}ln -sf ${lib_link_dir}/librocksdb.so.8 ${lib_link_dir}/librocksdb.so
${csudo}ln -sf ${install_main_dir}/driver/librocksdb.* ${lib_link_dir}/librocksdb.so.8
${csudo}ln -sf ${lib_link_dir}/librocksdb.so.8 ${lib_link_dir}/librocksdb.so
[ -f ${install_main_dir}/driver/libtaosws.so ] && ${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so || : [ -f ${install_main_dir}/driver/libtaosws.so ] && ${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so || :
if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.so ]]; then if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.so ]]; then
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1 || : ${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1 || :
${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || : ${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || :
${csudo}ln -sf ${install_main_dir}/driver/librocksdb.* ${lib64_link_dir}/librocksdb.so.8 || :
${csudo}ln -sf ${lib64_link_dir}/librocksdb.so.8 ${lib64_link_dir}/librocksdb.so || :
[ -f ${install_main_dir}/libtaosws.so ] && ${csudo}ln -sf ${install_main_dir}/libtaosws.so ${lib64_link_dir}/libtaosws.so || : [ -f ${install_main_dir}/libtaosws.so ] && ${csudo}ln -sf ${install_main_dir}/libtaosws.so ${lib64_link_dir}/libtaosws.so || :
fi fi

View File

@ -111,11 +111,9 @@ fi
if [ "$osType" == "Darwin" ]; then if [ "$osType" == "Darwin" ]; then
lib_files="${build_dir}/lib/libtaos.${version}.dylib" lib_files="${build_dir}/lib/libtaos.${version}.dylib"
wslib_files="${build_dir}/lib/libtaosws.dylib" wslib_files="${build_dir}/lib/libtaosws.dylib"
rocksdb_lib_files="${build_dir}/lib/librocksdb.dylib.8.1.1"
else else
lib_files="${build_dir}/lib/libtaos.so.${version}" lib_files="${build_dir}/lib/libtaos.so.${version}"
wslib_files="${build_dir}/lib/libtaosws.so" wslib_files="${build_dir}/lib/libtaosws.so"
rocksdb_lib_files="${build_dir}/lib/librocksdb.so.8.1.1"
fi fi
header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/libs/function/taosudf.h" header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/libs/function/taosudf.h"
@ -338,7 +336,6 @@ fi
# Copy driver # Copy driver
mkdir -p ${install_dir}/driver && cp ${lib_files} ${install_dir}/driver && echo "${versionComp}" >${install_dir}/driver/vercomp.txt mkdir -p ${install_dir}/driver && cp ${lib_files} ${install_dir}/driver && echo "${versionComp}" >${install_dir}/driver/vercomp.txt
[ -f ${wslib_files} ] && cp ${wslib_files} ${install_dir}/driver || : [ -f ${wslib_files} ] && cp ${wslib_files} ${install_dir}/driver || :
[ -f ${rocksdb_lib_files} ] && cp ${rocksdb_lib_files} ${install_dir}/driver || :
# Copy connector # Copy connector
if [ "$verMode" == "cluster" ]; then if [ "$verMode" == "cluster" ]; then

View File

@ -203,18 +203,9 @@ function install_lib() {
${csudo}rm -f ${lib_link_dir}/libtaos* || : ${csudo}rm -f ${lib_link_dir}/libtaos* || :
${csudo}rm -f ${lib64_link_dir}/libtaos* || : ${csudo}rm -f ${lib64_link_dir}/libtaos* || :
#rocksdb
[ -f ${lib_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib_link_dir}/librocksdb* || :
[ -f ${lib64_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib64_link_dir}/librocksdb* || :
#rocksdb
[ -f ${lib_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib_link_dir}/librocksdb* || :
[ -f ${lib64_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib64_link_dir}/librocksdb* || :
[ -f ${lib_link_dir}/libtaosws.${lib_file_ext} ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.${lib_file_ext} || : [ -f ${lib_link_dir}/libtaosws.${lib_file_ext} ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.${lib_file_ext} || :
[ -f ${lib64_link_dir}/libtaosws.${lib_file_ext} ] && ${csudo}rm -f ${lib64_link_dir}/libtaosws.${lib_file_ext} || : [ -f ${lib64_link_dir}/libtaosws.${lib_file_ext} ] && ${csudo}rm -f ${lib64_link_dir}/libtaosws.${lib_file_ext} || :
${csudo}ln -s ${lib_dir}/librocksdb.* ${lib_link_dir}/librocksdb.${lib_file_ext_1} 2>>${install_log_path} || return 1
${csudo}ln -s ${lib_dir}/libtaos.* ${lib_link_dir}/libtaos.${lib_file_ext_1} 2>>${install_log_path} || return 1 ${csudo}ln -s ${lib_dir}/libtaos.* ${lib_link_dir}/libtaos.${lib_file_ext_1} 2>>${install_log_path} || return 1
${csudo}ln -s ${lib_link_dir}/libtaos.${lib_file_ext_1} ${lib_link_dir}/libtaos.${lib_file_ext} 2>>${install_log_path} || return 1 ${csudo}ln -s ${lib_link_dir}/libtaos.${lib_file_ext_1} ${lib_link_dir}/libtaos.${lib_file_ext} 2>>${install_log_path} || return 1
@ -223,7 +214,6 @@ function install_lib() {
if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.${lib_file_ext} ]]; then if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.${lib_file_ext} ]]; then
${csudo}ln -s ${lib_dir}/libtaos.* ${lib64_link_dir}/libtaos.${lib_file_ext_1} 2>>${install_log_path} || return 1 ${csudo}ln -s ${lib_dir}/libtaos.* ${lib64_link_dir}/libtaos.${lib_file_ext_1} 2>>${install_log_path} || return 1
${csudo}ln -s ${lib64_link_dir}/libtaos.${lib_file_ext_1} ${lib64_link_dir}/libtaos.${lib_file_ext} 2>>${install_log_path} || return 1 ${csudo}ln -s ${lib64_link_dir}/libtaos.${lib_file_ext_1} ${lib64_link_dir}/libtaos.${lib_file_ext} 2>>${install_log_path} || return 1
${csudo}ln -s ${lib_dir}/librocksdb.* ${lib64_link_dir}/librocksdb.${lib_file_ext_1} 2>>${install_log_path} || return 1
[ -f ${lib_dir}/libtaosws.${lib_file_ext} ] && ${csudo}ln -sf ${lib_dir}/libtaosws.${lib_file_ext} ${lib64_link_dir}/libtaosws.${lib_file_ext} 2>>${install_log_path} [ -f ${lib_dir}/libtaosws.${lib_file_ext} ] && ${csudo}ln -sf ${lib_dir}/libtaosws.${lib_file_ext} ${lib64_link_dir}/libtaosws.${lib_file_ext} 2>>${install_log_path}
fi fi

View File

@ -142,11 +142,9 @@ function clean_local_bin() {
function clean_lib() { function clean_lib() {
# Remove link # Remove link
${csudo}rm -f ${lib_link_dir}/libtaos.* || : ${csudo}rm -f ${lib_link_dir}/libtaos.* || :
${csudo}rm -f ${lib_link_dir}/librocksdb.* || :
[ -f ${lib_link_dir}/libtaosws.* ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.* || : [ -f ${lib_link_dir}/libtaosws.* ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.* || :
${csudo}rm -f ${lib64_link_dir}/libtaos.* || : ${csudo}rm -f ${lib64_link_dir}/libtaos.* || :
${csudo}rm -f ${lib64_link_dir}/librocksdb.* || :
[ -f ${lib64_link_dir}/libtaosws.* ] && ${csudo}rm -f ${lib64_link_dir}/libtaosws.* || : [ -f ${lib64_link_dir}/libtaosws.* ] && ${csudo}rm -f ${lib64_link_dir}/libtaosws.* || :
#${csudo}rm -rf ${v15_java_app_dir} || : #${csudo}rm -rf ${v15_java_app_dir} || :

View File

@ -2465,19 +2465,31 @@ _end:
} }
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
if (stbFullName[0] == 0) { char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
if (!pBuf) {
return NULL; return NULL;
} }
int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf);
return NULL;
}
return pBuf;
}
int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) {
if (stbFullName[0] == 0) {
return TSDB_CODE_FAILED;
}
SArray* tags = taosArrayInit(0, sizeof(SSmlKv)); SArray* tags = taosArrayInit(0, sizeof(SSmlKv));
if (tags == NULL) { if (tags == NULL) {
return NULL; return TSDB_CODE_FAILED;
} }
void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
if (cname == NULL) { if (cname == NULL) {
taosArrayDestroy(tags); taosArrayDestroy(tags);
return NULL; return TSDB_CODE_FAILED;
} }
SSmlKv pTag = {.key = "group_id", SSmlKv pTag = {.key = "group_id",
@ -2499,9 +2511,9 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
taosArrayDestroy(tags); taosArrayDestroy(tags);
if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) { if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) {
return NULL; return TSDB_CODE_FAILED;
} }
return rname.ctbShortName; return TSDB_CODE_SUCCESS;
} }
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {

View File

@ -103,7 +103,7 @@ target_link_libraries(
# PUBLIC bdb # PUBLIC bdb
# PUBLIC scalar # PUBLIC scalar
PUBLIC rocksdb-shared PUBLIC rocksdb
PUBLIC transport PUBLIC transport
PUBLIC stream PUBLIC stream
PUBLIC index PUBLIC index

View File

@ -298,10 +298,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName)); memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName));
} else { } else {
char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); buildCtbNameByGroupIdImpl(stbFullName, pDataBlock->info.id.groupId, ctbName);
memcpy(ctbName, tmp, strlen(tmp)); memcpy(pTableSinkInfo->tbName, ctbName, strlen(ctbName));
memcpy(pTableSinkInfo->tbName, tmp, strlen(tmp));
taosMemoryFree(tmp);
tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode), tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode),
pDataBlock->info.id.groupId); pDataBlock->info.id.groupId);
} }

View File

@ -647,6 +647,8 @@ uint64_t calcGroupId(char* pData, int32_t len) {
// NOTE: only extract the initial 8 bytes of the final MD5 digest // NOTE: only extract the initial 8 bytes of the final MD5 digest
uint64_t id = 0; uint64_t id = 0;
memcpy(&id, context.digest, sizeof(uint64_t)); memcpy(&id, context.digest, sizeof(uint64_t));
if (0 == id)
memcpy(&id, context.digest + 8, sizeof(uint64_t));
return id; return id;
} }

View File

@ -319,6 +319,11 @@ void destroyMergeJoinOperator(void* param) {
} }
nodesDestroyNode(pJoinOperator->pCondAfterMerge); nodesDestroyNode(pJoinOperator->pCondAfterMerge);
taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks);
taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks);
taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations);
taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations);
pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }

View File

@ -213,6 +213,8 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
} else { } else {
if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} else if (limitReached && groupId == 0) {
setOperatorCompleted(pOperator);
} }
} }

View File

@ -257,7 +257,8 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
// output the result // output the result
bool hasInterp = true; int32_t fillColIndex = 0;
bool hasInterp = true;
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
@ -307,7 +308,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
case TSDB_FILL_SET_VALUE: case TSDB_FILL_SET_VALUE:
case TSDB_FILL_SET_VALUE_F: { case TSDB_FILL_SET_VALUE_F: {
SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal; SVariant* pVar = &pSliceInfo->pFillColInfo[fillColIndex].fillVal;
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0; float v = 0;
@ -342,6 +343,8 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
} }
colDataSetVal(pDst, rows, (char*)&v, false); colDataSetVal(pDst, rows, (char*)&v, false);
} }
++fillColIndex;
break; break;
} }

View File

@ -707,6 +707,10 @@ static bool isWindowPseudoColumnFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsWindowPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)); return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsWindowPseudoColumnFunc(((SFunctionNode*)pNode)->funcId));
} }
static bool isInterpFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsInterpFunc(((SFunctionNode*)pNode)->funcId));
}
static bool isInterpPseudoColumnFunc(const SNode* pNode) { static bool isInterpPseudoColumnFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsInterpPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)); return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsInterpPseudoColumnFunc(((SFunctionNode*)pNode)->funcId));
} }
@ -3030,7 +3034,7 @@ static int32_t translateOrderBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
} }
static EDealRes needFillImpl(SNode* pNode, void* pContext) { static EDealRes needFillImpl(SNode* pNode, void* pContext) {
if (isAggFunc(pNode) && FUNCTION_TYPE_GROUP_KEY != ((SFunctionNode*)pNode)->funcType) { if ((isAggFunc(pNode) || isInterpFunc(pNode)) && FUNCTION_TYPE_GROUP_KEY != ((SFunctionNode*)pNode)->funcType) {
*(bool*)pContext = true; *(bool*)pContext = true;
return DEAL_RES_END; return DEAL_RES_END;
} }
@ -3054,7 +3058,7 @@ static int32_t convertFillValue(STranslateContext* pCxt, SDataType dt, SNodeList
code = scalarCalculateConstants(pCaseFunc, &pCell->pNode); code = scalarCalculateConstants(pCaseFunc, &pCell->pNode);
} }
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_VALUE != nodeType(pCell->pNode)) { if (TSDB_CODE_SUCCESS == code && QUERY_NODE_VALUE != nodeType(pCell->pNode)) {
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Fill value is just a constant"); code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Fill value can only accept constant");
} else if (TSDB_CODE_SUCCESS != code) { } else if (TSDB_CODE_SUCCESS != code) {
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Filled data type mismatch"); code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Filled data type mismatch");
} }
@ -3078,6 +3082,7 @@ static int32_t checkFillValues(STranslateContext* pCxt, SFillNode* pFill, SNodeL
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
++fillNo; ++fillNo;
} }
} }
@ -3562,6 +3567,9 @@ static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkFill(pCxt, (SFillNode*)pSelect->pFill, (SValueNode*)pSelect->pEvery, true); code = checkFill(pCxt, (SFillNode*)pSelect->pFill, (SValueNode*)pSelect->pEvery, true);
} }
if (TSDB_CODE_SUCCESS == code) {
code = checkFillValues(pCxt, (SFillNode*)pSelect->pFill, pSelect->pProjectionList);
}
return code; return code;
} }

View File

@ -11,7 +11,7 @@ if(${BUILD_WITH_ROCKSDB})
IF (TD_LINUX) IF (TD_LINUX)
target_link_libraries( target_link_libraries(
stream stream
PUBLIC rocksdb-shared tdb PUBLIC rocksdb tdb
PRIVATE os util transport qcom executor wal index PRIVATE os util transport qcom executor wal index
) )
ELSE() ELSE()

View File

@ -36,8 +36,9 @@ static SStreamGlobalEnv streamEnv;
int32_t streamDispatchStreamBlock(SStreamTask* pTask); int32_t streamDispatchStreamBlock(SStreamTask* pTask);
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize,
void destroyStreamDataBlock(SStreamDataBlock* pBlock); SArray* pRes);
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
@ -53,6 +54,8 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
extern int32_t streamBackendId;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -16,7 +16,9 @@
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "executor.h" #include "executor.h"
#include "query.h" #include "query.h"
#include "streamInc.h"
#include "tcommon.h" #include "tcommon.h"
#include "tref.h"
typedef struct SCompactFilteFactory { typedef struct SCompactFilteFactory {
void* status; void* status;
@ -79,8 +81,8 @@ const char* compareParKeyName(void* name);
const char* comparePartagKeyName(void* name); const char* comparePartagKeyName(void* name);
void* streamBackendInit(const char* path) { void* streamBackendInit(const char* path) {
qDebug("init stream backend"); qDebug("start to init stream backend at %s", path);
SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle)); SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle));
pHandle->list = tdListNew(sizeof(SCfComparator)); pHandle->list = tdListNew(sizeof(SCfComparator));
taosThreadMutexInit(&pHandle->mutex, NULL); taosThreadMutexInit(&pHandle->mutex, NULL);
taosThreadMutexInit(&pHandle->cfMutex, NULL); taosThreadMutexInit(&pHandle->cfMutex, NULL);
@ -119,6 +121,7 @@ void* streamBackendInit(const char* path) {
if (err != NULL) { if (err != NULL) {
qError("failed to open rocksdb, path:%s, reason:%s", path, err); qError("failed to open rocksdb, path:%s, reason:%s", path, err);
taosMemoryFreeClear(err); taosMemoryFreeClear(err);
goto _EXIT;
} }
} else { } else {
/* /*
@ -129,6 +132,7 @@ void* streamBackendInit(const char* path) {
if (cfs != NULL) { if (cfs != NULL) {
rocksdb_list_column_families_destroy(cfs, nCf); rocksdb_list_column_families_destroy(cfs, nCf);
} }
qDebug("succ to init stream backend at %s, backend:%p", path, pHandle);
return (void*)pHandle; return (void*)pHandle;
_EXIT: _EXIT:
@ -140,7 +144,8 @@ _EXIT:
taosHashCleanup(pHandle->cfInst); taosHashCleanup(pHandle->cfInst);
rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
tdListFree(pHandle->list); tdListFree(pHandle->list);
free(pHandle); taosMemoryFree(pHandle);
qDebug("failed to init stream backend at %s", path);
return NULL; return NULL;
} }
void streamBackendCleanup(void* arg) { void streamBackendCleanup(void* arg) {
@ -168,19 +173,20 @@ void streamBackendCleanup(void* arg) {
rocksdb_env_destroy(pHandle->env); rocksdb_env_destroy(pHandle->env);
rocksdb_cache_destroy(pHandle->cache); rocksdb_cache_destroy(pHandle->cache);
taosThreadMutexDestroy(&pHandle->mutex);
SListNode* head = tdListPopHead(pHandle->list); SListNode* head = tdListPopHead(pHandle->list);
while (head != NULL) { while (head != NULL) {
streamStateDestroyCompar(head->data); streamStateDestroyCompar(head->data);
taosMemoryFree(head); taosMemoryFree(head);
head = tdListPopHead(pHandle->list); head = tdListPopHead(pHandle->list);
} }
// rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
tdListFree(pHandle->list); tdListFree(pHandle->list);
taosThreadMutexDestroy(&pHandle->mutex);
taosThreadMutexDestroy(&pHandle->cfMutex); taosThreadMutexDestroy(&pHandle->cfMutex);
taosMemoryFree(pHandle); taosMemoryFree(pHandle);
qDebug("destroy stream backend backend:%p", pHandle);
return; return;
} }
SListNode* streamBackendAddCompare(void* backend, void* arg) { SListNode* streamBackendAddCompare(void* backend, void* arg) {
@ -803,7 +809,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
return 0; return 0;
} }
int streamStateOpenBackend(void* backend, SStreamState* pState) { int streamStateOpenBackend(void* backend, SStreamState* pState) {
qInfo("start to open backend, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId);
taosAcquireRef(streamBackendId, pState->streamBackendRid);
SBackendHandle* handle = backend; SBackendHandle* handle = backend;
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);
@ -866,7 +873,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
// rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); // rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
qInfo("succ to open backend, %p, 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); qInfo("succ to open state %p on backend, %p, 0x%" PRIx64 "-%d", pState, handle, pState->streamId, pState->taskId);
return 0; return 0;
} }
@ -882,8 +889,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
taosThreadMutexUnlock(&pHandle->cfMutex); taosThreadMutexUnlock(&pHandle->cfMutex);
char* status[] = {"close", "drop"}; char* status[] = {"close", "drop"};
qInfo("start to %s backend, %p, 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pState->streamId, qInfo("start to close %s state %p on backend %p 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pHandle,
pState->taskId); pState->streamId, pState->taskId);
if (pState->pTdbState->rocksdb == NULL) { if (pState->pTdbState->rocksdb == NULL) {
return; return;
} }
@ -938,6 +945,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
taosThreadRwlockDestroy(&pState->pTdbState->rwLock); taosThreadRwlockDestroy(&pState->pTdbState->rwLock);
pState->pTdbState->rocksdb = NULL; pState->pTdbState->rocksdb = NULL;
taosReleaseRef(streamBackendId, pState->streamBackendRid);
} }
void streamStateDestroyCompar(void* arg) { void streamStateDestroyCompar(void* arg) {
SCfComparator* comp = (SCfComparator*)arg; SCfComparator* comp = (SCfComparator*)arg;
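Together with the streamMeta.c and streamState.c hunks below, these changes put the RocksDB backend behind a reference count: `streamBackendId` is opened once with `streamBackendCleanup` as its destructor, `streamStateOpenBackend` acquires a reference and `streamStateCloseBackend` releases it, so the handle is destroyed only after the last stream state has closed. A generic standalone sketch of that ownership pattern; the struct and helpers below are illustrative only, the real code relies on TDengine's tref module (`taosOpenRef`/`taosAcquireRef`/`taosReleaseRef`):

```c
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Illustrative only: one owner reference plus one reference per open state;
 * the backend is destroyed when the count drops to zero. */
typedef struct {
  int32_t refCount;
  /* ... rocksdb handle, column families, mutexes ... */
} BackendHandle;

static BackendHandle *backendAcquire(BackendHandle *h) {
  ++h->refCount;                       /* streamStateOpenBackend() */
  return h;
}

static void backendRelease(BackendHandle *h) {
  if (--h->refCount == 0) {            /* last user gone */
    printf("destroying backend %p\n", (void *)h);
    free(h);
  }
}

int main(void) {
  BackendHandle *h = calloc(1, sizeof(*h));
  h->refCount = 1;                     /* owner reference held by the stream meta */
  BackendHandle *state = backendAcquire(h);
  backendRelease(state);               /* streamStateCloseBackend() */
  backendRelease(h);                   /* stream meta shutdown */
  return 0;
}
```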

View File

@ -15,6 +15,13 @@
#include "streamInc.h" #include "streamInc.h"
#define MAX_BLOCK_NAME_NUM 1024
typedef struct SBlockName {
uint32_t hashValue;
char parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName;
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
@ -331,26 +338,46 @@ FAIL:
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
int64_t groupId) { int64_t groupId) {
char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); uint32_t hashValue = 0;
if (ctbName == NULL) {
return -1;
}
if (pDataBlock->info.parTbName[0]) {
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
} else {
char* ctbShortName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, ctbShortName);
taosMemoryFree(ctbShortName);
}
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
if (pTask->pNameMap == NULL) {
pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
}
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t));
SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo; if (pVal) {
uint32_t hashValue = SBlockName* pBln = (SBlockName*)pVal;
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); hashValue = pBln->hashValue;
taosMemoryFree(ctbName); if (!pDataBlock->info.parTbName[0]) {
memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName));
}
} else {
char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
if (ctbName == NULL) {
return -1;
}
if (pDataBlock->info.parTbName[0]) {
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
} else {
buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
}
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
hashValue =
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
taosMemoryFree(ctbName);
SBlockName bln = {0};
bln.hashValue = hashValue;
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) {
tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
}
}
bool found = false; bool found = false;
// TODO: optimize search // TODO: optimize search
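The rewritten `streamSearchAndAddBlock` memoizes the dispatch hash per group id: the first block of a group still builds the child table name and hashes it via `taosGetTbHashVal`, but the result (plus the table name) is cached in `pTask->pNameMap`, capped at `MAX_BLOCK_NAME_NUM` entries, so later blocks of the same group skip the name construction and re-hashing. A toy standalone sketch of that memoization; the linear-scan cache and stand-in hash are for illustration only, the real code uses `tSimpleHash`:

```c
#include <stdint.h>
#include <stdio.h>

#define MAX_CACHED 4

/* Toy memoization: remember the hash computed for a group id so later
 * blocks of the same group skip the expensive name-building + hashing step. */
typedef struct {
  int64_t  groupId;
  uint32_t hashValue;
  int      used;
} BlockNameEntry;

static uint32_t computeHash(int64_t groupId) {
  printf("expensive hash computed for group %lld\n", (long long)groupId);
  return (uint32_t)((uint64_t)groupId * 2654435761u);  /* stand-in for taosGetTbHashVal */
}

static uint32_t lookupHash(BlockNameEntry *cache, int64_t groupId) {
  for (int i = 0; i < MAX_CACHED; ++i) {
    if (cache[i].used && cache[i].groupId == groupId) return cache[i].hashValue;
  }
  uint32_t h = computeHash(groupId);
  for (int i = 0; i < MAX_CACHED; ++i) {   /* cache only while space remains */
    if (!cache[i].used) {
      cache[i] = (BlockNameEntry){groupId, h, 1};
      break;
    }
  }
  return h;
}

int main(void) {
  BlockNameEntry cache[MAX_CACHED] = {{0}};
  lookupHash(cache, 42);
  lookupHash(cache, 42);   /* second call hits the cache, no recomputation */
  return 0;
}
```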

View File

@ -20,7 +20,7 @@
#include "ttimer.h" #include "ttimer.h"
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
static int32_t streamBackendId = 0; int32_t streamBackendId = 0;
static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); } static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); }
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
@ -79,7 +79,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->vgId = vgId; pMeta->vgId = vgId;
pMeta->ahandle = ahandle; pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc; pMeta->expandFunc = expandFunc;
pMeta->streamBackendId = streamBackendId;
memset(streamPath, 0, len); memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "state"); sprintf(streamPath, "%s/%s", pMeta->path, "state");

View File

@ -106,7 +106,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
} }
SStreamTask* pStreamTask = pTask; SStreamTask* pStreamTask = pTask;
char statePath[1024]; char statePath[1024];
if (!specPath) { if (!specPath) {
sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId); sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId);
} else { } else {
@ -119,10 +119,10 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
SStreamMeta* pMeta = pStreamTask->pMeta; SStreamMeta* pMeta = pStreamTask->pMeta;
taosAcquireRef(pMeta->streamBackendId, pMeta->streamBackendRid); pState->streamBackendRid = pMeta->streamBackendRid;
int code = streamStateOpenBackend(pMeta->streamBackend, pState); int code = streamStateOpenBackend(pMeta->streamBackend, pState);
if (code == -1) { if (code == -1) {
taosReleaseRef(pMeta->streamBackendId, pMeta->streamBackendRid); taosReleaseRef(streamBackendId, pMeta->streamBackendRid);
taosMemoryFree(pState); taosMemoryFree(pState);
pState = NULL; pState = NULL;
} }
@ -222,9 +222,7 @@ _err:
void streamStateClose(SStreamState* pState, bool remove) { void streamStateClose(SStreamState* pState, bool remove) {
SStreamTask* pTask = pState->pTdbState->pOwner; SStreamTask* pTask = pState->pTdbState->pOwner;
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
// streamStateCloseBackend(pState);
streamStateDestroy(pState, remove); streamStateDestroy(pState, remove);
taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
#else #else
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
@ -278,10 +276,10 @@ int32_t streamStateCommit(SStreamState* pState) {
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
void* pVal = NULL; void* pVal = NULL;
int32_t len = 0; int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff; char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
memcpy(buf + len - rowSize, value, vLen); memcpy(buf + len - rowSize, value, vLen);
return code; return code;
@ -291,10 +289,10 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
} }
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) { int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
void* pVal = NULL; void* pVal = NULL;
int32_t len = 0; int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff; char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
*ppVal = buf + len - rowSize; *ppVal = buf + len - rowSize;
return code; return code;

View File

@ -224,5 +224,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosMemoryFree((void*)pTask->id.idStr); taosMemoryFree((void*)pTask->id.idStr);
} }
if (pTask->pNameMap) {
tSimpleHashCleanup(pTask->pNameMap);
}
taosMemoryFree(pTask); taosMemoryFree(pTask);
} }

View File

@ -419,7 +419,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
if (code != 0 || len == 0 || val == NULL) { if (code != 0 || len == 0 || val == NULL) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
memcpy(val, buf, len); memcpy(buf, val, len);
buf[len] = 0; buf[len] = 0;
maxCheckPointId = atol((char*)buf); maxCheckPointId = atol((char*)buf);
taosMemoryFree(val); taosMemoryFree(val);
@ -433,7 +433,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
if (code != 0) { if (code != 0) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
memcpy(val, buf, len); memcpy(buf, val, len);
buf[len] = 0; buf[len] = 0;
taosMemoryFree(val); taosMemoryFree(val);
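The two one-line fixes above correct a swapped `memcpy` argument order in `deleteExpiredCheckPoint`: the value fetched from the state backend (`val`) has to be copied into the local buffer (`buf`) that is then NUL-terminated and parsed, not the reverse. A tiny self-contained reminder of the corrected direction, with a string literal standing in for the backend value:

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* The fixed direction: memcpy(dst, src, len) copies the backend value into
 * the local buffer before it is NUL-terminated and parsed. */
int main(void) {
  const char *val = "17";        /* stands in for the value read from the backend */
  size_t      len = strlen(val);
  char        buf[32];

  memcpy(buf, val, len);         /* destination first -- the corrected order */
  buf[len] = 0;
  printf("max checkpoint id = %ld\n", atol(buf));
  return 0;
}
```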

View File

@ -225,6 +225,56 @@ class TDTestCase:
tdSql.checkData(2, 0, 12) tdSql.checkData(2, 0, 12)
tdSql.checkData(3, 0, 12) tdSql.checkData(3, 0, 12)
## test fill value with scalar expression
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1 + 2)")
tdSql.checkRows(4)
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 3)
tdSql.checkData(2, 0, 3)
tdSql.checkData(3, 0, 3)
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1.0 + 2.0)")
tdSql.checkRows(4)
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 3)
tdSql.checkData(2, 0, 3)
tdSql.checkData(3, 0, 3)
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1 + 2.5)")
tdSql.checkRows(4)
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 3)
tdSql.checkData(2, 0, 3)
tdSql.checkData(3, 0, 3)
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1 + '2')")
tdSql.checkRows(4)
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 3)
tdSql.checkData(2, 0, 3)
tdSql.checkData(3, 0, 3)
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1 + '2.0')")
tdSql.checkRows(4)
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 3)
tdSql.checkData(2, 0, 3)
tdSql.checkData(3, 0, 3)
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, '3' + 'abc')")
tdSql.checkRows(4)
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 3)
tdSql.checkData(2, 0, 3)
tdSql.checkData(3, 0, 3)
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, '2' + '1abc')")
tdSql.checkRows(4)
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 3)
tdSql.checkData(2, 0, 3)
tdSql.checkData(3, 0, 3)
tdLog.printNoPrefix("==========step5:fill prev") tdLog.printNoPrefix("==========step5:fill prev")
## {. . .} ## {. . .}
@ -1765,47 +1815,10 @@ class TDTestCase:
tdSql.checkData(60, 1, 60) # tdSql.checkData(60, 1, 60) #
# test fill value # test fill value
tdSql.query(f"select interp(c0),interp(c1) from {dbname}.{tbname1} range('2020-02-02 00:00:00', '2020-02-02 00:01:00') every(1s) fill(value, 123)") tdSql.query(f"select _irowts, interp(c0), _irowts, interp(c1), _irowts from {dbname}.{tbname1} range('2020-02-02 00:00:00', '2020-02-02 00:01:00') every(1s) fill(value, 123, 456)")
tdSql.checkRows(61) tdSql.checkRows(61)
tdSql.checkCols(2) tdSql.checkCols(5)
tdSql.checkData(0, 0, 0) # tdSql.checkData(0, 1, 0) #
tdSql.checkData(1, 0, 123)
tdSql.checkData(4, 0, 123)
tdSql.checkData(5, 0, None) #
tdSql.checkData(6, 0, 123)
tdSql.checkData(9, 0, 123)
tdSql.checkData(10, 0, 10) #
tdSql.checkData(11, 0, 123)
tdSql.checkData(14, 0, 123)
tdSql.checkData(15, 0, None) #
tdSql.checkData(16, 0, 123)
tdSql.checkData(19, 0, 123)
tdSql.checkData(20, 0, 20) #
tdSql.checkData(21, 0, 123)
tdSql.checkData(24, 0, 123)
tdSql.checkData(25, 0, None) #
tdSql.checkData(26, 0, 123)
tdSql.checkData(29, 0, 123)
tdSql.checkData(30, 0, 30) #
tdSql.checkData(31, 0, 123)
tdSql.checkData(34, 0, 123)
tdSql.checkData(35, 0, 35) #
tdSql.checkData(36, 0, 123)
tdSql.checkData(39, 0, 123)
tdSql.checkData(40, 0, 40) #
tdSql.checkData(41, 0, 123)
tdSql.checkData(44, 0, 123)
tdSql.checkData(45, 0, None) #
tdSql.checkData(46, 0, 123)
tdSql.checkData(49, 0, 123)
tdSql.checkData(50, 0, 50) #
tdSql.checkData(51, 0, 123)
tdSql.checkData(54, 0, 123)
tdSql.checkData(55, 0, None) #
tdSql.checkData(59, 0, 123)
tdSql.checkData(60, 0, 55) #
tdSql.checkData(0, 1, None) #
tdSql.checkData(1, 1, 123) tdSql.checkData(1, 1, 123)
tdSql.checkData(4, 1, 123) tdSql.checkData(4, 1, 123)
tdSql.checkData(5, 1, None) # tdSql.checkData(5, 1, None) #
@ -1817,7 +1830,7 @@ class TDTestCase:
tdSql.checkData(15, 1, None) # tdSql.checkData(15, 1, None) #
tdSql.checkData(16, 1, 123) tdSql.checkData(16, 1, 123)
tdSql.checkData(19, 1, 123) tdSql.checkData(19, 1, 123)
tdSql.checkData(20, 1, None) # tdSql.checkData(20, 1, 20) #
tdSql.checkData(21, 1, 123) tdSql.checkData(21, 1, 123)
tdSql.checkData(24, 1, 123) tdSql.checkData(24, 1, 123)
tdSql.checkData(25, 1, None) # tdSql.checkData(25, 1, None) #
@ -1826,22 +1839,137 @@ class TDTestCase:
tdSql.checkData(30, 1, 30) # tdSql.checkData(30, 1, 30) #
tdSql.checkData(31, 1, 123) tdSql.checkData(31, 1, 123)
tdSql.checkData(34, 1, 123) tdSql.checkData(34, 1, 123)
tdSql.checkData(35, 1, None) # tdSql.checkData(35, 1, 35) #
tdSql.checkData(36, 1, 123) tdSql.checkData(36, 1, 123)
tdSql.checkData(39, 1, 123) tdSql.checkData(39, 1, 123)
tdSql.checkData(40, 1, 40) # tdSql.checkData(40, 1, 40) #
tdSql.checkData(41, 1, 123) tdSql.checkData(41, 1, 123)
tdSql.checkData(44, 1, 123) tdSql.checkData(44, 1, 123)
tdSql.checkData(45, 1, 45) # tdSql.checkData(45, 1, None) #
tdSql.checkData(46, 1, 123) tdSql.checkData(46, 1, 123)
tdSql.checkData(49, 1, 123) tdSql.checkData(49, 1, 123)
tdSql.checkData(50, 1, None) # tdSql.checkData(50, 1, 50) #
tdSql.checkData(51, 1, 123) tdSql.checkData(51, 1, 123)
tdSql.checkData(54, 1, 123) tdSql.checkData(54, 1, 123)
tdSql.checkData(55, 1, None) # tdSql.checkData(55, 1, None) #
tdSql.checkData(56, 1, 123)
tdSql.checkData(59, 1, 123) tdSql.checkData(59, 1, 123)
tdSql.checkData(60, 1, 60) # tdSql.checkData(60, 1, 55) #
tdSql.checkData(0, 3, None) #
tdSql.checkData(1, 3, 456)
tdSql.checkData(4, 3, 456)
tdSql.checkData(5, 3, None) #
tdSql.checkData(6, 3, 456)
tdSql.checkData(9, 3, 456)
tdSql.checkData(10, 3, 10) #
tdSql.checkData(11, 3, 456)
tdSql.checkData(14, 3, 456)
tdSql.checkData(15, 3, None) #
tdSql.checkData(16, 3, 456)
tdSql.checkData(19, 3, 456)
tdSql.checkData(20, 3, None) #
tdSql.checkData(21, 3, 456)
tdSql.checkData(24, 3, 456)
tdSql.checkData(25, 3, None) #
tdSql.checkData(26, 3, 456)
tdSql.checkData(29, 3, 456)
tdSql.checkData(30, 3, 30) #
tdSql.checkData(31, 3, 456)
tdSql.checkData(34, 3, 456)
tdSql.checkData(35, 3, None) #
tdSql.checkData(36, 3, 456)
tdSql.checkData(39, 3, 456)
tdSql.checkData(40, 3, 40) #
tdSql.checkData(41, 3, 456)
tdSql.checkData(44, 3, 456)
tdSql.checkData(45, 3, 45) #
tdSql.checkData(46, 3, 456)
tdSql.checkData(49, 3, 456)
tdSql.checkData(50, 3, None) #
tdSql.checkData(51, 3, 456)
tdSql.checkData(54, 3, 456)
tdSql.checkData(55, 3, None) #
tdSql.checkData(56, 3, 456)
tdSql.checkData(59, 3, 456)
tdSql.checkData(60, 3, 60) #
tdSql.query(f"select _isfilled, interp(c0), _isfilled, interp(c1), _isfilled from {dbname}.{tbname1} range('2020-02-02 00:00:00', '2020-02-02 00:01:00') every(1s) fill(value, 123 + 123, 234 + 234)")
tdSql.checkRows(61)
tdSql.checkCols(5)
tdSql.checkData(0, 1, 0) #
tdSql.checkData(1, 1, 246)
tdSql.checkData(4, 1, 246)
tdSql.checkData(5, 1, None) #
tdSql.checkData(6, 1, 246)
tdSql.checkData(9, 1, 246)
tdSql.checkData(10, 1, 10) #
tdSql.checkData(11, 1, 246)
tdSql.checkData(14, 1, 246)
tdSql.checkData(15, 1, None) #
tdSql.checkData(16, 1, 246)
tdSql.checkData(19, 1, 246)
tdSql.checkData(20, 1, 20) #
tdSql.checkData(21, 1, 246)
tdSql.checkData(24, 1, 246)
tdSql.checkData(25, 1, None) #
tdSql.checkData(26, 1, 246)
tdSql.checkData(29, 1, 246)
tdSql.checkData(30, 1, 30) #
tdSql.checkData(31, 1, 246)
tdSql.checkData(34, 1, 246)
tdSql.checkData(35, 1, 35) #
tdSql.checkData(36, 1, 246)
tdSql.checkData(39, 1, 246)
tdSql.checkData(40, 1, 40) #
tdSql.checkData(41, 1, 246)
tdSql.checkData(44, 1, 246)
tdSql.checkData(45, 1, None) #
tdSql.checkData(46, 1, 246)
tdSql.checkData(49, 1, 246)
tdSql.checkData(50, 1, 50) #
tdSql.checkData(51, 1, 246)
tdSql.checkData(54, 1, 246)
tdSql.checkData(55, 1, None) #
tdSql.checkData(59, 1, 246)
tdSql.checkData(60, 1, 55) #
tdSql.checkData(0, 3, None) #
tdSql.checkData(1, 3, 468)
tdSql.checkData(4, 3, 468)
tdSql.checkData(5, 3, None) #
tdSql.checkData(6, 3, 468)
tdSql.checkData(9, 3, 468)
tdSql.checkData(10, 3, 10) #
tdSql.checkData(11, 3, 468)
tdSql.checkData(14, 3, 468)
tdSql.checkData(15, 3, None) #
tdSql.checkData(16, 3, 468)
tdSql.checkData(19, 3, 468)
tdSql.checkData(20, 3, None) #
tdSql.checkData(21, 3, 468)
tdSql.checkData(24, 3, 468)
tdSql.checkData(25, 3, None) #
tdSql.checkData(26, 3, 468)
tdSql.checkData(29, 3, 468)
tdSql.checkData(30, 3, 30) #
tdSql.checkData(31, 3, 468)
tdSql.checkData(34, 3, 468)
tdSql.checkData(35, 3, None) #
tdSql.checkData(36, 3, 468)
tdSql.checkData(39, 3, 468)
tdSql.checkData(40, 3, 40) #
tdSql.checkData(41, 3, 468)
tdSql.checkData(44, 3, 468)
tdSql.checkData(45, 3, 45) #
tdSql.checkData(46, 3, 468)
tdSql.checkData(49, 3, 468)
tdSql.checkData(50, 3, None) #
tdSql.checkData(51, 3, 468)
tdSql.checkData(54, 3, 468)
tdSql.checkData(55, 3, None) #
tdSql.checkData(56, 3, 468)
tdSql.checkData(59, 3, 468)
tdSql.checkData(60, 3, 60) #
# test fill prev # test fill prev
tdSql.query(f"select interp(c0),interp(c1) from {dbname}.{tbname1} range('2020-02-02 00:00:00', '2020-02-02 00:01:00') every(1s) fill(prev)") tdSql.query(f"select interp(c0),interp(c1) from {dbname}.{tbname1} range('2020-02-02 00:00:00', '2020-02-02 00:01:00') every(1s) fill(prev)")
@ -2016,7 +2144,7 @@ class TDTestCase:
tdSql.checkData(3, i, None) tdSql.checkData(3, i, None)
tdSql.checkData(4, i, None) tdSql.checkData(4, i, None)
tdSql.query(f"select interp(c0),interp(c1),interp(c2),interp(c3) from {dbname}.{tbname} range('2020-02-09 00:00:05', '2020-02-13 00:00:05') every(1d) fill(value, 1)") tdSql.query(f"select interp(c0),interp(c1),interp(c2),interp(c3) from {dbname}.{tbname} range('2020-02-09 00:00:05', '2020-02-13 00:00:05') every(1d) fill(value, 1, 1, 1, 1)")
tdSql.checkRows(5) tdSql.checkRows(5)
tdSql.checkCols(4) tdSql.checkCols(4)
@ -2442,6 +2570,10 @@ class TDTestCase:
tdSql.error(f"select interp(c0) from {dbname}.{tbname} where _isfilled = true range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(null)") tdSql.error(f"select interp(c0) from {dbname}.{tbname} where _isfilled = true range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(null)")
tdSql.error(f"select interp(c0) from {dbname}.{tbname} where _irowts > 0 range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(null)") tdSql.error(f"select interp(c0) from {dbname}.{tbname} where _irowts > 0 range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(null)")
# fill value number mismatch
tdSql.error(f"select interp(c0) from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(value, 1, 2)")
tdSql.error(f"select interp(c0), interp(c1) from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(value, 1)")

View File

@ -9,35 +9,35 @@ add_executable(get_db_name_test get_db_name_test.c)
add_executable(tmq_offset tmqOffset.c) add_executable(tmq_offset tmqOffset.c)
target_link_libraries( target_link_libraries(
tmq_offset tmq_offset
PUBLIC taos_static PUBLIC taos
PUBLIC util PUBLIC util
PUBLIC common PUBLIC common
PUBLIC os PUBLIC os
) )
target_link_libraries( target_link_libraries(
create_table create_table
PUBLIC taos_static PUBLIC taos
PUBLIC util PUBLIC util
PUBLIC common PUBLIC common
PUBLIC os PUBLIC os
) )
target_link_libraries( target_link_libraries(
tmq_demo tmq_demo
PUBLIC taos_static PUBLIC taos
PUBLIC util PUBLIC util
PUBLIC common PUBLIC common
PUBLIC os PUBLIC os
) )
target_link_libraries( target_link_libraries(
tmq_sim tmq_sim
PUBLIC taos_static PUBLIC taos
PUBLIC util PUBLIC util
PUBLIC common PUBLIC common
PUBLIC os PUBLIC os
) )
target_link_libraries( target_link_libraries(
tmq_taosx_ci tmq_taosx_ci
PUBLIC taos_static PUBLIC taos
PUBLIC util PUBLIC util
PUBLIC common PUBLIC common
PUBLIC os PUBLIC os
@ -45,7 +45,7 @@ target_link_libraries(
target_link_libraries( target_link_libraries(
write_raw_block_test write_raw_block_test
PUBLIC taos_static PUBLIC taos
PUBLIC util PUBLIC util
PUBLIC common PUBLIC common
PUBLIC os PUBLIC os
@ -53,7 +53,7 @@ target_link_libraries(
target_link_libraries( target_link_libraries(
sml_test sml_test
PUBLIC taos_static PUBLIC taos
PUBLIC util PUBLIC util
PUBLIC common PUBLIC common
PUBLIC os PUBLIC os
@ -61,7 +61,7 @@ target_link_libraries(
target_link_libraries( target_link_libraries(
get_db_name_test get_db_name_test
PUBLIC taos_static PUBLIC taos
PUBLIC util PUBLIC util
PUBLIC common PUBLIC common
PUBLIC os PUBLIC os