Merge pull request #29444 from taosdata/docs/TD-32275-main

modify flink connector docs param note
This commit is contained in:
Linhe Huo 2025-01-01 17:05:46 +08:00 committed by GitHub
commit e7e30b47d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 158 additions and 154 deletions

View File

@ -26,7 +26,7 @@ Flink Connector supports all platforms that can run Flink 1.19 and above version
| Flink Connector Version | Major Changes | TDengine Version|
|-------------------------| ------------------------------------ | ---------------- |
| 2.0.0 | 1. Support SQL queries on data in TDengine database<br/>2 Support CDC subscription to data in TDengine database<br/>3 Supports reading and writing to TDengine database using Table SQL | 3.3.5.0 and above versions|
| 2.0.0 | 1.Support SQL queries on data in TDengine database <br/> 2. Support CDC subscription to data in TDengine database<br/> 3. Supports reading and writing to TDengine database using Table SQL | 3.3.5.0 and above versions|
| 1.0.0 | Support Sink function to write data from other sources to TDengine in the future| 3.3.2.0 and above versions|
## Exception and error codes
@ -36,39 +36,39 @@ Please refer to:
| Error Code | Description | Suggested Actions |
| ---------------- |------------------------------------------------------- | -------------------- |
|0xa000 | connection param error | connector parameter error
|0xa001 | The groupid parameter of CDC is incorrect | The groupid parameter of CDC is incorrect|
|0xa002 | wrong topic parameter for CDC | The topic parameter for CDC is incorrect|
|0xa010 | database name configuration error | database name configuration error|
|0xa011 | Table name configuration error | Table name configuration error|
|0xa012 | No data was obtained from the data source | Failed to retrieve data from the data source|
|0xa013 | value.deserializer parameter not set | No serialization method set|
|0xa014 | List of column names for target table not set | List of column names for target table not set ||
|0x2301 | Connection already closed | The connection has been closed. Check the connection status or create a new connection to execute the relevant instructions|
|0x2302 | this operation is NOT supported currently | The current interface is not supported, you can switch to other connection methods|
|0x2303 | invalid variables | The parameter is invalid. Please check the corresponding interface specification and adjust the parameter type and size|
|0x2304 | Statement is closed | Statement has already been closed. Please check if the statement is closed and reused, or if the connection is working properly|
|0x2305 | ResultSet is closed | The ResultSet has been released. Please check if the ResultSet has been released and used again|
|0x230d | parameter index out of range | parameter out of range, please check the reasonable range of the parameter|
|0x230e | Connection already closed | The connection has been closed. Please check if the connection is closed and used again, or if the connection is working properly|
|0x230f | unknown SQL type in TDengine | Please check the Data Type types supported by TDengine|
|0x2315 | unknown tao type in TDengine | Did the correct TDengine data type be specified when converting TDengine data type to JDBC data type|
|0x2319 | user is required | Username information is missing when creating a connection|
|0x231a | password is required | Password information is missing when creating a connection|
|0x231d | can't create connection with server within | Increase connection time by adding the parameter httpConnectTimeout, or check the connection status with taosAdapter|
|0x231e | failed to complete the task within the specified time | Increase execution time by adding the parameter messageWaitTimeout, or check the connection with taosAdapter|
|0x2352 | Unsupported encoding | An unsupported character encoding set was specified under the local connection|
|0x2353 |internal error of database, Please see taoslog for more details | An error occurred while executing prepareStatement on the local connection. Please check the taoslog for problem localization|
|0x2354 | Connection is NULL | Connection has already been closed while executing the command on the local connection. Please check the connection with TDengine|
|0x2355 | result set is NULL | Local connection to obtain result set, result set exception, please check connection status and retry|
|0x2356 | invalid num of fields | The meta information obtained from the local connection result set does not match|
|0x2357 | empty SQL string | Fill in the correct SQL for execution|
|0x2371 |consumer properties must not be null | When creating a subscription, the parameter is empty. Please fill in the correct parameter|
|0x2375 | Topic reference has been destroyed | During the process of creating a data subscription, the topic reference was released. Please check the connection with TDengine|
|0x2376 |failed to set consumer topic, Topic name is empty | During the process of creating a data subscription, the subscription topic name is empty. Please check if the specified topic name is filled in correctly|
|0x2377 | Consumer reference has been destroyed | The subscription data transmission channel has been closed, please check the connection with TDengine|
|0x2378 | Consumer create error | Failed to create data subscription. Please check the taos log based on the error message to locate the problem|
|0x237a | vGroup not found in result set VGroup | Not assigned to the current consumer, due to the Rebalance mechanism, the relationship between Consumer and VGroup is not bound|
|0xa000 | connection param error | Connector parameter error.
|0xa001 | the groupid parameter of CDC is incorrect | The groupid parameter of CDC is incorrect.|
|0xa002 | wrong topic parameter for CDC | The topic parameter for CDC is incorrect.|
|0xa010 | database name configuration error | database name configuration error.|
|0xa011 | table name configuration error | Table name configuration error.|
|0xa012 | no data was obtained from the data source | Failed to retrieve data from the data source.|
|0xa013 | value.deserializer parameter not set | No serialization method set.|
|0xa014 | list of column names set incorrectly | List of column names for target table not set. |
|0x2301 | connection already closed | The connection has been closed. Check the connection status or create a new connection to execute the relevant instructions.|
|0x2302 | this operation is NOT supported currently | The current interface is not supported, you can switch to other connection methods.|
|0x2303 | invalid variables | The parameter is invalid. Please check the corresponding interface specification and adjust the parameter type and size.|
|0x2304 | statement is closed | Statement has already been closed. Please check if the statement is closed and reused, or if the connection is working properly.|
|0x2305 | resultSet is closed | The ResultSet has been released. Please check if the ResultSet has been released and used again.|
|0x230d | parameter index out of range | parameter out of range, please check the reasonable range of the parameter.|
|0x230e | connection already closed | The connection has been closed. Please check if the connection is closed and used again, or if the connection is working properly.|
|0x230f | unknown SQL type in TDengine | Please check the Data Type types supported by TDengine.|
|0x2315 | unknown tao type in TDengine | Did the correct TDengine data type be specified when converting TDengine data type to JDBC data type.|
|0x2319 | user is required | Username information is missing when creating a connection.|
|0x231a | password is required | Password information is missing when creating a connection.|
|0x231d | can't create connection with server within | Increase connection time by adding the parameter httpConnectTimeout, or check the connection status with taosAdapter.|
|0x231e | failed to complete the task within the specified time | Increase execution time by adding the parameter messageWaitTimeout, or check the connection with taosAdapter.|
|0x2352 | unsupported encoding | An unsupported character encoding set was specified under the local connection.|
|0x2353 | internal error of database, Please see taoslog for more details | An error occurred while executing prepareStatement on the local connection. Please check the taoslog for problem localization.|
|0x2354 | connection is NULL | Connection has already been closed while executing the command on the local connection. Please check the connection with TDengine.|
|0x2355 | result set is NULL | Local connection to obtain result set, result set exception, please check connection status and retry.|
|0x2356 | invalid num of fields | The meta information obtained from the local connection result set does not match.|
|0x2357 | empty SQL string | Fill in the correct SQL for execution.|
|0x2371 | consumer properties must not be null | When creating a subscription, the parameter is empty. Please fill in the correct parameter.|
|0x2375 | topic reference has been destroyed | During the process of creating a data subscription, the topic reference was released. Please check the connection with TDengine.|
|0x2376 | failed to set consumer topic, Topic name is empty | During the process of creating a data subscription, the subscription topic name is empty. Please check if the specified topic name is filled in correctly.|
|0x2377 | consumer reference has been destroyed | The subscription data transmission channel has been closed, please check the connection with TDengine.|
|0x2378 | consumer create error | Failed to create data subscription. Please check the taos log based on the error message to locate the problem.|
|0x237a | vGroup not found in result set VGroup | Not assigned to the current consumer, due to the Rebalance mechanism, the relationship between Consumer and VGroup is not bound.|
## Data type mapping
@ -96,13 +96,13 @@ TDengine currently supports timestamp, number, character, and boolean types, and
The semantic reason for using At Least One (at least once) is:
-TDengine currently does not support transactions and cannot perform frequent checkpoint operations and complex transaction coordination.
-Due to TDengine's use of timestamps as primary keys, downstream operators of duplicate data can perform filtering operations to avoid duplicate calculations.
-Using At Least One (at least once) to ensure high data processing performance and low data latency, the setting method is as follows:
- TDengine currently does not support transactions and cannot perform frequent checkpoint operations and complex transaction coordination.
- Due to TDengine's use of timestamps as primary keys, downstream operators of duplicate data can perform filtering operations to avoid duplicate calculations.
- Using At Least One (at least once) to ensure high data processing performance and low data latency, the setting method is as follows:
Instructions:
```text
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
@ -121,7 +121,7 @@ If using Maven to manage a project, simply add the following dependencies in pom
The parameters for establishing a connection include URL and Properties.
The URL specification format is:
`jdbc: TAOS-WS://[host_name]:[port]/[database_name]? [user={user}|&password={password}|&timezone={timezone}]`
`jdbc: TAOS-WS://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&timezone={timezone}]`
Parameter description:
@ -142,18 +142,17 @@ By setting the parallelism of the data source, multiple threads can read data fr
The configuration parameters in Properties are as follows:
|Parameter Name | Type | Parameter Description | Remarks|
| ----------------------- | :-----: | ------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| TDengineConfigParams.PROPERTYKEYUSER | string | Login TDengine username, default value 'root' ||
| TDengineConfigParams.PROPERTYKEY-PASSWORD | string | User login password, default value 'taosdata' ||
| TDengineConfigParams.If the downstream operator receives data of RowData type, it only needs to be set to RowData. If the user needs to customize the type, the complete class path needs to be set here|
| TDengineConfigParams.TD_STACTMODE | boolean | This parameter is used to batch push data to downstream operators. If set to True, when creating a TDengine Source object, the data type needs to be specified as SourceRecords \<type \>| The type here is the type used to receive data from downstream operators|
| TDengineConfigParams.PROPERTYKEY_CARSET | string | The character set used by the client, with the default value being the system character set. ||
| TDengineConfigParams.PROPERTYKEY.MSSAGE_maIT_TIMEOUT | integer | Message timeout, in milliseconds, default value is 60000 ||
| TDengineConfigParams.Whether compression is enabled during the transmission process. true: Enable, false: Not enabled. Default is false ||
| TDengineConfigParams.Whether to enable automatic reconnection or not. true: Enable, false: Not enabled. Default to false||
| TDengineConfigParams.PROPERTYKEY-RECONNECT-RETR_COUNT | integer | number of automatic reconnection retries, default value 3 | only takes effect when PROPERTYKEY-INABLE AUTO-RECONNECT is true|
| TDengineConfigParams.PROPERTYKEYDISABLE_SSL_CERTVNet | boolean | Disable SSL certificate verification. true: close, false: Not closed. The default is false||
- TDengineConfigParams.PROPERTY_KEY_USER: Login to TDengine username, default value is 'root '.
- TDengineConfigParams.PROPERTY_KEY_PASSWORD: User login password, default value 'taosdata'.
- TDengineConfigParams.VALUE_DESERIALIZER: The downstream operator receives the result set deserialization method. If the received result set type is `RowData` of `Flink`, it only needs to be set to `RowData`. It is also possible to inherit `TDengineRecordDeserialization` and implement `convert` and `getProducedType` methods, customizing the deserialization method based on `ResultSet` of `SQL`.
- TDengineConfigParams.TD_BATCH_MODE: This parameter is used to batch push data to downstream operators. If set to True, when creating the `TDengine Source` object, it is necessary to specify the data type as a `Template` form of the `SourceRecords` type.
- TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: Message timeout time, in milliseconds, default value is 60000.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: Is compression enabled during the transmission process. true: Enable, false: Not enabled. The default is false.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: Whether to enable automatic reconnection. true: Enable, false: Not enabled. The default is false.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: Automatic reconnection retry interval, in milliseconds, default value 2000. It only takes effect when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: The default value for automatic reconnection retry is 3, which only takes effect when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: Turn off SSL certificate verification. true: Enable, false: Not enabled. The default is false.
#### Split by time
@ -209,27 +208,32 @@ Example of custom data type query result:
</details>
- ResultBean is a custom inner class used to define the data type of the Source query results.
- ResultSoureDeserialization is a custom inner class that inherits Tdengine RecordDesrialization and implements convert and getProducedType methods.
- ResultSoureDeserialization is a custom inner class that inherits `TDengine` RecordDesrialization and implements convert and getProducedType methods.
### CDC Data Subscription
Flink CDC is mainly used to provide data subscription functionality, which can monitor real-time changes in TDengine database data and transmit these changes in the form of data streams to Flink for processing, while ensuring data consistency and integrity.
#### Parameter Description
Parameter Description
| Parameter Name | Type | Parameter Description | Remarks |
|-------------------------------------------|:-------:|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| TDengineCdcParams.BOOTSTRAP_SERVER | string | ip address of the server | |
| TDengineCdcParams.CONNECT-USER | string | username | |
| TDengineCdcParams.CONNECT-PASS | string | password | |
| TDengineCdcParams.POLL_INTERVAL_MS | integer | pull data interval, default 500ms | |
| TDengineConfigParams.VALUE_DESERIALIZER | string | the type of data received by the operator| If the downstream operator receives data of RowData type, it only needs to be set to RowData. If the user needs to customize the type, the complete class path needs to be set here |
| TDengineCdcParams.TMQ_STACTMODE | boolean | this parameter is used to batch push data to downstream operators. If set to True, when creating a TDengine CdcSource object, the data type needs to be specified as ConsumerRecords \<type \> | The type here is the type used to receive data from downstream operators |
| TDengineCdcParams.GROUP ID | string | consumption group ID, shared consumption progress within the same consumption group | <br/>* * Required field * *. Maximum length: 192< Each topic can create up to 100 consumers
| TDengineCdcParams.AUTO-OFFSET-REET | string | initial position of consumer group subscription | early: subscribe from scratch<br/>latest: default; Subscribe only from the latest data |
| TDengineCdcParams.ENABLEAUTO_CMMIT | boolean | whether to automatically submit, true: Enable (for downstream stateless operators); false Commit triggered by checkpoint; default false | |
| TDengineCdcParams.AUTO_CMMIT_INTERVAL_S | integer | the time interval for automatically submitting consumption records to consumption sites, in milliseconds | The default value is 5000, and this parameter takes effect when AUTO_oFFSET-REET is set to true |
| TDengineCdcParams.TMQ_SSSION_TIMEOUT_SS | integer | timeout after consumer heartbeat loss, which triggers rebalancing logic. After success, the consumer will be deleted (supported from TDengine 3.3.3.0 version) | default value is 12000, value range [60001800000] |
| TDengineCdcParams.TMQ_maX_POLL_INTERVAL_S | integer | the longest time interval for pulling data from a consumer poll. If this time is exceeded, the consumer will be considered offline and the rebalancing logic will be triggered. After success, the consumer will be deleted (supported from version 3.3.3.0) | The default value is 300000, [1000INT32_MAX]
- TDengineCdcParams.BOOTSTRAP_SERVERS: `ip:port` of the TDengine server, if using WebSocket connection, then it is the `ip:port` where taosAdapter is located.
- TDengineCdcParams.CONNECT_USER: Login to TDengine username, default value is 'root '.
- TDengineCdcParams.CONNECT_PASS: User login password, default value 'taosdata'.
- TDengineCdcParams.POLL_INTERVAL_MS: Pull data interval, default 500ms.
- TDengineCdcParams. VALUE_DESERIALIZER: Result set deserialization method, If the received result set type is `RowData` of `Flink`, simply set it to 'RowData'. You can inherit `com.taosdata.jdbc.tmq.ReferenceDeserializer`, specify the result set bean, and implement deserialization. You can also inherit `com.taosdata.jdbc.tmq.Deserializer` and customize the deserialization method based on the SQL resultSet.
- TDengineCdcParams.TMQ_BATCH_MODE: This parameter is used to batch push data to downstream operators. If set to True, when creating the `TDengineCdcSource` object, it is necessary to specify the data type as a template form of the `ConsumerRecords` type.
- TDengineCdcParams.GROUP_ID: Consumer group ID, the same consumer group shares consumption progress。Maximum length: 192.
- TDengineCdcParams.AUTO_OFFSET_RESET: Initial position of the consumer group subscription `earliest` subscribe from the beginning, `latest` subscribe from the latest data, default `latest`)。
- TDengineCdcParams.ENABLE_AUTO_COMMIT: Whether to enable automatic consumption point submissiontrue: automatic submissionfalsesubmit based on the `checkpoint` time, default to false.
> **Note**The automatic submission mode of the reader automatically submits data after obtaining it, regardless of whether the downstream operator has processed the data correctly. There is a risk of data loss, and it is mainly used for efficient stateless operator scenarios or scenarios with low data consistency requirements.
- TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS: Time interval for automatically submitting consumption records, in milliseconds, default 5000. This parameter takes effect when `ENABLE_AUTO_COMMIT` is set to true.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: Is compression enabled during the transmission process. true: Enable, false: Not enabled. The default is false.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: Whether to enable automatic reconnection. true: Enable, false: Not enabled. The default is false.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: Automatic reconnection retry interval, in milliseconds, default value 2000. It only takes effect when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: The default value for automatic reconnection retry is 3, which only takes effect when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineCdcParams.TMQ_SESSION_TIMEOUT_MS: Timeout after consumer heartbeat is lost, after which rebalance logic is triggered, and upon success, that consumer will be removed (supported from version 3.3.3.0)Default is 12000, range [6000, 1800000].
- TDengineCdcParams.TMQ_MAX_POLL_INTERVAL_MS: The longest time interval for consumer poll data fetching, exceeding this time will be considered as the consumer being offline, triggering rebalance logic, and upon success, that consumer will be removed (supported from version 3.3.3.0) Default is 300000, range [1000, INT32_MAX].
#### Use CDC connector
@ -267,24 +271,23 @@ Example of custom data type query result:
The core function of Sink is to efficiently and accurately write Flink processed data from different data sources or operators into TDengine. In this process, the efficient write mechanism possessed by TDengine played a crucial role, effectively ensuring the fast and stable storage of data.
#### Sink Properties
Sink Properties
| Parameter Name | Type | Parameter Description | Remarks|
|---------------------------------------------------------|:----------------------------------------------------------------------------------------------------:|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| TDengineConfigParams.PROPERTYKEYUSER | string | Login TDengine username, default value 'root' ||
| TDengineConfigParams.PROPERTYKEY-PASSWORD | string | User login password, default value 'taosdata' ||
| TDengineConfigParams.PROPERTYKEYDBNAME | string | Database name written ||
| TDengineConfigParams.TD_SUPERTABLeNAME | string | Name of the super table to be written | If the data received by the super table must have a tbname field, determine which sub table to write to|
| TDengineConfigParams.TD_TABLeNAME | string | The name of the table to be written, this parameter only needs to be set together with TD_SUPERTABLeNAME | Used to determine which sub table or regular table to write to|
| TDengineConfigParams.TD_STACTISZE | integer | Set batch size | Write when the batch quantity is reached, or a checkpoint time will also trigger writing to the database|
| TDengineConfigParams.VALUE_DESERIALIZER | string | If the downstream operator receives data of RowData type, it only needs to be set to RowData. If the user needs to customize the type, the complete class path needs to be set here |
| TDengineConfigParams.TD_STACTMODE | boolean | This parameter is used to set the reception of batch data | If set to True:< The source is TDengine Source, using SourceRecords \<type \>to create TDengine Sink object<br/>The source is TDengine CDC, using ConsumerRecords \<type \>to create TDengine Sink object | The type here is the type that receives data|
| TDengineConfigParams.TD_SOURCETYPE | string | If the data is from a source, such as source or cdc | TDengine source is set to "tdengine_stource", TDengine cdc is set to "tdengine_cdc"|
| TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT | integer | Message timeout, in milliseconds, default value is 60000 ||
| TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION | boolean | Whether compression is enabled during the transmission process. true: Enable, false: Not enabled. Default is false | |
| TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT | integer| to enable automatic reconnection or not. true: Enable, false: Not enabled. Default to false | |
| TDengineConfigParams.PROPERTYKEY_RECONNECT_RETR_COUNT | integer | number of automatic reconnection retries, default value 3 | only takes effect when PROPERTYKEY-INABLE AUTO-RECONNECT is true|
| TDengineConfigParams.PROPERTYKEYDISABLE_SSL_CERTVNet | boolean | Disable SSL certificate verification. true: close, false: Not closed. The default is false ||
- TDengineConfigParams.PROPERTY_KEY_USER: Login to TDengine username, default value is 'root '.
- TDengineConfigParams.PROPERTY_KEY_PASSWORD: User login password, default value 'taosdata'.
- TDengineConfigParams.PROPERTY_KEY_DBNAME: The database name.
- TDengineConfigParams.TD_SUPERTABLE_NAME:The name of the super table. The received data must have a tbname field to determine which sub table to write to.
- TDengineConfigParams.TD_TABLE_NAME: The table name of a sub table or a normal table. This parameter only needs to be set together with `TD_SUPERTABLE_NAME`.
- TDengineConfigParams.VALUE_DESERIALIZER: The deserialization method for receiving result sets. If the type of the received result set is RowData of Flink, it only needs to be set to RowData. It is also possible to inherit 'TDengine SinkRecordSequencer' and implement the 'serialize' method, customizing the deserialization method based on the received data type.
- TDengineConfigParams.TD_BATCH_SIZE: Set the batch size for writing to the `TDengine` database once | Writing will be triggered when the number of batches is reached, or when a checkpoint is set.
- TDengineConfigParams.TD_BATCH_MODE: When set to True for receiving batch data, if the data source is `TDengine Source` , use the `SourceRecords Template` type to create a `TDengineSink` object; If the source is `TDengine CDC`, use the `ConsumerRecords Template` to create a `TDengineSink` object.
- TDengineConfigParams.TD_SOURCE_TYPE: Set the data source. When the data source is `TDengine Source`, it is set to 'tdengine_stource', and when the source is `TDengine CDC`, it is set to 'tdengine_cdc'. When the configuration of `TD_BATCH_MODE` is set to True, it takes effect.
- TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: Message timeout time, in milliseconds, default value is 60000.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: Is compression enabled during the transmission process. true: Enable, false: Not enabled. The default is false.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: Whether to enable automatic reconnection. true: Enable, false: Not enabled. The default is false.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: Automatic reconnection retry interval, in milliseconds, default value 2000. It only takes effect when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: The default value for automatic reconnection retry is 3, which only takes effect when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: Turn off SSL certificate verification. true: Enable, false: Not enabled. The default is false.
Usage example:
@ -316,17 +319,17 @@ Extract data from multiple different data source databases (such as TDengine, My
Parameter configuration instructions:
| Parameter Name | Type | Parameter Description | Remarks|
|-----------------------| :-----: | ------------ | ------ |
| connector | string | connector identifier, set `tdengine-connector`||
| td.jdbc.url | string | url of the connection ||
| td.jdbc.mode | strng | connector type: `source`, `sink`| |
| table.name | string | original or target table name ||
| scan.query | string | SQL statement to retrieve data||
| sink.db.name | string | target database name||
| sink.superstable.name | string | write the name of the superstable||
| sink.batch.size | integer| batch size written||
| sink.table.name | string | name of the regular table or sub table written||
| Parameter Name | Type | Parameter Description |
|-----------------------| :-----: | ------------ |
| connector | string | connector identifier, set `tdengine-connector`|
| td.jdbc.url | string | url of the connection |
| td.jdbc.mode | strng | connector type: `source`, `sink`|
| table.name | string | original or target table name |
| scan.query | string | SQL statement to retrieve data|
| sink.db.name | string | target database name|
| sink.supertable.name | string | name of the supertable|
| sink.batch.size | integer| batch size written|
| sink.table.name | string | the table name of a sub table or a normal table |
Usage example:
@ -351,13 +354,13 @@ Parameter configuration instructions:
| bootstrap. servers| string | server address |
| topic | string | subscribe to topic |
| td.jdbc.mode | strng | connector type: `cdc`, `sink` |
| group.id | string | Consumption group ID, sharing consumption progress within the same consumption group |
| auto.offset.reset | string | initial position for consumer group subscription. <br/> earliest: subscribe from scratch <br/> latest: default; Subscribe only from the latest data|
| poll.interval_mas | integer | Pull data interval, default 500ms |
| sink.db.name | string | Target database name |
| sink.superstable.name | string | Write the name of the superstable |
| group.id | string | consumption group ID, sharing consumption progress within the same consumption group |
| auto.offset.reset | string | initial position for consumer group subscription. <br/> `earliest`: subscribe from the beginning <br/> `latest` subscribe from the latest data <br/>default `latest`|
| poll.interval_mas | integer | pull data interval, default 500ms |
| sink.db.name | string | target database name |
| sink.supertable.name | string | name of the supertable |
| sink.batch.size | integer | batch size written |
| sink.table.name | string | Name of the regular table or sub table written |
| sink.table.name | string | the table name of a sub table or a normal table |
Usage example:

View File

@ -122,6 +122,7 @@ URL 规范格式为:
`jdbc:TAOS-WS://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&timezone={timezone}]`
参数说明:
- user登录 TDengine 用户名,默认值 'root'。
- password用户登录密码默认值 'taosdata'。
- database_name: 数据库名称。
@ -135,22 +136,18 @@ URL 规范格式为:
Source 拉取 TDengine 数据库中的数据,并将获取到的数据转换为 Flink 内部可处理的格式和类型,并以并行的方式进行读取和分发,为后续的数据处理提供高效的输入。
通过设置数据源的并行度,实现多个线程并行地从数据源中读取数据,提高数据读取的效率和吞吐量,充分利用集群资源进行大规模数据处理能力。
#### Source Properties
Properties 中配置参数如下:
| 参数名称 | 类型 | 参数说明 | 备注 |
| ----------------------- | :-----: | ------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| TDengineConfigParams.PROPERTY_KEY_USER | string | 登录 TDengine 用户名,默认值 'root'。| |
| TDengineConfigParams.PROPERTY_KEY_PASSWORD| string | 用户登录密码,默认值 'taosdata'。| |
| TDengineConfigParams.VALUE_DESERIALIZER| string | 游算子接收数据的类型 | 如果下游算子接收数据的类型是 RowData 仅需要设置为 RowData, 如果用户需要自定义类型这里需要设置完整的类路径。|
| TDengineConfigParams.TD_BATCH_MODE | boolean | 此参数用于批量将数据推送给下游算子,如果设置为 True创建 TDengineSource 对象时需要指定数据类型为 SourceRecords\<类型\> 。 | 此处的类型为用下游算子接收数据的类型。|
| TDengineConfigParams.PROPERTY_KEY_CHARSET | string | 客户端使用的字符集,默认值为系统字符集。| |
| TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT | integer | 消息超时时间,单位 ms 默认值为 60000。| |
| TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION | boolean | 传输过程是否启用压缩。true: 启用false: 不启用。默认为 false。| |
| TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT| boolean| 是否启用自动重连。true: 启用false: 不启用。默认为 false。||
| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT| integer| 自动重连重试次数,默认值 3 | 仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。|
| TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION| boolean| 关闭 SSL 证书验证 。true: 关闭false: 不关闭。默认为 false。||
- TDengineConfigParams.PROPERTY_KEY_USER登录 TDengine 用户名,默认值 'root'。
- TDengineConfigParams.PROPERTY_KEY_PASSWORD用户登录密码默认值 'taosdata'。
- TDengineConfigParams.VALUE_DESERIALIZER下游算子接收结果集反序列化方法, 如果接收结果集类型是 `Flink``RowData`,仅需要设置为 `RowData`即可。也可继承 `TDengineRecordDeserialization` 并实现 `convert``getProducedType` 方法,根据 `SQL``ResultSet` 自定义反序列化方式。
- TDengineConfigParams.TD_BATCH_MODE此参数用于批量将数据推送给下游算子如果设置为 True创建 `TDengineSource` 对象时需要指定数据类型为 `SourceRecords` 类型的泛型形式。
- TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: 消息超时时间, 单位 ms 默认值为 60000。
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: 传输过程是否启用压缩。true: 启用false: 不启用。默认为 false。
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: 是否启用自动重连。true: 启用false: 不启用。默认为 false。
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: 自动重连重试间隔,单位毫秒,默认值 2000。仅在 `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` 为 true 时生效。
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: 自动重连重试次数,默认值 3仅在 `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` 为 true 时生效。
- TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: 关闭 SSL 证书验证 。true: 启用false: 不启用。默认为 false。
#### 按时间分片
@ -206,27 +203,32 @@ Properties 中配置参数如下:
</details>
- ResultBean 自定义的一个内部类,用于定义 Source 查询结果的数据类型。
- ResultSoureDeserialization 是自定义的一个内部类,通过继承 TdengineRecordDeserialization 并实现 convert 和 getProducedType 方法。
- ResultSoureDeserialization 是自定义的一个内部类,通过继承 `TDengineRecordDeserialization` 并实现 `convert``getProducedType` 方法。
### CDC 数据订阅
Flink CDC 主要用于提供数据订阅功能,能实时监控 TDengine 数据库的数据变化,并将这些变更以数据流形式传输到 Flink 中进行处理,同时确保数据的一致性和完整性。
Flink CDC 主要用于提供数据订阅功能,能实时监控 `TDengine` 数据库的数据变化,并将这些变更以数据流形式传输到 `Flink` 中进行处理,同时确保数据的一致性和完整性。
#### 参数说明
| 参数名称 | 类型 | 参数说明 | 备注 |
| ----------------------- | :-----: | ------------------------- | -------------------------- |
| TDengineCdcParams.BOOTSTRAP_SERVERS| string | 服务端的 IP 地址。 | |
| TDengineCdcParams.CONNECT_USER| string | 用户名。 | |
| TDengineCdcParams.CONNECT_PASS| string | 密码。 | |
| TDengineCdcParams.POLL_INTERVAL_MS|int| 拉取数据间隔, 默认 500ms。| |
| TDengineConfigParams.VALUE_DESERIALIZER| string | 游算子接收数据的类型。 | 如果下游算子接收数据的类型是 RowData 仅需要设置为 RowData, 如果用户需要自定义类型这里需要设置完整的类路径。|
| TDengineCdcParams.TMQ_BATCH_MODE| boolean | 此参数用于批量将数据推送给下游算子,如果设置为 True创建 TDengineCdcSource 对象时需要指定数据类型为 ConsumerRecords\<类型\>。| 此处的类型为用下游算子接收数据的类型。|
| TDengineCdcParams.GROUP_ID| string | 消费组 ID同一消费组共享消费进度。 | <br />**必填项**。最大长度192。<br />每个 topic 最多可建立 100 个 consumer 。|
| TDengineCdcParams.AUTO_OFFSET_RESET| string | 消费组订阅的初始位置。 | earliest: 从头开始订阅<br/> latest: default; 仅从最新数据开始订阅。|
| TDengineCdcParams.ENABLE_AUTO_COMMIT| boolean | 是否自动提交true: 启用(用于下游均为无状态算子) false由 checkpoint 触发 commit 。| 默认 false。|
| TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS| integer|消费记录自动提交消费位点时间间隔,单位为毫秒。| 默认值为 5000, 此参数在 AUTO_OFFSET_RESET 为 true 生效。|
| TDengineCdcParams.TMQ_SESSION_TIMEOUT_MS| integer | consumer 心跳丢失后超时时间,超时后会触发 rebalance 逻辑,成功后该 consumer 会被删除(从 TDengine 3.3.3.0 版本开始支持)。| 默认值为 12000取值范围 [6000 1800000]。 |
| TDengineCdcParams.TMQ_MAX_POLL_INTERVAL_MS| integer | consumer poll 拉取数据间隔的最长时间,超过该时间,会认为该 consumer 离线,触发 rebalance 逻辑,成功后该 consumer 会被删除(从 3.3.3.0 版本开始支持)。 | 默认值为 300000[1000INT32_MAX]。|
Properties 中配置参数如下:
- TDengineCdcParams.BOOTSTRAP_SERVERSTDengine 服务端所在的`ip:port`,如果使用 `WebSocket` 连接,则为 taosAdapter 所在的`ip:port`。
- TDengineCdcParams.CONNECT_USER登录 TDengine 用户名,默认值 'root'。
- TDengineCdcParams.CONNECT_PASS用户登录密码默认值 'taosdata'。
- TDengineCdcParams.POLL_INTERVAL_MS拉取数据间隔, 默认 500ms。
- TDengineCdcParams.VALUE_DESERIALIZER结果集反序列化方法如果接收结果集类型是 `Flink``RowData`,仅需要设置为 `RowData`即可。可以继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer`,并指定结果集 bean实现反序列化。
- TDengineCdcParams.TMQ_BATCH_MODE此参数用于批量将数据推送给下游算子如果设置为 True创建 `TDengineCdcSource` 对象时需要指定数据类型为 `ConsumerRecords` 类型的泛型形式。
- TDengineCdcParams.GROUP_ID消费组 ID同一消费组共享消费进度。最大长度192。
- TDengineCdcParams.AUTO_OFFSET_RESET 消费组订阅的初始位置 `earliest` 从头开始订阅, `latest` 仅从最新数据开始订阅, 默认 `latest`)。
- TDengineCdcParams.ENABLE_AUTO_COMMIT是否启用消费位点自动提交true: 自动提交false依赖 `checkpoint` 时间来提交, 默认 false。
> **注意**自动提交模式reader获取完成数据后自动提交不管下游算子是否正确的处理了数据存在数据丢失的风险主要用于为了追求高效的无状态算子场景或是数据一致性要求不高的场景。
- TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS消费记录自动提交消费位点时间间隔单位为毫秒。默认值为 5000, 此参数在 `ENABLE_AUTO_COMMIT` 为 true 生效。
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION传输过程是否启用压缩。true: 启用false: 不启用。默认为 false。
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT是否启用自动重连。true: 启用false: 不启用。默认为 false。
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS自动重连重试间隔单位毫秒默认值 2000。仅在 `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` 为 true 时生效。
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT自动重连重试次数默认值 3仅在 `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` 为 true 时生效。
- TDengineCdcParams.TMQ_SESSION_TIMEOUT_MS`consumer` 心跳丢失后超时时间,超时后会触发 `rebalance` 逻辑,成功后该 `consumer` 会被删除从3.3.3.0版本开始支持), 默认值为 12000取值范围 [6000 1800000]。
- TDengineCdcParams.TMQ_MAX_POLL_INTERVAL_MS`consumer poll` 拉取数据间隔的最长时间,超过该时间,会认为该 `consumer` 离线,触发 `rebalance` 逻辑,成功后该 `consumer` 会被删除。 默认值为 300000[1000INT32_MAX]。
#### 使用 CDC 连接器
@ -259,30 +261,29 @@ CDC 连接器会根据用户设置的并行度进行创建 consumer因此用
```
</details>
- ResultBean 是自定义的一个内部类,其字段名和数据类型与列的名称和数据类型一一对应,这样根据 value.deserializer 属性对应的反序列化类可以反序列化出 ResultBean 类型的对象。
- ResultBean 是自定义的一个内部类,其字段名和数据类型与列的名称和数据类型一一对应,这样根据 `TDengineCdcParams.VALUE_DESERIALIZER` 属性对应的反序列化类可以反序列化出 ResultBean 类型的对象。
### Sink
Sink 的核心功能在于高效且精准地将经过 Flink 处理的、源自不同数据源或算子的数据写入 TDengine。在这一过程中TDengine 所具备的高效写入机制发挥了至关重要的作用,有力保障了数据的快速和稳定存储。
Sink 的核心功能在于高效且精准地将经过 `Flink` 处理的、源自不同数据源或算子的数据写入 `TDengine`。在这一过程中,`TDengine` 所具备的高效写入机制发挥了至关重要的作用,有力保障了数据的快速和稳定存储。
#### Sink Properties
Properties 中配置参数如下:
| 参数名称 | 类型 | 参数说明 | 备注 |
| ----------------------- | :-----: | ------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| TDengineConfigParams.PROPERTY_KEY_USER | string | 登录 TDengine 用户名,默认值 'root'。| |
| TDengineConfigParams.PROPERTY_KEY_PASSWORD| string | 用户登录密码,默认值 'taosdata'。| |
| TDengineConfigParams.PROPERTY_KEY_DBNAME| string | 写入的数据库名称。||
| TDengineConfigParams.TD_SUPERTABLE_NAME| string | 写入的超级表名称。| 如果是超级表接收的数据必须有 tbname 字段,确定写入那张子表。|
| TDengineConfigParams.TD_TABLE_NAME| string | 写入的表名此参数和TD_SUPERTABLE_NAME 仅需要设置一个即可。| 用于确定写入那张子表或普通表。|
| TDengineConfigParams.TD_BATCH_SIZE| integer | 设置批大小 | 当到达批的数量后进行写入或是一个checkpoint的时间也会触发写入数据库。|
| TDengineConfigParams.VALUE_DESERIALIZER| string | 游算子接收数据的类型。 | 如果下游算子接收数据的类型是 RowData 仅需要设置为 RowData, 如果用户需要自定义类型这里需要设置完整的类路径。|
| TDengineConfigParams.TD_BATCH_MODE | boolean | 此参数用于设置接收批量数据。 | 如果设置为 True:<br/> 来源是 TDengine Source 使用SourceRecords\<类型\> 创建 TDengineSink 对象<br/> 来源是 TDengine CDC 使用 ConsumerRecords\<类型\> 创建 TDengineSink 对象。| 此处的类型为接收数据的类型。|
| TDengineConfigParams.TD_SOURCE_TYPE | string | 如果数据是表示数据来源是source 或者 cdc 等。 | TDengine source 设置为 "tdengine_source", TDengine cdc 设置为 "tdengine_cdc"。|
| TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT | integer | 消息超时时间,单位 ms 默认值为 60000。| |
| TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION | boolean | 传输过程是否启用压缩。true: 启用false: 不启用。默认为 false。| |
| TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT| boolean| 是否启用自动重连。true: 启用false: 不启用。默认为 false。||
| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT| integer| 自动重连重试次数,默认值 3。 | 仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。|
| TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION| boolean| 关闭 SSL 证书验证 。true: 关闭false: 不关闭。默认为 false。||
- TDengineConfigParams.PROPERTY_KEY_USER登录 `TDengine` 用户名,默认值 'root'。
- TDengineConfigParams.PROPERTY_KEY_PASSWORD用户登录密码默认值 'taosdata'。
- TDengineConfigParams.PROPERTY_KEY_DBNAME写入的数据库名称。
- TDengineConfigParams.TD_SUPERTABLE_NAME写入的超级表名称。接收的数据必须有 tbname 字段,确定写入那张子表。
- TDengineConfigParams.TD_TABLE_NAME写入子表或普通表的表名此参数和TD_SUPERTABLE_NAME 仅需要设置一个即可。
- TDengineConfigParams.VALUE_DESERIALIZER接收结果集反序列化方法, 如果接收结果集类型是 `Flink``RowData`,仅需要设置为 `RowData`即可。也可继承 `TDengineSinkRecordSerializer` 并实现 `serialize` 方法,根据 接收的数据类型自定义反序列化方式。
- TDengineConfigParams.TD_BATCH_SIZE设置一次写入 `TDengine` 数据库的批大小 | 当到达批的数量后进行写入或是一个checkpoint的时间也会触发写入数据库。
- TDengineConfigParams.TD_BATCH_MODE接收批量数据当设置为 True 时,如果数据来源是 `TDengine Source`,则使用 `SourceRecords` 泛型类型来创建 `TDengineSink` 对象;若来源是 `TDengine CDC`,则使用 `ConsumerRecords` 泛型来创建 `TDengineSink` 对象。
- TDengineConfigParams.TD_SOURCE_TYPE设置数据来源。 当数据来源是 `TDengine Source` 是设置为 'tdengine_source', 当来源是 `TDengine CDC` 设置为 'tdengine_cdc'。当配置 `TD_BATCH_MODE` 为 True 生效。
- TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: 消息超时时间, 单位 ms 默认值为 60000。
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: 传输过程是否启用压缩。true: 启用false: 不启用。默认为 false。
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: 是否启用自动重连。true: 启用false: 不启用。默认为 false。
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: 自动重连重试间隔,单位毫秒,默认值 2000。仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: 自动重连重试次数,默认值 3仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。
- TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: 关闭 SSL 证书验证 。true: 启用false: 不启用。默认为 false。
使用示例:
@ -314,8 +315,8 @@ Sink 的核心功能在于高效且精准地将经过 Flink 处理的、源自
参数配置说明:
| 参数名称 | 类型 | 参数说明 | 备注 |
| ----------------------- | :-----: | ------------ | ------ |
| 参数名称 | 类型 | 参数说明 |
| ----------------------- | :-----: | ------------ |
| connector | string | 连接器标识,设置 `tdengine-connector` 。|
| td.jdbc.url| string | 连接的 url 。|
| td.jdbc.mode | strng | 连接器类型, 设置 `source`, `sink`。|
@ -351,7 +352,7 @@ Sink 的核心功能在于高效且精准地将经过 Flink 处理的、源自
| topic | string | 订阅主题。||
| td.jdbc.mode | strng | 连接器类型, cdc, sink。|
| group.id| string| 消费组 ID同一消费组共享消费进度。 |
| auto.offset.reset| string| 消费组订阅的初始位置。<br/>earliest: 从头开始订阅 <br/> latest: default; 仅从最新数据开始订阅。|
| auto.offset.reset| string| 消费组订阅的初始位置。<br/>`earliest`: 从头开始订阅 <br/> `latest`: 仅从最新数据开始订阅<br/> 默认 `latest`。|
| poll.interval_ms| integer| 拉取数据间隔, 默认 500ms。|
| sink.db.name|string| 目标数据库名称。|
| sink.supertable.name|string |写入的超级表名称。|