docs: update tmq demo
Close [TD-20499](https://jira.taosdata.com:18080/browse/TD-20499)
This commit is contained in:
parent
48d5e13ae3
commit
83815bc005
|
@ -10,4 +10,4 @@ chrono = "0.4"
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
|
tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
|
||||||
|
|
||||||
taos = { version = "0.*" }
|
taos = { version = "0.4.8" }
|
||||||
|
|
|
@ -12,7 +12,10 @@ async fn main() -> anyhow::Result<()> {
|
||||||
// bind table name and tags
|
// bind table name and tags
|
||||||
stmt.set_tbname_tags(
|
stmt.set_tbname_tags(
|
||||||
"d1001",
|
"d1001",
|
||||||
&[Value::VarChar("California.SanFransico".into()), Value::Int(2)],
|
&[
|
||||||
|
Value::VarChar("California.SanFransico".into()),
|
||||||
|
Value::Int(2),
|
||||||
|
],
|
||||||
)?;
|
)?;
|
||||||
// bind values.
|
// bind values.
|
||||||
let values = vec![
|
let values = vec![
|
||||||
|
|
|
@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
// create super table
|
// create super table
|
||||||
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
|
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
|
||||||
// create topic for subscription
|
// create topic for subscription
|
||||||
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
|
format!("CREATE TOPIC tmq_meters AS SELECT * FROM `meters`")
|
||||||
])
|
])
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
@ -64,13 +64,9 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let mut consumer = tmq.build()?;
|
let mut consumer = tmq.build()?;
|
||||||
consumer.subscribe(["tmq_meters"]).await?;
|
consumer.subscribe(["tmq_meters"]).await?;
|
||||||
|
|
||||||
{
|
consumer
|
||||||
let mut stream = consumer.stream();
|
.stream()
|
||||||
|
.try_for_each(|(offset, message)| async {
|
||||||
while let Some((offset, message)) = stream.try_next().await? {
|
|
||||||
// get information from offset
|
|
||||||
|
|
||||||
// the topic
|
|
||||||
let topic = offset.topic();
|
let topic = offset.topic();
|
||||||
// the vgroup id, like partition id in kafka.
|
// the vgroup id, like partition id in kafka.
|
||||||
let vgroup_id = offset.vgroup_id();
|
let vgroup_id = offset.vgroup_id();
|
||||||
|
@ -78,20 +74,14 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
if let Some(data) = message.into_data() {
|
if let Some(data) = message.into_data() {
|
||||||
while let Some(block) = data.fetch_raw_block().await? {
|
while let Some(block) = data.fetch_raw_block().await? {
|
||||||
// one block for one table, get table name if needed
|
|
||||||
let name = block.table_name();
|
|
||||||
let records: Vec<Record> = block.deserialize().try_collect()?;
|
let records: Vec<Record> = block.deserialize().try_collect()?;
|
||||||
println!(
|
println!("** read {} records: {:#?}\n", records.len(), records);
|
||||||
"** table: {}, got {} records: {:#?}\n",
|
|
||||||
name.unwrap(),
|
|
||||||
records.len(),
|
|
||||||
records
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
consumer.commit(offset).await?;
|
consumer.commit(offset).await?;
|
||||||
}
|
Ok(())
|
||||||
}
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
consumer.unsubscribe().await;
|
consumer.unsubscribe().await;
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,6 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let dsn = "ws://";
|
let dsn = "ws://";
|
||||||
let taos = TaosBuilder::from_dsn(dsn)?.build()?;
|
let taos = TaosBuilder::from_dsn(dsn)?.build()?;
|
||||||
|
|
||||||
|
|
||||||
taos.exec_many([
|
taos.exec_many([
|
||||||
"DROP DATABASE IF EXISTS power",
|
"DROP DATABASE IF EXISTS power",
|
||||||
"CREATE DATABASE power",
|
"CREATE DATABASE power",
|
||||||
|
|
Loading…
Reference in New Issue