Merge branch '3.0' into fix/TD-28376-3.0

This commit is contained in:
kailixu 2024-02-25 15:26:33 +08:00
commit 3a34697249
10 changed files with 48 additions and 27 deletions

View File

@ -15,11 +15,11 @@ import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx"; import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx"; import CDemo from "./_sub_c.mdx";
为了帮助应用实时获取写入 TDengine 的数据或者以事件到达顺序处理数据TDengine 提供了类似 kafka 的数据订阅功能。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。 为了帮助应用实时获取写入 TDengine 的数据或者以事件到达顺序处理数据TDengine 提供了类似 kafka 的数据订阅功能。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
# 介绍 ## 数据订阅介绍
## 主题
### 主题
与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../taos-sql/tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度。 与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../taos-sql/tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度。
如下图,每个 topic 涉及到的数据表可能分布在多个 vnode相当于 kafka 里的 partition 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)。 如下图,每个 topic 涉及到的数据表可能分布在多个 vnode相当于 kafka 里的 partition 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)。
@ -30,11 +30,12 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
对于 `SELECT` 语句形式的 topic在消费时TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。 对于 `SELECT` 语句形式的 topic在消费时TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
## 生产者 ### 生产者
写入 topic 相关联的数据表中数据的都是生产者,生产者实际生产的数据写入到了子表或普通表中,即表所在 vnode 的 WAL 里。 写入 topic 相关联的数据表中数据的都是生产者,生产者实际生产的数据写入到了子表或普通表中,即表所在 vnode 的 WAL 里。
## 消费者 ### 消费者
### 消费者组
#### 消费者组
消费者订阅 topic 后,可以消费 topic 里的所有数据(这些数据所在的表可能分布在多个 vnode 上,即 db 所在的所有 vnode。订阅 topic 时,需要指定一个消费者组 (consumer group),如果这个消费者组里只有一个消费者,那么这个消费者会顺序的消费这些 vnode 上的数据。 消费者订阅 topic 后,可以消费 topic 里的所有数据(这些数据所在的表可能分布在多个 vnode 上,即 db 所在的所有 vnode。订阅 topic 时,需要指定一个消费者组 (consumer group),如果这个消费者组里只有一个消费者,那么这个消费者会顺序的消费这些 vnode 上的数据。
为了提高消费速度,便于多线程、分布式地消费数据,可以在一个消费组里添加多个消费者,这些消费者将均分数据所在的 vnode 进行消费(比如数据分布在 4 个 vnode 上,有 2 个消费者的话,那么每个消费者消费 2 个 vnode有 3 个消费者的话2 个消费者各消费 1 个 vnode1 个消费者消费 2 个 vnode有 5 个消费者的话4 个各分配 1 个 vnode 消费,另外 1 个不消费),如下图: 为了提高消费速度,便于多线程、分布式地消费数据,可以在一个消费组里添加多个消费者,这些消费者将均分数据所在的 vnode 进行消费(比如数据分布在 4 个 vnode 上,有 2 个消费者的话,那么每个消费者消费 2 个 vnode有 3 个消费者的话2 个消费者各消费 1 个 vnode1 个消费者消费 2 个 vnode有 5 个消费者的话4 个各分配 1 个 vnode 消费,另外 1 个不消费),如下图:
@ -44,7 +45,8 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
在一个消费组里添加一个消费者后,在 Mnode 上通过 rebalance 的机制实现消费者的重新分配,该操作对用户是透明的。 在一个消费组里添加一个消费者后,在 Mnode 上通过 rebalance 的机制实现消费者的重新分配,该操作对用户是透明的。
一个消费者可以订阅多个 topic。TDengine 的数据订阅在宕机、重启等复杂环境下确保 at least once 消费。 一个消费者可以订阅多个 topic。TDengine 的数据订阅在宕机、重启等复杂环境下确保 at least once 消费。
### 消费进度
#### 消费进度
在 topic 的一个消费组的一个 vnode 上有消费进度。消费者消费的同时,可以提交消费进度,消费进度即 vnode 上 WAL 的版本号(对于 kafka 里的 offset消费进度可以手动提交也可以通过参数auto.commit.interval.ms设置为周期性自动提交。 在 topic 的一个消费组的一个 vnode 上有消费进度。消费者消费的同时,可以提交消费进度,消费进度即 vnode 上 WAL 的版本号(对于 kafka 里的 offset消费进度可以手动提交也可以通过参数auto.commit.interval.ms设置为周期性自动提交。
首次消费数据时通过订阅参数auto.offset.reset来确定消费位置为最新数据latest还是最旧数据earliest 首次消费数据时通过订阅参数auto.offset.reset来确定消费位置为最新数据latest还是最旧数据earliest
@ -59,16 +61,16 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
作为一个数据库产品, WAL 文件中存储的不全是数据,也包括其他写入消息,元数据等,所以消费进度不是连续的。 作为一个数据库产品, WAL 文件中存储的不全是数据,也包括其他写入消息,元数据等,所以消费进度不是连续的。
##说明 ### 说明
从3.2.0.0版本开始数据订阅支持vnode迁移和分裂。 从3.2.0.0版本开始数据订阅支持vnode迁移和分裂。
由于数据订阅依赖wal文件而在vnode迁移和分裂的过程中wal并不会同步过去所以迁移或分裂后之前没消费完的wal数据后消费不到。所以请保证迁移和分裂之前把数据全部消费完后再进行vnode迁移或分裂否则消费会丢失数据。 由于数据订阅依赖wal文件而在vnode迁移和分裂的过程中wal并不会同步过去所以迁移或分裂后之前没消费完的wal数据后消费不到。所以请保证迁移和分裂之前把数据全部消费完后再进行vnode迁移或分裂否则消费会丢失数据。
# 语法说明 ## 数据订阅语法说明
具体的语法参见 [数据订阅](../../taos-sql/tmq) 具体的语法参见 [数据订阅](../../taos-sql/tmq)
# 消费参数 ## 数据订阅相关参数
消费参数主要用于消费者创建时指定,基础配置项如下表所示: 消费参数主要用于消费者创建时指定,基础配置项如下表所示:
@ -86,9 +88,9 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句从3.2.0.0版本该参数废弃恒为true |默认关闭 | | `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句从3.2.0.0版本该参数废弃恒为true |默认关闭 |
| `enable.replay` | boolean | 是否开启数据回放功能 |默认关闭 | | `enable.replay` | boolean | 是否开启数据回放功能 |默认关闭 |
# 主要数据结构和 API 接口 ## 数据订阅主要 API 接口
不同语言下, TMQ 订阅相关的 API 及数据结构如下详细的接口说明可以参考连接器章节注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer 不同语言下, TMQ 订阅相关的 API 及数据结构如下(详细的接口说明可以参考连接器章节数据订阅部分注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C"> <TabItem value="c" label="C">
@ -310,8 +312,8 @@ void Close()
</TabItem> </TabItem>
</Tabs> </Tabs>
# 数据订阅示例 ## 数据订阅示例
## 写入数据 ### 写入数据
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如: 首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
@ -324,7 +326,7 @@ CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
``` ```
## 创建 topic ### 创建 topic
使用 SQL 创建一个 topic 使用 SQL 创建一个 topic
@ -332,7 +334,7 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
``` ```
## 创建消费者 *consumer* ### 创建消费者 consumer
对于不同编程语言,其设置方式如下: 对于不同编程语言,其设置方式如下:
@ -499,7 +501,7 @@ var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
上述配置中包括 consumer group ID如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group共享消费进度。 上述配置中包括 consumer group ID如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group共享消费进度。
## 订阅 *topics* ### 订阅 topics
一个 consumer 支持同时订阅多个 topic。 一个 consumer 支持同时订阅多个 topic。
@ -578,7 +580,7 @@ consumer.Subscribe(topics);
</Tabs> </Tabs>
## 消费 ### 消费
以下代码展示了不同语言下如何对 TMQ 消息进行消费。 以下代码展示了不同语言下如何对 TMQ 消息进行消费。
@ -714,7 +716,7 @@ while (true)
</Tabs> </Tabs>
## 结束消费 ### 结束消费
消费结束后,应当取消订阅。 消费结束后,应当取消订阅。
@ -795,7 +797,7 @@ consumer.Close();
</Tabs> </Tabs>
## 完整示例代码 ### 完整示例代码
以下是各语言的完整示例代码。 以下是各语言的完整示例代码。
@ -838,8 +840,8 @@ consumer.Close();
</Tabs> </Tabs>
#订阅高级功能 ## 数据订阅高级功能
##数据回放 ### 数据回放
- 订阅支持 replay 功能,按照数据写入的时间回放。 - 订阅支持 replay 功能,按照数据写入的时间回放。
比如,如下时间写入三条数据 比如,如下时间写入三条数据
```sql ```sql

View File

@ -6,3 +6,4 @@
``` ```
```java ```java
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}} {{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```

View File

@ -757,6 +757,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_COL_QUERY_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x266D) #define TSDB_CODE_PAR_COL_QUERY_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x266D)
#define TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE TAOS_DEF_ERROR_CODE(0, 0x266E) #define TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE TAOS_DEF_ERROR_CODE(0, 0x266E)
#define TSDB_CODE_PAR_ORDERBY_AMBIGUOUS TAOS_DEF_ERROR_CODE(0, 0x266F) #define TSDB_CODE_PAR_ORDERBY_AMBIGUOUS TAOS_DEF_ERROR_CODE(0, 0x266F)
#define TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT TAOS_DEF_ERROR_CODE(0, 0x2670)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner //planner

View File

@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 4; int32_t tsNumOfVnodeQueryThreads = 4;
float tsRatioOfVnodeStreamThreads = 0.5F; float tsRatioOfVnodeStreamThreads = 1.5F;
int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeFetchThreads = 4;
int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfVnodeRsmaThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 4; int32_t tsNumOfQnodeQueryThreads = 4;

View File

@ -1100,7 +1100,10 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->rawWritten) return 0; if (pAction->rawWritten) return 0;
if (topHalf) return TSDB_CODE_MND_TRANS_CTX_SWITCH; if (topHalf) {
terrno = TSDB_CODE_MND_TRANS_CTX_SWITCH;
return TSDB_CODE_MND_TRANS_CTX_SWITCH;
}
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw); int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw);
if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {

View File

@ -1672,11 +1672,11 @@ static int32_t dataTypeComp(const SDataType* l, const SDataType* r) {
static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
if (isMultiResFunc(pOp->pLeft)) { if (isMultiResFunc(pOp->pLeft)) {
generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName); generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT, ((SExprNode*)(pOp->pLeft))->userAlias);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
if (isMultiResFunc(pOp->pRight)) { if (isMultiResFunc(pOp->pRight)) {
generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT, ((SExprNode*)(pOp->pRight))->userAlias);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }

View File

@ -192,6 +192,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Out of memory"; return "Out of memory";
case TSDB_CODE_PAR_ORDERBY_AMBIGUOUS: case TSDB_CODE_PAR_ORDERBY_AMBIGUOUS:
return "ORDER BY \"%s\" is ambiguous"; return "ORDER BY \"%s\" is ambiguous";
case TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT:
return "Operator not supported multi result: %s";
default: default:
return "Unknown error"; return "Unknown error";
} }

