add flink doc

This commit is contained in:
menshibin 2024-12-31 11:09:33 +08:00
parent 5bb7be712d
commit 8967d8f209
2 changed files with 952 additions and 0 deletions

View File

@ -0,0 +1,586 @@
package com.taosdata.flink.example;
import com.taosdata.flink.cdc.TDengineCdcSource;
import com.taosdata.flink.common.TDengineCdcParams;
import com.taosdata.flink.common.TDengineConfigParams;
import com.taosdata.flink.sink.TDengineSink;
import com.taosdata.flink.source.TDengineSource;
import com.taosdata.flink.source.entity.SourceSplitSql;
import com.taosdata.flink.source.entity.SplitType;
import com.taosdata.flink.source.entity.TimestampSplitInfo;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.shaded.curator5.com.google.common.base.Strings;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.junit.Assert;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import javax.xml.transform.Source;
import org.apache.flink.streaming.api.CheckpointingMode;
public class Main {
static String jdbcUrl = "jdbc:TAOS-WS://localhost:6041?user=root&password=taosdata";
static void prepare() throws ClassNotFoundException, SQLException {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
String insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters TAGS('California.SanFrancisco', 1) " +
"VALUES " +
"('2024-12-19 19:12:45.642', 50.30000, 201, 0.31000) " +
"('2024-12-19 19:12:46.642', 82.60000, 202, 0.33000) " +
"('2024-12-19 19:12:47.642', 92.30000, 203, 0.31000) " +
"('2024-12-19 18:12:45.642', 50.30000, 201, 0.31000) " +
"('2024-12-19 18:12:46.642', 82.60000, 202, 0.33000) " +
"('2024-12-19 18:12:47.642', 92.30000, 203, 0.31000) " +
"('2024-12-19 17:12:45.642', 50.30000, 201, 0.31000) " +
"('2024-12-19 17:12:46.642', 82.60000, 202, 0.33000) " +
"('2024-12-19 17:12:47.642', 92.30000, 203, 0.31000) " +
"power.d1002 USING power.meters TAGS('Alabama.Montgomery', 2) " +
"VALUES " +
"('2024-12-19 19:12:45.642', 50.30000, 204, 0.25000) " +
"('2024-12-19 19:12:46.642', 62.60000, 205, 0.33000) " +
"('2024-12-19 19:12:47.642', 72.30000, 206, 0.31000) " +
"('2024-12-19 18:12:45.642', 50.30000, 204, 0.25000) " +
"('2024-12-19 18:12:46.642', 62.60000, 205, 0.33000) " +
"('2024-12-19 18:12:47.642', 72.30000, 206, 0.31000) " +
"('2024-12-19 17:12:45.642', 50.30000, 204, 0.25000) " +
"('2024-12-19 17:12:46.642', 62.60000, 205, 0.33000) " +
"('2024-12-19 17:12:47.642', 72.30000, 206, 0.31000) ";
Class.forName("com.taosdata.jdbc.ws.WebSocketDriver");
try (Connection connection = DriverManager.getConnection(jdbcUrl, properties);
Statement stmt = connection.createStatement()) {
stmt.executeUpdate("DROP TOPIC IF EXISTS topic_meters");
stmt.executeUpdate("DROP database IF EXISTS power");
// create database
int rowsAffected = stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power vgroups 5");
stmt.executeUpdate("use power");
// you can check rowsAffected here
System.out.println("Create database power successfully, rowsAffected: " + rowsAffected);
// create table
rowsAffected = stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);");
// you can check rowsAffected here
System.out.println("Create stable power.meters successfully, rowsAffected: " + rowsAffected);
stmt.executeUpdate("CREATE TOPIC topic_meters as SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM meters");
int affectedRows = stmt.executeUpdate(insertQuery);
// you can check affectedRows here
System.out.println("Successfully inserted " + affectedRows + " rows to power.meters.");
stmt.executeUpdate("DROP database IF EXISTS power_sink");
// create database
stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power_sink vgroups 5");
stmt.executeUpdate("use power_sink");
// you can check rowsAffected here
System.out.println("Create database power successfully, rowsAffected: " + rowsAffected);
// create table
stmt.executeUpdate("CREATE STABLE IF NOT EXISTS sink_meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);");
// you can check rowsAffected here
stmt.executeUpdate("CREATE TABLE IF NOT EXISTS sink_normal (ts timestamp, current float, voltage int, phase float);");
// you can check rowsAffected here
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create database power or stable meters, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
throw ex;
}
}
public static void main(String[] args) throws Exception {
prepare();
if (args != null && args.length > 0 && args[0].equals("source")) {
testSource();
} else if (args != null && args.length > 0 && args[0].equals("table")) {
testTableToSink();
} else if (args != null && args.length > 0 && args[0].equals("cdc")) {
testCustomTypeCdc();
}else if (args != null && args.length > 0 && args[0].equals("table-cdc")) {
testCdcTableToSink();
}
}
static SourceSplitSql getTimeSplit() {
// ANCHOR: time_interval
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
.setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
.setTimestampSplitInfo(new TimestampSplitInfo(
"2024-12-19 16:12:48.000",
"2024-12-19 19:12:48.000",
"ts",
Duration.ofHours(1),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
ZoneId.of("Asia/Shanghai")));
// ANCHOR_END: time_interval
return splitSql;
}
static SourceSplitSql getTagSplit() throws Exception {
// ANCHOR: tag_split
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, current, voltage, phase, groupid, location from meters where voltage > 100")
.setTagList(Arrays.asList("groupid >100 and location = 'Shanghai'",
"groupid >50 and groupid < 100 and location = 'Guangzhou'",
"groupid >0 and groupid < 50 and location = 'Beijing'"))
.setSplitType(SplitType.SPLIT_TYPE_TAG);
// ANCHOR_END: tag_split
return splitSql;
}
static SourceSplitSql getTableSqlit() {
// ANCHOR: table_split
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSelect("ts, current, voltage, phase, groupid, location")
.setTableList(Arrays.asList("d1001", "d1002"))
.setOther("order by ts limit 100")
.setSplitType(SplitType.SPLIT_TYPE_TABLE);
// ANCHOR_END: table_split
}
//ANCHOR: source_test
static void testSource() throws Exception {
Properties connProps = new Properties();
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
.setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
.setTimestampSplitInfo(new TimestampSplitInfo(
"2024-12-19 16:12:48.000",
"2024-12-19 19:12:48.000",
"ts",
Duration.ofHours(1),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
ZoneId.of("Asia/Shanghai")));
TDengineSource<RowData> source = new TDengineSource<>(connProps, sql, RowData.class);
DataStreamSource<RowData> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<RowData, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("ts: " + rowData.getTimestamp(0, 0) +
", current: " + rowData.getFloat(1) +
", voltage: " + rowData.getInt(2) +
", phase: " + rowData.getFloat(3) +
", location: " + new String(rowData.getBinary(4)));
sb.append("\n");
return sb.toString();
});
resultStream.print();
env.execute("tdengine flink source");
}
//ANCHOR_END: source_test
//ANCHOR: source_custom_type_test
void testCustomTypeSource() throws Exception {
System.out.println("testTDengineSourceByTimeSplit start");
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultSoureDeserialization");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
.setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
//按照时间分片
.setTimestampSplitInfo(new TimestampSplitInfo(
"2024-12-19 16:12:48.000",
"2024-12-19 19:12:48.000",
"ts",
Duration.ofHours(1),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
ZoneId.of("Asia/Shanghai")));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
TDengineSource<ResultBean> source = new TDengineSource<>(connProps, splitSql, ResultBean.class);
DataStreamSource<ResultBean> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<ResultBean, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("ts: " + rowData.getTs() +
", current: " + rowData.getCurrent() +
", voltage: " + rowData.getVoltage() +
", phase: " + rowData.getPhase() +
", groupid: " + rowData.getGroupid() +
", location" + rowData.getLocation() +
", tbname: " + rowData.getTbname());
sb.append("\n");
totalVoltage.addAndGet(rowData.getVoltage());
return sb.toString();
});
resultStream.print();
env.execute("flink tdengine source");
}
//ANCHOR_END: source_custom_type_test
//ANCHOR: source_batch_test
void testBatchSource() throws Exception {
Properties connProps = new Properties();
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
connProps.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
Class<SourceRecords<RowData>> typeClass = (Class<SourceRecords<RowData>>) (Class<?>) SourceRecords.class;
SourceSplitSql sql = new SourceSplitSql("select ts, `current`, voltage, phase, tbname from meters");
TDengineSource<SourceRecords<RowData>> source = new TDengineSource<>(connProps, sql, typeClass);
DataStreamSource<SourceRecords<RowData>> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
DataStream<String> resultStream = input.map((MapFunction<SourceRecords<RowData>, String>) records -> {
StringBuilder sb = new StringBuilder();
Iterator<RowData> iterator = records.iterator();
while (iterator.hasNext()) {
GenericRowData row = (GenericRowData) iterator.next();
sb.append("ts: " + row.getTimestamp(0, 0) +
", current: " + row.getFloat(1) +
", voltage: " + row.getInt(2) +
", phase: " + row.getFloat(3) +
", location: " + new String(row.getBinary(4)));
sb.append("\n");
totalVoltage.addAndGet(row.getInt(2));
}
return sb.toString();
});
resultStream.print();
env.execute("flink tdengine source");
}
//ANCHOR_END: source_batch_test
//ANCHOR: cdc_source
void testTDengineCdc() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(100, AT_LEAST_ONCE);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.ENABLE_AUTO_COMMIT, "true");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
TDengineCdcSource<RowData> tdengineSource = new TDengineCdcSource<>("topic_meters", config, RowData.class);
DataStreamSource<RowData> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "kafka-source");
DataStream<String> resultStream = input.map((MapFunction<RowData, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("tsxx: " + rowData.getTimestamp(0, 0) +
", current: " + rowData.getFloat(1) +
", voltage: " + rowData.getInt(2) +
", phase: " + rowData.getFloat(3) +
", location: " + new String(rowData.getBinary(4)));
sb.append("\n");
totalVoltage.addAndGet(rowData.getInt(2));
return sb.toString();
});
resultStream.print();
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(5000L);
// The task submitted by Flink UI cannot be cancle and needs to be stopped on the UI page.
jobClient.cancel().get();
}
//ANCHOR_END: cdc_source
//ANCHOR: cdc_batch_source
void testTDengineCdcBatch() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
config.setProperty(TDengineCdcParams.TMQ_BATCH_MODE, "true");
Class<ConsumerRecords<RowData>> typeClass = (Class<ConsumerRecords<RowData>>) (Class<?>) ConsumerRecords.class;
TDengineCdcSource<ConsumerRecords<RowData>> tdengineSource = new TDengineCdcSource<>("topic_meters", config, typeClass);
DataStreamSource<ConsumerRecords<RowData>> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "kafka-source");
DataStream<String> resultStream = input.map((MapFunction<ConsumerRecords<RowData>, String>) records -> {
Iterator<ConsumerRecord<RowData>> iterator = records.iterator();
StringBuilder sb = new StringBuilder();
while (iterator.hasNext()) {
GenericRowData row = (GenericRowData) iterator.next().value();
sb.append("tsxx: " + row.getTimestamp(0, 0) +
", current: " + row.getFloat(1) +
", voltage: " + row.getInt(2) +
", phase: " + row.getFloat(3) +
", location: " + new String(row.getBinary(4)));
sb.append("\n");
totalVoltage.addAndGet(row.getInt(2));
}
return sb.toString();
});
resultStream.print();
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(5000L);
jobClient.cancel().get();
}
//ANCHOR_END: cdc_batch_source
//ANCHOR: cdc_custom_type_test
static void testCustomTypeCdc() throws Exception {
System.out.println("testCustomTypeTDengineCdc start");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(100, AT_LEAST_ONCE);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(4);
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultDeserializer");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
TDengineCdcSource<ResultBean> tdengineSource = new TDengineCdcSource<>("topic_meters", config, ResultBean.class);
DataStreamSource<ResultBean> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "kafka-source");
DataStream<String> resultStream = input.map((MapFunction<ResultBean, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("ts: " + rowData.getTs() +
", current: " + rowData.getCurrent() +
", voltage: " + rowData.getVoltage() +
", phase: " + rowData.getPhase() +
", groupid: " + rowData.getGroupid() +
", location" + rowData.getLocation() +
", tbname: " + rowData.getTbname());
sb.append("\n");
totalVoltage.addAndGet(rowData.getVoltage());
return sb.toString();
});
resultStream.print();
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(5000L);
jobClient.cancel().get();
}
//ANCHOR_END: cdc_custom_type_test
//ANCHOR: RowDataToSink
static void testRowDataToSink() throws Exception {
Properties connProps = new Properties();
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
SourceSplitSql splitSql = getTimeSplit();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
TDengineSource<RowData> source = new TDengineSource<>(connProps, sql, RowData.class);
DataStreamSource<RowData> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
Properties sinkProps = new Properties();
sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
sinkProps.setProperty(TDengineConfigParams.TD_SOURCE_TYPE, "tdengine_source");
sinkProps.setProperty(TDengineConfigParams.TD_DATABASE_NAME, "power_sink");
sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters");
sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata");
sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");
// Arrays.asList The list of target table field names needs to be consistent with the data order
TDengineSink<RowData> sink = new TDengineSink<>(sinkProps,
Arrays.asList("ts", "current", "voltage", "phase", "groupid", "location", "tbname"));
input.sinkTo(sink);
env.execute("flink tdengine source");
}
//ANCHOR_END: RowDataToSink
//ANCHOR: BatchRowDataToSink
static void testBatchToTdSink() throws Exception {
System.out.println("testTDengineCdcToTdSink start");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(500, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(5000);
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL, "1000");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
config.setProperty(TDengineCdcParams.TMQ_BATCH_MODE, "true");
Class<ConsumerRecords<RowData>> typeClass = (Class<ConsumerRecords<RowData>>) (Class<?>) ConsumerRecords.class;
TDengineCdcSource<ConsumerRecords<RowData>> tdengineSource = new TDengineCdcSource<>("topic_meters", config, typeClass);
DataStreamSource<ConsumerRecords<RowData>> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");
Properties sinkProps = new Properties();
sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
sinkProps.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true");
sinkProps.setProperty(TDengineConfigParams.TD_SOURCE_TYPE, "tdengine_cdc");
sinkProps.setProperty(TDengineConfigParams.TD_DATABASE_NAME, "power_sink");
sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters");
sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");
TDengineSink<ConsumerRecords<RowData>> sink = new TDengineSink<>(sinkProps, Arrays.asList("ts", "current", "voltage", "phase", "location", "groupid", "tbname"));
input.sinkTo(sink);
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(6000L);
jobClient.cancel().get();
System.out.println("testTDengineCdcToTdSink finish");
}
//ANCHOR_END: BatchRowDataToSink
//ANCHOR: source_table
static void testTableToSink() throws Exception {
System.out.println("testTableToSink start");
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
String tdengineSourceTableDDL = "CREATE TABLE `meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARBINARY," +
" groupid INT," +
" tbname VARBINARY" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata'," +
" 'td.jdbc.mode' = 'source'," +
" 'table-name' = 'meters'," +
" 'scan.query' = 'SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`'" +
")";
String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARBINARY," +
" groupid INT," +
" tbname VARBINARY" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.mode' = 'sink'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," +
" 'sink.db.name' = 'power_sink'," +
" 'sink.supertable.name' = 'sink_meters'" +
")";
tableEnv.executeSql(tdengineSourceTableDDL);
tableEnv.executeSql(tdengineSinkTableDDL);
tableEnv.executeSql("INSERT INTO sink_meters SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`");
}
//ANCHOR_END: source_table
//ANCHOR: cdc_table
static void testCdcTableToSink() throws Exception {
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
String tdengineSourceTableDDL = "CREATE TABLE `meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARBINARY," +
" groupid INT," +
" tbname VARBINARY" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'bootstrap.servers' = 'localhost:6041'," +
" 'td.jdbc.mode' = 'cdc'," +
" 'group.id' = 'group_22'," +
" 'auto.offset.reset' = 'earliest'," +
" 'enable.auto.commit' = 'false'," +
" 'topic' = 'topic_meters'" +
")";
String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARBINARY," +
" groupid INT," +
" tbname VARBINARY" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.mode' = 'cdc'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," +
" 'sink.db.name' = 'power_sink'," +
" 'sink.supertable.name' = 'sink_meters'" +
")";
tableEnv.executeSql(tdengineSourceTableDDL);
tableEnv.executeSql(tdengineSinkTableDDL);
TableResult tableResult = tableEnv.executeSql("INSERT INTO sink_meters SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`");
Thread.sleep(5000L);
tableResult.getJobClient().get().cancel().get();
}
//ANCHOR_END: cdc_table
}

