Merge remote-tracking branch 'origin/docs/wade-3.0' into docs/wade-3.0

This commit is contained in:
t_max 2024-08-02 17:13:04 +08:00
commit c052d64a5e
21 changed files with 861 additions and 874 deletions

View File

@ -64,5 +64,4 @@ while let Some(row) = rows.try_next().await? {
// ANCHOR: query_with_req_id // ANCHOR: query_with_req_id
let result = taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", 1).await?; let result = taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", 1).await?;
// ANCHOR_END: query_with_req_id // ANCHOR_END: query_with_req_id
} }

View File

@ -38,7 +38,7 @@ async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030"; let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?; let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?; let taos = builder.build().await?;
let db = "tmq"; let db = "tmq";
// prepare database // prepare database
@ -61,9 +61,10 @@ async fn main() -> anyhow::Result<()> {
// subscribe // subscribe
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
let mut consumer = tmq.build()?; let mut consumer = tmq.build().await?;
consumer.subscribe(["tmq_meters"]).await?; consumer.subscribe(["tmq_meters"]).await?;
// ANCHOR: consumer_commit_manually
consumer consumer
.stream() .stream()
.try_for_each(|(offset, message)| async { .try_for_each(|(offset, message)| async {
@ -78,10 +79,53 @@ async fn main() -> anyhow::Result<()> {
println!("** read {} records: {:#?}\n", records.len(), records); println!("** read {} records: {:#?}\n", records.len(), records);
} }
} }
// commit offset manually when you have processed the message.
consumer.commit(offset).await?; consumer.commit(offset).await?;
Ok(()) Ok(())
}) })
.await?; .await?;
// ANCHOR_END: consumer_commit_manually
// ANCHOR: assignments
let assignments = consumer.assignments().await.unwrap();
log::info!("assignments: {:?}", assignments);
// ANCHOR_END: assignments
// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
log::debug!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
topic,
vgroup_id,
current,
begin,
end
);
// ANCHOR: seek_offset
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
// ANCHOR_END: seek_offset
}
let topic_assignment = consumer.topic_assignment(topic).await;
log::debug!("topic assignment: {:?}", topic_assignment);
}
// after seek offset
let assignments = consumer.assignments().await.unwrap();
log::info!("after seek offset assignments: {:?}", assignments);
consumer.unsubscribe().await; consumer.unsubscribe().await;

View File

