diff --git a/docs/examples/flink/Main.java b/docs/examples/flink/Main.java index 12d79126cf..50a507d1de 100644 --- a/docs/examples/flink/Main.java +++ b/docs/examples/flink/Main.java @@ -263,7 +263,7 @@ splitSql.setSelect("ts, current, voltage, phase, groupid, location") Class> typeClass = (Class>) (Class) SourceRecords.class; SourceSplitSql sql = new SourceSplitSql("select ts, `current`, voltage, phase, tbname from meters"); TDengineSource> source = new TDengineSource<>(connProps, sql, typeClass); - DataStreamSource> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source"); + DataStreamSource> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source"); DataStream resultStream = input.map((MapFunction, String>) records -> { StringBuilder sb = new StringBuilder(); Iterator iterator = records.iterator(); @@ -304,7 +304,7 @@ splitSql.setSelect("ts, current, voltage, phase, groupid, location") config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData"); config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8"); TDengineCdcSource tdengineSource = new TDengineCdcSource<>("topic_meters", config, RowData.class); - DataStreamSource input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "kafka-source"); + DataStreamSource input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source"); DataStream resultStream = input.map((MapFunction) rowData -> { StringBuilder sb = new StringBuilder(); sb.append("tsxx: " + rowData.getTimestamp(0, 0) + @@ -343,7 +343,7 @@ splitSql.setSelect("ts, current, voltage, phase, groupid, location") Class> typeClass = (Class>) (Class) ConsumerRecords.class; TDengineCdcSource> tdengineSource = new TDengineCdcSource<>("topic_meters", config, typeClass); - DataStreamSource> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "kafka-source"); + DataStreamSource> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source"); DataStream resultStream = input.map((MapFunction, String>) records -> { Iterator> iterator = records.iterator(); StringBuilder sb = new StringBuilder(); @@ -388,7 +388,7 @@ splitSql.setSelect("ts, current, voltage, phase, groupid, location") config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultDeserializer"); config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8"); TDengineCdcSource tdengineSource = new TDengineCdcSource<>("topic_meters", config, ResultBean.class); - DataStreamSource input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "kafka-source"); + DataStreamSource input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source"); DataStream resultStream = input.map((MapFunction) rowData -> { StringBuilder sb = new StringBuilder(); sb.append("ts: " + rowData.getTs() +