From b54e811eb767013de78d4777a7e9f76bf0f9a3ac Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Sat, 3 Aug 2024 19:19:26 +0800 Subject: [PATCH] mod code example --- docs/examples/rust/nativeexample/Cargo.toml | 2 +- .../rust/nativeexample/examples/query.rs | 27 ++-- .../rust/nativeexample/examples/schemaless.rs | 26 +++- .../rust/nativeexample/examples/stmt.rs | 3 +- .../rust/nativeexample/examples/tmq.rs | 141 +++++++++--------- docs/examples/rust/restexample/Cargo.toml | 2 +- .../rust/restexample/examples/schemaless.rs | 38 ++--- .../rust/restexample/examples/stmt.rs | 1 + .../examples/rust/restexample/examples/tmq.rs | 56 +++++-- docs/zh/08-develop/01-connect/index.md | 21 +-- docs/zh/08-develop/07-tmq.md | 16 +- docs/zh/14-reference/05-connector/index.md | 16 +- .../taosdata/example/ConsumerLoopFull.java | 91 ++++++----- .../taosdata/example/WsConsumerLoopFull.java | 81 +++++----- 14 files changed, 271 insertions(+), 250 deletions(-) diff --git a/docs/examples/rust/nativeexample/Cargo.toml b/docs/examples/rust/nativeexample/Cargo.toml index 98c90eb7af..13e68d6d9d 100644 --- a/docs/examples/rust/nativeexample/Cargo.toml +++ b/docs/examples/rust/nativeexample/Cargo.toml @@ -12,4 +12,4 @@ tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] } log = "0.4" pretty_env_logger = "0.5.0" -taos = { version = "0.11.8" } \ No newline at end of file +taos = "*" diff --git a/docs/examples/rust/nativeexample/examples/query.rs b/docs/examples/rust/nativeexample/examples/query.rs index 55c2feb7b7..857df01f01 100644 --- a/docs/examples/rust/nativeexample/examples/query.rs +++ b/docs/examples/rust/nativeexample/examples/query.rs @@ -5,7 +5,7 @@ async fn main() -> anyhow::Result<()> { let dsn = "taos://localhost:6030"; let builder = TaosBuilder::from_dsn(dsn)?; - let taos = builder.build()?; + let taos = builder.build().await?; // ANCHOR: create_db_and_table let db = "power"; @@ -19,7 +19,7 @@ async fn main() -> anyhow::Result<()> { // create super table taos.exec_many([ - "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ + "CREATE STABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ TAGS (`groupid` INT, `location` BINARY(24))", ]).await?; println!("Create stable meters successfully."); @@ -27,15 +27,15 @@ async fn main() -> anyhow::Result<()> { // ANCHOR_END: create_db_and_table // ANCHOR: insert_data - let inserted = taos.exec("INSERT INTO " + - "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 219, 0.31000) " + - "(NOW + 2a, 12.60000, 218, 0.33000) " + - "(NOW + 3a, 12.30000, 221, 0.31000) " + - "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 218, 0.25000) ").await?; + let inserted = taos.exec(r#"INSERT INTO + power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') + VALUES + (NOW + 1a, 10.30000, 219, 0.31000) + (NOW + 2a, 12.60000, 218, 0.33000) + (NOW + 3a, 12.30000, 221, 0.31000) + power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') + VALUES + (NOW + 1a, 10.30000, 218, 0.25000) "#).await?; println!("inserted: {} rows to power.meters successfully.", inserted); // ANCHOR_END: insert_data @@ -63,5 +63,10 @@ async fn main() -> anyhow::Result<()> { // ANCHOR: query_with_req_id let result = taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", 1).await?; + for field in result.fields() { + println!("got field: {}", field.name()); + } + println!("query with reqId successfully"); // ANCHOR_END: query_with_req_id + Ok(()) } diff --git a/docs/examples/rust/nativeexample/examples/schemaless.rs b/docs/examples/rust/nativeexample/examples/schemaless.rs index 298dcd3890..9f3a68a4c4 100644 --- a/docs/examples/rust/nativeexample/examples/schemaless.rs +++ b/docs/examples/rust/nativeexample/examples/schemaless.rs @@ -2,11 +2,13 @@ use taos_query::common::SchemalessPrecision; use taos_query::common::SchemalessProtocol; use taos_query::common::SmlDataBuilder; -use crate::AsyncQueryable; -use crate::AsyncTBuilder; -use crate::TaosBuilder; +use taos::AsyncQueryable; +use taos::AsyncTBuilder; +use taos::TaosBuilder; +use taos::taos_query; -async fn put() -> anyhow::Result<()> { +#[tokio::main] +async fn main() -> anyhow::Result<()> { std::env::set_var("RUST_LOG", "taos=debug"); pretty_env_logger::init(); let dsn = @@ -26,7 +28,7 @@ async fn put() -> anyhow::Result<()> { // SchemalessProtocol::Line let data = [ - "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000", + "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639", ] .map(String::from) .to_vec(); @@ -42,7 +44,7 @@ async fn put() -> anyhow::Result<()> { // SchemalessProtocol::Telnet let data = [ - "metric_telnet 1648432611249 10.3 location=California.SanFrancisco group=2", + "metric_telnet 1707095283260 4 host=host0 interface=eth0", ] .map(String::from) .to_vec(); @@ -58,7 +60,16 @@ async fn put() -> anyhow::Result<()> { // SchemalessProtocol::Json let data = [ - r#"[{"metric": "metric_json", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"# + r#"[{ + "metric": "metric_json", + "timestamp": 1626846400, + "value": 10.3, + "tags": { + "groupid": 2, + "location": "California.SanFrancisco", + "id": "d1001" + } + }]"# ] .map(String::from) .to_vec(); @@ -72,5 +83,6 @@ async fn put() -> anyhow::Result<()> { .build()?; assert_eq!(client.put(&sml_data).await?, ()); + println!("execute schemaless insert successfully"); Ok(()) } diff --git a/docs/examples/rust/nativeexample/examples/stmt.rs b/docs/examples/rust/nativeexample/examples/stmt.rs index d784607e03..04faa888bf 100644 --- a/docs/examples/rust/nativeexample/examples/stmt.rs +++ b/docs/examples/rust/nativeexample/examples/stmt.rs @@ -37,6 +37,7 @@ async fn main() -> anyhow::Result<()> { // execute. let rows = stmt.execute().await?; assert_eq!(rows, NUM_TABLES * NUM_ROWS); - + + println!("execute stmt insert successfully"); Ok(()) } diff --git a/docs/examples/rust/nativeexample/examples/tmq.rs b/docs/examples/rust/nativeexample/examples/tmq.rs index c8f2b0a19a..4adfd28989 100644 --- a/docs/examples/rust/nativeexample/examples/tmq.rs +++ b/docs/examples/rust/nativeexample/examples/tmq.rs @@ -1,6 +1,7 @@ use std::time::Duration; use std::str::FromStr; - +use chrono::Local; +use chrono::DateTime; use taos::*; #[tokio::main] @@ -23,18 +24,8 @@ async fn main() -> anyhow::Result<()> { "drop database if exists power", "create database if not exists power WAL_RETENTION_PERIOD 86400", "use power", - "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))", - "create table if not exists power.d001 using power.meters tags(1,'location')", - - ]) - .await?; - - taos.exec_many([ - "drop database if exists db2", - "create database if not exists db2 wal_retention_period 3600", - "use db2", ]) .await?; @@ -57,68 +48,74 @@ async fn main() -> anyhow::Result<()> { let mut consumer = builder.build().await?; // ANCHOR_END: create_consumer_ac - // ANCHOR: consume + // ANCHOR: subscribe consumer.subscribe(["topic_meters"]).await?; - - { - let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1)); + // ANCHOR_END: subscribe - while let Some((offset, message)) = stream.try_next().await? { - - let topic: &str = offset.topic(); - let database = offset.database(); - let vgroup_id = offset.vgroup_id(); - log::debug!( - "receive message from: topic: {}, database: {}, vgroup_id: {}", - topic, - database, - vgroup_id - ); - - match message { - MessageSet::Meta(meta) => { - log::info!("Meta"); - let raw = meta.as_raw_meta().await?; - taos.write_raw_meta(&raw).await?; - - let json = meta.as_json_meta().await?; - let sql = json.to_string(); - if let Err(err) = taos.exec(sql).await { - println!("maybe error in handling meta message: {}", err); - } - } - MessageSet::Data(data) => { - log::info!("Data"); - while let Some(data) = data.fetch_raw_block().await? { - log::debug!("message data: {:?}", data); - } - } - MessageSet::MetaData(meta, data) => { - log::info!("MetaData"); - let raw = meta.as_raw_meta().await?; - taos.write_raw_meta(&raw).await?; - - let json = meta.as_json_meta().await?; - let sql = json.to_string(); - if let Err(err) = taos.exec(sql).await { - println!("maybe error in handling metadata message: {}", err); - } - - while let Some(data) = data.fetch_raw_block().await? { - log::debug!("message data: {:?}", data); - } - } - } - // commit offset manually when handling message successfully - consumer.commit(offset).await?; - } + #[derive(Debug, serde::Deserialize)] + #[allow(dead_code)] + struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, + groupid: i32, + // binary/varchar to String + location: String, } + + // ANCHOR: consume + + consumer + .stream() + .try_for_each(|(offset, message)| async move { + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + let records: Vec = block.deserialize().try_collect()?; + println!("** read {} records: {:#?}\n", records.len(), records); + } + } + Ok(()) + }) + .await?; + // ANCHOR_END: consume - // ANCHOR: seek_offset + // ANCHOR: consumer_commit_manually + consumer + .stream() + .try_for_each(|(offset, message)| async { + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + let records: Vec = block.deserialize().try_collect()?; + println!("** read {} records: {:#?}\n", records.len(), records); + } + } + // commit offset manually when you have processed the message. + consumer.commit(offset).await?; + Ok(()) + }) + .await?; + // ANCHOR_END: consumer_commit_manually + + // ANCHOR: assignments let assignments = consumer.assignments().await.unwrap(); - log::info!("start assignments: {:?}", assignments); - + log::info!("assignments: {:?}", assignments); + // ANCHOR_END: assignments + // seek offset for topic_vec_assignment in assignments { let topic = &topic_vec_assignment.0; @@ -136,24 +133,23 @@ async fn main() -> anyhow::Result<()> { begin, end ); - - // seek offset of the (topic, vgroup_id) to the begin - let res = consumer.offset_seek(topic, vgroup_id, begin).await; + // ANCHOR: seek_offset + let res = consumer.offset_seek(topic, vgroup_id, end).await; if res.is_err() { log::error!("seek offset error: {:?}", res); let a = consumer.assignments().await.unwrap(); log::error!("assignments: {:?}", a); } + // ANCHOR_END: seek_offset } let topic_assignment = consumer.topic_assignment(topic).await; log::debug!("topic assignment: {:?}", topic_assignment); } - + // after seek offset let assignments = consumer.assignments().await.unwrap(); log::info!("after seek offset assignments: {:?}", assignments); - // ANCHOR_END: seek_offset // ANCHOR: unsubscribe consumer.unsubscribe().await; @@ -162,7 +158,6 @@ async fn main() -> anyhow::Result<()> { tokio::time::sleep(Duration::from_secs(1)).await; taos.exec_many([ - "drop database db2", "drop topic topic_meters", "drop database power", ]) diff --git a/docs/examples/rust/restexample/Cargo.toml b/docs/examples/rust/restexample/Cargo.toml index 7b13b48104..ff057ed4e5 100644 --- a/docs/examples/rust/restexample/Cargo.toml +++ b/docs/examples/rust/restexample/Cargo.toml @@ -11,4 +11,4 @@ tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] } log = "0.4" pretty_env_logger = "0.5.0" -taos = { version = "0.*" } +taos = "*" diff --git a/docs/examples/rust/restexample/examples/schemaless.rs b/docs/examples/rust/restexample/examples/schemaless.rs index eefcd63dae..d2ec7af030 100644 --- a/docs/examples/rust/restexample/examples/schemaless.rs +++ b/docs/examples/rust/restexample/examples/schemaless.rs @@ -2,31 +2,23 @@ use taos_query::common::SchemalessPrecision; use taos_query::common::SchemalessProtocol; use taos_query::common::SmlDataBuilder; -use crate::AsyncQueryable; -use crate::AsyncTBuilder; -use crate::TaosBuilder; +use taos::AsyncQueryable; +use taos::AsyncTBuilder; +use taos::TaosBuilder; +use taos::taos_query; -async fn put() -> anyhow::Result<()> { +#[tokio::main] +async fn main() -> anyhow::Result<()> { std::env::set_var("RUST_LOG", "taos=debug"); pretty_env_logger::init(); - let dsn = - std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("https://localhost:6041".to_string()); + let dsn = "http://localhost:6041/power".to_string(); log::debug!("dsn: {:?}", &dsn); let client = TaosBuilder::from_dsn(dsn)?.build().await?; - let db = "power"; - - client - .exec(format!("create database if not exists {db}")) - .await?; - - // should specify database before insert - client.exec(format!("use {db}")).await?; - // SchemalessProtocol::Line let data = [ - "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000", + "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639", ] .map(String::from) .to_vec(); @@ -42,7 +34,7 @@ async fn put() -> anyhow::Result<()> { // SchemalessProtocol::Telnet let data = [ - "metric_telnet 1648432611249 10.3 location=California.SanFrancisco group=2", + "metric_telnet 1707095283260 4 host=host0 interface=eth0", ] .map(String::from) .to_vec(); @@ -58,7 +50,16 @@ async fn put() -> anyhow::Result<()> { // SchemalessProtocol::Json let data = [ - r#"[{"metric": "metric_json", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"# + r#"[{ + "metric": "metric_json", + "timestamp": 1626846400, + "value": 10.3, + "tags": { + "groupid": 2, + "location": "California.SanFrancisco", + "id": "d1001" + } + }]"# ] .map(String::from) .to_vec(); @@ -72,5 +73,6 @@ async fn put() -> anyhow::Result<()> { .build()?; assert_eq!(client.put(&sml_data).await?, ()); + println!("execute schemaless insert successfully"); Ok(()) } diff --git a/docs/examples/rust/restexample/examples/stmt.rs b/docs/examples/rust/restexample/examples/stmt.rs index 524b37b919..8148050e15 100644 --- a/docs/examples/rust/restexample/examples/stmt.rs +++ b/docs/examples/rust/restexample/examples/stmt.rs @@ -38,5 +38,6 @@ async fn main() -> anyhow::Result<()> { let rows = stmt.execute().await?; assert_eq!(rows, NUM_TABLES * NUM_ROWS); + println!("execute stmt insert successfully"); Ok(()) } diff --git a/docs/examples/rust/restexample/examples/tmq.rs b/docs/examples/rust/restexample/examples/tmq.rs index b931a871b6..48df89d5bf 100644 --- a/docs/examples/rust/restexample/examples/tmq.rs +++ b/docs/examples/rust/restexample/examples/tmq.rs @@ -1,6 +1,7 @@ use std::time::Duration; use std::str::FromStr; - +use chrono::Local; +use chrono::DateTime; use taos::*; #[tokio::main] @@ -28,13 +29,6 @@ async fn main() -> anyhow::Result<()> { ]) .await?; - taos.exec_many([ - "drop database if exists db2", - "create database if not exists db2 wal_retention_period 3600", - "use db2", - ]) - .await?; - // ANCHOR: create_topic taos.exec_many([ "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters", @@ -58,9 +52,45 @@ async fn main() -> anyhow::Result<()> { consumer.subscribe(["topic_meters"]).await?; // ANCHOR_END: subscribe + #[derive(Debug, serde::Deserialize)] + #[allow(dead_code)] + struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, + groupid: i32, + // binary/varchar to String + location: String, + } + // ANCHOR: consume - { - consumer + + consumer + .stream() + .try_for_each(|(offset, message)| async move { + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + let records: Vec = block.deserialize().try_collect()?; + println!("** read {} records: {:#?}\n", records.len(), records); + } + } + Ok(()) + }) + .await?; + + // ANCHOR_END: consume + + // ANCHOR: consumer_commit_manually + consumer .stream() .try_for_each(|(offset, message)| async { let topic = offset.topic(); @@ -74,11 +104,12 @@ async fn main() -> anyhow::Result<()> { println!("** read {} records: {:#?}\n", records.len(), records); } } + // commit offset manually when you have processed the message. + consumer.commit(offset).await?; Ok(()) }) .await?; - } - // ANCHOR_END: consume + // ANCHOR_END: consumer_commit_manually // ANCHOR: assignments let assignments = consumer.assignments().await.unwrap(); @@ -127,7 +158,6 @@ async fn main() -> anyhow::Result<()> { tokio::time::sleep(Duration::from_secs(1)).await; taos.exec_many([ - "drop database db2", "drop topic topic_meters", "drop database power", ]) diff --git a/docs/zh/08-develop/01-connect/index.md b/docs/zh/08-develop/01-connect/index.md index da2a39eeb7..7fbd3f20a4 100644 --- a/docs/zh/08-develop/01-connect/index.md +++ b/docs/zh/08-develop/01-connect/index.md @@ -375,22 +375,15 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto - `reconnectIntervalMs`:重连间隔毫秒时间,默认为 2000。 -使用客户端驱动访问 TDengine 集群的基本过程为:建立连接、查询和写入、关闭连接、清除资源。 +C/C++ 语言连接器使用 `taos_connect()` 函数用于建立与 TDengine 数据库的连接。其参数详细说明如下: -下面为建立连接的示例代码,其中省略了查询和写入部分,展示了如何建立连接、关闭连接以及清除资源。 +- `host`:要连接的数据库服务器的主机名或IP地址。如果是本地数据库,可以使用 `"localhost"`。 +- `user`:用于登录数据库的用户名。 +- `passwd`:与用户名对应的密码。 +- `db`:连接时默认选择的数据库名。如果不指定数据库,可以传递 `NULL` 或空字符串。 +- `port`:数据库服务器监听的端口号。默认的端口号是 `6030`。 -```c -{{#include docs/examples/c/connect_example.c}} -``` - -在上面的示例代码中, `taos_connect()` 建立到客户端程序所在主机的 6030 端口的连接,`taos_close()`关闭当前连接,`taos_cleanup()`清除客户端驱动所申请和使用的资源。 - -:::note - -- 如未特别说明,当 API 的返回值是整数时,_0_ 代表成功,其它是代表失败原因的错误码,当返回值是指针时, _NULL_ 表示失败。 -- 所有的错误码以及对应的原因描述在 `taoserror.h` 文件中。 - -::: +还提供了 `taos_connect_auth()` 函数用于使用 MD5 加密的密码建立与 TDengine 数据库的连接。此函数与 `taos_connect` 功能相同,不同之处在于密码的处理方式,`taos_connect_auth` 需要的是密码的 MD5 加密字符串。 diff --git a/docs/zh/08-develop/07-tmq.md b/docs/zh/08-develop/07-tmq.md index ba6c24884e..2a700990af 100644 --- a/docs/zh/08-develop/07-tmq.md +++ b/docs/zh/08-develop/07-tmq.md @@ -155,7 +155,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 介绍各语言连接器使用原生连接方式创建消费者。指定连接的服务器地址,设置自动提交,从最新消息开始消费,指定 `group.id` 和 `client.id` 等信息。有的语言的连接器还支持反序列化参数。 - + ```java @@ -391,7 +391,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ### 原生连接 - + @@ -486,7 +486,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ```rust -{{#include docs/examples/rust/restexample/examples/subscribe_demo.rs:consumer_commit_manually}} +{{#include docs/examples/rust/restexample/examples/tmq.rs:consumer_commit_manually}} ``` 可以通过 `consumer.commit` 方法来手工提交消费进度。 @@ -507,7 +507,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ### 原生连接 - + @@ -532,7 +532,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ```rust -{{#include docs/examples/rust/restexample/examples/subscribe_demo.rs:consumer_commit_manually}} +{{#include docs/examples/rust/restexample/examples/tmq.rs:consumer_commit_manually}} ``` 可以通过 `consumer.commit` 方法来手工提交消费进度。 @@ -679,7 +679,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ```rust -{{#include docs/examples/rust/restexample/examples/subscribe_demo.rs}} +{{#include docs/examples/rust/restexample/examples/tmq.rs}} ``` @@ -699,7 +699,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ### 原生连接 - +
完整原生连接代码示例 @@ -731,7 +731,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur ```rust -{{#include docs/examples/rust/nativeexample/examples/subscribe_demo.rs}} +{{#include docs/examples/rust/nativeexample/examples/tmq.rs}} ``` diff --git a/docs/zh/14-reference/05-connector/index.md b/docs/zh/14-reference/05-connector/index.md index 1e68dcc40e..21bb9396e9 100644 --- a/docs/zh/14-reference/05-connector/index.md +++ b/docs/zh/14-reference/05-connector/index.md @@ -56,14 +56,14 @@ TDengine 版本更新往往会增加新的功能特性,列表中的连接器 ### 使用 http (REST 或 WebSocket) 接口 -| **功能特性** | **Java** | **Python** | **Go** | **C# ** | **Node.js** | **Rust** | -| ------------------------------ | -------- | ---------- | ------ | ------- | ----------- | -------- | -| **连接管理** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | -| **普通查询** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | -| **参数绑定** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | -| **数据订阅(TMQ)** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | -| **Schemaless** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | -| **批量拉取(基于 WebSocket)** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | +| **功能特性** | **Java** | **Python** | **Go** | **C#** | **Node.js** | **Rust** | +| ------------------------------ | -------- | ---------- | ------ | ------ | ----------- | -------- | +| **连接管理** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | +| **普通查询** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | +| **参数绑定** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | +| **数据订阅(TMQ)** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | +| **Schemaless** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | +| **批量拉取(基于 WebSocket)** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | :::warning diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java index 7e3bc05d6a..25b3958069 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java @@ -6,7 +6,10 @@ import com.taosdata.jdbc.tmq.*; import java.sql.*; import java.time.Duration; -import java.util.*; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -15,8 +18,7 @@ import java.util.concurrent.TimeUnit; public class ConsumerLoopFull { static private Connection connection; static private Statement statement; - - static private volatile boolean stopFlag = false; + static private volatile boolean stopThread = false; public static TaosConsumer getConsumer() throws SQLException { // ANCHOR: create_consumer @@ -38,45 +40,16 @@ public class ConsumerLoopFull { return new TaosConsumer<>(config); } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to create consumer", ex); } catch (Exception ex) { - System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to create consumer", ex); } // ANCHOR_END: create_consumer } - public static void pollDataExample(TaosConsumer consumer) throws SQLException { - try{ - // subscribe to the topics - List topics = Collections.singletonList("topic_meters"); - - consumer.subscribe(topics); - System.out.println("subscribe topics successfully"); - for (int i = 0; i < 50; i++) { - // poll data - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process the data here - System.out.println("data: " + JSON.toJSONString(bean)); - } - } - // unsubscribe the topics - consumer.unsubscribe(); - System.out.println("unsubscribed topics successfully"); - } catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to poll data from topic_meters", ex); - } catch (Exception ex) { - System.out.println("Failed to poll data from topic_meters; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to poll data from topic_meters", ex); - } - } - public static void pollExample(TaosConsumer consumer) throws SQLException { // ANCHOR: poll_data_code_piece try { @@ -172,6 +145,7 @@ public class ConsumerLoopFull { consumer.subscribe(topics); // ANCHOR: unsubscribe_data_code_piece try { + // unsubscribe the consumer consumer.unsubscribe(); } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info @@ -182,6 +156,7 @@ public class ConsumerLoopFull { throw new SQLException("Failed to unsubscribe consumer", ex); } finally { + // close the consumer consumer.close(); } // ANCHOR_END: unsubscribe_data_code_piece @@ -252,11 +227,11 @@ public class ConsumerLoopFull { public static void prepareData() throws SQLException, InterruptedException { try { int i = 0; - while (!stopFlag) { - i++; + while (!stopThread) { String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) "; int affectedRows = statement.executeUpdate(insertQuery); assert affectedRows == 1; + i++; Thread.sleep(1); } } catch (SQLException ex) { @@ -330,26 +305,43 @@ public class ConsumerLoopFull { // submit a task executor.submit(() -> { try { - // please use one example at a time - TaosConsumer consumer = getConsumer(); - - pollDataExample(consumer); - seekExample(consumer); - pollExample(consumer); - commitExample(consumer); - unsubscribeExample(consumer); - stopFlag = true; + prepareData(); } catch (SQLException ex) { - System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + return; + } catch (Exception ex) { + System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage()); + return; } System.out.println("pollDataExample executed successfully"); }); - prepareData(); - closeConnection(); + try { + TaosConsumer consumer = getConsumer(); - System.out.println("Data prepared successfully"); + pollExample(consumer); + System.out.println("pollExample executed successfully"); + consumer.unsubscribe(); + seekExample(consumer); + System.out.println("seekExample executed successfully"); + consumer.unsubscribe(); + + commitExample(consumer); + System.out.println("commitExample executed successfully"); + consumer.unsubscribe(); + + unsubscribeExample(consumer); + System.out.println("unsubscribeExample executed successfully"); + } catch (SQLException ex) { + System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + return; + } catch (Exception ex) { + System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage()); + return; + } + + stopThread = true; // close the executor, which will make the executor reject new tasks executor.shutdown(); @@ -364,6 +356,7 @@ public class ConsumerLoopFull { System.out.println("Wait executor termination failed."); } + closeConnection(); System.out.println("program end."); } } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java index 726c795b7b..55368b3e56 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java @@ -15,6 +15,7 @@ import java.util.concurrent.TimeUnit; public class WsConsumerLoopFull { static private Connection connection; static private Statement statement; + static private volatile boolean stopThread = false; public static TaosConsumer getConsumer() throws SQLException { // ANCHOR: create_consumer @@ -46,35 +47,6 @@ public class WsConsumerLoopFull { // ANCHOR_END: create_consumer } - public static void pollDataExample(TaosConsumer consumer) throws SQLException { - try{ - // subscribe to the topics - List topics = Collections.singletonList("topic_meters"); - - consumer.subscribe(topics); - System.out.println("subscribe topics successfully"); - for (int i = 0; i < 50; i++) { - // poll data - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process the data here - System.out.println("data: " + JSON.toJSONString(bean)); - } - } - // unsubscribe the topics - consumer.unsubscribe(); - System.out.println("unsubscribed topics successfully"); - } catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to poll data from topic_meters", ex); - } catch (Exception ex) { - System.out.println("Failed to poll data from topic_meters; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to poll data from topic_meters", ex); - } - } - public static void pollExample(TaosConsumer consumer) throws SQLException { // ANCHOR: poll_data_code_piece try { @@ -170,6 +142,7 @@ public class WsConsumerLoopFull { consumer.subscribe(topics); // ANCHOR: unsubscribe_data_code_piece try { + // unsubscribe the consumer consumer.unsubscribe(); } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info @@ -180,6 +153,7 @@ public class WsConsumerLoopFull { throw new SQLException("Failed to unsubscribe consumer", ex); } finally { + // close the consumer consumer.close(); } // ANCHOR_END: unsubscribe_data_code_piece @@ -249,10 +223,12 @@ public class WsConsumerLoopFull { public static void prepareData() throws SQLException, InterruptedException { try { - for (int i = 0; i < 3000; i++) { + int i = 0; + while (!stopThread) { String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) "; int affectedRows = statement.executeUpdate(insertQuery); assert affectedRows == 1; + i++; Thread.sleep(1); } } catch (SQLException ex) { @@ -326,32 +302,44 @@ public class WsConsumerLoopFull { // submit a task executor.submit(() -> { try { - // please use one example at a time - TaosConsumer consumer = getConsumer(); - - pollDataExample(consumer); - seekExample(consumer); - consumer.unsubscribe(); - pollExample(consumer); - consumer.unsubscribe(); - commitExample(consumer); - consumer.unsubscribe(); - unsubscribeExample(consumer); + prepareData(); } catch (SQLException ex) { - System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); return; } catch (Exception ex) { - System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage()); + System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage()); return; } System.out.println("pollDataExample executed successfully"); }); - prepareData(); - closeConnection(); + try { + TaosConsumer consumer = getConsumer(); - System.out.println("Data prepared successfully"); + pollExample(consumer); + System.out.println("pollExample executed successfully"); + consumer.unsubscribe(); + seekExample(consumer); + System.out.println("seekExample executed successfully"); + consumer.unsubscribe(); + + commitExample(consumer); + System.out.println("commitExample executed successfully"); + consumer.unsubscribe(); + + unsubscribeExample(consumer); + System.out.println("unsubscribeExample executed successfully"); + + } catch (SQLException ex) { + System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + return; + } catch (Exception ex) { + System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage()); + return; + } + + stopThread = true; // close the executor, which will make the executor reject new tasks executor.shutdown(); @@ -366,6 +354,7 @@ public class WsConsumerLoopFull { System.out.println("Wait executor termination failed."); } + closeConnection(); System.out.println("program end."); } }