@ -9,9 +9,11 @@ async fn main() -> anyhow::Result<()> {
.filter_level(log::LevelFilter::Info) .filter_level(log::LevelFilter::Info)
.init(); .init();
use taos_query::prelude::*; use taos_query::prelude::*;
// ANCHOR: create_consumer_dsn
let dsn = "taos://localhost:6030".to_string(); let dsn = "taos://localhost:6030".to_string();
log::info!("dsn: {}", dsn); log::info!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?; let mut dsn = Dsn::from_str(&dsn)?;
// ANCHOR_END: create_consumer_dsn
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?; let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
@ -43,19 +45,21 @@ async fn main() -> anyhow::Result<()> {
.await?; .await?;
// ANCHOR_END: create_topic // ANCHOR_END: create_topic
// ANCHOR: create_consumer // ANCHOR: create_consumer_ac
dsn.params.insert("group.id".to_string(), "abc".to_string()); dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string()); dsn.params.insert("msg.with.table.name".to_string(), "true".to_string());
dsn.params.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
dsn.params.insert("group.id".to_string(), "group1".to_string());
dsn.params.insert("client.id".to_string(), "client1".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?; let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?; let mut consumer = builder.build().await?;
// ANCHOR_END: create_consumer // ANCHOR_END: create_consumer_ac
// ANCHOR: subscribe
consumer.subscribe(["topic_meters"]).await?;
// ANCHOR_END: subscribe
// ANCHOR: consume // ANCHOR: consume
consumer.subscribe(["topic_meters"]).await?;
{ {
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1)); let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
@ -65,7 +69,7 @@ async fn main() -> anyhow::Result<()> {
let database = offset.database(); let database = offset.database();
let vgroup_id = offset.vgroup_id(); let vgroup_id = offset.vgroup_id();
log::debug!( log::debug!(
"topic: {}, database: {}, vgroup_id: {}", "receive message from: topic: {}, database: {}, vgroup_id: {}",
topic, topic,
database, database,
vgroup_id vgroup_id
@ -80,13 +84,13 @@ async fn main() -> anyhow::Result<()> {
let json = meta.as_json_meta().await?; let json = meta.as_json_meta().await?;
let sql = json.to_string(); let sql = json.to_string();
if let Err(err) = taos.exec(sql).await { if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err); println!("maybe error in handling meta message: {}", err);
} }
} }
MessageSet::Data(data) => { MessageSet::Data(data) => {
log::info!("Data"); log::info!("Data");
while let Some(data) = data.fetch_raw_block().await? { while let Some(data) = data.fetch_raw_block().await? {
log::debug!("data: {:?}", data); log::debug!("message data: {:?}", data);
} }
} }
MessageSet::MetaData(meta, data) => { MessageSet::MetaData(meta, data) => {
@ -97,23 +101,23 @@ async fn main() -> anyhow::Result<()> {
let json = meta.as_json_meta().await?; let json = meta.as_json_meta().await?;
let sql = json.to_string(); let sql = json.to_string();
if let Err(err) = taos.exec(sql).await { if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err); println!("maybe error in handling metadata message: {}", err);
} }
while let Some(data) = data.fetch_raw_block().await? { while let Some(data) = data.fetch_raw_block().await? {
log::debug!("data: {:?}", data); log::debug!("message data: {:?}", data);
} }
} }
} }
// commit offset manually when handling message successfully
consumer.commit(offset).await?; consumer.commit(offset).await?;
} }
} }
// ANCHOR_END: consume // ANCHOR_END: consume
// ANCHOR: assignments // ANCHOR: seek_offset
let assignments = consumer.assignments().await.unwrap(); let assignments = consumer.assignments().await.unwrap();
log::info!("assignments: {:?}", assignments); log::info!("start assignments: {:?}", assignments);
// ANCHOR_END: assignments
// seek offset // seek offset
for topic_vec_assignment in assignments { for topic_vec_assignment in assignments {
@ -132,14 +136,14 @@ async fn main() -> anyhow::Result<()> {
begin, begin,
end end
); );
// ANCHOR: seek_offset
let res = consumer.offset_seek(topic, vgroup_id, end).await; // seek offset of the (topic, vgroup_id) to the begin
let res = consumer.offset_seek(topic, vgroup_id, begin).await;
if res.is_err() { if res.is_err() {
log::error!("seek offset error: {:?}", res); log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap(); let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a); log::error!("assignments: {:?}", a);
} }
// ANCHOR_END: seek_offset
} }
let topic_assignment = consumer.topic_assignment(topic).await; let topic_assignment = consumer.topic_assignment(topic).await;
@ -149,6 +153,7 @@ async fn main() -> anyhow::Result<()> {
// after seek offset // after seek offset
let assignments = consumer.assignments().await.unwrap(); let assignments = consumer.assignments().await.unwrap();
log::info!("after seek offset assignments: {:?}", assignments); log::info!("after seek offset assignments: {:?}", assignments);
// ANCHOR_END: seek_offset
// ANCHOR: unsubscribe // ANCHOR: unsubscribe
consumer.unsubscribe().await; consumer.unsubscribe().await;

View File

@ -8,5 +8,7 @@ anyhow = "1"
chrono = "0.4" chrono = "0.4"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] } tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
log = "0.4"
pretty_env_logger = "0.5.0"
taos = { version = "0.*" } taos = { version = "0.*" }

View File

@ -0,0 +1,135 @@
use std::time::Duration;
use chrono::{DateTime, Local};
use taos::*;
// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
}
async fn prepare(taos: Taos) -> anyhow::Result<()> {
let inserted = taos.exec_many([
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
assert_eq!(inserted, 6);
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "ws://localhost:6041";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build().await?;
let db = "tmq";
// prepare database
taos.exec_many([
format!("DROP TOPIC IF EXISTS tmq_meters"),
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"),
format!("USE `{db}`"),
// create super table
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
// create topic for subscription
format!("CREATE TOPIC tmq_meters AS SELECT * FROM `meters`")
])
.await?;
let task = tokio::spawn(prepare(taos));
tokio::time::sleep(Duration::from_secs(1)).await;
// subscribe
let tmq = TmqBuilder::from_dsn("ws://localhost:6041/?group.id=test")?;
let mut consumer = tmq.build().await?;
consumer.subscribe(["tmq_meters"]).await?;
// ANCHOR: consumer_commit_manually
consumer
.stream()
.try_for_each(|(offset, message)| async {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
// commit offset manually when you have processed the message.
consumer.commit(offset).await?;
Ok(())
})
.await?;
// ANCHOR_END: consumer_commit_manually
// ANCHOR: assignments
let assignments = consumer.assignments().await.unwrap();
log::info!("assignments: {:?}", assignments);
// ANCHOR_END: assignments
// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
log::debug!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
topic,
vgroup_id,
current,
begin,
end
);
// ANCHOR: seek_offset
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
// ANCHOR_END: seek_offset
}
let topic_assignment = consumer.topic_assignment(topic).await;
log::debug!("topic assignment: {:?}", topic_assignment);
}
// after seek offset
let assignments = consumer.assignments().await.unwrap();
log::info!("after seek offset assignments: {:?}", assignments);
consumer.unsubscribe().await;
task.await??;
Ok(())
}

