From 83815bc005bc315da23b15347bacca74d3e67e72 Mon Sep 17 00:00:00 2001 From: Huo Linhe Date: Fri, 18 Nov 2022 11:43:39 +0800 Subject: [PATCH] docs: update tmq demo Close [TD-20499](https://jira.taosdata.com:18080/browse/TD-20499) --- docs/examples/rust/nativeexample/Cargo.toml | 2 +- .../nativeexample/examples/stmt_example.rs | 9 ++++--- .../nativeexample/examples/subscribe_demo.rs | 26 ++++++------------- .../restexample/examples/insert_example.rs | 1 - 4 files changed, 15 insertions(+), 23 deletions(-) diff --git a/docs/examples/rust/nativeexample/Cargo.toml b/docs/examples/rust/nativeexample/Cargo.toml index cdf739d357..5ecc407854 100644 --- a/docs/examples/rust/nativeexample/Cargo.toml +++ b/docs/examples/rust/nativeexample/Cargo.toml @@ -10,4 +10,4 @@ chrono = "0.4" serde = { version = "1", features = ["derive"] } tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] } -taos = { version = "0.*" } +taos = { version = "0.4.8" } diff --git a/docs/examples/rust/nativeexample/examples/stmt_example.rs b/docs/examples/rust/nativeexample/examples/stmt_example.rs index 9cf8e8e1fc..7d5a7c0f2b 100644 --- a/docs/examples/rust/nativeexample/examples/stmt_example.rs +++ b/docs/examples/rust/nativeexample/examples/stmt_example.rs @@ -12,7 +12,10 @@ async fn main() -> anyhow::Result<()> { // bind table name and tags stmt.set_tbname_tags( "d1001", - &[Value::VarChar("California.SanFransico".into()), Value::Int(2)], + &[ + Value::VarChar("California.SanFransico".into()), + Value::Int(2), + ], )?; // bind values. let values = vec![ @@ -30,9 +33,9 @@ async fn main() -> anyhow::Result<()> { ColumnView::from_floats(vec![0.33]), ]; stmt.bind(&values2)?; - + stmt.add_batch()?; - + // execute. let rows = stmt.execute()?; assert_eq!(rows, 2); diff --git a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs index 11d6d4e004..7551ad46b1 100644 --- a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs +++ b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs @@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> { // create super table format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"), // create topic for subscription - format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}") + format!("CREATE TOPIC tmq_meters AS SELECT * FROM `meters`") ]) .await?; @@ -64,13 +64,9 @@ async fn main() -> anyhow::Result<()> { let mut consumer = tmq.build()?; consumer.subscribe(["tmq_meters"]).await?; - { - let mut stream = consumer.stream(); - - while let Some((offset, message)) = stream.try_next().await? { - // get information from offset - - // the topic + consumer + .stream() + .try_for_each(|(offset, message)| async { let topic = offset.topic(); // the vgroup id, like partition id in kafka. let vgroup_id = offset.vgroup_id(); @@ -78,20 +74,14 @@ async fn main() -> anyhow::Result<()> { if let Some(data) = message.into_data() { while let Some(block) = data.fetch_raw_block().await? { - // one block for one table, get table name if needed - let name = block.table_name(); let records: Vec = block.deserialize().try_collect()?; - println!( - "** table: {}, got {} records: {:#?}\n", - name.unwrap(), - records.len(), - records - ); + println!("** read {} records: {:#?}\n", records.len(), records); } } consumer.commit(offset).await?; - } - } + Ok(()) + }) + .await?; consumer.unsubscribe().await; diff --git a/docs/examples/rust/restexample/examples/insert_example.rs b/docs/examples/rust/restexample/examples/insert_example.rs index 11a84f1661..4953a09b35 100644 --- a/docs/examples/rust/restexample/examples/insert_example.rs +++ b/docs/examples/rust/restexample/examples/insert_example.rs @@ -5,7 +5,6 @@ async fn main() -> anyhow::Result<()> { let dsn = "ws://"; let taos = TaosBuilder::from_dsn(dsn)?.build()?; - taos.exec_many([ "DROP DATABASE IF EXISTS power", "CREATE DATABASE power",