From 255d0fd02a637b8bf911ca81bea7465d286f6dbd Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Fri, 2 Aug 2024 18:39:19 +0800 Subject: [PATCH] mod java exception --- .../com/taos/example/JNIConnectExample.java | 4 + .../com/taos/example/RESTConnectExample.java | 4 + .../com/taos/example/WSConnectExample.java | 4 + .../taosdata/example/ConsumerLoopFull.java | 104 +++++----- .../com/taosdata/example/JdbcCreatDBDemo.java | 4 + .../taosdata/example/JdbcInsertDataDemo.java | 5 +- .../com/taosdata/example/JdbcQueryDemo.java | 4 + .../com/taosdata/example/JdbcReqIdDemo.java | 5 +- .../example/ParameterBindingBasicDemo.java | 4 + .../example/ParameterBindingFullDemo.java | 29 ++- .../taosdata/example/SchemalessJniTest.java | 4 + .../taosdata/example/SchemalessWsTest.java | 4 + .../example/WSParameterBindingBasicDemo.java | 4 + .../example/WSParameterBindingFullDemo.java | 7 +- .../taosdata/example/WsConsumerLoopFull.java | 196 +++++++++--------- 15 files changed, 216 insertions(+), 166 deletions(-) diff --git a/docs/examples/java/src/main/java/com/taos/example/JNIConnectExample.java b/docs/examples/java/src/main/java/com/taos/example/JNIConnectExample.java index 8b9f27c5ab..42ac7bde85 100644 --- a/docs/examples/java/src/main/java/com/taos/example/JNIConnectExample.java +++ b/docs/examples/java/src/main/java/com/taos/example/JNIConnectExample.java @@ -27,6 +27,10 @@ public static void main(String[] args) throws SQLException { } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to connect to " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw ex; + } catch (Exception ex){ + System.out.println("Failed to connect to " + jdbcUrl + "; ErrMessage: " + ex.getMessage()); + throw ex; } } // ANCHOR_END: main diff --git a/docs/examples/java/src/main/java/com/taos/example/RESTConnectExample.java b/docs/examples/java/src/main/java/com/taos/example/RESTConnectExample.java index 22bf1d61f4..b1ec31ee86 100644 --- a/docs/examples/java/src/main/java/com/taos/example/RESTConnectExample.java +++ b/docs/examples/java/src/main/java/com/taos/example/RESTConnectExample.java @@ -16,6 +16,10 @@ public static void main(String[] args) throws SQLException { } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to connect to " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw ex; + } catch (Exception ex){ + System.out.println("Failed to connect to " + jdbcUrl + "; ErrMessage: " + ex.getMessage()); + throw ex; } } // ANCHOR_END: main diff --git a/docs/examples/java/src/main/java/com/taos/example/WSConnectExample.java b/docs/examples/java/src/main/java/com/taos/example/WSConnectExample.java index b355a28f6f..d683cc64a6 100644 --- a/docs/examples/java/src/main/java/com/taos/example/WSConnectExample.java +++ b/docs/examples/java/src/main/java/com/taos/example/WSConnectExample.java @@ -28,6 +28,10 @@ public static void main(String[] args) throws SQLException { } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to connect to " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw ex; + } catch (Exception ex){ + System.out.println("Failed to connect to " + jdbcUrl + "; ErrMessage: " + ex.getMessage()); + throw ex; } } // ANCHOR_END: main diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java index 69d53e1117..7e3bc05d6a 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java @@ -16,6 +16,8 @@ public class ConsumerLoopFull { static private Connection connection; static private Statement statement; + static private volatile boolean stopFlag = false; + public static TaosConsumer getConsumer() throws SQLException { // ANCHOR: create_consumer Properties config = new Properties(); @@ -38,15 +40,16 @@ public class ConsumerLoopFull { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to create consumer", ex); - } catch (Exception e) { - e.printStackTrace(); - throw new SQLException("Failed to create consumer", e); + } catch (Exception ex) { + System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create consumer", ex); } // ANCHOR_END: create_consumer } - public static void pollDataExample() throws SQLException { - try (TaosConsumer consumer = getConsumer()) { + public static void pollDataExample(TaosConsumer consumer) throws SQLException { + try{ // subscribe to the topics List topics = Collections.singletonList("topic_meters"); @@ -68,12 +71,15 @@ public class ConsumerLoopFull { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to poll data from topic_meters", ex); + } catch (Exception ex) { + System.out.println("Failed to poll data from topic_meters; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data from topic_meters", ex); } } - public static void pollExample() throws SQLException { + public static void pollExample(TaosConsumer consumer) throws SQLException { // ANCHOR: poll_data_code_piece - try (TaosConsumer consumer = getConsumer()) { + try { List topics = Collections.singletonList("topic_meters"); // subscribe to the topics @@ -93,54 +99,48 @@ public class ConsumerLoopFull { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to poll data", ex); + } catch (Exception ex) { + System.out.println("Failed to poll data; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data", ex); } // ANCHOR_END: poll_data_code_piece } - public static void seekExample() throws SQLException { + public static void seekExample(TaosConsumer consumer) throws SQLException { // ANCHOR: consumer_seek - try (TaosConsumer consumer = getConsumer()) { + try { List topics = Collections.singletonList("topic_meters"); // subscribe to the topics consumer.subscribe(topics); System.out.println("subscribe topics successfully"); + Set assignment = consumer.assignment(); + System.out.println("now assignment: " + JSON.toJSONString(assignment)); + ConsumerRecords records = ConsumerRecords.emptyRecord(); // make sure we have got some data while (records.isEmpty()) { records = consumer.poll(Duration.ofMillis(100)); } - for (ConsumerRecord record : records) { - System.out.println("first data polled: " + JSON.toJSONString(record.value())); - Set assignment = consumer.assignment(); - // seek to the beginning of the all partitions - consumer.seekToBeginning(assignment); - System.out.println("assignment seek to beginning successfully"); - break; - } - - // poll data again - records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - // process the data here - System.out.println("second data polled: " + JSON.toJSONString(record.value())); - break; - } - - + consumer.seekToBeginning(assignment); + System.out.println("assignment seek to beginning successfully"); + System.out.println("beginning assignment: " + JSON.toJSONString(assignment)); } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("seek example failed", ex); + } catch (Exception ex) { + System.out.println("seek example failed; ErrMessage: " + ex.getMessage()); + throw new SQLException("seek example failed", ex); } // ANCHOR_END: consumer_seek } - public static void commitExample() throws SQLException { + public static void commitExample(TaosConsumer consumer) throws SQLException { // ANCHOR: commit_code_piece - try (TaosConsumer consumer = getConsumer()) { + try { List topics = Collections.singletonList("topic_meters"); consumer.subscribe(topics); @@ -160,12 +160,14 @@ public class ConsumerLoopFull { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to execute consumer functions", ex); + } catch (Exception ex) { + System.out.println("Failed to execute consumer functions. ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to execute consumer functions", ex); } // ANCHOR_END: commit_code_piece } - public static void unsubscribeExample() throws SQLException { - TaosConsumer consumer = getConsumer(); + public static void unsubscribeExample(TaosConsumer consumer) throws SQLException { List topics = Collections.singletonList("topic_meters"); consumer.subscribe(topics); // ANCHOR: unsubscribe_data_code_piece @@ -175,7 +177,11 @@ public class ConsumerLoopFull { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to unsubscribe consumer", ex); - } finally { + } catch (Exception ex) { + System.out.println("Failed to unsubscribe consumer. ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to unsubscribe consumer", ex); + } + finally { consumer.close(); } // ANCHOR_END: unsubscribe_data_code_piece @@ -243,17 +249,16 @@ public class ConsumerLoopFull { } } - public static void prepareData() throws SQLException { - StringBuilder insertQuery = new StringBuilder(); - insertQuery.append("INSERT INTO " + - "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + - "VALUES "); - for (int i = 0; i < 10000; i++) { - insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) "); - } + public static void prepareData() throws SQLException, InterruptedException { try { - int affectedRows = statement.executeUpdate(insertQuery.toString()); - assert affectedRows == 10000; + int i = 0; + while (!stopFlag) { + i++; + String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) "; + int affectedRows = statement.executeUpdate(insertQuery); + assert affectedRows == 1; + Thread.sleep(1); + } } catch (SQLException ex) { System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to insert data to power.meters", ex); @@ -315,7 +320,7 @@ public class ConsumerLoopFull { } - public static void main(String[] args) throws SQLException { + public static void main(String[] args) throws SQLException, InterruptedException { initConnection(); prepareMeta(); @@ -326,11 +331,14 @@ public class ConsumerLoopFull { executor.submit(() -> { try { // please use one example at a time - pollDataExample(); -// seekExample(); -// pollExample(); -// commitExample(); - unsubscribeExample(); + TaosConsumer consumer = getConsumer(); + + pollDataExample(consumer); + seekExample(consumer); + pollExample(consumer); + commitExample(consumer); + unsubscribeExample(consumer); + stopFlag = true; } catch (SQLException ex) { System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java index f5f5b1c62c..93531bd80f 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java @@ -45,6 +45,10 @@ public class JdbcCreatDBDemo { } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to create db and table, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw ex; + } catch (Exception ex){ + System.out.println("Failed to create db and table, url:" + jdbcUrl + "; ErrMessage: " + ex.getMessage()); + throw ex; } // ANCHOR_END: create_db_and_table diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java index f8e11a8f41..cb8755788d 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java @@ -43,7 +43,10 @@ public class JdbcInsertDataDemo { } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to insert data to power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - + throw ex; + } catch (Exception ex){ + System.out.println("Failed to insert data to power.meters, url:" + jdbcUrl + "; ErrMessage: " + ex.getMessage()); + throw ex; } // ANCHOR_END: insert_data } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java index a4749afed5..ea6d8346c6 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java @@ -44,6 +44,10 @@ public class JdbcQueryDemo { } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to query data from power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw ex; + } catch (Exception ex){ + System.out.println("Failed to query data from power.meters, url:" + jdbcUrl + "; ErrMessage: " + ex.getMessage()); + throw ex; } // ANCHOR_END: query_data } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java index 3705e0aac2..0f2e522ce0 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java @@ -38,7 +38,10 @@ public class JdbcReqIdDemo { } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to execute sql with reqId, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - + throw ex; + } catch (Exception ex){ + System.out.println("Failed to execute sql with reqId, url:" + jdbcUrl + "; ErrMessage: " + ex.getMessage()); + throw ex; } // ANCHOR_END: with_reqid } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBasicDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBasicDemo.java index fa2efab2ee..6a9fc5e44f 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBasicDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBasicDemo.java @@ -69,6 +69,10 @@ public class ParameterBindingBasicDemo { } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw ex; + } catch (Exception ex){ + System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage()); + throw ex; } } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingFullDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingFullDemo.java index e1aabcd008..bee619c2f7 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingFullDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingFullDemo.java @@ -31,20 +31,27 @@ public class ParameterBindingFullDemo { public static void main(String[] args) throws SQLException { String jdbcUrl = "jdbc:TAOS://" + host + ":6030/"; - Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata"); + try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) { - init(conn); + init(conn); - bindInteger(conn); - bindFloat(conn); - bindBoolean(conn); - bindBytes(conn); - bindString(conn); - bindVarbinary(conn); - bindGeometry(conn); + bindInteger(conn); + bindFloat(conn); + bindBoolean(conn); + bindBytes(conn); + bindString(conn); + bindVarbinary(conn); + bindGeometry(conn); - clean(conn); - conn.close(); + clean(conn); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw ex; + } catch (Exception ex){ + System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage()); + throw ex; + } } private static void init(Connection conn) throws SQLException { diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessJniTest.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessJniTest.java index 8f5fe0e7cf..0a1a674761 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessJniTest.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessJniTest.java @@ -28,6 +28,10 @@ public class SchemalessJniTest { } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to insert data with schemaless, host:" + host + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw ex; + } catch (Exception ex){ + System.out.println("Failed to insert data with schemaless, host:" + host + "; ErrMessage: " + ex.getMessage()); + throw ex; } } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessWsTest.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessWsTest.java index 922781af56..6d06d4e17c 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessWsTest.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessWsTest.java @@ -28,6 +28,10 @@ public class SchemalessWsTest { } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to insert data with schemaless, host:" + host + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw ex; + } catch (Exception ex){ + System.out.println("Failed to insert data with schemaless, host:" + host + "; ErrMessage: " + ex.getMessage()); + throw ex; } } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingBasicDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingBasicDemo.java index 9eca3c973a..2f58b3798a 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingBasicDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingBasicDemo.java @@ -50,6 +50,10 @@ public class WSParameterBindingBasicDemo { } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw ex; + } catch (Exception ex){ + System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage()); + throw ex; } } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingFullDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingFullDemo.java index 5d63ce8bf3..ad5ba5faf0 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingFullDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingFullDemo.java @@ -39,8 +39,11 @@ public class WSParameterBindingFullDemo { } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Error Code: " + ex.getErrorCode()); - System.out.println("Message: " + ex.getMessage()); + System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw ex; + } catch (Exception ex){ + System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage()); + throw ex; } } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java index 3ed0015359..726c795b7b 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java @@ -6,10 +6,7 @@ import com.taosdata.jdbc.tmq.*; import java.sql.*; import java.time.Duration; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.Set; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -20,7 +17,7 @@ public class WsConsumerLoopFull { static private Statement statement; public static TaosConsumer getConsumer() throws SQLException { - // ANCHOR: create_consumer +// ANCHOR: create_consumer Properties config = new Properties(); config.setProperty("td.connect.type", "ws"); config.setProperty("bootstrap.servers", "localhost:6041"); @@ -29,7 +26,7 @@ public class WsConsumerLoopFull { config.setProperty("enable.auto.commit", "true"); config.setProperty("auto.commit.interval.ms", "1000"); config.setProperty("group.id", "group1"); - config.setProperty("client.id", "client1"); + config.setProperty("client.id", "1"); config.setProperty("td.connect.user", "root"); config.setProperty("td.connect.pass", "taosdata"); config.setProperty("value.deserializer", "com.taosdata.example.WsConsumerLoopFull$ResultDeserializer"); @@ -38,20 +35,19 @@ public class WsConsumerLoopFull { try { return new TaosConsumer<>(config); } catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed - // exceptions info - System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") - + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create consumer", ex); + } catch (Exception ex) { + System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to create consumer", ex); - } catch (Exception e) { - e.printStackTrace(); - throw new SQLException("Failed to create consumer", e); } - // ANCHOR_END: create_consumer +// ANCHOR_END: create_consumer } - public static void pollDataExample() throws SQLException { - try (TaosConsumer consumer = getConsumer()) { + public static void pollDataExample(TaosConsumer consumer) throws SQLException { + try{ // subscribe to the topics List topics = Collections.singletonList("topic_meters"); @@ -70,17 +66,18 @@ public class WsConsumerLoopFull { consumer.unsubscribe(); System.out.println("unsubscribed topics successfully"); } catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed - // exceptions info - System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " - + ex.getMessage()); + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data from topic_meters", ex); + } catch (Exception ex) { + System.out.println("Failed to poll data from topic_meters; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to poll data from topic_meters", ex); } } - public static void pollExample() throws SQLException { - // ANCHOR: poll_data_code_piece - try (TaosConsumer consumer = getConsumer()) { + public static void pollExample(TaosConsumer consumer) throws SQLException { +// ANCHOR: poll_data_code_piece + try { List topics = Collections.singletonList("topic_meters"); // subscribe to the topics @@ -97,58 +94,51 @@ public class WsConsumerLoopFull { } } catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed - // exceptions info - System.out - .println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data", ex); + } catch (Exception ex) { + System.out.println("Failed to poll data; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to poll data", ex); } - // ANCHOR_END: poll_data_code_piece +// ANCHOR_END: poll_data_code_piece } - public static void seekExample() throws SQLException { - // ANCHOR: consumer_seek - try (TaosConsumer consumer = getConsumer()) { + public static void seekExample(TaosConsumer consumer) throws SQLException { +// ANCHOR: consumer_seek + try { List topics = Collections.singletonList("topic_meters"); // subscribe to the topics consumer.subscribe(topics); System.out.println("subscribe topics successfully"); + Set assignment = consumer.assignment(); + System.out.println("now assignment: " + JSON.toJSONString(assignment)); + ConsumerRecords records = ConsumerRecords.emptyRecord(); // make sure we have got some data while (records.isEmpty()) { records = consumer.poll(Duration.ofMillis(100)); } - for (ConsumerRecord record : records) { - System.out.println("first data polled: " + JSON.toJSONString(record.value())); - Set assignment = consumer.assignment(); - // seek to the beginning of the all partitions - consumer.seekToBeginning(assignment); - System.out.println("assignment seek to beginning successfully"); - break; - } - - // poll data again - records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - // process the data here - System.out.println("second data polled: " + JSON.toJSONString(record.value())); - break; - } + consumer.seekToBeginning(assignment); + System.out.println("assignment seek to beginning successfully"); + System.out.println("beginning assignment: " + JSON.toJSONString(assignment)); } catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed - // exceptions info - System.out - .println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("seek example failed", ex); + } catch (Exception ex) { + System.out.println("seek example failed; ErrMessage: " + ex.getMessage()); throw new SQLException("seek example failed", ex); } - // ANCHOR_END: consumer_seek +// ANCHOR_END: consumer_seek } - public static void commitExample() throws SQLException { - // ANCHOR: commit_code_piece - try (TaosConsumer consumer = getConsumer()) { + + public static void commitExample(TaosConsumer consumer) throws SQLException { +// ANCHOR: commit_code_piece + try { List topics = Collections.singletonList("topic_meters"); consumer.subscribe(topics); @@ -165,32 +155,34 @@ public class WsConsumerLoopFull { } } } catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed - // exceptions info - System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " - + ex.getMessage()); + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to execute consumer functions", ex); + } catch (Exception ex) { + System.out.println("Failed to execute consumer functions. ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to execute consumer functions", ex); } - // ANCHOR_END: commit_code_piece +// ANCHOR_END: commit_code_piece } - public static void unsubscribeExample() throws SQLException { - TaosConsumer consumer = getConsumer(); + public static void unsubscribeExample(TaosConsumer consumer) throws SQLException { List topics = Collections.singletonList("topic_meters"); consumer.subscribe(topics); - // ANCHOR: unsubscribe_data_code_piece +// ANCHOR: unsubscribe_data_code_piece try { consumer.unsubscribe(); } catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed - // exceptions info - System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " - + ex.getMessage()); + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to unsubscribe consumer", ex); - } finally { + } catch (Exception ex) { + System.out.println("Failed to unsubscribe consumer. ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to unsubscribe consumer", ex); + } + finally { consumer.close(); } - // ANCHOR_END: unsubscribe_data_code_piece +// ANCHOR_END: unsubscribe_data_code_piece } public static class ResultDeserializer extends ReferenceDeserializer { @@ -255,20 +247,16 @@ public class WsConsumerLoopFull { } } - public static void prepareData() throws SQLException { - StringBuilder insertQuery = new StringBuilder(); - insertQuery.append("INSERT INTO " + - "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + - "VALUES "); - for (int i = 0; i < 10000; i++) { - insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) "); - } + public static void prepareData() throws SQLException, InterruptedException { try { - int affectedRows = statement.executeUpdate(insertQuery.toString()); - assert affectedRows == 10000; + for (int i = 0; i < 3000; i++) { + String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) "; + int affectedRows = statement.executeUpdate(insertQuery); + assert affectedRows == 1; + Thread.sleep(1); + } } catch (SQLException ex) { - System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " - + ex.getMessage()); + System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to insert data to power.meters", ex); } } @@ -277,13 +265,10 @@ public class WsConsumerLoopFull { try { statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); statement.executeUpdate("USE power"); - statement.executeUpdate( - "CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); - statement.executeUpdate( - "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); + statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); + statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); } catch (SQLException ex) { - System.out.println( - "Failed to create db and table, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + System.out.println("Failed to create db and table, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to create db and table", ex); } } @@ -297,15 +282,13 @@ public class WsConsumerLoopFull { try { connection = DriverManager.getConnection(url, properties); } catch (SQLException ex) { - System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() - + "; ErrMessage: " + ex.getMessage()); + System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to create connection", ex); } try { statement = connection.createStatement(); } catch (SQLException ex) { - System.out.println( - "Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to create statement", ex); } System.out.println("Connection created successfully."); @@ -317,8 +300,7 @@ public class WsConsumerLoopFull { statement.close(); } } catch (SQLException ex) { - System.out.println( - "Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to close statement", ex); } @@ -327,14 +309,14 @@ public class WsConsumerLoopFull { connection.close(); } } catch (SQLException ex) { - System.out.println( - "Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to close connection", ex); } System.out.println("Connection closed Successfully."); } - public static void main(String[] args) throws SQLException { + + public static void main(String[] args) throws SQLException, InterruptedException { initConnection(); prepareMeta(); @@ -345,14 +327,22 @@ public class WsConsumerLoopFull { executor.submit(() -> { try { // please use one example at a time - pollDataExample(); - // seekExample(); - // pollExample(); - // commitExample(); - unsubscribeExample(); + TaosConsumer consumer = getConsumer(); + + pollDataExample(consumer); + seekExample(consumer); + consumer.unsubscribe(); + pollExample(consumer); + consumer.unsubscribe(); + commitExample(consumer); + consumer.unsubscribe(); + unsubscribeExample(consumer); } catch (SQLException ex) { - System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() - + "; ErrMessage: " + ex.getMessage()); + System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + return; + } catch (Exception ex) { + System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage()); + return; } System.out.println("pollDataExample executed successfully"); });