Merge branch 'docs/sheyj-3.0' of github.com:taosdata/TDengine into docs/sheyj-3.0

This commit is contained in:
Yaming Pei 2024-08-16 19:40:56 +08:00
commit 177112b659
7 changed files with 126 additions and 50 deletions

View File

@ -47,7 +47,7 @@ public class ConsumerLoopFull {
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
System.out.printf("Failed to create native consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),

View File

@ -9,22 +9,22 @@ async fn main() -> anyhow::Result<()> {
// ANCHOR: insert_data
match taos.exec(r#"INSERT INTO
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
VALUES
(NOW + 1a, 10.30000, 219, 0.31000)
(NOW + 2a, 12.60000, 218, 0.33000)
(NOW + 3a, 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco')
VALUES
(NOW + 1a, 10.30000, 218, 0.25000) "#).await{
let insert_sql = r#"INSERT INTO
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
VALUES
(NOW + 1a, 10.30000, 219, 0.31000)
(NOW + 2a, 12.60000, 218, 0.33000)
(NOW + 3a, 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco')
VALUES
(NOW + 1a, 10.30000, 218, 0.25000) "#;
match taos.exec(insert_sql).await{
Ok(affected_rows) => println!("Successfully inserted {} rows to power.meters.", affected_rows),
Err(err) => {
eprintln!("Failed to insert data to power.meters, ErrMessage: {}", err);
eprintln!("Failed to insert data to power.meters, sql: {}, ErrMessage: {}", insert_sql, err);
return Err(err.into());
}
}
// ANCHOR_END: insert_data
Ok(())

View File

@ -3,6 +3,8 @@ use std::str::FromStr;
use chrono::Local;
use chrono::DateTime;
use taos::*;
use std::thread;
use tokio::runtime::Runtime;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@ -53,17 +55,38 @@ async fn main() -> anyhow::Result<()> {
consumer
}
Err(err) => {
eprintln!("Failed to create websocket consumer, dsn: {}, ErrMessage: {}", dsn, err);
eprintln!("Failed to create native consumer, dsn: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", dsn, group_id, client_id, err);
return Err(err.into());
}
};
// ANCHOR_END: create_consumer_ac
thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let taos_insert = TaosBuilder::from_dsn(&dsn).unwrap().build().await.unwrap();
for i in 0..50 {
let insert_sql = format!(r#"INSERT INTO
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
VALUES
(NOW, 10.30000, {}, 0.31000)"#, i);
if let Err(e) = taos_insert.exec(insert_sql).await {
eprintln!("Failed to execute insert: {:?}", e);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
}).join().unwrap();
// ANCHOR: consume
match consumer.subscribe(["topic_meters"]).await{
let topic = "topic_meters";
match consumer.subscribe([topic]).await{
Ok(_) => println!("Subscribe topics successfully."),
Err(err) => {
eprintln!("Failed to subscribe topic_meters, ErrMessage: {}", err);
eprintln!("Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, err);
return Err(err.into());
}
}
@ -94,13 +117,14 @@ async fn main() -> anyhow::Result<()> {
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await.map_err(|e| {
eprintln!("Failed to poll data; ErrMessage: {:?}", e);
eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e);
e
})?;
@ -110,14 +134,14 @@ async fn main() -> anyhow::Result<()> {
consumer
.stream()
.try_for_each(|(offset, message)| async {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
println!("* in vgroup id {} of topic {}\n", vgroup_id, topic);
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
@ -125,21 +149,28 @@ async fn main() -> anyhow::Result<()> {
match consumer.commit(offset).await{
Ok(_) => println!("Commit offset manually successfully."),
Err(err) => {
eprintln!("Failed to commit offset manually, ErrMessage: {}", err);
eprintln!("Failed to commit offset manually, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, err);
return Err(err.into());
}
}
Ok(())
})
.await.map_err(|e| {
eprintln!("Failed to poll data, ErrMessage: {:?}", e);
eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e);
e
})?;
// ANCHOR_END: consumer_commit_manually
// ANCHOR: seek_offset
let assignments = consumer.assignments().await.unwrap();
let assignments = match consumer.assignments().await{
Some(assignments) => assignments,
None => {
eprintln!("Failed to get assignments.");
return Err(anyhow::anyhow!("Failed to get assignments. topic: {}, groupId: {}, clientId: {}", topic, group_id, client_id));
}
};
println!("assignments: {:?}", assignments);
// seek offset
@ -152,7 +183,7 @@ async fn main() -> anyhow::Result<()> {
let begin = assignment.begin();
let end = assignment.end();
println!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
"topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}",
topic,
vgroup_id,
current,
@ -163,7 +194,8 @@ async fn main() -> anyhow::Result<()> {
match consumer.offset_seek(topic, vgroup_id, begin).await{
Ok(_) => (),
Err(err) => {
eprintln!("Failed to seek offset, ErrMessage: {}", err);
eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, begin, err);
return Err(err.into());
}
}
@ -174,7 +206,13 @@ async fn main() -> anyhow::Result<()> {
}
println!("Assignment seek to beginning successfully.");
// after seek offset
let assignments = consumer.assignments().await.unwrap();
let assignments = match consumer.assignments().await{
Some(assignments) => assignments,
None => {
eprintln!("Failed to get assignments.");
return Err(anyhow::anyhow!("Failed to get assignments. topic: {}, groupId: {}, clientId: {}", topic, group_id, client_id));
}
};
println!("After seek offset assignments: {:?}", assignments);
// ANCHOR_END: seek_offset

View File

@ -9,22 +9,22 @@ async fn main() -> anyhow::Result<()> {
// ANCHOR: insert_data
match taos.exec(r#"INSERT INTO
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
VALUES
(NOW + 1a, 10.30000, 219, 0.31000)
(NOW + 2a, 12.60000, 218, 0.33000)
(NOW + 3a, 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco')
VALUES
(NOW + 1a, 10.30000, 218, 0.25000) "#).await{
let insert_sql = r#"INSERT INTO
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
VALUES
(NOW + 1a, 10.30000, 219, 0.31000)
(NOW + 2a, 12.60000, 218, 0.33000)
(NOW + 3a, 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco')
VALUES
(NOW + 1a, 10.30000, 218, 0.25000) "#;
match taos.exec(insert_sql).await{
Ok(affected_rows) => println!("Successfully inserted {} rows to power.meters.", affected_rows),
Err(err) => {
eprintln!("Failed to insert data to power.meters, ErrMessage: {}", dsn, err);
eprintln!("Failed to insert data to power.meters, sql: {}, ErrMessage: {}", insert_sql, err);
return Err(err.into());
}
}
// ANCHOR_END: insert_data
Ok(())

View File

@ -3,6 +3,8 @@ use std::str::FromStr;
use chrono::Local;
use chrono::DateTime;
use taos::*;
use std::thread;
use tokio::runtime::Runtime;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@ -53,17 +55,38 @@ async fn main() -> anyhow::Result<()> {
consumer
}
Err(err) => {
eprintln!("Failed to create websocket consumer, dsn: {}, ErrMessage: {}", dsn, err);
eprintln!("Failed to create websocket consumer, dsn: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", dsn, group_id, client_id, err);
return Err(err.into());
}
};
// ANCHOR_END: create_consumer_ac
thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let taos_insert = TaosBuilder::from_dsn(&dsn).unwrap().build().await.unwrap();
for i in 0..50 {
let insert_sql = format!(r#"INSERT INTO
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
VALUES
(NOW, 10.30000, {}, 0.31000)"#, i);
if let Err(e) = taos_insert.exec(insert_sql).await {
eprintln!("Failed to execute insert: {:?}", e);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
}).join().unwrap();
// ANCHOR: consume
match consumer.subscribe(["topic_meters"]).await{
let topic = "topic_meters";
match consumer.subscribe([topic]).await{
Ok(_) => println!("Subscribe topics successfully."),
Err(err) => {
eprintln!("Failed to subscribe topic_meters, ErrMessage: {}", err);
eprintln!("Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, err);
return Err(err.into());
}
}
@ -94,13 +117,14 @@ async fn main() -> anyhow::Result<()> {
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await.map_err(|e| {
eprintln!("Failed to poll data; ErrMessage: {:?}", e);
eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e);
e
})?;
@ -110,14 +134,14 @@ async fn main() -> anyhow::Result<()> {
consumer
.stream()
.try_for_each(|(offset, message)| async {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
println!("* in vgroup id {} of topic {}\n", vgroup_id, topic);
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
@ -125,21 +149,28 @@ async fn main() -> anyhow::Result<()> {
match consumer.commit(offset).await{
Ok(_) => println!("Commit offset manually successfully."),
Err(err) => {
eprintln!("Failed to commit offset manually, ErrMessage: {}", err);
eprintln!("Failed to commit offset manually, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, err);
return Err(err.into());
}
}
Ok(())
})
.await.map_err(|e| {
eprintln!("Failed to poll data, ErrMessage: {:?}", e);
eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e);
e
})?;
// ANCHOR_END: consumer_commit_manually
// ANCHOR: seek_offset
let assignments = consumer.assignments().await.unwrap();
let assignments = match consumer.assignments().await{
Some(assignments) => assignments,
None => {
eprintln!("Failed to get assignments.");
return Err(anyhow::anyhow!("Failed to get assignments. topic: {}, groupId: {}, clientId: {}", topic, group_id, client_id));
}
};
println!("assignments: {:?}", assignments);
// seek offset
@ -152,7 +183,7 @@ async fn main() -> anyhow::Result<()> {
let begin = assignment.begin();
let end = assignment.end();
println!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
"topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}",
topic,
vgroup_id,
current,
@ -163,7 +194,8 @@ async fn main() -> anyhow::Result<()> {
match consumer.offset_seek(topic, vgroup_id, begin).await{
Ok(_) => (),
Err(err) => {
eprintln!("Failed to seek offset, ErrMessage: {}", err);
eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, begin, err);
return Err(err.into());
}
}
@ -174,7 +206,13 @@ async fn main() -> anyhow::Result<()> {
}
println!("Assignment seek to beginning successfully.");
// after seek offset
let assignments = consumer.assignments().await.unwrap();
let assignments = match consumer.assignments().await{
Some(assignments) => assignments,
None => {
eprintln!("Failed to get assignments.");
return Err(anyhow::anyhow!("Failed to get assignments. topic: {}, groupId: {}, clientId: {}", topic, group_id, client_id));
}
};
println!("After seek offset assignments: {:?}", assignments);
// ANCHOR_END: seek_offset

View File

@ -31,7 +31,7 @@ import TabItem from "@theme/TabItem";
```
这是一个[更详细的参数绑定示例](https://github.com/taosdata/TDengine/blob/main/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingFullDemo.java)
这是一个[更详细的参数绑定示例](https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/WSParameterBindingFullDemo.java)
</TabItem>
<TabItem label="Python" value="python">
@ -79,7 +79,7 @@ import TabItem from "@theme/TabItem";
{{#include docs/examples/java/src/main/java/com/taos/example/ParameterBindingBasicDemo.java:para_bind}}
```
这是一个[更详细的参数绑定示例](https://github.com/taosdata/TDengine/blob/main/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingFullDemo.java)
这是一个[更详细的参数绑定示例](https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/ParameterBindingFullDemo.java)
</TabItem>
<TabItem label="Python" value="python">

View File

@ -141,7 +141,7 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对
由于历史原因TDengine中的BINARY底层不是真正的二进制数据已不建议使用。请用VARBINARY类型代替。
GEOMETRY类型是little endian字节序的二进制数据符合WKB规范。详细信息请参考 [数据类型](../../taos-sql/data-type/#数据类型)
WKB规范请参考[Well-Known Binary (WKB)](https://libgeos.org/specifications/wkb/)
对于java连接器可以使用jts库来方便的创建GEOMETRY类型对象序列化后写入TDengine这里有一个样例[Geometry示例](https://github.com/taosdata/TDengine/blob/3.0/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/GeometryDemo.java)
对于java连接器可以使用jts库来方便的创建GEOMETRY类型对象序列化后写入TDengine这里有一个样例[Geometry示例](https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/GeometryDemo.java)
## 示例程序汇总