From d63795fd835859f2a96d4d02cd3364ebd12821ea Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Fri, 25 Oct 2024 17:54:43 +0800 Subject: [PATCH] update sample code --- .../com/taos/example/ConsumerLoopFull.java | 15 ++--- .../com/taos/example/ConsumerLoopImp.java | 8 ++- .../com/taos/example/WsConsumerLoopFull.java | 15 ++--- .../com/taos/example/WsConsumerLoopImp.java | 8 ++- .../example/highvolume/DataBaseMonitor.java | 3 + .../taos/example/highvolume/SQLWriter.java | 3 + .../src/test/java/com/taos/test/TestAll.java | 57 +++++++++++++++---- 7 files changed, 81 insertions(+), 28 deletions(-) diff --git a/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopFull.java b/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopFull.java index a399f3aa6a..647855dc48 100644 --- a/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopFull.java +++ b/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopFull.java @@ -1,8 +1,9 @@ package com.taos.example; -import com.alibaba.fastjson.JSON; +import com.fasterxml.jackson.core.JsonProcessingException; import com.taosdata.jdbc.TSDBDriver; import com.taosdata.jdbc.tmq.*; +import com.taosdata.jdbc.utils.JsonUtil; import java.sql.*; import java.time.Duration; @@ -60,7 +61,7 @@ public class ConsumerLoopFull { // ANCHOR_END: create_consumer } - public static void pollExample(TaosConsumer consumer) throws SQLException { + public static void pollExample(TaosConsumer consumer) throws SQLException, JsonProcessingException { // ANCHOR: poll_data_code_piece List topics = Collections.singletonList("topic_meters"); try { @@ -73,7 +74,7 @@ public class ConsumerLoopFull { for (ConsumerRecord record : records) { ResultBean bean = record.value(); // Add your data processing logic here - System.out.println("data: " + JSON.toJSONString(bean)); + System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean)); } } } catch (Exception ex) { @@ -91,7 +92,7 @@ public class ConsumerLoopFull { // ANCHOR_END: poll_data_code_piece } - public static void seekExample(TaosConsumer consumer) throws SQLException { + public static void seekExample(TaosConsumer consumer) throws SQLException, JsonProcessingException { // ANCHOR: consumer_seek List topics = Collections.singletonList("topic_meters"); try { @@ -99,7 +100,7 @@ public class ConsumerLoopFull { consumer.subscribe(topics); System.out.println("Subscribe topics successfully."); Set assignment = consumer.assignment(); - System.out.println("Now assignment: " + JSON.toJSONString(assignment)); + System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment)); ConsumerRecords records = ConsumerRecords.emptyRecord(); // make sure we have got some data @@ -125,7 +126,7 @@ public class ConsumerLoopFull { } - public static void commitExample(TaosConsumer consumer) throws SQLException { + public static void commitExample(TaosConsumer consumer) throws SQLException, JsonProcessingException { // ANCHOR: commit_code_piece List topics = Collections.singletonList("topic_meters"); try { @@ -135,7 +136,7 @@ public class ConsumerLoopFull { for (ConsumerRecord record : records) { ResultBean bean = record.value(); // Add your data processing logic here - System.out.println("data: " + JSON.toJSONString(bean)); + System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean)); } if (!records.isEmpty()) { // after processing the data, commit the offset manually diff --git a/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopImp.java b/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopImp.java index a59bfc282f..378ef8ae6d 100644 --- a/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopImp.java +++ b/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopImp.java @@ -1,7 +1,7 @@ package com.taos.example; -import com.alibaba.fastjson.JSON; import com.taosdata.jdbc.TSDBDriver; +import com.taosdata.jdbc.utils.JsonUtil; import java.sql.Connection; import java.sql.DriverManager; @@ -31,7 +31,11 @@ public class ConsumerLoopImp { final AbsConsumerLoop consumerLoop = new AbsConsumerLoop() { @Override public void process(ResultBean result) { - System.out.println("data: " + JSON.toJSONString(result)); + try{ + System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(result)); + } catch (Exception e) { + throw new RuntimeException(e); + } } }; diff --git a/docs/examples/java/src/main/java/com/taos/example/WsConsumerLoopFull.java b/docs/examples/java/src/main/java/com/taos/example/WsConsumerLoopFull.java index 6db65f47f2..02db97a5a9 100644 --- a/docs/examples/java/src/main/java/com/taos/example/WsConsumerLoopFull.java +++ b/docs/examples/java/src/main/java/com/taos/example/WsConsumerLoopFull.java @@ -1,8 +1,9 @@ package com.taos.example; -import com.alibaba.fastjson.JSON; +import com.fasterxml.jackson.core.JsonProcessingException; import com.taosdata.jdbc.TSDBDriver; import com.taosdata.jdbc.tmq.*; +import com.taosdata.jdbc.utils.JsonUtil; import java.sql.*; import java.time.Duration; @@ -60,7 +61,7 @@ public class WsConsumerLoopFull { // ANCHOR_END: create_consumer } - public static void pollExample(TaosConsumer consumer) throws SQLException { + public static void pollExample(TaosConsumer consumer) throws SQLException, JsonProcessingException { // ANCHOR: poll_data_code_piece List topics = Collections.singletonList("topic_meters"); try { @@ -73,7 +74,7 @@ public class WsConsumerLoopFull { for (ConsumerRecord record : records) { ResultBean bean = record.value(); // Add your data processing logic here - System.out.println("data: " + JSON.toJSONString(bean)); + System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean)); } } } catch (Exception ex) { @@ -91,7 +92,7 @@ public class WsConsumerLoopFull { // ANCHOR_END: poll_data_code_piece } - public static void seekExample(TaosConsumer consumer) throws SQLException { + public static void seekExample(TaosConsumer consumer) throws SQLException, JsonProcessingException { // ANCHOR: consumer_seek List topics = Collections.singletonList("topic_meters"); try { @@ -99,7 +100,7 @@ public class WsConsumerLoopFull { consumer.subscribe(topics); System.out.println("Subscribe topics successfully."); Set assignment = consumer.assignment(); - System.out.println("Now assignment: " + JSON.toJSONString(assignment)); + System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment)); ConsumerRecords records = ConsumerRecords.emptyRecord(); // make sure we have got some data @@ -125,7 +126,7 @@ public class WsConsumerLoopFull { } - public static void commitExample(TaosConsumer consumer) throws SQLException { + public static void commitExample(TaosConsumer consumer) throws SQLException, JsonProcessingException { // ANCHOR: commit_code_piece List topics = Collections.singletonList("topic_meters"); try { @@ -135,7 +136,7 @@ public class WsConsumerLoopFull { for (ConsumerRecord record : records) { ResultBean bean = record.value(); // Add your data processing logic here - System.out.println("data: " + JSON.toJSONString(bean)); + System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean)); } if (!records.isEmpty()) { // after processing the data, commit the offset manually diff --git a/docs/examples/java/src/main/java/com/taos/example/WsConsumerLoopImp.java b/docs/examples/java/src/main/java/com/taos/example/WsConsumerLoopImp.java index 70e29503f8..77c6a4fd1b 100644 --- a/docs/examples/java/src/main/java/com/taos/example/WsConsumerLoopImp.java +++ b/docs/examples/java/src/main/java/com/taos/example/WsConsumerLoopImp.java @@ -1,7 +1,7 @@ package com.taos.example; -import com.alibaba.fastjson.JSON; import com.taosdata.jdbc.TSDBDriver; +import com.taosdata.jdbc.utils.JsonUtil; import java.sql.Connection; import java.sql.DriverManager; @@ -28,7 +28,11 @@ public abstract class WsConsumerLoopImp { final AbsConsumerLoop consumerLoop = new AbsConsumerLoop() { @Override public void process(ResultBean result) { - System.out.println("data: " + JSON.toJSONString(result)); + try{ + System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(result)); + } catch (Exception e) { + throw new RuntimeException(e); + } } }; diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java index 8678f65231..fa6ebf0858 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java @@ -13,6 +13,9 @@ public class DataBaseMonitor { public DataBaseMonitor init() throws SQLException { if (conn == null) { String jdbcURL = System.getenv("TDENGINE_JDBC_URL"); + if (jdbcURL == null || jdbcURL == ""){ + jdbcURL = "jdbc:TAOS://localhost:6030?user=root&password=taosdata"; + } conn = DriverManager.getConnection(jdbcURL); stmt = conn.createStatement(); } diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java index dc820f161c..1497992f6b 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java @@ -69,6 +69,9 @@ public class SQLWriter { */ private static Connection getConnection() throws SQLException { String jdbcURL = System.getenv("TDENGINE_JDBC_URL"); + if (jdbcURL == null || jdbcURL == ""){ + jdbcURL = "jdbc:TAOS://localhost:6030?user=root&password=taosdata"; + } return DriverManager.getConnection(jdbcURL); } diff --git a/docs/examples/java/src/test/java/com/taos/test/TestAll.java b/docs/examples/java/src/test/java/com/taos/test/TestAll.java index e014a3b315..a92ddd116c 100644 --- a/docs/examples/java/src/test/java/com/taos/test/TestAll.java +++ b/docs/examples/java/src/test/java/com/taos/test/TestAll.java @@ -17,6 +17,37 @@ public class TestAll { stmt.execute("drop database if exists " + dbName); } } + waitTransaction(); + } + + public void dropTopic(String topicName) throws SQLException { + String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata"; + try (Connection conn = DriverManager.getConnection(jdbcUrl)) { + try (Statement stmt = conn.createStatement()) { + stmt.execute("drop topic if exists " + topicName); + } + } + waitTransaction(); + } + + public void waitTransaction() throws SQLException { + + String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata"; + try (Connection conn = DriverManager.getConnection(jdbcUrl)) { + try (Statement stmt = conn.createStatement()) { + for (int i = 0; i < 10; i++) { + stmt.execute("show transactions"); + try (ResultSet resultSet = stmt.getResultSet()) { + if (resultSet.next()) { + int count = resultSet.getInt(1); + if (count == 0) { + break; + } + } + } + } + } + } } public void insertData() throws SQLException { @@ -104,14 +135,20 @@ public class TestAll { SubscribeDemo.main(args); } -// @Test -// public void testSubscribeJni() throws SQLException, InterruptedException { -// dropDB("power"); -// ConsumerLoopFull.main(args); -// } -// @Test -// public void testSubscribeWs() throws SQLException, InterruptedException { -// dropDB("power"); -// WsConsumerLoopFull.main(args); -// } + @Test + public void testSubscribeJni() throws SQLException, InterruptedException { + dropTopic("topic_meters"); + dropDB("power"); + ConsumerLoopFull.main(args); + dropTopic("topic_meters"); + dropDB("power"); + } + @Test + public void testSubscribeWs() throws SQLException, InterruptedException { + dropTopic("topic_meters"); + dropDB("power"); + WsConsumerLoopFull.main(args); + dropTopic("topic_meters"); + dropDB("power"); + } }