diff --git a/cmake/rust-bindings_CMakeLists.txt.in b/cmake/rust-bindings_CMakeLists.txt.in deleted file mode 100644 index d16e86139b..0000000000 --- a/cmake/rust-bindings_CMakeLists.txt.in +++ /dev/null @@ -1,12 +0,0 @@ - -# rust-bindings -ExternalProject_Add(rust-bindings - GIT_REPOSITORY https://github.com/songtianyi/tdengine-rust-bindings.git - GIT_TAG 7ed7a97 - SOURCE_DIR "${TD_SOURCE_DIR}/examples/rust" - BINARY_DIR "${TD_SOURCE_DIR}/examples/rust" - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "" - TEST_COMMAND "" - ) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 294b59fe95..a1eec81ee0 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -105,11 +105,6 @@ if(${BUILD_WITH_SQLITE}) cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif(${BUILD_WITH_SQLITE}) -# rust-bindings -if(${RUST_BINDINGS}) - cat("${TD_SUPPORT_DIR}/rust-bindings_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -endif(${RUST_BINDINGS}) - # lucene if(${BUILD_WITH_LUCENE}) cat("${TD_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) diff --git a/examples/rust/.gitignore b/examples/rust/.gitignore new file mode 100644 index 0000000000..96ef6c0b94 --- /dev/null +++ b/examples/rust/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml new file mode 100644 index 0000000000..1ed73e2fde --- /dev/null +++ b/examples/rust/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "rust" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +taos = "*" + +[dev-dependencies] +chrono = "0.4" +itertools = "0.10.3" +pretty_env_logger = "0.4.0" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tokio = { version = "1", features = ["full"] } +anyhow = "1" diff --git a/examples/rust/examples/bind-tags.rs b/examples/rust/examples/bind-tags.rs new file mode 100644 index 0000000000..a1f7286625 --- /dev/null +++ b/examples/rust/examples/bind-tags.rs @@ -0,0 +1,80 @@ +use anyhow::Result; +use serde::Deserialize; +use taos::*; + +#[tokio::main] +async fn main() -> Result<()> { + let taos = TaosBuilder::from_dsn("taos://")?.build()?; + taos.exec_many([ + "drop database if exists test", + "create database test keep 36500", + "use test", + "create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, + c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, + c10 float, c11 double, c12 varchar(100), c13 nchar(100)) tags(t1 varchar(100))", + ]) + .await?; + let mut stmt = Stmt::init(&taos)?; + stmt.prepare( + "insert into ? using tb1 tags(?) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + )?; + stmt.set_tbname("d0")?; + stmt.set_tags(&[Value::VarChar("涛思".to_string())])?; + + let params = vec![ + ColumnView::from_millis_timestamp(vec![164000000000]), + ColumnView::from_bools(vec![true]), + ColumnView::from_tiny_ints(vec![i8::MAX]), + ColumnView::from_small_ints(vec![i16::MAX]), + ColumnView::from_ints(vec![i32::MAX]), + ColumnView::from_big_ints(vec![i64::MAX]), + ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]), + ColumnView::from_unsigned_small_ints(vec![u16::MAX]), + ColumnView::from_unsigned_ints(vec![u32::MAX]), + ColumnView::from_unsigned_big_ints(vec![u64::MAX]), + ColumnView::from_floats(vec![f32::MAX]), + ColumnView::from_doubles(vec![f64::MAX]), + ColumnView::from_varchar(vec!["ABC"]), + ColumnView::from_nchar(vec!["涛思数据"]), + ]; + let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; + assert_eq!(rows, 1); + + #[derive(Debug, Deserialize)] + #[allow(dead_code)] + struct Row { + ts: String, + c1: bool, + c2: i8, + c3: i16, + c4: i32, + c5: i64, + c6: u8, + c7: u16, + c8: u32, + c9: u64, + c10: Option, + c11: f64, + c12: String, + c13: String, + t1: serde_json::Value, + } + + let rows: Vec = taos + .query("select * from tb1") + .await? + .deserialize() + .try_collect() + .await?; + let row = &rows[0]; + dbg!(&row); + assert_eq!(row.c5, i64::MAX); + assert_eq!(row.c8, u32::MAX); + assert_eq!(row.c9, u64::MAX); + assert_eq!(row.c10.unwrap(), f32::MAX); + // assert_eq!(row.c11, f64::MAX); + assert_eq!(row.c12, "ABC"); + assert_eq!(row.c13, "涛思数据"); + + Ok(()) +} diff --git a/examples/rust/examples/bind.rs b/examples/rust/examples/bind.rs new file mode 100644 index 0000000000..194938a319 --- /dev/null +++ b/examples/rust/examples/bind.rs @@ -0,0 +1,74 @@ +use anyhow::Result; +use serde::Deserialize; +use taos::*; + +#[tokio::main] +async fn main() -> Result<()> { + let taos = TaosBuilder::from_dsn("taos://")?.build()?; + taos.exec_many([ + "drop database if exists test_bindable", + "create database test_bindable keep 36500", + "use test_bindable", + "create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, + c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, + c10 float, c11 double, c12 varchar(100), c13 nchar(100))", + ]) + .await?; + let mut stmt = Stmt::init(&taos)?; + stmt.prepare("insert into tb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")?; + let params = vec![ + ColumnView::from_millis_timestamp(vec![0]), + ColumnView::from_bools(vec![true]), + ColumnView::from_tiny_ints(vec![i8::MAX]), + ColumnView::from_small_ints(vec![i16::MAX]), + ColumnView::from_ints(vec![i32::MAX]), + ColumnView::from_big_ints(vec![i64::MAX]), + ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]), + ColumnView::from_unsigned_small_ints(vec![u16::MAX]), + ColumnView::from_unsigned_ints(vec![u32::MAX]), + ColumnView::from_unsigned_big_ints(vec![u64::MAX]), + ColumnView::from_floats(vec![f32::MAX]), + ColumnView::from_doubles(vec![f64::MAX]), + ColumnView::from_varchar(vec!["ABC"]), + ColumnView::from_nchar(vec!["涛思数据"]), + ]; + let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; + assert_eq!(rows, 1); + + #[derive(Debug, Deserialize)] + #[allow(dead_code)] + struct Row { + ts: String, + c1: bool, + c2: i8, + c3: i16, + c4: i32, + c5: i64, + c6: u8, + c7: u16, + c8: u32, + c9: u64, + c10: Option, + c11: f64, + c12: String, + c13: String, + } + + let rows: Vec = taos + .query("select * from tb1") + .await? + .deserialize() + .try_collect() + .await?; + let row = &rows[0]; + dbg!(&row); + assert_eq!(row.c5, i64::MAX); + assert_eq!(row.c8, u32::MAX); + assert_eq!(row.c9, u64::MAX); + assert_eq!(row.c10.unwrap(), f32::MAX); + // assert_eq!(row.c11, f64::MAX); + assert_eq!(row.c12, "ABC"); + assert_eq!(row.c13, "涛思数据"); + + Ok(()) +} diff --git a/examples/rust/examples/query.rs b/examples/rust/examples/query.rs new file mode 100644 index 0000000000..016b291abc --- /dev/null +++ b/examples/rust/examples/query.rs @@ -0,0 +1,106 @@ +use std::time::Duration; + +use chrono::{DateTime, Local}; +use taos::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "taos://"; + + let opts = PoolBuilder::new() + .max_size(5000) // max connections + .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection + .min_idle(Some(1000)) // minimal idle connections + .connection_timeout(Duration::from_secs(2)); + + let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?; + + let taos = pool.get()?; + + let db = "query"; + + // prepare database + taos.exec_many([ + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + ]) + .await?; + + let inserted = taos.exec_many([ + // create super table + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))", + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + + assert_eq!(inserted, 6); + loop { + let count: usize = taos + .query_one("select count(*) from `meters`") + .await? + .unwrap_or_default(); + + if count >= 6 { + break; + } else { + println!("waiting for data"); + } + } + + let mut result = taos.query("select tbname, * from `meters`").await?; + + for field in result.fields() { + println!("got field: {}", field.name()); + } + + // Query option 1, use rows stream. + let mut rows = result.rows(); + let mut nrows = 0; + while let Some(row) = rows.try_next().await? { + for (col, (name, value)) in row.enumerate() { + println!( + "[{}] got value in col {} (named `{:>8}`): {}", + nrows, col, name, value + ); + } + nrows += 1; + } + + // Query options 2, use deserialization with serde. + #[derive(Debug, serde::Deserialize)] + #[allow(dead_code)] + struct Record { + tbname: String, + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, + groupid: i32, + // binary/varchar to String + location: String, + } + + let records: Vec = taos + .query("select tbname, * from `meters`") + .await? + .deserialize() + .try_collect() + .await?; + + dbg!(result.summary()); + assert_eq!(records.len(), 6); + dbg!(records); + Ok(()) +} diff --git a/examples/rust/examples/subscribe.rs b/examples/rust/examples/subscribe.rs new file mode 100644 index 0000000000..9e2e890405 --- /dev/null +++ b/examples/rust/examples/subscribe.rs @@ -0,0 +1,103 @@ +use std::time::Duration; + +use chrono::{DateTime, Local}; +use taos::*; + +// Query options 2, use deserialization with serde. +#[derive(Debug, serde::Deserialize)] +#[allow(dead_code)] +struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, +} + +async fn prepare(taos: Taos) -> anyhow::Result<()> { + let inserted = taos.exec_many([ + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + assert_eq!(inserted, 6); + Ok(()) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // std::env::set_var("RUST_LOG", "debug"); + pretty_env_logger::init(); + let dsn = "taos://localhost:6030"; + let builder = TaosBuilder::from_dsn(dsn)?; + + let taos = builder.build()?; + let db = "tmq"; + + // prepare database + taos.exec_many([ + "DROP TOPIC IF EXISTS tmq_meters".to_string(), + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + // create super table + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))".to_string(), + // create topic for subscription + format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}") + ]) + .await?; + + let task = tokio::spawn(prepare(taos)); + + tokio::time::sleep(Duration::from_secs(1)).await; + + // subscribe + let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; + + let mut consumer = tmq.build()?; + consumer.subscribe(["tmq_meters"]).await?; + + { + let mut stream = consumer.stream(); + + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset + + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); + } + } + consumer.commit(offset).await?; + } + } + + consumer.unsubscribe().await; + + task.await??; + + Ok(()) +} diff --git a/examples/rust/src/main.rs b/examples/rust/src/main.rs new file mode 100644 index 0000000000..e7a11a969c --- /dev/null +++ b/examples/rust/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +}