This commit is contained in:
sheyanjie-qq 2024-08-02 17:17:19 +08:00 committed by gccgdb1234
parent feb5b2c540
commit 6ab94e8318
2 changed files with 57 additions and 38 deletions

View File

@ -120,7 +120,7 @@ public class ConsumerLoopFull {
break; break;
} }
// poll data agagin // poll data again
records = consumer.poll(Duration.ofMillis(100)); records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) { for (ConsumerRecord<ResultBean> record : records) {
// process the data here // process the data here

View File

@ -38,8 +38,10 @@ public class WsConsumerLoopFull {
try { try {
return new TaosConsumer<>(config); return new TaosConsumer<>(config);
} catch (SQLException ex) { } catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info // handle any errors, please refer to the JDBC specifications for detailed
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); // exceptions info
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers")
+ "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex); throw new SQLException("Failed to create consumer", ex);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
@ -68,8 +70,10 @@ public class WsConsumerLoopFull {
consumer.unsubscribe(); consumer.unsubscribe();
System.out.println("unsubscribed topics successfully"); System.out.println("unsubscribed topics successfully");
} catch (SQLException ex) { } catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info // handle any errors, please refer to the JDBC specifications for detailed
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); // exceptions info
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: "
+ ex.getMessage());
throw new SQLException("Failed to poll data from topic_meters", ex); throw new SQLException("Failed to poll data from topic_meters", ex);
} }
} }
@ -93,8 +97,10 @@ public class WsConsumerLoopFull {
} }
} catch (SQLException ex) { } catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info // handle any errors, please refer to the JDBC specifications for detailed
System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); // exceptions info
System.out
.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data", ex); throw new SQLException("Failed to poll data", ex);
} }
// ANCHOR_END: poll_data_code_piece // ANCHOR_END: poll_data_code_piece
@ -123,7 +129,7 @@ public class WsConsumerLoopFull {
break; break;
} }
// poll data agagin // poll data again
records = consumer.poll(Duration.ofMillis(100)); records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) { for (ConsumerRecord<ResultBean> record : records) {
// process the data here // process the data here
@ -131,14 +137,15 @@ public class WsConsumerLoopFull {
break; break;
} }
} catch (SQLException ex) { } catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info // handle any errors, please refer to the JDBC specifications for detailed
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); // exceptions info
System.out
.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("seek example failed", ex); throw new SQLException("seek example failed", ex);
} }
// ANCHOR_END: consumer_seek // ANCHOR_END: consumer_seek
} }
public static void commitExample() throws SQLException { public static void commitExample() throws SQLException {
// ANCHOR: commit_code_piece // ANCHOR: commit_code_piece
try (TaosConsumer<ResultBean> consumer = getConsumer()) { try (TaosConsumer<ResultBean> consumer = getConsumer()) {
@ -158,8 +165,10 @@ public class WsConsumerLoopFull {
} }
} }
} catch (SQLException ex) { } catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info // handle any errors, please refer to the JDBC specifications for detailed
System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); // exceptions info
System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: "
+ ex.getMessage());
throw new SQLException("Failed to execute consumer functions", ex); throw new SQLException("Failed to execute consumer functions", ex);
} }
// ANCHOR_END: commit_code_piece // ANCHOR_END: commit_code_piece
@ -173,8 +182,10 @@ public class WsConsumerLoopFull {
try { try {
consumer.unsubscribe(); consumer.unsubscribe();
} catch (SQLException ex) { } catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info // handle any errors, please refer to the JDBC specifications for detailed
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); // exceptions info
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: "
+ ex.getMessage());
throw new SQLException("Failed to unsubscribe consumer", ex); throw new SQLException("Failed to unsubscribe consumer", ex);
} finally { } finally {
consumer.close(); consumer.close();
@ -256,7 +267,8 @@ public class WsConsumerLoopFull {
int affectedRows = statement.executeUpdate(insertQuery.toString()); int affectedRows = statement.executeUpdate(insertQuery.toString());
assert affectedRows == 10000; assert affectedRows == 10000;
} catch (SQLException ex) { } catch (SQLException ex) {
System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: "
+ ex.getMessage());
throw new SQLException("Failed to insert data to power.meters", ex); throw new SQLException("Failed to insert data to power.meters", ex);
} }
} }
@ -265,10 +277,13 @@ public class WsConsumerLoopFull {
try { try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
statement.executeUpdate("USE power"); statement.executeUpdate("USE power");
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); statement.executeUpdate(
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); "CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate(
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
} catch (SQLException ex) { } catch (SQLException ex) {
System.out.println("Failed to create db and table, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); System.out.println(
"Failed to create db and table, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create db and table", ex); throw new SQLException("Failed to create db and table", ex);
} }
} }
@ -282,13 +297,15 @@ public class WsConsumerLoopFull {
try { try {
connection = DriverManager.getConnection(url, properties); connection = DriverManager.getConnection(url, properties);
} catch (SQLException ex) { } catch (SQLException ex) {
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode()
+ "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create connection", ex); throw new SQLException("Failed to create connection", ex);
} }
try { try {
statement = connection.createStatement(); statement = connection.createStatement();
} catch (SQLException ex) { } catch (SQLException ex) {
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); System.out.println(
"Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create statement", ex); throw new SQLException("Failed to create statement", ex);
} }
System.out.println("Connection created successfully."); System.out.println("Connection created successfully.");
@ -300,7 +317,8 @@ public class WsConsumerLoopFull {
statement.close(); statement.close();
} }
} catch (SQLException ex) { } catch (SQLException ex) {
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); System.out.println(
"Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close statement", ex); throw new SQLException("Failed to close statement", ex);
} }
@ -309,13 +327,13 @@ public class WsConsumerLoopFull {
connection.close(); connection.close();
} }
} catch (SQLException ex) { } catch (SQLException ex) {
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); System.out.println(
"Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close connection", ex); throw new SQLException("Failed to close connection", ex);
} }
System.out.println("Connection closed Successfully."); System.out.println("Connection closed Successfully.");
} }
public static void main(String[] args) throws SQLException { public static void main(String[] args) throws SQLException {
initConnection(); initConnection();
prepareMeta(); prepareMeta();
@ -333,7 +351,8 @@ public class WsConsumerLoopFull {
// commitExample(); // commitExample();
unsubscribeExample(); unsubscribeExample();
} catch (SQLException ex) { } catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode()
+ "; ErrMessage: " + ex.getMessage());
} }
System.out.println("pollDataExample executed successfully"); System.out.println("pollDataExample executed successfully");
}); });