docs(rust): update rust documentations and examples for 3.0 (#16009)

Closes
- [TD-17895](https://jira.taosdata.com:18080/browse/TD-17895)
- [TD-18191](https://jira.taosdata.com:18080/browse/TD-18191)
- [TD-18197](https://jira.taosdata.com:18080/browse/TD-18197)
This commit is contained in:
Linhe Huo 2022-08-11 19:55:23 +08:00 committed by GitHub
parent e252eba6cd
commit 74cd6a971e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 832 additions and 576 deletions

View File

@ -1,3 +1,2 @@
```rust ```rust
{{#include docs/examples/rust/schemalessexample/examples/influxdb_line_example.rs}}
``` ```

View File

@ -1,3 +1,2 @@
```rust ```rust
{{#include docs/examples/rust/schemalessexample/examples/opentsdb_json_example.rs}}
``` ```

View File

@ -1,3 +1,2 @@
```rust ```rust
{{#include docs/examples/rust/schemalessexample/examples/opentsdb_telnet_example.rs}}
``` ```

View File

@ -10,16 +10,14 @@ import TabItem from '@theme/TabItem';
import Preparation from "./_preparation.mdx" import Preparation from "./_preparation.mdx"
import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx" import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx"
import RustInfluxLine from "../../07-develop/03-insert-data/_rust_line.mdx" import RustBind from "../../07-develop/03-insert-data/_rust_stmt.mdx"
import RustOpenTSDBTelnet from "../../07-develop/03-insert-data/_rust_opts_telnet.mdx"
import RustOpenTSDBJson from "../../07-develop/03-insert-data/_rust_opts_json.mdx"
import RustQuery from "../../07-develop/04-query-data/_rust.mdx" import RustQuery from "../../07-develop/04-query-data/_rust.mdx"
`libtaos` is the official Rust language connector for TDengine. Rust developers can develop applications to access the TDengine instance data. [`taos`][taos] is the official Rust language connector for TDengine. Rust developers can develop applications to access the TDengine instance data.
`libtaos` provides two ways to establish connections. One is the **Native Connection**, which connects to TDengine instances via the TDengine client driver (taosc). The other is **REST connection**, which connects to TDengine instances via taosAdapter's REST interface. Rust connector provides two ways to establish connections. One is the **Native Connection**, which connects to TDengine instances via the TDengine client driver (taosc). The other is **Websocket connection**, which connects to TDengine instances via taosAdapter service.
The source code for `libtaos` is hosted on [GitHub](https://github.com/taosdata/libtaos-rs). The source code is hosted on [taosdata/taos-connector-rust](https://github.com/taosdata/taos-connector-rust).
## Supported platforms ## Supported platforms
@ -30,119 +28,195 @@ REST connections are supported on all platforms that can run Rust.
Please refer to [version support list](/reference/connector#version-support). Please refer to [version support list](/reference/connector#version-support).
The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 2.4 or higher to avoid known issues. The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues.
## Installation ## Installation
### Pre-installation ### Pre-installation
* Install the Rust development toolchain * Install the Rust development toolchain
* If using the native connection, please install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver) * If using the native connection, please install the TDengine client driver. Please refer to [install client driver](/reference/connector#install-client-driver)
### Adding libtaos dependencies ### Add dependencies
Add the [libtaos][libtaos] dependency to the [Rust](https://rust-lang.org) project as follows, depending on the connection method selected. Add the dependency to the [Rust](https://rust-lang.org) project as follows, depending on the connection method selected.
<Tabs defaultValue="native"> <Tabs defaultValue="default">
<TabItem value="native" label="native connection"> <TabItem value="default" label="Both">
Add [libtaos][libtaos] to the `Cargo.toml` file. Add [taos] to the `Cargo.toml` file.
```toml ```toml
[dependencies] [dependencies]
# use default feature # use default feature
libtaos = "*" taos = "*"
``` ```
</TabItem> </TabItem>
<TabItem value="rest" label="REST connection"> <TabItem value="native" label="Native only">
Add [libtaos][libtaos] to the `Cargo.toml` file and enable the `rest` feature. Add [taos] to the `Cargo.toml` file.
```toml ```toml
[dependencies] [dependencies]
# use rest feature taos = { version = "*", default-features = false, features = ["native"] }
libtaos = { version = "*", features = ["rest"]} ```
</TabItem>
<TabItem value="rest" label="Websocket only">
Add [taos] to the `Cargo.toml` file and enable the `ws` feature.
```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["ws"] }
``` ```
</TabItem> </TabItem>
</Tabs> </Tabs>
### Using connection pools
Please enable the `r2d2` feature in `Cargo.toml`.
```toml
[dependencies]
# with taosc
libtaos = { version = "*", features = ["r2d2"] }
# or rest
libtaos = { version = "*", features = ["rest", "r2d2"] }
```
## Create a connection ## Create a connection
The [TaosCfgBuilder] provides the user with an API in the form of a constructor for the subsequent creation of connections or use of connection pools. In rust connector, we use a DSN connection string as a connection builder. For example,
```rust ```rust
let cfg: TaosCfg = TaosCfgBuilder::default() let builder = TaosBuilder::from_dsn("taos://")?;
.ip("127.0.0.1")
.user("root")
.pass("taosdata")
.db("log") // do not set if not require a default database.
.port(6030u16)
.build()
.expect("TaosCfg builder error");
}
``` ```
You can now use this object to create the connection. You can now use connection client to create the connection.
```rust ```rust
let conn = cfg.connect()? ; let conn = builder.build()?;
``` ```
The connection object can create more than one. The connection object can create more than one.
```rust ```rust
let conn = cfg.connect()? ; let conn1 = builder.build()?;
let conn2 = cfg.connect()? ; let conn2 = builder.build()?;
``` ```
You can use connection pools in applications. DSN is short for **D**ata **S**ource **N**ame string - [a data structure used to describe a connection to a data source](https://en.wikipedia.org/wiki/Data_source_name).
```rust A common DSN is basically constructed as this:
let pool = r2d2::Pool::builder()
.max_size(10000) // max connections
.build(cfg)? ;
// ... ```text
// Use pool to get connection <driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
let conn = pool.get()? ; |------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver| protocol | | username | password | host | port | database | params |
``` ```
After that, you can perform the following operations on the database. - **Driver**: the main entrypoint to a processer. **Required**. In Rust connector, the supported driver names are listed here:
- **taos**: the legacy TDengine connection data source.
- **tmq**: subscription data source from TDengine.
- **http/ws**: use websocket protocol via `ws://` scheme.
- **https/wss**: use websocket protocol via `wss://` scheme.
- **Protocol**: the additional information appended to driver, which can be be used to support different kind of data sources. By default, leave it empty for native driver(only under feature "native"), and `ws/wss` for websocket driver (only under feature "ws"). **Optional**.
- **Username**: as its definition, is the username to the connection. **Optional**.
- **Password**: the password of the username. **Optional**.
- **Host**: address host to the datasource. **Optional**.
- **Port**: address port to the datasource. **Optional**.
- **Database**: database name or collection name in the datasource. **Optional**.
- **Params**: a key-value map for any other informations to the datasource. **Optional**.
Here is a simple DSN connection string example:
```text
taos+ws://localhost:6041/test
```
which means connect `localhost` with port `6041` via `ws` protocol, and make `test` as the default database.
So that you can use DSN to specify connection protocol at runtime:
```rust ```rust
async fn demo() -> Result<(), Error> { use taos::*; // use it like a `prelude` mod, we need some traits at next.
// get connection ...
// create database // use native protocol.
conn.exec("create database if not exists demo").await? let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
// change database context let conn1 = builder.build();
conn.exec("use demo").await?
// create table // use websocket protocol.
conn.exec("create table if not exists tb1 (ts timestamp, v int)").await? let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
// insert ```
conn.exec("insert into tb1 values(now, 1)").await?
// query After connected, you can perform the following operations on the database.
let rows = conn.query("select * from tb1").await?
for row in rows.rows { ```rust
println!("{}", row.into_iter().join(",")); async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
// prepare database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;
let inserted = taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(16))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
assert_eq!(inserted, 6);
let mut result = taos.query("select * from `meters`").await?;
for field in result.fields() {
println!("got field: {}", field.name());
} }
let values = result.
} }
``` ```
Rust connector provides two kinds of ways to fetch data:
```rust
// Query option 1, use rows stream.
let mut rows = result.rows();
while let Some(row) = rows.try_next().await? {
for (name, value) in row {
println!("got value of {}: {}", name, value);
}
}
// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
let records: Vec<Record> = taos
.query("select * from `meters`")
.await?
.deserialize()
.try_collect()
.await?;
dbg!(records);
Ok(())
```
## Usage examples ## Usage examples
### Write data ### Write data
@ -151,122 +225,138 @@ async fn demo() -> Result<(), Error> {
<RustInsert /> <RustInsert />
#### InfluxDB line protocol write #### Stmt bind
<RustInfluxLine /> <RustBind />
#### OpenTSDB Telnet line protocol write
<RustOpenTSDBTelnet />
#### OpenTSDB JSON line protocol write
<RustOpenTSDBJson />
### Query data ### Query data
<RustQuery /> <RustQuery />|
### More sample programs
| Program Path | Program Description |
| -------------- | ----------------------------------------------------------------------------- |
| [demo.rs] | Basic API Usage Examples |
| [bailongma-rs] | Using TDengine as the Prometheus remote storage API adapter for the storage backend, using the r2d2 connection pool |
## API Reference ## API Reference
### Connection constructor API ### Connector builder
The [Builder Pattern](https://doc.rust-lang.org/1.0.0/style/ownership/builders.html) constructor pattern is Rust's solution for handling complex data types or optional configuration types. The [libtaos] implementation uses the connection constructor [TaosCfgBuilder] as the entry point for the TDengine Rust connector. The [TaosCfgBuilder] provides optional configuration of servers, ports, databases, usernames, passwords, etc. Use DSN to directly construct a TaosBuilder object.
Using the `default()` method, you can construct a [TaosCfg] with default parameters for subsequent connections to the database or establishing connection pools.
```rust ```rust
let cfg = TaosCfgBuilder::default().build()? ; let builder = TaosBuilder::from_dsn("")? ;
``` ```
Using the constructor pattern, the user can set on-demand. Use `builder` to create many connections:
```rust ```rust
let cfg = TaosCfgBuilder::default() let conn: Taos = cfg.build();
.ip("127.0.0.1")
.user("root")
.pass("taosdata")
.db("log")
.port(6030u16)
.build()? ;
``` ```
Create TDengine connection using [TaosCfg] object. ### Connection pool
In complex applications, we recommend enabling connection pools. Connection pool for [taos] is implemented using [r2d2] by enabling "r2d2" feature.
Basically, a connection pool with default parameters can be generated as:
```rust ```rust
let conn: Taos = cfg.connect(); let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
``` ```
### Connection pooling You can set the connection pool parameters using the `PoolBuilder`.
In complex applications, we recommend enabling connection pools. Connection pool for [libtaos] is implemented using [r2d2].
As follows, a connection pool with default parameters can be generated.
```rust ```rust
let pool = r2d2::Pool::new(cfg)? ; let dsn = "taos://localhost:6030";
let opts = PoolBuilder::new()
.max_size(5000) // max connections
.max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
.min_idle(Some(1000)) // minimal idle connections
.connection_timeout(Duration::from_secs(2));
let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;
``` ```
You can set the same connection pool parameters using the connection pool's constructor. In the application code, use `pool.get()?` to get a connection object [Taos].
```rust
use std::time::Duration;
let pool = r2d2::Pool::builder()
.max_size(5000) // max connections
.max_lifetime(Some(Duration::from_minutes(100))) // lifetime of each connection
.min_idle(Some(1000)) // minimal idle connections
.connection_timeout(Duration::from_minutes(2))
.build(cfg);
```
In the application code, use `pool.get()? ` to get a connection object [Taos].
```rust ```rust
let taos = pool.get()? ; let taos = pool.get()? ;
``` ```
The [Taos] structure is the connection manager in [libtaos] and provides two main APIs. ### Connection methods
1. `exec`: Execute some non-query SQL statements, such as `CREATE`, `ALTER`, `INSERT`, etc. The [Taos] connection struct provides several APIs for convenient use.
1. `exec`: Execute some non-query SQL statements, such as `CREATE`, `ALTER`, `INSERT` etc. and return affected rows (only meaningful to `INSERT`).
```rust ```rust
taos.exec().await? let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
``` ```
2. `query`: Execute the query statement and return the [TaosQueryData] object. 2. `exec_many`: You can execute many SQL statements in order with `exec_many` method.
```rust ```rust
let q = taos.query("select * from log.logs").await? taos.exec_many([
"CREATE DATABASE test",
"USE test",
"CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
]).await?;
``` ```
The [TaosQueryData] object stores the query result data and basic information about the returned columns (column name, type, length). 3. `query`: Execute the query statement and return the [ResultSet] object.
Column information is stored using [ColumnMeta].
```rust ```rust
let cols = &q.column_meta; let mut q = taos.query("select * from log.logs").await?
```
The [ResultSet] object stores the query result data and basic information about the returned columns (column name, type, length).
Get filed information with `fields` method.
```rust
let cols = q.fields();
for col in cols { for col in cols {
println!("name: {}, type: {:?} , bytes: {}", col.name, col.type_, col.bytes); println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
} }
``` ```
It fetches data line by line. Users could fetch data by rows.
```rust ```rust
for (i, row) in q.rows.iter().enumerate() { let mut rows = result.rows();
for (j, cell) in row.iter().enumerate() { let mut nrows = 0;
println!("cell({}, {}) data: {}", i, j, cell); while let Some(row) = rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
} }
nrows += 1;
} }
``` ```
Or use it with [serde](https://serde.rs) deserialization.
```rust
#[derive(Debug, Deserialize)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
let records: Vec<Record> = taos
.query("select * from `meters`")
.await?
.deserialize()
.try_collect()
.await?;
```
Note that Rust asynchronous functions and an asynchronous runtime are required. Note that Rust asynchronous functions and an asynchronous runtime are required.
[Taos] provides a few Rust methods that encapsulate SQL to reduce the frequency of `format!` code blocks. [Taos] provides a few Rust methods that encapsulate SQL to reduce the frequency of `format!` code blocks.
@ -275,110 +365,152 @@ Note that Rust asynchronous functions and an asynchronous runtime are required.
- `.create_database(database: &str)`: Executes the `CREATE DATABASE` statement. - `.create_database(database: &str)`: Executes the `CREATE DATABASE` statement.
- `.use_database(database: &str)`: Executes the `USE` statement. - `.use_database(database: &str)`: Executes the `USE` statement.
In addition, this structure is also the entry point for [Parameter Binding](#Parameter Binding Interface) and [Line Protocol Interface](#Line Protocol Interface). Please refer to the specific API descriptions for usage. ### Bind API
### Bind Interface Similar to the C interface, Rust provides the bind interface's wrapping. First, create a bind object [Stmt] for a SQL command with the [Taos] object.
Similar to the C interface, Rust provides the bind interface's wrapping. First, create a bind object [Stmt] for a SQL command from the [Taos] object.
```rust ```rust
let mut stmt: Stmt = taos.stmt("insert into ? values(? ,?)") ? ; let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
``` ```
The bind object provides a set of interfaces for implementing parameter binding. The bind object provides a set of interfaces for implementing parameter binding.
##### `.set_tbname(tbname: impl ToCString)` #### `.set_tbname(name)`
To bind table names. To bind table names.
##### `.set_tbname_tags(tbname: impl ToCString, tags: impl IntoParams)`
Bind sub-table table names and tag values when the SQL statement uses a super table.
```rust ```rust
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)") ? ; let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
// tags can be created with any supported type, here is an example using JSON stmt.set_tbname("d0")?;
let v = Field::Json(serde_json::from_str("{\"tag1\":\"one, two, three, four, five, six, seven, eight, nine, ten\"}").unwrap());
stmt.set_tbname_tags("tb0", [&tag])? ;
``` ```
##### `.bind(params: impl IntoParams)` #### `.set_tags(&[tag])`
Bind value types. Use the [Field] structure to construct the desired type and bind. Bind tag values when the SQL statement uses a super table.
```rust ```rust
let ts = Field::Timestamp(Timestamp::now()); let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
let value = Field::Float(0.0); stmt.set_tbname("d0")?;
stmt.bind(vec![ts, value].iter())? ; stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
``` ```
##### `.execute()` #### `.bind(&[column])`
Execute SQL.[Stmt] objects can be reused, re-binded, and executed after execution. Bind value types. Use the [ColumnView] structure to construct the desired type and bind.
```rust ```rust
stmt.execute()? ; let params = vec![
ColumnView::from_millis_timestamp(vec![164000000000]),
ColumnView::from_bools(vec![true]),
ColumnView::from_tiny_ints(vec![i8::MAX]),
ColumnView::from_small_ints(vec![i16::MAX]),
ColumnView::from_ints(vec![i32::MAX]),
ColumnView::from_big_ints(vec![i64::MAX]),
ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
ColumnView::from_unsigned_ints(vec![u32::MAX]),
ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
ColumnView::from_floats(vec![f32::MAX]),
ColumnView::from_doubles(vec![f64::MAX]),
ColumnView::from_varchar(vec!["ABC"]),
ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
```
#### `.execute()`
Execute to insert all bind records. [Stmt] objects can be reused, re-bind, and executed after execution. Remember to call `add_batch` before `execute`.
```rust
stmt.add_batch()?.execute()?;
// next bind cycle. // next bind cycle.
// stmt.set_tbname()? ; // stmt.set_tbname()? ;
//stmt.bind()? ; //stmt.bind()? ;
//stmt.execute()? ; //stmt.add_batch().execute()? ;
``` ```
### Line protocol interface A runnable example for bind can be found [here](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs).
The line protocol interface supports multiple modes and different precision and requires the introduction of constants in the schemaless module to set. ### Subscription API
Users can subscribe a [TOPIC](../../../taos-sql/tmq/) with TMQ(the TDengine Message Queue) API.
Start from a TMQ builder:
```rust ```rust
use libtaos::*; let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
use libtaos::schemaless::*;
``` ```
- InfluxDB line protocol Build a consumer:
```rust ```rust
let lines = [ let mut consumer = tmq.build()?;
"st,t1=abc,t2=def,t3=anything c1=3i64,c3=L\"pass\",c2=false 1626006833639000000" ```
"st,t1=abc,t2=def,t3=anything c1=3i64,c3=L\"abc\",c4=4f64 1626006833639000000"
];
taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANOSECONDS)? ;
```
- OpenTSDB Telnet Protocol Subscribe a topic:
```rust ```rust
let lines = ["sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0"]; consumer.subscribe(["tmq_meters"]).await?;
taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_SECONDS)? ; ```
```
- OpenTSDB JSON protocol Consume messages, and commit the offset for each message.
```rust ```rust
let lines = [r#" {
{ let mut stream = consumer.stream();
"metric": "st",
"timestamp": 1626006833, while let Some((offset, message)) = stream.try_next().await? {
"value": 10, // get information from offset
"tags": {
"t1": true, // the topic
"t2": false, let topic = offset.topic();
"t3": 10, // the vgroup id, like partition id in kafka.
"t4": "123_abc_.! @#$%^&*:;,. /? |+-=()[]{}<>" let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
} }
}"#]; }
taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_SECONDS)? ; consumer.commit(offset).await?;
``` }
}
```
Please move to the Rust documentation hosting page for other related structure API usage instructions: <https://docs.rs/libtaos>. Unsubscribe:
[libtaos]: https://github.com/taosdata/libtaos-rs ```rust
[tdengine]: https://github.com/taosdata/TDengine consumer.unsubscribe().await;
[bailongma-rs]: https://github.com/taosdata/bailongma-rs ```
In TMQ DSN, you must choose to subscribe with a group id. Also, there's several options could be set:
- `group.id`: **Required**, a group id is any visible string you set.
- `client.id`: a optional client description string.
- `auto.offset.reset`: choose to subscribe from *earliest* or *latest*, default is *none* which means 'earliest'.
- `enable.auto.commit`: automatically commit with specified time interval. By default - in the recommended way _ you must use `commit` to ensure that you've consumed the messages correctly, otherwise, consumers will received repeated messages when re-subscribe.
- `auto.commit.interval.ms`: the auto commit interval in milliseconds.
Check the whole subscription example at [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/subscribe.rs).
Please move to the Rust documentation hosting page for other related structure API usage instructions: <https://docs.rs/taos>.
[TDengine]: https://github.com/taosdata/TDengine
[r2d2]: https://crates.io/crates/r2d2 [r2d2]: https://crates.io/crates/r2d2
[demo.rs]: https://github.com/taosdata/libtaos-rs/blob/main/examples/demo.rs [Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html
[TaosCfgBuilder]: https://docs.rs/libtaos/latest/libtaos/struct.TaosCfgBuilder.html [ResultSet]: https://docs.rs/taos/latest/taos/struct.ResultSet.html
[TaosCfg]: https://docs.rs/libtaos/latest/libtaos/struct.TaosCfg.html [Value]: https://docs.rs/taos/latest/taos/enum.Value.html
[Taos]: https://docs.rs/libtaos/latest/libtaos/struct.Taos.html [Stmt]: https://docs.rs/taos/latest/taos/stmt/struct.Stmt.html
[TaosQueryData]: https://docs.rs/libtaos/latest/libtaos/field/struct.TaosQueryData.html [taos]: https://crates.io/crates/taos
[Field]: https://docs.rs/libtaos/latest/libtaos/field/enum.Field.html
[Stmt]: https://docs.rs/libtaos/latest/libtaos/stmt/struct.Stmt.html

View File

@ -1,2 +1,2 @@
[workspace] [workspace]
members = ["restexample", "nativeexample", "schemalessexample"] members = ["restexample", "nativeexample"]

View File

@ -5,6 +5,9 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
libtaos = { version = "0.4.3" } anyhow = "1"
tokio = { version = "*", features = ["rt", "macros", "rt-multi-thread"] } chrono = "0.4"
bstr = { version = "*" } serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
taos = { version = "0.*" }

View File

@ -1,19 +1,9 @@
use libtaos::*; use taos::*;
fn taos_connect() -> Result<Taos, Error> { #[tokio::main]
TaosCfgBuilder::default() async fn main() -> Result<(), Error> {
.ip("localhost")
.user("root")
.pass("taosdata")
// .db("log") // remove comment if you want to connect to database log by default.
.port(6030u16)
.build()
.expect("TaosCfg builder error")
.connect()
}
fn main() {
#[allow(unused_variables)] #[allow(unused_variables)]
let taos = taos_connect().unwrap(); let taos = TaosBuilder::from_dsn("taos://")?.build()?;
println!("Connected") println!("Connected");
Ok(())
} }

View File

@ -1,38 +1,40 @@
use bstr::BString; use taos::*;
use libtaos::*;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Error> { async fn main() -> anyhow::Result<()> {
let taos = TaosCfg::default().connect().expect("fail to connect"); let taos = TaosBuilder::from_dsn("taos://")?.build()?;
taos.create_database("power").await?; taos.create_database("power").await?;
taos.use_database("power").await?; taos.use_database("power").await?;
taos.exec("CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?; taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
let mut stmt = taos.stmt("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
let mut stmt = Stmt::init(&taos)?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
// bind table name and tags // bind table name and tags
stmt.set_tbname_tags( stmt.set_tbname_tags(
"d1001", "d1001",
[ &[Value::VarChar("San Fransico".into()), Value::Int(2)],
Field::Binary(BString::from("California.SanFrancisco")),
Field::Int(2),
],
)?; )?;
// bind values. // bind values.
let values = vec![ let values = vec![
Field::Timestamp(Timestamp::new(1648432611249, TimestampPrecision::Milli)), ColumnView::from_millis_timestamp(vec![1648432611249]),
Field::Float(10.3), ColumnView::from_floats(vec![10.3]),
Field::Int(219), ColumnView::from_ints(vec![219]),
Field::Float(0.31), ColumnView::from_floats(vec![0.31]),
]; ];
stmt.bind(&values)?; stmt.bind(&values)?;
// bind one more row // bind one more row
let values2 = vec![ let values2 = vec![
Field::Timestamp(Timestamp::new(1648432611749, TimestampPrecision::Milli)), ColumnView::from_millis_timestamp(vec![1648432611749]),
Field::Float(12.6), ColumnView::from_floats(vec![12.6]),
Field::Int(218), ColumnView::from_ints(vec![218]),
Field::Float(0.33), ColumnView::from_floats(vec![0.33]),
]; ];
stmt.bind(&values2)?; stmt.bind(&values2)?;
// execute
stmt.execute()?; stmt.add_batch()?;
// execute.
let rows = stmt.execute()?;
assert_eq!(rows, 2);
Ok(()) Ok(())
} }

View File

@ -1,3 +1,101 @@
fn main() { use std::time::Duration;
} use chrono::{DateTime, Local};
use taos::*;
// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
}
async fn prepare(taos: Taos) -> anyhow::Result<()> {
let inserted = taos.exec_many([
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
assert_eq!(inserted, 6);
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
let db = "tmq";
// prepare database
taos.exec_many([
format!("DROP TOPIC IF EXISTS tmq_meters"),
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
// create super table
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"),
// create topic for subscription
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
])
.await?;
let task = tokio::spawn(prepare(taos));
tokio::time::sleep(Duration::from_secs(1)).await;
// subscribe
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
let mut consumer = tmq.build()?;
consumer.subscribe(["tmq_meters"]).await?;
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
consumer.unsubscribe().await;
task.await??;
Ok(())
}

View File

@ -4,5 +4,9 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
libtaos = { version = "0.4.3", features = ["rest"] } anyhow = "1"
tokio = { version = "*", features = ["rt", "macros", "rt-multi-thread"] } chrono = "0.4"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
taos = { version = "0.*" }

View File

@ -1,20 +1,9 @@
use libtaos::*; use taos::*;
fn taos_connect() -> Result<Taos, Error> {
TaosCfgBuilder::default()
.ip("localhost")
.user("root")
.pass("taosdata")
// .db("log") // remove comment if you want to connect to database log by default.
.port(6030u16)
.build()
.expect("TaosCfg builder error")
.connect()
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() -> Result<(), Error> {
#[allow(unused_variables)] #[allow(unused_variables)]
let taos = taos_connect().expect("connect error"); let taos = TaosBuilder::from_dsn("taos://")?.build()?;
println!("Connected") println!("Connected");
Ok(())
} }

View File

@ -1,18 +1,29 @@
use libtaos::*; use taos::*;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Error> { async fn main() -> anyhow::Result<()> {
let taos = TaosCfg::default().connect().expect("fail to connect"); let dsn = "ws://";
taos.create_database("power").await?; let taos = TaosBuilder::from_dsn(dsn)?.build()?;
taos.exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
let sql = "INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000) taos.exec_many([
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000) "DROP DATABASE IF EXISTS power",
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)"; "CREATE DATABASE power",
let result = taos.query(sql).await?; "USE power",
println!("{:?}", result); "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
]).await?;
let inserted = taos.exec("INSERT INTO
power.d1001 USING power.meters TAGS('San Francisco', 2)
VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000)
('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('San Francisco', 3)
VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('Los Angeles', 2)
VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('Los Angeles', 3)
VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)").await?;
assert_eq!(inserted, 8);
Ok(()) Ok(())
} }
// output:
// TaosQueryData { column_meta: [ColumnMeta { name: "affected_rows", type_: Int, bytes: 4 }], rows: [[Int(8)]] }

View File

@ -1,39 +1,25 @@
use libtaos::*; use taos::sync::*;
fn taos_connect() -> Result<Taos, Error> { fn main() -> anyhow::Result<()> {
TaosCfgBuilder::default() let taos = TaosBuilder::from_dsn("ws:///power")?.build()?;
.ip("localhost") let mut result = taos.query("SELECT ts, current FROM meters LIMIT 2")?;
.user("root")
.pass("taosdata")
.db("power")
.port(6030u16)
.build()
.expect("TaosCfg builder error")
.connect()
}
#[tokio::main]
async fn main() -> Result<(), Error> {
let taos = taos_connect().expect("connect error");
let result = taos.query("SELECT ts, current FROM meters LIMIT 2").await?;
// print column names // print column names
let meta: Vec<ColumnMeta> = result.column_meta; let meta = result.fields();
for column in meta { println!("{}", meta.iter().map(|field| field.name()).join("\t"));
print!("{}\t", column.name)
}
println!();
// print rows // print rows
let rows: Vec<Vec<Field>> = result.rows; let rows = result.rows();
for row in rows { for row in rows {
for field in row { let row = row?;
print!("{}\t", field); for (_name, value) in row {
print!("{}\t", value);
} }
println!(); println!();
} }
Ok(()) Ok(())
} }
// output: // output(suppose you are in +8 timezone):
// ts current // ts current
// 2022-03-28 09:56:51.249 10.3 // 2018-10-03T14:38:05+08:00 10.3
// 2022-03-28 09:56:51.749 12.6 // 2018-10-03T14:38:15+08:00 12.6

View File

@ -1,7 +0,0 @@
[package]
name = "schemalessexample"
version = "0.1.0"
edition = "2021"
[dependencies]
libtaos = { version = "0.4.3" }

View File

@ -1,22 +0,0 @@
use libtaos::schemaless::*;
use libtaos::*;
fn main() {
let taos = TaosCfg::default().connect().expect("fail to connect");
taos.raw_query("CREATE DATABASE test").unwrap();
taos.raw_query("USE test").unwrap();
let lines = ["meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
"meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
"meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
"meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250"];
let affected_rows = taos
.schemaless_insert(
&lines,
TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLISECONDS,
)
.unwrap();
println!("affected_rows={}", affected_rows);
}
// run with: cargo run --example influxdb_line_example

View File

@ -1,25 +0,0 @@
use libtaos::schemaless::*;
use libtaos::*;
fn main() {
let taos = TaosCfg::default().connect().expect("fail to connect");
taos.raw_query("CREATE DATABASE test").unwrap();
taos.raw_query("USE test").unwrap();
let lines = [
r#"[{"metric": "meters.current", "timestamp": 1648432611249, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
{"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}},
{"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
{"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#,
];
let affected_rows = taos
.schemaless_insert(
&lines,
TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NOT_CONFIGURED,
)
.unwrap();
println!("affected_rows={}", affected_rows); // affected_rows=4
}
// run with: cargo run --example opentsdb_json_example

View File

@ -1,28 +0,0 @@
use libtaos::schemaless::*;
use libtaos::*;
fn main() {
let taos = TaosCfg::default().connect().expect("fail to connect");
taos.raw_query("CREATE DATABASE test").unwrap();
taos.raw_query("USE test").unwrap();
let lines = [
"meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
"meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
"meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3",
"meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3",
"meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2",
"meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2",
"meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3",
"meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3",
];
let affected_rows = taos
.schemaless_insert(
&lines,
TSDB_SML_TELNET_PROTOCOL,
TSDB_SML_TIMESTAMP_NOT_CONFIGURED,
)
.unwrap();
println!("affected_rows={}", affected_rows); // affected_rows=8
}
// run with: cargo run --example opentsdb_telnet_example

View File

@ -1,3 +0,0 @@
fn main() {
println!("Hello, world!");
}

View File

@ -10,141 +10,213 @@ import TabItem from '@theme/TabItem';
import Preparition from "./_preparition.mdx" import Preparition from "./_preparition.mdx"
import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx" import RustInsert from "../../07-develop/03-insert-data/_rust_sql.mdx"
import RustInfluxLine from "../../07-develop/03-insert-data/_rust_line.mdx" import RustBind from "../../07-develop/03-insert-data/_rust_stmt.mdx"
import RustOpenTSDBTelnet from "../../07-develop/03-insert-data/_rust_opts_telnet.mdx"
import RustOpenTSDBJson from "../../07-develop/03-insert-data/_rust_opts_json.mdx"
import RustQuery from "../../07-develop/04-query-data/_rust.mdx" import RustQuery from "../../07-develop/04-query-data/_rust.mdx"
[![Crates.io](https://img.shields.io/crates/v/libtaos)](https://crates.io/crates/libtaos) ![Crates.io](https://img.shields.io/crates/d/libtaos) [![docs.rs](https://img.shields.io/docsrs/libtaos)](https://docs.rs/libtaos) [![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos)
`libtaos` 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。 `taos` 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。
`libtaos` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序taosc连接 TDengine 运行实例。另外一种是 **REST 连接**,它通过 taosAdapter 的 REST 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 features” 来指定使用哪种连接器。REST 连接支持任何平台,但原生连接支持所有 TDengine 客户端能运行的平台。 `taos` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序taosc连接 TDengine 运行实例。另外一种是 **Websocket 连接**,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 `features`)” 来指定使用哪种连接器默认同时支持。Websocket 连接支持任何平台,原生连接支持所有 TDengine 客户端能运行的平台。
`libtaos` 的源码托管在 [GitHub](https://github.com/taosdata/libtaos-rs)。 该 Rust 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-connector-rust)。
## 支持的平台 ## 支持的平台
原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。
REST 连接支持所有能运行 Rust 的平台。 Websocket 连接支持所有能运行 Rust 的平台。
## 版本支持 ## 版本支持
请参考[版本支持列表](/reference/connector#版本支持) 请参考[版本支持列表](/reference/connector#版本支持)
Rust 连接器仍然在快速开发中1.0 之前无法保证其向后兼容。建议使用 2.4 版本以上的 TDengine以避免已知问题。 Rust 连接器仍然在快速开发中1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine以避免已知问题。
## 安装 ## 安装
### 安装前准备 ### 安装前准备
* 安装 Rust 开发工具链 * 安装 Rust 开发工具链
* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](/reference/connector#安装客户端驱动) * 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](/reference/connector#安装客户端驱动)
### 添加 libtaos 依赖 ### 添加 taos 依赖
根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [libtaos][libtaos] 依赖: 根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖:
<Tabs defaultValue="native"> <Tabs defaultValue="default">
<TabItem value="native" label="原生连接"> <TabItem value="default" label="同时支持">
在 `Cargo.toml` 文件中添加 [libtaos][libtaos] 在 `Cargo.toml` 文件中添加 [taos][taos]
```toml ```toml
[dependencies] [dependencies]
# use default feature # use default feature
libtaos = "*" taos = "*"
``` ```
</TabItem> <Tabs defaultValue="native">
<TabItem value="rest" label="REST 连接"> <TabItem value="native" label="仅原生连接">
在 `Cargo.toml` 文件中添加 [libtaos][libtaos],并启用 `rest` 特性。 在 `Cargo.toml` 文件中添加 [taos][taos]
```toml ```toml
[dependencies] [dependencies]
# use rest feature taos = { version = "*", default-features = false, features = ["native"] }
libtaos = { version = "*", features = ["rest"]} ```
</TabItem>
<TabItem value="rest" label="仅 Websocket">
在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `ws` 特性。
```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["ws"] }
``` ```
</TabItem> </TabItem>
</Tabs> </Tabs>
### 使用连接池
请在 `Cargo.toml` 中启用 `r2d2` 特性。
```toml
[dependencies]
# with taosc
libtaos = { version = "*", features = ["r2d2"] }
# or rest
libtaos = { version = "*", features = ["rest", "r2d2"] }
```
## 建立连接 ## 建立连接
[TaosCfgBuilder] 为使用者提供构造器形式的 API以便于后续创建连接或使用连接池 [TaosBuilder] 通过 DSN 连接描述字符串创建一个连接构造器。
```rust ```rust
let cfg: TaosCfg = TaosCfgBuilder::default() let builder = TaosBuilder::from_dsn("taos://")?;
.ip("127.0.0.1")
.user("root")
.pass("taosdata")
.db("log") // do not set if not require a default database.
.port(6030u16)
.build()
.expect("TaosCfg builder error");
}
``` ```
现在您可以使用该对象创建连接: 现在您可以使用该对象创建连接:
```rust ```rust
let conn = cfg.connect()?; let conn = builder.build()?;
``` ```
连接对象可以创建多个: 连接对象可以创建多个:
```rust ```rust
let conn = cfg.connect()?; let conn1 = builder.build()?;
let conn2 = cfg.connect()?; let conn2 = builder.build()?;
``` ```
可以在应用中使用连接池 DSN 描述字符串基本结构如下
```rust ```text
let pool = r2d2::Pool::builder() <driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
.max_size(10000) // max connections |------|------------|---|-----------|-----------|------|------|------------|-----------------------|
.build(cfg)?; |driver| protocol | | username | password | host | port | database | params |
// ...
// Use pool to get connection
let conn = pool.get()?;
``` ```
之后您可以对数据库进行相关操作: 各部分意义见下表:
- **driver**: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
- **taos**: 表名使用 TDengine 连接器驱动。
- **tmq**: 使用 TMQ 订阅数据。
- **http/ws**: 使用 Websocket 创建连接。
- **https/wss**: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
- **protocol**: 显示指定以何种方式建立连接,例如:`taos+ws://localhost:6041` 指定以 Websocket 方式建立连接。
- **username/password**: 用于创建连接的用户名及密码。
- **host/port**: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(`taos://`),原生连接默认为 `localhost:6030`Websocket 连接默认为 `localhost:6041` 。
- **database**: 指定默认连接的数据库名。
- **params**:其他可选参数。
一个完整的 DSN 描述字符串示例如下:
```text
taos+ws://localhost:6041/test
```
表示使用 Websocket`ws`)方式通过 `6041` 端口连接服务器 `localhost`,并指定默认数据库为 `test`。
这使得用户可以通过 DSN 指定连接方式:
```rust ```rust
async fn demo() -> Result<(), Error> { use taos::*;
// get connection ...
// create database // use native protocol.
conn.exec("create database if not exists demo").await?; let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
// change database context let conn1 = builder.build();
conn.exec("use demo").await?;
// create table // use websocket protocol.
conn.exec("create table if not exists tb1 (ts timestamp, v int)").await?; let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
// insert ```
conn.exec("insert into tb1 values(now, 1)").await?;
// query 建立连接后,您可以进行相关数据库操作:
let rows = conn.query("select * from tb1").await?;
for row in rows.rows { ```rust
println!("{}", row.into_iter().join(",")); async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
// prepare database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;
let inserted = taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(16))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
assert_eq!(inserted, 6);
let mut result = taos.query("select * from `meters`").await?;
for field in result.fields() {
println!("got field: {}", field.name());
} }
let values = result.
} }
``` ```
查询数据可以通过两种方式:使用内建类型或 [serde](https://serde.rs) 序列化框架。
```rust
// Query option 1, use rows stream.
let mut rows = result.rows();
while let Some(row) = rows.try_next().await? {
for (name, value) in row {
println!("got value of {}: {}", name, value);
}
}
// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
let records: Vec<Record> = taos
.query("select * from `meters`")
.await?
.deserialize()
.try_collect()
.await?;
dbg!(records);
Ok(())
```
## 使用示例 ## 使用示例
### 写入数据 ### 写入数据
@ -153,79 +225,52 @@ async fn demo() -> Result<(), Error> {
<RustInsert /> <RustInsert />
#### InfluxDB 行协议写入 #### STMT 写入
<RustInfluxLine /> <RustBind />
#### OpenTSDB Telnet 行协议写入
<RustOpenTSDBTelnet />
#### OpenTSDB JSON 行协议写入
<RustOpenTSDBJson />
### 查询数据 ### 查询数据
<RustQuery /> <RustQuery />
### 更多示例程序
| 程序路径 | 程序说明 |
| -------------- | ----------------------------------------------------------------------------- |
| [demo.rs] | 基本API 使用示例 |
| [bailongma-rs] | 使用 TDengine 作为存储后端的 Prometheus 远程存储 API 适配器,使用 r2d2 连接池 |
## API 参考 ## API 参考
### 连接构造器 API ### 连接构造器
[Builder Pattern](https://doc.rust-lang.org/1.0.0/style/ownership/builders.html) 构造器模式是 Rust 处理复杂数据类型或可选配置类型的解决方案。[libtaos] 实现中,使用连接构造器 [TaosCfgBuilder] 作为 TDengine Rust 连接器的入口。[TaosCfgBuilder] 提供对服务器、端口、数据库、用户名和密码等的可选配置。 通过 DSN 来构建一个连接器构造器。
使用 `default()` 方法可以构建一个默认参数的 [TaosCfg],用于后续连接数据库或建立连接池。
```rust ```rust
let cfg = TaosCfgBuilder::default().build()?; let cfg = TaosBuilder::default().build()?;
``` ```
使用构造器模式,用户可按需设置 使用 `builder` 对象创建多个连接
```rust ```rust
let cfg = TaosCfgBuilder::default() let conn: Taos = cfg.build();
.ip("127.0.0.1")
.user("root")
.pass("taosdata")
.db("log")
.port(6030u16)
.build()?;
```
使用 [TaosCfg] 对象创建 TDengine 连接:
```rust
let conn: Taos = cfg.connect();
``` ```
### 连接池 ### 连接池
在复杂应用中,建议启用连接池。[libtaos] 的连接池使用 [r2d2] 实现。 在复杂应用中,建议启用连接池。[taos] 的连接池使用 [r2d2] 实现。
如下,可以生成一个默认参数的连接池。 如下,可以生成一个默认参数的连接池。
```rust ```rust
let pool = r2d2::Pool::new(cfg)?; let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
``` ```
同样可以使用连接池的构造器,对连接池参数进行设置: 同样可以使用连接池的构造器,对连接池参数进行设置:
```rust ```rust
use std::time::Duration; let dsn = "taos://localhost:6030";
let pool = r2d2::Pool::builder()
.max_size(5000) // max connections let opts = PoolBuilder::new()
.max_lifetime(Some(Duration::from_minutes(100))) // lifetime of each connection .max_size(5000) // max connections
.min_idle(Some(1000)) // minimal idle connections .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
.connection_timeout(Duration::from_minutes(2)) .min_idle(Some(1000)) // minimal idle connections
.build(cfg); .connection_timeout(Duration::from_secs(2));
let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;
``` ```
在应用代码中,使用 `pool.get()?` 来获取一个连接对象 [Taos]。 在应用代码中,使用 `pool.get()?` 来获取一个连接对象 [Taos]。
@ -236,44 +281,85 @@ let taos = pool.get()?;
### 连接 ### 连接
[Taos] 结构体是 [libtaos] 中的连接管理者,主要提供了两个 API [Taos][struct.Taos] 对象提供了多个数据库操作的 API
1. `exec`: 执行某个非查询类 SQL 语句,例如 `CREATE``ALTER``INSERT` 等。 1. `exec`: 执行某个非查询类 SQL 语句,例如 `CREATE``ALTER``INSERT` 等。
```rust ```rust
taos.exec().await?; let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
``` ```
2. `query`:执行查询语句,返回 [TaosQueryData] 对象 2. `exec_many`: 同时(顺序)执行多个 SQL 语句
```rust ```rust
let q = taos.query("select * from log.logs").await?; taos.exec_many([
"CREATE DATABASE test",
"USE test",
"CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
]).await?;
``` ```
[TaosQueryData] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度): 3. `query`:执行查询语句,返回 [ResultSet] 对象。
列信息使用 [ColumnMeta] 存储:
```rust ```rust
let cols = &q.column_meta; let mut q = taos.query("select * from log.logs").await?;
```
[ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度):
列信息使用 [.fields()] 方法获取:
```rust
let cols = q.fields();
for col in cols { for col in cols {
println!("name: {}, type: {:?}, bytes: {}", col.name, col.type_, col.bytes); println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
} }
``` ```
逐行获取数据: 逐行获取数据:
```rust ```rust
for (i, row) in q.rows.iter().enumerate() { let mut rows = result.rows();
for (j, cell) in row.iter().enumerate() { let mut nrows = 0;
println!("cell({}, {}) data: {}", i, j, cell); while let Some(row) = rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
} }
nrows += 1;
} }
``` ```
或使用 [serde](https://serde.rs) 序列化框架。
```rust
#[derive(Debug, Deserialize)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
let records: Vec<Record> = taos
.query("select * from `meters`")
.await?
.deserialize()
.try_collect()
.await?;
```
需要注意的是,需要使用 Rust 异步函数和异步运行时。 需要注意的是,需要使用 Rust 异步函数和异步运行时。
[Taos] 提供部分 SQL 的 Rust 方法化以减少 `format!` 代码块的频率: [Taos][struct.Taos] 提供部分 SQL 的 Rust 方法化以减少 `format!` 代码块的频率:
- `.describe(table: &str)`: 执行 `DESCRIBE` 并返回一个 Rust 数据结构。 - `.describe(table: &str)`: 执行 `DESCRIBE` 并返回一个 Rust 数据结构。
- `.create_database(database: &str)`: 执行 `CREATE DATABASE` 语句。 - `.create_database(database: &str)`: 执行 `CREATE DATABASE` 语句。
@ -283,42 +369,61 @@ let taos = pool.get()?;
### 参数绑定接口 ### 参数绑定接口
与 C 接口类似Rust 提供参数绑定接口。首先,通过 [Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt] 与 C 接口类似Rust 提供参数绑定接口。首先,通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]
```rust ```rust
let mut stmt: Stmt = taos.stmt("insert into ? values(?,?)")?; let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
``` ```
参数绑定对象提供了一组接口用于实现参数绑定: 参数绑定对象提供了一组接口用于实现参数绑定:
##### `.set_tbname(tbname: impl ToCString)` #### `.set_tbname(name)`
用于绑定表名。 用于绑定表名。
##### `.set_tbname_tags(tbname: impl ToCString, tags: impl IntoParams)` ```rust
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;
```
#### `.set_tags(&[tag])`
当 SQL 语句使用超级表时,用于绑定子表表名和标签值: 当 SQL 语句使用超级表时,用于绑定子表表名和标签值:
```rust ```rust
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(?,?)")?; let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
// tags can be created with any supported type, here is an example using JSON stmt.set_tbname("d0")?;
let v = Field::Json(serde_json::from_str("{\"tag1\":\"一二三四五六七八九十\"}").unwrap()); stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
stmt.set_tbname_tags("tb0", [&tag])?;
``` ```
##### `.bind(params: impl IntoParams)` #### `.bind(&[column])`
用于绑定值类型。使用 [Field] 结构体构建需要的类型并绑定: 用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:
```rust ```rust
let ts = Field::Timestamp(Timestamp::now()); let params = vec![
let value = Field::Float(0.0); ColumnView::from_millis_timestamp(vec![164000000000]),
stmt.bind(vec![ts, value].iter())?; ColumnView::from_bools(vec![true]),
ColumnView::from_tiny_ints(vec![i8::MAX]),
ColumnView::from_small_ints(vec![i16::MAX]),
ColumnView::from_ints(vec![i32::MAX]),
ColumnView::from_big_ints(vec![i64::MAX]),
ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
ColumnView::from_unsigned_ints(vec![u32::MAX]),
ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
ColumnView::from_floats(vec![f32::MAX]),
ColumnView::from_doubles(vec![f64::MAX]),
ColumnView::from_varchar(vec!["ABC"]),
ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
``` ```
##### `.execute()` #### `.execute()`
执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。 执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。
```rust ```rust
stmt.execute()?; stmt.execute()?;
@ -329,60 +434,84 @@ stmt.execute()?;
//stmt.execute()?; //stmt.execute()?;
``` ```
### 行协议接口 一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。
行协议接口支持多种模式和不同精度,需要引入 schemaless 模块中的常量以进行设置: ### 订阅
TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。
从 DSN 开始,构建一个 TMQ 连接器。
```rust ```rust
use libtaos::*; let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
use libtaos::schemaless::*;
``` ```
- InfluxDB 行协议 创建消费者:
```rust ```rust
let lines = [ let mut consumer = tmq.build()?;
"st,t1=abc,t2=def,t3=anything c1=3i64,c3=L\"pass\",c2=false 1626006833639000000" ```
"st,t1=abc,t2=def,t3=anything c1=3i64,c3=L\"abc\",c4=4f64 1626006833639000000"
];
taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANOSECONDS)?;
```
- OpenTSDB Telnet 协议 消费者可订阅一个或多个 `TOPIC`。
```rust ```rust
let lines = ["sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0"]; consumer.subscribe(["tmq_meters"]).await?;
taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_SECONDS)?; ```
```
- OpenTSDB JSON 协议 TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。
```rust ```rust
let lines = [r#" {
{ let mut stream = consumer.stream();
"metric": "st",
"timestamp": 1626006833, while let Some((offset, message)) = stream.try_next().await? {
"value": 10, // get information from offset
"tags": {
"t1": true, // the topic
"t2": false, let topic = offset.topic();
"t3": 10, // the vgroup id, like partition id in kafka.
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>" let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
} }
}"#]; }
taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_SECONDS)?; consumer.commit(offset).await?;
``` }
}
```
其他相关结构体 API 使用说明请移步 Rust 文档托管网页:<https://docs.rs/libtaos>。 停止订阅:
[libtaos]: https://github.com/taosdata/libtaos-rs ```rust
[tdengine]: https://github.com/taosdata/TDengine consumer.unsubscribe().await;
[bailongma-rs]: https://github.com/taosdata/bailongma-rs ```
对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。
- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
- `client.id`: 可选的订阅客户端识别项。
- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。
- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
- `auto.commit.interval.ms`: 自动标记的时间间隔。
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/taos-connector-rust/blob/main/examples/subscribe.rs).
其他相关结构体 API 使用说明请移步 Rust 文档托管网页:<https://docs.rs/taos>。
[taos]: https://github.com/taosdata/rust-connector-taos
[r2d2]: https://crates.io/crates/r2d2 [r2d2]: https://crates.io/crates/r2d2
[demo.rs]: https://github.com/taosdata/libtaos-rs/blob/main/examples/demo.rs [TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html
[TaosCfgBuilder]: https://docs.rs/libtaos/latest/libtaos/struct.TaosCfgBuilder.html [TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html
[TaosCfg]: https://docs.rs/libtaos/latest/libtaos/struct.TaosCfg.html [struct.Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html
[Taos]: https://docs.rs/libtaos/latest/libtaos/struct.Taos.html [Stmt]: https://docs.rs/taos/latest/taos/struct.Stmt.html
[TaosQueryData]: https://docs.rs/libtaos/latest/libtaos/field/struct.TaosQueryData.html
[Field]: https://docs.rs/libtaos/latest/libtaos/field/enum.Field.html
[Stmt]: https://docs.rs/libtaos/latest/libtaos/stmt/struct.Stmt.html