mod java exception

This commit is contained in:
sheyanjie-qq 2024-08-02 18:39:19 +08:00 committed by gccgdb1234
parent 6ab94e8318
commit 255d0fd02a
15 changed files with 216 additions and 166 deletions

View File

@ -27,6 +27,10 @@ public static void main(String[] args) throws SQLException {
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to connect to " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to connect to " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
throw ex;
}
}
// ANCHOR_END: main

View File

@ -16,6 +16,10 @@ public static void main(String[] args) throws SQLException {
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to connect to " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to connect to " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
throw ex;
}
}
// ANCHOR_END: main

View File

@ -28,6 +28,10 @@ public static void main(String[] args) throws SQLException {
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to connect to " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to connect to " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
throw ex;
}
}
// ANCHOR_END: main

View File

@ -16,6 +16,8 @@ public class ConsumerLoopFull {
static private Connection connection;
static private Statement statement;
static private volatile boolean stopFlag = false;
public static TaosConsumer<ResultBean> getConsumer() throws SQLException {
// ANCHOR: create_consumer
Properties config = new Properties();
@ -38,15 +40,16 @@ public class ConsumerLoopFull {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
} catch (Exception e) {
e.printStackTrace();
throw new SQLException("Failed to create consumer", e);
} catch (Exception ex) {
System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers")
+ "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
}
// ANCHOR_END: create_consumer
}
public static void pollDataExample() throws SQLException {
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
public static void pollDataExample(TaosConsumer<ResultBean> consumer) throws SQLException {
try{
// subscribe to the topics
List<String> topics = Collections.singletonList("topic_meters");
@ -68,12 +71,15 @@ public class ConsumerLoopFull {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data from topic_meters", ex);
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data from topic_meters", ex);
}
}
public static void pollExample() throws SQLException {
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: poll_data_code_piece
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
try {
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
@ -93,54 +99,48 @@ public class ConsumerLoopFull {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data", ex);
} catch (Exception ex) {
System.out.println("Failed to poll data; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data", ex);
}
// ANCHOR_END: poll_data_code_piece
}
public static void seekExample() throws SQLException {
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: consumer_seek
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
try {
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("subscribe topics successfully");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("now assignment: " + JSON.toJSONString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
for (ConsumerRecord<ResultBean> record : records) {
System.out.println("first data polled: " + JSON.toJSONString(record.value()));
Set<TopicPartition> assignment = consumer.assignment();
// seek to the beginning of the all partitions
consumer.seekToBeginning(assignment);
System.out.println("assignment seek to beginning successfully");
break;
}
// poll data again
records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
// process the data here
System.out.println("second data polled: " + JSON.toJSONString(record.value()));
break;
}
consumer.seekToBeginning(assignment);
System.out.println("assignment seek to beginning successfully");
System.out.println("beginning assignment: " + JSON.toJSONString(assignment));
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("seek example failed", ex);
} catch (Exception ex) {
System.out.println("seek example failed; ErrMessage: " + ex.getMessage());
throw new SQLException("seek example failed", ex);
}
// ANCHOR_END: consumer_seek
}
public static void commitExample() throws SQLException {
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: commit_code_piece
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
try {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
@ -160,12 +160,14 @@ public class ConsumerLoopFull {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to execute consumer functions", ex);
} catch (Exception ex) {
System.out.println("Failed to execute consumer functions. ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to execute consumer functions", ex);
}
// ANCHOR_END: commit_code_piece
}
public static void unsubscribeExample() throws SQLException {
TaosConsumer<ResultBean> consumer = getConsumer();
public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
// ANCHOR: unsubscribe_data_code_piece
@ -175,7 +177,11 @@ public class ConsumerLoopFull {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to unsubscribe consumer", ex);
} finally {
} catch (Exception ex) {
System.out.println("Failed to unsubscribe consumer. ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to unsubscribe consumer", ex);
}
finally {
consumer.close();
}
// ANCHOR_END: unsubscribe_data_code_piece
@ -243,17 +249,16 @@ public class ConsumerLoopFull {
}
}
public static void prepareData() throws SQLException {
StringBuilder insertQuery = new StringBuilder();
insertQuery.append("INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES ");
for (int i = 0; i < 10000; i++) {
insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) ");
}
public static void prepareData() throws SQLException, InterruptedException {
try {
int affectedRows = statement.executeUpdate(insertQuery.toString());
assert affectedRows == 10000;
int i = 0;
while (!stopFlag) {
i++;
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
int affectedRows = statement.executeUpdate(insertQuery);
assert affectedRows == 1;
Thread.sleep(1);
}
} catch (SQLException ex) {
System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to insert data to power.meters", ex);
@ -315,7 +320,7 @@ public class ConsumerLoopFull {
}
public static void main(String[] args) throws SQLException {
public static void main(String[] args) throws SQLException, InterruptedException {
initConnection();
prepareMeta();
@ -326,11 +331,14 @@ public class ConsumerLoopFull {
executor.submit(() -> {
try {
// please use one example at a time
pollDataExample();
// seekExample();
// pollExample();
// commitExample();
unsubscribeExample();
TaosConsumer<ResultBean> consumer = getConsumer();
pollDataExample(consumer);
seekExample(consumer);
pollExample(consumer);
commitExample(consumer);
unsubscribeExample(consumer);
stopFlag = true;
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
}

View File

@ -45,6 +45,10 @@ public class JdbcCreatDBDemo {
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to create db and table, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to create db and table, url:" + jdbcUrl + "; ErrMessage: " + ex.getMessage());
throw ex;
}
// ANCHOR_END: create_db_and_table

View File

@ -43,7 +43,10 @@ public class JdbcInsertDataDemo {
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to insert data to power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to insert data to power.meters, url:" + jdbcUrl + "; ErrMessage: " + ex.getMessage());
throw ex;
}
// ANCHOR_END: insert_data
}

View File

@ -44,6 +44,10 @@ public class JdbcQueryDemo {
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to query data from power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to query data from power.meters, url:" + jdbcUrl + "; ErrMessage: " + ex.getMessage());
throw ex;
}
// ANCHOR_END: query_data
}

View File

@ -38,7 +38,10 @@ public class JdbcReqIdDemo {
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to execute sql with reqId, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to execute sql with reqId, url:" + jdbcUrl + "; ErrMessage: " + ex.getMessage());
throw ex;
}
// ANCHOR_END: with_reqid
}

View File

@ -69,6 +69,10 @@ public class ParameterBindingBasicDemo {
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
throw ex;
}
}

View File

@ -31,20 +31,27 @@ public class ParameterBindingFullDemo {
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
init(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
bindVarbinary(conn);
bindGeometry(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
bindVarbinary(conn);
bindGeometry(conn);
clean(conn);
conn.close();
clean(conn);
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
throw ex;
}
}
private static void init(Connection conn) throws SQLException {

View File

@ -28,6 +28,10 @@ public class SchemalessJniTest {
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to insert data with schemaless, host:" + host + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to insert data with schemaless, host:" + host + "; ErrMessage: " + ex.getMessage());
throw ex;
}
}

View File

@ -28,6 +28,10 @@ public class SchemalessWsTest {
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to insert data with schemaless, host:" + host + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to insert data with schemaless, host:" + host + "; ErrMessage: " + ex.getMessage());
throw ex;
}
}

View File

@ -50,6 +50,10 @@ public class WSParameterBindingBasicDemo {
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
throw ex;
}
}

View File

@ -39,8 +39,11 @@ public class WSParameterBindingFullDemo {
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Error Code: " + ex.getErrorCode());
System.out.println("Message: " + ex.getMessage());
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
throw ex;
}
}

View File

@ -6,10 +6,7 @@ import com.taosdata.jdbc.tmq.*;
import java.sql.*;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -20,7 +17,7 @@ public class WsConsumerLoopFull {
static private Statement statement;
public static TaosConsumer<ResultBean> getConsumer() throws SQLException {
// ANCHOR: create_consumer
// ANCHOR: create_consumer
Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
@ -29,7 +26,7 @@ public class WsConsumerLoopFull {
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "client1");
config.setProperty("client.id", "1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taosdata.example.WsConsumerLoopFull$ResultDeserializer");
@ -38,20 +35,19 @@ public class WsConsumerLoopFull {
try {
return new TaosConsumer<>(config);
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed
// exceptions info
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers")
+ "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
} catch (Exception ex) {
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers")
+ "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
} catch (Exception e) {
e.printStackTrace();
throw new SQLException("Failed to create consumer", e);
}
// ANCHOR_END: create_consumer
// ANCHOR_END: create_consumer
}
public static void pollDataExample() throws SQLException {
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
public static void pollDataExample(TaosConsumer<ResultBean> consumer) throws SQLException {
try{
// subscribe to the topics
List<String> topics = Collections.singletonList("topic_meters");
@ -70,17 +66,18 @@ public class WsConsumerLoopFull {
consumer.unsubscribe();
System.out.println("unsubscribed topics successfully");
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed
// exceptions info
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: "
+ ex.getMessage());
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data from topic_meters", ex);
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data from topic_meters", ex);
}
}
public static void pollExample() throws SQLException {
// ANCHOR: poll_data_code_piece
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: poll_data_code_piece
try {
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
@ -97,58 +94,51 @@ public class WsConsumerLoopFull {
}
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed
// exceptions info
System.out
.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data", ex);
} catch (Exception ex) {
System.out.println("Failed to poll data; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data", ex);
}
// ANCHOR_END: poll_data_code_piece
// ANCHOR_END: poll_data_code_piece
}
public static void seekExample() throws SQLException {
// ANCHOR: consumer_seek
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: consumer_seek
try {
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("subscribe topics successfully");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("now assignment: " + JSON.toJSONString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
for (ConsumerRecord<ResultBean> record : records) {
System.out.println("first data polled: " + JSON.toJSONString(record.value()));
Set<TopicPartition> assignment = consumer.assignment();
// seek to the beginning of the all partitions
consumer.seekToBeginning(assignment);
System.out.println("assignment seek to beginning successfully");
break;
}
// poll data again
records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
// process the data here
System.out.println("second data polled: " + JSON.toJSONString(record.value()));
break;
}
consumer.seekToBeginning(assignment);
System.out.println("assignment seek to beginning successfully");
System.out.println("beginning assignment: " + JSON.toJSONString(assignment));
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed
// exceptions info
System.out
.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("seek example failed", ex);
} catch (Exception ex) {
System.out.println("seek example failed; ErrMessage: " + ex.getMessage());
throw new SQLException("seek example failed", ex);
}
// ANCHOR_END: consumer_seek
// ANCHOR_END: consumer_seek
}
public static void commitExample() throws SQLException {
// ANCHOR: commit_code_piece
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: commit_code_piece
try {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
@ -165,32 +155,34 @@ public class WsConsumerLoopFull {
}
}
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed
// exceptions info
System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: "
+ ex.getMessage());
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to execute consumer functions", ex);
} catch (Exception ex) {
System.out.println("Failed to execute consumer functions. ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to execute consumer functions", ex);
}
// ANCHOR_END: commit_code_piece
// ANCHOR_END: commit_code_piece
}
public static void unsubscribeExample() throws SQLException {
TaosConsumer<ResultBean> consumer = getConsumer();
public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
// ANCHOR: unsubscribe_data_code_piece
// ANCHOR: unsubscribe_data_code_piece
try {
consumer.unsubscribe();
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed
// exceptions info
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: "
+ ex.getMessage());
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to unsubscribe consumer", ex);
} finally {
} catch (Exception ex) {
System.out.println("Failed to unsubscribe consumer. ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to unsubscribe consumer", ex);
}
finally {
consumer.close();
}
// ANCHOR_END: unsubscribe_data_code_piece
// ANCHOR_END: unsubscribe_data_code_piece
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
@ -255,20 +247,16 @@ public class WsConsumerLoopFull {
}
}
public static void prepareData() throws SQLException {
StringBuilder insertQuery = new StringBuilder();
insertQuery.append("INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES ");
for (int i = 0; i < 10000; i++) {
insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) ");
}
public static void prepareData() throws SQLException, InterruptedException {
try {
int affectedRows = statement.executeUpdate(insertQuery.toString());
assert affectedRows == 10000;
for (int i = 0; i < 3000; i++) {
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
int affectedRows = statement.executeUpdate(insertQuery);
assert affectedRows == 1;
Thread.sleep(1);
}
} catch (SQLException ex) {
System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: "
+ ex.getMessage());
System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to insert data to power.meters", ex);
}
}
@ -277,13 +265,10 @@ public class WsConsumerLoopFull {
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
statement.executeUpdate("USE power");
statement.executeUpdate(
"CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate(
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
} catch (SQLException ex) {
System.out.println(
"Failed to create db and table, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
System.out.println("Failed to create db and table, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create db and table", ex);
}
}
@ -297,15 +282,13 @@ public class WsConsumerLoopFull {
try {
connection = DriverManager.getConnection(url, properties);
} catch (SQLException ex) {
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode()
+ "; ErrMessage: " + ex.getMessage());
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create connection", ex);
}
try {
statement = connection.createStatement();
} catch (SQLException ex) {
System.out.println(
"Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create statement", ex);
}
System.out.println("Connection created successfully.");
@ -317,8 +300,7 @@ public class WsConsumerLoopFull {
statement.close();
}
} catch (SQLException ex) {
System.out.println(
"Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close statement", ex);
}
@ -327,14 +309,14 @@ public class WsConsumerLoopFull {
connection.close();
}
} catch (SQLException ex) {
System.out.println(
"Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close connection", ex);
}
System.out.println("Connection closed Successfully.");
}
public static void main(String[] args) throws SQLException {
public static void main(String[] args) throws SQLException, InterruptedException {
initConnection();
prepareMeta();
@ -345,14 +327,22 @@ public class WsConsumerLoopFull {
executor.submit(() -> {
try {
// please use one example at a time
pollDataExample();
// seekExample();
// pollExample();
// commitExample();
unsubscribeExample();
TaosConsumer<ResultBean> consumer = getConsumer();
pollDataExample(consumer);
seekExample(consumer);
consumer.unsubscribe();
pollExample(consumer);
consumer.unsubscribe();
commitExample(consumer);
consumer.unsubscribe();
unsubscribeExample(consumer);
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode()
+ "; ErrMessage: " + ex.getMessage());
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
return;
}
System.out.println("pollDataExample executed successfully");
});