diff --git a/docs/examples/rust/nativeexample/examples/tmq.rs b/docs/examples/rust/nativeexample/examples/tmq.rs index 04bf66b2ef..c8f2b0a19a 100644 --- a/docs/examples/rust/nativeexample/examples/tmq.rs +++ b/docs/examples/rust/nativeexample/examples/tmq.rs @@ -9,9 +9,11 @@ async fn main() -> anyhow::Result<()> { .filter_level(log::LevelFilter::Info) .init(); use taos_query::prelude::*; + // ANCHOR: create_consumer_dsn let dsn = "taos://localhost:6030".to_string(); log::info!("dsn: {}", dsn); let mut dsn = Dsn::from_str(&dsn)?; + // ANCHOR_END: create_consumer_dsn let taos = TaosBuilder::from_dsn(&dsn)?.build().await?; @@ -43,7 +45,7 @@ async fn main() -> anyhow::Result<()> { .await?; // ANCHOR_END: create_topic - // ANCHOR: create_consumer + // ANCHOR: create_consumer_ac dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string()); dsn.params.insert("msg.with.table.name".to_string(), "true".to_string()); dsn.params.insert("enable.auto.commit".to_string(), "true".to_string()); @@ -53,13 +55,11 @@ async fn main() -> anyhow::Result<()> { let builder = TmqBuilder::from_dsn(&dsn)?; let mut consumer = builder.build().await?; - // ANCHOR_END: create_consumer - - // ANCHOR: subscribe - consumer.subscribe(["topic_meters"]).await?; - // ANCHOR_END: subscribe + // ANCHOR_END: create_consumer_ac // ANCHOR: consume + consumer.subscribe(["topic_meters"]).await?; + { let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1)); @@ -69,7 +69,7 @@ async fn main() -> anyhow::Result<()> { let database = offset.database(); let vgroup_id = offset.vgroup_id(); log::debug!( - "topic: {}, database: {}, vgroup_id: {}", + "receive message from: topic: {}, database: {}, vgroup_id: {}", topic, database, vgroup_id @@ -84,13 +84,13 @@ async fn main() -> anyhow::Result<()> { let json = meta.as_json_meta().await?; let sql = json.to_string(); if let Err(err) = taos.exec(sql).await { - println!("maybe error: {}", err); + println!("maybe error in handling meta message: {}", err); } } MessageSet::Data(data) => { log::info!("Data"); while let Some(data) = data.fetch_raw_block().await? { - log::debug!("data: {:?}", data); + log::debug!("message data: {:?}", data); } } MessageSet::MetaData(meta, data) => { @@ -101,24 +101,24 @@ async fn main() -> anyhow::Result<()> { let json = meta.as_json_meta().await?; let sql = json.to_string(); if let Err(err) = taos.exec(sql).await { - println!("maybe error: {}", err); + println!("maybe error in handling metadata message: {}", err); } while let Some(data) = data.fetch_raw_block().await? { - log::debug!("data: {:?}", data); + log::debug!("message data: {:?}", data); } } } + // commit offset manually when handling message successfully consumer.commit(offset).await?; } } // ANCHOR_END: consume - // ANCHOR: assignments + // ANCHOR: seek_offset let assignments = consumer.assignments().await.unwrap(); - log::info!("assignments: {:?}", assignments); - // ANCHOR_END: assignments - + log::info!("start assignments: {:?}", assignments); + // seek offset for topic_vec_assignment in assignments { let topic = &topic_vec_assignment.0; @@ -136,23 +136,24 @@ async fn main() -> anyhow::Result<()> { begin, end ); - // ANCHOR: seek_offset - let res = consumer.offset_seek(topic, vgroup_id, end).await; + + // seek offset of the (topic, vgroup_id) to the begin + let res = consumer.offset_seek(topic, vgroup_id, begin).await; if res.is_err() { log::error!("seek offset error: {:?}", res); let a = consumer.assignments().await.unwrap(); log::error!("assignments: {:?}", a); } - // ANCHOR_END: seek_offset } let topic_assignment = consumer.topic_assignment(topic).await; log::debug!("topic assignment: {:?}", topic_assignment); } - + // after seek offset let assignments = consumer.assignments().await.unwrap(); log::info!("after seek offset assignments: {:?}", assignments); + // ANCHOR_END: seek_offset // ANCHOR: unsubscribe consumer.unsubscribe().await; diff --git a/docs/examples/rust/restexample/Cargo.toml b/docs/examples/rust/restexample/Cargo.toml index 5fffe215d4..7b13b48104 100644 --- a/docs/examples/rust/restexample/Cargo.toml +++ b/docs/examples/rust/restexample/Cargo.toml @@ -8,5 +8,7 @@ anyhow = "1" chrono = "0.4" serde = { version = "1", features = ["derive"] } tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] } +log = "0.4" +pretty_env_logger = "0.5.0" taos = { version = "0.*" } diff --git a/docs/examples/rust/restexample/examples/subscribe_demo.rs b/docs/examples/rust/restexample/examples/subscribe_demo.rs new file mode 100644 index 0000000000..8486419386 --- /dev/null +++ b/docs/examples/rust/restexample/examples/subscribe_demo.rs @@ -0,0 +1,135 @@ +use std::time::Duration; + +use chrono::{DateTime, Local}; +use taos::*; + +// Query options 2, use deserialization with serde. +#[derive(Debug, serde::Deserialize)] +#[allow(dead_code)] +struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, +} + +async fn prepare(taos: Taos) -> anyhow::Result<()> { + let inserted = taos.exec_many([ + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + assert_eq!(inserted, 6); + Ok(()) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "taos://localhost:6030"; + let builder = TaosBuilder::from_dsn(dsn)?; + + let taos = builder.build().await?; + let db = "tmq"; + + // prepare database + taos.exec_many([ + format!("DROP TOPIC IF EXISTS tmq_meters"), + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"), + format!("USE `{db}`"), + // create super table + format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"), + // create topic for subscription + format!("CREATE TOPIC tmq_meters AS SELECT * FROM `meters`") + ]) + .await?; + + let task = tokio::spawn(prepare(taos)); + + tokio::time::sleep(Duration::from_secs(1)).await; + + // subscribe + let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; + + let mut consumer = tmq.build().await?; + consumer.subscribe(["tmq_meters"]).await?; + + // ANCHOR: consumer_commit_manually + consumer + .stream() + .try_for_each(|(offset, message)| async { + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + let records: Vec = block.deserialize().try_collect()?; + println!("** read {} records: {:#?}\n", records.len(), records); + } + } + // commit offset manually when you have processed the message. + consumer.commit(offset).await?; + Ok(()) + }) + .await?; + // ANCHOR_END: consumer_commit_manually + + + // ANCHOR: assignments + let assignments = consumer.assignments().await.unwrap(); + log::info!("assignments: {:?}", assignments); + // ANCHOR_END: assignments + + // seek offset + for topic_vec_assignment in assignments { + let topic = &topic_vec_assignment.0; + let vec_assignment = topic_vec_assignment.1; + for assignment in vec_assignment { + let vgroup_id = assignment.vgroup_id(); + let current = assignment.current_offset(); + let begin = assignment.begin(); + let end = assignment.end(); + log::debug!( + "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}", + topic, + vgroup_id, + current, + begin, + end + ); + // ANCHOR: seek_offset + let res = consumer.offset_seek(topic, vgroup_id, end).await; + if res.is_err() { + log::error!("seek offset error: {:?}", res); + let a = consumer.assignments().await.unwrap(); + log::error!("assignments: {:?}", a); + } + // ANCHOR_END: seek_offset + } + + let topic_assignment = consumer.topic_assignment(topic).await; + log::debug!("topic assignment: {:?}", topic_assignment); + } + + // after seek offset + let assignments = consumer.assignments().await.unwrap(); + log::info!("after seek offset assignments: {:?}", assignments); + + consumer.unsubscribe().await; + + task.await??; + + Ok(()) +} diff --git a/docs/examples/rust/restexample/examples/tmq.rs b/docs/examples/rust/restexample/examples/tmq.rs index 0438e47515..b931a871b6 100644 --- a/docs/examples/rust/restexample/examples/tmq.rs +++ b/docs/examples/rust/restexample/examples/tmq.rs @@ -42,7 +42,7 @@ async fn main() -> anyhow::Result<()> { .await?; // ANCHOR_END: create_topic - // ANCHOR: create_consumer + // ANCHOR: create_consumer_ac dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string()); dsn.params.insert("msg.with.table.name".to_string(), "true".to_string()); dsn.params.insert("enable.auto.commit".to_string(), "true".to_string()); @@ -52,7 +52,7 @@ async fn main() -> anyhow::Result<()> { let builder = TmqBuilder::from_dsn(&dsn)?; let mut consumer = builder.build().await?; - // ANCHOR_END: create_consumer + // ANCHOR_END: create_consumer_ac // ANCHOR: subscribe consumer.subscribe(["topic_meters"]).await?; @@ -60,56 +60,23 @@ async fn main() -> anyhow::Result<()> { // ANCHOR: consume { - let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1)); - - while let Some((offset, message)) = stream.try_next().await? { - - let topic: &str = offset.topic(); - let database = offset.database(); + consumer + .stream() + .try_for_each(|(offset, message)| async { + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. let vgroup_id = offset.vgroup_id(); - log::debug!( - "topic: {}, database: {}, vgroup_id: {}", - topic, - database, - vgroup_id - ); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); - match message { - MessageSet::Meta(meta) => { - log::info!("Meta"); - let raw = meta.as_raw_meta().await?; - taos.write_raw_meta(&raw).await?; - - let json = meta.as_json_meta().await?; - let sql = json.to_string(); - if let Err(err) = taos.exec(sql).await { - println!("maybe error: {}", err); - } - } - MessageSet::Data(data) => { - log::info!("Data"); - while let Some(data) = data.fetch_raw_block().await? { - log::debug!("data: {:?}", data); - } - } - MessageSet::MetaData(meta, data) => { - log::info!("MetaData"); - let raw = meta.as_raw_meta().await?; - taos.write_raw_meta(&raw).await?; - - let json = meta.as_json_meta().await?; - let sql = json.to_string(); - if let Err(err) = taos.exec(sql).await { - println!("maybe error: {}", err); - } - - while let Some(data) = data.fetch_raw_block().await? { - log::debug!("data: {:?}", data); - } + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + let records: Vec = block.deserialize().try_collect()?; + println!("** read {} records: {:#?}\n", records.len(), records); } } - consumer.commit(offset).await?; - } + Ok(()) + }) + .await?; } // ANCHOR_END: consume diff --git a/docs/zh/08-develop/07-tmq.md b/docs/zh/08-develop/07-tmq.md index 6f259cf521..36ff09d0f4 100644 --- a/docs/zh/08-develop/07-tmq.md +++ b/docs/zh/08-develop/07-tmq.md @@ -131,8 +131,11 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```rust {{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer_dsn}} +``` -{{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer}} + +```rust +{{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer_ac}} ``` @@ -190,6 +193,13 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +```rust +{{#include docs/examples/rust/nativeexample/examples/tmq.rs:create_consumer_dsn}} +``` + +```rust +{{#include docs/examples/rust/nativeexample/examples/tmq.rs:create_consumer_ac}} +``` @@ -242,7 +252,12 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +消费者可订阅一个或多个 `TOPIC`,一般建议一个消费者只订阅一个 `TOPIC`。 +TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。 +```rust +{{#include docs/examples/rust/restexample/examples/tmq.rs:consume}} +``` @@ -290,7 +305,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 - +同 Websocket 示例代码 @@ -322,6 +337,10 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```java {{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:consumer_seek}} ``` +1. 使用 consumer.poll 方法轮询数据,直到获取到数据为止。 +2. 对于轮询到的第一批数据,打印第一条数据的内容,并获取当前消费者的分区分配信息。 +3. 使用 consumer.seekToBeginning 方法将所有分区的偏移量重置到开始位置,并打印成功重置的消息。 +4. 再次使用 consumer.poll 方法轮询数据,并打印第一条数据的内容。 @@ -340,6 +359,16 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +```rust +{{#include docs/examples/rust/nativeexample/examples/tmq.rs:seek_offset}} +``` + +1. 通过调用 consumer.assignments() 方法获取消费者当前的分区分配信息,并记录初始分配状态。 +2. 遍历每个分区分配信息,对于每个分区:提取主题(topic)、消费组ID(vgroup_id)、当前偏移量(current)、起始偏移量(begin)和结束偏移量(end)。 +记录这些信息。 +1. 调用 consumer.offset_seek 方法将偏移量设置到起始位置。如果操作失败,记录错误信息和当前分配状态。 +2. 在所有分区的偏移量调整完成后,再次获取并记录消费者的分区分配信息,以确认偏移量调整后的状态。 + @@ -387,7 +416,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 - +同 Websocket 代码样例。 @@ -413,6 +442,9 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ## 提交 Offset 当消费者读取并处理完消息后,它可以提交 Offset,这表示消费者已经成功处理到这个 Offset 的消息。Offset 提交可以是自动的(根据配置定期提交)或手动的(应用程序控制何时提交)。 当创建消费者时,属性 `enable.auto.commit` 为 false 时,可以手动提交 offset。 + +**注意**:手工提交消费进度前确保消息正常处理完成,否则处理出错的消息不会被再次消费。自动提交是在本次 `poll` 消息时可能会提交上次消息的消费进度,因此请确保消息处理完毕再进行下一次 `poll` 或消息获取。 + ### Websocket 连接 @@ -438,7 +470,11 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +```rust +{{#include docs/examples/rust/restexample/examples/subscribe_demo.rs:consumer_commit_manually}} +``` +可以通过 `consumer.commit` 方法来手工提交消费进度。 @@ -488,7 +524,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 - +同 Websocket 代码样例。 @@ -543,7 +579,11 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +```rust +{{#include docs/examples/rust/restexample/examples/tmq.rs:unsubscribe}} +``` +**注意**:消费者取消订阅后无法重用,如果想订阅新的 `topic`, 请重新创建消费者。 @@ -592,7 +632,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 - +同 Websocket 代码样例。