diff --git a/docs/en/10-third-party/01-collection/flink.md b/docs/en/10-third-party/01-collection/flink.md
index f14ffc33a6..e716d5a757 100644
--- a/docs/en/10-third-party/01-collection/flink.md
+++ b/docs/en/10-third-party/01-collection/flink.md
@@ -6,7 +6,11 @@ title: TDengine Flink Connector
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

-## Preconditions
+Apache Flink is an open-source distributed processing framework, supported by the Apache Software Foundation, that unifies stream and batch processing. It can be used in many big-data scenarios such as stream processing, batch processing, complex event processing, building real-time data warehouses, and providing real-time data for machine learning. Flink also offers a rich set of connectors and tools that can interface with many different types of data sources for reading and writing, and it provides reliable fault-tolerance mechanisms that keep jobs running stably even when unexpected failures occur.
+
+With TDengine's Flink connector, Apache Flink can integrate seamlessly with the TDengine database. On the one hand, the results of complex computation and deep analysis can be stored accurately in TDengine, enabling efficient storage and management of data; on the other hand, massive amounts of data can be read from TDengine quickly and reliably and then analyzed and processed further, fully tapping the potential value of the data and providing solid data support and a scientific basis for business decision-making, greatly improving the efficiency and quality of data processing.
+
+## Prerequisites

Prepare the following environment:

@@ -14,20 +18,16 @@ Prepare the following environment:
- taosAdapter can run normally.
- Apache Flink v1.19.0 or above is installed. For installation, please refer to the Apache Flink [official documentation](https://flink.apache.org/)

-## JRE version compatibility
-
-JRE: Supports JRE 8 and above versions.
-
## Supported platforms

Flink Connector supports all platforms that can run Flink 1.19 and above versions.

## Version History
-
| Flink Connector Version | Major Changes | TDengine Version|
|-------------------------| ------------------------------------ | ---------------- |
| 2.0.0 | 1. Support SQL queries on data in the TDengine database<br/>2. Support CDC subscription to data in the TDengine database<br/>3. Support reading and writing to the TDengine database using Table SQL | 3.3.5.0 and above versions|
+| 1.0.0 | Support the Sink function for writing data from other data sources into TDengine| 3.3.2.0 and above versions|

## Exception and error codes

@@ -91,13 +91,17 @@ TDengine currently supports timestamp, number, character, and boolean types, and
| GEOMETRY | byte[] |

## Instructions for use
+
### Flink Semantic Selection Instructions

The reasons for using At-Least-Once (at least once) semantics:
+
- TDengine currently does not support transactions and cannot perform frequent checkpoint operations or complex transaction coordination.
- Because TDengine uses timestamps as primary keys, downstream operators can filter duplicate data, avoiding duplicate computation.
- At-Least-Once (at least once) is adopted to ensure high data-processing performance and low data latency.
+
+The setting method is as follows:
+
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
```

@@ -123,7 +127,8 @@ Parameter description:

- user: TDengine login username, default value 'root'.
- password: user login password, default value 'taosdata'.
-- batchErrorIgnore: true: If there is an SQL execution failure in the middle of the executeBatch of Statement, continue to execute the following SQL. false: Do not execute any statements after the failed SQL. The default value is false.
+- database_name: database name.
+- timezone: time zone.
- httpConnectTimeout: connection timeout in milliseconds, default value 60000.
- messageWaitTimeout: message timeout in milliseconds, default value 60000.
- useSSL: whether SSL is used in the connection.

@@ -281,8 +286,9 @@ The core function of Sink is to efficiently and accurately write Flink-processed
| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT | integer | number of automatic reconnection retries, default value 3 | only takes effect when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true|
| TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION | boolean | disable SSL certificate verification. true: disable, false: do not disable. The default is false ||

-#### Use Sink connector
-Write the received RowData type data into TDengine example:
+Usage example:
+
+Write the subtable data of the meters supertable in the power database into the corresponding subtables of the sink_meters supertable in the power_sink database.

<details>
<summary>Sink RowData</summary>

@@ -291,18 +297,21 @@ Write the received RowData type data into TDengine example:
```

</details>
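
+For reference, a minimal inline sketch of the sink configuration used by the example above. The property keys come from `TDengineConfigParams` as used in the example code; the URL and values are placeholders:
+
+```java
+Properties sinkProps = new Properties();
+sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
+sinkProps.setProperty(TDengineConfigParams.TD_DATABASE_NAME, "power_sink");
+sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters");
+sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
+sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");
+
+// The column list tells the sink which target columns the incoming RowData fields map to, in order.
+TDengineSink<RowData> sink = new TDengineSink<>(sinkProps,
+        Arrays.asList("ts", "current", "voltage", "phase", "location", "groupid", "tbname"));
+```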

-Write batch received RowData data into TDengine example:
+Usage example:
+
+Subscribe to the subtable data of the meters supertable in the power database and write it to the corresponding subtables of the sink_meters supertable in the power_sink database.

<details>
-<summary>Sink RowData</summary>
+<summary>Cdc Sink</summary>

```java
-{{#include docs/examples/flink/Main.java:BatchRowDataToSink}}
+{{#include docs/examples/flink/Main.java:CdcRowDataToSink}}
```

</details>
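
+As a rough sketch of what the included example does, a TDengineCdcSource is wired into the sink. Class, parameter, and topic names are taken from the example code; the host is a placeholder, and `env` and `sink` are assumed to be the objects from the surrounding examples:
+
+```java
+Properties config = new Properties();
+config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
+config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
+config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
+config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
+config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
+
+// Subscribe to the topic and forward each row to the TDengine sink configured above.
+TDengineCdcSource<RowData> source = new TDengineCdcSource<>("topic_meters", config, RowData.class);
+DataStreamSource<RowData> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
+input.sinkTo(sink);
+```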

### Table SQL

-ETL (Extract, Transform, Load) data processing: Flink SQL with JDBC can be used to extract data from multiple different data source databases (such as TDengine, MySQL, Oracle, etc.), perform transformation operations (such as data cleaning, format conversion, associating data from different tables, etc.) in Flink, and then load the processed results into the target data source (such as TDengine, MySQL, etc.).
+Use Table SQL to extract data from multiple different data source databases (such as TDengine, MySQL, Oracle, etc.), perform custom operator operations (such as data cleaning, format conversion, joining data from different tables, etc.), and then load the processed results into the target data source (such as TDengine, MySQL, etc.).
+

#### Source connector

Parameter configuration instructions:

@@ -311,7 +320,7 @@ Parameter configuration instructions:
|-----------------------| :-----: | ------------ | ------ |
| connector | string | connector identifier, set `tdengine-connector`||
| td.jdbc.url | string | URL of the connection ||
-| td.jdbc.mode | strng | connector type: `source`, `cdc`, `sink`| |
+| td.jdbc.mode | string | connector type: `source`, `sink`| |
| table.name | string | source or target table name ||
| scan.query | string | SQL statement to retrieve data||
| sink.db.name | string | target database name||
@@ -319,7 +328,9 @@ Parameter configuration instructions:
| sink.batch.size | integer| batch size written||
| sink.table.name | string | name of the regular table or subtable written||

-#### Example of using Source connector
+Usage example:
+
+Write the subtable data of the meters supertable in the power database into the corresponding subtables of the sink_meters supertable in the power_sink database.

<details>
<summary>Table Source</summary>

@@ -328,26 +339,29 @@ Parameter configuration instructions:
```

</details>
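
+For reference, a sketch of registering a TDengine-backed source table through Flink's Table API, using the options from the parameter table above. Whether the connector accepts exactly this declaration is an assumption; the schema, URL, and query are illustrative placeholders:
+
+```java
+TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tEnv.executeSql(
+        "CREATE TABLE `meters` ("
+        + " ts TIMESTAMP(3),"
+        + " `current` FLOAT,"
+        + " voltage INT,"
+        + " phase FLOAT,"
+        + " tbname STRING"
+        + ") WITH ("
+        + " 'connector' = 'tdengine-connector',"
+        + " 'td.jdbc.mode' = 'source',"
+        + " 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata',"
+        + " 'table.name' = 'meters',"
+        + " 'scan.query' = 'SELECT ts, `current`, voltage, phase, tbname FROM `meters`'"
+        + ")");
+```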

-#### CDC connector
+#### Table CDC connector
+
Parameter configuration instructions:

-| Parameter Name | Type | Parameter Description | Remarks|
-|-------------------| :-----: |--------------------------------------------------------------------------------------|-------|
-| connector | string | connector identifier, set `tdengine-connector` ||
-| user | string | username, default root ||
-| password | string | password, default taosdata ||
-| bootstrap. servers| string | server address ||
-| topic | string | subscribe to topic ||
-| td.jdbc.mode | strng | connector type: `cdc`, `sink` | |
-| group.id | string | Consumption group ID, sharing consumption progress within the same consumption group ||
-| auto.offset.reset | string | initial position for consumer group subscription | earliest: subscribe from scratch<br/>latest: default; Subscribe only from the latest data|
-| poll.interval_mas | integer | Pull data interval, default 500ms ||
-| sink.db.name | string | Target database name ||
-| sink.superstable.name | string | Write the name of the superstable ||
-| sink.batch.size | integer | batch size written ||
-| sink.table.name | string | Name of the regular table or sub table written ||
+| Parameter Name | Type | Parameter Description |
+|-------------------| :-----: |--------------------------------------------------------------------------------------|
+| connector | string | connector identifier, set `tdengine-connector` |
+| user | string | username, default root |
+| password | string | password, default taosdata |
+| bootstrap.servers | string | server address |
+| topic | string | topic to subscribe to |
+| td.jdbc.mode | string | connector type: `cdc`, `sink` |
+| group.id | string | consumer group ID; consumption progress is shared within the same consumer group |
+| auto.offset.reset | string | initial position for consumer group subscription.<br/>earliest: subscribe from the beginning<br/>latest (default): subscribe only from the latest data|
+| poll.interval_ms | integer | data polling interval, default 500 ms |
+| sink.db.name | string | target database name |
+| sink.supertable.name | string | name of the supertable to write to |
+| sink.batch.size | integer | batch size written |
+| sink.table.name | string | name of the regular table or subtable written |
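
+A sketch of a matching Table declaration in CDC mode, built only from the option names in the table above (treat the exact spelling and accepted values as assumptions; host, topic, and schema are placeholders):
+
+```java
+TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tEnv.executeSql(
+        "CREATE TABLE `meters_topic` ("
+        + " ts TIMESTAMP(3),"
+        + " `current` FLOAT,"
+        + " voltage INT,"
+        + " phase FLOAT,"
+        + " tbname STRING"
+        + ") WITH ("
+        + " 'connector' = 'tdengine-connector',"
+        + " 'td.jdbc.mode' = 'cdc',"
+        + " 'bootstrap.servers' = 'localhost:6041',"
+        + " 'topic' = 'topic_meters',"
+        + " 'group.id' = 'group_1',"
+        + " 'auto.offset.reset' = 'earliest',"
+        + " 'poll.interval_ms' = '500'"
+        + ")");
+```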

-#### Example of using CDC connector
+Usage example:
+
+Subscribe to the subtable data of the meters supertable in the power database and write it to the corresponding subtables of the sink_meters supertable in the power_sink database.

<details>
<summary>Table CDC</summary>

diff --git a/docs/examples/flink/Main.java b/docs/examples/flink/Main.java
index b601778776..12d79126cf 100644
--- a/docs/examples/flink/Main.java
+++ b/docs/examples/flink/Main.java
@@ -414,7 +414,7 @@ splitSql.setSelect("ts, current, voltage, phase, groupid, location")
        Properties connProps = new Properties();
        connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
        connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
-       SourceSplitSql splitSql = getTimeSplit();
+       SourceSplitSql sql = new SourceSplitSql("select ts, `current`, voltage, phase, tbname from meters");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
@@ -437,8 +437,8 @@ splitSql.setSelect("ts, current, voltage, phase, groupid, location")
    }
    //ANCHOR_END: RowDataToSink

-   //ANCHOR: BatchRowDataToSink
-   static void testBatchToTdSink() throws Exception {
+   //ANCHOR: CdcRowDataToSink
+   static void testCdcToSink() throws Exception {
        System.out.println("testTDengineCdcToTdSink start!");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
@@ -449,39 +449,32 @@ splitSql.setSelect("ts, current, voltage, phase, groupid, location")
        config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
        config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
        config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
-       config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
-       config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL, "1000");
        config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
        config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
        config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
        config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
        config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
-       config.setProperty(TDengineCdcParams.TMQ_BATCH_MODE, "true");

-       Class<ConsumerRecords<RowData>> typeClass = (Class<ConsumerRecords<RowData>>) (Class<?>) ConsumerRecords.class;
-       TDengineCdcSource<ConsumerRecords<RowData>> tdengineSource = new TDengineCdcSource<>("topic_meters", config, typeClass);
-       DataStreamSource<ConsumerRecords<RowData>> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");
+       TDengineCdcSource<RowData> tdengineSource = new TDengineCdcSource<>("topic_meters", config, RowData.class);
+       DataStreamSource<RowData> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");

        Properties sinkProps = new Properties();
        sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
-       sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
        sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
        sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
-       sinkProps.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true");
-       sinkProps.setProperty(TDengineConfigParams.TD_SOURCE_TYPE, "tdengine_cdc");
        sinkProps.setProperty(TDengineConfigParams.TD_DATABASE_NAME, "power_sink");
        sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters");
        sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
        sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");

-       TDengineSink<ConsumerRecords<RowData>> sink = new TDengineSink<>(sinkProps, Arrays.asList("ts", "current", "voltage", "phase", "location", "groupid",
-               "tbname"));
+       TDengineSink<RowData> sink = new TDengineSink<>(sinkProps, Arrays.asList("ts", "current", "voltage", "phase", "location", "groupid", "tbname"));

        input.sinkTo(sink);
        JobClient jobClient = env.executeAsync("Flink test cdc Example");
        Thread.sleep(6000L);
        jobClient.cancel().get();
        System.out.println("testTDengineCdcToTdSink finish!");
    }
-   //ANCHOR_END: BatchRowDataToSink
+   //ANCHOR_END: CdcRowDataToSink

    //ANCHOR: source_table
    static void testTableToSink() throws Exception {

diff --git a/docs/zh/10-third-party/01-collection/12-flink.md b/docs/zh/10-third-party/01-collection/12-flink.md
index b58a013f38..af5434e0b3 100644
--- a/docs/zh/10-third-party/01-collection/12-flink.md
+++ b/docs/zh/10-third-party/01-collection/12-flink.md
@@ -17,10 +17,6 @@ Apache Flink is an open-source distributed stream-batch processing framework supported by the Apache Software Foundation
- taosAdapter can run normally. For details, see the [taosAdapter user manual](../../../reference/components/taosadapter)
- Apache Flink v1.19.0 or above is installed. To install Apache Flink, please refer to the [official documentation](https://flink.apache.org/)

-## JRE version compatibility
-
-- JRE: supports JRE 8 and above versions.
-
## Supported platforms

Flink Connector supports all platforms that can run Flink 1.19 and above versions.

@@ -29,6 +25,7 @@ Flink Connector supports all platforms that can run Flink 1.19 and above versions.
| Flink Connector Version | Major Changes | TDengine Version |
| ------------------| ------------------------------------ | ---------------- |
| 2.0.0 | 1. Support SQL queries on data in the TDengine database<br/>2. Support CDC subscription to data in the TDengine database<br/>3. Support reading and writing to the TDengine database via Table SQL | 3.3.5.0 and above |
+| 1.0.0 | Support the Sink function for writing data from other data sources into TDengine | 3.3.2.0 and above |

## Exceptions and error codes

@@ -53,8 +50,8 @@ Flink Connector supports all platforms that can run Flink 1.19 and above versions.
| 0x2305 | resultSet is closed | The resultSet has been released; check whether it is used again after release. |
| 0x230d | parameter index out of range | The parameter is out of bounds; check the valid range of the parameter. |
| 0x230e | connection already closed | The connection has been closed; check whether it is used again after being closed, or whether the connection is healthy. |
-| 0x230f | unknown sql type in tdengine | Check the data types supported by TDengine. |
-| 0x2315 | unknown taos type in tdengine | Check whether the correct TDengine data type is specified when converting between TDengine and JDBC data types. |
+| 0x230f | unknown sql type in TDengine | Check the data types supported by TDengine. |
+| 0x2315 | unknown taos type in TDengine | Check whether the correct TDengine data type is specified when converting between TDengine and JDBC data types. |
| 0x2319 | user is required | Username information is missing when creating a connection. |
| 0x231a | password is required | Password information is missing when creating a connection. |
| 0x231d | can't create connection with server within | Increase the connection time via the parameter httpConnectTimeout, or check the connection to taosAdapter. |

@@ -99,11 +96,14 @@ TDengine currently supports timestamp, numeric, character, and boolean types; conversion to Flink RowData
- TDengine currently does not support transactions and cannot perform frequent checkpoint operations or complex transaction coordination.
- Because TDengine uses timestamps as primary keys, downstream operators can filter duplicate data, avoiding duplicate computation.
- At-Least-Once (at least once) is adopted to ensure high data-processing performance and low data latency; the setting method is as follows:
-  ```java
-  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-  env.enableCheckpointing(5000);
-  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
-  ```
+
+Usage:
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000);
+env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
+```

If you use Maven to manage the project, simply add the following dependency to pom.xml.

@@ -124,7 +124,8 @@ The URL specification format is:
Parameter description:
- user: TDengine login username, default value 'root'.
- password: user login password, default value 'taosdata'.
-- batchErrorIgnore: true: if one SQL statement fails during the executeBatch of a Statement, the following SQL statements are still executed. false: do not execute any statements after the failed SQL. Default value: false.
+- database_name: database name.
+- timezone: time zone setting.
- httpConnectTimeout: connection timeout in ms, default value 60000.
- messageWaitTimeout: message timeout in ms, default value 60000.
- useSSL: whether SSL is used in the connection.

@@ -240,7 +241,7 @@ The CDC connector creates consumers according to the parallelism set by the user, so the user
```

</details>

-Example of batch query results:
+Example of delivering subscription results to operators in batches:

<details>
<summary>CDC Batch Source</summary>

@@ -249,7 +250,7 @@ The CDC connector creates consumers according to the parallelism set by the user, so the user
```

</details>
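
+A sketch of what batch mode looks like, using identifiers from the example code: the source then emits `ConsumerRecords<RowData>` batches instead of single rows (`config` and `env` are assumed from the surrounding examples):
+
+```java
+config.setProperty(TDengineCdcParams.TMQ_BATCH_MODE, "true");
+
+// The element type becomes a batch of rows; the unchecked cast obtains the batch class token.
+Class<ConsumerRecords<RowData>> typeClass =
+        (Class<ConsumerRecords<RowData>>) (Class<?>) ConsumerRecords.class;
+TDengineCdcSource<ConsumerRecords<RowData>> source =
+        new TDengineCdcSource<>("topic_meters", config, typeClass);
+DataStreamSource<ConsumerRecords<RowData>> input =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
+```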

-Example of query results as a custom data type:
+Example of subscription results as a custom data type:

<details>
<summary>CDC Custom Type</summary>

@@ -283,9 +284,9 @@ The core function of Sink is to efficiently and accurately write Flink-processed data originating
| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT | integer | number of automatic reconnection retries, default value 3. | takes effect only when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true. |
| TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION | boolean | disable SSL certificate verification. true: disable, false: do not disable. Default: false. ||
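
+A small sketch of turning on automatic reconnection for the sink, using the keys listed above (the `TSDBDriver` key is the one used in the example code; values are illustrative):
+
+```java
+Properties sinkProps = new Properties();
+// Reconnect automatically if the connection drops, retrying up to 3 times.
+sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
+sinkProps.setProperty(TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT, "3");
+```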

-#### Use the Sink connector
+Usage example:

-Example of writing received RowData type data into TDengine:
+Write the subtable data of the meters supertable in the power database into the corresponding subtables of the sink_meters supertable in the power_sink database.

<details>
<summary>Sink RowData</summary>

@@ -294,38 +295,41 @@ The core function of Sink is to efficiently and accurately write Flink-processed data originating
```

</details>

+Usage example:
+
-Example of writing batch-received RowData type data into TDengine:
+Subscribe to the subtable data of the meters supertable in the power database and write it into the corresponding subtables of the sink_meters supertable in the power_sink database.

<details>
-<summary>Sink RowData</summary>
+<summary>Cdc Sink</summary>

```java
-{{#include docs/examples/flink/Main.java:BatchRowDataToSink}}
+{{#include docs/examples/flink/Main.java:CdcRowDataToSink}}
```

</details>

### Table SQL

-ETL (Extract, Transform, Load) data processing: Flink SQL with JDBC can be used to extract data from multiple different data source databases (such as TDengine, MySQL, Oracle, etc.), perform transformation operations in Flink (such as data cleaning, format conversion, joining data from different tables, etc.), and then load the processed results into the target data source (such as TDengine, MySQL, etc.).
+Use Table SQL to extract data from multiple different data source databases (such as TDengine, MySQL, Oracle, etc.), perform custom operator operations (such as data cleaning, format conversion, joining data from different tables, etc.), and then load the processed results into the target data source (such as TDengine, MySQL, etc.).

-#### Source connector
+#### Table Source connector

Parameter configuration description:

-| Parameter Name | Type | Parameter Description | Remarks |
-| ----------------------- | :-----: | ------------ | ------ |
-| connector | string | connector identifier, set `tdengine-connector`. ||
-| td.jdbc.url | string | URL of the connection. ||
-| td.jdbc.mode | strng | connector type, set `source`, `cdc`, `sink`. ||
-| table.name | string | source or target table name. ||
-| scan.query | string | SQL statement to retrieve data. ||
-| sink.db.name | string | target database name. ||
-| sink.supertable.name | string | name of the supertable to write to. ||
-| sink.batch.size | integer | batch size written. ||
-| sink.table.name | string | name of the regular table or subtable written. ||
+| Parameter Name | Type | Parameter Description |
+| ----------------------- | :-----: | ------------ |
+| connector | string | connector identifier, set `tdengine-connector`. |
+| td.jdbc.url | string | URL of the connection. |
+| td.jdbc.mode | string | connector type, set `source`, `sink`. |
+| table.name | string | source or target table name. |
+| scan.query | string | SQL statement to retrieve data. |
+| sink.db.name | string | target database name. |
+| sink.supertable.name | string | name of the supertable to write to. |
+| sink.batch.size | integer | batch size written. |
+| sink.table.name | string | name of the regular table or subtable written. |

-#### Source connector usage example
+
+Usage example:
+
+Write the subtable data of the meters supertable in the power database into the corresponding subtables of the sink_meters supertable in the power_sink database.

<details>
<summary>Table Source</summary>

@@ -334,29 +338,29 @@ The core function of Sink is to efficiently and accurately write Flink-processed data originating
```

</details>

-#### CDC connector
+#### Table CDC connector

Parameter configuration description:

-| Parameter Name | Type | Parameter Description | Remarks |
-| ----------------------- | :-----: | ------------ |-------|
-| connector | string | connector identifier, set `tdengine-connector`. ||
-| user | string | username, default root. ||
-| password | string | password, default taosdata. ||
-| bootstrap.servers | string | server address. ||
-| topic | string | topic to subscribe to. ||
-| td.jdbc.mode | strng | connector type, cdc, sink. ||
-| group.id | string | consumer group ID; consumption progress is shared within the same consumer group. ||
-| auto.offset.reset | string | initial position for consumer group subscription. | earliest: subscribe from the beginning<br/>latest: default; subscribe only from the latest data. |
-| poll.interval_ms | integer | data polling interval, default 500 ms. ||
-| sink.db.name | string | target database name. ||
-| sink.supertable.name | string | name of the supertable to write to. ||
-| sink.batch.size | integer | batch size written. ||
-| sink.table.name | string | name of the regular table or subtable written. ||
+| Parameter Name | Type | Parameter Description |
+| ----------------------- | :-----: | ------------ |
+| connector | string | connector identifier, set `tdengine-connector`. |
+| user | string | username, default root. |
+| password | string | password, default taosdata. |
+| bootstrap.servers | string | server address. |
+| topic | string | topic to subscribe to. |
+| td.jdbc.mode | string | connector type, `cdc`, `sink`. |
+| group.id | string | consumer group ID; consumption progress is shared within the same consumer group. |
+| auto.offset.reset | string | initial position for consumer group subscription.<br/>earliest: subscribe from the beginning<br/>latest (default): subscribe only from the latest data. |
+| poll.interval_ms | integer | data polling interval, default 500 ms. |
+| sink.db.name | string | target database name. |
+| sink.supertable.name | string | name of the supertable to write to. |
+| sink.batch.size | integer | batch size written. |
+| sink.table.name | string | name of the regular table or subtable written. |

+Usage example:
+
-#### CDC connector usage example
+Subscribe to the subtable data of the meters supertable in the power database and write it into the corresponding subtables of the sink_meters supertable in the power_sink database.

<details>
<summary>Table CDC</summary>