update java sample code

This commit is contained in:
sheyanjie-qq 2024-08-16 11:28:03 +08:00
parent 85e6492ac4
commit 3d9380b51a
13 changed files with 67 additions and 1291 deletions

View File

@ -19,8 +19,10 @@ public class ConsumerLoopFull {
static private Connection connection;
static private Statement statement;
static private volatile boolean stopThread = false;
static private String groupId = "group1";
static private String clientId = "clinet1";
public static TaosConsumer<ResultBean> getConsumer() throws Exception {
public static TaosConsumer<ResultBean> getConsumer() throws Exception {
// ANCHOR: create_consumer
Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
@ -30,7 +32,7 @@ public class ConsumerLoopFull {
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "1");
config.setProperty("client.id", "clinet1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.ConsumerLoopFull$ResultDeserializer");
@ -45,8 +47,10 @@ public class ConsumerLoopFull {
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create native consumer, host: %s, %sErrMessage: %s%n",
System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
@ -58,9 +62,8 @@ public class ConsumerLoopFull {
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: poll_data_code_piece
List<String> topics = Collections.singletonList("topic_meters");
try {
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
@ -69,13 +72,16 @@ public class ConsumerLoopFull {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process the data here
// Add your data processing logic here
System.out.println("data: " + JSON.toJSONString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, %sErrMessage: %s%n",
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
@ -87,9 +93,8 @@ public class ConsumerLoopFull {
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: consumer_seek
List<String> topics = Collections.singletonList("topic_meters");
try {
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
@ -106,7 +111,10 @@ public class ConsumerLoopFull {
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to execute seek example, %sErrMessage: %s%n",
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
@ -119,15 +127,14 @@ public class ConsumerLoopFull {
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: commit_code_piece
List<String> topics = Collections.singletonList("topic_meters");
try {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process your data here
// Add your data processing logic here
System.out.println("data: " + JSON.toJSONString(bean));
}
if (!records.isEmpty()) {
@ -138,7 +145,10 @@ public class ConsumerLoopFull {
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to execute commit example, %sErrMessage: %s%n",
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
@ -158,7 +168,10 @@ public class ConsumerLoopFull {
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, %sErrMessage: %s%n",
System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.

View File

@ -25,25 +25,26 @@ public class JdbcInsertDataDemo {
properties.setProperty("timezone", "UTC-8");
System.out.println("get connection starting...");
// ANCHOR: insert_data
// insert data, please make sure the database and table are created before
String insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
try (Connection connection = DriverManager.getConnection(jdbcUrl, properties);
Statement stmt = connection.createStatement()) {
// insert data, please make sure the database and table are created before
String insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
int affectedRows = stmt.executeUpdate(insertQuery);
// you can check affectedRows here
System.out.println("Successfully inserted " + affectedRows + " rows to power.meters.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n",
System.out.printf("Failed to insert data to power.meters, sql: %s, %sErrMessage: %s%n",
insertQuery,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.

View File

@ -19,6 +19,8 @@ public class WsConsumerLoopFull {
static private Connection connection;
static private Statement statement;
static private volatile boolean stopThread = false;
static private String groupId = "group1";
static private String clientId = "clinet1";
public static TaosConsumer<ResultBean> getConsumer() throws Exception {
// ANCHOR: create_consumer
@ -30,7 +32,7 @@ public class WsConsumerLoopFull {
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "1");
config.setProperty("client.id", "clinet1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.WsConsumerLoopFull$ResultDeserializer");
@ -45,8 +47,10 @@ public class WsConsumerLoopFull {
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create websocket consumer, host: %s, %sErrMessage: %s%n",
System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
@ -58,9 +62,8 @@ public class WsConsumerLoopFull {
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: poll_data_code_piece
List<String> topics = Collections.singletonList("topic_meters");
try {
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
@ -69,13 +72,16 @@ public class WsConsumerLoopFull {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process the data here
// Add your data processing logic here
System.out.println("data: " + JSON.toJSONString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, %sErrMessage: %s%n",
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
@ -87,9 +93,8 @@ public class WsConsumerLoopFull {
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: consumer_seek
List<String> topics = Collections.singletonList("topic_meters");
try {
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
@ -106,7 +111,10 @@ public class WsConsumerLoopFull {
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to execute seek example, %sErrMessage: %s%n",
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
@ -119,15 +127,14 @@ public class WsConsumerLoopFull {
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: commit_code_piece
List<String> topics = Collections.singletonList("topic_meters");
try {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process your data here
// Add your data processing logic here
System.out.println("data: " + JSON.toJSONString(bean));
}
if (!records.isEmpty()) {
@ -138,7 +145,10 @@ public class WsConsumerLoopFull {
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to execute commit example, %sErrMessage: %s%n",
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
@ -158,7 +168,10 @@ public class WsConsumerLoopFull {
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, %sErrMessage: %s%n",
System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.

View File

@ -1,384 +0,0 @@
package com.taosdata.example;
import com.alibaba.fastjson.JSON;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.tmq.*;
import java.sql.*;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
// ANCHOR: consumer_demo
public class ConsumerLoopFull {
static private Connection connection;
static private Statement statement;
static private volatile boolean stopThread = false;
public static TaosConsumer<ResultBean> getConsumer() throws Exception {
// ANCHOR: create_consumer
Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taosdata.example.ConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create native consumer, host: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
// ANCHOR_END: create_consumer
}
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: poll_data_code_piece
try {
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process the data here
System.out.println("data: " + JSON.toJSONString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
// ANCHOR_END: poll_data_code_piece
}
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: consumer_seek
try {
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JSON.toJSONString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to execute seek example, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
// ANCHOR_END: consumer_seek
}
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: commit_code_piece
try {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process your data here
System.out.println("data: " + JSON.toJSONString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to execute commit example, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
// ANCHOR_END: commit_code_piece
}
public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
// ANCHOR: unsubscribe_data_code_piece
try {
// unsubscribe the consumer
consumer.unsubscribe();
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
finally {
// close the consumer
consumer.close();
System.out.println("Consumer closed successfully.");
}
// ANCHOR_END: unsubscribe_data_code_piece
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
private double current;
private int voltage;
private double phase;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public double getCurrent() {
return current;
}
public void setCurrent(double current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public double getPhase() {
return phase;
}
public void setPhase(double phase) {
this.phase = phase;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
public static void prepareData() throws SQLException, InterruptedException {
try {
int i = 0;
while (!stopThread) {
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
int affectedRows = statement.executeUpdate(insertQuery);
assert affectedRows == 1;
i++;
Thread.sleep(1);
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void prepareMeta() throws SQLException {
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
statement.executeUpdate("USE power");
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create db and table, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void initConnection() throws SQLException {
String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
try {
connection = DriverManager.getConnection(url, properties);
} catch (SQLException ex) {
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create connection", ex);
}
try {
statement = connection.createStatement();
} catch (SQLException ex) {
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create statement", ex);
}
System.out.println("Connection created successfully.");
}
public static void closeConnection() throws SQLException {
try {
if (statement != null) {
statement.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close statement", ex);
}
try {
if (connection != null) {
connection.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close connection", ex);
}
System.out.println("Connection closed Successfully.");
}
public static void main(String[] args) throws SQLException, InterruptedException {
initConnection();
prepareMeta();
// create a single thread executor
ExecutorService executor = Executors.newSingleThreadExecutor();
// submit a task
executor.submit(() -> {
try {
prepareData();
} catch (SQLException ex) {
System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + ", ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
return;
}
System.out.println("pollDataExample executed successfully.");
});
try {
TaosConsumer<ResultBean> consumer = getConsumer();
pollExample(consumer);
System.out.println("pollExample executed successfully.");
consumer.unsubscribe();
seekExample(consumer);
System.out.println("seekExample executed successfully.");
consumer.unsubscribe();
commitExample(consumer);
System.out.println("commitExample executed successfully.");
consumer.unsubscribe();
unsubscribeExample(consumer);
System.out.println("unsubscribeExample executed successfully");
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
return;
}
stopThread = true;
// close the executor, which will make the executor reject new tasks
executor.shutdown();
try {
// wait for the executor to terminate
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
System.out.println("Wait executor termination failed.");
}
closeConnection();
System.out.println("program end.");
}
}
// ANCHOR_END: consumer_demo

View File

@ -1,55 +0,0 @@
package com.taosdata.example;
import com.taosdata.jdbc.AbstractStatement;
import java.sql.*;
import java.util.Properties;
public class JdbcCreatDBDemo {
private static final String host = "localhost";
private static final String dbName = "test";
private static final String tbName = "weather";
private static final String user = "root";
private static final String password = "taosdata";
public static void main(String[] args) throws SQLException {
final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password;
// get connection
Properties properties = new Properties();
properties.setProperty("charset", "UTF-8");
properties.setProperty("locale", "en_US.UTF-8");
properties.setProperty("timezone", "UTC-8");
System.out.println("get connection starting...");
// ANCHOR: create_db_and_table
try (Connection connection = DriverManager.getConnection(jdbcUrl, properties);
Statement stmt = connection.createStatement()) {
// create database
int rowsAffected = stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
// you can check rowsAffected here
System.out.println("Create database power successfully, rowsAffected: " + rowsAffected);
// create table
rowsAffected = stmt.executeUpdate("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
// you can check rowsAffected here
System.out.println("Create stable power.meters successfully, rowsAffected: " + rowsAffected);
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create database power or stable meters, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
// ANCHOR_END: create_db_and_table
}
private static void printResult(ResultSet resultSet) throws SQLException {
Util.printResult(resultSet);
}
}

View File

@ -1,54 +0,0 @@
package com.taosdata.example;
import com.taosdata.jdbc.AbstractStatement;
import java.sql.*;
import java.util.Properties;
public class JdbcInsertDataDemo {
private static final String host = "localhost";
private static final String dbName = "test";
private static final String tbName = "weather";
private static final String user = "root";
private static final String password = "taosdata";
public static void main(String[] args) throws SQLException {
final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password;
// get connection
Properties properties = new Properties();
properties.setProperty("charset", "UTF-8");
properties.setProperty("locale", "en_US.UTF-8");
properties.setProperty("timezone", "UTC-8");
System.out.println("get connection starting...");
// ANCHOR: insert_data
try (Connection connection = DriverManager.getConnection(jdbcUrl, properties);
Statement stmt = connection.createStatement()) {
// insert data, please make sure the database and table are created before
String insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
int affectedRows = stmt.executeUpdate(insertQuery);
// you can check affectedRows here
System.out.println("Successfully inserted " + affectedRows + " rows to power.meters.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
// ANCHOR_END: insert_data
}
}

View File

@ -1,57 +0,0 @@
package com.taosdata.example;
import com.taosdata.jdbc.AbstractStatement;
import java.sql.*;
import java.util.Properties;
public class JdbcQueryDemo {
private static final String host = "localhost";
private static final String dbName = "test";
private static final String tbName = "weather";
private static final String user = "root";
private static final String password = "taosdata";
public static void main(String[] args) throws SQLException {
final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password;
// get connection
Properties properties = new Properties();
properties.setProperty("charset", "UTF-8");
properties.setProperty("locale", "en_US.UTF-8");
properties.setProperty("timezone", "UTC-8");
System.out.println("get connection starting...");
// ANCHOR: query_data
String sql = "SELECT ts, current, location FROM power.meters limit 100";
try (Connection connection = DriverManager.getConnection(jdbcUrl, properties);
Statement stmt = connection.createStatement();
// query data, make sure the database and table are created before
ResultSet resultSet = stmt.executeQuery(sql)) {
Timestamp ts;
float current;
String location;
while (resultSet.next()) {
ts = resultSet.getTimestamp(1);
current = resultSet.getFloat(2);
// we recommend using the column name to get the value
location = resultSet.getString("location");
// you can check data here
System.out.printf("ts: %s, current: %f, location: %s %n", ts, current, location);
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to query data from power.meters, sql: %s, %sErrMessage: %s%n",
sql,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
// ANCHOR_END: query_data
}
}

View File

@ -1,64 +0,0 @@
package com.taosdata.example;
import com.taosdata.jdbc.AbstractStatement;
import java.sql.*;
import java.util.Properties;
public class JdbcReqIdDemo {
private static final String host = "localhost";
private static final String dbName = "test";
private static final String tbName = "weather";
private static final String user = "root";
private static final String password = "taosdata";
public static void main(String[] args) throws SQLException {
final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password;
// get connection
Properties properties = new Properties();
properties.setProperty("charset", "UTF-8");
properties.setProperty("locale", "en_US.UTF-8");
properties.setProperty("timezone", "UTC-8");
System.out.println("get connection starting...");
// ANCHOR: with_reqid
long reqId = 3L;
try (Connection connection = DriverManager.getConnection(jdbcUrl, properties);
// Create a statement that allows specifying a request ID
AbstractStatement aStmt = (AbstractStatement) connection.createStatement()) {
try (ResultSet resultSet = aStmt.executeQuery("SELECT ts, current, location FROM power.meters limit 1", reqId)) {
Timestamp ts;
float current;
String location;
while (resultSet.next()) {
ts = resultSet.getTimestamp(1);
current = resultSet.getFloat(2);
// we recommend using the column name to get the value
location = resultSet.getString("location");
// you can check data here
System.out.printf("ts: %s, current: %f, location: %s %n", ts, current, location);
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to execute sql with reqId: %s, %sErrMessage: %s%n", reqId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
// ANCHOR_END: with_reqid
}
private static void printResult(ResultSet resultSet) throws SQLException {
Util.printResult(resultSet);
}
}

View File

@ -1,90 +0,0 @@
package com.taosdata.example;
import com.taosdata.jdbc.TSDBPreparedStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Random;
// ANCHOR: para_bind
public class ParameterBindingBasicDemo {
// modify host to your own
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
String sql = "INSERT INTO ? USING power.meters TAGS(?,?) VALUES (?,?,?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("d_bind_" + i);
// set tags
pstmt.setTagInt(0, i);
pstmt.setTagString(1, "location_" + i);
// set column ts
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
// set column current
ArrayList<Float> currentList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
currentList.add(random.nextFloat() * 30);
pstmt.setFloat(1, currentList);
// set column voltage
ArrayList<Integer> voltageList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
voltageList.add(random.nextInt(300));
pstmt.setInt(2, voltageList);
// set column phase
ArrayList<Float> phaseList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
phaseList.add(random.nextFloat());
pstmt.setFloat(3, phaseList);
// add column
pstmt.columnDataAddBatch();
}
// execute column
pstmt.columnDataExecuteBatch();
// you can check exeResult here
System.out.println("Successfully inserted " + (numOfSubTable * numOfRow) + " rows to power.meters.");
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
stmt.execute("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
}
}
// ANCHOR_END: para_bind

View File

@ -1,47 +0,0 @@
package com.taosdata.example;
import com.taosdata.jdbc.AbstractConnection;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
// ANCHOR: schemaless
public class SchemalessJniTest {
private static final String host = "127.0.0.1";
private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
private static final String telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
private static final String jsonDemo = "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
public static void main(String[] args) throws SQLException {
final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata";
try (Connection connection = DriverManager.getConnection(jdbcUrl)) {
init(connection);
AbstractConnection conn = connection.unwrap(AbstractConnection.class);
conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS);
conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
System.out.println("Inserted data with schemaless successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data with schemaless, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection connection) throws SQLException {
try (Statement stmt = connection.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
}
}
}
// ANCHOR_END: schemaless

View File

@ -1,47 +0,0 @@
package com.taosdata.example;
import com.taosdata.jdbc.AbstractConnection;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
// ANCHOR: schemaless
public class SchemalessWsTest {
private static final String host = "127.0.0.1";
private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
private static final String telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
private static final String jsonDemo = "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
public static void main(String[] args) throws SQLException {
final String url = "jdbc:TAOS-RS://" + host + ":6041?user=root&password=taosdata&batchfetch=true";
try(Connection connection = DriverManager.getConnection(url)){
init(connection);
AbstractConnection conn = connection.unwrap(AbstractConnection.class);
conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS);
conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS);
System.out.println("Inserted data with schemaless successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data with schemaless, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection connection) throws SQLException {
try (Statement stmt = connection.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
}
}
}
// ANCHOR_END: schemaless

View File

@ -1,69 +0,0 @@
package com.taosdata.example;
import com.taosdata.jdbc.TSDBPreparedStatement;
import com.taosdata.jdbc.ws.TSWSPreparedStatement;
import java.sql.*;
import java.util.ArrayList;
import java.util.Random;
// ANCHOR: para_bind
public class WSParameterBindingBasicDemo {
// modify host to your own
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS-RS://" + host + ":6041/?batchfetch=true";
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
String sql = "INSERT INTO ? USING power.meters TAGS(?,?) VALUES (?,?,?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("d_bind_" + i);
// set tags
pstmt.setTagInt(0, i);
pstmt.setTagString(1, "location_" + i);
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setFloat(2, random.nextFloat() * 30);
pstmt.setInt(3, random.nextInt(300));
pstmt.setFloat(4, random.nextFloat());
pstmt.addBatch();
}
int [] exeResult = pstmt.executeBatch();
// you can check exeResult here
System.out.println("Successfully inserted " + exeResult.length + " rows to power.meters.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
stmt.execute("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
}
}
// ANCHOR_END: para_bind

View File

@ -1,384 +0,0 @@
package com.taosdata.example;
import com.alibaba.fastjson.JSON;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.tmq.*;
import java.sql.*;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
// ANCHOR: consumer_demo
public class WsConsumerLoopFull {
static private Connection connection;
static private Statement statement;
static private volatile boolean stopThread = false;
public static TaosConsumer<ResultBean> getConsumer() throws Exception {
// ANCHOR: create_consumer
Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taosdata.example.WsConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create websocket consumer, host: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
// ANCHOR_END: create_consumer
}
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: poll_data_code_piece
try {
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process the data here
System.out.println("data: " + JSON.toJSONString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
// ANCHOR_END: poll_data_code_piece
}
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: consumer_seek
try {
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JSON.toJSONString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to execute seek example, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
// ANCHOR_END: consumer_seek
}
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: commit_code_piece
try {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process your data here
System.out.println("data: " + JSON.toJSONString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to execute commit example, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
// ANCHOR_END: commit_code_piece
}
public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
// ANCHOR: unsubscribe_data_code_piece
try {
// unsubscribe the consumer
consumer.unsubscribe();
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
finally {
// close the consumer
consumer.close();
System.out.println("Consumer closed successfully.");
}
// ANCHOR_END: unsubscribe_data_code_piece
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
private double current;
private int voltage;
private double phase;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public double getCurrent() {
return current;
}
public void setCurrent(double current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public double getPhase() {
return phase;
}
public void setPhase(double phase) {
this.phase = phase;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
public static void prepareData() throws SQLException, InterruptedException {
try {
int i = 0;
while (!stopThread) {
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
int affectedRows = statement.executeUpdate(insertQuery);
assert affectedRows == 1;
i++;
Thread.sleep(1);
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void prepareMeta() throws SQLException {
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
statement.executeUpdate("USE power");
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create db and table, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void initConnection() throws SQLException {
String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
try {
connection = DriverManager.getConnection(url, properties);
} catch (SQLException ex) {
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create connection", ex);
}
try {
statement = connection.createStatement();
} catch (SQLException ex) {
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create statement", ex);
}
System.out.println("Connection created successfully.");
}
public static void closeConnection() throws SQLException {
try {
if (statement != null) {
statement.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close statement", ex);
}
try {
if (connection != null) {
connection.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close connection", ex);
}
System.out.println("Connection closed Successfully.");
}
public static void main(String[] args) throws SQLException, InterruptedException {
initConnection();
prepareMeta();
// create a single thread executor
ExecutorService executor = Executors.newSingleThreadExecutor();
// submit a task
executor.submit(() -> {
try {
prepareData();
} catch (SQLException ex) {
System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + ", ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
return;
}
System.out.println("pollDataExample executed successfully.");
});
try {
TaosConsumer<ResultBean> consumer = getConsumer();
pollExample(consumer);
System.out.println("pollExample executed successfully.");
consumer.unsubscribe();
seekExample(consumer);
System.out.println("seekExample executed successfully.");
consumer.unsubscribe();
commitExample(consumer);
System.out.println("commitExample executed successfully.");
consumer.unsubscribe();
unsubscribeExample(consumer);
System.out.println("unsubscribeExample executed successfully");
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
return;
}
stopThread = true;
// close the executor, which will make the executor reject new tasks
executor.shutdown();
try {
// wait for the executor to terminate
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
System.out.println("Wait executor termination failed.");
}
closeConnection();
System.out.println("program end.");
}
}
// ANCHOR_END: consumer_demo