mod rust doc
This commit is contained in:
parent
7fa473624d
commit
2133646a78
|
@ -82,360 +82,7 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Rust 对
|
|||
|
||||
**注意**:JSON 类型仅在 tag 中支持。
|
||||
|
||||
## 安装步骤
|
||||
|
||||
### 安装前准备
|
||||
|
||||
* 安装 Rust 开发工具链
|
||||
* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
|
||||
|
||||
### 安装连接器
|
||||
|
||||
根据选择的连接方式,按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖:
|
||||
|
||||
<Tabs defaultValue="default">
|
||||
<TabItem value="default" label="同时支持">
|
||||
|
||||
在 `Cargo.toml` 文件中添加 [taos][taos]:
|
||||
|
||||
```toml
|
||||
[dependencies]
|
||||
# use default feature
|
||||
taos = "*"
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="rest" label="仅 Websocket">
|
||||
|
||||
在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `ws` 特性。
|
||||
|
||||
```toml
|
||||
[dependencies]
|
||||
taos = { version = "*", default-features = false, features = ["ws"] }
|
||||
```
|
||||
|
||||
当仅启用 `ws` 特性时,可同时指定 `r2d2` 使得在同步(blocking/sync)模式下使用 [r2d2] 作为连接池:
|
||||
|
||||
```toml
|
||||
[dependencies]
|
||||
taos = { version = "*", default-features = false, features = ["r2d2", "ws"] }
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="native" label="仅原生连接">
|
||||
|
||||
在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `native` 特性:
|
||||
|
||||
```toml
|
||||
[dependencies]
|
||||
taos = { version = "*", default-features = false, features = ["native"] }
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
## 建立连接
|
||||
|
||||
[TaosBuilder] 通过 DSN 连接描述字符串创建一个连接构造器。
|
||||
|
||||
```rust
|
||||
let builder = TaosBuilder::from_dsn("taos://")?;
|
||||
```
|
||||
|
||||
现在您可以使用该对象创建连接:
|
||||
|
||||
```rust
|
||||
let conn = builder.build()?;
|
||||
```
|
||||
|
||||
连接对象可以创建多个:
|
||||
|
||||
```rust
|
||||
let conn1 = builder.build()?;
|
||||
let conn2 = builder.build()?;
|
||||
```
|
||||
|
||||
DSN 描述字符串基本结构如下:
|
||||
|
||||
```text
|
||||
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|
||||
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|
||||
|driver| protocol | | username | password | host | port | database | params |
|
||||
```
|
||||
|
||||
各部分意义见下表:
|
||||
|
||||
- **driver**: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
|
||||
- **taos**: 表明使用 TDengine 连接器驱动。
|
||||
- **tmq**: 使用 TMQ 订阅数据。
|
||||
- **http/ws**: 使用 Websocket 创建连接。
|
||||
- **https/wss**: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
|
||||
- **protocol**: 显示指定以何种方式建立连接,例如:`taos+ws://localhost:6041` 指定以 Websocket 方式建立连接。
|
||||
- **username/password**: 用于创建连接的用户名及密码。
|
||||
- **host/port**: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(`taos://`),原生连接默认为 `localhost:6030`,Websocket 连接默认为 `localhost:6041` 。
|
||||
- **database**: 指定默认连接的数据库名,可选参数。
|
||||
- **params**:其他可选参数。
|
||||
|
||||
一个完整的 DSN 描述字符串示例如下:
|
||||
|
||||
```text
|
||||
taos+ws://localhost:6041/test
|
||||
```
|
||||
|
||||
表示使用 Websocket(`ws`)方式通过 `6041` 端口连接服务器 `localhost`,并指定默认数据库为 `test`。
|
||||
|
||||
这使得用户可以通过 DSN 指定连接方式:
|
||||
|
||||
```rust
|
||||
use taos::*;
|
||||
|
||||
// use native protocol.
|
||||
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
|
||||
let conn1 = builder.build();
|
||||
|
||||
// use websocket protocol.
|
||||
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
|
||||
let conn2 = builder2.build();
|
||||
```
|
||||
|
||||
建立连接后,您可以进行相关数据库操作:
|
||||
|
||||
```rust
|
||||
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
|
||||
// prepare database
|
||||
taos.exec_many([
|
||||
format!("DROP DATABASE IF EXISTS `{db}`"),
|
||||
format!("CREATE DATABASE `{db}`"),
|
||||
format!("USE `{db}`"),
|
||||
])
|
||||
.await?;
|
||||
|
||||
let inserted = taos.exec_many([
|
||||
// create super table
|
||||
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
|
||||
TAGS (`groupid` INT, `location` BINARY(24))",
|
||||
// create child table
|
||||
"CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
|
||||
// insert into child table
|
||||
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
|
||||
// insert with NULL values
|
||||
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
|
||||
// insert and automatically create table with tags if not exists
|
||||
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
|
||||
// insert many records in a single sql
|
||||
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
|
||||
]).await?;
|
||||
|
||||
assert_eq!(inserted, 6);
|
||||
let mut result = taos.query("select * from `meters`").await?;
|
||||
|
||||
for field in result.fields() {
|
||||
println!("got field: {}", field.name());
|
||||
}
|
||||
|
||||
let values = result.
|
||||
}
|
||||
```
|
||||
|
||||
查询数据可以通过两种方式:使用内建类型或 [serde](https://serde.rs) 序列化框架。
|
||||
|
||||
```rust
|
||||
// Query option 1, use rows stream.
|
||||
let mut rows = result.rows();
|
||||
while let Some(row) = rows.try_next().await? {
|
||||
for (name, value) in row {
|
||||
println!("got value of {}: {}", name, value);
|
||||
}
|
||||
}
|
||||
|
||||
// Query options 2, use deserialization with serde.
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
#[allow(dead_code)]
|
||||
struct Record {
|
||||
// deserialize timestamp to chrono::DateTime<Local>
|
||||
ts: DateTime<Local>,
|
||||
// float to f32
|
||||
current: Option<f32>,
|
||||
// int to i32
|
||||
voltage: Option<i32>,
|
||||
phase: Option<f32>,
|
||||
groupid: i32,
|
||||
// binary/varchar to String
|
||||
location: String,
|
||||
}
|
||||
|
||||
let records: Vec<Record> = taos
|
||||
.query("select * from `meters`")
|
||||
.await?
|
||||
.deserialize()
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
||||
dbg!(records);
|
||||
Ok(())
|
||||
```
|
||||
|
||||
## 使用示例
|
||||
|
||||
### 创建数据库和表
|
||||
|
||||
```rust
|
||||
{{#include docs/examples/rust/nativeexample/examples/query.rs:create_db_and_table}}
|
||||
```
|
||||
|
||||
> **注意**:如果不使用 `use db` 指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 db.tb。
|
||||
|
||||
### 插入数据
|
||||
|
||||
```rust
|
||||
{{#include docs/examples/rust/nativeexample/examples/query.rs:insert_data}}
|
||||
```
|
||||
|
||||
### 查询数据
|
||||
|
||||
```rust
|
||||
{{#include docs/examples/rust/nativeexample/examples/query.rs:query_data}}
|
||||
```
|
||||
|
||||
### 执行带有 req_id 的 SQL
|
||||
|
||||
<RequestId />
|
||||
|
||||
```rust
|
||||
{{#include docs/examples/rust/nativeexample/examples/query.rs:query_with_req_id}}
|
||||
```
|
||||
|
||||
### 通过参数绑定写入数据
|
||||
|
||||
TDengine 的 Rust 连接器实现了参数绑定方式对数据写入(INSERT)场景的支持。采用这种方式写入数据时,能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
|
||||
|
||||
参数绑定接口详见[API参考](#stmt-api)
|
||||
|
||||
```rust
|
||||
{{#include docs/examples/rust/nativeexample/examples/stmt.rs}}
|
||||
```
|
||||
|
||||
### 无模式写入
|
||||
|
||||
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../reference/schemaless/)。
|
||||
|
||||
```rust
|
||||
{{#include docs/examples/rust/nativeexample/examples/schemaless.rs}}
|
||||
```
|
||||
|
||||
### 执行带有 req_id 的无模式写入
|
||||
|
||||
此 req_id 可用于请求链路追踪。
|
||||
|
||||
```rust
|
||||
let sml_data = SmlDataBuilder::default()
|
||||
.protocol(SchemalessProtocol::Line)
|
||||
.data(data)
|
||||
.req_id(100u64)
|
||||
.build()?;
|
||||
|
||||
client.put(&sml_data)?
|
||||
```
|
||||
|
||||
### 数据订阅
|
||||
|
||||
TDengine 通过消息队列 [TMQ](../../taos-sql/tmq/) 启动一个订阅。
|
||||
|
||||
#### 创建 Topic
|
||||
|
||||
```rust
|
||||
{{#include docs/examples/rust/nativeexample/examples/tmq.rs:create_topic}}
|
||||
```
|
||||
|
||||
#### 创建 Consumer
|
||||
|
||||
创建消费者:
|
||||
|
||||
```rust
|
||||
{{#include docs/examples/rust/nativeexample/examples/tmq.rs:create_consumer}}
|
||||
```
|
||||
|
||||
#### 订阅消费数据
|
||||
|
||||
消费者可订阅一个或多个 `TOPIC`。
|
||||
|
||||
```rust
|
||||
{{#include docs/examples/rust/nativeexample/examples/tmq.rs:subscribe}}
|
||||
```
|
||||
|
||||
TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。
|
||||
|
||||
```rust
|
||||
{{#include docs/examples/rust/nativeexample/examples/tmq.rs:consume}}
|
||||
```
|
||||
|
||||
获取消费进度:
|
||||
|
||||
版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0
|
||||
|
||||
```rust
|
||||
{{#include docs/examples/rust/nativeexample/examples/tmq.rs:assignments}}
|
||||
```
|
||||
|
||||
#### 指定订阅 Offset
|
||||
|
||||
按照指定的进度消费:
|
||||
|
||||
版本要求 connector-rust >= v0.8.8, TDengine >= 3.0.5.0
|
||||
|
||||
```rust
|
||||
{{#include docs/examples/rust/nativeexample/examples/tmq.rs:seek_offset}}
|
||||
```
|
||||
|
||||
#### 关闭订阅
|
||||
|
||||
```rust
|
||||
{{#include docs/examples/rust/nativeexample/examples/tmq.rs:unsubscribe}}
|
||||
```
|
||||
|
||||
对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,`group.id` 是必须的。
|
||||
|
||||
- `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
|
||||
- `client.id`: 可选的订阅客户端识别项。
|
||||
- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认值根据 TDengine 版本有所不同,详细参见 [数据订阅](https://docs.taosdata.com/develop/tmq/)。注意,此选项在同一个 `group.id` 中仅生效一次。
|
||||
- `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
|
||||
- `auto.commit.interval.ms`: 自动标记的时间间隔。
|
||||
|
||||
#### 完整示例
|
||||
|
||||
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/tmq.rs).
|
||||
|
||||
### 与连接池使用
|
||||
|
||||
在复杂应用中,建议启用连接池。[taos] 的连接池默认(异步模式)使用 [deadpool] 实现。
|
||||
|
||||
如下,可以生成一个默认参数的连接池。
|
||||
|
||||
```rust
|
||||
let pool: Pool<TaosBuilder> = TaosBuilder::from_dsn("taos:///")
|
||||
.unwrap()
|
||||
.pool()
|
||||
.unwrap();
|
||||
```
|
||||
|
||||
同样可以使用连接池的构造器,对连接池参数进行设置:
|
||||
|
||||
```rust
|
||||
let pool: Pool<TaosBuilder> = Pool::builder(Manager::from_dsn(self.dsn.clone()).unwrap().0)
|
||||
.max_size(88) // 最大连接数
|
||||
.build()
|
||||
.unwrap();
|
||||
```
|
||||
|
||||
在应用代码中,使用 `pool.get()?` 来获取一个连接对象 [Taos]。
|
||||
|
||||
```rust
|
||||
let taos = pool.get()?;
|
||||
```
|
||||
|
||||
### 更多示例程序
|
||||
## 示例程序汇总
|
||||
|
||||
示例程序源码位于 `TDengine/examples/rust` 下:
|
||||
|
||||
|
@ -445,168 +92,6 @@ let taos = pool.get()?;
|
|||
|
||||
请参考 [FAQ](../../train-faq/faq)
|
||||
|
||||
## API 参考
|
||||
|
||||
[Taos][struct.Taos] 对象提供了多个数据库操作的 API:
|
||||
|
||||
1. `exec`: 执行某个非查询类 SQL 语句,例如 `CREATE`,`ALTER`,`INSERT` 等。
|
||||
|
||||
```rust
|
||||
let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
|
||||
```
|
||||
|
||||
2. `exec_many`: 同时(顺序)执行多个 SQL 语句。
|
||||
|
||||
```rust
|
||||
taos.exec_many([
|
||||
"CREATE DATABASE test",
|
||||
"USE test",
|
||||
"CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
|
||||
]).await?;
|
||||
```
|
||||
|
||||
3. `query`:执行查询语句,返回 [ResultSet] 对象。
|
||||
|
||||
```rust
|
||||
let mut q = taos.query("select * from log.logs").await?;
|
||||
```
|
||||
|
||||
[ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度):
|
||||
|
||||
列信息使用 [.fields()] 方法获取:
|
||||
|
||||
```rust
|
||||
let cols = q.fields();
|
||||
for col in cols {
|
||||
println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
|
||||
}
|
||||
```
|
||||
|
||||
逐行获取数据:
|
||||
|
||||
```rust
|
||||
let mut rows = result.rows();
|
||||
let mut nrows = 0;
|
||||
while let Some(row) = rows.try_next().await? {
|
||||
for (col, (name, value)) in row.enumerate() {
|
||||
println!(
|
||||
"[{}] got value in col {} (named `{:>8}`): {}",
|
||||
nrows, col, name, value
|
||||
);
|
||||
}
|
||||
nrows += 1;
|
||||
}
|
||||
```
|
||||
|
||||
或使用 [serde](https://serde.rs) 序列化框架。
|
||||
|
||||
```rust
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct Record {
|
||||
// deserialize timestamp to chrono::DateTime<Local>
|
||||
ts: DateTime<Local>,
|
||||
// float to f32
|
||||
current: Option<f32>,
|
||||
// int to i32
|
||||
voltage: Option<i32>,
|
||||
phase: Option<f32>,
|
||||
groupid: i32,
|
||||
// binary/varchar to String
|
||||
location: String,
|
||||
}
|
||||
|
||||
let records: Vec<Record> = taos
|
||||
.query("select * from `meters`")
|
||||
.await?
|
||||
.deserialize()
|
||||
.try_collect()
|
||||
.await?;
|
||||
```
|
||||
|
||||
需要注意的是,需要使用 Rust 异步函数和异步运行时。
|
||||
|
||||
[Taos][struct.Taos] 提供部分 SQL 的 Rust 方法化以减少 `format!` 代码块的频率:
|
||||
|
||||
- `.describe(table: &str)`: 执行 `DESCRIBE` 并返回一个 Rust 数据结构。
|
||||
- `.create_database(database: &str)`: 执行 `CREATE DATABASE` 语句。
|
||||
- `.use_database(database: &str)`: 执行 `USE` 语句。
|
||||
|
||||
除此之外,该结构也是参数绑定和行协议接口的入口,使用方法请参考具体的 API 说明。
|
||||
|
||||
<p>
|
||||
<a id="stmt-api" style={{color:'#141414'}}>
|
||||
参数绑定接口
|
||||
</a>
|
||||
</p>
|
||||
|
||||
与 C 接口类似,Rust 提供参数绑定接口。首先,通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]:
|
||||
|
||||
```rust
|
||||
let mut stmt = Stmt::init(&taos).await?;
|
||||
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
|
||||
```
|
||||
|
||||
参数绑定对象提供了一组接口用于实现参数绑定:
|
||||
|
||||
`.set_tbname(name)`
|
||||
|
||||
用于绑定表名。
|
||||
|
||||
```rust
|
||||
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
|
||||
stmt.set_tbname("d0")?;
|
||||
```
|
||||
|
||||
`.set_tags(&[tag])`
|
||||
|
||||
当 SQL 语句使用超级表时,用于绑定子表表名和标签值:
|
||||
|
||||
```rust
|
||||
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
|
||||
stmt.set_tbname("d0")?;
|
||||
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
|
||||
```
|
||||
|
||||
`.bind(&[column])`
|
||||
|
||||
用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:
|
||||
|
||||
```rust
|
||||
let params = vec![
|
||||
ColumnView::from_millis_timestamp(vec![164000000000]),
|
||||
ColumnView::from_bools(vec![true]),
|
||||
ColumnView::from_tiny_ints(vec![i8::MAX]),
|
||||
ColumnView::from_small_ints(vec![i16::MAX]),
|
||||
ColumnView::from_ints(vec![i32::MAX]),
|
||||
ColumnView::from_big_ints(vec![i64::MAX]),
|
||||
ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
|
||||
ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
|
||||
ColumnView::from_unsigned_ints(vec![u32::MAX]),
|
||||
ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
|
||||
ColumnView::from_floats(vec![f32::MAX]),
|
||||
ColumnView::from_doubles(vec![f64::MAX]),
|
||||
ColumnView::from_varchar(vec!["ABC"]),
|
||||
ColumnView::from_nchar(vec!["涛思数据"]),
|
||||
];
|
||||
let rows = stmt.bind(¶ms)?.add_batch()?.execute()?;
|
||||
```
|
||||
|
||||
`.execute()`
|
||||
|
||||
执行 SQL。[Stmt] 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。
|
||||
|
||||
```rust
|
||||
stmt.execute()?;
|
||||
|
||||
// next bind cycle.
|
||||
//stmt.set_tbname()?;
|
||||
//stmt.bind()?;
|
||||
//stmt.execute()?;
|
||||
```
|
||||
|
||||
一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。
|
||||
|
||||
|
||||
## API 参考
|
||||
|
||||
Rust 连接器的接口分为同步接口和异步接口,一般同步接口是由异步接口实现,方法签名除 async 关键字外基本相同。对于同步接口和异步接口功能一样的接口,本文档只提供同步接口的说明。
|
||||
|
|
Loading…
Reference in New Issue