diff --git a/docs/examples/rust/restexample/examples/schemaless.rs b/docs/examples/rust/restexample/examples/schemaless.rs new file mode 100644 index 0000000000..eefcd63dae --- /dev/null +++ b/docs/examples/rust/restexample/examples/schemaless.rs @@ -0,0 +1,76 @@ +use taos_query::common::SchemalessPrecision; +use taos_query::common::SchemalessProtocol; +use taos_query::common::SmlDataBuilder; + +use crate::AsyncQueryable; +use crate::AsyncTBuilder; +use crate::TaosBuilder; + +async fn put() -> anyhow::Result<()> { + std::env::set_var("RUST_LOG", "taos=debug"); + pretty_env_logger::init(); + let dsn = + std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("https://localhost:6041".to_string()); + log::debug!("dsn: {:?}", &dsn); + + let client = TaosBuilder::from_dsn(dsn)?.build().await?; + + let db = "power"; + + client + .exec(format!("create database if not exists {db}")) + .await?; + + // should specify database before insert + client.exec(format!("use {db}")).await?; + + // SchemalessProtocol::Line + let data = [ + "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000", + ] + .map(String::from) + .to_vec(); + + let sml_data = SmlDataBuilder::default() + .protocol(SchemalessProtocol::Line) + .precision(SchemalessPrecision::Millisecond) + .data(data.clone()) + .ttl(1000) + .req_id(100u64) + .build()?; + assert_eq!(client.put(&sml_data).await?, ()); + + // SchemalessProtocol::Telnet + let data = [ + "metric_telnet 1648432611249 10.3 location=California.SanFrancisco group=2", + ] + .map(String::from) + .to_vec(); + + let sml_data = SmlDataBuilder::default() + .protocol(SchemalessProtocol::Telnet) + .precision(SchemalessPrecision::Millisecond) + .data(data.clone()) + .ttl(1000) + .req_id(200u64) + .build()?; + assert_eq!(client.put(&sml_data).await?, ()); + + // SchemalessProtocol::Json + let data = [ + r#"[{"metric": "metric_json", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"# + ] + .map(String::from) + .to_vec(); + + let sml_data = SmlDataBuilder::default() + .protocol(SchemalessProtocol::Json) + .precision(SchemalessPrecision::Millisecond) + .data(data.clone()) + .ttl(1000) + .req_id(300u64) + .build()?; + assert_eq!(client.put(&sml_data).await?, ()); + + Ok(()) +} diff --git a/docs/examples/rust/restexample/examples/stmt.rs b/docs/examples/rust/restexample/examples/stmt.rs new file mode 100644 index 0000000000..524b37b919 --- /dev/null +++ b/docs/examples/rust/restexample/examples/stmt.rs @@ -0,0 +1,42 @@ +use taos::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let taos = TaosBuilder::from_dsn("ws://")?.build().await?; + + taos.exec("DROP DATABASE IF EXISTS power").await?; + taos.create_database("power").await?; + taos.use_database("power").await?; + taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?; + + let mut stmt = Stmt::init(&taos).await?; + stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?; + + const NUM_TABLES: usize = 10; + const NUM_ROWS: usize = 10; + for i in 0..NUM_TABLES { + let table_name = format!("d{}", i); + let tags = vec![Value::VarChar("California.SanFransico".into()), Value::Int(2)]; + + // set table name and tags for the prepared statement. + stmt.set_tbname_tags(&table_name, &tags).await?; + for j in 0..NUM_ROWS { + let values = vec![ + ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]), + ColumnView::from_floats(vec![10.3 + j as f32]), + ColumnView::from_ints(vec![219 + j as i32]), + ColumnView::from_floats(vec![0.31 + j as f32]), + ]; + // bind values to the prepared statement. + stmt.bind(&values).await?; + } + + stmt.add_batch().await?; + } + + // execute. + let rows = stmt.execute().await?; + assert_eq!(rows, NUM_TABLES * NUM_ROWS); + + Ok(()) +} diff --git a/docs/examples/rust/restexample/examples/tmq.rs b/docs/examples/rust/restexample/examples/tmq.rs new file mode 100644 index 0000000000..8f195e1629 --- /dev/null +++ b/docs/examples/rust/restexample/examples/tmq.rs @@ -0,0 +1,163 @@ +use std::time::Duration; +use std::str::FromStr; + +use taos::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + pretty_env_logger::formatted_timed_builder() + .filter_level(log::LevelFilter::Info) + .init(); + use taos_query::prelude::*; + let dsn = "ws://localhost:6041".to_string(); + log::info!("dsn: {}", dsn); + let mut dsn = Dsn::from_str(&dsn)?; + + let taos = TaosBuilder::from_dsn(&dsn)?.build().await?; + + // prepare database and table + taos.exec_many([ + "drop topic if exists topic_meters", + "drop database if exists power", + "create database if not exists power WAL_RETENTION_PERIOD 86400", + "use power", + "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))", + "create table if not exists power.d001 using power.meters tags(1,'location')", + ]) + .await?; + + taos.exec_many([ + "drop database if exists db2", + "create database if not exists db2 wal_retention_period 3600", + "use db2", + ]) + .await?; + + // ANCHOR: create_topic + taos.exec_many([ + "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters", + ]) + .await?; + // ANCHOR_END: create_topic + + // ANCHOR: create_consumer + dsn.params.insert("group.id".to_string(), "abc".to_string()); + dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string()); + + let builder = TmqBuilder::from_dsn(&dsn)?; + let mut consumer = builder.build().await?; + // ANCHOR_END: create_consumer + + // ANCHOR: subscribe + consumer.subscribe(["topic_meters"]).await?; + // ANCHOR_END: subscribe + + // ANCHOR: consume + { + let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1)); + + while let Some((offset, message)) = stream.try_next().await? { + + let topic: &str = offset.topic(); + let database = offset.database(); + let vgroup_id = offset.vgroup_id(); + log::debug!( + "topic: {}, database: {}, vgroup_id: {}", + topic, + database, + vgroup_id + ); + + match message { + MessageSet::Meta(meta) => { + log::info!("Meta"); + let raw = meta.as_raw_meta().await?; + taos.write_raw_meta(&raw).await?; + + let json = meta.as_json_meta().await?; + let sql = json.to_string(); + if let Err(err) = taos.exec(sql).await { + println!("maybe error: {}", err); + } + } + MessageSet::Data(data) => { + log::info!("Data"); + while let Some(data) = data.fetch_raw_block().await? { + log::debug!("data: {:?}", data); + } + } + MessageSet::MetaData(meta, data) => { + log::info!("MetaData"); + let raw = meta.as_raw_meta().await?; + taos.write_raw_meta(&raw).await?; + + let json = meta.as_json_meta().await?; + let sql = json.to_string(); + if let Err(err) = taos.exec(sql).await { + println!("maybe error: {}", err); + } + + while let Some(data) = data.fetch_raw_block().await? { + log::debug!("data: {:?}", data); + } + } + } + consumer.commit(offset).await?; + } + } + // ANCHOR_END: consume + + // ANCHOR: assignments + let assignments = consumer.assignments().await.unwrap(); + log::info!("assignments: {:?}", assignments); + // ANCHOR_END: assignments + + // seek offset + for topic_vec_assignment in assignments { + let topic = &topic_vec_assignment.0; + let vec_assignment = topic_vec_assignment.1; + for assignment in vec_assignment { + let vgroup_id = assignment.vgroup_id(); + let current = assignment.current_offset(); + let begin = assignment.begin(); + let end = assignment.end(); + log::debug!( + "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}", + topic, + vgroup_id, + current, + begin, + end + ); + // ANCHOR: seek_offset + let res = consumer.offset_seek(topic, vgroup_id, end).await; + if res.is_err() { + log::error!("seek offset error: {:?}", res); + let a = consumer.assignments().await.unwrap(); + log::error!("assignments: {:?}", a); + } + // ANCHOR_END: seek_offset + } + + let topic_assignment = consumer.topic_assignment(topic).await; + log::debug!("topic assignment: {:?}", topic_assignment); + } + + // after seek offset + let assignments = consumer.assignments().await.unwrap(); + log::info!("after seek offset assignments: {:?}", assignments); + + // ANCHOR: unsubscribe + consumer.unsubscribe().await; + // ANCHOR_END: unsubscribe + + tokio::time::sleep(Duration::from_secs(1)).await; + + taos.exec_many([ + "drop database db2", + "drop topic topic_meters", + "drop database power", + ]) + .await?; + Ok(()) +} diff --git a/docs/zh/08-develop/04-schemaless.md b/docs/zh/08-develop/04-schemaless.md index 7a5730df46..9c136cb109 100644 --- a/docs/zh/08-develop/04-schemaless.md +++ b/docs/zh/08-develop/04-schemaless.md @@ -161,7 +161,7 @@ st,t1=3,t2=4,t3=t3 c1=3i64,c6="passit" 1626006833640000000 ### Websocket 连接 - + ```java