Merge branch 'docs/sheyj-3.0' of github.com:taosdata/TDengine into docs/sheyj-3.0
This commit is contained in:
commit
0eba0ff308
|
@ -35,7 +35,7 @@ func main() {
|
||||||
}
|
}
|
||||||
rowsAffected, err = res.RowsAffected()
|
rowsAffected, err = res.RowsAffected()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalln("Failed to get create create rowsAffected, url:" + taosDSN + "; ErrMessage: " + err.Error())
|
log.Fatalln("Failed to get create db rowsAffected, url:" + taosDSN + "; ErrMessage: " + err.Error())
|
||||||
}
|
}
|
||||||
// you can check rowsAffected here
|
// you can check rowsAffected here
|
||||||
fmt.Println("Create stable power.meters successfully, rowsAffected:", rowsAffected)
|
fmt.Println("Create stable power.meters successfully, rowsAffected:", rowsAffected)
|
||||||
|
@ -66,7 +66,7 @@ func main() {
|
||||||
// query data, make sure the database and table are created before
|
// query data, make sure the database and table are created before
|
||||||
rows, err := db.Query("SELECT ts, current, location FROM power.meters limit 100")
|
rows, err := db.Query("SELECT ts, current, location FROM power.meters limit 100")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("query data failed:", err)
|
log.Fatal("Failed to query data from power.meters, url:" + taosDSN + "; ErrMessage: " + err.Error())
|
||||||
}
|
}
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
var (
|
||||||
|
@ -76,7 +76,7 @@ func main() {
|
||||||
)
|
)
|
||||||
err = rows.Scan(&ts, ¤t, &location)
|
err = rows.Scan(&ts, ¤t, &location)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("scan data failed:", err)
|
log.Fatal("Failed to scan data, url:" + taosDSN + "; ErrMessage: " + err.Error())
|
||||||
}
|
}
|
||||||
// you can check data here
|
// you can check data here
|
||||||
fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location)
|
fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location)
|
||||||
|
|
|
@ -1,9 +1,17 @@
|
||||||
use taos::*;
|
use taos::*;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Error> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
#[allow(unused_variables)]
|
let dsn = "taos://localhost:6030".to_string();
|
||||||
let taos = TaosBuilder::from_dsn("taos://localhost:6030")?.build()?;
|
|
||||||
println!("Connected to localhost with native connection successfully.");
|
match TaosBuilder::from_dsn(&dsn)?.build().await {
|
||||||
|
Ok(_taos) => {
|
||||||
|
println!("Connected to {} successfully.", dsn);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to connect to {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
use taos::*;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
let url = "taos://localhost:6030";
|
||||||
|
|
||||||
|
// ANCHOR: create_db_and_table
|
||||||
|
let taos = TaosBuilder::from_dsn(url)?.build().await?;
|
||||||
|
|
||||||
|
// create database and use it
|
||||||
|
match taos.exec_many([
|
||||||
|
"CREATE DATABASE IF NOT EXISTS power",
|
||||||
|
]).await {
|
||||||
|
Ok(afffected_rows) => println!("Create database power successfully, rowsAffected: {}", afffected_rows),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to create database power; ErrMessage: {}", err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// create super table
|
||||||
|
match taos.exec_many([
|
||||||
|
"CREATE STABLE IF NOT EXISTS power.meters (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
|
||||||
|
TAGS (`groupid` INT, `location` BINARY(24))",
|
||||||
|
]).await {
|
||||||
|
Ok(afffected_rows) => println!("Create stable power.meters successfully, rowsAffected: {}", afffected_rows),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to create stable power.meters; ErrMessage: {}", err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
// ANCHOR_END: create_db_and_table
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
use taos::*;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
let dsn = "taos://localhost:6030";
|
||||||
|
let builder = TaosBuilder::from_dsn(dsn)?;
|
||||||
|
|
||||||
|
let taos = builder.build().await?;
|
||||||
|
|
||||||
|
|
||||||
|
// ANCHOR: insert_data
|
||||||
|
match taos.exec(r#"INSERT INTO
|
||||||
|
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
|
||||||
|
VALUES
|
||||||
|
(NOW + 1a, 10.30000, 219, 0.31000)
|
||||||
|
(NOW + 2a, 12.60000, 218, 0.33000)
|
||||||
|
(NOW + 3a, 12.30000, 221, 0.31000)
|
||||||
|
power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco')
|
||||||
|
VALUES
|
||||||
|
(NOW + 1a, 10.30000, 218, 0.25000) "#).await{
|
||||||
|
Ok(affected_rows) => println!("Successfully inserted {} rows to power.meters.", affected_rows),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to insert data to power.meters, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ANCHOR_END: insert_data
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -9,43 +9,10 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
let taos = builder.build().await?;
|
let taos = builder.build().await?;
|
||||||
|
|
||||||
// ANCHOR: create_db_and_table
|
|
||||||
let db = "power";
|
|
||||||
// create database
|
|
||||||
taos.exec_many([
|
|
||||||
format!("CREATE DATABASE IF NOT EXISTS `{db}`"),
|
|
||||||
format!("USE `{db}`"),
|
|
||||||
])
|
|
||||||
.await?;
|
|
||||||
println!("Create database power successfully.");
|
|
||||||
|
|
||||||
// create super table
|
|
||||||
taos.exec_many([
|
|
||||||
"CREATE STABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
|
|
||||||
TAGS (`groupid` INT, `location` BINARY(24))",
|
|
||||||
]).await?;
|
|
||||||
println!("Create stable meters successfully.");
|
|
||||||
|
|
||||||
// ANCHOR_END: create_db_and_table
|
|
||||||
|
|
||||||
// ANCHOR: insert_data
|
|
||||||
let inserted = taos.exec(r#"INSERT INTO
|
|
||||||
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
|
|
||||||
VALUES
|
|
||||||
(NOW + 1a, 10.30000, 219, 0.31000)
|
|
||||||
(NOW + 2a, 12.60000, 218, 0.33000)
|
|
||||||
(NOW + 3a, 12.30000, 221, 0.31000)
|
|
||||||
power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco')
|
|
||||||
VALUES
|
|
||||||
(NOW + 1a, 10.30000, 218, 0.25000) "#).await?;
|
|
||||||
|
|
||||||
println!("inserted: {} rows to power.meters successfully.", inserted);
|
|
||||||
// ANCHOR_END: insert_data
|
|
||||||
|
|
||||||
// ANCHOR: query_data
|
// ANCHOR: query_data
|
||||||
// query data, make sure the database and table are created before
|
// query data, make sure the database and table are created before
|
||||||
let mut result = taos.query("SELECT ts, current, location FROM power.meters limit 100").await?;
|
match taos.query("SELECT ts, current, location FROM power.meters limit 100").await{
|
||||||
|
Ok(mut result) => {
|
||||||
for field in result.fields() {
|
for field in result.fields() {
|
||||||
println!("got field: {}", field.name());
|
println!("got field: {}", field.name());
|
||||||
}
|
}
|
||||||
|
@ -61,6 +28,14 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
nrows += 1;
|
nrows += 1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to query data from power.meters, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// ANCHOR_END: query_data
|
// ANCHOR_END: query_data
|
||||||
|
|
||||||
// ANCHOR: query_data_2
|
// ANCHOR: query_data_2
|
||||||
|
@ -72,30 +47,56 @@ async fn main() -> anyhow::Result<()> {
|
||||||
ts: DateTime<Local>,
|
ts: DateTime<Local>,
|
||||||
// float to f32
|
// float to f32
|
||||||
current: Option<f32>,
|
current: Option<f32>,
|
||||||
// int to i32
|
|
||||||
voltage: Option<i32>,
|
|
||||||
phase: Option<f32>,
|
|
||||||
groupid: i32,
|
|
||||||
// binary/varchar to String
|
// binary/varchar to String
|
||||||
location: String,
|
location: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
let records: Vec<Record> = taos
|
match taos.query("SELECT ts, current, location FROM power.meters limit 100").await {
|
||||||
.query("select ts, current, voltage, phase, groupid, location from power.meters limit 100")
|
Ok(mut query) => {
|
||||||
.await?
|
match query.deserialize::<Record>().try_collect::<Vec<_>>().await {
|
||||||
.deserialize()
|
Ok(records) => {
|
||||||
.try_collect()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
dbg!(records);
|
dbg!(records);
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to deserialize query results; ErrMessage: {}", err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to query data from power.meters, url: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
// ANCHOR_END: query_data_2
|
// ANCHOR_END: query_data_2
|
||||||
|
|
||||||
// ANCHOR: query_with_req_id
|
// ANCHOR: query_with_req_id
|
||||||
let result = taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", 1).await?;
|
|
||||||
|
let req_id :u64 = 3;
|
||||||
|
match taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", req_id).await{
|
||||||
|
Ok(mut result) => {
|
||||||
for field in result.fields() {
|
for field in result.fields() {
|
||||||
println!("got field: {}", field.name());
|
println!("got field: {}", field.name());
|
||||||
}
|
}
|
||||||
println!("query with reqId successfully");
|
|
||||||
|
let mut rows = result.rows();
|
||||||
|
let mut nrows = 0;
|
||||||
|
while let Some(row) = rows.try_next().await? {
|
||||||
|
for (col, (name, value)) in row.enumerate() {
|
||||||
|
println!(
|
||||||
|
"[{}] got value in col {} (named `{:>8}`): {}",
|
||||||
|
nrows, col, name, value
|
||||||
|
);
|
||||||
|
}
|
||||||
|
nrows += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to execute sql with reqId: {}, dsn: {}; ErrMessage: {}", req_id, dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ANCHOR_END: query_with_req_id
|
// ANCHOR_END: query_with_req_id
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,8 @@ use taos::taos_query;
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
std::env::set_var("RUST_LOG", "taos=debug");
|
std::env::set_var("RUST_LOG", "taos=debug");
|
||||||
pretty_env_logger::init();
|
pretty_env_logger::init();
|
||||||
let dsn = "taos://localhost:6030".to_string();
|
let host = "localhost";
|
||||||
|
let dsn = format!("taos://{}:6030", host);
|
||||||
log::debug!("dsn: {:?}", &dsn);
|
log::debug!("dsn: {:?}", &dsn);
|
||||||
|
|
||||||
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
|
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
|
||||||
|
@ -39,7 +40,13 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.ttl(1000)
|
.ttl(1000)
|
||||||
.req_id(100u64)
|
.req_id(100u64)
|
||||||
.build()?;
|
.build()?;
|
||||||
assert_eq!(client.put(&sml_data).await?, ());
|
match client.put(&sml_data).await{
|
||||||
|
Ok(_) => {},
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to insert data with schemaless, host: {}; ErrMessage: {}", host, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// SchemalessProtocol::Telnet
|
// SchemalessProtocol::Telnet
|
||||||
let data = [
|
let data = [
|
||||||
|
@ -55,7 +62,13 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.ttl(1000)
|
.ttl(1000)
|
||||||
.req_id(200u64)
|
.req_id(200u64)
|
||||||
.build()?;
|
.build()?;
|
||||||
assert_eq!(client.put(&sml_data).await?, ());
|
match client.put(&sml_data).await{
|
||||||
|
Ok(_) => {},
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to insert data with schemaless, host: {}; ErrMessage: {}", host, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// SchemalessProtocol::Json
|
// SchemalessProtocol::Json
|
||||||
let data = [
|
let data = [
|
||||||
|
@ -80,7 +93,13 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.ttl(1000)
|
.ttl(1000)
|
||||||
.req_id(300u64)
|
.req_id(300u64)
|
||||||
.build()?;
|
.build()?;
|
||||||
assert_eq!(client.put(&sml_data).await?, ());
|
match client.put(&sml_data).await{
|
||||||
|
Ok(_) => {},
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to insert data with schemaless, host: {}; ErrMessage: {}", host, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
println!("Inserted data with schemaless successfully.");
|
println!("Inserted data with schemaless successfully.");
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -2,12 +2,13 @@ use taos::*;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
let taos = TaosBuilder::from_dsn("taos://")?.build().await?;
|
let dsn = "taos://localhost:6030";
|
||||||
|
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
|
||||||
|
|
||||||
taos.exec("DROP DATABASE IF EXISTS power").await?;
|
taos.exec("DROP DATABASE IF EXISTS power").await?;
|
||||||
taos.create_database("power").await?;
|
taos.create_database("power").await?;
|
||||||
taos.use_database("power").await?;
|
taos.use_database("power").await?;
|
||||||
taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
|
taos.exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))").await?;
|
||||||
|
|
||||||
let mut stmt = Stmt::init(&taos).await?;
|
let mut stmt = Stmt::init(&taos).await?;
|
||||||
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;
|
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;
|
||||||
|
@ -15,8 +16,8 @@ async fn main() -> anyhow::Result<()> {
|
||||||
const NUM_TABLES: usize = 10;
|
const NUM_TABLES: usize = 10;
|
||||||
const NUM_ROWS: usize = 10;
|
const NUM_ROWS: usize = 10;
|
||||||
for i in 0..NUM_TABLES {
|
for i in 0..NUM_TABLES {
|
||||||
let table_name = format!("d{}", i);
|
let table_name = format!("d_bind_{}", i);
|
||||||
let tags = vec![Value::VarChar("California.SanFransico".into()), Value::Int(2)];
|
let tags = vec![Value::Int(i as i32), Value::VarChar(format!("location_{}", i).into())];
|
||||||
|
|
||||||
// set table name and tags for the prepared statement.
|
// set table name and tags for the prepared statement.
|
||||||
stmt.set_tbname_tags(&table_name, &tags).await?;
|
stmt.set_tbname_tags(&table_name, &tags).await?;
|
||||||
|
@ -35,9 +36,13 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// execute.
|
// execute.
|
||||||
let rows = stmt.execute().await?;
|
match stmt.execute().await{
|
||||||
assert_eq!(rows, NUM_TABLES * NUM_ROWS);
|
Ok(affected_rows) => println!("Successfully inserted {} rows to power.meters.", affected_rows),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to insert to table meters using stmt, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
println!("execute stmt insert successfully");
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
use taos_query::prelude::*;
|
use taos_query::prelude::*;
|
||||||
// ANCHOR: create_consumer_dsn
|
// ANCHOR: create_consumer_dsn
|
||||||
let dsn = "taos://localhost:6030".to_string();
|
let dsn = "taos://localhost:6030".to_string();
|
||||||
log::info!("dsn: {}", dsn);
|
println!("dsn: {}", dsn);
|
||||||
let mut dsn = Dsn::from_str(&dsn)?;
|
let mut dsn = Dsn::from_str(&dsn)?;
|
||||||
// ANCHOR_END: create_consumer_dsn
|
// ANCHOR_END: create_consumer_dsn
|
||||||
|
|
||||||
|
@ -37,20 +37,36 @@ async fn main() -> anyhow::Result<()> {
|
||||||
// ANCHOR_END: create_topic
|
// ANCHOR_END: create_topic
|
||||||
|
|
||||||
// ANCHOR: create_consumer_ac
|
// ANCHOR: create_consumer_ac
|
||||||
|
let group_id = "group1".to_string();
|
||||||
|
let client_id = "client1".to_string();
|
||||||
dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string());
|
dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string());
|
||||||
dsn.params.insert("msg.with.table.name".to_string(), "true".to_string());
|
dsn.params.insert("msg.with.table.name".to_string(), "true".to_string());
|
||||||
dsn.params.insert("enable.auto.commit".to_string(), "true".to_string());
|
dsn.params.insert("enable.auto.commit".to_string(), "true".to_string());
|
||||||
dsn.params.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
|
dsn.params.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
|
||||||
dsn.params.insert("group.id".to_string(), "group1".to_string());
|
dsn.params.insert("group.id".to_string(), group_id.clone());
|
||||||
dsn.params.insert("client.id".to_string(), "client1".to_string());
|
dsn.params.insert("client.id".to_string(), client_id.clone());
|
||||||
|
|
||||||
let builder = TmqBuilder::from_dsn(&dsn)?;
|
let builder = TmqBuilder::from_dsn(&dsn)?;
|
||||||
let mut consumer = builder.build().await?;
|
let mut consumer = match builder.build().await{
|
||||||
|
Ok(consumer) => {
|
||||||
|
println!("Create consumer successfully, dsn: {}, groupId: {}, clientId: {}.", dsn, group_id, client_id);
|
||||||
|
consumer
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to create consumer, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
};
|
||||||
// ANCHOR_END: create_consumer_ac
|
// ANCHOR_END: create_consumer_ac
|
||||||
|
|
||||||
// ANCHOR: subscribe
|
// ANCHOR: consume
|
||||||
consumer.subscribe(["topic_meters"]).await?;
|
match consumer.subscribe(["topic_meters"]).await{
|
||||||
// ANCHOR_END: subscribe
|
Ok(_) => println!("subscribe topics successfully."),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to subscribe topic_meters, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize)]
|
#[derive(Debug, serde::Deserialize)]
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
|
@ -67,8 +83,6 @@ async fn main() -> anyhow::Result<()> {
|
||||||
location: String,
|
location: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ANCHOR: consume
|
|
||||||
|
|
||||||
consumer
|
consumer
|
||||||
.stream()
|
.stream()
|
||||||
.try_for_each(|(offset, message)| async move {
|
.try_for_each(|(offset, message)| async move {
|
||||||
|
@ -85,7 +99,10 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
.await?;
|
.await.map_err(|e| {
|
||||||
|
eprintln!("Failed to execute consumer functions. ErrMessage: {:?}", e);
|
||||||
|
e
|
||||||
|
})?;
|
||||||
|
|
||||||
// ANCHOR_END: consume
|
// ANCHOR_END: consume
|
||||||
|
|
||||||
|
@ -105,16 +122,25 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// commit offset manually when you have processed the message.
|
// commit offset manually when you have processed the message.
|
||||||
consumer.commit(offset).await?;
|
match consumer.commit(offset).await{
|
||||||
|
Ok(_) => println!("commit offset manually successfully."),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to commit offset manually, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
.await?;
|
.await.map_err(|e| {
|
||||||
|
eprintln!("Failed to execute consumer functions. ErrMessage: {:?}", e);
|
||||||
|
e
|
||||||
|
})?;
|
||||||
// ANCHOR_END: consumer_commit_manually
|
// ANCHOR_END: consumer_commit_manually
|
||||||
|
|
||||||
// ANCHOR: assignments
|
|
||||||
|
// ANCHOR: seek_offset
|
||||||
let assignments = consumer.assignments().await.unwrap();
|
let assignments = consumer.assignments().await.unwrap();
|
||||||
log::info!("assignments: {:?}", assignments);
|
println!("assignments: {:?}", assignments);
|
||||||
// ANCHOR_END: assignments
|
|
||||||
|
|
||||||
// seek offset
|
// seek offset
|
||||||
for topic_vec_assignment in assignments {
|
for topic_vec_assignment in assignments {
|
||||||
|
@ -125,7 +151,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let current = assignment.current_offset();
|
let current = assignment.current_offset();
|
||||||
let begin = assignment.begin();
|
let begin = assignment.begin();
|
||||||
let end = assignment.end();
|
let end = assignment.end();
|
||||||
log::debug!(
|
println!(
|
||||||
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
|
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
|
||||||
topic,
|
topic,
|
||||||
vgroup_id,
|
vgroup_id,
|
||||||
|
@ -133,23 +159,24 @@ async fn main() -> anyhow::Result<()> {
|
||||||
begin,
|
begin,
|
||||||
end
|
end
|
||||||
);
|
);
|
||||||
// ANCHOR: seek_offset
|
|
||||||
let res = consumer.offset_seek(topic, vgroup_id, end).await;
|
match consumer.offset_seek(topic, vgroup_id, begin).await{
|
||||||
if res.is_err() {
|
Ok(_) => (),
|
||||||
log::error!("seek offset error: {:?}", res);
|
Err(err) => {
|
||||||
let a = consumer.assignments().await.unwrap();
|
eprintln!("seek example failed; ErrMessage: {}", err);
|
||||||
log::error!("assignments: {:?}", a);
|
return Err(err.into());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// ANCHOR_END: seek_offset
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let topic_assignment = consumer.topic_assignment(topic).await;
|
let topic_assignment = consumer.topic_assignment(topic).await;
|
||||||
log::debug!("topic assignment: {:?}", topic_assignment);
|
println!("topic assignment: {:?}", topic_assignment);
|
||||||
}
|
}
|
||||||
|
println!("assignment seek to beginning successfully.");
|
||||||
// after seek offset
|
// after seek offset
|
||||||
let assignments = consumer.assignments().await.unwrap();
|
let assignments = consumer.assignments().await.unwrap();
|
||||||
log::info!("after seek offset assignments: {:?}", assignments);
|
println!("after seek offset assignments: {:?}", assignments);
|
||||||
|
// ANCHOR_END: seek_offset
|
||||||
|
|
||||||
// ANCHOR: unsubscribe
|
// ANCHOR: unsubscribe
|
||||||
consumer.unsubscribe().await;
|
consumer.unsubscribe().await;
|
||||||
|
|
|
@ -1,9 +1,17 @@
|
||||||
use taos::*;
|
use taos::*;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Error> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
#[allow(unused_variables)]
|
let dsn = "ws://localhost:6041".to_string();
|
||||||
let taos = TaosBuilder::from_dsn("taos+ws://localhost:6041")?.build()?;
|
|
||||||
println!("Connected to localhost with websocket connection successfully.");
|
match TaosBuilder::from_dsn(&dsn)?.build().await {
|
||||||
|
Ok(_taos) => {
|
||||||
|
println!("Connected to {} successfully.", dsn);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to connect to {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
use taos::*;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
let url = "ws://localhost:6041";
|
||||||
|
|
||||||
|
// ANCHOR: create_db_and_table
|
||||||
|
let taos = TaosBuilder::from_dsn(url)?.build().await?;
|
||||||
|
|
||||||
|
// create database and use it
|
||||||
|
match taos.exec_many([
|
||||||
|
"CREATE DATABASE IF NOT EXISTS power",
|
||||||
|
]).await {
|
||||||
|
Ok(afffected_rows) => println!("Create database power successfully, rowsAffected: {}", afffected_rows),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to create database power; ErrMessage: {}", err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// create super table
|
||||||
|
match taos.exec_many([
|
||||||
|
"CREATE STABLE IF NOT EXISTS power.meters (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
|
||||||
|
TAGS (`groupid` INT, `location` BINARY(24))",
|
||||||
|
]).await {
|
||||||
|
Ok(afffected_rows) => println!("Create stable power.meters successfully, rowsAffected: {}", afffected_rows),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to create stable power.meters; ErrMessage: {}", err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
// ANCHOR_END: create_db_and_table
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
use taos::*;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
let dsn = "ws://localhost:6041";
|
||||||
|
let builder = TaosBuilder::from_dsn(dsn)?;
|
||||||
|
|
||||||
|
let taos = builder.build().await?;
|
||||||
|
|
||||||
|
|
||||||
|
// ANCHOR: insert_data
|
||||||
|
match taos.exec(r#"INSERT INTO
|
||||||
|
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
|
||||||
|
VALUES
|
||||||
|
(NOW + 1a, 10.30000, 219, 0.31000)
|
||||||
|
(NOW + 2a, 12.60000, 218, 0.33000)
|
||||||
|
(NOW + 3a, 12.30000, 221, 0.31000)
|
||||||
|
power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco')
|
||||||
|
VALUES
|
||||||
|
(NOW + 1a, 10.30000, 218, 0.25000) "#).await{
|
||||||
|
Ok(affected_rows) => println!("Successfully inserted {} rows to power.meters.", affected_rows),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to insert data to power.meters, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ANCHOR_END: insert_data
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -0,0 +1,102 @@
|
||||||
|
use taos::*;
|
||||||
|
use chrono::Local;
|
||||||
|
use chrono::DateTime;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
let dsn = "ws://localhost:6041";
|
||||||
|
let builder = TaosBuilder::from_dsn(dsn)?;
|
||||||
|
|
||||||
|
let taos = builder.build().await?;
|
||||||
|
|
||||||
|
// ANCHOR: query_data
|
||||||
|
// query data, make sure the database and table are created before
|
||||||
|
match taos.query("SELECT ts, current, location FROM power.meters limit 100").await{
|
||||||
|
Ok(mut result) => {
|
||||||
|
for field in result.fields() {
|
||||||
|
println!("got field: {}", field.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut rows = result.rows();
|
||||||
|
let mut nrows = 0;
|
||||||
|
while let Some(row) = rows.try_next().await? {
|
||||||
|
for (col, (name, value)) in row.enumerate() {
|
||||||
|
println!(
|
||||||
|
"[{}] got value in col {} (named `{:>8}`): {}",
|
||||||
|
nrows, col, name, value
|
||||||
|
);
|
||||||
|
}
|
||||||
|
nrows += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to query data from power.meters, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// ANCHOR_END: query_data
|
||||||
|
|
||||||
|
// ANCHOR: query_data_2
|
||||||
|
// query data, make sure the database and table are created before
|
||||||
|
#[derive(Debug, serde::Deserialize)]
|
||||||
|
#[allow(dead_code)]
|
||||||
|
struct Record {
|
||||||
|
// deserialize timestamp to chrono::DateTime<Local>
|
||||||
|
ts: DateTime<Local>,
|
||||||
|
// float to f32
|
||||||
|
current: Option<f32>,
|
||||||
|
// binary/varchar to String
|
||||||
|
location: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
match taos.query("SELECT ts, current, location FROM power.meters limit 100").await {
|
||||||
|
Ok(mut query) => {
|
||||||
|
match query.deserialize::<Record>().try_collect::<Vec<_>>().await {
|
||||||
|
Ok(records) => {
|
||||||
|
dbg!(records);
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to deserialize query results; ErrMessage: {}", err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to query data from power.meters, url: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// ANCHOR_END: query_data_2
|
||||||
|
|
||||||
|
// ANCHOR: query_with_req_id
|
||||||
|
|
||||||
|
let req_id :u64 = 3;
|
||||||
|
match taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", req_id).await{
|
||||||
|
Ok(mut result) => {
|
||||||
|
for field in result.fields() {
|
||||||
|
println!("got field: {}", field.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut rows = result.rows();
|
||||||
|
let mut nrows = 0;
|
||||||
|
while let Some(row) = rows.try_next().await? {
|
||||||
|
for (col, (name, value)) in row.enumerate() {
|
||||||
|
println!(
|
||||||
|
"[{}] got value in col {} (named `{:>8}`): {}",
|
||||||
|
nrows, col, name, value
|
||||||
|
);
|
||||||
|
}
|
||||||
|
nrows += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to execute sql with reqId: {}, dsn: {}; ErrMessage: {}", req_id, dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ANCHOR_END: query_with_req_id
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -11,7 +11,8 @@ use taos::taos_query;
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
std::env::set_var("RUST_LOG", "taos=debug");
|
std::env::set_var("RUST_LOG", "taos=debug");
|
||||||
pretty_env_logger::init();
|
pretty_env_logger::init();
|
||||||
let dsn = "http://localhost:6041/power".to_string();
|
let host = "localhost";
|
||||||
|
let dsn = format!("ws://{}:6041/power", host);
|
||||||
log::debug!("dsn: {:?}", &dsn);
|
log::debug!("dsn: {:?}", &dsn);
|
||||||
|
|
||||||
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
|
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
|
||||||
|
@ -30,7 +31,13 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.ttl(1000)
|
.ttl(1000)
|
||||||
.req_id(100u64)
|
.req_id(100u64)
|
||||||
.build()?;
|
.build()?;
|
||||||
assert_eq!(client.put(&sml_data).await?, ());
|
match client.put(&sml_data).await{
|
||||||
|
Ok(_) => {},
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to insert data with schemaless, host: {}; ErrMessage: {}", host, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// SchemalessProtocol::Telnet
|
// SchemalessProtocol::Telnet
|
||||||
let data = [
|
let data = [
|
||||||
|
@ -46,7 +53,13 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.ttl(1000)
|
.ttl(1000)
|
||||||
.req_id(200u64)
|
.req_id(200u64)
|
||||||
.build()?;
|
.build()?;
|
||||||
assert_eq!(client.put(&sml_data).await?, ());
|
match client.put(&sml_data).await{
|
||||||
|
Ok(_) => {},
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to insert data with schemaless, host: {}; ErrMessage: {}", host, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// SchemalessProtocol::Json
|
// SchemalessProtocol::Json
|
||||||
let data = [
|
let data = [
|
||||||
|
@ -71,7 +84,13 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.ttl(1000)
|
.ttl(1000)
|
||||||
.req_id(300u64)
|
.req_id(300u64)
|
||||||
.build()?;
|
.build()?;
|
||||||
assert_eq!(client.put(&sml_data).await?, ());
|
match client.put(&sml_data).await{
|
||||||
|
Ok(_) => {},
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to insert data with schemaless, host: {}; ErrMessage: {}", host, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
println!("Inserted data with schemaless successfully.");
|
println!("Inserted data with schemaless successfully.");
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -2,12 +2,13 @@ use taos::*;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
let taos = TaosBuilder::from_dsn("ws://")?.build().await?;
|
let dsn = "ws://";
|
||||||
|
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
|
||||||
|
|
||||||
taos.exec("DROP DATABASE IF EXISTS power").await?;
|
taos.exec("DROP DATABASE IF EXISTS power").await?;
|
||||||
taos.create_database("power").await?;
|
taos.create_database("power").await?;
|
||||||
taos.use_database("power").await?;
|
taos.use_database("power").await?;
|
||||||
taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
|
taos.exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))").await?;
|
||||||
|
|
||||||
let mut stmt = Stmt::init(&taos).await?;
|
let mut stmt = Stmt::init(&taos).await?;
|
||||||
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;
|
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;
|
||||||
|
@ -15,8 +16,8 @@ async fn main() -> anyhow::Result<()> {
|
||||||
const NUM_TABLES: usize = 10;
|
const NUM_TABLES: usize = 10;
|
||||||
const NUM_ROWS: usize = 10;
|
const NUM_ROWS: usize = 10;
|
||||||
for i in 0..NUM_TABLES {
|
for i in 0..NUM_TABLES {
|
||||||
let table_name = format!("d{}", i);
|
let table_name = format!("d_bind_{}", i);
|
||||||
let tags = vec![Value::VarChar("California.SanFransico".into()), Value::Int(2)];
|
let tags = vec![Value::Int(i as i32), Value::VarChar(format!("location_{}", i).into())];
|
||||||
|
|
||||||
// set table name and tags for the prepared statement.
|
// set table name and tags for the prepared statement.
|
||||||
stmt.set_tbname_tags(&table_name, &tags).await?;
|
stmt.set_tbname_tags(&table_name, &tags).await?;
|
||||||
|
@ -35,9 +36,13 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// execute.
|
// execute.
|
||||||
let rows = stmt.execute().await?;
|
match stmt.execute().await{
|
||||||
assert_eq!(rows, NUM_TABLES * NUM_ROWS);
|
Ok(affected_rows) => println!("Successfully inserted {} rows to power.meters.", affected_rows),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to insert to table meters using stmt, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
println!("execute stmt insert successfully");
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
use taos_query::prelude::*;
|
use taos_query::prelude::*;
|
||||||
// ANCHOR: create_consumer_dsn
|
// ANCHOR: create_consumer_dsn
|
||||||
let dsn = "ws://localhost:6041".to_string();
|
let dsn = "ws://localhost:6041".to_string();
|
||||||
log::info!("dsn: {}", dsn);
|
println!("dsn: {}", dsn);
|
||||||
let mut dsn = Dsn::from_str(&dsn)?;
|
let mut dsn = Dsn::from_str(&dsn)?;
|
||||||
// ANCHOR_END: create_consumer_dsn
|
// ANCHOR_END: create_consumer_dsn
|
||||||
|
|
||||||
|
@ -37,20 +37,36 @@ async fn main() -> anyhow::Result<()> {
|
||||||
// ANCHOR_END: create_topic
|
// ANCHOR_END: create_topic
|
||||||
|
|
||||||
// ANCHOR: create_consumer_ac
|
// ANCHOR: create_consumer_ac
|
||||||
|
let group_id = "group1".to_string();
|
||||||
|
let client_id = "client1".to_string();
|
||||||
dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string());
|
dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string());
|
||||||
dsn.params.insert("msg.with.table.name".to_string(), "true".to_string());
|
dsn.params.insert("msg.with.table.name".to_string(), "true".to_string());
|
||||||
dsn.params.insert("enable.auto.commit".to_string(), "true".to_string());
|
dsn.params.insert("enable.auto.commit".to_string(), "true".to_string());
|
||||||
dsn.params.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
|
dsn.params.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
|
||||||
dsn.params.insert("group.id".to_string(), "group1".to_string());
|
dsn.params.insert("group.id".to_string(), group_id.clone());
|
||||||
dsn.params.insert("client.id".to_string(), "client1".to_string());
|
dsn.params.insert("client.id".to_string(), client_id.clone());
|
||||||
|
|
||||||
let builder = TmqBuilder::from_dsn(&dsn)?;
|
let builder = TmqBuilder::from_dsn(&dsn)?;
|
||||||
let mut consumer = builder.build().await?;
|
let mut consumer = match builder.build().await{
|
||||||
|
Ok(consumer) => {
|
||||||
|
println!("Create consumer successfully, dsn: {}, groupId: {}, clientId: {}.", dsn, group_id, client_id);
|
||||||
|
consumer
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to create consumer, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
};
|
||||||
// ANCHOR_END: create_consumer_ac
|
// ANCHOR_END: create_consumer_ac
|
||||||
|
|
||||||
// ANCHOR: subscribe
|
// ANCHOR: consume
|
||||||
consumer.subscribe(["topic_meters"]).await?;
|
match consumer.subscribe(["topic_meters"]).await{
|
||||||
// ANCHOR_END: subscribe
|
Ok(_) => println!("subscribe topics successfully."),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to subscribe topic_meters, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize)]
|
#[derive(Debug, serde::Deserialize)]
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
|
@ -67,8 +83,6 @@ async fn main() -> anyhow::Result<()> {
|
||||||
location: String,
|
location: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ANCHOR: consume
|
|
||||||
|
|
||||||
consumer
|
consumer
|
||||||
.stream()
|
.stream()
|
||||||
.try_for_each(|(offset, message)| async move {
|
.try_for_each(|(offset, message)| async move {
|
||||||
|
@ -85,7 +99,10 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
.await?;
|
.await.map_err(|e| {
|
||||||
|
eprintln!("Failed to poll data; ErrMessage: {:?}", e);
|
||||||
|
e
|
||||||
|
})?;
|
||||||
|
|
||||||
// ANCHOR_END: consume
|
// ANCHOR_END: consume
|
||||||
|
|
||||||
|
@ -105,16 +122,25 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// commit offset manually when you have processed the message.
|
// commit offset manually when you have processed the message.
|
||||||
consumer.commit(offset).await?;
|
match consumer.commit(offset).await{
|
||||||
|
Ok(_) => println!("commit offset manually successfully."),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to commit offset manually, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
.await?;
|
.await.map_err(|e| {
|
||||||
|
eprintln!("Failed to execute consumer functions. ErrMessage: {:?}", e);
|
||||||
|
e
|
||||||
|
})?;
|
||||||
// ANCHOR_END: consumer_commit_manually
|
// ANCHOR_END: consumer_commit_manually
|
||||||
|
|
||||||
// ANCHOR: assignments
|
|
||||||
|
// ANCHOR: seek_offset
|
||||||
let assignments = consumer.assignments().await.unwrap();
|
let assignments = consumer.assignments().await.unwrap();
|
||||||
log::info!("assignments: {:?}", assignments);
|
println!("assignments: {:?}", assignments);
|
||||||
// ANCHOR_END: assignments
|
|
||||||
|
|
||||||
// seek offset
|
// seek offset
|
||||||
for topic_vec_assignment in assignments {
|
for topic_vec_assignment in assignments {
|
||||||
|
@ -125,7 +151,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let current = assignment.current_offset();
|
let current = assignment.current_offset();
|
||||||
let begin = assignment.begin();
|
let begin = assignment.begin();
|
||||||
let end = assignment.end();
|
let end = assignment.end();
|
||||||
log::debug!(
|
println!(
|
||||||
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
|
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
|
||||||
topic,
|
topic,
|
||||||
vgroup_id,
|
vgroup_id,
|
||||||
|
@ -133,26 +159,28 @@ async fn main() -> anyhow::Result<()> {
|
||||||
begin,
|
begin,
|
||||||
end
|
end
|
||||||
);
|
);
|
||||||
// ANCHOR: seek_offset
|
|
||||||
let res = consumer.offset_seek(topic, vgroup_id, end).await;
|
match consumer.offset_seek(topic, vgroup_id, begin).await{
|
||||||
if res.is_err() {
|
Ok(_) => (),
|
||||||
log::error!("seek offset error: {:?}", res);
|
Err(err) => {
|
||||||
let a = consumer.assignments().await.unwrap();
|
eprintln!("seek example failed; ErrMessage: {}", err);
|
||||||
log::error!("assignments: {:?}", a);
|
return Err(err.into());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// ANCHOR_END: seek_offset
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let topic_assignment = consumer.topic_assignment(topic).await;
|
let topic_assignment = consumer.topic_assignment(topic).await;
|
||||||
log::debug!("topic assignment: {:?}", topic_assignment);
|
println!("topic assignment: {:?}", topic_assignment);
|
||||||
}
|
}
|
||||||
|
println!("assignment seek to beginning successfully.");
|
||||||
// after seek offset
|
// after seek offset
|
||||||
let assignments = consumer.assignments().await.unwrap();
|
let assignments = consumer.assignments().await.unwrap();
|
||||||
log::info!("after seek offset assignments: {:?}", assignments);
|
println!("after seek offset assignments: {:?}", assignments);
|
||||||
|
// ANCHOR_END: seek_offset
|
||||||
|
|
||||||
// ANCHOR: unsubscribe
|
// ANCHOR: unsubscribe
|
||||||
consumer.unsubscribe().await;
|
consumer.unsubscribe().await;
|
||||||
|
println!("consumer unsubscribed successfully.");
|
||||||
// ANCHOR_END: unsubscribe
|
// ANCHOR_END: unsubscribe
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
|
|
|
@ -53,7 +53,7 @@ REST API:直接调用 `taosadapter` 提供的 REST API 接口,进行数据
|
||||||
<TabItem label="Rust" value="rust">
|
<TabItem label="Rust" value="rust">
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
{{#include docs/examples/rust/nativeexample/examples/query.rs:create_db_and_table}}
|
{{#include docs/examples/rust/nativeexample/examples/createdb.rs:create_db_and_table}}
|
||||||
```
|
```
|
||||||
|
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
@ -129,7 +129,7 @@ NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW
|
||||||
<TabItem label="Rust" value="rust">
|
<TabItem label="Rust" value="rust">
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
{{#include docs/examples/rust/nativeexample/examples/query.rs:insert_data}}
|
{{#include docs/examples/rust/nativeexample/examples/insert.rs:insert_data}}
|
||||||
```
|
```
|
||||||
|
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
|
@ -42,10 +42,10 @@ public class ConsumerLoopFull {
|
||||||
return consumer;
|
return consumer;
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to create native consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("Failed to create consumer", ex);
|
throw new SQLException("Failed to create consumer", ex);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers")
|
System.out.println("Failed to create native consumer, host : " + config.getProperty("bootstrap.servers")
|
||||||
+ "; ErrMessage: " + ex.getMessage());
|
+ "; ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("Failed to create consumer", ex);
|
throw new SQLException("Failed to create consumer", ex);
|
||||||
}
|
}
|
||||||
|
@ -59,7 +59,7 @@ public class ConsumerLoopFull {
|
||||||
|
|
||||||
// subscribe to the topics
|
// subscribe to the topics
|
||||||
consumer.subscribe(topics);
|
consumer.subscribe(topics);
|
||||||
System.out.println("subscribe topics successfully");
|
System.out.println("subscribe topics successfully.");
|
||||||
for (int i = 0; i < 50; i++) {
|
for (int i = 0; i < 50; i++) {
|
||||||
// poll data
|
// poll data
|
||||||
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
|
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
|
||||||
|
@ -72,10 +72,10 @@ public class ConsumerLoopFull {
|
||||||
|
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to poll data, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("Failed to poll data", ex);
|
throw new SQLException("Failed to poll data", ex);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
System.out.println("Failed to poll data; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to poll data, ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("Failed to poll data", ex);
|
throw new SQLException("Failed to poll data", ex);
|
||||||
}
|
}
|
||||||
// ANCHOR_END: poll_data_code_piece
|
// ANCHOR_END: poll_data_code_piece
|
||||||
|
@ -88,7 +88,7 @@ public class ConsumerLoopFull {
|
||||||
|
|
||||||
// subscribe to the topics
|
// subscribe to the topics
|
||||||
consumer.subscribe(topics);
|
consumer.subscribe(topics);
|
||||||
System.out.println("subscribe topics successfully");
|
System.out.println("subscribe topics successfully.");
|
||||||
Set<TopicPartition> assignment = consumer.assignment();
|
Set<TopicPartition> assignment = consumer.assignment();
|
||||||
System.out.println("now assignment: " + JSON.toJSONString(assignment));
|
System.out.println("now assignment: " + JSON.toJSONString(assignment));
|
||||||
|
|
||||||
|
@ -99,7 +99,7 @@ public class ConsumerLoopFull {
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer.seekToBeginning(assignment);
|
consumer.seekToBeginning(assignment);
|
||||||
System.out.println("assignment seek to beginning successfully");
|
System.out.println("assignment seek to beginning successfully.");
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
|
@ -149,7 +149,7 @@ public class ConsumerLoopFull {
|
||||||
try {
|
try {
|
||||||
// unsubscribe the consumer
|
// unsubscribe the consumer
|
||||||
consumer.unsubscribe();
|
consumer.unsubscribe();
|
||||||
System.out.println("unsubscribe consumer successfully.");
|
System.out.println("consumer unsubscribed successfully.");
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
|
@ -317,22 +317,22 @@ public class ConsumerLoopFull {
|
||||||
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
System.out.println("pollDataExample executed successfully");
|
System.out.println("pollDataExample executed successfully.");
|
||||||
});
|
});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
TaosConsumer<ResultBean> consumer = getConsumer();
|
TaosConsumer<ResultBean> consumer = getConsumer();
|
||||||
|
|
||||||
pollExample(consumer);
|
pollExample(consumer);
|
||||||
System.out.println("pollExample executed successfully");
|
System.out.println("pollExample executed successfully.");
|
||||||
consumer.unsubscribe();
|
consumer.unsubscribe();
|
||||||
|
|
||||||
seekExample(consumer);
|
seekExample(consumer);
|
||||||
System.out.println("seekExample executed successfully");
|
System.out.println("seekExample executed successfully.");
|
||||||
consumer.unsubscribe();
|
consumer.unsubscribe();
|
||||||
|
|
||||||
commitExample(consumer);
|
commitExample(consumer);
|
||||||
System.out.println("commitExample executed successfully");
|
System.out.println("commitExample executed successfully.");
|
||||||
consumer.unsubscribe();
|
consumer.unsubscribe();
|
||||||
|
|
||||||
unsubscribeExample(consumer);
|
unsubscribeExample(consumer);
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class WsConsumerLoopFull {
|
||||||
|
|
||||||
// subscribe to the topics
|
// subscribe to the topics
|
||||||
consumer.subscribe(topics);
|
consumer.subscribe(topics);
|
||||||
System.out.println("subscribe topics successfully");
|
System.out.println("subscribe topics successfully.");
|
||||||
for (int i = 0; i < 50; i++) {
|
for (int i = 0; i < 50; i++) {
|
||||||
// poll data
|
// poll data
|
||||||
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
|
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
|
||||||
|
@ -70,10 +70,10 @@ public class WsConsumerLoopFull {
|
||||||
|
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to poll data, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("Failed to poll data", ex);
|
throw new SQLException("Failed to poll data", ex);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
System.out.println("Failed to poll data; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to poll data, ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("Failed to poll data", ex);
|
throw new SQLException("Failed to poll data", ex);
|
||||||
}
|
}
|
||||||
// ANCHOR_END: poll_data_code_piece
|
// ANCHOR_END: poll_data_code_piece
|
||||||
|
@ -86,7 +86,7 @@ public class WsConsumerLoopFull {
|
||||||
|
|
||||||
// subscribe to the topics
|
// subscribe to the topics
|
||||||
consumer.subscribe(topics);
|
consumer.subscribe(topics);
|
||||||
System.out.println("subscribe topics successfully");
|
System.out.println("subscribe topics successfully.");
|
||||||
Set<TopicPartition> assignment = consumer.assignment();
|
Set<TopicPartition> assignment = consumer.assignment();
|
||||||
System.out.println("now assignment: " + JSON.toJSONString(assignment));
|
System.out.println("now assignment: " + JSON.toJSONString(assignment));
|
||||||
|
|
||||||
|
@ -97,7 +97,7 @@ public class WsConsumerLoopFull {
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer.seekToBeginning(assignment);
|
consumer.seekToBeginning(assignment);
|
||||||
System.out.println("assignment seek to beginning successfully");
|
System.out.println("assignment seek to beginning successfully.");
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
|
@ -147,7 +147,7 @@ public class WsConsumerLoopFull {
|
||||||
try {
|
try {
|
||||||
// unsubscribe the consumer
|
// unsubscribe the consumer
|
||||||
consumer.unsubscribe();
|
consumer.unsubscribe();
|
||||||
System.out.println("unsubscribe consumer successfully.");
|
System.out.println("consumer unsubscribed successfully.");
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
|
@ -315,26 +315,26 @@ public class WsConsumerLoopFull {
|
||||||
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
System.out.println("pollDataExample executed successfully");
|
System.out.println("pollDataExample executed successfully.");
|
||||||
});
|
});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
TaosConsumer<ResultBean> consumer = getConsumer();
|
TaosConsumer<ResultBean> consumer = getConsumer();
|
||||||
|
|
||||||
pollExample(consumer);
|
pollExample(consumer);
|
||||||
System.out.println("pollExample executed successfully");
|
System.out.println("pollExample executed successfully.");
|
||||||
consumer.unsubscribe();
|
consumer.unsubscribe();
|
||||||
|
|
||||||
seekExample(consumer);
|
seekExample(consumer);
|
||||||
System.out.println("seekExample executed successfully");
|
System.out.println("seekExample executed successfully.");
|
||||||
consumer.unsubscribe();
|
consumer.unsubscribe();
|
||||||
|
|
||||||
commitExample(consumer);
|
commitExample(consumer);
|
||||||
System.out.println("commitExample executed successfully");
|
System.out.println("commitExample executed successfully.");
|
||||||
consumer.unsubscribe();
|
consumer.unsubscribe();
|
||||||
|
|
||||||
unsubscribeExample(consumer);
|
unsubscribeExample(consumer);
|
||||||
System.out.println("unsubscribeExample executed successfully");
|
System.out.println("unsubscribeExample executed successfully.");
|
||||||
|
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
|
|
Loading…
Reference in New Issue