View File

@ -9,9 +9,11 @@ async fn main() -> anyhow::Result<()> {
.filter_level(log::LevelFilter::Info) .filter_level(log::LevelFilter::Info)
.init(); .init();
use taos_query::prelude::*; use taos_query::prelude::*;
// ANCHOR: create_consumer_dsn
let dsn = "ws://localhost:6041".to_string(); let dsn = "ws://localhost:6041".to_string();
log::info!("dsn: {}", dsn); log::info!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?; let mut dsn = Dsn::from_str(&dsn)?;
// ANCHOR_END: create_consumer_dsn
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?; let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
@ -40,13 +42,17 @@ async fn main() -> anyhow::Result<()> {
.await?; .await?;
// ANCHOR_END: create_topic // ANCHOR_END: create_topic
// ANCHOR: create_consumer // ANCHOR: create_consumer_ac
dsn.params.insert("group.id".to_string(), "abc".to_string()); dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string()); dsn.params.insert("msg.with.table.name".to_string(), "true".to_string());
dsn.params.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
dsn.params.insert("group.id".to_string(), "group1".to_string());
dsn.params.insert("client.id".to_string(), "client1".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?; let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?; let mut consumer = builder.build().await?;
// ANCHOR_END: create_consumer // ANCHOR_END: create_consumer_ac
// ANCHOR: subscribe // ANCHOR: subscribe
consumer.subscribe(["topic_meters"]).await?; consumer.subscribe(["topic_meters"]).await?;
@ -54,56 +60,23 @@ async fn main() -> anyhow::Result<()> {
// ANCHOR: consume // ANCHOR: consume
{ {
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1)); consumer
.stream()
while let Some((offset, message)) = stream.try_next().await? { .try_for_each(|(offset, message)| async {
let topic = offset.topic();
let topic: &str = offset.topic(); // the vgroup id, like partition id in kafka.
let database = offset.database();
let vgroup_id = offset.vgroup_id(); let vgroup_id = offset.vgroup_id();
log::debug!( println!("* in vgroup id {vgroup_id} of topic {topic}\n");
"topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);
match message { if let Some(data) = message.into_data() {
MessageSet::Meta(meta) => { while let Some(block) = data.fetch_raw_block().await? {
log::info!("Meta"); let records: Vec<Record> = block.deserialize().try_collect()?;
let raw = meta.as_raw_meta().await?; println!("** read {} records: {:#?}\n", records.len(), records);
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
} }
} }
MessageSet::Data(data) => { Ok(())
log::info!("Data"); })
while let Some(data) = data.fetch_raw_block().await? { .await?;
log::debug!("data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::info!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("data: {:?}", data);
}
}
}
consumer.commit(offset).await?;
}
} }
// ANCHOR_END: consume // ANCHOR_END: consume

View File

@ -13,8 +13,6 @@ import ConnNode from "./_connect_node.mdx";
import ConnPythonNative from "./_connect_python.mdx"; import ConnPythonNative from "./_connect_python.mdx";
import ConnCSNative from "./_connect_cs.mdx"; import ConnCSNative from "./_connect_cs.mdx";
import ConnC from "./_connect_c.mdx"; import ConnC from "./_connect_c.mdx";
import ConnR from "./_connect_r.mdx";
import ConnPHP from "./_connect_php.mdx";
import InstallOnLinux from "../../14-reference/05-connector/_linux_install.mdx"; import InstallOnLinux from "../../14-reference/05-connector/_linux_install.mdx";
import InstallOnWindows from "../../14-reference/05-connector/_windows_install.mdx"; import InstallOnWindows from "../../14-reference/05-connector/_windows_install.mdx";
import InstallOnMacOS from "../../14-reference/05-connector/_macos_install.mdx"; import InstallOnMacOS from "../../14-reference/05-connector/_macos_install.mdx";
@ -249,62 +247,12 @@ dotnet add package TDengine.Connector
::: :::
</TabItem>
<TabItem label="R" value="r">
1. 下载 [taos-jdbcdriver-version-dist.jar](https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/3.0.0/)。
2. 安装 R 的依赖包`RJDBC`
```R
install.packages("RJDBC")
```
</TabItem> </TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
如果已经安装了 TDengine 服务端软件或 TDengine 客户端驱动 taosc 那么已经安装了 C 连接器,无需额外操作。 如果已经安装了 TDengine 服务端软件或 TDengine 客户端驱动 taosc 那么已经安装了 C 连接器,无需额外操作。
<br/> <br/>
</TabItem>
<TabItem label="PHP" value="php">
**下载代码并解压:**
```shell
curl -L -o php-tdengine.tar.gz https://github.com/Yurunsoft/php-tdengine/archive/refs/tags/v1.0.2.tar.gz \
&& mkdir php-tdengine \
&& tar -xzf php-tdengine.tar.gz -C php-tdengine --strip-components=1
```
> 版本 `v1.0.2` 只是示例,可替换为任意更新的版本,可在 [TDengine PHP Connector 发布历史](https://github.com/Yurunsoft/php-tdengine/releases) 中查看可用版本。
**非 Swoole 环境:**
```shell
phpize && ./configure && make -j && make install
```
**手动指定 TDengine 目录:**
```shell
phpize && ./configure --with-tdengine-dir=/usr/local/Cellar/tdengine/3.0.0.0 && make -j && make install
```
> `--with-tdengine-dir=` 后跟上 TDengine 目录。
> 适用于默认找不到的情况,或者 macOS 系统用户。
**Swoole 环境:**
```shell
phpize && ./configure --enable-swoole && make -j && make install
```
**启用扩展:**
方法一:在 `php.ini` 中加入 `extension=tdengine`
方法二:运行带参数 `php -d extension=tdengine test.php`
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -404,8 +352,6 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto
- `reconnectRetryCount`:重连次数,默认为 3。 - `reconnectRetryCount`:重连次数,默认为 3。
- `reconnectIntervalMs`:重连间隔毫秒时间,默认为 2000。 - `reconnectIntervalMs`:重连间隔毫秒时间,默认为 2000。
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
使用客户端驱动访问 TDengine 集群的基本过程为:建立连接、查询和写入、关闭连接、清除资源。 使用客户端驱动访问 TDengine 集群的基本过程为:建立连接、查询和写入、关闭连接、清除资源。
@ -433,10 +379,6 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto
::: :::
</TabItem>
<TabItem label="PHP" value="php">
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -475,15 +417,9 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto
{{#include docs/examples/csharp/wsConnect/Program.cs:main}} {{#include docs/examples/csharp/wsConnect/Program.cs:main}}
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
<ConnR/>
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
<ConnC /> <ConnC />
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
<ConnPHP />
</TabItem>
</Tabs> </Tabs>
### 原生连接 ### 原生连接
@ -513,15 +449,9 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto
{{#include docs/examples/csharp/connect/Program.cs:main}} {{#include docs/examples/csharp/connect/Program.cs:main}}
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
<ConnR/>
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
<ConnC /> <ConnC />
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
<ConnPHP />
</TabItem>
</Tabs> </Tabs>
@ -550,15 +480,9 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto
<TabItem label="C#" value="csharp"> <TabItem label="C#" value="csharp">
不支持 不支持
</TabItem> </TabItem>
<TabItem label="R" value="r">
<ConnR/>
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
<ConnC /> <ConnC />
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
<ConnPHP />
</TabItem>
</Tabs> </Tabs>
@ -642,14 +566,8 @@ let taos = pool.get()?;
<TabItem label="C#" value="csharp"> <TabItem label="C#" value="csharp">
不支持 不支持
</TabItem> </TabItem>
<TabItem label="R" value="r">
<ConnR/>
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
<ConnC /> <ConnC />
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
<ConnPHP />
</TabItem>
</Tabs> </Tabs>

View File

@ -13,9 +13,8 @@ TDengine 对 SQL 语言提供了全面的支持,允许用户以熟悉的 SQL
:::note :::note
REST 连接:各编程语言的连接器封装使用 `HTTP` 请求的连接,支持数据写入和查询操作。 REST 连接:各编程语言的连接器封装使用 `HTTP` 请求的连接,支持数据写入和查询操作,开发者依然使用连接器提供的接口访问 `TDengine`
REST API直接调用 `taosadapter` 提供的 REST API 接口,进行数据写入和查询操作。代码示例使用 `curl` 命令来演示。
REST API通过 `curl` 命令进行数据写入和查询操作。
::: :::
@ -68,16 +67,12 @@ REST API通过 `curl` 命令进行数据写入和查询操作。
{{#include docs/examples/csharp/wsInsert/Program.cs:create_db_and_table}} {{#include docs/examples/csharp/wsInsert/Program.cs:create_db_and_table}}
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
```c ```c
{{#include docs/examples/c/CCreateDBDemo.c:create_db_and_table}} {{#include docs/examples/c/CCreateDBDemo.c:create_db_and_table}}
``` ```
> **注意**:如果不使用 `USE power` 指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 power.meters。 > **注意**:如果不使用 `USE power` 指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 power.meters。
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
<TabItem label="REST API" value="rest"> <TabItem label="REST API" value="rest">
创建数据库 创建数据库
@ -149,8 +144,6 @@ NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW
{{#include docs/examples/csharp/wsInsert/Program.cs:insert_data}} {{#include docs/examples/csharp/wsInsert/Program.cs:insert_data}}
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
```c ```c
{{#include docs/examples/c/CInsertDataDemo.c:insert_data}} {{#include docs/examples/c/CInsertDataDemo.c:insert_data}}
@ -159,8 +152,6 @@ NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW
**Note** **Note**
NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW + 1s 代表客户端当前时间往后加 1 秒数字后面代表时间单位a毫秒smh小时dwny NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW + 1s 代表客户端当前时间往后加 1 秒数字后面代表时间单位a毫秒smh小时dwny
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
<TabItem label="REST API" value="rest"> <TabItem label="REST API" value="rest">
写入数据 写入数据
@ -222,15 +213,11 @@ curl --location -uroot:taosdata 'http://127.0.0.1:6041/rest/sql' \
{{#include docs/examples/csharp/wsInsert/Program.cs:select_data}} {{#include docs/examples/csharp/wsInsert/Program.cs:select_data}}
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
```c ```c
{{#include docs/examples/c/CQueryDataDemo.c:query_data}} {{#include docs/examples/c/CQueryDataDemo.c:query_data}}
``` ```
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
<TabItem label="REST API" value="rest"> <TabItem label="REST API" value="rest">
查询数据 查询数据
@ -288,15 +275,11 @@ reqId 可用于请求链路追踪reqId 就像分布式系统中的 traceId
{{#include docs/examples/csharp/wsInsert/Program.cs:query_id}} {{#include docs/examples/csharp/wsInsert/Program.cs:query_id}}
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
```c ```c
{{#include docs/examples/c/CWithReqIdDemo.c:with_reqid}} {{#include docs/examples/c/CWithReqIdDemo.c:with_reqid}}
``` ```
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
<TabItem label="REST API" value="rest"> <TabItem label="REST API" value="rest">
查询数据,指定 reqId 为 3 查询数据,指定 reqId 为 3

View File

@ -200,12 +200,8 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO
{{#include docs/examples/csharp/wssml/Program.cs:main}} {{#include docs/examples/csharp/wssml/Program.cs:main}}
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs> </Tabs>
### 原生连接 ### 原生连接
@ -237,12 +233,8 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO
{{#include docs/examples/csharp/nativesml/Program.cs:main}} {{#include docs/examples/csharp/nativesml/Program.cs:main}}
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs> </Tabs>
@ -263,12 +255,8 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO
<TabItem label="C#" value="csharp"> <TabItem label="C#" value="csharp">
不支持 不支持
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs> </Tabs>

View File

@ -60,15 +60,9 @@ import TabItem from "@theme/TabItem";
```csharp ```csharp
{{#include docs/examples/csharp/wsStmt/Program.cs:main}} {{#include docs/examples/csharp/wsStmt/Program.cs:main}}
``` ```
</TabItem>
<TabItem label="R" value="r">
</TabItem> </TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -98,15 +92,8 @@ import TabItem from "@theme/TabItem";
```csharp ```csharp
{{#include docs/examples/csharp/stmtInsert/Program.cs:main}} {{#include docs/examples/csharp/stmtInsert/Program.cs:main}}
``` ```
</TabItem>
<TabItem label="R" value="r">
</TabItem> </TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem> </TabItem>
</Tabs> </Tabs>

View File

@ -90,15 +90,9 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
其他参数见上表。 其他参数见上表。
</TabItem>
<TabItem label="R" value="r">
</TabItem> </TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -129,9 +123,13 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
<TabItem label="Rust" value="rust"> <TabItem label="Rust" value="rust">
```rust
{{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer_dsn}}
```
```rust ```rust
{{#include docs/examples/rust/nativeexample/examples/tmq.rs:create_consumer}} {{#include docs/examples/rust/restexample/examples/tmq.rs:create_consumer_ac}}
``` ```
</TabItem> </TabItem>
@ -149,16 +147,8 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -192,6 +182,13 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
</TabItem> </TabItem>
<TabItem label="Rust" value="rust"> <TabItem label="Rust" value="rust">
```rust
{{#include docs/examples/rust/nativeexample/examples/tmq.rs:create_consumer_dsn}}
```
```rust
{{#include docs/examples/rust/nativeexample/examples/tmq.rs:create_consumer_ac}}
```
</TabItem> </TabItem>
@ -201,17 +198,10 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs> </Tabs>
## 订阅消费数据 ## 订阅消费数据
@ -244,7 +234,12 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
</TabItem> </TabItem>
<TabItem label="Rust" value="rust"> <TabItem label="Rust" value="rust">
消费者可订阅一个或多个 `TOPIC`,一般建议一个消费者只订阅一个 `TOPIC`
TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型,可以使用相应 API 对每个消息进行消费,并通过 `.commit` 进行已消费标记。
```rust
{{#include docs/examples/rust/restexample/examples/tmq.rs:consume}}
```
</TabItem> </TabItem>
<TabItem label="Node.js" value="node"> <TabItem label="Node.js" value="node">
@ -260,16 +255,8 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -295,7 +282,7 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
</TabItem> </TabItem>
<TabItem label="Rust" value="rust"> <TabItem label="Rust" value="rust">
同 Websocket 示例代码
</TabItem> </TabItem>
<TabItem label="C#" value="csharp"> <TabItem label="C#" value="csharp">
@ -304,17 +291,10 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs> </Tabs>
## 指定订阅的 Offset ## 指定订阅的 Offset
@ -327,6 +307,10 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
```java ```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:consumer_seek}} {{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:consumer_seek}}
``` ```
1. 使用 consumer.poll 方法轮询数据,直到获取到数据为止。
2. 对于轮询到的第一批数据,打印第一条数据的内容,并获取当前消费者的分区分配信息。
3. 使用 consumer.seekToBeginning 方法将所有分区的偏移量重置到开始位置,并打印成功重置的消息。
4. 再次使用 consumer.poll 方法轮询数据,并打印第一条数据的内容。
</TabItem> </TabItem>
@ -345,6 +329,16 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
<TabItem label="Rust" value="rust"> <TabItem label="Rust" value="rust">
```rust
{{#include docs/examples/rust/nativeexample/examples/tmq.rs:seek_offset}}
```
1. 通过调用 consumer.assignments() 方法获取消费者当前的分区分配信息,并记录初始分配状态。
2. 遍历每个分区分配信息对于每个分区提取主题topic、消费组IDvgroup_id、当前偏移量current、起始偏移量begin和结束偏移量end
记录这些信息。
1. 调用 consumer.offset_seek 方法将偏移量设置到起始位置。如果操作失败,记录错误信息和当前分配状态。
2. 在所有分区的偏移量调整完成后,再次获取并记录消费者的分区分配信息,以确认偏移量调整后的状态。
</TabItem> </TabItem>
<TabItem label="Node.js" value="node"> <TabItem label="Node.js" value="node">
@ -360,17 +354,10 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs> </Tabs>
### 原生连接 ### 原生连接
@ -395,7 +382,7 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
</TabItem> </TabItem>
<TabItem label="Rust" value="rust"> <TabItem label="Rust" value="rust">
同 Websocket 代码样例。
</TabItem> </TabItem>
<TabItem label="C#" value="csharp"> <TabItem label="C#" value="csharp">
@ -404,16 +391,8 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -421,6 +400,9 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
## 提交 Offset ## 提交 Offset
当消费者读取并处理完消息后,它可以提交 Offset这表示消费者已经成功处理到这个 Offset 的消息。Offset 提交可以是自动的(根据配置定期提交)或手动的(应用程序控制何时提交)。 当消费者读取并处理完消息后,它可以提交 Offset这表示消费者已经成功处理到这个 Offset 的消息。Offset 提交可以是自动的(根据配置定期提交)或手动的(应用程序控制何时提交)。
当创建消费者时,属性 `enable.auto.commit` 为 false 时,可以手动提交 offset。 当创建消费者时,属性 `enable.auto.commit` 为 false 时,可以手动提交 offset。
**注意**:手工提交消费进度前确保消息正常处理完成,否则处理出错的消息不会被再次消费。自动提交是在本次 `poll` 消息时可能会提交上次消息的消费进度,因此请确保消息处理完毕再进行下一次 `poll` 或消息获取。
### Websocket 连接 ### Websocket 连接
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java"> <TabItem value="java" label="Java">
@ -446,7 +428,11 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
</TabItem> </TabItem>
<TabItem label="Rust" value="rust"> <TabItem label="Rust" value="rust">
```rust
{{#include docs/examples/rust/restexample/examples/subscribe_demo.rs:consumer_commit_manually}}
```
可以通过 `consumer.commit` 方法来手工提交消费进度。
</TabItem> </TabItem>
<TabItem label="Node.js" value="node"> <TabItem label="Node.js" value="node">
@ -462,18 +448,9 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs> </Tabs>
### 原生连接 ### 原生连接
@ -499,7 +476,7 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
</TabItem> </TabItem>
<TabItem label="Rust" value="rust"> <TabItem label="Rust" value="rust">
同 Websocket 代码样例。
</TabItem> </TabItem>
<TabItem label="Node.js" value="node"> <TabItem label="Node.js" value="node">
@ -512,18 +489,9 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs> </Tabs>
@ -554,7 +522,11 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
</TabItem> </TabItem>
<TabItem label="Rust" value="rust"> <TabItem label="Rust" value="rust">
```rust
{{#include docs/examples/rust/restexample/examples/tmq.rs:unsubscribe}}
```
**注意**:消费者取消订阅后无法重用,如果想订阅新的 `topic` 请重新创建消费者。
</TabItem> </TabItem>
<TabItem label="Node.js" value="node"> <TabItem label="Node.js" value="node">
@ -570,18 +542,10 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs> </Tabs>
### 原生连接 ### 原生连接
@ -606,7 +570,7 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
</TabItem> </TabItem>
<TabItem label="Rust" value="rust"> <TabItem label="Rust" value="rust">
同 Websocket 代码样例。
</TabItem> </TabItem>
<TabItem label="Node.js" value="node"> <TabItem label="Node.js" value="node">
@ -619,18 +583,10 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs> </Tabs>
@ -662,7 +618,9 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
</TabItem> </TabItem>
<TabItem label="Rust" value="rust"> <TabItem label="Rust" value="rust">
```rust
{{#include docs/examples/rust/restexample/examples/subscribe_demo.rs}}
```
</TabItem> </TabItem>
<TabItem label="Node.js" value="node"> <TabItem label="Node.js" value="node">
@ -678,18 +636,10 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs> </Tabs>
### 原生连接 ### 原生连接
@ -722,7 +672,9 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
</TabItem> </TabItem>
<TabItem label="Rust" value="rust"> <TabItem label="Rust" value="rust">
```rust
{{#include docs/examples/rust/nativeexample/examples/subscribe_demo.rs}}
```
</TabItem> </TabItem>
<TabItem label="Node.js" value="node"> <TabItem label="Node.js" value="node">
@ -735,16 +687,7 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
``` ```
</TabItem> </TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c"> <TabItem label="C" value="c">
</TabItem> </TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs> </Tabs>

View File

@ -15,6 +15,7 @@ import java.util.concurrent.TimeUnit;
public class ConsumerLoopFull { public class ConsumerLoopFull {
static private Connection connection; static private Connection connection;
static private Statement statement; static private Statement statement;
public static TaosConsumer<ResultBean> getConsumer() throws SQLException { public static TaosConsumer<ResultBean> getConsumer() throws SQLException {
// ANCHOR: create_consumer // ANCHOR: create_consumer
Properties config = new Properties(); Properties config = new Properties();
@ -162,6 +163,7 @@ try (TaosConsumer<ResultBean> consumer = getConsumer()){
} }
// ANCHOR_END: commit_code_piece // ANCHOR_END: commit_code_piece
} }
public static void unsubscribeExample() throws SQLException { public static void unsubscribeExample() throws SQLException {
TaosConsumer<ResultBean> consumer = getConsumer(); TaosConsumer<ResultBean> consumer = getConsumer();
List<String> topics = Collections.singletonList("topic_meters"); List<String> topics = Collections.singletonList("topic_meters");
@ -182,6 +184,7 @@ try (TaosConsumer<ResultBean> consumer = getConsumer()){
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> { public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
} }
// use this class to define the data structure of the result record // use this class to define the data structure of the result record
public static class ResultBean { public static class ResultBean {
private Timestamp ts; private Timestamp ts;
@ -256,6 +259,7 @@ try (TaosConsumer<ResultBean> consumer = getConsumer()){
throw new SQLException("Failed to insert data to power.meters", ex); throw new SQLException("Failed to insert data to power.meters", ex);
} }
} }
public static void prepareMeta() throws SQLException { public static void prepareMeta() throws SQLException {
try { try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
@ -288,6 +292,7 @@ try (TaosConsumer<ResultBean> consumer = getConsumer()){
} }
System.out.println("Connection created successfully."); System.out.println("Connection created successfully.");
} }
public static void closeConnection() throws SQLException { public static void closeConnection() throws SQLException {
try { try {
if (statement != null) { if (statement != null) {
@ -337,11 +342,11 @@ try (TaosConsumer<ResultBean> consumer = getConsumer()){
System.out.println("Data prepared successfully"); System.out.println("Data prepared successfully");
// 关闭线程池不再接收新任务 // close the executor, which will make the executor reject new tasks
executor.shutdown(); executor.shutdown();
try { try {
// 等待直到所有任务完成 // wait for the executor to terminate
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result; assert result;
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit;
public class WsConsumerLoopFull { public class WsConsumerLoopFull {
static private Connection connection; static private Connection connection;
static private Statement statement; static private Statement statement;
public static TaosConsumer<ResultBean> getConsumer() throws SQLException { public static TaosConsumer<ResultBean> getConsumer() throws SQLException {
// ANCHOR: create_consumer // ANCHOR: create_consumer
Properties config = new Properties(); Properties config = new Properties();
@ -163,6 +164,7 @@ try (TaosConsumer<ResultBean> consumer = getConsumer()){
} }
// ANCHOR_END: commit_code_piece // ANCHOR_END: commit_code_piece
} }
public static void unsubscribeExample() throws SQLException { public static void unsubscribeExample() throws SQLException {
TaosConsumer<ResultBean> consumer = getConsumer(); TaosConsumer<ResultBean> consumer = getConsumer();
List<String> topics = Collections.singletonList("topic_meters"); List<String> topics = Collections.singletonList("topic_meters");
@ -183,6 +185,7 @@ try {
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> { public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
} }
// use this class to define the data structure of the result record // use this class to define the data structure of the result record
public static class ResultBean { public static class ResultBean {
private Timestamp ts; private Timestamp ts;
@ -257,6 +260,7 @@ public static class ResultBean {
throw new SQLException("Failed to insert data to power.meters", ex); throw new SQLException("Failed to insert data to power.meters", ex);
} }
} }
public static void prepareMeta() throws SQLException { public static void prepareMeta() throws SQLException {
try { try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
@ -289,6 +293,7 @@ public static class ResultBean {
} }
System.out.println("Connection created successfully."); System.out.println("Connection created successfully.");
} }
public static void closeConnection() throws SQLException { public static void closeConnection() throws SQLException {
try { try {
if (statement != null) { if (statement != null) {
@ -338,11 +343,11 @@ public static class ResultBean {
System.out.println("Data prepared successfully"); System.out.println("Data prepared successfully");
// 关闭线程池不再接收新任务 // close the executor, which will make the executor reject new tasks
executor.shutdown(); executor.shutdown();
try { try {
// 等待直到所有任务完成 // wait for the executor to terminate
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result; assert result;
} catch (InterruptedException e) { } catch (InterruptedException e) {