update sample code

This commit is contained in:
sheyanjie-qq 2024-10-25 17:54:43 +08:00
parent f7843a29ab
commit d63795fd83
7 changed files with 81 additions and 28 deletions

View File

@ -1,8 +1,9 @@
package com.taos.example; package com.taos.example;
import com.alibaba.fastjson.JSON; import com.fasterxml.jackson.core.JsonProcessingException;
import com.taosdata.jdbc.TSDBDriver; import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.tmq.*; import com.taosdata.jdbc.tmq.*;
import com.taosdata.jdbc.utils.JsonUtil;
import java.sql.*; import java.sql.*;
import java.time.Duration; import java.time.Duration;
@ -60,7 +61,7 @@ public class ConsumerLoopFull {
// ANCHOR_END: create_consumer // ANCHOR_END: create_consumer
} }
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException { public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
// ANCHOR: poll_data_code_piece // ANCHOR: poll_data_code_piece
List<String> topics = Collections.singletonList("topic_meters"); List<String> topics = Collections.singletonList("topic_meters");
try { try {
@ -73,7 +74,7 @@ public class ConsumerLoopFull {
for (ConsumerRecord<ResultBean> record : records) { for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value(); ResultBean bean = record.value();
// Add your data processing logic here // Add your data processing logic here
System.out.println("data: " + JSON.toJSONString(bean)); System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
} }
} }
} catch (Exception ex) { } catch (Exception ex) {
@ -91,7 +92,7 @@ public class ConsumerLoopFull {
// ANCHOR_END: poll_data_code_piece // ANCHOR_END: poll_data_code_piece
} }
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException { public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
// ANCHOR: consumer_seek // ANCHOR: consumer_seek
List<String> topics = Collections.singletonList("topic_meters"); List<String> topics = Collections.singletonList("topic_meters");
try { try {
@ -99,7 +100,7 @@ public class ConsumerLoopFull {
consumer.subscribe(topics); consumer.subscribe(topics);
System.out.println("Subscribe topics successfully."); System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment(); Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JSON.toJSONString(assignment)); System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord(); ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data // make sure we have got some data
@ -125,7 +126,7 @@ public class ConsumerLoopFull {
} }
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException { public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
// ANCHOR: commit_code_piece // ANCHOR: commit_code_piece
List<String> topics = Collections.singletonList("topic_meters"); List<String> topics = Collections.singletonList("topic_meters");
try { try {
@ -135,7 +136,7 @@ public class ConsumerLoopFull {
for (ConsumerRecord<ResultBean> record : records) { for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value(); ResultBean bean = record.value();
// Add your data processing logic here // Add your data processing logic here
System.out.println("data: " + JSON.toJSONString(bean)); System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
} }
if (!records.isEmpty()) { if (!records.isEmpty()) {
// after processing the data, commit the offset manually // after processing the data, commit the offset manually

View File

@ -1,7 +1,7 @@
package com.taos.example; package com.taos.example;
import com.alibaba.fastjson.JSON;
import com.taosdata.jdbc.TSDBDriver; import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.utils.JsonUtil;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
@ -31,7 +31,11 @@ public class ConsumerLoopImp {
final AbsConsumerLoop consumerLoop = new AbsConsumerLoop() { final AbsConsumerLoop consumerLoop = new AbsConsumerLoop() {
@Override @Override
public void process(ResultBean result) { public void process(ResultBean result) {
System.out.println("data: " + JSON.toJSONString(result)); try{
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(result));
} catch (Exception e) {
throw new RuntimeException(e);
}
} }
}; };

View File

@ -1,8 +1,9 @@
package com.taos.example; package com.taos.example;
import com.alibaba.fastjson.JSON; import com.fasterxml.jackson.core.JsonProcessingException;
import com.taosdata.jdbc.TSDBDriver; import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.tmq.*; import com.taosdata.jdbc.tmq.*;
import com.taosdata.jdbc.utils.JsonUtil;
import java.sql.*; import java.sql.*;
import java.time.Duration; import java.time.Duration;
@ -60,7 +61,7 @@ public class WsConsumerLoopFull {
// ANCHOR_END: create_consumer // ANCHOR_END: create_consumer
} }
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException { public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
// ANCHOR: poll_data_code_piece // ANCHOR: poll_data_code_piece
List<String> topics = Collections.singletonList("topic_meters"); List<String> topics = Collections.singletonList("topic_meters");
try { try {
@ -73,7 +74,7 @@ public class WsConsumerLoopFull {
for (ConsumerRecord<ResultBean> record : records) { for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value(); ResultBean bean = record.value();
// Add your data processing logic here // Add your data processing logic here
System.out.println("data: " + JSON.toJSONString(bean)); System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
} }
} }
} catch (Exception ex) { } catch (Exception ex) {
@ -91,7 +92,7 @@ public class WsConsumerLoopFull {
// ANCHOR_END: poll_data_code_piece // ANCHOR_END: poll_data_code_piece
} }
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException { public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
// ANCHOR: consumer_seek // ANCHOR: consumer_seek
List<String> topics = Collections.singletonList("topic_meters"); List<String> topics = Collections.singletonList("topic_meters");
try { try {
@ -99,7 +100,7 @@ public class WsConsumerLoopFull {
consumer.subscribe(topics); consumer.subscribe(topics);
System.out.println("Subscribe topics successfully."); System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment(); Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JSON.toJSONString(assignment)); System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord(); ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data // make sure we have got some data
@ -125,7 +126,7 @@ public class WsConsumerLoopFull {
} }
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException { public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
// ANCHOR: commit_code_piece // ANCHOR: commit_code_piece
List<String> topics = Collections.singletonList("topic_meters"); List<String> topics = Collections.singletonList("topic_meters");
try { try {
@ -135,7 +136,7 @@ public class WsConsumerLoopFull {
for (ConsumerRecord<ResultBean> record : records) { for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value(); ResultBean bean = record.value();
// Add your data processing logic here // Add your data processing logic here
System.out.println("data: " + JSON.toJSONString(bean)); System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
} }
if (!records.isEmpty()) { if (!records.isEmpty()) {
// after processing the data, commit the offset manually // after processing the data, commit the offset manually

View File

@ -1,7 +1,7 @@
package com.taos.example; package com.taos.example;
import com.alibaba.fastjson.JSON;
import com.taosdata.jdbc.TSDBDriver; import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.utils.JsonUtil;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
@ -28,7 +28,11 @@ public abstract class WsConsumerLoopImp {
final AbsConsumerLoop consumerLoop = new AbsConsumerLoop() { final AbsConsumerLoop consumerLoop = new AbsConsumerLoop() {
@Override @Override
public void process(ResultBean result) { public void process(ResultBean result) {
System.out.println("data: " + JSON.toJSONString(result)); try{
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(result));
} catch (Exception e) {
throw new RuntimeException(e);
}
} }
}; };

View File

@ -13,6 +13,9 @@ public class DataBaseMonitor {
public DataBaseMonitor init() throws SQLException { public DataBaseMonitor init() throws SQLException {
if (conn == null) { if (conn == null) {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL"); String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
if (jdbcURL == null || jdbcURL == ""){
jdbcURL = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
}
conn = DriverManager.getConnection(jdbcURL); conn = DriverManager.getConnection(jdbcURL);
stmt = conn.createStatement(); stmt = conn.createStatement();
} }

View File

@ -69,6 +69,9 @@ public class SQLWriter {
*/ */
private static Connection getConnection() throws SQLException { private static Connection getConnection() throws SQLException {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL"); String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
if (jdbcURL == null || jdbcURL == ""){
jdbcURL = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
}
return DriverManager.getConnection(jdbcURL); return DriverManager.getConnection(jdbcURL);
} }

View File

@ -17,6 +17,37 @@ public class TestAll {
stmt.execute("drop database if exists " + dbName); stmt.execute("drop database if exists " + dbName);
} }
} }
waitTransaction();
}
public void dropTopic(String topicName) throws SQLException {
String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop topic if exists " + topicName);
}
}
waitTransaction();
}
public void waitTransaction() throws SQLException {
String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
try (Statement stmt = conn.createStatement()) {
for (int i = 0; i < 10; i++) {
stmt.execute("show transactions");
try (ResultSet resultSet = stmt.getResultSet()) {
if (resultSet.next()) {
int count = resultSet.getInt(1);
if (count == 0) {
break;
}
}
}
}
}
}
} }
public void insertData() throws SQLException { public void insertData() throws SQLException {
@ -104,14 +135,20 @@ public class TestAll {
SubscribeDemo.main(args); SubscribeDemo.main(args);
} }
// @Test @Test
// public void testSubscribeJni() throws SQLException, InterruptedException { public void testSubscribeJni() throws SQLException, InterruptedException {
// dropDB("power"); dropTopic("topic_meters");
// ConsumerLoopFull.main(args); dropDB("power");
// } ConsumerLoopFull.main(args);
// @Test dropTopic("topic_meters");
// public void testSubscribeWs() throws SQLException, InterruptedException { dropDB("power");
// dropDB("power"); }
// WsConsumerLoopFull.main(args); @Test
// } public void testSubscribeWs() throws SQLException, InterruptedException {
dropTopic("topic_meters");
dropDB("power");
WsConsumerLoopFull.main(args);
dropTopic("topic_meters");
dropDB("power");
}
} }