mod java code

This commit is contained in:
sheyanjie-qq 2024-08-02 14:50:07 +08:00 committed by gccgdb1234
parent ee5a8a96b0
commit 4135d08519
14 changed files with 592 additions and 572 deletions

View File

@ -64,5 +64,4 @@ while let Some(row) = rows.try_next().await? {
// ANCHOR: query_with_req_id
let result = taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", 1).await?;
// ANCHOR_END: query_with_req_id
}

View File

@ -44,8 +44,12 @@ async fn main() -> anyhow::Result<()> {
// ANCHOR_END: create_topic
// ANCHOR: create_consumer
dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string());
dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params.insert("msg.with.table.name".to_string(), "true".to_string());
dsn.params.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
dsn.params.insert("group.id".to_string(), "group1".to_string());
dsn.params.insert("client.id".to_string(), "client1".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;

View File

@ -9,9 +9,11 @@ async fn main() -> anyhow::Result<()> {
.filter_level(log::LevelFilter::Info)
.init();
use taos_query::prelude::*;
// ANCHOR: create_consumer_dsn
let dsn = "ws://localhost:6041".to_string();
log::info!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
// ANCHOR_END: create_consumer_dsn
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
@ -41,8 +43,12 @@ async fn main() -> anyhow::Result<()> {
// ANCHOR_END: create_topic
// ANCHOR: create_consumer
dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string());
dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params.insert("msg.with.table.name".to_string(), "true".to_string());
dsn.params.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
dsn.params.insert("group.id".to_string(), "group1".to_string());
dsn.params.insert("client.id".to_string(), "client1".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;

View File

@ -104,9 +104,10 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
<TabItem label="Rust" value="rust">
```rust
{{#include docs/examples/rust/nativeexample/examples/tmq.rs:create_consumer}}
{{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer_dsn}}
{{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer}}
```
</TabItem>

View File

@ -15,6 +15,7 @@ import java.util.concurrent.TimeUnit;
public class ConsumerLoopFull {
static private Connection connection;
static private Statement statement;
public static TaosConsumer<ResultBean> getConsumer() throws SQLException {
// ANCHOR: create_consumer
Properties config = new Properties();
@ -162,6 +163,7 @@ try (TaosConsumer<ResultBean> consumer = getConsumer()){
}
// ANCHOR_END: commit_code_piece
}
public static void unsubscribeExample() throws SQLException {
TaosConsumer<ResultBean> consumer = getConsumer();
List<String> topics = Collections.singletonList("topic_meters");
@ -182,6 +184,7 @@ try (TaosConsumer<ResultBean> consumer = getConsumer()){
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
@ -256,6 +259,7 @@ try (TaosConsumer<ResultBean> consumer = getConsumer()){
throw new SQLException("Failed to insert data to power.meters", ex);
}
}
public static void prepareMeta() throws SQLException {
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
@ -288,6 +292,7 @@ try (TaosConsumer<ResultBean> consumer = getConsumer()){
}
System.out.println("Connection created successfully.");
}
public static void closeConnection() throws SQLException {
try {
if (statement != null) {
@ -337,11 +342,11 @@ try (TaosConsumer<ResultBean> consumer = getConsumer()){
System.out.println("Data prepared successfully");
// 关闭线程池不再接收新任务
// close the executor, which will make the executor reject new tasks
executor.shutdown();
try {
// 等待直到所有任务完成
// wait for the executor to terminate
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result;
} catch (InterruptedException e) {

View File

@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit;
public class WsConsumerLoopFull {
static private Connection connection;
static private Statement statement;
public static TaosConsumer<ResultBean> getConsumer() throws SQLException {
// ANCHOR: create_consumer
Properties config = new Properties();
@ -163,6 +164,7 @@ try (TaosConsumer<ResultBean> consumer = getConsumer()){
}
// ANCHOR_END: commit_code_piece
}
public static void unsubscribeExample() throws SQLException {
TaosConsumer<ResultBean> consumer = getConsumer();
List<String> topics = Collections.singletonList("topic_meters");
@ -183,6 +185,7 @@ try {
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
@ -257,6 +260,7 @@ public static class ResultBean {
throw new SQLException("Failed to insert data to power.meters", ex);
}
}
public static void prepareMeta() throws SQLException {
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
@ -289,6 +293,7 @@ public static class ResultBean {
}
System.out.println("Connection created successfully.");
}
public static void closeConnection() throws SQLException {
try {
if (statement != null) {
@ -338,11 +343,11 @@ public static class ResultBean {
System.out.println("Data prepared successfully");
// 关闭线程池不再接收新任务
// close the executor, which will make the executor reject new tasks
executor.shutdown();
try {
// 等待直到所有任务完成
// wait for the executor to terminate
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result;
} catch (InterruptedException e) {