Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0

This commit is contained in:
Hongze Cheng 2024-02-21 19:26:31 +08:00
commit ccd65fd495
3 changed files with 551 additions and 548 deletions

View File

@ -20,13 +20,13 @@ import CDemo from "./_sub_c.mdx";
# 介绍
## 主题
与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../12-taos-sql/13-tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度。
与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../taos-sql/tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度。
如下图,每个 topic 涉及到的数据表可能分布在多个 vnode相当于 kafka 里的 partition 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)。
![img_5.png](img_5.png)
TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制,用户可以按需指定 WAL 文件保留的时间以及大小(详见 [CREATE DATABASE](../../12-taos-sql/02-database) 语句,由于消费是通过 WAL 实现的,所以应该根据写入消费速度来确定 WAL 的保存时长)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎。
TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制,用户可以按需指定 WAL 文件保留的时间以及大小(详见 [CREATE DATABASE](../../taos-sql/database) 语句,由于消费是通过 WAL 实现的,所以应该根据写入消费速度来确定 WAL 的保存时长)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎。
对于 `SELECT` 语句形式的 topic在消费时TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
@ -66,7 +66,7 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
# 语法说明
具体的语法参见 [数据订阅](../../12-taos-sql/13-tmq)
具体的语法参见 [数据订阅](../../taos-sql/tmq)
# 消费参数
@ -91,7 +91,6 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
不同语言下, TMQ 订阅相关的 API 及数据结构如下详细的接口说明可以参考连接器章节注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
@ -147,6 +146,9 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
DLL_EXPORT const char *tmq_err2str(int32_t code);
```
下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
</TabItem>
<TabItem value="java" label="Java">
@ -304,6 +306,7 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
void Close()
```
</TabItem>
</Tabs>
@ -334,8 +337,8 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
对于不同编程语言,其设置方式如下:
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
@ -352,8 +355,8 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
```
</TabItem>
</TabItem>
<TabItem value="java" label="Java">
对于 Java 程序,还可以使用如下配置项:
@ -387,6 +390,7 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
```
</TabItem>
<TabItem label="Go" value="Go">
@ -500,7 +504,6 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
一个 consumer 支持同时订阅多个 topic。
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
@ -580,7 +583,6 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
以下代码展示了不同语言下如何对 TMQ 消息进行消费。
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
@ -717,7 +719,6 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
消费结束后,应当取消订阅。
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c

View File

@ -1545,6 +1545,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
}
}
mInfo("stream:%s,%"PRId64 " start to pause stream", pauseReq.name, pStream->uid);
if (pStream->status == STREAM_STATUS__PAUSE) {
sdbRelease(pMnode->pSdb, pStream);
return 0;

View File

@ -211,7 +211,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
mInfo("receive pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name);
mInfo("receive pause stream:%s, %s, %"PRId64 ", because grant expired", pStream->name, reqPause.name, pStream->uid);
}
sdbRelease(pSdb, pStream);