View File

@ -381,7 +381,7 @@ TEST_F(ParserSelectTest, semanticCheck) {
// TSDB_CODE_PAR_WRONG_VALUE_TYPE // TSDB_CODE_PAR_WRONG_VALUE_TYPE
run("SELECT timestamp '2010a' FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE); run("SELECT timestamp '2010a' FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE);
run("SELECT LAST(*) + SUM(c1) FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE); run("SELECT LAST(*) + SUM(c1) FROM t1", TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT);
run("SELECT CEIL(LAST(ts, c1)) FROM t1", TSDB_CODE_FUNC_FUNTION_PARA_NUM); run("SELECT CEIL(LAST(ts, c1)) FROM t1", TSDB_CODE_FUNC_FUNTION_PARA_NUM);

View File

@ -494,6 +494,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pRange->range.maxVer = ver; pRange->range.maxVer = ver;
pRange->range.minVer = ver; pRange->range.minVer = ver;
} else { } else {
// the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
// is set at the mnode.
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
pChkInfo->checkpointVer = pRange->range.maxVer; pChkInfo->checkpointVer = pRange->range.maxVer;
pChkInfo->processedVer = pRange->range.maxVer; pChkInfo->processedVer = pRange->range.maxVer;
@ -502,6 +504,15 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pChkInfo->checkpointVer = pRange->range.minVer - 1; pChkInfo->checkpointVer = pRange->range.minVer - 1;
pChkInfo->processedVer = pRange->range.minVer - 1; pChkInfo->processedVer = pRange->range.minVer - 1;
pChkInfo->nextProcessVer = pRange->range.minVer; pChkInfo->nextProcessVer = pRange->range.minVer;
{ // for compatible purpose, remove it later
if (pRange->range.minVer == 0) {
pChkInfo->checkpointVer = 0;
pChkInfo->processedVer = 0;
pChkInfo->nextProcessVer = 1;
stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
}
}
} }
} }
} }

View File

@ -619,6 +619,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream quer
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_QUERY_MISMATCH, "Columns number mismatch with query result") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_QUERY_MISMATCH, "Columns number mismatch with query result")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is conflict with table") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is conflict with table")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT, "Operator not supported multi result")
//planner //planner
TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error") TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error")