mod code example

This commit is contained in:
sheyanjie-qq 2024-08-03 19:19:26 +08:00 committed by gccgdb1234
parent 0bfa9bcf8f
commit e1f82b3828
14 changed files with 271 additions and 250 deletions

View File

@ -12,4 +12,4 @@ tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
log = "0.4"
pretty_env_logger = "0.5.0"
taos = { version = "0.11.8" }
taos = "*"

View File

@ -5,7 +5,7 @@ async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
let taos = builder.build().await?;
// ANCHOR: create_db_and_table
let db = "power";
@ -19,7 +19,7 @@ async fn main() -> anyhow::Result<()> {
// create super table
taos.exec_many([
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
"CREATE STABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(24))",
]).await?;
println!("Create stable meters successfully.");
@ -27,15 +27,15 @@ async fn main() -> anyhow::Result<()> {
// ANCHOR_END: create_db_and_table
// ANCHOR: insert_data
let inserted = taos.exec("INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ").await?;
let inserted = taos.exec(r#"INSERT INTO
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
VALUES
(NOW + 1a, 10.30000, 219, 0.31000)
(NOW + 2a, 12.60000, 218, 0.33000)
(NOW + 3a, 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco')
VALUES
(NOW + 1a, 10.30000, 218, 0.25000) "#).await?;
println!("inserted: {} rows to power.meters successfully.", inserted);
// ANCHOR_END: insert_data
@ -63,5 +63,10 @@ async fn main() -> anyhow::Result<()> {
// ANCHOR: query_with_req_id
let result = taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", 1).await?;
for field in result.fields() {
println!("got field: {}", field.name());
}
println!("query with reqId successfully");
// ANCHOR_END: query_with_req_id
Ok(())
}

View File

@ -2,11 +2,13 @@ use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;
use crate::AsyncQueryable;
use crate::AsyncTBuilder;
use crate::TaosBuilder;
use taos::AsyncQueryable;
use taos::AsyncTBuilder;
use taos::TaosBuilder;
use taos::taos_query;
async fn put() -> anyhow::Result<()> {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "taos=debug");
pretty_env_logger::init();
let dsn =
@ -26,7 +28,7 @@ async fn put() -> anyhow::Result<()> {
// SchemalessProtocol::Line
let data = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000",
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639",
]
.map(String::from)
.to_vec();
@ -42,7 +44,7 @@ async fn put() -> anyhow::Result<()> {
// SchemalessProtocol::Telnet
let data = [
"metric_telnet 1648432611249 10.3 location=California.SanFrancisco group=2",
"metric_telnet 1707095283260 4 host=host0 interface=eth0",
]
.map(String::from)
.to_vec();
@ -58,7 +60,16 @@ async fn put() -> anyhow::Result<()> {
// SchemalessProtocol::Json
let data = [
r#"[{"metric": "metric_json", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#
r#"[{
"metric": "metric_json",
"timestamp": 1626846400,
"value": 10.3,
"tags": {
"groupid": 2,
"location": "California.SanFrancisco",
"id": "d1001"
}
}]"#
]
.map(String::from)
.to_vec();
@ -72,5 +83,6 @@ async fn put() -> anyhow::Result<()> {
.build()?;
assert_eq!(client.put(&sml_data).await?, ());
println!("execute schemaless insert successfully");
Ok(())
}

View File

@ -37,6 +37,7 @@ async fn main() -> anyhow::Result<()> {
// execute.
let rows = stmt.execute().await?;
assert_eq!(rows, NUM_TABLES * NUM_ROWS);
println!("execute stmt insert successfully");
Ok(())
}

View File

@ -1,6 +1,7 @@
use std::time::Duration;
use std::str::FromStr;
use chrono::Local;
use chrono::DateTime;
use taos::*;
#[tokio::main]
@ -23,18 +24,8 @@ async fn main() -> anyhow::Result<()> {
"drop database if exists power",
"create database if not exists power WAL_RETENTION_PERIOD 86400",
"use power",
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))",
"create table if not exists power.d001 using power.meters tags(1,'location')",
])
.await?;
taos.exec_many([
"drop database if exists db2",
"create database if not exists db2 wal_retention_period 3600",
"use db2",
])
.await?;
@ -57,68 +48,74 @@ async fn main() -> anyhow::Result<()> {
let mut consumer = builder.build().await?;
// ANCHOR_END: create_consumer_ac
// ANCHOR: consume
// ANCHOR: subscribe
consumer.subscribe(["topic_meters"]).await?;
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
// ANCHOR_END: subscribe
while let Some((offset, message)) = stream.try_next().await? {
let topic: &str = offset.topic();
let database = offset.database();
let vgroup_id = offset.vgroup_id();
log::debug!(
"receive message from: topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);
match message {
MessageSet::Meta(meta) => {
log::info!("Meta");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error in handling meta message: {}", err);
}
}
MessageSet::Data(data) => {
log::info!("Data");
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("message data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::info!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error in handling metadata message: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("message data: {:?}", data);
}
}
}
// commit offset manually when handling message successfully
consumer.commit(offset).await?;
}
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
// ANCHOR: consume
consumer
.stream()
.try_for_each(|(offset, message)| async move {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await?;
// ANCHOR_END: consume
// ANCHOR: seek_offset
// ANCHOR: consumer_commit_manually
consumer
.stream()
.try_for_each(|(offset, message)| async {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
// commit offset manually when you have processed the message.
consumer.commit(offset).await?;
Ok(())
})
.await?;
// ANCHOR_END: consumer_commit_manually
// ANCHOR: assignments
let assignments = consumer.assignments().await.unwrap();
log::info!("start assignments: {:?}", assignments);
log::info!("assignments: {:?}", assignments);
// ANCHOR_END: assignments
// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
@ -136,24 +133,23 @@ async fn main() -> anyhow::Result<()> {
begin,
end
);
// seek offset of the (topic, vgroup_id) to the begin
let res = consumer.offset_seek(topic, vgroup_id, begin).await;
// ANCHOR: seek_offset
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
// ANCHOR_END: seek_offset
}
let topic_assignment = consumer.topic_assignment(topic).await;
log::debug!("topic assignment: {:?}", topic_assignment);
}
// after seek offset
let assignments = consumer.assignments().await.unwrap();
log::info!("after seek offset assignments: {:?}", assignments);
// ANCHOR_END: seek_offset
// ANCHOR: unsubscribe
consumer.unsubscribe().await;
@ -162,7 +158,6 @@ async fn main() -> anyhow::Result<()> {
tokio::time::sleep(Duration::from_secs(1)).await;
taos.exec_many([
"drop database db2",
"drop topic topic_meters",
"drop database power",
])

View File

@ -11,4 +11,4 @@ tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
log = "0.4"
pretty_env_logger = "0.5.0"
taos = { version = "0.*" }
taos = "*"

View File

@ -2,31 +2,23 @@ use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;
use crate::AsyncQueryable;
use crate::AsyncTBuilder;
use crate::TaosBuilder;
use taos::AsyncQueryable;
use taos::AsyncTBuilder;
use taos::TaosBuilder;
use taos::taos_query;
async fn put() -> anyhow::Result<()> {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "taos=debug");
pretty_env_logger::init();
let dsn =
std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("https://localhost:6041".to_string());
let dsn = "http://localhost:6041/power".to_string();
log::debug!("dsn: {:?}", &dsn);
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
let db = "power";
client
.exec(format!("create database if not exists {db}"))
.await?;
// should specify database before insert
client.exec(format!("use {db}")).await?;
// SchemalessProtocol::Line
let data = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000",
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639",
]
.map(String::from)
.to_vec();
@ -42,7 +34,7 @@ async fn put() -> anyhow::Result<()> {
// SchemalessProtocol::Telnet
let data = [
"metric_telnet 1648432611249 10.3 location=California.SanFrancisco group=2",
"metric_telnet 1707095283260 4 host=host0 interface=eth0",
]
.map(String::from)
.to_vec();
@ -58,7 +50,16 @@ async fn put() -> anyhow::Result<()> {
// SchemalessProtocol::Json
let data = [
r#"[{"metric": "metric_json", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#
r#"[{
"metric": "metric_json",
"timestamp": 1626846400,
"value": 10.3,
"tags": {
"groupid": 2,
"location": "California.SanFrancisco",
"id": "d1001"
}
}]"#
]
.map(String::from)
.to_vec();
@ -72,5 +73,6 @@ async fn put() -> anyhow::Result<()> {
.build()?;
assert_eq!(client.put(&sml_data).await?, ());
println!("execute schemaless insert successfully");
Ok(())
}

View File

@ -38,5 +38,6 @@ async fn main() -> anyhow::Result<()> {
let rows = stmt.execute().await?;
assert_eq!(rows, NUM_TABLES * NUM_ROWS);
println!("execute stmt insert successfully");
Ok(())
}

View File

@ -1,6 +1,7 @@
use std::time::Duration;
use std::str::FromStr;
use chrono::Local;
use chrono::DateTime;
use taos::*;
#[tokio::main]
@ -28,13 +29,6 @@ async fn main() -> anyhow::Result<()> {
])
.await?;
taos.exec_many([
"drop database if exists db2",
"create database if not exists db2 wal_retention_period 3600",
"use db2",
])
.await?;
// ANCHOR: create_topic
taos.exec_many([
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters",
@ -58,9 +52,45 @@ async fn main() -> anyhow::Result<()> {
consumer.subscribe(["topic_meters"]).await?;
// ANCHOR_END: subscribe
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
// ANCHOR: consume
{
consumer
consumer
.stream()
.try_for_each(|(offset, message)| async move {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await?;
// ANCHOR_END: consume
// ANCHOR: consumer_commit_manually
consumer
.stream()
.try_for_each(|(offset, message)| async {
let topic = offset.topic();
@ -74,11 +104,12 @@ async fn main() -> anyhow::Result<()> {
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
// commit offset manually when you have processed the message.
consumer.commit(offset).await?;
Ok(())
})
.await?;
}
// ANCHOR_END: consume
// ANCHOR_END: consumer_commit_manually
// ANCHOR: assignments
let assignments = consumer.assignments().await.unwrap();
@ -127,7 +158,6 @@ async fn main() -> anyhow::Result<()> {
tokio::time::sleep(Duration::from_secs(1)).await;
taos.exec_many([
"drop database db2",
"drop topic topic_meters",
"drop database power",
])

View File

@ -375,22 +375,15 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto
- `reconnectIntervalMs`:重连间隔毫秒时间,默认为 2000。
</TabItem>
<TabItem label="C" value="c">
使用客户端驱动访问 TDengine 集群的基本过程为:建立连接、查询和写入、关闭连接、清除资源。
C/C++ 语言连接器使用 `taos_connect()` 函数用于建立与 TDengine 数据库的连接。其参数详细说明如下:
下面为建立连接的示例代码,其中省略了查询和写入部分,展示了如何建立连接、关闭连接以及清除资源。
- `host`要连接的数据库服务器的主机名或IP地址。如果是本地数据库可以使用 `"localhost"`
- `user`:用于登录数据库的用户名。
- `passwd`:与用户名对应的密码。
- `db`:连接时默认选择的数据库名。如果不指定数据库,可以传递 `NULL` 或空字符串。
- `port`:数据库服务器监听的端口号。默认的端口号是 `6030`
```c
{{#include docs/examples/c/connect_example.c}}
```
在上面的示例代码中, `taos_connect()` 建立到客户端程序所在主机的 6030 端口的连接,`taos_close()`关闭当前连接,`taos_cleanup()`清除客户端驱动所申请和使用的资源。
:::note
- 如未特别说明,当 API 的返回值是整数时_0_ 代表成功,其它是代表失败原因的错误码,当返回值是指针时, _NULL_ 表示失败。
- 所有的错误码以及对应的原因描述在 `taoserror.h` 文件中。
:::
还提供了 `taos_connect_auth()` 函数用于使用 MD5 加密的密码建立与 TDengine 数据库的连接。此函数与 `taos_connect` 功能相同,不同之处在于密码的处理方式,`taos_connect_auth` 需要的是密码的 MD5 加密字符串。
</TabItem>

View File

@ -155,7 +155,7 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
介绍各语言连接器使用原生连接方式创建消费者。指定连接的服务器地址,设置自动提交,从最新消息开始消费,指定 `group.id``client.id` 等信息。有的语言的连接器还支持反序列化参数。
<Tabs groupId="lang">
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
```java
@ -391,7 +391,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
</Tabs>
### 原生连接
<Tabs groupId="lang">
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
@ -486,7 +486,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
<TabItem label="Rust" value="rust">
```rust
{{#include docs/examples/rust/restexample/examples/subscribe_demo.rs:consumer_commit_manually}}
{{#include docs/examples/rust/restexample/examples/tmq.rs:consumer_commit_manually}}
```
可以通过 `consumer.commit` 方法来手工提交消费进度。
@ -507,7 +507,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
</Tabs>
### 原生连接
<Tabs groupId="lang">
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
@ -532,7 +532,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
<TabItem label="Rust" value="rust">
```rust
{{#include docs/examples/rust/restexample/examples/subscribe_demo.rs:consumer_commit_manually}}
{{#include docs/examples/rust/restexample/examples/tmq.rs:consumer_commit_manually}}
```
可以通过 `consumer.commit` 方法来手工提交消费进度。
@ -679,7 +679,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
<TabItem label="Rust" value="rust">
```rust
{{#include docs/examples/rust/restexample/examples/subscribe_demo.rs}}
{{#include docs/examples/rust/restexample/examples/tmq.rs}}
```
</TabItem>
@ -699,7 +699,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
</Tabs>
### 原生连接
<Tabs groupId="lang">
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
<details>
<summary>完整原生连接代码示例</summary>
@ -731,7 +731,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
<TabItem label="Rust" value="rust">
```rust
{{#include docs/examples/rust/nativeexample/examples/subscribe_demo.rs}}
{{#include docs/examples/rust/nativeexample/examples/tmq.rs}}
```
</TabItem>

View File

@ -56,14 +56,14 @@ TDengine 版本更新往往会增加新的功能特性,列表中的连接器
### 使用 http (REST 或 WebSocket) 接口
| **功能特性** | **Java** | **Python** | **Go** | **C# ** | **Node.js** | **Rust** |
| ------------------------------ | -------- | ---------- | ------ | ------- | ----------- | -------- |
| **连接管理** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **普通查询** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **参数绑定** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **数据订阅TMQ** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **Schemaless** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **批量拉取(基于 WebSocket** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **功能特性** | **Java** | **Python** | **Go** | **C#** | **Node.js** | **Rust** |
| ------------------------------ | -------- | ---------- | ------ | ------ | ----------- | -------- |
| **连接管理** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **普通查询** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **参数绑定** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **数据订阅TMQ** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **Schemaless** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **批量拉取(基于 WebSocket** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
:::warning

View File

@ -6,7 +6,10 @@ import com.taosdata.jdbc.tmq.*;
import java.sql.*;
import java.time.Duration;
import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -15,8 +18,7 @@ import java.util.concurrent.TimeUnit;
public class ConsumerLoopFull {
static private Connection connection;
static private Statement statement;
static private volatile boolean stopFlag = false;
static private volatile boolean stopThread = false;
public static TaosConsumer<ResultBean> getConsumer() throws SQLException {
// ANCHOR: create_consumer
@ -38,45 +40,16 @@ public class ConsumerLoopFull {
return new TaosConsumer<>(config);
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
} catch (Exception ex) {
System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers")
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers")
+ "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
}
// ANCHOR_END: create_consumer
}
public static void pollDataExample(TaosConsumer<ResultBean> consumer) throws SQLException {
try{
// subscribe to the topics
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
System.out.println("subscribe topics successfully");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process the data here
System.out.println("data: " + JSON.toJSONString(bean));
}
}
// unsubscribe the topics
consumer.unsubscribe();
System.out.println("unsubscribed topics successfully");
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data from topic_meters", ex);
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data from topic_meters", ex);
}
}
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: poll_data_code_piece
try {
@ -172,6 +145,7 @@ public class ConsumerLoopFull {
consumer.subscribe(topics);
// ANCHOR: unsubscribe_data_code_piece
try {
// unsubscribe the consumer
consumer.unsubscribe();
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
@ -182,6 +156,7 @@ public class ConsumerLoopFull {
throw new SQLException("Failed to unsubscribe consumer", ex);
}
finally {
// close the consumer
consumer.close();
}
// ANCHOR_END: unsubscribe_data_code_piece
@ -252,11 +227,11 @@ public class ConsumerLoopFull {
public static void prepareData() throws SQLException, InterruptedException {
try {
int i = 0;
while (!stopFlag) {
i++;
while (!stopThread) {
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
int affectedRows = statement.executeUpdate(insertQuery);
assert affectedRows == 1;
i++;
Thread.sleep(1);
}
} catch (SQLException ex) {
@ -330,26 +305,43 @@ public class ConsumerLoopFull {
// submit a task
executor.submit(() -> {
try {
// please use one example at a time
TaosConsumer<ResultBean> consumer = getConsumer();
pollDataExample(consumer);
seekExample(consumer);
pollExample(consumer);
commitExample(consumer);
unsubscribeExample(consumer);
stopFlag = true;
prepareData();
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
return;
}
System.out.println("pollDataExample executed successfully");
});
prepareData();
closeConnection();
try {
TaosConsumer<ResultBean> consumer = getConsumer();
System.out.println("Data prepared successfully");
pollExample(consumer);
System.out.println("pollExample executed successfully");
consumer.unsubscribe();
seekExample(consumer);
System.out.println("seekExample executed successfully");
consumer.unsubscribe();
commitExample(consumer);
System.out.println("commitExample executed successfully");
consumer.unsubscribe();
unsubscribeExample(consumer);
System.out.println("unsubscribeExample executed successfully");
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
return;
}
stopThread = true;
// close the executor, which will make the executor reject new tasks
executor.shutdown();
@ -364,6 +356,7 @@ public class ConsumerLoopFull {
System.out.println("Wait executor termination failed.");
}
closeConnection();
System.out.println("program end.");
}
}

View File

@ -15,6 +15,7 @@ import java.util.concurrent.TimeUnit;
public class WsConsumerLoopFull {
static private Connection connection;
static private Statement statement;
static private volatile boolean stopThread = false;
public static TaosConsumer<ResultBean> getConsumer() throws SQLException {
// ANCHOR: create_consumer
@ -46,35 +47,6 @@ public class WsConsumerLoopFull {
// ANCHOR_END: create_consumer
}
public static void pollDataExample(TaosConsumer<ResultBean> consumer) throws SQLException {
try{
// subscribe to the topics
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
System.out.println("subscribe topics successfully");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process the data here
System.out.println("data: " + JSON.toJSONString(bean));
}
}
// unsubscribe the topics
consumer.unsubscribe();
System.out.println("unsubscribed topics successfully");
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data from topic_meters", ex);
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data from topic_meters", ex);
}
}
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException {
// ANCHOR: poll_data_code_piece
try {
@ -170,6 +142,7 @@ public class WsConsumerLoopFull {
consumer.subscribe(topics);
// ANCHOR: unsubscribe_data_code_piece
try {
// unsubscribe the consumer
consumer.unsubscribe();
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
@ -180,6 +153,7 @@ public class WsConsumerLoopFull {
throw new SQLException("Failed to unsubscribe consumer", ex);
}
finally {
// close the consumer
consumer.close();
}
// ANCHOR_END: unsubscribe_data_code_piece
@ -249,10 +223,12 @@ public class WsConsumerLoopFull {
public static void prepareData() throws SQLException, InterruptedException {
try {
for (int i = 0; i < 3000; i++) {
int i = 0;
while (!stopThread) {
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
int affectedRows = statement.executeUpdate(insertQuery);
assert affectedRows == 1;
i++;
Thread.sleep(1);
}
} catch (SQLException ex) {
@ -326,32 +302,44 @@ public class WsConsumerLoopFull {
// submit a task
executor.submit(() -> {
try {
// please use one example at a time
TaosConsumer<ResultBean> consumer = getConsumer();
pollDataExample(consumer);
seekExample(consumer);
consumer.unsubscribe();
pollExample(consumer);
consumer.unsubscribe();
commitExample(consumer);
consumer.unsubscribe();
unsubscribeExample(consumer);
prepareData();
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
return;
}
System.out.println("pollDataExample executed successfully");
});
prepareData();
closeConnection();
try {
TaosConsumer<ResultBean> consumer = getConsumer();
System.out.println("Data prepared successfully");
pollExample(consumer);
System.out.println("pollExample executed successfully");
consumer.unsubscribe();
seekExample(consumer);
System.out.println("seekExample executed successfully");
consumer.unsubscribe();
commitExample(consumer);
System.out.println("commitExample executed successfully");
consumer.unsubscribe();
unsubscribeExample(consumer);
System.out.println("unsubscribeExample executed successfully");
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
return;
}
stopThread = true;
// close the executor, which will make the executor reject new tasks
executor.shutdown();
@ -366,6 +354,7 @@ public class WsConsumerLoopFull {
System.out.println("Wait executor termination failed.");
}
closeConnection();
System.out.println("program end.");
}
}