View File

@ -0,0 +1,366 @@
---
sidebar_label: Flink
title: TDengine Flink Connector
---
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
Apache Flink 是一款由 Apache 软件基金会支持的开源分布式流批一体化处理框架可用于流处理、批处理、复杂事件处理、实时数据仓库构建及为机器学习提供实时数据支持等诸多大数据处理场景。与此同时Flink 拥有丰富的连接器与各类工具可对接众多不同类型的数据源实现数据的读取与写入。在数据处理的过程中Flink 还提供了一系列可靠的容错机制,有力保障任务即便遭遇意外状况,依然能稳定、持续运行。
借助 TDengine 的 Flink 连接器Apache Flink 得以与 TDengine 数据库无缝对接,一方面能够将经过复杂运算和深度分析后所得到的结果精准存入 TDengine 数据库,实现数据的高效存储与管理;另一方面,也可以从 TDengine 数据库中快速、稳定地读取海量数据,并在此基础上进行全面、深入的分析处理,充分挖掘数据的潜在价值,为企业的决策制定提供有力的数据支持和科学依据,极大地提升数据处理的效率和质量,增强企业在数字化时代的竞争力和创新能力。
## 前置条件
准备以下环境:
- TDengine 集群已部署并正常运行(企业及社区版均可)
- taosAdapter 能够正常运行。详细参考 [taosAdapter 使用手册](../../../reference/components/taosadapter)
- Apache Flink v1.19.0 或以上版本已安装。安装 Apache Flink 请参考 [官方文档](https://flink.apache.org/)
## JRE 版本兼容性
- JRE: 支持 JRE 8 及以上版本。
# 支持的平台
Flink Connector 支持所有能运行 Flink 1.19 及以上版本的平台。
## 版本历史
| Flink Connector 版本 | 主要变化 | TDengine 版本 |
| ------------------| ------------------------------------ | ---------------- |
| 1.1.0 | 1. 支持 SQL 查询 TDengine 数据库中的数据<br/> 2. 支持 CDC 订阅 TDengine 数据库中的数据<br/> 3. 支持 Table SQL 方式读取和写入 TDengine 数据库| 3.3.5.0 及以上版本 |
## 异常和错误码
在任务执行失败后,查看 Flink 任务执行日志确认失败原因
具体的错误码请参考:
| Error Code | Description | Suggested Actions |
| ---------------- |------------------------------------------------------- | -------------------- |
| 0xa000 |connection param error |连接器参数错误
| 0xa001 |the groupid parameter of CDC is incorrect |CDC 的 groupid 参数错误。|
| 0xa002 |wrong topic parameter for CDC |CDC 的 topic 参数错误。|
| 0xa010 |database name configuration error |数据库名配置错误|
| 0xa011 |table name configuration error |表名配置错误|
| 0xa012 |no data was obtained from the data source |从数据源中获取数据失败|
| 0xa013 |value.deserializer parameter not set |未设置序列化方式|
| 0xa014 |list of column names for target table not set |未设置目标表的列名列表|
| 0x2301 |connection already closed |连接已经关闭,检查连接情况,或重新创建连接去执行相关指令。|
| 0x2302 |this operation is NOT supported currently! |当前使用接口不支持,可以更换其他连接方式。|
| 0x2303 |invalid variables |参数不合法,请检查相应接口规范,调整参数类型及大小。|
| 0x2304 |statement is closed |statement 已经关闭,请检查 statement 是否关闭后再次使用,或是连接是否正常。|
| 0x2305 |resultSet is closed |resultSet 结果集已经释放,请检查 resultSet 是否释放后再次使用。|
| 0x230d |parameter index out of range |参数越界,请检查参数的合理范围。|
| 0x230e |connection already closed |连接已经关闭,请检查 Connection 是否关闭后再次使用,或是连接是否正常。|
| 0x230f |unknown sql type in tdengine |请检查 TDengine 支持的 Data Type 类型。|
| 0x2315 |unknown taos type in tdengine |在 TDengine 数据类型与 JDBC 数据类型转换时,是否指定了正确的 TDengine 数据类型。|
| 0x2319 |user is required |创建连接时缺少用户名信息。|
| 0x231a |password is required |创建连接时缺少密码信息。|
| 0x231d |can't create connection with server within |通过增加参数 httpConnectTimeout 增加连接耗时,或是请检查与 taosAdapter 之间的连接情况。|
| 0x231e |failed to complete the task within the specified time |通过增加参数 messageWaitTimeout 增加执行耗时,或是请检查与 taosAdapter 之间的连接情况。|
| 0x2352 |Unsupported encoding |本地连接下指定了不支持的字符编码集。|
| 0x2353 |internal error of database, please see taoslog for more details |本地连接执行 prepareStatement 时出现错误,请检查 taos log 进行问题定位。|
| 0x2354 |connection is NULL |本地连接执行命令时Connection 已经关闭。请检查与 TDengine 的连接情况。|
| 0x2355 |result set is NULL |本地连接获取结果集,结果集异常,请检查连接情况,并重试。|
| 0x2356 |invalid num of fields |本地连接获取结果集的 meta 信息不匹配。|
| 0x2357 |empty sql string |填写正确的 SQL 进行执行。|
| 0x2371 |consumer properties must not be null! |创建订阅时参数为空,请填写正确的参数。|
| 0x2375 |topic reference has been destroyed |创建数据订阅过程中topic 引用被释放。请检查与 TDengine 的连接情况。|
| 0x2376 |failed to set consumer topic, topic name is empty |创建数据订阅过程中,订阅 topic 名称为空。请检查指定的 topic 名称是否填写正确。|
| 0x2377 |consumer reference has been destroyed |订阅数据传输通道已经关闭,请检查与 TDengine 的连接情况。|
| 0x2378 |consumer create error |创建数据订阅失败,请根据错误信息检查 taos log 进行问题定位。|
| 0x237a |vGroup not found in result set VGroup |没有分配给当前 consumer由于 Rebalance 机制导致 Consumer 与 VGroup 不是绑定的关系。|
## 数据类型映射
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Flink RowData Type 对应类型转换如下:
| TDengine DataType | Flink RowDataType |
| ----------------- | ------------------ |
| TIMESTAMP | TimestampData |
| INT | Integer |
| BIGINT | Long |
| FLOAT | Float |
| DOUBLE | Double |
| SMALLINT | Short |
| TINYINT | Byte |
| BOOL | Boolean |
| BINARY | byte[] |
| NCHAR | StringData |
| JSON | StringData |
| VARBINARY | byte[] |
| GEOMETRY | byte[] |
## 使用说明
### Flink 语义选择说明
采用 At-Least-Once至少一次语义原因
- TDengine 目前不支持事务,不能进行频繁的检查点操作和复杂的事务协调。
- 由于 TDengine 采用时间戳作为主键,重复数据下游算子可以进行过滤操作,避免重复计算。
- 采用 At-Least-Once至少一次确保达到较高的数据处理的性能和较低的数据延时设置方式如下
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
```
如果使用 Maven 管理项目,只需在 pom.xml 中加入以下依赖。
```xml
<dependency>
<groupId>com.taosdata.flink</groupId>
<artifactId>flink-connector-tdengine</artifactId>
<version>1.0.0</version>
</dependency>
```
### 连接参数
建立连接的参数有 URL 和 Properties。
URL 规范格式为:
`jdbc:TAOS-WS://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&timezone={timezone}]`
参数说明:
- user登录 TDengine 用户名,默认值 'root'。
- password用户登录密码默认值 'taosdata'。
- batchErrorIgnoretrue在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false不再执行失败 SQL 后的任何语句。默认值为false。
- httpConnectTimeout: 连接超时时间,单位 ms 默认值为 60000。
- messageWaitTimeout: 消息超时时间,单位 ms 默认值为 60000。
- useSSL: 连接中是否使用 SSL。
### Source
Source 拉取 TDengine 数据库中的数据,并将获取到的数据转换为 Flink 内部可处理的格式和类型,并以并行的方式进行读取和分发,为后续的数据处理提供高效的输入。
通过设置数据源的并行度,实现多个线程并行地从数据源中读取数据,提高数据读取的效率和吞吐量,充分利用集群资源进行大规模数据处理能力。
#### Source Properties
Properties 中配置参数如下:
| 参数名称 | 类型 | 参数说明 | 备注 |
| ----------------------- | :-----: | ------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| TDengineConfigParams.PROPERTY_KEY_USER | string | 登录 TDengine 用户名,默认值 'root'| |
| TDengineConfigParams.PROPERTY_KEY_PASSWORD| string | 用户登录密码,默认值 'taosdata'| |
| TDengineConfigParams.VALUE_DESERIALIZER| string | 游算子接收数据的类型 | 如果下游算子接收数据的类型是 RowData 仅需要设置为 RowData, 如果用户需要自定义类型这里需要设置完整的类路径|
| TDengineConfigParams.TD_BATCH_MODE | boolean | 此参数用于批量将数据推送给下游算子,如果设置为 True创建 TDengineSource 对象时需要指定数据类型为 SourceRecords\<类型\> | 此处的类型为用下游算子接收数据的类型|
| TDengineConfigParams.PROPERTY_KEY_CHARSET | string | 客户端使用的字符集,默认值为系统字符集。| |
| TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT | integer | 消息超时时间,单位 ms 默认值为 60000| |
| TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION | boolean | 传输过程是否启用压缩。true: 启用false: 不启用。默认为 false| |
| TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT| boolean| 是否启用自动重连。true: 启用false: 不启用。默认为 false||
| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT| integer| 自动重连重试次数,默认值 3 | 仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效|
| TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION| boolean| 关闭 SSL 证书验证 。true: 关闭false: 不关闭。默认为 false。||
#### 按时间分片
用户可以对查询的 SQL 按照时间拆分为多个子任务,输入:开始时间,结束时间,拆分间隔,时间字段名称,系统会按照设置的间隔(时间左闭右开)进行拆分并行获取数据。
```java
{{#include docs/examples/flink/Main.java:time_interval}}
```
#### 按超级表 TAG 分片
用户可以按照超级表的 TAG 字段将查询的 SQL 拆分为多个查询条件,系统会以一个查询条件对应一个子任务的方式对其进行拆分,进而并行获取数据。
```java
{{#include docs/examples/flink/Main.java:tag_split}}
```
#### 按表名分片
支持输入多个相同表结构的超级表或普通表进行分片,系统会按照一个表一个任务的方式进行拆分,进而并行获取数据。
```java
{{#include docs/examples/flink/Main.java:table_split}}
```
#### 使用 Source 连接器
查询结果为 RowData 数据类型示例:
<details>
<summary>RowData Source</summary>
```java
{{#include docs/examples/flink/Main.java:source_test}}
```
</details>
批量查询结果示例:
<details>
<summary>Batch Source</summary>
```java
{{#include docs/examples/flink/Main.java:source_batch_test}}
```
</details>
查询结果为自定义数据类型示例:
<details>
<summary>Custom Type Source</summary>
```java
{{#include docs/examples/flink/Main.java:source_custom_type_test}}
```
</details>
- ResultBean 自定义的一个内部类,用于定义 Source 查询结果的数据类型。
- ResultSoureDeserialization 是自定义的一个内部类,通过继承 TdengineRecordDeserialization 并实现 convert 和 getProducedType 方法。
### CDC 数据订阅
Flink CDC 主要用于提供数据订阅功能,能实时监控 TDengine 数据库的数据变化,并将这些变更以数据流形式传输到 Flink 中进行处理,同时确保数据的一致性和完整性。
#### 参数说明
| 参数名称 | 类型 | 参数说明 | 备注 |
| ----------------------- | :-----: | ------------------------- | -------------------------- |
| TDengineCdcParams.BOOTSTRAP_SERVERS| string | 服务端的 IP 地址 | |
| TDengineCdcParams.CONNECT_USER| string | 用户名 | |
| TDengineCdcParams.CONNECT_PASS| string | 密码 | |
| TDengineCdcParams.POLL_INTERVAL_MS|int| 拉取数据间隔, 默认 500ms| |
| TDengineConfigParams.VALUE_DESERIALIZER| string | 游算子接收数据的类型 | 如果下游算子接收数据的类型是 RowData 仅需要设置为 RowData, 如果用户需要自定义类型这里需要设置完整的类路径|
| TDengineCdcParams.TMQ_BATCH_MODE| boolean | 此参数用于批量将数据推送给下游算子,如果设置为 True创建 TDengineCdcSource 对象时需要指定数据类型为 ConsumerRecords\<类型\> | 此处的类型为用下游算子接收数据的类型|
| TDengineCdcParams.GROUP_ID| string | 消费组 ID同一消费组共享消费进度 | <br />**必填项**。最大长度192。<br />每个 topic 最多可建立 100 个 consumer
| TDengineCdcParams.AUTO_OFFSET_RESET| string | 消费组订阅的初始位置 | earliest: 从头开始订阅<br/> latest: default; 仅从最新数据开始订阅|
| TDengineCdcParams.ENABLE_AUTO_COMMIT| boolean | 是否自动提交true: 启用(用于下游均为无状态算子) false由 checkpoint 触发 commit | 默认 false|
| TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS| integer|消费记录自动提交消费位点时间间隔,单位为毫秒| 默认值为 5000, 此参数在 AUTO_OFFSET_RESET 为 true 生效|
| TDengineCdcParams.TMQ_SESSION_TIMEOUT_MS| integer | consumer 心跳丢失后超时时间,超时后会触发 rebalance 逻辑,成功后该 consumer 会被删除(从 TDengine 3.3.3.0 版本开始支持)| 默认值为 12000取值范围 [6000 1800000] |
| TDengineCdcParams.TMQ_MAX_POLL_INTERVAL_MS| integer | consumer poll 拉取数据间隔的最长时间,超过该时间,会认为该 consumer 离线,触发 rebalance 逻辑,成功后该 consumer 会被删除(从 3.3.3.0 版本开始支持) | 默认值为 300000[1000INT32_MAX]
#### 使用 CDC 连接器
CDC 连接器会根据用户设置的并行度进行创建 consumer因此用户根据资源情况合理设置并行度。
订阅结果为 RowData 数据类型示例:
<details>
<summary>CDC Source</summary>
```java
{{#include docs/examples/flink/Main.java:cdc_source}}
```
</details>
批量查询结果示例:
<details>
<summary>CDC Batch Source</summary>
```java
{{#include docs/examples/flink/Main.java:cdc_batch_source}}
```
</details>
查询结果为自定义数据类型示例:
<details>
<summary>CDC Custom Type</summary>
```java
{{#include docs/examples/flink/Main.java:cdc_custom_type_test}}
```
</details>
- ResultBean 是自定义的一个内部类,其字段名和数据类型与列的名称和数据类型一一对应,这样根据 value.deserializer 属性对应的反序列化类可以反序列化出 ResultBean 类型的对象。
### Sink
Sink 的核心功能在于高效且精准地将经过 Flink 处理的、源自不同数据源或算子的数据写入 TDengine。在这一过程中TDengine 所具备的高效写入机制发挥了至关重要的作用,有力保障了数据的快速和稳定存储。
#### Sink Properties
| 参数名称 | 类型 | 参数说明 | 备注 |
| ----------------------- | :-----: | ------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| TDengineConfigParams.PROPERTY_KEY_USER | string | 登录 TDengine 用户名,默认值 'root'| |
| TDengineConfigParams.PROPERTY_KEY_PASSWORD| string | 用户登录密码,默认值 'taosdata'| |
| TDengineConfigParams.PROPERTY_KEY_DBNAME| string | 写入的数据库名称||
| TDengineConfigParams.TD_SUPERTABLE_NAME| string | 写入的超级表名称| 如果是超级表接收的数据必须有 tbname 字段,确定写入那张子表|
| TDengineConfigParams.TD_TABLE_NAME| string | 写入的表名此参数和TD_SUPERTABLE_NAME 仅需要设置一个即可| 用于确定写入那张子表或普通表|
| TDengineConfigParams.TD_BATCH_SIZE| integer | 设置批大小 | 当到达批的数量后进行写入或是一个checkpoint的时间也会触发写入数据库|
| TDengineConfigParams.VALUE_DESERIALIZER| string | 游算子接收数据的类型 | 如果下游算子接收数据的类型是 RowData 仅需要设置为 RowData, 如果用户需要自定义类型这里需要设置完整的类路径|
| TDengineConfigParams.TD_BATCH_MODE | boolean | 此参数用于设置接收批量数据 | 如果设置为 True:<br/> 来源是 TDengine Source 使用SourceRecords\<类型\> 创建 TDengineSink 对象<br/> 来源是 TDengine CDC 使用 ConsumerRecords\<类型\> 创建 TDengineSink 对象 | 此处的类型为接收数据的类型|
| TDengineConfigParams.TD_SOURCE_TYPE | string | 如果数据是表示数据来源是source 或者 cdc 等 | TDengine source 设置为 "tdengine_source", TDengine cdc 设置为 "tdengine_cdc"|
| TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT | integer | 消息超时时间,单位 ms 默认值为 60000| |
| TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION | boolean | 传输过程是否启用压缩。true: 启用false: 不启用。默认为 false| |
| TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT| boolean| 是否启用自动重连。true: 启用false: 不启用。默认为 false||
| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT| integer| 自动重连重试次数,默认值 3 | 仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效|
| TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION| boolean| 关闭 SSL 证书验证 。true: 关闭false: 不关闭。默认为 false。||
#### 使用 Sink 连接器
将接收的 RowData 类型数据写入 TDengine 示例:
<details>
<summary>Sink RowData</summary>
```java
{{#include docs/examples/flink/Main.java:RowDataToSink}}
```
</details>
将批量接收的 RowData 类型数据写入 TDengine 示例:
<details>
<summary>Sink RowData</summary>
```java
{{#include docs/examples/flink/Main.java:BatchRowDataToSink}}
```
</details>
### Table SQL
数据处理 ETLExtractTransformLoad可以使用 Flink SQL With JDBC 从多个不同的数据源数据库(如 TDengine、MySQL、Oracle 等)中提取数据,在 Flink 中进行转换操作(如数据清洗、格式转换、关联不同表的数据等),然后将处理后的结果加载到目标数据源(如 TDengine、Mysql 等)中。
#### Source 连接器
参数配置说明:
| 参数名称 | 类型 | 参数说明 | 备注 |
| ----------------------- | :-----: | ------------ | ------ |
| connector | string | 连接器标识,设置 tdengine-connector ||
| td.jdbc.url| string | 连接的 url | |
| td.jdbc.mode | strng | 连接器类型, 设置 source, cdc, sink| |
| table.name| string| 原表或目标表名称| |
| scan.query| string| 获取数据的 SQL 语句||
| sink.db.name|string| 目标数据库名称||
| sink.supertable.name|string |写入的超级表名称||
| sink.batch.size | integer | 写入的批大小||
| sink.table.name|string|写入的普通表或子表名称||
#### Source 连接器使用示例
<details>
<summary>Table Source</summary>
```java
{{#include docs/examples/flink/Main.java:source_table}}
```
</details>
#### CDC 连接器
参数配置说明:
| 参数名称 | 类型 | 参数说明 | 备注 |
| ----------------------- | :-----: | ------------ |-------|
| connector | string | 连接器标识,设置 tdengine-connector ||
| user| string | 用户名, 默认 root| |
| password | string | 密码, 默认taosdata| |
| bootstrap.servers| string | 服务器地址 | |
| topic | string | 订阅主题 ||
| td.jdbc.mode | strng | 连接器类型, cdc, sink| |
| group.id| string| 消费组 ID同一消费组共享消费进度 | |
| auto.offset.reset| string| 消费组订阅的初始位置 | earliest: 从头开始订阅<br/> latest: default; 仅从最新数据开始订阅|
| poll.interval_ms| integer| 拉取数据间隔, 默认 500ms| |
| sink.db.name|string| 目标数据库名称||
| sink.supertable.name|string |写入的超级表名称||
| sink.batch.size | integer | 写入的批大小||
| sink.table.name|string|写入的普通表或子表名称||
#### CDC 连接器使用示例
<details>
<summary>Table CDC</summary>
```java
{{#include docs/examples/flink/Main.java:cdc_table}}
```
</details>