mod java exception
This commit is contained in:
parent
e0c2a3865b
commit
b45c4aec1a
|
@ -27,6 +27,10 @@ public static void main(String[] args) throws SQLException {
|
|||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to connect to " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
} catch (Exception ex){
|
||||
System.out.println("Failed to connect to " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: main
|
||||
|
|
|
@ -16,6 +16,10 @@ public static void main(String[] args) throws SQLException {
|
|||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to connect to " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
} catch (Exception ex){
|
||||
System.out.println("Failed to connect to " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: main
|
||||
|
|
|
@ -28,6 +28,10 @@ public static void main(String[] args) throws SQLException {
|
|||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to connect to " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
} catch (Exception ex){
|
||||
System.out.println("Failed to connect to " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: main
|
||||
|
|
|
@ -16,6 +16,8 @@ public class ConsumerLoopFull {
|
|||
static private Connection connection;
|
||||
static private Statement statement;
|
||||
|
||||
static private volatile boolean stopFlag = false;
|
||||
|
||||
public static TaosConsumer<ResultBean> getConsumer() throws SQLException {
|
||||
// ANCHOR: create_consumer
|
||||
Properties config = new Properties();
|
||||
|
@ -38,15 +40,16 @@ public class ConsumerLoopFull {
|
|||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to create consumer", ex);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
throw new SQLException("Failed to create consumer", e);
|
||||
} catch (Exception ex) {
|
||||
System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers")
|
||||
+ "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to create consumer", ex);
|
||||
}
|
||||
// ANCHOR_END: create_consumer
|
||||
}
|
||||
|
||||
public static void pollDataExample() throws SQLException {
|
||||
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
|
||||
public static void pollDataExample(TaosConsumer<ResultBean> consumer) throws SQLException {
|
||||
try{
|
||||
// subscribe to the topics
|
||||
List<String> topics = Collections.singletonList("topic_meters");
|
||||
|
||||
|
@ -68,12 +71,15 @@ public class ConsumerLoopFull {
|
|||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to poll data from topic_meters", ex);
|
||||
} catch (Exception ex) {
|
||||
System.out.println("Failed to poll data from topic_meters; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to poll data from topic_meters", ex);
|
||||
}
|
||||
}
|
||||
|
||||
public static void pollExample() throws SQLException {
|
||||
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException {
|
||||
// ANCHOR: poll_data_code_piece
|
||||
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
|
||||
try {
|
||||
List<String> topics = Collections.singletonList("topic_meters");
|
||||
|
||||
// subscribe to the topics
|
||||
|
@ -93,54 +99,48 @@ public class ConsumerLoopFull {
|
|||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to poll data", ex);
|
||||
} catch (Exception ex) {
|
||||
System.out.println("Failed to poll data; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to poll data", ex);
|
||||
}
|
||||
// ANCHOR_END: poll_data_code_piece
|
||||
}
|
||||
|
||||
public static void seekExample() throws SQLException {
|
||||
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException {
|
||||
// ANCHOR: consumer_seek
|
||||
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
|
||||
try {
|
||||
List<String> topics = Collections.singletonList("topic_meters");
|
||||
|
||||
// subscribe to the topics
|
||||
consumer.subscribe(topics);
|
||||
System.out.println("subscribe topics successfully");
|
||||
Set<TopicPartition> assignment = consumer.assignment();
|
||||
System.out.println("now assignment: " + JSON.toJSONString(assignment));
|
||||
|
||||
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
|
||||
// make sure we have got some data
|
||||
while (records.isEmpty()) {
|
||||
records = consumer.poll(Duration.ofMillis(100));
|
||||
}
|
||||
|
||||
for (ConsumerRecord<ResultBean> record : records) {
|
||||
System.out.println("first data polled: " + JSON.toJSONString(record.value()));
|
||||
Set<TopicPartition> assignment = consumer.assignment();
|
||||
// seek to the beginning of the all partitions
|
||||
consumer.seekToBeginning(assignment);
|
||||
System.out.println("assignment seek to beginning successfully");
|
||||
break;
|
||||
}
|
||||
|
||||
// poll data again
|
||||
records = consumer.poll(Duration.ofMillis(100));
|
||||
for (ConsumerRecord<ResultBean> record : records) {
|
||||
// process the data here
|
||||
System.out.println("second data polled: " + JSON.toJSONString(record.value()));
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
consumer.seekToBeginning(assignment);
|
||||
System.out.println("assignment seek to beginning successfully");
|
||||
System.out.println("beginning assignment: " + JSON.toJSONString(assignment));
|
||||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("seek example failed", ex);
|
||||
} catch (Exception ex) {
|
||||
System.out.println("seek example failed; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("seek example failed", ex);
|
||||
}
|
||||
// ANCHOR_END: consumer_seek
|
||||
}
|
||||
|
||||
|
||||
public static void commitExample() throws SQLException {
|
||||
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException {
|
||||
// ANCHOR: commit_code_piece
|
||||
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
|
||||
try {
|
||||
List<String> topics = Collections.singletonList("topic_meters");
|
||||
|
||||
consumer.subscribe(topics);
|
||||
|
@ -160,12 +160,14 @@ public class ConsumerLoopFull {
|
|||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to execute consumer functions", ex);
|
||||
} catch (Exception ex) {
|
||||
System.out.println("Failed to execute consumer functions. ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to execute consumer functions", ex);
|
||||
}
|
||||
// ANCHOR_END: commit_code_piece
|
||||
}
|
||||
|
||||
public static void unsubscribeExample() throws SQLException {
|
||||
TaosConsumer<ResultBean> consumer = getConsumer();
|
||||
public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
|
||||
List<String> topics = Collections.singletonList("topic_meters");
|
||||
consumer.subscribe(topics);
|
||||
// ANCHOR: unsubscribe_data_code_piece
|
||||
|
@ -175,7 +177,11 @@ public class ConsumerLoopFull {
|
|||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to unsubscribe consumer", ex);
|
||||
} finally {
|
||||
} catch (Exception ex) {
|
||||
System.out.println("Failed to unsubscribe consumer. ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to unsubscribe consumer", ex);
|
||||
}
|
||||
finally {
|
||||
consumer.close();
|
||||
}
|
||||
// ANCHOR_END: unsubscribe_data_code_piece
|
||||
|
@ -243,17 +249,16 @@ public class ConsumerLoopFull {
|
|||
}
|
||||
}
|
||||
|
||||
public static void prepareData() throws SQLException {
|
||||
StringBuilder insertQuery = new StringBuilder();
|
||||
insertQuery.append("INSERT INTO " +
|
||||
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
|
||||
"VALUES ");
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) ");
|
||||
}
|
||||
public static void prepareData() throws SQLException, InterruptedException {
|
||||
try {
|
||||
int affectedRows = statement.executeUpdate(insertQuery.toString());
|
||||
assert affectedRows == 10000;
|
||||
int i = 0;
|
||||
while (!stopFlag) {
|
||||
i++;
|
||||
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
|
||||
int affectedRows = statement.executeUpdate(insertQuery);
|
||||
assert affectedRows == 1;
|
||||
Thread.sleep(1);
|
||||
}
|
||||
} catch (SQLException ex) {
|
||||
System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to insert data to power.meters", ex);
|
||||
|
@ -315,7 +320,7 @@ public class ConsumerLoopFull {
|
|||
}
|
||||
|
||||
|
||||
public static void main(String[] args) throws SQLException {
|
||||
public static void main(String[] args) throws SQLException, InterruptedException {
|
||||
initConnection();
|
||||
prepareMeta();
|
||||
|
||||
|
@ -326,11 +331,14 @@ public class ConsumerLoopFull {
|
|||
executor.submit(() -> {
|
||||
try {
|
||||
// please use one example at a time
|
||||
pollDataExample();
|
||||
// seekExample();
|
||||
// pollExample();
|
||||
// commitExample();
|
||||
unsubscribeExample();
|
||||
TaosConsumer<ResultBean> consumer = getConsumer();
|
||||
|
||||
pollDataExample(consumer);
|
||||
seekExample(consumer);
|
||||
pollExample(consumer);
|
||||
commitExample(consumer);
|
||||
unsubscribeExample(consumer);
|
||||
stopFlag = true;
|
||||
} catch (SQLException ex) {
|
||||
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
}
|
||||
|
|
|
@ -45,6 +45,10 @@ public class JdbcCreatDBDemo {
|
|||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to create db and table, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
} catch (Exception ex){
|
||||
System.out.println("Failed to create db and table, url:" + jdbcUrl + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
// ANCHOR_END: create_db_and_table
|
||||
|
||||
|
|
|
@ -43,7 +43,10 @@ public class JdbcInsertDataDemo {
|
|||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to insert data to power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
|
||||
throw ex;
|
||||
} catch (Exception ex){
|
||||
System.out.println("Failed to insert data to power.meters, url:" + jdbcUrl + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
// ANCHOR_END: insert_data
|
||||
}
|
||||
|
|
|
@ -44,6 +44,10 @@ public class JdbcQueryDemo {
|
|||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to query data from power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
} catch (Exception ex){
|
||||
System.out.println("Failed to query data from power.meters, url:" + jdbcUrl + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
// ANCHOR_END: query_data
|
||||
}
|
||||
|
|
|
@ -38,7 +38,10 @@ public class JdbcReqIdDemo {
|
|||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to execute sql with reqId, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
|
||||
throw ex;
|
||||
} catch (Exception ex){
|
||||
System.out.println("Failed to execute sql with reqId, url:" + jdbcUrl + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
// ANCHOR_END: with_reqid
|
||||
}
|
||||
|
|
|
@ -69,6 +69,10 @@ public class ParameterBindingBasicDemo {
|
|||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
} catch (Exception ex){
|
||||
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,20 +31,27 @@ public class ParameterBindingFullDemo {
|
|||
public static void main(String[] args) throws SQLException {
|
||||
|
||||
String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
|
||||
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
|
||||
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
|
||||
|
||||
init(conn);
|
||||
init(conn);
|
||||
|
||||
bindInteger(conn);
|
||||
bindFloat(conn);
|
||||
bindBoolean(conn);
|
||||
bindBytes(conn);
|
||||
bindString(conn);
|
||||
bindVarbinary(conn);
|
||||
bindGeometry(conn);
|
||||
bindInteger(conn);
|
||||
bindFloat(conn);
|
||||
bindBoolean(conn);
|
||||
bindBytes(conn);
|
||||
bindString(conn);
|
||||
bindVarbinary(conn);
|
||||
bindGeometry(conn);
|
||||
|
||||
clean(conn);
|
||||
conn.close();
|
||||
clean(conn);
|
||||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
} catch (Exception ex){
|
||||
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
private static void init(Connection conn) throws SQLException {
|
||||
|
|
|
@ -28,6 +28,10 @@ public class SchemalessJniTest {
|
|||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to insert data with schemaless, host:" + host + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
} catch (Exception ex){
|
||||
System.out.println("Failed to insert data with schemaless, host:" + host + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,10 @@ public class SchemalessWsTest {
|
|||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to insert data with schemaless, host:" + host + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
} catch (Exception ex){
|
||||
System.out.println("Failed to insert data with schemaless, host:" + host + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -50,6 +50,10 @@ public class WSParameterBindingBasicDemo {
|
|||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
} catch (Exception ex){
|
||||
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -39,8 +39,11 @@ public class WSParameterBindingFullDemo {
|
|||
|
||||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Error Code: " + ex.getErrorCode());
|
||||
System.out.println("Message: " + ex.getMessage());
|
||||
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
} catch (Exception ex){
|
||||
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,10 +6,7 @@ import com.taosdata.jdbc.tmq.*;
|
|||
|
||||
import java.sql.*;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -20,7 +17,7 @@ public class WsConsumerLoopFull {
|
|||
static private Statement statement;
|
||||
|
||||
public static TaosConsumer<ResultBean> getConsumer() throws SQLException {
|
||||
// ANCHOR: create_consumer
|
||||
// ANCHOR: create_consumer
|
||||
Properties config = new Properties();
|
||||
config.setProperty("td.connect.type", "ws");
|
||||
config.setProperty("bootstrap.servers", "localhost:6041");
|
||||
|
@ -29,7 +26,7 @@ public class WsConsumerLoopFull {
|
|||
config.setProperty("enable.auto.commit", "true");
|
||||
config.setProperty("auto.commit.interval.ms", "1000");
|
||||
config.setProperty("group.id", "group1");
|
||||
config.setProperty("client.id", "client1");
|
||||
config.setProperty("client.id", "1");
|
||||
config.setProperty("td.connect.user", "root");
|
||||
config.setProperty("td.connect.pass", "taosdata");
|
||||
config.setProperty("value.deserializer", "com.taosdata.example.WsConsumerLoopFull$ResultDeserializer");
|
||||
|
@ -38,20 +35,19 @@ public class WsConsumerLoopFull {
|
|||
try {
|
||||
return new TaosConsumer<>(config);
|
||||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed
|
||||
// exceptions info
|
||||
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers")
|
||||
+ "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to create consumer", ex);
|
||||
} catch (Exception ex) {
|
||||
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers")
|
||||
+ "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to create consumer", ex);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
throw new SQLException("Failed to create consumer", e);
|
||||
}
|
||||
// ANCHOR_END: create_consumer
|
||||
// ANCHOR_END: create_consumer
|
||||
}
|
||||
|
||||
public static void pollDataExample() throws SQLException {
|
||||
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
|
||||
public static void pollDataExample(TaosConsumer<ResultBean> consumer) throws SQLException {
|
||||
try{
|
||||
// subscribe to the topics
|
||||
List<String> topics = Collections.singletonList("topic_meters");
|
||||
|
||||
|
@ -70,17 +66,18 @@ public class WsConsumerLoopFull {
|
|||
consumer.unsubscribe();
|
||||
System.out.println("unsubscribed topics successfully");
|
||||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed
|
||||
// exceptions info
|
||||
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: "
|
||||
+ ex.getMessage());
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to poll data from topic_meters", ex);
|
||||
} catch (Exception ex) {
|
||||
System.out.println("Failed to poll data from topic_meters; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to poll data from topic_meters", ex);
|
||||
}
|
||||
}
|
||||
|
||||
public static void pollExample() throws SQLException {
|
||||
// ANCHOR: poll_data_code_piece
|
||||
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
|
||||
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException {
|
||||
// ANCHOR: poll_data_code_piece
|
||||
try {
|
||||
List<String> topics = Collections.singletonList("topic_meters");
|
||||
|
||||
// subscribe to the topics
|
||||
|
@ -97,58 +94,51 @@ public class WsConsumerLoopFull {
|
|||
}
|
||||
|
||||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed
|
||||
// exceptions info
|
||||
System.out
|
||||
.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to poll data", ex);
|
||||
} catch (Exception ex) {
|
||||
System.out.println("Failed to poll data; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to poll data", ex);
|
||||
}
|
||||
// ANCHOR_END: poll_data_code_piece
|
||||
// ANCHOR_END: poll_data_code_piece
|
||||
}
|
||||
|
||||
public static void seekExample() throws SQLException {
|
||||
// ANCHOR: consumer_seek
|
||||
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
|
||||
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException {
|
||||
// ANCHOR: consumer_seek
|
||||
try {
|
||||
List<String> topics = Collections.singletonList("topic_meters");
|
||||
|
||||
// subscribe to the topics
|
||||
consumer.subscribe(topics);
|
||||
System.out.println("subscribe topics successfully");
|
||||
Set<TopicPartition> assignment = consumer.assignment();
|
||||
System.out.println("now assignment: " + JSON.toJSONString(assignment));
|
||||
|
||||
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
|
||||
// make sure we have got some data
|
||||
while (records.isEmpty()) {
|
||||
records = consumer.poll(Duration.ofMillis(100));
|
||||
}
|
||||
|
||||
for (ConsumerRecord<ResultBean> record : records) {
|
||||
System.out.println("first data polled: " + JSON.toJSONString(record.value()));
|
||||
Set<TopicPartition> assignment = consumer.assignment();
|
||||
// seek to the beginning of the all partitions
|
||||
consumer.seekToBeginning(assignment);
|
||||
System.out.println("assignment seek to beginning successfully");
|
||||
break;
|
||||
}
|
||||
|
||||
// poll data again
|
||||
records = consumer.poll(Duration.ofMillis(100));
|
||||
for (ConsumerRecord<ResultBean> record : records) {
|
||||
// process the data here
|
||||
System.out.println("second data polled: " + JSON.toJSONString(record.value()));
|
||||
break;
|
||||
}
|
||||
consumer.seekToBeginning(assignment);
|
||||
System.out.println("assignment seek to beginning successfully");
|
||||
System.out.println("beginning assignment: " + JSON.toJSONString(assignment));
|
||||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed
|
||||
// exceptions info
|
||||
System.out
|
||||
.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("seek example failed", ex);
|
||||
} catch (Exception ex) {
|
||||
System.out.println("seek example failed; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("seek example failed", ex);
|
||||
}
|
||||
// ANCHOR_END: consumer_seek
|
||||
// ANCHOR_END: consumer_seek
|
||||
}
|
||||
|
||||
public static void commitExample() throws SQLException {
|
||||
// ANCHOR: commit_code_piece
|
||||
try (TaosConsumer<ResultBean> consumer = getConsumer()) {
|
||||
|
||||
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException {
|
||||
// ANCHOR: commit_code_piece
|
||||
try {
|
||||
List<String> topics = Collections.singletonList("topic_meters");
|
||||
|
||||
consumer.subscribe(topics);
|
||||
|
@ -165,32 +155,34 @@ public class WsConsumerLoopFull {
|
|||
}
|
||||
}
|
||||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed
|
||||
// exceptions info
|
||||
System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: "
|
||||
+ ex.getMessage());
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to execute consumer functions", ex);
|
||||
} catch (Exception ex) {
|
||||
System.out.println("Failed to execute consumer functions. ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to execute consumer functions", ex);
|
||||
}
|
||||
// ANCHOR_END: commit_code_piece
|
||||
// ANCHOR_END: commit_code_piece
|
||||
}
|
||||
|
||||
public static void unsubscribeExample() throws SQLException {
|
||||
TaosConsumer<ResultBean> consumer = getConsumer();
|
||||
public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
|
||||
List<String> topics = Collections.singletonList("topic_meters");
|
||||
consumer.subscribe(topics);
|
||||
// ANCHOR: unsubscribe_data_code_piece
|
||||
// ANCHOR: unsubscribe_data_code_piece
|
||||
try {
|
||||
consumer.unsubscribe();
|
||||
} catch (SQLException ex) {
|
||||
// handle any errors, please refer to the JDBC specifications for detailed
|
||||
// exceptions info
|
||||
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: "
|
||||
+ ex.getMessage());
|
||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to unsubscribe consumer", ex);
|
||||
} finally {
|
||||
} catch (Exception ex) {
|
||||
System.out.println("Failed to unsubscribe consumer. ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to unsubscribe consumer", ex);
|
||||
}
|
||||
finally {
|
||||
consumer.close();
|
||||
}
|
||||
// ANCHOR_END: unsubscribe_data_code_piece
|
||||
// ANCHOR_END: unsubscribe_data_code_piece
|
||||
}
|
||||
|
||||
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
|
||||
|
@ -255,20 +247,16 @@ public class WsConsumerLoopFull {
|
|||
}
|
||||
}
|
||||
|
||||
public static void prepareData() throws SQLException {
|
||||
StringBuilder insertQuery = new StringBuilder();
|
||||
insertQuery.append("INSERT INTO " +
|
||||
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
|
||||
"VALUES ");
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) ");
|
||||
}
|
||||
public static void prepareData() throws SQLException, InterruptedException {
|
||||
try {
|
||||
int affectedRows = statement.executeUpdate(insertQuery.toString());
|
||||
assert affectedRows == 10000;
|
||||
for (int i = 0; i < 3000; i++) {
|
||||
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
|
||||
int affectedRows = statement.executeUpdate(insertQuery);
|
||||
assert affectedRows == 1;
|
||||
Thread.sleep(1);
|
||||
}
|
||||
} catch (SQLException ex) {
|
||||
System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: "
|
||||
+ ex.getMessage());
|
||||
System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to insert data to power.meters", ex);
|
||||
}
|
||||
}
|
||||
|
@ -277,13 +265,10 @@ public class WsConsumerLoopFull {
|
|||
try {
|
||||
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
|
||||
statement.executeUpdate("USE power");
|
||||
statement.executeUpdate(
|
||||
"CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
|
||||
statement.executeUpdate(
|
||||
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
|
||||
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
|
||||
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
|
||||
} catch (SQLException ex) {
|
||||
System.out.println(
|
||||
"Failed to create db and table, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
System.out.println("Failed to create db and table, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to create db and table", ex);
|
||||
}
|
||||
}
|
||||
|
@ -297,15 +282,13 @@ public class WsConsumerLoopFull {
|
|||
try {
|
||||
connection = DriverManager.getConnection(url, properties);
|
||||
} catch (SQLException ex) {
|
||||
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode()
|
||||
+ "; ErrMessage: " + ex.getMessage());
|
||||
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to create connection", ex);
|
||||
}
|
||||
try {
|
||||
statement = connection.createStatement();
|
||||
} catch (SQLException ex) {
|
||||
System.out.println(
|
||||
"Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to create statement", ex);
|
||||
}
|
||||
System.out.println("Connection created successfully.");
|
||||
|
@ -317,8 +300,7 @@ public class WsConsumerLoopFull {
|
|||
statement.close();
|
||||
}
|
||||
} catch (SQLException ex) {
|
||||
System.out.println(
|
||||
"Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to close statement", ex);
|
||||
}
|
||||
|
||||
|
@ -327,14 +309,14 @@ public class WsConsumerLoopFull {
|
|||
connection.close();
|
||||
}
|
||||
} catch (SQLException ex) {
|
||||
System.out.println(
|
||||
"Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
throw new SQLException("Failed to close connection", ex);
|
||||
}
|
||||
System.out.println("Connection closed Successfully.");
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws SQLException {
|
||||
|
||||
public static void main(String[] args) throws SQLException, InterruptedException {
|
||||
initConnection();
|
||||
prepareMeta();
|
||||
|
||||
|
@ -345,14 +327,22 @@ public class WsConsumerLoopFull {
|
|||
executor.submit(() -> {
|
||||
try {
|
||||
// please use one example at a time
|
||||
pollDataExample();
|
||||
// seekExample();
|
||||
// pollExample();
|
||||
// commitExample();
|
||||
unsubscribeExample();
|
||||
TaosConsumer<ResultBean> consumer = getConsumer();
|
||||
|
||||
pollDataExample(consumer);
|
||||
seekExample(consumer);
|
||||
consumer.unsubscribe();
|
||||
pollExample(consumer);
|
||||
consumer.unsubscribe();
|
||||
commitExample(consumer);
|
||||
consumer.unsubscribe();
|
||||
unsubscribeExample(consumer);
|
||||
} catch (SQLException ex) {
|
||||
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode()
|
||||
+ "; ErrMessage: " + ex.getMessage());
|
||||
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||
return;
|
||||
} catch (Exception ex) {
|
||||
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
|
||||
return;
|
||||
}
|
||||
System.out.println("pollDataExample executed successfully");
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue