commit
7a90754232
|
@ -20,13 +20,13 @@ import CDemo from "./_sub_c.mdx";
|
|||
|
||||
# 介绍
|
||||
## 主题
|
||||
与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../12-taos-sql/13-tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。
|
||||
与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../taos-sql/tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。
|
||||
|
||||
如下图,每个 topic 涉及到的数据表可能分布在多个 vnode(相当于 kafka 里的 partition) 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中,WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)。
|
||||
|
||||

|
||||
|
||||
TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制,用户可以按需指定 WAL 文件保留的时间以及大小(详见 [CREATE DATABASE](../../12-taos-sql/02-database) 语句,由于消费是通过 WAL 实现的,所以应该根据写入消费速度来确定 WAL 的保存时长)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎。
|
||||
TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制,用户可以按需指定 WAL 文件保留的时间以及大小(详见 [CREATE DATABASE](../../taos-sql/database) 语句,由于消费是通过 WAL 实现的,所以应该根据写入消费速度来确定 WAL 的保存时长)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎。
|
||||
|
||||
对于 `SELECT` 语句形式的 topic,在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
|
||||
|
||||
|
@ -66,7 +66,7 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
|
|||
|
||||
# 语法说明
|
||||
|
||||
具体的语法参见 [数据订阅](../../12-taos-sql/13-tmq)
|
||||
具体的语法参见 [数据订阅](../../taos-sql/tmq)
|
||||
|
||||
# 消费参数
|
||||
|
||||
|
|
|
@ -1545,6 +1545,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
mInfo("stream:%s,%"PRId64 " start to pause stream", pauseReq.name, pStream->uid);
|
||||
|
||||
if (pStream->status == STREAM_STATUS__PAUSE) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return 0;
|
||||
|
|
|
@ -211,7 +211,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
|
|||
};
|
||||
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
mInfo("receive pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name);
|
||||
mInfo("receive pause stream:%s, %s, %"PRId64 ", because grant expired", pStream->name, reqPause.name, pStream->uid);
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pStream);
|
||||
|
|
Loading…
Reference in New Issue