diff --git a/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopFull.java b/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopFull.java index 62dac019d7..ec9faf383e 100644 --- a/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopFull.java +++ b/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopFull.java @@ -19,8 +19,10 @@ public class ConsumerLoopFull { static private Connection connection; static private Statement statement; static private volatile boolean stopThread = false; + static private String groupId = "group1"; + static private String clientId = "clinet1"; - public static TaosConsumer getConsumer() throws Exception { + public static TaosConsumer getConsumer() throws Exception { // ANCHOR: create_consumer Properties config = new Properties(); config.setProperty("td.connect.type", "jni"); @@ -30,7 +32,7 @@ public class ConsumerLoopFull { config.setProperty("enable.auto.commit", "true"); config.setProperty("auto.commit.interval.ms", "1000"); config.setProperty("group.id", "group1"); - config.setProperty("client.id", "1"); + config.setProperty("client.id", "clinet1"); config.setProperty("td.connect.user", "root"); config.setProperty("td.connect.pass", "taosdata"); config.setProperty("value.deserializer", "com.taos.example.ConsumerLoopFull$ResultDeserializer"); @@ -45,8 +47,10 @@ public class ConsumerLoopFull { return consumer; } catch (Exception ex) { // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to create native consumer, host: %s, %sErrMessage: %s%n", + System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n", config.getProperty("bootstrap.servers"), + config.getProperty("group.id"), + config.getProperty("client.id"), ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", ex.getMessage()); // Print stack trace for context in examples. Use logging in production. @@ -58,9 +62,8 @@ public class ConsumerLoopFull { public static void pollExample(TaosConsumer consumer) throws SQLException { // ANCHOR: poll_data_code_piece + List topics = Collections.singletonList("topic_meters"); try { - List topics = Collections.singletonList("topic_meters"); - // subscribe to the topics consumer.subscribe(topics); System.out.println("Subscribe topics successfully."); @@ -69,13 +72,16 @@ public class ConsumerLoopFull { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { ResultBean bean = record.value(); - // process the data here + // Add your data processing logic here System.out.println("data: " + JSON.toJSONString(bean)); } } } catch (Exception ex) { // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to poll data, %sErrMessage: %s%n", + System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n", + topics.get(0), + groupId, + clientId, ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", ex.getMessage()); // Print stack trace for context in examples. Use logging in production. @@ -87,9 +93,8 @@ public class ConsumerLoopFull { public static void seekExample(TaosConsumer consumer) throws SQLException { // ANCHOR: consumer_seek + List topics = Collections.singletonList("topic_meters"); try { - List topics = Collections.singletonList("topic_meters"); - // subscribe to the topics consumer.subscribe(topics); System.out.println("Subscribe topics successfully."); @@ -106,7 +111,10 @@ public class ConsumerLoopFull { System.out.println("Assignment seek to beginning successfully."); } catch (Exception ex) { // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to execute seek example, %sErrMessage: %s%n", + System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n", + topics.get(0), + groupId, + clientId, ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", ex.getMessage()); // Print stack trace for context in examples. Use logging in production. @@ -119,15 +127,14 @@ public class ConsumerLoopFull { public static void commitExample(TaosConsumer consumer) throws SQLException { // ANCHOR: commit_code_piece + List topics = Collections.singletonList("topic_meters"); try { - List topics = Collections.singletonList("topic_meters"); - consumer.subscribe(topics); for (int i = 0; i < 50; i++) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { ResultBean bean = record.value(); - // process your data here + // Add your data processing logic here System.out.println("data: " + JSON.toJSONString(bean)); } if (!records.isEmpty()) { @@ -138,7 +145,10 @@ public class ConsumerLoopFull { } } catch (Exception ex) { // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to execute commit example, %sErrMessage: %s%n", + System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n", + topics.get(0), + groupId, + clientId, ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", ex.getMessage()); // Print stack trace for context in examples. Use logging in production. @@ -158,7 +168,10 @@ public class ConsumerLoopFull { System.out.println("Consumer unsubscribed successfully."); } catch (Exception ex) { // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to unsubscribe consumer, %sErrMessage: %s%n", + System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n", + topics.get(0), + groupId, + clientId, ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", ex.getMessage()); // Print stack trace for context in examples. Use logging in production. diff --git a/docs/examples/java/src/main/java/com/taos/example/JdbcInsertDataDemo.java b/docs/examples/java/src/main/java/com/taos/example/JdbcInsertDataDemo.java index f19017193c..5c3599d819 100644 --- a/docs/examples/java/src/main/java/com/taos/example/JdbcInsertDataDemo.java +++ b/docs/examples/java/src/main/java/com/taos/example/JdbcInsertDataDemo.java @@ -25,25 +25,26 @@ public class JdbcInsertDataDemo { properties.setProperty("timezone", "UTC-8"); System.out.println("get connection starting..."); // ANCHOR: insert_data + // insert data, please make sure the database and table are created before + String insertQuery = "INSERT INTO " + + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 219, 0.31000) " + + "(NOW + 2a, 12.60000, 218, 0.33000) " + + "(NOW + 3a, 12.30000, 221, 0.31000) " + + "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 218, 0.25000) "; try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); Statement stmt = connection.createStatement()) { - // insert data, please make sure the database and table are created before - String insertQuery = "INSERT INTO " + - "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 219, 0.31000) " + - "(NOW + 2a, 12.60000, 218, 0.33000) " + - "(NOW + 3a, 12.30000, 221, 0.31000) " + - "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 218, 0.25000) "; int affectedRows = stmt.executeUpdate(insertQuery); // you can check affectedRows here System.out.println("Successfully inserted " + affectedRows + " rows to power.meters."); } catch (Exception ex) { // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n", + System.out.printf("Failed to insert data to power.meters, sql: %s, %sErrMessage: %s%n", + insertQuery, ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", ex.getMessage()); // Print stack trace for context in examples. Use logging in production. diff --git a/docs/examples/java/src/main/java/com/taos/example/WsConsumerLoopFull.java b/docs/examples/java/src/main/java/com/taos/example/WsConsumerLoopFull.java index 66c37f172e..6db65f47f2 100644 --- a/docs/examples/java/src/main/java/com/taos/example/WsConsumerLoopFull.java +++ b/docs/examples/java/src/main/java/com/taos/example/WsConsumerLoopFull.java @@ -19,6 +19,8 @@ public class WsConsumerLoopFull { static private Connection connection; static private Statement statement; static private volatile boolean stopThread = false; + static private String groupId = "group1"; + static private String clientId = "clinet1"; public static TaosConsumer getConsumer() throws Exception { // ANCHOR: create_consumer @@ -30,7 +32,7 @@ public class WsConsumerLoopFull { config.setProperty("enable.auto.commit", "true"); config.setProperty("auto.commit.interval.ms", "1000"); config.setProperty("group.id", "group1"); - config.setProperty("client.id", "1"); + config.setProperty("client.id", "clinet1"); config.setProperty("td.connect.user", "root"); config.setProperty("td.connect.pass", "taosdata"); config.setProperty("value.deserializer", "com.taos.example.WsConsumerLoopFull$ResultDeserializer"); @@ -45,8 +47,10 @@ public class WsConsumerLoopFull { return consumer; } catch (Exception ex) { // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to create websocket consumer, host: %s, %sErrMessage: %s%n", + System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n", config.getProperty("bootstrap.servers"), + config.getProperty("group.id"), + config.getProperty("client.id"), ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", ex.getMessage()); // Print stack trace for context in examples. Use logging in production. @@ -58,9 +62,8 @@ public class WsConsumerLoopFull { public static void pollExample(TaosConsumer consumer) throws SQLException { // ANCHOR: poll_data_code_piece + List topics = Collections.singletonList("topic_meters"); try { - List topics = Collections.singletonList("topic_meters"); - // subscribe to the topics consumer.subscribe(topics); System.out.println("Subscribe topics successfully."); @@ -69,13 +72,16 @@ public class WsConsumerLoopFull { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { ResultBean bean = record.value(); - // process the data here + // Add your data processing logic here System.out.println("data: " + JSON.toJSONString(bean)); } } } catch (Exception ex) { // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to poll data, %sErrMessage: %s%n", + System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n", + topics.get(0), + groupId, + clientId, ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", ex.getMessage()); // Print stack trace for context in examples. Use logging in production. @@ -87,9 +93,8 @@ public class WsConsumerLoopFull { public static void seekExample(TaosConsumer consumer) throws SQLException { // ANCHOR: consumer_seek + List topics = Collections.singletonList("topic_meters"); try { - List topics = Collections.singletonList("topic_meters"); - // subscribe to the topics consumer.subscribe(topics); System.out.println("Subscribe topics successfully."); @@ -106,7 +111,10 @@ public class WsConsumerLoopFull { System.out.println("Assignment seek to beginning successfully."); } catch (Exception ex) { // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to execute seek example, %sErrMessage: %s%n", + System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n", + topics.get(0), + groupId, + clientId, ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", ex.getMessage()); // Print stack trace for context in examples. Use logging in production. @@ -119,15 +127,14 @@ public class WsConsumerLoopFull { public static void commitExample(TaosConsumer consumer) throws SQLException { // ANCHOR: commit_code_piece + List topics = Collections.singletonList("topic_meters"); try { - List topics = Collections.singletonList("topic_meters"); - consumer.subscribe(topics); for (int i = 0; i < 50; i++) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { ResultBean bean = record.value(); - // process your data here + // Add your data processing logic here System.out.println("data: " + JSON.toJSONString(bean)); } if (!records.isEmpty()) { @@ -138,7 +145,10 @@ public class WsConsumerLoopFull { } } catch (Exception ex) { // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to execute commit example, %sErrMessage: %s%n", + System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n", + topics.get(0), + groupId, + clientId, ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", ex.getMessage()); // Print stack trace for context in examples. Use logging in production. @@ -158,7 +168,10 @@ public class WsConsumerLoopFull { System.out.println("Consumer unsubscribed successfully."); } catch (Exception ex) { // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to unsubscribe consumer, %sErrMessage: %s%n", + System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n", + topics.get(0), + groupId, + clientId, ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", ex.getMessage()); // Print stack trace for context in examples. Use logging in production. diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java deleted file mode 100644 index ce9af5ecdc..0000000000 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java +++ /dev/null @@ -1,384 +0,0 @@ -package com.taosdata.example; - -import com.alibaba.fastjson.JSON; -import com.taosdata.jdbc.TSDBDriver; -import com.taosdata.jdbc.tmq.*; - -import java.sql.*; -import java.time.Duration; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -// ANCHOR: consumer_demo -public class ConsumerLoopFull { - static private Connection connection; - static private Statement statement; - static private volatile boolean stopThread = false; - - public static TaosConsumer getConsumer() throws Exception { -// ANCHOR: create_consumer - Properties config = new Properties(); - config.setProperty("td.connect.type", "jni"); - config.setProperty("bootstrap.servers", "localhost:6030"); - config.setProperty("auto.offset.reset", "latest"); - config.setProperty("msg.with.table.name", "true"); - config.setProperty("enable.auto.commit", "true"); - config.setProperty("auto.commit.interval.ms", "1000"); - config.setProperty("group.id", "group1"); - config.setProperty("client.id", "1"); - config.setProperty("td.connect.user", "root"); - config.setProperty("td.connect.pass", "taosdata"); - config.setProperty("value.deserializer", "com.taosdata.example.ConsumerLoopFull$ResultDeserializer"); - config.setProperty("value.deserializer.encoding", "UTF-8"); - - try { - TaosConsumer consumer= new TaosConsumer<>(config); - System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n", - config.getProperty("bootstrap.servers"), - config.getProperty("group.id"), - config.getProperty("client.id")); - return consumer; - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to create native consumer, host: %s, %sErrMessage: %s%n", - config.getProperty("bootstrap.servers"), - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } -// ANCHOR_END: create_consumer - } - - public static void pollExample(TaosConsumer consumer) throws SQLException { -// ANCHOR: poll_data_code_piece - try { - List topics = Collections.singletonList("topic_meters"); - - // subscribe to the topics - consumer.subscribe(topics); - System.out.println("Subscribe topics successfully."); - for (int i = 0; i < 50; i++) { - // poll data - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process the data here - System.out.println("data: " + JSON.toJSONString(bean)); - } - } - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to poll data, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } -// ANCHOR_END: poll_data_code_piece - } - - public static void seekExample(TaosConsumer consumer) throws SQLException { -// ANCHOR: consumer_seek - try { - List topics = Collections.singletonList("topic_meters"); - - // subscribe to the topics - consumer.subscribe(topics); - System.out.println("Subscribe topics successfully."); - Set assignment = consumer.assignment(); - System.out.println("Now assignment: " + JSON.toJSONString(assignment)); - - ConsumerRecords records = ConsumerRecords.emptyRecord(); - // make sure we have got some data - while (records.isEmpty()) { - records = consumer.poll(Duration.ofMillis(100)); - } - - consumer.seekToBeginning(assignment); - System.out.println("Assignment seek to beginning successfully."); - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to execute seek example, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } -// ANCHOR_END: consumer_seek - } - - - public static void commitExample(TaosConsumer consumer) throws SQLException { -// ANCHOR: commit_code_piece - try { - List topics = Collections.singletonList("topic_meters"); - - consumer.subscribe(topics); - for (int i = 0; i < 50; i++) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process your data here - System.out.println("data: " + JSON.toJSONString(bean)); - } - if (!records.isEmpty()) { - // after processing the data, commit the offset manually - consumer.commitSync(); - System.out.println("Commit offset manually successfully."); - } - } - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to execute commit example, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } -// ANCHOR_END: commit_code_piece - } - - public static void unsubscribeExample(TaosConsumer consumer) throws SQLException { - List topics = Collections.singletonList("topic_meters"); - consumer.subscribe(topics); -// ANCHOR: unsubscribe_data_code_piece - try { - // unsubscribe the consumer - consumer.unsubscribe(); - System.out.println("Consumer unsubscribed successfully."); - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to unsubscribe consumer, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } - finally { - // close the consumer - consumer.close(); - System.out.println("Consumer closed successfully."); - } -// ANCHOR_END: unsubscribe_data_code_piece - } - - public static class ResultDeserializer extends ReferenceDeserializer { - - } - - // use this class to define the data structure of the result record - public static class ResultBean { - private Timestamp ts; - private double current; - private int voltage; - private double phase; - private int groupid; - private String location; - - public Timestamp getTs() { - return ts; - } - - public void setTs(Timestamp ts) { - this.ts = ts; - } - - public double getCurrent() { - return current; - } - - public void setCurrent(double current) { - this.current = current; - } - - public int getVoltage() { - return voltage; - } - - public void setVoltage(int voltage) { - this.voltage = voltage; - } - - public double getPhase() { - return phase; - } - - public void setPhase(double phase) { - this.phase = phase; - } - - public int getGroupid() { - return groupid; - } - - public void setGroupid(int groupid) { - this.groupid = groupid; - } - - public String getLocation() { - return location; - } - - public void setLocation(String location) { - this.location = location; - } - } - - public static void prepareData() throws SQLException, InterruptedException { - try { - int i = 0; - while (!stopThread) { - String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) "; - int affectedRows = statement.executeUpdate(insertQuery); - assert affectedRows == 1; - i++; - Thread.sleep(1); - } - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } - } - - public static void prepareMeta() throws SQLException { - try { - statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); - statement.executeUpdate("USE power"); - statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); - statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to create db and table, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } - } - - public static void initConnection() throws SQLException { - String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata"; - Properties properties = new Properties(); - properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C"); - properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); - - try { - connection = DriverManager.getConnection(url, properties); - } catch (SQLException ex) { - System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to create connection", ex); - } - try { - statement = connection.createStatement(); - } catch (SQLException ex) { - System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to create statement", ex); - } - System.out.println("Connection created successfully."); - } - - public static void closeConnection() throws SQLException { - try { - if (statement != null) { - statement.close(); - } - } catch (SQLException ex) { - System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to close statement", ex); - } - - try { - if (connection != null) { - connection.close(); - } - } catch (SQLException ex) { - System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to close connection", ex); - } - System.out.println("Connection closed Successfully."); - } - - - public static void main(String[] args) throws SQLException, InterruptedException { - initConnection(); - prepareMeta(); - - // create a single thread executor - ExecutorService executor = Executors.newSingleThreadExecutor(); - - // submit a task - executor.submit(() -> { - try { - prepareData(); - } catch (SQLException ex) { - System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + ", ErrMessage: " + ex.getMessage()); - return; - } catch (Exception ex) { - System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage()); - return; - } - System.out.println("pollDataExample executed successfully."); - }); - - try { - TaosConsumer consumer = getConsumer(); - - pollExample(consumer); - System.out.println("pollExample executed successfully."); - consumer.unsubscribe(); - - seekExample(consumer); - System.out.println("seekExample executed successfully."); - consumer.unsubscribe(); - - commitExample(consumer); - System.out.println("commitExample executed successfully."); - consumer.unsubscribe(); - - unsubscribeExample(consumer); - System.out.println("unsubscribeExample executed successfully"); - } catch (SQLException ex) { - System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - return; - } catch (Exception ex) { - System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage()); - return; - } - - stopThread = true; - // close the executor, which will make the executor reject new tasks - executor.shutdown(); - - try { - // wait for the executor to terminate - boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - assert result; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - e.printStackTrace(); - System.out.println("Wait executor termination failed."); - } - - closeConnection(); - System.out.println("program end."); - } -} -// ANCHOR_END: consumer_demo diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java deleted file mode 100644 index 28d7d2d67b..0000000000 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.taosdata.example; - -import com.taosdata.jdbc.AbstractStatement; - -import java.sql.*; -import java.util.Properties; - -public class JdbcCreatDBDemo { - private static final String host = "localhost"; - private static final String dbName = "test"; - private static final String tbName = "weather"; - private static final String user = "root"; - private static final String password = "taosdata"; - - - public static void main(String[] args) throws SQLException { - - final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; - -// get connection - Properties properties = new Properties(); - properties.setProperty("charset", "UTF-8"); - properties.setProperty("locale", "en_US.UTF-8"); - properties.setProperty("timezone", "UTC-8"); - System.out.println("get connection starting..."); -// ANCHOR: create_db_and_table - try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); - Statement stmt = connection.createStatement()) { - - // create database - int rowsAffected = stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); - // you can check rowsAffected here - System.out.println("Create database power successfully, rowsAffected: " + rowsAffected); - // create table - rowsAffected = stmt.executeUpdate("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); - // you can check rowsAffected here - System.out.println("Create stable power.meters successfully, rowsAffected: " + rowsAffected); - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to create database power or stable meters, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } -// ANCHOR_END: create_db_and_table - - } - - private static void printResult(ResultSet resultSet) throws SQLException { - Util.printResult(resultSet); - } - -} diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java deleted file mode 100644 index 08798b755c..0000000000 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.taosdata.example; - -import com.taosdata.jdbc.AbstractStatement; - -import java.sql.*; -import java.util.Properties; - -public class JdbcInsertDataDemo { - private static final String host = "localhost"; - private static final String dbName = "test"; - private static final String tbName = "weather"; - private static final String user = "root"; - private static final String password = "taosdata"; - - - public static void main(String[] args) throws SQLException { - - final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; - -// get connection - Properties properties = new Properties(); - properties.setProperty("charset", "UTF-8"); - properties.setProperty("locale", "en_US.UTF-8"); - properties.setProperty("timezone", "UTC-8"); - System.out.println("get connection starting..."); -// ANCHOR: insert_data - try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); - Statement stmt = connection.createStatement()) { - - // insert data, please make sure the database and table are created before - String insertQuery = "INSERT INTO " + - "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 219, 0.31000) " + - "(NOW + 2a, 12.60000, 218, 0.33000) " + - "(NOW + 3a, 12.30000, 221, 0.31000) " + - "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 218, 0.25000) "; - int affectedRows = stmt.executeUpdate(insertQuery); - // you can check affectedRows here - System.out.println("Successfully inserted " + affectedRows + " rows to power.meters."); - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } -// ANCHOR_END: insert_data - } -} diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java deleted file mode 100644 index 768ba8929c..0000000000 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.taosdata.example; - -import com.taosdata.jdbc.AbstractStatement; - -import java.sql.*; -import java.util.Properties; - -public class JdbcQueryDemo { - private static final String host = "localhost"; - private static final String dbName = "test"; - private static final String tbName = "weather"; - private static final String user = "root"; - private static final String password = "taosdata"; - - - public static void main(String[] args) throws SQLException { - - final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; - -// get connection - Properties properties = new Properties(); - properties.setProperty("charset", "UTF-8"); - properties.setProperty("locale", "en_US.UTF-8"); - properties.setProperty("timezone", "UTC-8"); - System.out.println("get connection starting..."); -// ANCHOR: query_data - String sql = "SELECT ts, current, location FROM power.meters limit 100"; - try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); - Statement stmt = connection.createStatement(); - // query data, make sure the database and table are created before - ResultSet resultSet = stmt.executeQuery(sql)) { - - Timestamp ts; - float current; - String location; - while (resultSet.next()) { - ts = resultSet.getTimestamp(1); - current = resultSet.getFloat(2); - // we recommend using the column name to get the value - location = resultSet.getString("location"); - - // you can check data here - System.out.printf("ts: %s, current: %f, location: %s %n", ts, current, location); - } - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to query data from power.meters, sql: %s, %sErrMessage: %s%n", - sql, - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } -// ANCHOR_END: query_data - } -} diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java deleted file mode 100644 index dd4b549bc5..0000000000 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.taosdata.example; - -import com.taosdata.jdbc.AbstractStatement; - -import java.sql.*; -import java.util.Properties; - -public class JdbcReqIdDemo { - private static final String host = "localhost"; - private static final String dbName = "test"; - private static final String tbName = "weather"; - private static final String user = "root"; - private static final String password = "taosdata"; - - - public static void main(String[] args) throws SQLException { - - final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; - -// get connection - Properties properties = new Properties(); - properties.setProperty("charset", "UTF-8"); - properties.setProperty("locale", "en_US.UTF-8"); - properties.setProperty("timezone", "UTC-8"); - System.out.println("get connection starting..."); - -// ANCHOR: with_reqid - long reqId = 3L; - try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); - // Create a statement that allows specifying a request ID - AbstractStatement aStmt = (AbstractStatement) connection.createStatement()) { - - try (ResultSet resultSet = aStmt.executeQuery("SELECT ts, current, location FROM power.meters limit 1", reqId)) { - Timestamp ts; - float current; - String location; - while (resultSet.next()) { - ts = resultSet.getTimestamp(1); - current = resultSet.getFloat(2); - // we recommend using the column name to get the value - location = resultSet.getString("location"); - - // you can check data here - System.out.printf("ts: %s, current: %f, location: %s %n", ts, current, location); - - } - } - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to execute sql with reqId: %s, %sErrMessage: %s%n", reqId, - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } -// ANCHOR_END: with_reqid - } - - private static void printResult(ResultSet resultSet) throws SQLException { - Util.printResult(resultSet); - } - -} diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBasicDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBasicDemo.java deleted file mode 100644 index b5732f0e33..0000000000 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBasicDemo.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.taosdata.example; - -import com.taosdata.jdbc.TSDBPreparedStatement; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Random; - -// ANCHOR: para_bind -public class ParameterBindingBasicDemo { - - // modify host to your own - private static final String host = "127.0.0.1"; - private static final Random random = new Random(System.currentTimeMillis()); - private static final int numOfSubTable = 10, numOfRow = 10; - - public static void main(String[] args) throws SQLException { - - String jdbcUrl = "jdbc:TAOS://" + host + ":6030/"; - try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) { - - init(conn); - - String sql = "INSERT INTO ? USING power.meters TAGS(?,?) VALUES (?,?,?,?)"; - - try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { - - for (int i = 1; i <= numOfSubTable; i++) { - // set table name - pstmt.setTableName("d_bind_" + i); - - // set tags - pstmt.setTagInt(0, i); - pstmt.setTagString(1, "location_" + i); - - // set column ts - ArrayList tsList = new ArrayList<>(); - long current = System.currentTimeMillis(); - for (int j = 0; j < numOfRow; j++) - tsList.add(current + j); - pstmt.setTimestamp(0, tsList); - - // set column current - ArrayList currentList = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) - currentList.add(random.nextFloat() * 30); - pstmt.setFloat(1, currentList); - - // set column voltage - ArrayList voltageList = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) - voltageList.add(random.nextInt(300)); - pstmt.setInt(2, voltageList); - - // set column phase - ArrayList phaseList = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) - phaseList.add(random.nextFloat()); - pstmt.setFloat(3, phaseList); - // add column - pstmt.columnDataAddBatch(); - } - // execute column - pstmt.columnDataExecuteBatch(); - // you can check exeResult here - System.out.println("Successfully inserted " + (numOfSubTable * numOfRow) + " rows to power.meters."); - } - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } - } - - private static void init(Connection conn) throws SQLException { - try (Statement stmt = conn.createStatement()) { - stmt.execute("CREATE DATABASE IF NOT EXISTS power"); - stmt.execute("USE power"); - stmt.execute("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); - } - } -} -// ANCHOR_END: para_bind diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessJniTest.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessJniTest.java deleted file mode 100644 index 5b1ce51be6..0000000000 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessJniTest.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.taosdata.example; - -import com.taosdata.jdbc.AbstractConnection; -import com.taosdata.jdbc.enums.SchemalessProtocolType; -import com.taosdata.jdbc.enums.SchemalessTimestampType; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; - -// ANCHOR: schemaless -public class SchemalessJniTest { - private static final String host = "127.0.0.1"; - private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"; - private static final String telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0"; - private static final String jsonDemo = "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"; - - public static void main(String[] args) throws SQLException { - final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata"; - try (Connection connection = DriverManager.getConnection(jdbcUrl)) { - init(connection); - AbstractConnection conn = connection.unwrap(AbstractConnection.class); - - conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS); - conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS); - conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED); - System.out.println("Inserted data with schemaless successfully."); - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to insert data with schemaless, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } - } - - private static void init(Connection connection) throws SQLException { - try (Statement stmt = connection.createStatement()) { - stmt.execute("CREATE DATABASE IF NOT EXISTS power"); - stmt.execute("USE power"); - } - } -} -// ANCHOR_END: schemaless diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessWsTest.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessWsTest.java deleted file mode 100644 index 0f15e70224..0000000000 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessWsTest.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.taosdata.example; - -import com.taosdata.jdbc.AbstractConnection; -import com.taosdata.jdbc.enums.SchemalessProtocolType; -import com.taosdata.jdbc.enums.SchemalessTimestampType; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; - -// ANCHOR: schemaless -public class SchemalessWsTest { - private static final String host = "127.0.0.1"; - private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"; - private static final String telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0"; - private static final String jsonDemo = "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"; - - public static void main(String[] args) throws SQLException { - final String url = "jdbc:TAOS-RS://" + host + ":6041?user=root&password=taosdata&batchfetch=true"; - try(Connection connection = DriverManager.getConnection(url)){ - init(connection); - AbstractConnection conn = connection.unwrap(AbstractConnection.class); - - conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS); - conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS); - conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS); - System.out.println("Inserted data with schemaless successfully."); - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to insert data with schemaless, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } - } - - private static void init(Connection connection) throws SQLException { - try (Statement stmt = connection.createStatement()) { - stmt.execute("CREATE DATABASE IF NOT EXISTS power"); - stmt.execute("USE power"); - } - } -} -// ANCHOR_END: schemaless diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingBasicDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingBasicDemo.java deleted file mode 100644 index 792ee4ed2d..0000000000 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingBasicDemo.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.taosdata.example; - -import com.taosdata.jdbc.TSDBPreparedStatement; -import com.taosdata.jdbc.ws.TSWSPreparedStatement; - -import java.sql.*; -import java.util.ArrayList; -import java.util.Random; - -// ANCHOR: para_bind -public class WSParameterBindingBasicDemo { - - // modify host to your own - private static final String host = "127.0.0.1"; - private static final Random random = new Random(System.currentTimeMillis()); - private static final int numOfSubTable = 10, numOfRow = 10; - - public static void main(String[] args) throws SQLException { - - String jdbcUrl = "jdbc:TAOS-RS://" + host + ":6041/?batchfetch=true"; - try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) { - init(conn); - - String sql = "INSERT INTO ? USING power.meters TAGS(?,?) VALUES (?,?,?,?)"; - - try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) { - - for (int i = 1; i <= numOfSubTable; i++) { - // set table name - pstmt.setTableName("d_bind_" + i); - - // set tags - pstmt.setTagInt(0, i); - pstmt.setTagString(1, "location_" + i); - - // set columns - long current = System.currentTimeMillis(); - for (int j = 0; j < numOfRow; j++) { - pstmt.setTimestamp(1, new Timestamp(current + j)); - pstmt.setFloat(2, random.nextFloat() * 30); - pstmt.setInt(3, random.nextInt(300)); - pstmt.setFloat(4, random.nextFloat()); - pstmt.addBatch(); - } - int [] exeResult = pstmt.executeBatch(); - // you can check exeResult here - System.out.println("Successfully inserted " + exeResult.length + " rows to power.meters."); - } - } - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } - } - - private static void init(Connection conn) throws SQLException { - try (Statement stmt = conn.createStatement()) { - stmt.execute("CREATE DATABASE IF NOT EXISTS power"); - stmt.execute("USE power"); - stmt.execute("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); - } - } -} -// ANCHOR_END: para_bind diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java deleted file mode 100644 index 17380023cd..0000000000 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java +++ /dev/null @@ -1,384 +0,0 @@ -package com.taosdata.example; - -import com.alibaba.fastjson.JSON; -import com.taosdata.jdbc.TSDBDriver; -import com.taosdata.jdbc.tmq.*; - -import java.sql.*; -import java.time.Duration; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -// ANCHOR: consumer_demo -public class WsConsumerLoopFull { - static private Connection connection; - static private Statement statement; - static private volatile boolean stopThread = false; - - public static TaosConsumer getConsumer() throws Exception { -// ANCHOR: create_consumer - Properties config = new Properties(); - config.setProperty("td.connect.type", "ws"); - config.setProperty("bootstrap.servers", "localhost:6041"); - config.setProperty("auto.offset.reset", "latest"); - config.setProperty("msg.with.table.name", "true"); - config.setProperty("enable.auto.commit", "true"); - config.setProperty("auto.commit.interval.ms", "1000"); - config.setProperty("group.id", "group1"); - config.setProperty("client.id", "1"); - config.setProperty("td.connect.user", "root"); - config.setProperty("td.connect.pass", "taosdata"); - config.setProperty("value.deserializer", "com.taosdata.example.WsConsumerLoopFull$ResultDeserializer"); - config.setProperty("value.deserializer.encoding", "UTF-8"); - - try { - TaosConsumer consumer= new TaosConsumer<>(config); - System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n", - config.getProperty("bootstrap.servers"), - config.getProperty("group.id"), - config.getProperty("client.id")); - return consumer; - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to create websocket consumer, host: %s, %sErrMessage: %s%n", - config.getProperty("bootstrap.servers"), - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } -// ANCHOR_END: create_consumer - } - - public static void pollExample(TaosConsumer consumer) throws SQLException { -// ANCHOR: poll_data_code_piece - try { - List topics = Collections.singletonList("topic_meters"); - - // subscribe to the topics - consumer.subscribe(topics); - System.out.println("Subscribe topics successfully."); - for (int i = 0; i < 50; i++) { - // poll data - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process the data here - System.out.println("data: " + JSON.toJSONString(bean)); - } - } - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to poll data, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } -// ANCHOR_END: poll_data_code_piece - } - - public static void seekExample(TaosConsumer consumer) throws SQLException { -// ANCHOR: consumer_seek - try { - List topics = Collections.singletonList("topic_meters"); - - // subscribe to the topics - consumer.subscribe(topics); - System.out.println("Subscribe topics successfully."); - Set assignment = consumer.assignment(); - System.out.println("Now assignment: " + JSON.toJSONString(assignment)); - - ConsumerRecords records = ConsumerRecords.emptyRecord(); - // make sure we have got some data - while (records.isEmpty()) { - records = consumer.poll(Duration.ofMillis(100)); - } - - consumer.seekToBeginning(assignment); - System.out.println("Assignment seek to beginning successfully."); - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to execute seek example, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } -// ANCHOR_END: consumer_seek - } - - - public static void commitExample(TaosConsumer consumer) throws SQLException { -// ANCHOR: commit_code_piece - try { - List topics = Collections.singletonList("topic_meters"); - - consumer.subscribe(topics); - for (int i = 0; i < 50; i++) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process your data here - System.out.println("data: " + JSON.toJSONString(bean)); - } - if (!records.isEmpty()) { - // after processing the data, commit the offset manually - consumer.commitSync(); - System.out.println("Commit offset manually successfully."); - } - } - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to execute commit example, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } -// ANCHOR_END: commit_code_piece - } - - public static void unsubscribeExample(TaosConsumer consumer) throws SQLException { - List topics = Collections.singletonList("topic_meters"); - consumer.subscribe(topics); -// ANCHOR: unsubscribe_data_code_piece - try { - // unsubscribe the consumer - consumer.unsubscribe(); - System.out.println("Consumer unsubscribed successfully."); - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to unsubscribe consumer, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } - finally { - // close the consumer - consumer.close(); - System.out.println("Consumer closed successfully."); - } -// ANCHOR_END: unsubscribe_data_code_piece - } - - public static class ResultDeserializer extends ReferenceDeserializer { - - } - - // use this class to define the data structure of the result record - public static class ResultBean { - private Timestamp ts; - private double current; - private int voltage; - private double phase; - private int groupid; - private String location; - - public Timestamp getTs() { - return ts; - } - - public void setTs(Timestamp ts) { - this.ts = ts; - } - - public double getCurrent() { - return current; - } - - public void setCurrent(double current) { - this.current = current; - } - - public int getVoltage() { - return voltage; - } - - public void setVoltage(int voltage) { - this.voltage = voltage; - } - - public double getPhase() { - return phase; - } - - public void setPhase(double phase) { - this.phase = phase; - } - - public int getGroupid() { - return groupid; - } - - public void setGroupid(int groupid) { - this.groupid = groupid; - } - - public String getLocation() { - return location; - } - - public void setLocation(String location) { - this.location = location; - } - } - - public static void prepareData() throws SQLException, InterruptedException { - try { - int i = 0; - while (!stopThread) { - String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) "; - int affectedRows = statement.executeUpdate(insertQuery); - assert affectedRows == 1; - i++; - Thread.sleep(1); - } - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } - } - - public static void prepareMeta() throws SQLException { - try { - statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); - statement.executeUpdate("USE power"); - statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); - statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to create db and table, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - ex.printStackTrace(); - throw ex; - } - } - - public static void initConnection() throws SQLException { - String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata"; - Properties properties = new Properties(); - properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C"); - properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); - - try { - connection = DriverManager.getConnection(url, properties); - } catch (SQLException ex) { - System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to create connection", ex); - } - try { - statement = connection.createStatement(); - } catch (SQLException ex) { - System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to create statement", ex); - } - System.out.println("Connection created successfully."); - } - - public static void closeConnection() throws SQLException { - try { - if (statement != null) { - statement.close(); - } - } catch (SQLException ex) { - System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to close statement", ex); - } - - try { - if (connection != null) { - connection.close(); - } - } catch (SQLException ex) { - System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to close connection", ex); - } - System.out.println("Connection closed Successfully."); - } - - - public static void main(String[] args) throws SQLException, InterruptedException { - initConnection(); - prepareMeta(); - - // create a single thread executor - ExecutorService executor = Executors.newSingleThreadExecutor(); - - // submit a task - executor.submit(() -> { - try { - prepareData(); - } catch (SQLException ex) { - System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + ", ErrMessage: " + ex.getMessage()); - return; - } catch (Exception ex) { - System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage()); - return; - } - System.out.println("pollDataExample executed successfully."); - }); - - try { - TaosConsumer consumer = getConsumer(); - - pollExample(consumer); - System.out.println("pollExample executed successfully."); - consumer.unsubscribe(); - - seekExample(consumer); - System.out.println("seekExample executed successfully."); - consumer.unsubscribe(); - - commitExample(consumer); - System.out.println("commitExample executed successfully."); - consumer.unsubscribe(); - - unsubscribeExample(consumer); - System.out.println("unsubscribeExample executed successfully"); - } catch (SQLException ex) { - System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - return; - } catch (Exception ex) { - System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage()); - return; - } - - stopThread = true; - // close the executor, which will make the executor reject new tasks - executor.shutdown(); - - try { - // wait for the executor to terminate - boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - assert result; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - e.printStackTrace(); - System.out.println("Wait executor termination failed."); - } - - closeConnection(); - System.out.println("program end."); - } -} -// ANCHOR_END: consumer_demo