diff --git a/docs/examples/rust/nativeexample/examples/query.rs b/docs/examples/rust/nativeexample/examples/query.rs index a493e7c729..55c2feb7b7 100644 --- a/docs/examples/rust/nativeexample/examples/query.rs +++ b/docs/examples/rust/nativeexample/examples/query.rs @@ -7,62 +7,61 @@ async fn main() -> anyhow::Result<()> { let taos = builder.build()?; -// ANCHOR: create_db_and_table -let db = "power"; -// create database -taos.exec_many([ - format!("CREATE DATABASE IF NOT EXISTS `{db}`"), - format!("USE `{db}`"), -]) -.await?; -println!("Create database power successfully."); + // ANCHOR: create_db_and_table + let db = "power"; + // create database + taos.exec_many([ + format!("CREATE DATABASE IF NOT EXISTS `{db}`"), + format!("USE `{db}`"), + ]) + .await?; + println!("Create database power successfully."); -// create super table -taos.exec_many([ - "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ - TAGS (`groupid` INT, `location` BINARY(24))", -]).await?; -println!("Create stable meters successfully."); + // create super table + taos.exec_many([ + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ + TAGS (`groupid` INT, `location` BINARY(24))", + ]).await?; + println!("Create stable meters successfully."); -// ANCHOR_END: create_db_and_table + // ANCHOR_END: create_db_and_table -// ANCHOR: insert_data -let inserted = taos.exec("INSERT INTO " + -"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + -"VALUES " + -"(NOW + 1a, 10.30000, 219, 0.31000) " + -"(NOW + 2a, 12.60000, 218, 0.33000) " + -"(NOW + 3a, 12.30000, 221, 0.31000) " + -"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + -"VALUES " + -"(NOW + 1a, 10.30000, 218, 0.25000) ").await?; + // ANCHOR: insert_data + let inserted = taos.exec("INSERT INTO " + + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 219, 0.31000) " + + "(NOW + 2a, 12.60000, 218, 0.33000) " + + "(NOW + 3a, 12.30000, 221, 0.31000) " + + "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 218, 0.25000) ").await?; -println!("inserted: {} rows to power.meters successfully.", inserted); -// ANCHOR_END: insert_data + println!("inserted: {} rows to power.meters successfully.", inserted); + // ANCHOR_END: insert_data -// ANCHOR: query_data -// query data, make sure the database and table are created before -let mut result = taos.query("SELECT ts, current, location FROM power.meters limit 100").await?; + // ANCHOR: query_data + // query data, make sure the database and table are created before + let mut result = taos.query("SELECT ts, current, location FROM power.meters limit 100").await?; -for field in result.fields() { - println!("got field: {}", field.name()); -} - -let mut rows = result.rows(); -let mut nrows = 0; -while let Some(row) = rows.try_next().await? { - for (col, (name, value)) in row.enumerate() { - println!( - "[{}] got value in col {} (named `{:>8}`): {}", - nrows, col, name, value - ); + for field in result.fields() { + println!("got field: {}", field.name()); } - nrows += 1; -} -// ANCHOR_END: query_data -// ANCHOR: query_with_req_id -let result = taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", 1).await?; -// ANCHOR_END: query_with_req_id + let mut rows = result.rows(); + let mut nrows = 0; + while let Some(row) = rows.try_next().await? { + for (col, (name, value)) in row.enumerate() { + println!( + "[{}] got value in col {} (named `{:>8}`): {}", + nrows, col, name, value + ); + } + nrows += 1; + } + // ANCHOR_END: query_data + // ANCHOR: query_with_req_id + let result = taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", 1).await?; + // ANCHOR_END: query_with_req_id } diff --git a/docs/examples/rust/nativeexample/examples/tmq.rs b/docs/examples/rust/nativeexample/examples/tmq.rs index 764c0c1fc8..04bf66b2ef 100644 --- a/docs/examples/rust/nativeexample/examples/tmq.rs +++ b/docs/examples/rust/nativeexample/examples/tmq.rs @@ -44,8 +44,12 @@ async fn main() -> anyhow::Result<()> { // ANCHOR_END: create_topic // ANCHOR: create_consumer - dsn.params.insert("group.id".to_string(), "abc".to_string()); - dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string()); + dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string()); + dsn.params.insert("msg.with.table.name".to_string(), "true".to_string()); + dsn.params.insert("enable.auto.commit".to_string(), "true".to_string()); + dsn.params.insert("auto.commit.interval.ms".to_string(), "1000".to_string()); + dsn.params.insert("group.id".to_string(), "group1".to_string()); + dsn.params.insert("client.id".to_string(), "client1".to_string()); let builder = TmqBuilder::from_dsn(&dsn)?; let mut consumer = builder.build().await?; diff --git a/docs/examples/rust/restexample/examples/tmq.rs b/docs/examples/rust/restexample/examples/tmq.rs index 8f195e1629..0438e47515 100644 --- a/docs/examples/rust/restexample/examples/tmq.rs +++ b/docs/examples/rust/restexample/examples/tmq.rs @@ -9,9 +9,11 @@ async fn main() -> anyhow::Result<()> { .filter_level(log::LevelFilter::Info) .init(); use taos_query::prelude::*; + // ANCHOR: create_consumer_dsn let dsn = "ws://localhost:6041".to_string(); log::info!("dsn: {}", dsn); let mut dsn = Dsn::from_str(&dsn)?; + // ANCHOR_END: create_consumer_dsn let taos = TaosBuilder::from_dsn(&dsn)?.build().await?; @@ -41,8 +43,12 @@ async fn main() -> anyhow::Result<()> { // ANCHOR_END: create_topic // ANCHOR: create_consumer - dsn.params.insert("group.id".to_string(), "abc".to_string()); - dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string()); + dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string()); + dsn.params.insert("msg.with.table.name".to_string(), "true".to_string()); + dsn.params.insert("enable.auto.commit".to_string(), "true".to_string()); + dsn.params.insert("auto.commit.interval.ms".to_string(), "1000".to_string()); + dsn.params.insert("group.id".to_string(), "group1".to_string()); + dsn.params.insert("client.id".to_string(), "client1".to_string()); let builder = TmqBuilder::from_dsn(&dsn)?; let mut consumer = builder.build().await?; diff --git a/docs/zh/08-develop/07-tmq.md b/docs/zh/08-develop/07-tmq.md index 405d227e8e..a5795b5b6e 100644 --- a/docs/zh/08-develop/07-tmq.md +++ b/docs/zh/08-develop/07-tmq.md @@ -104,9 +104,10 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 - ```rust -{{#include docs/examples/rust/nativeexample/examples/tmq.rs:create_consumer}} +{{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer_dsn}} + +{{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer}} ``` diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java index f7570c742a..842abb4086 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java @@ -24,24 +24,24 @@ public abstract class AbsConsumerLoop { public AbsConsumerLoop() throws SQLException { // ANCHOR: create_consumer -Properties config = new Properties(); -config.setProperty("td.connect.type", "jni"); -config.setProperty("bootstrap.servers", "localhost:6030"); -config.setProperty("auto.offset.reset", "latest"); -config.setProperty("msg.with.table.name", "true"); -config.setProperty("enable.auto.commit", "true"); -config.setProperty("auto.commit.interval.ms", "1000"); -config.setProperty("group.id", "group1"); -config.setProperty("client.id", "client1"); -config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoop$ResultDeserializer"); -config.setProperty("value.deserializer.encoding", "UTF-8"); -try { - this.consumer = new TaosConsumer<>(config); -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to create jni consumer with " + config.getProperty("bootstrap.servers") + " ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to create consumer", ex); -} + Properties config = new Properties(); + config.setProperty("td.connect.type", "jni"); + config.setProperty("bootstrap.servers", "localhost:6030"); + config.setProperty("auto.offset.reset", "latest"); + config.setProperty("msg.with.table.name", "true"); + config.setProperty("enable.auto.commit", "true"); + config.setProperty("auto.commit.interval.ms", "1000"); + config.setProperty("group.id", "group1"); + config.setProperty("client.id", "client1"); + config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoop$ResultDeserializer"); + config.setProperty("value.deserializer.encoding", "UTF-8"); + try { + this.consumer = new TaosConsumer<>(config); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to create jni consumer with " + config.getProperty("bootstrap.servers") + " ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create consumer", ex); + } // ANCHOR_END: create_consumer this.topics = Collections.singletonList("topic_meters"); @@ -53,65 +53,65 @@ try { public void pollDataCodePiece() throws SQLException { // ANCHOR: poll_data_code_piece -try { - consumer.subscribe(topics); - while (!shutdown.get()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process your data here - process(bean); + try { + consumer.subscribe(topics); + while (!shutdown.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process your data here + process(bean); + } + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data", ex); + } finally { + consumer.close(); + shutdownLatch.countDown(); } - } -} catch (SQLException ex){ - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to poll data", ex); -} finally { - consumer.close(); - shutdownLatch.countDown(); -} // ANCHOR_END: poll_data_code_piece } public void commitCodePiece() throws SQLException { // ANCHOR: commit_code_piece -try { - consumer.subscribe(topics); - while (!shutdown.get()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process your data here - process(bean); + try { + consumer.subscribe(topics); + while (!shutdown.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process your data here + process(bean); + } + if (!records.isEmpty()) { + // after processing the data, commit the offset manually + consumer.commitSync(); + } + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to execute consumer functions", ex); + } finally { + consumer.close(); + shutdownLatch.countDown(); } - if (!records.isEmpty()) { - // after processing the data, commit the offset manually - consumer.commitSync(); - } - } -} catch (SQLException ex){ - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to execute consumer functions", ex); -} finally { - consumer.close(); - shutdownLatch.countDown(); -} // ANCHOR_END: commit_code_piece } public void unsubscribeCodePiece() throws SQLException { // ANCHOR: unsubscribe_data_code_piece -try { - consumer.unsubscribe(); -} catch (SQLException ex){ - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to unsubscribe consumer", ex); -} finally { - consumer.close(); -} + try { + consumer.unsubscribe(); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to unsubscribe consumer", ex); + } finally { + consumer.close(); + } // ANCHOR_END: unsubscribe_data_code_piece } @@ -119,14 +119,14 @@ try { try { // ANCHOR: poll_data -consumer.subscribe(topics); -while (!shutdown.get()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - process(bean); - } -} + consumer.subscribe(topics); + while (!shutdown.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + process(bean); + } + } // ANCHOR_END: poll_data consumer.unsubscribe(); diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java index 4012f2e679..c78a875ab6 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java @@ -15,37 +15,38 @@ import java.util.concurrent.TimeUnit; public class ConsumerLoopFull { static private Connection connection; static private Statement statement; - public static TaosConsumer getConsumer() throws SQLException{ -// ANCHOR: create_consumer -Properties config = new Properties(); -config.setProperty("td.connect.type", "jni"); -config.setProperty("bootstrap.servers", "localhost:6030"); -config.setProperty("auto.offset.reset", "latest"); -config.setProperty("msg.with.table.name", "true"); -config.setProperty("enable.auto.commit", "true"); -config.setProperty("auto.commit.interval.ms", "1000"); -config.setProperty("group.id", "group1"); -config.setProperty("client.id", "1"); -config.setProperty("td.connect.user", "root"); -config.setProperty("td.connect.pass", "taosdata"); -config.setProperty("value.deserializer", "com.taosdata.example.ConsumerLoopFull$ResultDeserializer"); -config.setProperty("value.deserializer.encoding", "UTF-8"); -try { - return new TaosConsumer<>(config); -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to create consumer", ex); -} catch (Exception e) { - e.printStackTrace(); - throw new SQLException("Failed to create consumer", e); -} + public static TaosConsumer getConsumer() throws SQLException { +// ANCHOR: create_consumer + Properties config = new Properties(); + config.setProperty("td.connect.type", "jni"); + config.setProperty("bootstrap.servers", "localhost:6030"); + config.setProperty("auto.offset.reset", "latest"); + config.setProperty("msg.with.table.name", "true"); + config.setProperty("enable.auto.commit", "true"); + config.setProperty("auto.commit.interval.ms", "1000"); + config.setProperty("group.id", "group1"); + config.setProperty("client.id", "1"); + config.setProperty("td.connect.user", "root"); + config.setProperty("td.connect.pass", "taosdata"); + config.setProperty("value.deserializer", "com.taosdata.example.ConsumerLoopFull$ResultDeserializer"); + config.setProperty("value.deserializer.encoding", "UTF-8"); + + try { + return new TaosConsumer<>(config); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create consumer", ex); + } catch (Exception e) { + e.printStackTrace(); + throw new SQLException("Failed to create consumer", e); + } // ANCHOR_END: create_consumer } public static void pollDataExample() throws SQLException { - try (TaosConsumer consumer = getConsumer()){ + try (TaosConsumer consumer = getConsumer()) { // subscribe to the topics List topics = Collections.singletonList("topic_meters"); @@ -72,96 +73,97 @@ try { public static void pollExample() throws SQLException { // ANCHOR: poll_data_code_piece -try (TaosConsumer consumer = getConsumer()){ - List topics = Collections.singletonList("topic_meters"); + try (TaosConsumer consumer = getConsumer()) { + List topics = Collections.singletonList("topic_meters"); - // subscribe to the topics - consumer.subscribe(topics); - System.out.println("subscribe topics successfully"); - for (int i = 0; i < 50; i++) { - // poll data - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process the data here - System.out.println("data: " + JSON.toJSONString(bean)); + // subscribe to the topics + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + for (int i = 0; i < 50; i++) { + // poll data + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process the data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + } + + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data", ex); } - } - -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to poll data", ex); -} // ANCHOR_END: poll_data_code_piece -} + } public static void seekExample() throws SQLException { // ANCHOR: consumer_seek -try (TaosConsumer consumer = getConsumer()){ - List topics = Collections.singletonList("topic_meters"); + try (TaosConsumer consumer = getConsumer()) { + List topics = Collections.singletonList("topic_meters"); - // subscribe to the topics - consumer.subscribe(topics); - System.out.println("subscribe topics successfully"); - ConsumerRecords records = ConsumerRecords.emptyRecord(); - // make sure we have got some data - while (records.isEmpty()){ - records = consumer.poll(Duration.ofMillis(100)); - } + // subscribe to the topics + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + ConsumerRecords records = ConsumerRecords.emptyRecord(); + // make sure we have got some data + while (records.isEmpty()) { + records = consumer.poll(Duration.ofMillis(100)); + } - for (ConsumerRecord record : records) { - System.out.println("first data polled: " + JSON.toJSONString(record.value())); - Set assignment = consumer.assignment(); - // seek to the beginning of the all partitions - consumer.seekToBeginning(assignment); - System.out.println("assignment seek to beginning successfully"); - break; - } + for (ConsumerRecord record : records) { + System.out.println("first data polled: " + JSON.toJSONString(record.value())); + Set assignment = consumer.assignment(); + // seek to the beginning of the all partitions + consumer.seekToBeginning(assignment); + System.out.println("assignment seek to beginning successfully"); + break; + } - // poll data agagin - records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - // process the data here - System.out.println("second data polled: " + JSON.toJSONString(record.value())); - break; - } + // poll data agagin + records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + // process the data here + System.out.println("second data polled: " + JSON.toJSONString(record.value())); + break; + } -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("seek example failed", ex); -} + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("seek example failed", ex); + } // ANCHOR_END: consumer_seek } public static void commitExample() throws SQLException { // ANCHOR: commit_code_piece -try (TaosConsumer consumer = getConsumer()){ - List topics = Collections.singletonList("topic_meters"); + try (TaosConsumer consumer = getConsumer()) { + List topics = Collections.singletonList("topic_meters"); - consumer.subscribe(topics); - for (int i = 0; i < 50; i++) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process your data here - System.out.println("data: " + JSON.toJSONString(bean)); + consumer.subscribe(topics); + for (int i = 0; i < 50; i++) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process your data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + if (!records.isEmpty()) { + // after processing the data, commit the offset manually + consumer.commitSync(); + } + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to execute consumer functions", ex); } - if (!records.isEmpty()) { - // after processing the data, commit the offset manually - consumer.commitSync(); - } - } -} catch (SQLException ex){ - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to execute consumer functions", ex); -} // ANCHOR_END: commit_code_piece } + public static void unsubscribeExample() throws SQLException { TaosConsumer consumer = getConsumer(); List topics = Collections.singletonList("topic_meters"); @@ -169,7 +171,7 @@ try (TaosConsumer consumer = getConsumer()){ // ANCHOR: unsubscribe_data_code_piece try { consumer.unsubscribe(); - } catch (SQLException ex){ + } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to unsubscribe consumer", ex); @@ -182,6 +184,7 @@ try (TaosConsumer consumer = getConsumer()){ public static class ResultDeserializer extends ReferenceDeserializer { } + // use this class to define the data structure of the result record public static class ResultBean { private Timestamp ts; @@ -240,12 +243,12 @@ try (TaosConsumer consumer = getConsumer()){ } } - public static void prepareData() throws SQLException{ + public static void prepareData() throws SQLException { StringBuilder insertQuery = new StringBuilder(); insertQuery.append("INSERT INTO " + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + "VALUES "); - for (int i = 0; i < 10000; i++){ + for (int i = 0; i < 10000; i++) { insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) "); } try { @@ -256,7 +259,8 @@ try (TaosConsumer consumer = getConsumer()){ throw new SQLException("Failed to insert data to power.meters", ex); } } - public static void prepareMeta() throws SQLException{ + + public static void prepareMeta() throws SQLException { try { statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); statement.executeUpdate("USE power"); @@ -288,6 +292,7 @@ try (TaosConsumer consumer = getConsumer()){ } System.out.println("Connection created successfully."); } + public static void closeConnection() throws SQLException { try { if (statement != null) { @@ -337,16 +342,16 @@ try (TaosConsumer consumer = getConsumer()){ System.out.println("Data prepared successfully"); - // 关闭线程池,不再接收新任务 + // close the executor, which will make the executor reject new tasks executor.shutdown(); try { - // 等待直到所有任务完成 + // wait for the executor to terminate boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); assert result; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - } catch (Exception e){ + } catch (Exception e) { e.printStackTrace(); System.out.println("Wait executor termination failed."); } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopImp.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopImp.java index 22f96841e6..84d84f062b 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopImp.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopImp.java @@ -20,9 +20,9 @@ public class ConsumerLoopImp { properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); // ANCHOR: create_topic -Connection connection = DriverManager.getConnection(url, properties); -Statement statement = connection.createStatement(); -statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); + Connection connection = DriverManager.getConnection(url, properties); + Statement statement = connection.createStatement(); + statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); // ANCHOR_END: create_topic statement.close(); diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerOffsetSeek.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerOffsetSeek.java index b9463d30f7..73901aba49 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerOffsetSeek.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerOffsetSeek.java @@ -35,29 +35,29 @@ public class ConsumerOffsetSeek { config.setProperty("value.deserializer.encoding", "UTF-8"); // ANCHOR: consumer_seek -String topic = "topic_meters"; -Map offset = null; -try (TaosConsumer consumer = new TaosConsumer<>(config)) { - consumer.subscribe(Collections.singletonList(topic)); - for (int i = 0; i < 10; i++) { - if (i == 3) { - // Saving consumption position - offset = consumer.position(topic); - } - if (i == 5) { - // reset consumption to the previously saved position - for (Map.Entry entry : offset.entrySet()) { - consumer.seek(entry.getKey(), entry.getValue()); + String topic = "topic_meters"; + Map offset = null; + try (TaosConsumer consumer = new TaosConsumer<>(config)) { + consumer.subscribe(Collections.singletonList(topic)); + for (int i = 0; i < 10; i++) { + if (i == 3) { + // Saving consumption position + offset = consumer.position(topic); + } + if (i == 5) { + // reset consumption to the previously saved position + for (Map.Entry entry : offset.entrySet()) { + consumer.seek(entry.getKey(), entry.getValue()); + } + } + ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); + // you can handle data here } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to execute consumer functions. server: " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to execute consumer functions", ex); } - ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); - // you can handle data here - } -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to execute consumer functions. server: " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to execute consumer functions", ex); -} // ANCHOR_END: consumer_seek } } \ No newline at end of file diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcBasicDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcBasicDemo.java index 4831567eab..945c321260 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcBasicDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcBasicDemo.java @@ -16,97 +16,97 @@ public class JdbcBasicDemo { public static void main(String[] args) throws SQLException { -final String url = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; + final String url = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; // get connection -Properties properties = new Properties(); -properties.setProperty("charset", "UTF-8"); -properties.setProperty("locale", "en_US.UTF-8"); -properties.setProperty("timezone", "UTC-8"); -System.out.println("get connection starting..."); -try(Connection connection = DriverManager.getConnection(url, properties)){ + Properties properties = new Properties(); + properties.setProperty("charset", "UTF-8"); + properties.setProperty("locale", "en_US.UTF-8"); + properties.setProperty("timezone", "UTC-8"); + System.out.println("get connection starting..."); + try (Connection connection = DriverManager.getConnection(url, properties)) { -if (connection != null){ - System.out.println("[ OK ] Connection established."); -} else { - System.out.println("[ ERR ] Connection can not be established."); - return; -} + if (connection != null) { + System.out.println("[ OK ] Connection established."); + } else { + System.out.println("[ ERR ] Connection can not be established."); + return; + } -Statement stmt = connection.createStatement(); + Statement stmt = connection.createStatement(); // ANCHOR: create_db_and_table // create database -stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); + stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); // use database -stmt.executeUpdate("USE power"); + stmt.executeUpdate("USE power"); // create table -stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); + stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); // ANCHOR_END: create_db_and_table // ANCHOR: insert_data // insert data -String insertQuery = "INSERT INTO " + - "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 219, 0.31000) " + - "(NOW + 2a, 12.60000, 218, 0.33000) " + - "(NOW + 3a, 12.30000, 221, 0.31000) " + - "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 218, 0.25000) "; -int affectedRows = stmt.executeUpdate(insertQuery); -System.out.println("insert " + affectedRows + " rows."); + String insertQuery = "INSERT INTO " + + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 219, 0.31000) " + + "(NOW + 2a, 12.60000, 218, 0.33000) " + + "(NOW + 3a, 12.30000, 221, 0.31000) " + + "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 218, 0.25000) "; + int affectedRows = stmt.executeUpdate(insertQuery); + System.out.println("insert " + affectedRows + " rows."); // ANCHOR_END: insert_data // ANCHOR: query_data // query data -ResultSet resultSet = stmt.executeQuery("SELECT * FROM meters"); + ResultSet resultSet = stmt.executeQuery("SELECT * FROM meters"); -Timestamp ts; -float current; -String location; -while(resultSet.next()){ - ts = resultSet.getTimestamp(1); - current = resultSet.getFloat(2); - location = resultSet.getString("location"); + Timestamp ts; + float current; + String location; + while (resultSet.next()) { + ts = resultSet.getTimestamp(1); + current = resultSet.getFloat(2); + location = resultSet.getString("location"); - System.out.printf("%s, %f, %s\n", ts, current, location); -} + System.out.printf("%s, %f, %s\n", ts, current, location); + } // ANCHOR_END: query_data // ANCHOR: with_reqid -AbstractStatement aStmt = (AbstractStatement) connection.createStatement(); -aStmt.execute("CREATE DATABASE IF NOT EXISTS power", 1L); -aStmt.executeUpdate("USE power", 2L); -try (ResultSet rs = aStmt.executeQuery("SELECT * FROM meters limit 1", 3L)) { - while(rs.next()){ - Timestamp timestamp = rs.getTimestamp(1); - System.out.println("timestamp = " + timestamp); - } -} -aStmt.close(); + AbstractStatement aStmt = (AbstractStatement) connection.createStatement(); + aStmt.execute("CREATE DATABASE IF NOT EXISTS power", 1L); + aStmt.executeUpdate("USE power", 2L); + try (ResultSet rs = aStmt.executeQuery("SELECT * FROM meters limit 1", 3L)) { + while (rs.next()) { + Timestamp timestamp = rs.getTimestamp(1); + System.out.println("timestamp = " + timestamp); + } + } + aStmt.close(); // ANCHOR_END: with_reqid -String sql = "SELECT * FROM meters limit 2;"; + String sql = "SELECT * FROM meters limit 2;"; // ANCHOR: jdbc_exception -try (Statement statement = connection.createStatement(); - // executeQuery - ResultSet tempResultSet = statement.executeQuery(sql)) { + try (Statement statement = connection.createStatement(); + // executeQuery + ResultSet tempResultSet = statement.executeQuery(sql)) { - // print result - printResult(tempResultSet); -} catch (SQLException e) { - System.out.println("ERROR Message: " + e.getMessage()); - System.out.println("ERROR Code: " + e.getErrorCode()); - e.printStackTrace(); -} + // print result + printResult(tempResultSet); + } catch (SQLException e) { + System.out.println("ERROR Message: " + e.getMessage()); + System.out.println("ERROR Code: " + e.getErrorCode()); + e.printStackTrace(); + } // ANCHOR_END: jdbc_exception } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java index d02b3c8789..f5f5b1c62c 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java @@ -15,37 +15,37 @@ public class JdbcCreatDBDemo { public static void main(String[] args) throws SQLException { -final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; + final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; // get connection -Properties properties = new Properties(); -properties.setProperty("charset", "UTF-8"); -properties.setProperty("locale", "en_US.UTF-8"); -properties.setProperty("timezone", "UTC-8"); -System.out.println("get connection starting..."); + Properties properties = new Properties(); + properties.setProperty("charset", "UTF-8"); + properties.setProperty("locale", "en_US.UTF-8"); + properties.setProperty("timezone", "UTC-8"); + System.out.println("get connection starting..."); // ANCHOR: create_db_and_table -try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); - Statement stmt = connection.createStatement()) { + try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); + Statement stmt = connection.createStatement()) { - // create database - int rowsAffected = stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); - // you can check rowsAffected here - assert rowsAffected == 0; + // create database + int rowsAffected = stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); + // you can check rowsAffected here + assert rowsAffected == 0; - // use database - rowsAffected = stmt.executeUpdate("USE power"); - // you can check rowsAffected here - assert rowsAffected == 0; + // use database + rowsAffected = stmt.executeUpdate("USE power"); + // you can check rowsAffected here + assert rowsAffected == 0; - // create table - rowsAffected = stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); - // you can check rowsAffected here - assert rowsAffected == 0; + // create table + rowsAffected = stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); + // you can check rowsAffected here + assert rowsAffected == 0; -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to create db and table, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); -} + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to create db and table, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + } // ANCHOR_END: create_db_and_table } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java index 7fc6d5137d..f8e11a8f41 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java @@ -15,36 +15,36 @@ public class JdbcInsertDataDemo { public static void main(String[] args) throws SQLException { -final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; + final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; // get connection -Properties properties = new Properties(); -properties.setProperty("charset", "UTF-8"); -properties.setProperty("locale", "en_US.UTF-8"); -properties.setProperty("timezone", "UTC-8"); -System.out.println("get connection starting..."); + Properties properties = new Properties(); + properties.setProperty("charset", "UTF-8"); + properties.setProperty("locale", "en_US.UTF-8"); + properties.setProperty("timezone", "UTC-8"); + System.out.println("get connection starting..."); // ANCHOR: insert_data -try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); - Statement stmt = connection.createStatement()) { + try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); + Statement stmt = connection.createStatement()) { - // insert data, please make sure the database and table are created before - String insertQuery = "INSERT INTO " + - "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 219, 0.31000) " + - "(NOW + 2a, 12.60000, 218, 0.33000) " + - "(NOW + 3a, 12.30000, 221, 0.31000) " + - "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 218, 0.25000) "; - int affectedRows = stmt.executeUpdate(insertQuery); - // you can check affectedRows here - System.out.println("inserted into " + affectedRows + " rows to power.meters successfully."); -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to insert data to power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + // insert data, please make sure the database and table are created before + String insertQuery = "INSERT INTO " + + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 219, 0.31000) " + + "(NOW + 2a, 12.60000, 218, 0.33000) " + + "(NOW + 3a, 12.30000, 221, 0.31000) " + + "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 218, 0.25000) "; + int affectedRows = stmt.executeUpdate(insertQuery); + // you can check affectedRows here + System.out.println("inserted into " + affectedRows + " rows to power.meters successfully."); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to insert data to power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); -} + } // ANCHOR_END: insert_data } } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java index 30a835bb11..a4749afed5 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java @@ -15,36 +15,36 @@ public class JdbcQueryDemo { public static void main(String[] args) throws SQLException { -final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; + final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; // get connection -Properties properties = new Properties(); -properties.setProperty("charset", "UTF-8"); -properties.setProperty("locale", "en_US.UTF-8"); -properties.setProperty("timezone", "UTC-8"); -System.out.println("get connection starting..."); + Properties properties = new Properties(); + properties.setProperty("charset", "UTF-8"); + properties.setProperty("locale", "en_US.UTF-8"); + properties.setProperty("timezone", "UTC-8"); + System.out.println("get connection starting..."); // ANCHOR: query_data -try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); - Statement stmt = connection.createStatement(); - // query data, make sure the database and table are created before - ResultSet resultSet = stmt.executeQuery("SELECT ts, current, location FROM power.meters limit 100")) { + try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); + Statement stmt = connection.createStatement(); + // query data, make sure the database and table are created before + ResultSet resultSet = stmt.executeQuery("SELECT ts, current, location FROM power.meters limit 100")) { - Timestamp ts; - float current; - String location; - while (resultSet.next()) { - ts = resultSet.getTimestamp(1); - current = resultSet.getFloat(2); - // we recommend using the column name to get the value - location = resultSet.getString("location"); + Timestamp ts; + float current; + String location; + while (resultSet.next()) { + ts = resultSet.getTimestamp(1); + current = resultSet.getFloat(2); + // we recommend using the column name to get the value + location = resultSet.getString("location"); - // you can check data here - System.out.printf("ts: %s, current: %f, location: %s %n", ts, current, location); - } -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to query data from power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); -} + // you can check data here + System.out.printf("ts: %s, current: %f, location: %s %n", ts, current, location); + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to query data from power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + } // ANCHOR_END: query_data } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java index cc557c18c5..3705e0aac2 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java @@ -15,31 +15,31 @@ public class JdbcReqIdDemo { public static void main(String[] args) throws SQLException { -final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; + final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; // get connection -Properties properties = new Properties(); -properties.setProperty("charset", "UTF-8"); -properties.setProperty("locale", "en_US.UTF-8"); -properties.setProperty("timezone", "UTC-8"); -System.out.println("get connection starting..."); + Properties properties = new Properties(); + properties.setProperty("charset", "UTF-8"); + properties.setProperty("locale", "en_US.UTF-8"); + properties.setProperty("timezone", "UTC-8"); + System.out.println("get connection starting..."); // ANCHOR: with_reqid -try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); - // Create a statement that allows specifying a request ID - AbstractStatement aStmt = (AbstractStatement) connection.createStatement()) { + try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); + // Create a statement that allows specifying a request ID + AbstractStatement aStmt = (AbstractStatement) connection.createStatement()) { + + try (ResultSet rs = aStmt.executeQuery("SELECT ts, current, location FROM power.meters limit 1", 3L)) { + while (rs.next()) { + Timestamp timestamp = rs.getTimestamp(1); + System.out.println("timestamp = " + timestamp); + } + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to execute sql with reqId, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - try (ResultSet rs = aStmt.executeQuery("SELECT ts, current, location FROM power.meters limit 1", 3L)) { - while (rs.next()) { - Timestamp timestamp = rs.getTimestamp(1); - System.out.println("timestamp = " + timestamp); } - } -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to execute sql with reqId, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - -} // ANCHOR_END: with_reqid } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java index d8084eb9f6..6caa42b1c1 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java @@ -18,37 +18,38 @@ import java.util.concurrent.TimeUnit; public class WsConsumerLoopFull { static private Connection connection; static private Statement statement; - public static TaosConsumer getConsumer() throws SQLException{ -// ANCHOR: create_consumer -Properties config = new Properties(); -config.setProperty("td.connect.type", "ws"); -config.setProperty("bootstrap.servers", "localhost:6041"); -config.setProperty("auto.offset.reset", "latest"); -config.setProperty("msg.with.table.name", "true"); -config.setProperty("enable.auto.commit", "true"); -config.setProperty("auto.commit.interval.ms", "1000"); -config.setProperty("group.id", "group1"); -config.setProperty("client.id", "client1"); -config.setProperty("td.connect.user", "root"); -config.setProperty("td.connect.pass", "taosdata"); -config.setProperty("value.deserializer", "com.taosdata.example.WsConsumerLoopFull$ResultDeserializer"); -config.setProperty("value.deserializer.encoding", "UTF-8"); -try { - return new TaosConsumer<>(config); - } catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to create consumer", ex); - } catch (Exception e) { - e.printStackTrace(); - throw new SQLException("Failed to create consumer", e); - } + public static TaosConsumer getConsumer() throws SQLException { +// ANCHOR: create_consumer + Properties config = new Properties(); + config.setProperty("td.connect.type", "ws"); + config.setProperty("bootstrap.servers", "localhost:6041"); + config.setProperty("auto.offset.reset", "latest"); + config.setProperty("msg.with.table.name", "true"); + config.setProperty("enable.auto.commit", "true"); + config.setProperty("auto.commit.interval.ms", "1000"); + config.setProperty("group.id", "group1"); + config.setProperty("client.id", "client1"); + config.setProperty("td.connect.user", "root"); + config.setProperty("td.connect.pass", "taosdata"); + config.setProperty("value.deserializer", "com.taosdata.example.WsConsumerLoopFull$ResultDeserializer"); + config.setProperty("value.deserializer.encoding", "UTF-8"); + + try { + return new TaosConsumer<>(config); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create consumer", ex); + } catch (Exception e) { + e.printStackTrace(); + throw new SQLException("Failed to create consumer", e); + } // ANCHOR_END: create_consumer -} + } public static void pollDataExample() throws SQLException { - try (TaosConsumer consumer = getConsumer()){ + try (TaosConsumer consumer = getConsumer()) { // subscribe to the topics List topics = Collections.singletonList("topic_meters"); @@ -75,178 +76,180 @@ try { public static void pollExample() throws SQLException { // ANCHOR: poll_data_code_piece -try (TaosConsumer consumer = getConsumer()){ - List topics = Collections.singletonList("topic_meters"); + try (TaosConsumer consumer = getConsumer()) { + List topics = Collections.singletonList("topic_meters"); - // subscribe to the topics - consumer.subscribe(topics); - System.out.println("subscribe topics successfully"); - for (int i = 0; i < 50; i++) { - // poll data - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process the data here - System.out.println("data: " + JSON.toJSONString(bean)); + // subscribe to the topics + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + for (int i = 0; i < 50; i++) { + // poll data + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process the data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + } + + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data", ex); } - } - -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to poll data", ex); -} // ANCHOR_END: poll_data_code_piece } public static void seekExample() throws SQLException { // ANCHOR: consumer_seek -try (TaosConsumer consumer = getConsumer()){ - List topics = Collections.singletonList("topic_meters"); + try (TaosConsumer consumer = getConsumer()) { + List topics = Collections.singletonList("topic_meters"); - // subscribe to the topics - consumer.subscribe(topics); - System.out.println("subscribe topics successfully"); - ConsumerRecords records = ConsumerRecords.emptyRecord(); - // make sure we have got some data - while (records.isEmpty()){ - records = consumer.poll(Duration.ofMillis(100)); - } + // subscribe to the topics + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + ConsumerRecords records = ConsumerRecords.emptyRecord(); + // make sure we have got some data + while (records.isEmpty()) { + records = consumer.poll(Duration.ofMillis(100)); + } - for (ConsumerRecord record : records) { - System.out.println("first data polled: " + JSON.toJSONString(record.value())); - Set assignment = consumer.assignment(); - // seek to the beginning of the all partitions - consumer.seekToBeginning(assignment); - System.out.println("assignment seek to beginning successfully"); - break; - } + for (ConsumerRecord record : records) { + System.out.println("first data polled: " + JSON.toJSONString(record.value())); + Set assignment = consumer.assignment(); + // seek to the beginning of the all partitions + consumer.seekToBeginning(assignment); + System.out.println("assignment seek to beginning successfully"); + break; + } - // poll data agagin - records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - // process the data here - System.out.println("second data polled: " + JSON.toJSONString(record.value())); - break; - } -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("seek example failed", ex); -} + // poll data agagin + records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + // process the data here + System.out.println("second data polled: " + JSON.toJSONString(record.value())); + break; + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("seek example failed", ex); + } // ANCHOR_END: consumer_seek } public static void commitExample() throws SQLException { // ANCHOR: commit_code_piece -try (TaosConsumer consumer = getConsumer()){ - List topics = Collections.singletonList("topic_meters"); + try (TaosConsumer consumer = getConsumer()) { + List topics = Collections.singletonList("topic_meters"); - consumer.subscribe(topics); - for (int i = 0; i < 50; i++) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process your data here - System.out.println("data: " + JSON.toJSONString(bean)); + consumer.subscribe(topics); + for (int i = 0; i < 50; i++) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process your data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + if (!records.isEmpty()) { + // after processing the data, commit the offset manually + consumer.commitSync(); + } + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to execute consumer functions", ex); } - if (!records.isEmpty()) { - // after processing the data, commit the offset manually - consumer.commitSync(); - } - } -} catch (SQLException ex){ - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to execute consumer functions", ex); -} // ANCHOR_END: commit_code_piece } + public static void unsubscribeExample() throws SQLException { TaosConsumer consumer = getConsumer(); List topics = Collections.singletonList("topic_meters"); consumer.subscribe(topics); // ANCHOR: unsubscribe_data_code_piece -try { - consumer.unsubscribe(); -} catch (SQLException ex){ - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to unsubscribe consumer", ex); -} finally { - consumer.close(); -} + try { + consumer.unsubscribe(); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to unsubscribe consumer", ex); + } finally { + consumer.close(); + } // ANCHOR_END: unsubscribe_data_code_piece } -public static class ResultDeserializer extends ReferenceDeserializer { + public static class ResultDeserializer extends ReferenceDeserializer { -} -// use this class to define the data structure of the result record -public static class ResultBean { - private Timestamp ts; - private double current; - private int voltage; - private double phase; - private int groupid; - private String location; - - public Timestamp getTs() { - return ts; } - public void setTs(Timestamp ts) { - this.ts = ts; + // use this class to define the data structure of the result record + public static class ResultBean { + private Timestamp ts; + private double current; + private int voltage; + private double phase; + private int groupid; + private String location; + + public Timestamp getTs() { + return ts; + } + + public void setTs(Timestamp ts) { + this.ts = ts; + } + + public double getCurrent() { + return current; + } + + public void setCurrent(double current) { + this.current = current; + } + + public int getVoltage() { + return voltage; + } + + public void setVoltage(int voltage) { + this.voltage = voltage; + } + + public double getPhase() { + return phase; + } + + public void setPhase(double phase) { + this.phase = phase; + } + + public int getGroupid() { + return groupid; + } + + public void setGroupid(int groupid) { + this.groupid = groupid; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } } - public double getCurrent() { - return current; - } - - public void setCurrent(double current) { - this.current = current; - } - - public int getVoltage() { - return voltage; - } - - public void setVoltage(int voltage) { - this.voltage = voltage; - } - - public double getPhase() { - return phase; - } - - public void setPhase(double phase) { - this.phase = phase; - } - - public int getGroupid() { - return groupid; - } - - public void setGroupid(int groupid) { - this.groupid = groupid; - } - - public String getLocation() { - return location; - } - - public void setLocation(String location) { - this.location = location; - } -} - - public static void prepareData() throws SQLException{ + public static void prepareData() throws SQLException { StringBuilder insertQuery = new StringBuilder(); insertQuery.append("INSERT INTO " + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + "VALUES "); - for (int i = 0; i < 10000; i++){ + for (int i = 0; i < 10000; i++) { insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) "); } try { @@ -257,7 +260,8 @@ public static class ResultBean { throw new SQLException("Failed to insert data to power.meters", ex); } } - public static void prepareMeta() throws SQLException{ + + public static void prepareMeta() throws SQLException { try { statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); statement.executeUpdate("USE power"); @@ -289,6 +293,7 @@ public static class ResultBean { } System.out.println("Connection created successfully."); } + public static void closeConnection() throws SQLException { try { if (statement != null) { @@ -338,16 +343,16 @@ public static class ResultBean { System.out.println("Data prepared successfully"); - // 关闭线程池,不再接收新任务 + // close the executor, which will make the executor reject new tasks executor.shutdown(); try { - // 等待直到所有任务完成 + // wait for the executor to terminate boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); assert result; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - } catch (Exception e){ + } catch (Exception e) { e.printStackTrace(); System.out.println("Wait executor termination failed."); }