diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index e4dc177388..881838f8f0 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -15,11 +15,11 @@ import Node from "./_sub_node.mdx"; import CSharp from "./_sub_cs.mdx"; import CDemo from "./_sub_c.mdx"; - 为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似 kafka 的数据订阅功能。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。 -# 介绍 -## 主题 +## 数据订阅介绍 + +### 主题 与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../taos-sql/tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。 如下图,每个 topic 涉及到的数据表可能分布在多个 vnode(相当于 kafka 里的 partition) 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中,WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)。 @@ -30,11 +30,12 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提 对于 `SELECT` 语句形式的 topic,在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。 -## 生产者 +### 生产者 写入 topic 相关联的数据表中数据的都是生产者,生产者实际生产的数据写入到了子表或普通表中,即表所在 vnode 的 WAL 里。 -## 消费者 -### 消费者组 +### 消费者 + +#### 消费者组 消费者订阅 topic 后,可以消费 topic 里的所有数据(这些数据所在的表可能分布在多个 vnode 上,即 db 所在的所有 vnode)。订阅 topic 时,需要指定一个消费者组 (consumer group),如果这个消费者组里只有一个消费者,那么这个消费者会顺序的消费这些 vnode 上的数据。 为了提高消费速度,便于多线程、分布式地消费数据,可以在一个消费组里添加多个消费者,这些消费者将均分数据所在的 vnode 进行消费(比如数据分布在 4 个 vnode 上,有 2 个消费者的话,那么每个消费者消费 2 个 vnode;有 3 个消费者的话,2 个消费者各消费 1 个 vnode,1 个消费者消费 2 个 vnode;有 5 个消费者的话,4 个各分配 1 个 vnode 消费,另外 1 个不消费),如下图: @@ -44,7 +45,8 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提 在一个消费组里添加一个消费者后,在 Mnode 上通过 rebalance 的机制实现消费者的重新分配,该操作对用户是透明的。 一个消费者可以订阅多个 topic。TDengine 的数据订阅在宕机、重启等复杂环境下确保 at least once 消费。 -### 消费进度 + +#### 消费进度 在 topic 的一个消费组的一个 vnode 上有消费进度。消费者消费的同时,可以提交消费进度,消费进度即 vnode 上 WAL 的版本号(对于 kafka 里的 offset),消费进度可以手动提交,也可以通过参数(auto.commit.interval.ms)设置为周期性自动提交。 首次消费数据时通过订阅参数(auto.offset.reset)来确定消费位置为最新数据(latest)还是最旧数据(earliest)。 @@ -59,16 +61,16 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提 作为一个数据库产品, WAL 文件中存储的不全是数据,也包括其他写入消息,元数据等,所以消费进度不是连续的。 -##说明 +### 说明 从3.2.0.0版本开始,数据订阅支持vnode迁移和分裂。 由于数据订阅依赖wal文件,而在vnode迁移和分裂的过程中,wal并不会同步过去,所以迁移或分裂后,之前没消费完的wal数据后消费不到。所以请保证迁移和分裂之前把数据全部消费完后,再进行vnode迁移或分裂,否则,消费会丢失数据。 -# 语法说明 +## 数据订阅语法说明 具体的语法参见 [数据订阅](../../taos-sql/tmq) -# 消费参数 +## 数据订阅相关参数 消费参数主要用于消费者创建时指定,基础配置项如下表所示: @@ -86,9 +88,9 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提 | `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句)(从3.2.0.0版本该参数废弃,恒为true) |默认关闭 | | `enable.replay` | boolean | 是否开启数据回放功能 |默认关闭 | -# 主要数据结构和 API 接口 +## 数据订阅主要 API 接口 -不同语言下, TMQ 订阅相关的 API 及数据结构如下(详细的接口说明可以参考连接器章节,注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer): +不同语言下, TMQ 订阅相关的 API 及数据结构如下(详细的接口说明可以参考连接器章节数据订阅部分,注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer): @@ -310,8 +312,8 @@ void Close() -# 数据订阅示例 -## 写入数据 +## 数据订阅示例 +### 写入数据 首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如: @@ -324,7 +326,7 @@ CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1"); INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); ``` -## 创建 topic +### 创建 topic 使用 SQL 创建一个 topic: @@ -332,7 +334,7 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; ``` -## 创建消费者 *consumer* +### 创建消费者 consumer 对于不同编程语言,其设置方式如下: @@ -499,7 +501,7 @@ var consumer = new ConsumerBuilder>(cfg).Build(); 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 -## 订阅 *topics* +### 订阅 topics 一个 consumer 支持同时订阅多个 topic。 @@ -578,7 +580,7 @@ consumer.Subscribe(topics); -## 消费 +### 消费 以下代码展示了不同语言下如何对 TMQ 消息进行消费。 @@ -714,7 +716,7 @@ while (true) -## 结束消费 +### 结束消费 消费结束后,应当取消订阅。 @@ -795,7 +797,7 @@ consumer.Close(); -## 完整示例代码 +### 完整示例代码 以下是各语言的完整示例代码。 @@ -838,8 +840,8 @@ consumer.Close(); -#订阅高级功能 -##数据回放 +## 数据订阅高级功能 +### 数据回放 - 订阅支持 replay 功能,按照数据写入的时间回放。 比如,如下时间写入三条数据 ```sql diff --git a/docs/zh/07-develop/_sub_java.mdx b/docs/zh/07-develop/_sub_java.mdx index 60810ec275..c0e9e6c937 100644 --- a/docs/zh/07-develop/_sub_java.mdx +++ b/docs/zh/07-develop/_sub_java.mdx @@ -6,3 +6,4 @@ ``` ```java {{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}} +``` diff --git a/include/util/taoserror.h b/include/util/taoserror.h index c0c20a0fde..556479f547 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -757,6 +757,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_COL_QUERY_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x266D) #define TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE TAOS_DEF_ERROR_CODE(0, 0x266E) #define TSDB_CODE_PAR_ORDERBY_AMBIGUOUS TAOS_DEF_ERROR_CODE(0, 0x266F) +#define TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT TAOS_DEF_ERROR_CODE(0, 0x2670) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ee85a909e7..7a5d554b97 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfVnodeQueryThreads = 4; -float tsRatioOfVnodeStreamThreads = 0.5F; +float tsRatioOfVnodeStreamThreads = 1.5F; int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfQnodeQueryThreads = 4; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 5c76baff08..998a9d6d1b 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1100,7 +1100,10 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { if (pAction->rawWritten) return 0; - if (topHalf) return TSDB_CODE_MND_TRANS_CTX_SWITCH; + if (topHalf) { + terrno = TSDB_CODE_MND_TRANS_CTX_SWITCH; + return TSDB_CODE_MND_TRANS_CTX_SWITCH; + } int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw); if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index abe50a27da..bb22e04cdf 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1672,11 +1672,11 @@ static int32_t dataTypeComp(const SDataType* l, const SDataType* r) { static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { if (isMultiResFunc(pOp->pLeft)) { - generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName); + generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT, ((SExprNode*)(pOp->pLeft))->userAlias); return DEAL_RES_ERROR; } if (isMultiResFunc(pOp->pRight)) { - generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); + generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT, ((SExprNode*)(pOp->pRight))->userAlias); return DEAL_RES_ERROR; } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index dfe33ce55e..76d8022578 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -192,6 +192,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Out of memory"; case TSDB_CODE_PAR_ORDERBY_AMBIGUOUS: return "ORDER BY \"%s\" is ambiguous"; + case TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT: + return "Operator not supported multi result: %s"; default: return "Unknown error"; } diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp index 390faab537..d7b31d2ac8 100644 --- a/source/libs/parser/test/parSelectTest.cpp +++ b/source/libs/parser/test/parSelectTest.cpp @@ -381,7 +381,7 @@ TEST_F(ParserSelectTest, semanticCheck) { // TSDB_CODE_PAR_WRONG_VALUE_TYPE run("SELECT timestamp '2010a' FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE); - run("SELECT LAST(*) + SUM(c1) FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE); + run("SELECT LAST(*) + SUM(c1) FROM t1", TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT); run("SELECT CEIL(LAST(ts, c1)) FROM t1", TSDB_CODE_FUNC_FUNTION_PARA_NUM); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f3494377d6..b63dc50836 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -494,6 +494,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pRange->range.maxVer = ver; pRange->range.minVer = ver; } else { + // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task + // is set at the mnode. if (pTask->info.fillHistory == 1) { pChkInfo->checkpointVer = pRange->range.maxVer; pChkInfo->processedVer = pRange->range.maxVer; @@ -502,6 +504,15 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pChkInfo->checkpointVer = pRange->range.minVer - 1; pChkInfo->processedVer = pRange->range.minVer - 1; pChkInfo->nextProcessVer = pRange->range.minVer; + + { // for compatible purpose, remove it later + if (pRange->range.minVer == 0) { + pChkInfo->checkpointVer = 0; + pChkInfo->processedVer = 0; + pChkInfo->nextProcessVer = 1; + stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr); + } + } } } } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8c73604727..836fd980d0 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -619,6 +619,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream quer TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_QUERY_MISMATCH, "Columns number mismatch with query result") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is conflict with table") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT, "Operator not supported multi result") //planner TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error")