From b70d0c66b3c3bec258bff915f452f6d5780b51db Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Fri, 2 Aug 2024 14:50:07 +0800 Subject: [PATCH 1/3] mod java code --- .../rust/nativeexample/examples/query.rs | 97 +++-- .../rust/nativeexample/examples/tmq.rs | 8 +- .../examples/rust/restexample/examples/tmq.rs | 10 +- docs/zh/08-develop/07-tmq.md | 5 +- .../com/taosdata/example/AbsConsumerLoop.java | 146 ++++---- .../taosdata/example/ConsumerLoopFull.java | 209 +++++------ .../com/taosdata/example/ConsumerLoopImp.java | 6 +- .../taosdata/example/ConsumerOffsetSeek.java | 42 +-- .../com/taosdata/example/JdbcBasicDemo.java | 118 +++--- .../com/taosdata/example/JdbcCreatDBDemo.java | 48 +-- .../taosdata/example/JdbcInsertDataDemo.java | 50 +-- .../com/taosdata/example/JdbcQueryDemo.java | 50 +-- .../com/taosdata/example/JdbcReqIdDemo.java | 38 +- .../taosdata/example/WsConsumerLoopFull.java | 337 +++++++++--------- 14 files changed, 592 insertions(+), 572 deletions(-) diff --git a/docs/examples/rust/nativeexample/examples/query.rs b/docs/examples/rust/nativeexample/examples/query.rs index a493e7c729..55c2feb7b7 100644 --- a/docs/examples/rust/nativeexample/examples/query.rs +++ b/docs/examples/rust/nativeexample/examples/query.rs @@ -7,62 +7,61 @@ async fn main() -> anyhow::Result<()> { let taos = builder.build()?; -// ANCHOR: create_db_and_table -let db = "power"; -// create database -taos.exec_many([ - format!("CREATE DATABASE IF NOT EXISTS `{db}`"), - format!("USE `{db}`"), -]) -.await?; -println!("Create database power successfully."); + // ANCHOR: create_db_and_table + let db = "power"; + // create database + taos.exec_many([ + format!("CREATE DATABASE IF NOT EXISTS `{db}`"), + format!("USE `{db}`"), + ]) + .await?; + println!("Create database power successfully."); -// create super table -taos.exec_many([ - "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ - TAGS (`groupid` INT, `location` BINARY(24))", -]).await?; -println!("Create stable meters successfully."); + // create super table + taos.exec_many([ + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ + TAGS (`groupid` INT, `location` BINARY(24))", + ]).await?; + println!("Create stable meters successfully."); -// ANCHOR_END: create_db_and_table + // ANCHOR_END: create_db_and_table -// ANCHOR: insert_data -let inserted = taos.exec("INSERT INTO " + -"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + -"VALUES " + -"(NOW + 1a, 10.30000, 219, 0.31000) " + -"(NOW + 2a, 12.60000, 218, 0.33000) " + -"(NOW + 3a, 12.30000, 221, 0.31000) " + -"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + -"VALUES " + -"(NOW + 1a, 10.30000, 218, 0.25000) ").await?; + // ANCHOR: insert_data + let inserted = taos.exec("INSERT INTO " + + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 219, 0.31000) " + + "(NOW + 2a, 12.60000, 218, 0.33000) " + + "(NOW + 3a, 12.30000, 221, 0.31000) " + + "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 218, 0.25000) ").await?; -println!("inserted: {} rows to power.meters successfully.", inserted); -// ANCHOR_END: insert_data + println!("inserted: {} rows to power.meters successfully.", inserted); + // ANCHOR_END: insert_data -// ANCHOR: query_data -// query data, make sure the database and table are created before -let mut result = taos.query("SELECT ts, current, location FROM power.meters limit 100").await?; + // ANCHOR: query_data + // query data, make sure the database and table are created before + let mut result = taos.query("SELECT ts, current, location FROM power.meters limit 100").await?; -for field in result.fields() { - println!("got field: {}", field.name()); -} - -let mut rows = result.rows(); -let mut nrows = 0; -while let Some(row) = rows.try_next().await? { - for (col, (name, value)) in row.enumerate() { - println!( - "[{}] got value in col {} (named `{:>8}`): {}", - nrows, col, name, value - ); + for field in result.fields() { + println!("got field: {}", field.name()); } - nrows += 1; -} -// ANCHOR_END: query_data -// ANCHOR: query_with_req_id -let result = taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", 1).await?; -// ANCHOR_END: query_with_req_id + let mut rows = result.rows(); + let mut nrows = 0; + while let Some(row) = rows.try_next().await? { + for (col, (name, value)) in row.enumerate() { + println!( + "[{}] got value in col {} (named `{:>8}`): {}", + nrows, col, name, value + ); + } + nrows += 1; + } + // ANCHOR_END: query_data + // ANCHOR: query_with_req_id + let result = taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", 1).await?; + // ANCHOR_END: query_with_req_id } diff --git a/docs/examples/rust/nativeexample/examples/tmq.rs b/docs/examples/rust/nativeexample/examples/tmq.rs index 764c0c1fc8..04bf66b2ef 100644 --- a/docs/examples/rust/nativeexample/examples/tmq.rs +++ b/docs/examples/rust/nativeexample/examples/tmq.rs @@ -44,8 +44,12 @@ async fn main() -> anyhow::Result<()> { // ANCHOR_END: create_topic // ANCHOR: create_consumer - dsn.params.insert("group.id".to_string(), "abc".to_string()); - dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string()); + dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string()); + dsn.params.insert("msg.with.table.name".to_string(), "true".to_string()); + dsn.params.insert("enable.auto.commit".to_string(), "true".to_string()); + dsn.params.insert("auto.commit.interval.ms".to_string(), "1000".to_string()); + dsn.params.insert("group.id".to_string(), "group1".to_string()); + dsn.params.insert("client.id".to_string(), "client1".to_string()); let builder = TmqBuilder::from_dsn(&dsn)?; let mut consumer = builder.build().await?; diff --git a/docs/examples/rust/restexample/examples/tmq.rs b/docs/examples/rust/restexample/examples/tmq.rs index 8f195e1629..0438e47515 100644 --- a/docs/examples/rust/restexample/examples/tmq.rs +++ b/docs/examples/rust/restexample/examples/tmq.rs @@ -9,9 +9,11 @@ async fn main() -> anyhow::Result<()> { .filter_level(log::LevelFilter::Info) .init(); use taos_query::prelude::*; + // ANCHOR: create_consumer_dsn let dsn = "ws://localhost:6041".to_string(); log::info!("dsn: {}", dsn); let mut dsn = Dsn::from_str(&dsn)?; + // ANCHOR_END: create_consumer_dsn let taos = TaosBuilder::from_dsn(&dsn)?.build().await?; @@ -41,8 +43,12 @@ async fn main() -> anyhow::Result<()> { // ANCHOR_END: create_topic // ANCHOR: create_consumer - dsn.params.insert("group.id".to_string(), "abc".to_string()); - dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string()); + dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string()); + dsn.params.insert("msg.with.table.name".to_string(), "true".to_string()); + dsn.params.insert("enable.auto.commit".to_string(), "true".to_string()); + dsn.params.insert("auto.commit.interval.ms".to_string(), "1000".to_string()); + dsn.params.insert("group.id".to_string(), "group1".to_string()); + dsn.params.insert("client.id".to_string(), "client1".to_string()); let builder = TmqBuilder::from_dsn(&dsn)?; let mut consumer = builder.build().await?; diff --git a/docs/zh/08-develop/07-tmq.md b/docs/zh/08-develop/07-tmq.md index 405d227e8e..a5795b5b6e 100644 --- a/docs/zh/08-develop/07-tmq.md +++ b/docs/zh/08-develop/07-tmq.md @@ -104,9 +104,10 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 - ```rust -{{#include docs/examples/rust/nativeexample/examples/tmq.rs:create_consumer}} +{{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer_dsn}} + +{{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer}} ``` diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java index f7570c742a..842abb4086 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java @@ -24,24 +24,24 @@ public abstract class AbsConsumerLoop { public AbsConsumerLoop() throws SQLException { // ANCHOR: create_consumer -Properties config = new Properties(); -config.setProperty("td.connect.type", "jni"); -config.setProperty("bootstrap.servers", "localhost:6030"); -config.setProperty("auto.offset.reset", "latest"); -config.setProperty("msg.with.table.name", "true"); -config.setProperty("enable.auto.commit", "true"); -config.setProperty("auto.commit.interval.ms", "1000"); -config.setProperty("group.id", "group1"); -config.setProperty("client.id", "client1"); -config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoop$ResultDeserializer"); -config.setProperty("value.deserializer.encoding", "UTF-8"); -try { - this.consumer = new TaosConsumer<>(config); -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to create jni consumer with " + config.getProperty("bootstrap.servers") + " ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to create consumer", ex); -} + Properties config = new Properties(); + config.setProperty("td.connect.type", "jni"); + config.setProperty("bootstrap.servers", "localhost:6030"); + config.setProperty("auto.offset.reset", "latest"); + config.setProperty("msg.with.table.name", "true"); + config.setProperty("enable.auto.commit", "true"); + config.setProperty("auto.commit.interval.ms", "1000"); + config.setProperty("group.id", "group1"); + config.setProperty("client.id", "client1"); + config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoop$ResultDeserializer"); + config.setProperty("value.deserializer.encoding", "UTF-8"); + try { + this.consumer = new TaosConsumer<>(config); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to create jni consumer with " + config.getProperty("bootstrap.servers") + " ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create consumer", ex); + } // ANCHOR_END: create_consumer this.topics = Collections.singletonList("topic_meters"); @@ -53,65 +53,65 @@ try { public void pollDataCodePiece() throws SQLException { // ANCHOR: poll_data_code_piece -try { - consumer.subscribe(topics); - while (!shutdown.get()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process your data here - process(bean); + try { + consumer.subscribe(topics); + while (!shutdown.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process your data here + process(bean); + } + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data", ex); + } finally { + consumer.close(); + shutdownLatch.countDown(); } - } -} catch (SQLException ex){ - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to poll data", ex); -} finally { - consumer.close(); - shutdownLatch.countDown(); -} // ANCHOR_END: poll_data_code_piece } public void commitCodePiece() throws SQLException { // ANCHOR: commit_code_piece -try { - consumer.subscribe(topics); - while (!shutdown.get()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process your data here - process(bean); + try { + consumer.subscribe(topics); + while (!shutdown.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process your data here + process(bean); + } + if (!records.isEmpty()) { + // after processing the data, commit the offset manually + consumer.commitSync(); + } + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to execute consumer functions", ex); + } finally { + consumer.close(); + shutdownLatch.countDown(); } - if (!records.isEmpty()) { - // after processing the data, commit the offset manually - consumer.commitSync(); - } - } -} catch (SQLException ex){ - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to execute consumer functions", ex); -} finally { - consumer.close(); - shutdownLatch.countDown(); -} // ANCHOR_END: commit_code_piece } public void unsubscribeCodePiece() throws SQLException { // ANCHOR: unsubscribe_data_code_piece -try { - consumer.unsubscribe(); -} catch (SQLException ex){ - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to unsubscribe consumer", ex); -} finally { - consumer.close(); -} + try { + consumer.unsubscribe(); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to unsubscribe consumer", ex); + } finally { + consumer.close(); + } // ANCHOR_END: unsubscribe_data_code_piece } @@ -119,14 +119,14 @@ try { try { // ANCHOR: poll_data -consumer.subscribe(topics); -while (!shutdown.get()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - process(bean); - } -} + consumer.subscribe(topics); + while (!shutdown.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + process(bean); + } + } // ANCHOR_END: poll_data consumer.unsubscribe(); diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java index 4012f2e679..c78a875ab6 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java @@ -15,37 +15,38 @@ import java.util.concurrent.TimeUnit; public class ConsumerLoopFull { static private Connection connection; static private Statement statement; - public static TaosConsumer getConsumer() throws SQLException{ -// ANCHOR: create_consumer -Properties config = new Properties(); -config.setProperty("td.connect.type", "jni"); -config.setProperty("bootstrap.servers", "localhost:6030"); -config.setProperty("auto.offset.reset", "latest"); -config.setProperty("msg.with.table.name", "true"); -config.setProperty("enable.auto.commit", "true"); -config.setProperty("auto.commit.interval.ms", "1000"); -config.setProperty("group.id", "group1"); -config.setProperty("client.id", "1"); -config.setProperty("td.connect.user", "root"); -config.setProperty("td.connect.pass", "taosdata"); -config.setProperty("value.deserializer", "com.taosdata.example.ConsumerLoopFull$ResultDeserializer"); -config.setProperty("value.deserializer.encoding", "UTF-8"); -try { - return new TaosConsumer<>(config); -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to create consumer", ex); -} catch (Exception e) { - e.printStackTrace(); - throw new SQLException("Failed to create consumer", e); -} + public static TaosConsumer getConsumer() throws SQLException { +// ANCHOR: create_consumer + Properties config = new Properties(); + config.setProperty("td.connect.type", "jni"); + config.setProperty("bootstrap.servers", "localhost:6030"); + config.setProperty("auto.offset.reset", "latest"); + config.setProperty("msg.with.table.name", "true"); + config.setProperty("enable.auto.commit", "true"); + config.setProperty("auto.commit.interval.ms", "1000"); + config.setProperty("group.id", "group1"); + config.setProperty("client.id", "1"); + config.setProperty("td.connect.user", "root"); + config.setProperty("td.connect.pass", "taosdata"); + config.setProperty("value.deserializer", "com.taosdata.example.ConsumerLoopFull$ResultDeserializer"); + config.setProperty("value.deserializer.encoding", "UTF-8"); + + try { + return new TaosConsumer<>(config); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create consumer", ex); + } catch (Exception e) { + e.printStackTrace(); + throw new SQLException("Failed to create consumer", e); + } // ANCHOR_END: create_consumer } public static void pollDataExample() throws SQLException { - try (TaosConsumer consumer = getConsumer()){ + try (TaosConsumer consumer = getConsumer()) { // subscribe to the topics List topics = Collections.singletonList("topic_meters"); @@ -72,96 +73,97 @@ try { public static void pollExample() throws SQLException { // ANCHOR: poll_data_code_piece -try (TaosConsumer consumer = getConsumer()){ - List topics = Collections.singletonList("topic_meters"); + try (TaosConsumer consumer = getConsumer()) { + List topics = Collections.singletonList("topic_meters"); - // subscribe to the topics - consumer.subscribe(topics); - System.out.println("subscribe topics successfully"); - for (int i = 0; i < 50; i++) { - // poll data - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process the data here - System.out.println("data: " + JSON.toJSONString(bean)); + // subscribe to the topics + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + for (int i = 0; i < 50; i++) { + // poll data + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process the data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + } + + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data", ex); } - } - -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to poll data", ex); -} // ANCHOR_END: poll_data_code_piece -} + } public static void seekExample() throws SQLException { // ANCHOR: consumer_seek -try (TaosConsumer consumer = getConsumer()){ - List topics = Collections.singletonList("topic_meters"); + try (TaosConsumer consumer = getConsumer()) { + List topics = Collections.singletonList("topic_meters"); - // subscribe to the topics - consumer.subscribe(topics); - System.out.println("subscribe topics successfully"); - ConsumerRecords records = ConsumerRecords.emptyRecord(); - // make sure we have got some data - while (records.isEmpty()){ - records = consumer.poll(Duration.ofMillis(100)); - } + // subscribe to the topics + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + ConsumerRecords records = ConsumerRecords.emptyRecord(); + // make sure we have got some data + while (records.isEmpty()) { + records = consumer.poll(Duration.ofMillis(100)); + } - for (ConsumerRecord record : records) { - System.out.println("first data polled: " + JSON.toJSONString(record.value())); - Set assignment = consumer.assignment(); - // seek to the beginning of the all partitions - consumer.seekToBeginning(assignment); - System.out.println("assignment seek to beginning successfully"); - break; - } + for (ConsumerRecord record : records) { + System.out.println("first data polled: " + JSON.toJSONString(record.value())); + Set assignment = consumer.assignment(); + // seek to the beginning of the all partitions + consumer.seekToBeginning(assignment); + System.out.println("assignment seek to beginning successfully"); + break; + } - // poll data agagin - records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - // process the data here - System.out.println("second data polled: " + JSON.toJSONString(record.value())); - break; - } + // poll data agagin + records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + // process the data here + System.out.println("second data polled: " + JSON.toJSONString(record.value())); + break; + } -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("seek example failed", ex); -} + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("seek example failed", ex); + } // ANCHOR_END: consumer_seek } public static void commitExample() throws SQLException { // ANCHOR: commit_code_piece -try (TaosConsumer consumer = getConsumer()){ - List topics = Collections.singletonList("topic_meters"); + try (TaosConsumer consumer = getConsumer()) { + List topics = Collections.singletonList("topic_meters"); - consumer.subscribe(topics); - for (int i = 0; i < 50; i++) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process your data here - System.out.println("data: " + JSON.toJSONString(bean)); + consumer.subscribe(topics); + for (int i = 0; i < 50; i++) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process your data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + if (!records.isEmpty()) { + // after processing the data, commit the offset manually + consumer.commitSync(); + } + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to execute consumer functions", ex); } - if (!records.isEmpty()) { - // after processing the data, commit the offset manually - consumer.commitSync(); - } - } -} catch (SQLException ex){ - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to execute consumer functions", ex); -} // ANCHOR_END: commit_code_piece } + public static void unsubscribeExample() throws SQLException { TaosConsumer consumer = getConsumer(); List topics = Collections.singletonList("topic_meters"); @@ -169,7 +171,7 @@ try (TaosConsumer consumer = getConsumer()){ // ANCHOR: unsubscribe_data_code_piece try { consumer.unsubscribe(); - } catch (SQLException ex){ + } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to unsubscribe consumer", ex); @@ -182,6 +184,7 @@ try (TaosConsumer consumer = getConsumer()){ public static class ResultDeserializer extends ReferenceDeserializer { } + // use this class to define the data structure of the result record public static class ResultBean { private Timestamp ts; @@ -240,12 +243,12 @@ try (TaosConsumer consumer = getConsumer()){ } } - public static void prepareData() throws SQLException{ + public static void prepareData() throws SQLException { StringBuilder insertQuery = new StringBuilder(); insertQuery.append("INSERT INTO " + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + "VALUES "); - for (int i = 0; i < 10000; i++){ + for (int i = 0; i < 10000; i++) { insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) "); } try { @@ -256,7 +259,8 @@ try (TaosConsumer consumer = getConsumer()){ throw new SQLException("Failed to insert data to power.meters", ex); } } - public static void prepareMeta() throws SQLException{ + + public static void prepareMeta() throws SQLException { try { statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); statement.executeUpdate("USE power"); @@ -288,6 +292,7 @@ try (TaosConsumer consumer = getConsumer()){ } System.out.println("Connection created successfully."); } + public static void closeConnection() throws SQLException { try { if (statement != null) { @@ -337,16 +342,16 @@ try (TaosConsumer consumer = getConsumer()){ System.out.println("Data prepared successfully"); - // 关闭线程池,不再接收新任务 + // close the executor, which will make the executor reject new tasks executor.shutdown(); try { - // 等待直到所有任务完成 + // wait for the executor to terminate boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); assert result; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - } catch (Exception e){ + } catch (Exception e) { e.printStackTrace(); System.out.println("Wait executor termination failed."); } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopImp.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopImp.java index 22f96841e6..84d84f062b 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopImp.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopImp.java @@ -20,9 +20,9 @@ public class ConsumerLoopImp { properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); // ANCHOR: create_topic -Connection connection = DriverManager.getConnection(url, properties); -Statement statement = connection.createStatement(); -statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); + Connection connection = DriverManager.getConnection(url, properties); + Statement statement = connection.createStatement(); + statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); // ANCHOR_END: create_topic statement.close(); diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerOffsetSeek.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerOffsetSeek.java index b9463d30f7..73901aba49 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerOffsetSeek.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerOffsetSeek.java @@ -35,29 +35,29 @@ public class ConsumerOffsetSeek { config.setProperty("value.deserializer.encoding", "UTF-8"); // ANCHOR: consumer_seek -String topic = "topic_meters"; -Map offset = null; -try (TaosConsumer consumer = new TaosConsumer<>(config)) { - consumer.subscribe(Collections.singletonList(topic)); - for (int i = 0; i < 10; i++) { - if (i == 3) { - // Saving consumption position - offset = consumer.position(topic); - } - if (i == 5) { - // reset consumption to the previously saved position - for (Map.Entry entry : offset.entrySet()) { - consumer.seek(entry.getKey(), entry.getValue()); + String topic = "topic_meters"; + Map offset = null; + try (TaosConsumer consumer = new TaosConsumer<>(config)) { + consumer.subscribe(Collections.singletonList(topic)); + for (int i = 0; i < 10; i++) { + if (i == 3) { + // Saving consumption position + offset = consumer.position(topic); + } + if (i == 5) { + // reset consumption to the previously saved position + for (Map.Entry entry : offset.entrySet()) { + consumer.seek(entry.getKey(), entry.getValue()); + } + } + ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); + // you can handle data here } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to execute consumer functions. server: " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to execute consumer functions", ex); } - ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); - // you can handle data here - } -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to execute consumer functions. server: " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to execute consumer functions", ex); -} // ANCHOR_END: consumer_seek } } \ No newline at end of file diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcBasicDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcBasicDemo.java index 4831567eab..945c321260 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcBasicDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcBasicDemo.java @@ -16,97 +16,97 @@ public class JdbcBasicDemo { public static void main(String[] args) throws SQLException { -final String url = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; + final String url = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; // get connection -Properties properties = new Properties(); -properties.setProperty("charset", "UTF-8"); -properties.setProperty("locale", "en_US.UTF-8"); -properties.setProperty("timezone", "UTC-8"); -System.out.println("get connection starting..."); -try(Connection connection = DriverManager.getConnection(url, properties)){ + Properties properties = new Properties(); + properties.setProperty("charset", "UTF-8"); + properties.setProperty("locale", "en_US.UTF-8"); + properties.setProperty("timezone", "UTC-8"); + System.out.println("get connection starting..."); + try (Connection connection = DriverManager.getConnection(url, properties)) { -if (connection != null){ - System.out.println("[ OK ] Connection established."); -} else { - System.out.println("[ ERR ] Connection can not be established."); - return; -} + if (connection != null) { + System.out.println("[ OK ] Connection established."); + } else { + System.out.println("[ ERR ] Connection can not be established."); + return; + } -Statement stmt = connection.createStatement(); + Statement stmt = connection.createStatement(); // ANCHOR: create_db_and_table // create database -stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); + stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); // use database -stmt.executeUpdate("USE power"); + stmt.executeUpdate("USE power"); // create table -stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); + stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); // ANCHOR_END: create_db_and_table // ANCHOR: insert_data // insert data -String insertQuery = "INSERT INTO " + - "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 219, 0.31000) " + - "(NOW + 2a, 12.60000, 218, 0.33000) " + - "(NOW + 3a, 12.30000, 221, 0.31000) " + - "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 218, 0.25000) "; -int affectedRows = stmt.executeUpdate(insertQuery); -System.out.println("insert " + affectedRows + " rows."); + String insertQuery = "INSERT INTO " + + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 219, 0.31000) " + + "(NOW + 2a, 12.60000, 218, 0.33000) " + + "(NOW + 3a, 12.30000, 221, 0.31000) " + + "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 218, 0.25000) "; + int affectedRows = stmt.executeUpdate(insertQuery); + System.out.println("insert " + affectedRows + " rows."); // ANCHOR_END: insert_data // ANCHOR: query_data // query data -ResultSet resultSet = stmt.executeQuery("SELECT * FROM meters"); + ResultSet resultSet = stmt.executeQuery("SELECT * FROM meters"); -Timestamp ts; -float current; -String location; -while(resultSet.next()){ - ts = resultSet.getTimestamp(1); - current = resultSet.getFloat(2); - location = resultSet.getString("location"); + Timestamp ts; + float current; + String location; + while (resultSet.next()) { + ts = resultSet.getTimestamp(1); + current = resultSet.getFloat(2); + location = resultSet.getString("location"); - System.out.printf("%s, %f, %s\n", ts, current, location); -} + System.out.printf("%s, %f, %s\n", ts, current, location); + } // ANCHOR_END: query_data // ANCHOR: with_reqid -AbstractStatement aStmt = (AbstractStatement) connection.createStatement(); -aStmt.execute("CREATE DATABASE IF NOT EXISTS power", 1L); -aStmt.executeUpdate("USE power", 2L); -try (ResultSet rs = aStmt.executeQuery("SELECT * FROM meters limit 1", 3L)) { - while(rs.next()){ - Timestamp timestamp = rs.getTimestamp(1); - System.out.println("timestamp = " + timestamp); - } -} -aStmt.close(); + AbstractStatement aStmt = (AbstractStatement) connection.createStatement(); + aStmt.execute("CREATE DATABASE IF NOT EXISTS power", 1L); + aStmt.executeUpdate("USE power", 2L); + try (ResultSet rs = aStmt.executeQuery("SELECT * FROM meters limit 1", 3L)) { + while (rs.next()) { + Timestamp timestamp = rs.getTimestamp(1); + System.out.println("timestamp = " + timestamp); + } + } + aStmt.close(); // ANCHOR_END: with_reqid -String sql = "SELECT * FROM meters limit 2;"; + String sql = "SELECT * FROM meters limit 2;"; // ANCHOR: jdbc_exception -try (Statement statement = connection.createStatement(); - // executeQuery - ResultSet tempResultSet = statement.executeQuery(sql)) { + try (Statement statement = connection.createStatement(); + // executeQuery + ResultSet tempResultSet = statement.executeQuery(sql)) { - // print result - printResult(tempResultSet); -} catch (SQLException e) { - System.out.println("ERROR Message: " + e.getMessage()); - System.out.println("ERROR Code: " + e.getErrorCode()); - e.printStackTrace(); -} + // print result + printResult(tempResultSet); + } catch (SQLException e) { + System.out.println("ERROR Message: " + e.getMessage()); + System.out.println("ERROR Code: " + e.getErrorCode()); + e.printStackTrace(); + } // ANCHOR_END: jdbc_exception } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java index d02b3c8789..f5f5b1c62c 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java @@ -15,37 +15,37 @@ public class JdbcCreatDBDemo { public static void main(String[] args) throws SQLException { -final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; + final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; // get connection -Properties properties = new Properties(); -properties.setProperty("charset", "UTF-8"); -properties.setProperty("locale", "en_US.UTF-8"); -properties.setProperty("timezone", "UTC-8"); -System.out.println("get connection starting..."); + Properties properties = new Properties(); + properties.setProperty("charset", "UTF-8"); + properties.setProperty("locale", "en_US.UTF-8"); + properties.setProperty("timezone", "UTC-8"); + System.out.println("get connection starting..."); // ANCHOR: create_db_and_table -try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); - Statement stmt = connection.createStatement()) { + try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); + Statement stmt = connection.createStatement()) { - // create database - int rowsAffected = stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); - // you can check rowsAffected here - assert rowsAffected == 0; + // create database + int rowsAffected = stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); + // you can check rowsAffected here + assert rowsAffected == 0; - // use database - rowsAffected = stmt.executeUpdate("USE power"); - // you can check rowsAffected here - assert rowsAffected == 0; + // use database + rowsAffected = stmt.executeUpdate("USE power"); + // you can check rowsAffected here + assert rowsAffected == 0; - // create table - rowsAffected = stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); - // you can check rowsAffected here - assert rowsAffected == 0; + // create table + rowsAffected = stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); + // you can check rowsAffected here + assert rowsAffected == 0; -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to create db and table, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); -} + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to create db and table, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + } // ANCHOR_END: create_db_and_table } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java index 7fc6d5137d..f8e11a8f41 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java @@ -15,36 +15,36 @@ public class JdbcInsertDataDemo { public static void main(String[] args) throws SQLException { -final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; + final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; // get connection -Properties properties = new Properties(); -properties.setProperty("charset", "UTF-8"); -properties.setProperty("locale", "en_US.UTF-8"); -properties.setProperty("timezone", "UTC-8"); -System.out.println("get connection starting..."); + Properties properties = new Properties(); + properties.setProperty("charset", "UTF-8"); + properties.setProperty("locale", "en_US.UTF-8"); + properties.setProperty("timezone", "UTC-8"); + System.out.println("get connection starting..."); // ANCHOR: insert_data -try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); - Statement stmt = connection.createStatement()) { + try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); + Statement stmt = connection.createStatement()) { - // insert data, please make sure the database and table are created before - String insertQuery = "INSERT INTO " + - "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 219, 0.31000) " + - "(NOW + 2a, 12.60000, 218, 0.33000) " + - "(NOW + 3a, 12.30000, 221, 0.31000) " + - "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 218, 0.25000) "; - int affectedRows = stmt.executeUpdate(insertQuery); - // you can check affectedRows here - System.out.println("inserted into " + affectedRows + " rows to power.meters successfully."); -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to insert data to power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + // insert data, please make sure the database and table are created before + String insertQuery = "INSERT INTO " + + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 219, 0.31000) " + + "(NOW + 2a, 12.60000, 218, 0.33000) " + + "(NOW + 3a, 12.30000, 221, 0.31000) " + + "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 218, 0.25000) "; + int affectedRows = stmt.executeUpdate(insertQuery); + // you can check affectedRows here + System.out.println("inserted into " + affectedRows + " rows to power.meters successfully."); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to insert data to power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); -} + } // ANCHOR_END: insert_data } } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java index 30a835bb11..a4749afed5 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java @@ -15,36 +15,36 @@ public class JdbcQueryDemo { public static void main(String[] args) throws SQLException { -final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; + final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; // get connection -Properties properties = new Properties(); -properties.setProperty("charset", "UTF-8"); -properties.setProperty("locale", "en_US.UTF-8"); -properties.setProperty("timezone", "UTC-8"); -System.out.println("get connection starting..."); + Properties properties = new Properties(); + properties.setProperty("charset", "UTF-8"); + properties.setProperty("locale", "en_US.UTF-8"); + properties.setProperty("timezone", "UTC-8"); + System.out.println("get connection starting..."); // ANCHOR: query_data -try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); - Statement stmt = connection.createStatement(); - // query data, make sure the database and table are created before - ResultSet resultSet = stmt.executeQuery("SELECT ts, current, location FROM power.meters limit 100")) { + try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); + Statement stmt = connection.createStatement(); + // query data, make sure the database and table are created before + ResultSet resultSet = stmt.executeQuery("SELECT ts, current, location FROM power.meters limit 100")) { - Timestamp ts; - float current; - String location; - while (resultSet.next()) { - ts = resultSet.getTimestamp(1); - current = resultSet.getFloat(2); - // we recommend using the column name to get the value - location = resultSet.getString("location"); + Timestamp ts; + float current; + String location; + while (resultSet.next()) { + ts = resultSet.getTimestamp(1); + current = resultSet.getFloat(2); + // we recommend using the column name to get the value + location = resultSet.getString("location"); - // you can check data here - System.out.printf("ts: %s, current: %f, location: %s %n", ts, current, location); - } -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to query data from power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); -} + // you can check data here + System.out.printf("ts: %s, current: %f, location: %s %n", ts, current, location); + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to query data from power.meters, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + } // ANCHOR_END: query_data } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java index cc557c18c5..3705e0aac2 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java @@ -15,31 +15,31 @@ public class JdbcReqIdDemo { public static void main(String[] args) throws SQLException { -final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; + final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password; // get connection -Properties properties = new Properties(); -properties.setProperty("charset", "UTF-8"); -properties.setProperty("locale", "en_US.UTF-8"); -properties.setProperty("timezone", "UTC-8"); -System.out.println("get connection starting..."); + Properties properties = new Properties(); + properties.setProperty("charset", "UTF-8"); + properties.setProperty("locale", "en_US.UTF-8"); + properties.setProperty("timezone", "UTC-8"); + System.out.println("get connection starting..."); // ANCHOR: with_reqid -try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); - // Create a statement that allows specifying a request ID - AbstractStatement aStmt = (AbstractStatement) connection.createStatement()) { + try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); + // Create a statement that allows specifying a request ID + AbstractStatement aStmt = (AbstractStatement) connection.createStatement()) { + + try (ResultSet rs = aStmt.executeQuery("SELECT ts, current, location FROM power.meters limit 1", 3L)) { + while (rs.next()) { + Timestamp timestamp = rs.getTimestamp(1); + System.out.println("timestamp = " + timestamp); + } + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to execute sql with reqId, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - try (ResultSet rs = aStmt.executeQuery("SELECT ts, current, location FROM power.meters limit 1", 3L)) { - while (rs.next()) { - Timestamp timestamp = rs.getTimestamp(1); - System.out.println("timestamp = " + timestamp); } - } -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to execute sql with reqId, url:" + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - -} // ANCHOR_END: with_reqid } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java index d8084eb9f6..6caa42b1c1 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java @@ -18,37 +18,38 @@ import java.util.concurrent.TimeUnit; public class WsConsumerLoopFull { static private Connection connection; static private Statement statement; - public static TaosConsumer getConsumer() throws SQLException{ -// ANCHOR: create_consumer -Properties config = new Properties(); -config.setProperty("td.connect.type", "ws"); -config.setProperty("bootstrap.servers", "localhost:6041"); -config.setProperty("auto.offset.reset", "latest"); -config.setProperty("msg.with.table.name", "true"); -config.setProperty("enable.auto.commit", "true"); -config.setProperty("auto.commit.interval.ms", "1000"); -config.setProperty("group.id", "group1"); -config.setProperty("client.id", "client1"); -config.setProperty("td.connect.user", "root"); -config.setProperty("td.connect.pass", "taosdata"); -config.setProperty("value.deserializer", "com.taosdata.example.WsConsumerLoopFull$ResultDeserializer"); -config.setProperty("value.deserializer.encoding", "UTF-8"); -try { - return new TaosConsumer<>(config); - } catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to create consumer", ex); - } catch (Exception e) { - e.printStackTrace(); - throw new SQLException("Failed to create consumer", e); - } + public static TaosConsumer getConsumer() throws SQLException { +// ANCHOR: create_consumer + Properties config = new Properties(); + config.setProperty("td.connect.type", "ws"); + config.setProperty("bootstrap.servers", "localhost:6041"); + config.setProperty("auto.offset.reset", "latest"); + config.setProperty("msg.with.table.name", "true"); + config.setProperty("enable.auto.commit", "true"); + config.setProperty("auto.commit.interval.ms", "1000"); + config.setProperty("group.id", "group1"); + config.setProperty("client.id", "client1"); + config.setProperty("td.connect.user", "root"); + config.setProperty("td.connect.pass", "taosdata"); + config.setProperty("value.deserializer", "com.taosdata.example.WsConsumerLoopFull$ResultDeserializer"); + config.setProperty("value.deserializer.encoding", "UTF-8"); + + try { + return new TaosConsumer<>(config); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create consumer", ex); + } catch (Exception e) { + e.printStackTrace(); + throw new SQLException("Failed to create consumer", e); + } // ANCHOR_END: create_consumer -} + } public static void pollDataExample() throws SQLException { - try (TaosConsumer consumer = getConsumer()){ + try (TaosConsumer consumer = getConsumer()) { // subscribe to the topics List topics = Collections.singletonList("topic_meters"); @@ -75,178 +76,180 @@ try { public static void pollExample() throws SQLException { // ANCHOR: poll_data_code_piece -try (TaosConsumer consumer = getConsumer()){ - List topics = Collections.singletonList("topic_meters"); + try (TaosConsumer consumer = getConsumer()) { + List topics = Collections.singletonList("topic_meters"); - // subscribe to the topics - consumer.subscribe(topics); - System.out.println("subscribe topics successfully"); - for (int i = 0; i < 50; i++) { - // poll data - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process the data here - System.out.println("data: " + JSON.toJSONString(bean)); + // subscribe to the topics + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + for (int i = 0; i < 50; i++) { + // poll data + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process the data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + } + + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data", ex); } - } - -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to poll data", ex); -} // ANCHOR_END: poll_data_code_piece } public static void seekExample() throws SQLException { // ANCHOR: consumer_seek -try (TaosConsumer consumer = getConsumer()){ - List topics = Collections.singletonList("topic_meters"); + try (TaosConsumer consumer = getConsumer()) { + List topics = Collections.singletonList("topic_meters"); - // subscribe to the topics - consumer.subscribe(topics); - System.out.println("subscribe topics successfully"); - ConsumerRecords records = ConsumerRecords.emptyRecord(); - // make sure we have got some data - while (records.isEmpty()){ - records = consumer.poll(Duration.ofMillis(100)); - } + // subscribe to the topics + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + ConsumerRecords records = ConsumerRecords.emptyRecord(); + // make sure we have got some data + while (records.isEmpty()) { + records = consumer.poll(Duration.ofMillis(100)); + } - for (ConsumerRecord record : records) { - System.out.println("first data polled: " + JSON.toJSONString(record.value())); - Set assignment = consumer.assignment(); - // seek to the beginning of the all partitions - consumer.seekToBeginning(assignment); - System.out.println("assignment seek to beginning successfully"); - break; - } + for (ConsumerRecord record : records) { + System.out.println("first data polled: " + JSON.toJSONString(record.value())); + Set assignment = consumer.assignment(); + // seek to the beginning of the all partitions + consumer.seekToBeginning(assignment); + System.out.println("assignment seek to beginning successfully"); + break; + } - // poll data agagin - records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - // process the data here - System.out.println("second data polled: " + JSON.toJSONString(record.value())); - break; - } -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("seek example failed", ex); -} + // poll data agagin + records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + // process the data here + System.out.println("second data polled: " + JSON.toJSONString(record.value())); + break; + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("seek example failed", ex); + } // ANCHOR_END: consumer_seek } public static void commitExample() throws SQLException { // ANCHOR: commit_code_piece -try (TaosConsumer consumer = getConsumer()){ - List topics = Collections.singletonList("topic_meters"); + try (TaosConsumer consumer = getConsumer()) { + List topics = Collections.singletonList("topic_meters"); - consumer.subscribe(topics); - for (int i = 0; i < 50; i++) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process your data here - System.out.println("data: " + JSON.toJSONString(bean)); + consumer.subscribe(topics); + for (int i = 0; i < 50; i++) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process your data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + if (!records.isEmpty()) { + // after processing the data, commit the offset manually + consumer.commitSync(); + } + } + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to execute consumer functions", ex); } - if (!records.isEmpty()) { - // after processing the data, commit the offset manually - consumer.commitSync(); - } - } -} catch (SQLException ex){ - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to execute consumer functions", ex); -} // ANCHOR_END: commit_code_piece } + public static void unsubscribeExample() throws SQLException { TaosConsumer consumer = getConsumer(); List topics = Collections.singletonList("topic_meters"); consumer.subscribe(topics); // ANCHOR: unsubscribe_data_code_piece -try { - consumer.unsubscribe(); -} catch (SQLException ex){ - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to unsubscribe consumer", ex); -} finally { - consumer.close(); -} + try { + consumer.unsubscribe(); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to unsubscribe consumer", ex); + } finally { + consumer.close(); + } // ANCHOR_END: unsubscribe_data_code_piece } -public static class ResultDeserializer extends ReferenceDeserializer { + public static class ResultDeserializer extends ReferenceDeserializer { -} -// use this class to define the data structure of the result record -public static class ResultBean { - private Timestamp ts; - private double current; - private int voltage; - private double phase; - private int groupid; - private String location; - - public Timestamp getTs() { - return ts; } - public void setTs(Timestamp ts) { - this.ts = ts; + // use this class to define the data structure of the result record + public static class ResultBean { + private Timestamp ts; + private double current; + private int voltage; + private double phase; + private int groupid; + private String location; + + public Timestamp getTs() { + return ts; + } + + public void setTs(Timestamp ts) { + this.ts = ts; + } + + public double getCurrent() { + return current; + } + + public void setCurrent(double current) { + this.current = current; + } + + public int getVoltage() { + return voltage; + } + + public void setVoltage(int voltage) { + this.voltage = voltage; + } + + public double getPhase() { + return phase; + } + + public void setPhase(double phase) { + this.phase = phase; + } + + public int getGroupid() { + return groupid; + } + + public void setGroupid(int groupid) { + this.groupid = groupid; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } } - public double getCurrent() { - return current; - } - - public void setCurrent(double current) { - this.current = current; - } - - public int getVoltage() { - return voltage; - } - - public void setVoltage(int voltage) { - this.voltage = voltage; - } - - public double getPhase() { - return phase; - } - - public void setPhase(double phase) { - this.phase = phase; - } - - public int getGroupid() { - return groupid; - } - - public void setGroupid(int groupid) { - this.groupid = groupid; - } - - public String getLocation() { - return location; - } - - public void setLocation(String location) { - this.location = location; - } -} - - public static void prepareData() throws SQLException{ + public static void prepareData() throws SQLException { StringBuilder insertQuery = new StringBuilder(); insertQuery.append("INSERT INTO " + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + "VALUES "); - for (int i = 0; i < 10000; i++){ + for (int i = 0; i < 10000; i++) { insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) "); } try { @@ -257,7 +260,8 @@ public static class ResultBean { throw new SQLException("Failed to insert data to power.meters", ex); } } - public static void prepareMeta() throws SQLException{ + + public static void prepareMeta() throws SQLException { try { statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); statement.executeUpdate("USE power"); @@ -289,6 +293,7 @@ public static class ResultBean { } System.out.println("Connection created successfully."); } + public static void closeConnection() throws SQLException { try { if (statement != null) { @@ -338,16 +343,16 @@ public static class ResultBean { System.out.println("Data prepared successfully"); - // 关闭线程池,不再接收新任务 + // close the executor, which will make the executor reject new tasks executor.shutdown(); try { - // 等待直到所有任务完成 + // wait for the executor to terminate boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); assert result; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - } catch (Exception e){ + } catch (Exception e) { e.printStackTrace(); System.out.println("Wait executor termination failed."); } From 6f259ab2983417406b109244e06556784a5cbbb0 Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Fri, 2 Aug 2024 16:32:21 +0800 Subject: [PATCH 2/3] mod rust example --- .../rust/nativeexample/examples/tmq.rs | 39 ++--- docs/examples/rust/restexample/Cargo.toml | 2 + .../restexample/examples/subscribe_demo.rs | 135 ++++++++++++++++++ .../examples/rust/restexample/examples/tmq.rs | 63 ++------ docs/zh/08-develop/07-tmq.md | 50 ++++++- 5 files changed, 217 insertions(+), 72 deletions(-) create mode 100644 docs/examples/rust/restexample/examples/subscribe_demo.rs diff --git a/docs/examples/rust/nativeexample/examples/tmq.rs b/docs/examples/rust/nativeexample/examples/tmq.rs index 04bf66b2ef..c8f2b0a19a 100644 --- a/docs/examples/rust/nativeexample/examples/tmq.rs +++ b/docs/examples/rust/nativeexample/examples/tmq.rs @@ -9,9 +9,11 @@ async fn main() -> anyhow::Result<()> { .filter_level(log::LevelFilter::Info) .init(); use taos_query::prelude::*; + // ANCHOR: create_consumer_dsn let dsn = "taos://localhost:6030".to_string(); log::info!("dsn: {}", dsn); let mut dsn = Dsn::from_str(&dsn)?; + // ANCHOR_END: create_consumer_dsn let taos = TaosBuilder::from_dsn(&dsn)?.build().await?; @@ -43,7 +45,7 @@ async fn main() -> anyhow::Result<()> { .await?; // ANCHOR_END: create_topic - // ANCHOR: create_consumer + // ANCHOR: create_consumer_ac dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string()); dsn.params.insert("msg.with.table.name".to_string(), "true".to_string()); dsn.params.insert("enable.auto.commit".to_string(), "true".to_string()); @@ -53,13 +55,11 @@ async fn main() -> anyhow::Result<()> { let builder = TmqBuilder::from_dsn(&dsn)?; let mut consumer = builder.build().await?; - // ANCHOR_END: create_consumer - - // ANCHOR: subscribe - consumer.subscribe(["topic_meters"]).await?; - // ANCHOR_END: subscribe + // ANCHOR_END: create_consumer_ac // ANCHOR: consume + consumer.subscribe(["topic_meters"]).await?; + { let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1)); @@ -69,7 +69,7 @@ async fn main() -> anyhow::Result<()> { let database = offset.database(); let vgroup_id = offset.vgroup_id(); log::debug!( - "topic: {}, database: {}, vgroup_id: {}", + "receive message from: topic: {}, database: {}, vgroup_id: {}", topic, database, vgroup_id @@ -84,13 +84,13 @@ async fn main() -> anyhow::Result<()> { let json = meta.as_json_meta().await?; let sql = json.to_string(); if let Err(err) = taos.exec(sql).await { - println!("maybe error: {}", err); + println!("maybe error in handling meta message: {}", err); } } MessageSet::Data(data) => { log::info!("Data"); while let Some(data) = data.fetch_raw_block().await? { - log::debug!("data: {:?}", data); + log::debug!("message data: {:?}", data); } } MessageSet::MetaData(meta, data) => { @@ -101,24 +101,24 @@ async fn main() -> anyhow::Result<()> { let json = meta.as_json_meta().await?; let sql = json.to_string(); if let Err(err) = taos.exec(sql).await { - println!("maybe error: {}", err); + println!("maybe error in handling metadata message: {}", err); } while let Some(data) = data.fetch_raw_block().await? { - log::debug!("data: {:?}", data); + log::debug!("message data: {:?}", data); } } } + // commit offset manually when handling message successfully consumer.commit(offset).await?; } } // ANCHOR_END: consume - // ANCHOR: assignments + // ANCHOR: seek_offset let assignments = consumer.assignments().await.unwrap(); - log::info!("assignments: {:?}", assignments); - // ANCHOR_END: assignments - + log::info!("start assignments: {:?}", assignments); + // seek offset for topic_vec_assignment in assignments { let topic = &topic_vec_assignment.0; @@ -136,23 +136,24 @@ async fn main() -> anyhow::Result<()> { begin, end ); - // ANCHOR: seek_offset - let res = consumer.offset_seek(topic, vgroup_id, end).await; + + // seek offset of the (topic, vgroup_id) to the begin + let res = consumer.offset_seek(topic, vgroup_id, begin).await; if res.is_err() { log::error!("seek offset error: {:?}", res); let a = consumer.assignments().await.unwrap(); log::error!("assignments: {:?}", a); } - // ANCHOR_END: seek_offset } let topic_assignment = consumer.topic_assignment(topic).await; log::debug!("topic assignment: {:?}", topic_assignment); } - + // after seek offset let assignments = consumer.assignments().await.unwrap(); log::info!("after seek offset assignments: {:?}", assignments); + // ANCHOR_END: seek_offset // ANCHOR: unsubscribe consumer.unsubscribe().await; diff --git a/docs/examples/rust/restexample/Cargo.toml b/docs/examples/rust/restexample/Cargo.toml index 5fffe215d4..7b13b48104 100644 --- a/docs/examples/rust/restexample/Cargo.toml +++ b/docs/examples/rust/restexample/Cargo.toml @@ -8,5 +8,7 @@ anyhow = "1" chrono = "0.4" serde = { version = "1", features = ["derive"] } tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] } +log = "0.4" +pretty_env_logger = "0.5.0" taos = { version = "0.*" } diff --git a/docs/examples/rust/restexample/examples/subscribe_demo.rs b/docs/examples/rust/restexample/examples/subscribe_demo.rs new file mode 100644 index 0000000000..8486419386 --- /dev/null +++ b/docs/examples/rust/restexample/examples/subscribe_demo.rs @@ -0,0 +1,135 @@ +use std::time::Duration; + +use chrono::{DateTime, Local}; +use taos::*; + +// Query options 2, use deserialization with serde. +#[derive(Debug, serde::Deserialize)] +#[allow(dead_code)] +struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, +} + +async fn prepare(taos: Taos) -> anyhow::Result<()> { + let inserted = taos.exec_many([ + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + assert_eq!(inserted, 6); + Ok(()) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "taos://localhost:6030"; + let builder = TaosBuilder::from_dsn(dsn)?; + + let taos = builder.build().await?; + let db = "tmq"; + + // prepare database + taos.exec_many([ + format!("DROP TOPIC IF EXISTS tmq_meters"), + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"), + format!("USE `{db}`"), + // create super table + format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"), + // create topic for subscription + format!("CREATE TOPIC tmq_meters AS SELECT * FROM `meters`") + ]) + .await?; + + let task = tokio::spawn(prepare(taos)); + + tokio::time::sleep(Duration::from_secs(1)).await; + + // subscribe + let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; + + let mut consumer = tmq.build().await?; + consumer.subscribe(["tmq_meters"]).await?; + + // ANCHOR: consumer_commit_manually + consumer + .stream() + .try_for_each(|(offset, message)| async { + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + let records: Vec = block.deserialize().try_collect()?; + println!("** read {} records: {:#?}\n", records.len(), records); + } + } + // commit offset manually when you have processed the message. + consumer.commit(offset).await?; + Ok(()) + }) + .await?; + // ANCHOR_END: consumer_commit_manually + + + // ANCHOR: assignments + let assignments = consumer.assignments().await.unwrap(); + log::info!("assignments: {:?}", assignments); + // ANCHOR_END: assignments + + // seek offset + for topic_vec_assignment in assignments { + let topic = &topic_vec_assignment.0; + let vec_assignment = topic_vec_assignment.1; + for assignment in vec_assignment { + let vgroup_id = assignment.vgroup_id(); + let current = assignment.current_offset(); + let begin = assignment.begin(); + let end = assignment.end(); + log::debug!( + "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}", + topic, + vgroup_id, + current, + begin, + end + ); + // ANCHOR: seek_offset + let res = consumer.offset_seek(topic, vgroup_id, end).await; + if res.is_err() { + log::error!("seek offset error: {:?}", res); + let a = consumer.assignments().await.unwrap(); + log::error!("assignments: {:?}", a); + } + // ANCHOR_END: seek_offset + } + + let topic_assignment = consumer.topic_assignment(topic).await; + log::debug!("topic assignment: {:?}", topic_assignment); + } + + // after seek offset + let assignments = consumer.assignments().await.unwrap(); + log::info!("after seek offset assignments: {:?}", assignments); + + consumer.unsubscribe().await; + + task.await??; + + Ok(()) +} diff --git a/docs/examples/rust/restexample/examples/tmq.rs b/docs/examples/rust/restexample/examples/tmq.rs index 0438e47515..b931a871b6 100644 --- a/docs/examples/rust/restexample/examples/tmq.rs +++ b/docs/examples/rust/restexample/examples/tmq.rs @@ -42,7 +42,7 @@ async fn main() -> anyhow::Result<()> { .await?; // ANCHOR_END: create_topic - // ANCHOR: create_consumer + // ANCHOR: create_consumer_ac dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string()); dsn.params.insert("msg.with.table.name".to_string(), "true".to_string()); dsn.params.insert("enable.auto.commit".to_string(), "true".to_string()); @@ -52,7 +52,7 @@ async fn main() -> anyhow::Result<()> { let builder = TmqBuilder::from_dsn(&dsn)?; let mut consumer = builder.build().await?; - // ANCHOR_END: create_consumer + // ANCHOR_END: create_consumer_ac // ANCHOR: subscribe consumer.subscribe(["topic_meters"]).await?; @@ -60,56 +60,23 @@ async fn main() -> anyhow::Result<()> { // ANCHOR: consume { - let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1)); - - while let Some((offset, message)) = stream.try_next().await? { - - let topic: &str = offset.topic(); - let database = offset.database(); + consumer + .stream() + .try_for_each(|(offset, message)| async { + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. let vgroup_id = offset.vgroup_id(); - log::debug!( - "topic: {}, database: {}, vgroup_id: {}", - topic, - database, - vgroup_id - ); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); - match message { - MessageSet::Meta(meta) => { - log::info!("Meta"); - let raw = meta.as_raw_meta().await?; - taos.write_raw_meta(&raw).await?; - - let json = meta.as_json_meta().await?; - let sql = json.to_string(); - if let Err(err) = taos.exec(sql).await { - println!("maybe error: {}", err); - } - } - MessageSet::Data(data) => { - log::info!("Data"); - while let Some(data) = data.fetch_raw_block().await? { - log::debug!("data: {:?}", data); - } - } - MessageSet::MetaData(meta, data) => { - log::info!("MetaData"); - let raw = meta.as_raw_meta().await?; - taos.write_raw_meta(&raw).await?; - - let json = meta.as_json_meta().await?; - let sql = json.to_string(); - if let Err(err) = taos.exec(sql).await { - println!("maybe error: {}", err); - } - - while let Some(data) = data.fetch_raw_block().await? { - log::debug!("data: {:?}", data); - } + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + let records: Vec = block.deserialize().try_collect()?; + println!("** read {} records: {:#?}\n", records.len(), records); } } - consumer.commit(offset).await?; - } + Ok(()) + }) + .await?; } // ANCHOR_END: consume diff --git a/docs/zh/08-develop/07-tmq.md b/docs/zh/08-develop/07-tmq.md index 6f259cf521..36ff09d0f4 100644 --- a/docs/zh/08-develop/07-tmq.md +++ b/docs/zh/08-develop/07-tmq.md @@ -131,8 +131,11 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```rust {{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer_dsn}} +``` -{{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer}} + +```rust +{{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer_ac}} ``` @@ -190,6 +193,13 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +```rust +{{#include docs/examples/rust/nativeexample/examples/tmq.rs:create_consumer_dsn}} +``` + +```rust +{{#include docs/examples/rust/nativeexample/examples/tmq.rs:create_consumer_ac}} +``` @@ -242,7 +252,12 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +消费者可订阅一个或多个 `TOPIC`,一般建议一个消费者只订阅一个 `TOPIC`。 +TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。 +```rust +{{#include docs/examples/rust/restexample/examples/tmq.rs:consume}} +``` @@ -290,7 +305,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 - +同 Websocket 示例代码 @@ -322,6 +337,10 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```java {{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:consumer_seek}} ``` +1. 使用 consumer.poll 方法轮询数据,直到获取到数据为止。 +2. 对于轮询到的第一批数据,打印第一条数据的内容,并获取当前消费者的分区分配信息。 +3. 使用 consumer.seekToBeginning 方法将所有分区的偏移量重置到开始位置,并打印成功重置的消息。 +4. 再次使用 consumer.poll 方法轮询数据,并打印第一条数据的内容。 @@ -340,6 +359,16 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +```rust +{{#include docs/examples/rust/nativeexample/examples/tmq.rs:seek_offset}} +``` + +1. 通过调用 consumer.assignments() 方法获取消费者当前的分区分配信息,并记录初始分配状态。 +2. 遍历每个分区分配信息,对于每个分区:提取主题(topic)、消费组ID(vgroup_id)、当前偏移量(current)、起始偏移量(begin)和结束偏移量(end)。 +记录这些信息。 +1. 调用 consumer.offset_seek 方法将偏移量设置到起始位置。如果操作失败,记录错误信息和当前分配状态。 +2. 在所有分区的偏移量调整完成后,再次获取并记录消费者的分区分配信息,以确认偏移量调整后的状态。 + @@ -387,7 +416,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 - +同 Websocket 代码样例。 @@ -413,6 +442,9 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ## 提交 Offset 当消费者读取并处理完消息后,它可以提交 Offset,这表示消费者已经成功处理到这个 Offset 的消息。Offset 提交可以是自动的(根据配置定期提交)或手动的(应用程序控制何时提交)。 当创建消费者时,属性 `enable.auto.commit` 为 false 时,可以手动提交 offset。 + +**注意**:手工提交消费进度前确保消息正常处理完成,否则处理出错的消息不会被再次消费。自动提交是在本次 `poll` 消息时可能会提交上次消息的消费进度,因此请确保消息处理完毕再进行下一次 `poll` 或消息获取。 + ### Websocket 连接 @@ -438,7 +470,11 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +```rust +{{#include docs/examples/rust/restexample/examples/subscribe_demo.rs:consumer_commit_manually}} +``` +可以通过 `consumer.commit` 方法来手工提交消费进度。 @@ -488,7 +524,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 - +同 Websocket 代码样例。 @@ -543,7 +579,11 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +```rust +{{#include docs/examples/rust/restexample/examples/tmq.rs:unsubscribe}} +``` +**注意**:消费者取消订阅后无法重用,如果想订阅新的 `topic`, 请重新创建消费者。 @@ -592,7 +632,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 - +同 Websocket 代码样例。 From 8c6ff828e913e2c2eeef766a9c31b15146cd7b1b Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Fri, 2 Aug 2024 17:06:40 +0800 Subject: [PATCH 3/3] mod rust example --- .../nativeexample/examples/subscribe_demo.rs | 48 +++++++- .../restexample/examples/subscribe_demo.rs | 4 +- docs/zh/08-develop/01-connect/index.md | 82 ------------- docs/zh/08-develop/02-sql.md | 21 +--- docs/zh/08-develop/04-schemaless.md | 12 -- docs/zh/08-develop/05-stmt.md | 13 --- docs/zh/08-develop/07-tmq.md | 110 +----------------- 7 files changed, 56 insertions(+), 234 deletions(-) diff --git a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs index d54bb60e93..8486419386 100644 --- a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs +++ b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs @@ -38,7 +38,7 @@ async fn main() -> anyhow::Result<()> { let dsn = "taos://localhost:6030"; let builder = TaosBuilder::from_dsn(dsn)?; - let taos = builder.build()?; + let taos = builder.build().await?; let db = "tmq"; // prepare database @@ -61,9 +61,10 @@ async fn main() -> anyhow::Result<()> { // subscribe let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; - let mut consumer = tmq.build()?; + let mut consumer = tmq.build().await?; consumer.subscribe(["tmq_meters"]).await?; + // ANCHOR: consumer_commit_manually consumer .stream() .try_for_each(|(offset, message)| async { @@ -78,11 +79,54 @@ async fn main() -> anyhow::Result<()> { println!("** read {} records: {:#?}\n", records.len(), records); } } + // commit offset manually when you have processed the message. consumer.commit(offset).await?; Ok(()) }) .await?; + // ANCHOR_END: consumer_commit_manually + + // ANCHOR: assignments + let assignments = consumer.assignments().await.unwrap(); + log::info!("assignments: {:?}", assignments); + // ANCHOR_END: assignments + + // seek offset + for topic_vec_assignment in assignments { + let topic = &topic_vec_assignment.0; + let vec_assignment = topic_vec_assignment.1; + for assignment in vec_assignment { + let vgroup_id = assignment.vgroup_id(); + let current = assignment.current_offset(); + let begin = assignment.begin(); + let end = assignment.end(); + log::debug!( + "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}", + topic, + vgroup_id, + current, + begin, + end + ); + // ANCHOR: seek_offset + let res = consumer.offset_seek(topic, vgroup_id, end).await; + if res.is_err() { + log::error!("seek offset error: {:?}", res); + let a = consumer.assignments().await.unwrap(); + log::error!("assignments: {:?}", a); + } + // ANCHOR_END: seek_offset + } + + let topic_assignment = consumer.topic_assignment(topic).await; + log::debug!("topic assignment: {:?}", topic_assignment); + } + + // after seek offset + let assignments = consumer.assignments().await.unwrap(); + log::info!("after seek offset assignments: {:?}", assignments); + consumer.unsubscribe().await; task.await??; diff --git a/docs/examples/rust/restexample/examples/subscribe_demo.rs b/docs/examples/rust/restexample/examples/subscribe_demo.rs index 8486419386..2e95fb0543 100644 --- a/docs/examples/rust/restexample/examples/subscribe_demo.rs +++ b/docs/examples/rust/restexample/examples/subscribe_demo.rs @@ -35,7 +35,7 @@ async fn prepare(taos: Taos) -> anyhow::Result<()> { #[tokio::main] async fn main() -> anyhow::Result<()> { - let dsn = "taos://localhost:6030"; + let dsn = "ws://localhost:6041"; let builder = TaosBuilder::from_dsn(dsn)?; let taos = builder.build().await?; @@ -59,7 +59,7 @@ async fn main() -> anyhow::Result<()> { tokio::time::sleep(Duration::from_secs(1)).await; // subscribe - let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; + let tmq = TmqBuilder::from_dsn("ws://localhost:6041/?group.id=test")?; let mut consumer = tmq.build().await?; consumer.subscribe(["tmq_meters"]).await?; diff --git a/docs/zh/08-develop/01-connect/index.md b/docs/zh/08-develop/01-connect/index.md index 21d11df23c..0e92400d9e 100644 --- a/docs/zh/08-develop/01-connect/index.md +++ b/docs/zh/08-develop/01-connect/index.md @@ -13,8 +13,6 @@ import ConnNode from "./_connect_node.mdx"; import ConnPythonNative from "./_connect_python.mdx"; import ConnCSNative from "./_connect_cs.mdx"; import ConnC from "./_connect_c.mdx"; -import ConnR from "./_connect_r.mdx"; -import ConnPHP from "./_connect_php.mdx"; import InstallOnLinux from "../../14-reference/05-connector/_linux_install.mdx"; import InstallOnWindows from "../../14-reference/05-connector/_windows_install.mdx"; import InstallOnMacOS from "../../14-reference/05-connector/_macos_install.mdx"; @@ -249,62 +247,12 @@ dotnet add package TDengine.Connector ::: - - - -1. 下载 [taos-jdbcdriver-version-dist.jar](https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/3.0.0/)。 -2. 安装 R 的依赖包`RJDBC`: - -```R -install.packages("RJDBC") -``` - 如果已经安装了 TDengine 服务端软件或 TDengine 客户端驱动 taosc, 那么已经安装了 C 连接器,无需额外操作。
-
- - -**下载代码并解压:** - -```shell -curl -L -o php-tdengine.tar.gz https://github.com/Yurunsoft/php-tdengine/archive/refs/tags/v1.0.2.tar.gz \ -&& mkdir php-tdengine \ -&& tar -xzf php-tdengine.tar.gz -C php-tdengine --strip-components=1 -``` - -> 版本 `v1.0.2` 只是示例,可替换为任意更新的版本,可在 [TDengine PHP Connector 发布历史](https://github.com/Yurunsoft/php-tdengine/releases) 中查看可用版本。 - -**非 Swoole 环境:** - -```shell -phpize && ./configure && make -j && make install -``` - -**手动指定 TDengine 目录:** - -```shell -phpize && ./configure --with-tdengine-dir=/usr/local/Cellar/tdengine/3.0.0.0 && make -j && make install -``` - -> `--with-tdengine-dir=` 后跟上 TDengine 目录。 -> 适用于默认找不到的情况,或者 macOS 系统用户。 - -**Swoole 环境:** - -```shell -phpize && ./configure --enable-swoole && make -j && make install -``` - -**启用扩展:** - -方法一:在 `php.ini` 中加入 `extension=tdengine` - -方法二:运行带参数 `php -d extension=tdengine test.php` -
@@ -404,8 +352,6 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto - `reconnectRetryCount`:重连次数,默认为 3。 - `reconnectIntervalMs`:重连间隔毫秒时间,默认为 2000。
- - 使用客户端驱动访问 TDengine 集群的基本过程为:建立连接、查询和写入、关闭连接、清除资源。 @@ -433,10 +379,6 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto ::: - - - - @@ -475,15 +417,9 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto {{#include docs/examples/csharp/wsConnect/Program.cs:main}} ``` - - - - - - ### 原生连接 @@ -513,15 +449,9 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto {{#include docs/examples/csharp/connect/Program.cs:main}} ``` - - - - - - @@ -550,15 +480,9 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto 不支持 - - - - - - @@ -642,14 +566,8 @@ let taos = pool.get()?; 不支持 - - - - - - diff --git a/docs/zh/08-develop/02-sql.md b/docs/zh/08-develop/02-sql.md index 8871dee786..01d94cca53 100644 --- a/docs/zh/08-develop/02-sql.md +++ b/docs/zh/08-develop/02-sql.md @@ -13,9 +13,8 @@ TDengine 对 SQL 语言提供了全面的支持,允许用户以熟悉的 SQL :::note -REST 连接:各编程语言的连接器封装使用 `HTTP` 请求的连接,支持数据写入和查询操作。 - -REST API:通过 `curl` 命令进行数据写入和查询操作。 +REST 连接:各编程语言的连接器封装使用 `HTTP` 请求的连接,支持数据写入和查询操作,开发者依然使用连接器提供的接口访问 `TDengine`。 +REST API:直接调用 `taosadapter` 提供的 REST API 接口,进行数据写入和查询操作。代码示例使用 `curl` 命令来演示。 ::: @@ -68,16 +67,12 @@ REST API:通过 `curl` 命令进行数据写入和查询操作。 {{#include docs/examples/csharp/wsInsert/Program.cs:create_db_and_table}} ``` - - ```c {{#include docs/examples/c/CCreateDBDemo.c:create_db_and_table}} ``` > **注意**:如果不使用 `USE power` 指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 power.meters。 - - 创建数据库 @@ -149,8 +144,6 @@ NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW {{#include docs/examples/csharp/wsInsert/Program.cs:insert_data}} ``` - - ```c {{#include docs/examples/c/CInsertDataDemo.c:insert_data}} @@ -159,8 +152,6 @@ NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW **Note** NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW + 1s 代表客户端当前时间往后加 1 秒,数字后面代表时间单位:a(毫秒),s(秒),m(分),h(小时),d(天),w(周),n(月),y(年)。 - - 写入数据 @@ -222,15 +213,11 @@ curl --location -uroot:taosdata 'http://127.0.0.1:6041/rest/sql' \ {{#include docs/examples/csharp/wsInsert/Program.cs:select_data}} ``` - - ```c {{#include docs/examples/c/CQueryDataDemo.c:query_data}} ``` - - 查询数据 @@ -288,15 +275,11 @@ reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId {{#include docs/examples/csharp/wsInsert/Program.cs:query_id}} ``` - - ```c {{#include docs/examples/c/CWithReqIdDemo.c:with_reqid}} ``` - - 查询数据,指定 reqId 为 3 diff --git a/docs/zh/08-develop/04-schemaless.md b/docs/zh/08-develop/04-schemaless.md index 611878960d..5d376222e7 100644 --- a/docs/zh/08-develop/04-schemaless.md +++ b/docs/zh/08-develop/04-schemaless.md @@ -200,12 +200,8 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO {{#include docs/examples/csharp/wssml/Program.cs:main}} ``` - - - - ### 原生连接 @@ -234,12 +230,8 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO {{#include docs/examples/csharp/nativesml/Program.cs:main}} ``` - - - - @@ -262,12 +254,8 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO 不支持 - - - - diff --git a/docs/zh/08-develop/05-stmt.md b/docs/zh/08-develop/05-stmt.md index d3e71eed63..f60e7b1705 100644 --- a/docs/zh/08-develop/05-stmt.md +++ b/docs/zh/08-develop/05-stmt.md @@ -60,15 +60,9 @@ import TabItem from "@theme/TabItem"; ```csharp {{#include docs/examples/csharp/wsStmt/Program.cs:main}} ``` - - - - - - @@ -98,15 +92,8 @@ import TabItem from "@theme/TabItem"; ```csharp {{#include docs/examples/csharp/stmtInsert/Program.cs:main}} ``` - - - - - - - diff --git a/docs/zh/08-develop/07-tmq.md b/docs/zh/08-develop/07-tmq.md index d2b759bfd7..ff7ac5d997 100644 --- a/docs/zh/08-develop/07-tmq.md +++ b/docs/zh/08-develop/07-tmq.md @@ -90,15 +90,9 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 其他参数见上表。 - - - - - - @@ -153,16 +147,8 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ``` - - - - - - - - @@ -212,17 +198,10 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ``` - - - - - - - ## 订阅消费数据 @@ -276,16 +255,8 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ``` - - - - - - - - @@ -320,17 +291,10 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ``` - - - - - - - ## 指定订阅的 Offset @@ -390,17 +354,10 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ``` - - - - - - - ### 原生连接 @@ -434,16 +391,8 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ``` - - - - - - - - @@ -499,18 +448,9 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ``` - - - - - - - - - ### 原生连接 @@ -549,18 +489,9 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ``` - - - - - - - - - @@ -611,18 +542,10 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ``` - - - - - - - - ### 原生连接 @@ -660,18 +583,10 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ``` - - - - - - - - @@ -703,7 +618,9 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur - +```rust +{{#include docs/examples/rust/restexample/examples/subscribe_demo.rs}} +``` @@ -719,18 +636,10 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ``` - - - - - - - - ### 原生连接 @@ -763,7 +672,9 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur - +```rust +{{#include docs/examples/rust/nativeexample/examples/subscribe_demo.rs}} +``` @@ -776,16 +687,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ``` - - - - - - - - -