diff --git a/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopFull.java b/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopFull.java index ec9faf383e..a399f3aa6a 100644 --- a/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopFull.java +++ b/docs/examples/java/src/main/java/com/taos/example/ConsumerLoopFull.java @@ -47,7 +47,7 @@ public class ConsumerLoopFull { return consumer; } catch (Exception ex) { // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n", + System.out.printf("Failed to create native consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n", config.getProperty("bootstrap.servers"), config.getProperty("group.id"), config.getProperty("client.id"), diff --git a/docs/examples/rust/nativeexample/examples/insert.rs b/docs/examples/rust/nativeexample/examples/insert.rs index e78381fc61..585cb69c52 100644 --- a/docs/examples/rust/nativeexample/examples/insert.rs +++ b/docs/examples/rust/nativeexample/examples/insert.rs @@ -9,22 +9,22 @@ async fn main() -> anyhow::Result<()> { // ANCHOR: insert_data - match taos.exec(r#"INSERT INTO - power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') - VALUES - (NOW + 1a, 10.30000, 219, 0.31000) - (NOW + 2a, 12.60000, 218, 0.33000) - (NOW + 3a, 12.30000, 221, 0.31000) - power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') - VALUES - (NOW + 1a, 10.30000, 218, 0.25000) "#).await{ + let insert_sql = r#"INSERT INTO + power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') + VALUES + (NOW + 1a, 10.30000, 219, 0.31000) + (NOW + 2a, 12.60000, 218, 0.33000) + (NOW + 3a, 12.30000, 221, 0.31000) + power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') + VALUES + (NOW + 1a, 10.30000, 218, 0.25000) "#; + match taos.exec(insert_sql).await{ Ok(affected_rows) => println!("Successfully inserted {} rows to power.meters.", affected_rows), Err(err) => { - eprintln!("Failed to insert data to power.meters, ErrMessage: {}", err); + eprintln!("Failed to insert data to power.meters, sql: {}, ErrMessage: {}", insert_sql, err); return Err(err.into()); } } - // ANCHOR_END: insert_data Ok(()) diff --git a/docs/examples/rust/nativeexample/examples/tmq.rs b/docs/examples/rust/nativeexample/examples/tmq.rs index f312bee0f2..49580f81b7 100644 --- a/docs/examples/rust/nativeexample/examples/tmq.rs +++ b/docs/examples/rust/nativeexample/examples/tmq.rs @@ -3,6 +3,8 @@ use std::str::FromStr; use chrono::Local; use chrono::DateTime; use taos::*; +use std::thread; +use tokio::runtime::Runtime; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -53,17 +55,38 @@ async fn main() -> anyhow::Result<()> { consumer } Err(err) => { - eprintln!("Failed to create websocket consumer, dsn: {}, ErrMessage: {}", dsn, err); + eprintln!("Failed to create native consumer, dsn: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", dsn, group_id, client_id, err); return Err(err.into()); } }; // ANCHOR_END: create_consumer_ac + thread::spawn(move || { + let rt = Runtime::new().unwrap(); + + rt.block_on(async { + let taos_insert = TaosBuilder::from_dsn(&dsn).unwrap().build().await.unwrap(); + for i in 0..50 { + let insert_sql = format!(r#"INSERT INTO + power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') + VALUES + (NOW, 10.30000, {}, 0.31000)"#, i); + if let Err(e) = taos_insert.exec(insert_sql).await { + eprintln!("Failed to execute insert: {:?}", e); + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }); + + }).join().unwrap(); + + // ANCHOR: consume - match consumer.subscribe(["topic_meters"]).await{ + let topic = "topic_meters"; + match consumer.subscribe([topic]).await{ Ok(_) => println!("Subscribe topics successfully."), Err(err) => { - eprintln!("Failed to subscribe topic_meters, ErrMessage: {}", err); + eprintln!("Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, err); return Err(err.into()); } } @@ -94,13 +117,14 @@ async fn main() -> anyhow::Result<()> { if let Some(data) = message.into_data() { while let Some(block) = data.fetch_raw_block().await? { let records: Vec = block.deserialize().try_collect()?; + // Add your data processing logic here println!("** read {} records: {:#?}\n", records.len(), records); } } Ok(()) }) .await.map_err(|e| { - eprintln!("Failed to poll data; ErrMessage: {:?}", e); + eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e); e })?; @@ -110,14 +134,14 @@ async fn main() -> anyhow::Result<()> { consumer .stream() .try_for_each(|(offset, message)| async { - let topic = offset.topic(); // the vgroup id, like partition id in kafka. let vgroup_id = offset.vgroup_id(); - println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + println!("* in vgroup id {} of topic {}\n", vgroup_id, topic); if let Some(data) = message.into_data() { while let Some(block) = data.fetch_raw_block().await? { let records: Vec = block.deserialize().try_collect()?; + // Add your data processing logic here println!("** read {} records: {:#?}\n", records.len(), records); } } @@ -125,21 +149,28 @@ async fn main() -> anyhow::Result<()> { match consumer.commit(offset).await{ Ok(_) => println!("Commit offset manually successfully."), Err(err) => { - eprintln!("Failed to commit offset manually, ErrMessage: {}", err); + eprintln!("Failed to commit offset manually, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, ErrMessage: {:?}", + topic, group_id, client_id, vgroup_id, err); return Err(err.into()); } } Ok(()) }) .await.map_err(|e| { - eprintln!("Failed to poll data, ErrMessage: {:?}", e); + eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e); e })?; // ANCHOR_END: consumer_commit_manually // ANCHOR: seek_offset - let assignments = consumer.assignments().await.unwrap(); + let assignments = match consumer.assignments().await{ + Some(assignments) => assignments, + None => { + eprintln!("Failed to get assignments."); + return Err(anyhow::anyhow!("Failed to get assignments. topic: {}, groupId: {}, clientId: {}", topic, group_id, client_id)); + } + }; println!("assignments: {:?}", assignments); // seek offset @@ -152,7 +183,7 @@ async fn main() -> anyhow::Result<()> { let begin = assignment.begin(); let end = assignment.end(); println!( - "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}", + "topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}", topic, vgroup_id, current, @@ -163,7 +194,8 @@ async fn main() -> anyhow::Result<()> { match consumer.offset_seek(topic, vgroup_id, begin).await{ Ok(_) => (), Err(err) => { - eprintln!("Failed to seek offset, ErrMessage: {}", err); + eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}", + topic, group_id, client_id, vgroup_id, begin, err); return Err(err.into()); } } @@ -174,7 +206,13 @@ async fn main() -> anyhow::Result<()> { } println!("Assignment seek to beginning successfully."); // after seek offset - let assignments = consumer.assignments().await.unwrap(); + let assignments = match consumer.assignments().await{ + Some(assignments) => assignments, + None => { + eprintln!("Failed to get assignments."); + return Err(anyhow::anyhow!("Failed to get assignments. topic: {}, groupId: {}, clientId: {}", topic, group_id, client_id)); + } + }; println!("After seek offset assignments: {:?}", assignments); // ANCHOR_END: seek_offset diff --git a/docs/examples/rust/restexample/examples/insert.rs b/docs/examples/rust/restexample/examples/insert.rs index 537e531501..be85c1f82c 100644 --- a/docs/examples/rust/restexample/examples/insert.rs +++ b/docs/examples/rust/restexample/examples/insert.rs @@ -9,22 +9,22 @@ async fn main() -> anyhow::Result<()> { // ANCHOR: insert_data - match taos.exec(r#"INSERT INTO - power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') - VALUES - (NOW + 1a, 10.30000, 219, 0.31000) - (NOW + 2a, 12.60000, 218, 0.33000) - (NOW + 3a, 12.30000, 221, 0.31000) - power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') - VALUES - (NOW + 1a, 10.30000, 218, 0.25000) "#).await{ + let insert_sql = r#"INSERT INTO + power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') + VALUES + (NOW + 1a, 10.30000, 219, 0.31000) + (NOW + 2a, 12.60000, 218, 0.33000) + (NOW + 3a, 12.30000, 221, 0.31000) + power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') + VALUES + (NOW + 1a, 10.30000, 218, 0.25000) "#; + match taos.exec(insert_sql).await{ Ok(affected_rows) => println!("Successfully inserted {} rows to power.meters.", affected_rows), Err(err) => { - eprintln!("Failed to insert data to power.meters, ErrMessage: {}", dsn, err); + eprintln!("Failed to insert data to power.meters, sql: {}, ErrMessage: {}", insert_sql, err); return Err(err.into()); } } - // ANCHOR_END: insert_data Ok(()) diff --git a/docs/examples/rust/restexample/examples/tmq.rs b/docs/examples/rust/restexample/examples/tmq.rs index 0a0214d258..86715d57cb 100644 --- a/docs/examples/rust/restexample/examples/tmq.rs +++ b/docs/examples/rust/restexample/examples/tmq.rs @@ -3,6 +3,8 @@ use std::str::FromStr; use chrono::Local; use chrono::DateTime; use taos::*; +use std::thread; +use tokio::runtime::Runtime; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -53,17 +55,38 @@ async fn main() -> anyhow::Result<()> { consumer } Err(err) => { - eprintln!("Failed to create websocket consumer, dsn: {}, ErrMessage: {}", dsn, err); + eprintln!("Failed to create websocket consumer, dsn: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", dsn, group_id, client_id, err); return Err(err.into()); } }; // ANCHOR_END: create_consumer_ac + thread::spawn(move || { + let rt = Runtime::new().unwrap(); + + rt.block_on(async { + let taos_insert = TaosBuilder::from_dsn(&dsn).unwrap().build().await.unwrap(); + for i in 0..50 { + let insert_sql = format!(r#"INSERT INTO + power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') + VALUES + (NOW, 10.30000, {}, 0.31000)"#, i); + if let Err(e) = taos_insert.exec(insert_sql).await { + eprintln!("Failed to execute insert: {:?}", e); + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }); + + }).join().unwrap(); + + // ANCHOR: consume - match consumer.subscribe(["topic_meters"]).await{ + let topic = "topic_meters"; + match consumer.subscribe([topic]).await{ Ok(_) => println!("Subscribe topics successfully."), Err(err) => { - eprintln!("Failed to subscribe topic_meters, ErrMessage: {}", err); + eprintln!("Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, err); return Err(err.into()); } } @@ -94,13 +117,14 @@ async fn main() -> anyhow::Result<()> { if let Some(data) = message.into_data() { while let Some(block) = data.fetch_raw_block().await? { let records: Vec = block.deserialize().try_collect()?; + // Add your data processing logic here println!("** read {} records: {:#?}\n", records.len(), records); } } Ok(()) }) .await.map_err(|e| { - eprintln!("Failed to poll data; ErrMessage: {:?}", e); + eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e); e })?; @@ -110,14 +134,14 @@ async fn main() -> anyhow::Result<()> { consumer .stream() .try_for_each(|(offset, message)| async { - let topic = offset.topic(); // the vgroup id, like partition id in kafka. let vgroup_id = offset.vgroup_id(); - println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + println!("* in vgroup id {} of topic {}\n", vgroup_id, topic); if let Some(data) = message.into_data() { while let Some(block) = data.fetch_raw_block().await? { let records: Vec = block.deserialize().try_collect()?; + // Add your data processing logic here println!("** read {} records: {:#?}\n", records.len(), records); } } @@ -125,21 +149,28 @@ async fn main() -> anyhow::Result<()> { match consumer.commit(offset).await{ Ok(_) => println!("Commit offset manually successfully."), Err(err) => { - eprintln!("Failed to commit offset manually, ErrMessage: {}", err); + eprintln!("Failed to commit offset manually, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, ErrMessage: {:?}", + topic, group_id, client_id, vgroup_id, err); return Err(err.into()); } } Ok(()) }) .await.map_err(|e| { - eprintln!("Failed to poll data, ErrMessage: {:?}", e); + eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e); e })?; // ANCHOR_END: consumer_commit_manually // ANCHOR: seek_offset - let assignments = consumer.assignments().await.unwrap(); + let assignments = match consumer.assignments().await{ + Some(assignments) => assignments, + None => { + eprintln!("Failed to get assignments."); + return Err(anyhow::anyhow!("Failed to get assignments. topic: {}, groupId: {}, clientId: {}", topic, group_id, client_id)); + } + }; println!("assignments: {:?}", assignments); // seek offset @@ -152,7 +183,7 @@ async fn main() -> anyhow::Result<()> { let begin = assignment.begin(); let end = assignment.end(); println!( - "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}", + "topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}", topic, vgroup_id, current, @@ -163,7 +194,8 @@ async fn main() -> anyhow::Result<()> { match consumer.offset_seek(topic, vgroup_id, begin).await{ Ok(_) => (), Err(err) => { - eprintln!("Failed to seek offset, ErrMessage: {}", err); + eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}", + topic, group_id, client_id, vgroup_id, begin, err); return Err(err.into()); } } @@ -174,7 +206,13 @@ async fn main() -> anyhow::Result<()> { } println!("Assignment seek to beginning successfully."); // after seek offset - let assignments = consumer.assignments().await.unwrap(); + let assignments = match consumer.assignments().await{ + Some(assignments) => assignments, + None => { + eprintln!("Failed to get assignments."); + return Err(anyhow::anyhow!("Failed to get assignments. topic: {}, groupId: {}, clientId: {}", topic, group_id, client_id)); + } + }; println!("After seek offset assignments: {:?}", assignments); // ANCHOR_END: seek_offset diff --git a/docs/zh/08-develop/05-stmt.md b/docs/zh/08-develop/05-stmt.md index 9dfd9f56e3..e659177c94 100644 --- a/docs/zh/08-develop/05-stmt.md +++ b/docs/zh/08-develop/05-stmt.md @@ -31,7 +31,7 @@ import TabItem from "@theme/TabItem"; ``` -这是一个[更详细的参数绑定示例](https://github.com/taosdata/TDengine/blob/main/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingFullDemo.java) +这是一个[更详细的参数绑定示例](https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/WSParameterBindingFullDemo.java) @@ -79,7 +79,7 @@ import TabItem from "@theme/TabItem"; {{#include docs/examples/java/src/main/java/com/taos/example/ParameterBindingBasicDemo.java:para_bind}} ``` -这是一个[更详细的参数绑定示例](https://github.com/taosdata/TDengine/blob/main/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingFullDemo.java) +这是一个[更详细的参数绑定示例](https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/ParameterBindingFullDemo.java) diff --git a/docs/zh/14-reference/05-connector/14-java.mdx b/docs/zh/14-reference/05-connector/14-java.mdx index f355aea621..a752867b3f 100644 --- a/docs/zh/14-reference/05-connector/14-java.mdx +++ b/docs/zh/14-reference/05-connector/14-java.mdx @@ -141,7 +141,7 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对 由于历史原因,TDengine中的BINARY底层不是真正的二进制数据,已不建议使用。请用VARBINARY类型代替。 GEOMETRY类型是little endian字节序的二进制数据,符合WKB规范。详细信息请参考 [数据类型](../../taos-sql/data-type/#数据类型) WKB规范请参考[Well-Known Binary (WKB)](https://libgeos.org/specifications/wkb/) -对于java连接器,可以使用jts库来方便的创建GEOMETRY类型对象,序列化后写入TDengine,这里有一个样例[Geometry示例](https://github.com/taosdata/TDengine/blob/3.0/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/GeometryDemo.java) +对于java连接器,可以使用jts库来方便的创建GEOMETRY类型对象,序列化后写入TDengine,这里有一个样例[Geometry示例](https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/GeometryDemo.java) ## 示例程序汇总