Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0

This commit is contained in:
Hongze Cheng 2024-02-26 10:48:14 +08:00
commit 8a669d3adc
17 changed files with 125 additions and 47 deletions

View File

@ -15,11 +15,11 @@ import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx"; import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx"; import CDemo from "./_sub_c.mdx";
为了帮助应用实时获取写入 TDengine 的数据或者以事件到达顺序处理数据TDengine 提供了类似 kafka 的数据订阅功能。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。 为了帮助应用实时获取写入 TDengine 的数据或者以事件到达顺序处理数据TDengine 提供了类似 kafka 的数据订阅功能。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
# 介绍 ## 数据订阅介绍
## 主题
### 主题
与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../taos-sql/tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度。 与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../taos-sql/tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度。
如下图,每个 topic 涉及到的数据表可能分布在多个 vnode相当于 kafka 里的 partition 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)。 如下图,每个 topic 涉及到的数据表可能分布在多个 vnode相当于 kafka 里的 partition 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)。
@ -30,11 +30,12 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
对于 `SELECT` 语句形式的 topic在消费时TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。 对于 `SELECT` 语句形式的 topic在消费时TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
## 生产者 ### 生产者
写入 topic 相关联的数据表中数据的都是生产者,生产者实际生产的数据写入到了子表或普通表中,即表所在 vnode 的 WAL 里。 写入 topic 相关联的数据表中数据的都是生产者,生产者实际生产的数据写入到了子表或普通表中,即表所在 vnode 的 WAL 里。
## 消费者 ### 消费者
### 消费者组
#### 消费者组
消费者订阅 topic 后,可以消费 topic 里的所有数据(这些数据所在的表可能分布在多个 vnode 上,即 db 所在的所有 vnode。订阅 topic 时,需要指定一个消费者组 (consumer group),如果这个消费者组里只有一个消费者,那么这个消费者会顺序的消费这些 vnode 上的数据。 消费者订阅 topic 后,可以消费 topic 里的所有数据(这些数据所在的表可能分布在多个 vnode 上,即 db 所在的所有 vnode。订阅 topic 时,需要指定一个消费者组 (consumer group),如果这个消费者组里只有一个消费者,那么这个消费者会顺序的消费这些 vnode 上的数据。
为了提高消费速度,便于多线程、分布式地消费数据,可以在一个消费组里添加多个消费者,这些消费者将均分数据所在的 vnode 进行消费(比如数据分布在 4 个 vnode 上,有 2 个消费者的话,那么每个消费者消费 2 个 vnode有 3 个消费者的话2 个消费者各消费 1 个 vnode1 个消费者消费 2 个 vnode有 5 个消费者的话4 个各分配 1 个 vnode 消费,另外 1 个不消费),如下图: 为了提高消费速度,便于多线程、分布式地消费数据,可以在一个消费组里添加多个消费者,这些消费者将均分数据所在的 vnode 进行消费(比如数据分布在 4 个 vnode 上,有 2 个消费者的话,那么每个消费者消费 2 个 vnode有 3 个消费者的话2 个消费者各消费 1 个 vnode1 个消费者消费 2 个 vnode有 5 个消费者的话4 个各分配 1 个 vnode 消费,另外 1 个不消费),如下图:
@ -44,7 +45,8 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
在一个消费组里添加一个消费者后,在 Mnode 上通过 rebalance 的机制实现消费者的重新分配,该操作对用户是透明的。 在一个消费组里添加一个消费者后,在 Mnode 上通过 rebalance 的机制实现消费者的重新分配,该操作对用户是透明的。
一个消费者可以订阅多个 topic。TDengine 的数据订阅在宕机、重启等复杂环境下确保 at least once 消费。 一个消费者可以订阅多个 topic。TDengine 的数据订阅在宕机、重启等复杂环境下确保 at least once 消费。
### 消费进度
#### 消费进度
在 topic 的一个消费组的一个 vnode 上有消费进度。消费者消费的同时,可以提交消费进度,消费进度即 vnode 上 WAL 的版本号(对于 kafka 里的 offset消费进度可以手动提交也可以通过参数auto.commit.interval.ms设置为周期性自动提交。 在 topic 的一个消费组的一个 vnode 上有消费进度。消费者消费的同时,可以提交消费进度,消费进度即 vnode 上 WAL 的版本号(对于 kafka 里的 offset消费进度可以手动提交也可以通过参数auto.commit.interval.ms设置为周期性自动提交。
首次消费数据时通过订阅参数auto.offset.reset来确定消费位置为最新数据latest还是最旧数据earliest 首次消费数据时通过订阅参数auto.offset.reset来确定消费位置为最新数据latest还是最旧数据earliest
@ -59,16 +61,16 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
作为一个数据库产品, WAL 文件中存储的不全是数据,也包括其他写入消息,元数据等,所以消费进度不是连续的。 作为一个数据库产品, WAL 文件中存储的不全是数据,也包括其他写入消息,元数据等,所以消费进度不是连续的。
##说明 ### 说明
从3.2.0.0版本开始数据订阅支持vnode迁移和分裂。 从3.2.0.0版本开始数据订阅支持vnode迁移和分裂。
由于数据订阅依赖wal文件而在vnode迁移和分裂的过程中wal并不会同步过去所以迁移或分裂后之前没消费完的wal数据后消费不到。所以请保证迁移和分裂之前把数据全部消费完后再进行vnode迁移或分裂否则消费会丢失数据。 由于数据订阅依赖wal文件而在vnode迁移和分裂的过程中wal并不会同步过去所以迁移或分裂后之前没消费完的wal数据后消费不到。所以请保证迁移和分裂之前把数据全部消费完后再进行vnode迁移或分裂否则消费会丢失数据。
# 语法说明 ## 数据订阅语法说明
具体的语法参见 [数据订阅](../../taos-sql/tmq) 具体的语法参见 [数据订阅](../../taos-sql/tmq)
# 消费参数 ## 数据订阅相关参数
消费参数主要用于消费者创建时指定,基础配置项如下表所示: 消费参数主要用于消费者创建时指定,基础配置项如下表所示:
@ -86,9 +88,9 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句从3.2.0.0版本该参数废弃恒为true |默认关闭 | | `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句从3.2.0.0版本该参数废弃恒为true |默认关闭 |
| `enable.replay` | boolean | 是否开启数据回放功能 |默认关闭 | | `enable.replay` | boolean | 是否开启数据回放功能 |默认关闭 |
# 主要数据结构和 API 接口 ## 数据订阅主要 API 接口
不同语言下, TMQ 订阅相关的 API 及数据结构如下详细的接口说明可以参考连接器章节注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer 不同语言下, TMQ 订阅相关的 API 及数据结构如下(详细的接口说明可以参考连接器章节数据订阅部分注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C"> <TabItem value="c" label="C">
@ -310,8 +312,8 @@ void Close()
</TabItem> </TabItem>
</Tabs> </Tabs>
# 数据订阅示例 ## 数据订阅示例
## 写入数据 ### 写入数据
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如: 首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
@ -324,7 +326,7 @@ CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
``` ```
## 创建 topic ### 创建 topic
使用 SQL 创建一个 topic 使用 SQL 创建一个 topic
@ -332,7 +334,7 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
``` ```
## 创建消费者 *consumer* ### 创建消费者 consumer
对于不同编程语言,其设置方式如下: 对于不同编程语言,其设置方式如下:
@ -499,7 +501,7 @@ var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
上述配置中包括 consumer group ID如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group共享消费进度。 上述配置中包括 consumer group ID如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group共享消费进度。
## 订阅 *topics* ### 订阅 topics
一个 consumer 支持同时订阅多个 topic。 一个 consumer 支持同时订阅多个 topic。
@ -578,7 +580,7 @@ consumer.Subscribe(topics);
</Tabs> </Tabs>
## 消费 ### 消费
以下代码展示了不同语言下如何对 TMQ 消息进行消费。 以下代码展示了不同语言下如何对 TMQ 消息进行消费。
@ -714,7 +716,7 @@ while (true)
</Tabs> </Tabs>
## 结束消费 ### 结束消费
消费结束后,应当取消订阅。 消费结束后,应当取消订阅。
@ -795,7 +797,7 @@ consumer.Close();
</Tabs> </Tabs>
## 完整示例代码 ### 完整示例代码
以下是各语言的完整示例代码。 以下是各语言的完整示例代码。
@ -838,8 +840,8 @@ consumer.Close();
</Tabs> </Tabs>
#订阅高级功能 ## 数据订阅高级功能
##数据回放 ### 数据回放
- 订阅支持 replay 功能,按照数据写入的时间回放。 - 订阅支持 replay 功能,按照数据写入的时间回放。
比如,如下时间写入三条数据 比如,如下时间写入三条数据
```sql ```sql

View File

@ -6,3 +6,4 @@
``` ```
```java ```java
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}} {{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```

View File

@ -267,7 +267,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
bool alreadyAddGroupId(char* ctbName); bool alreadyAddGroupId(char* ctbName);
bool isAutoTableName(char* ctbName); bool isAutoTableName(char* ctbName);
void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId); void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);

View File

@ -757,6 +757,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_COL_QUERY_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x266D) #define TSDB_CODE_PAR_COL_QUERY_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x266D)
#define TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE TAOS_DEF_ERROR_CODE(0, 0x266E) #define TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE TAOS_DEF_ERROR_CODE(0, 0x266E)
#define TSDB_CODE_PAR_ORDERBY_AMBIGUOUS TAOS_DEF_ERROR_CODE(0, 0x266F) #define TSDB_CODE_PAR_ORDERBY_AMBIGUOUS TAOS_DEF_ERROR_CODE(0, 0x266F)
#define TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT TAOS_DEF_ERROR_CODE(0, 0x2670)
#define TSDB_CODE_PAR_TAG_IS_PRIMARY_KEY TAOS_DEF_ERROR_CODE(0, 0x2671) #define TSDB_CODE_PAR_TAG_IS_PRIMARY_KEY TAOS_DEF_ERROR_CODE(0, 0x2671)
#define TSDB_CODE_PAR_SECOND_COL_PK TAOS_DEF_ERROR_CODE(0, 0x2672) #define TSDB_CODE_PAR_SECOND_COL_PK TAOS_DEF_ERROR_CODE(0, 0x2672)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)

View File

@ -2119,28 +2119,28 @@ _end:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void buildCtbNameAddGruopId(char* ctbName, uint64_t groupId){ void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId){
char tmp[TSDB_TABLE_NAME_LEN] = {0}; char tmp[TSDB_TABLE_NAME_LEN] = {0};
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId); snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end
strcat(ctbName, tmp); strcat(ctbName, tmp);
} }
bool isAutoTableName(char* ctbName){ // auto stream subtable name starts with 't_', followed by the first segment of MD5 digest for group vals.
return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_'); // the total length is fixed to be 34 bytes.
} bool isAutoTableName(char* ctbName) { return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_'); }
bool alreadyAddGroupId(char* ctbName){ bool alreadyAddGroupId(char* ctbName) {
size_t len = strlen(ctbName); size_t len = strlen(ctbName);
size_t _location = len - 1; size_t _location = len - 1;
while(_location > 0){ while (_location > 0) {
if(ctbName[_location] < '0' || ctbName[_location] > '9'){ if (ctbName[_location] < '0' || ctbName[_location] > '9') {
break; break;
} }
_location--; _location--;
} }
return ctbName[_location] == '_' && len - 1 - _location > 15; //15 means the min length of groupid return ctbName[_location] == '_' && len - 1 - _location >= 15; // 15 means the min length of groupid
} }
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {

View File

@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 4; int32_t tsNumOfVnodeQueryThreads = 4;
float tsRatioOfVnodeStreamThreads = 0.5F; float tsRatioOfVnodeStreamThreads = 1.5F;
int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeFetchThreads = 4;
int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfVnodeRsmaThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 4; int32_t tsNumOfQnodeQueryThreads = 4;

View File

@ -1100,7 +1100,10 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->rawWritten) return 0; if (pAction->rawWritten) return 0;
if (topHalf) return TSDB_CODE_MND_TRANS_CTX_SWITCH; if (topHalf) {
terrno = TSDB_CODE_MND_TRANS_CTX_SWITCH;
return TSDB_CODE_MND_TRANS_CTX_SWITCH;
}
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw); int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw);
if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {

View File

@ -72,7 +72,7 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) { if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) {
buildCtbNameAddGruopId(name, groupId); buildCtbNameAddGroupId(name, groupId);
} }
} else if (stbFullName) { } else if (stbFullName) {
name = buildCtbNameByGroupId(stbFullName, groupId); name = buildCtbNameByGroupId(stbFullName, groupId);
@ -185,23 +185,26 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa
!alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) { !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) {
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
buildCtbNameAddGruopId(pCreateTableReq->name, gid); buildCtbNameAddGroupId(pCreateTableReq->name, gid);
// tqDebug("gen name from:%s", pDataBlock->info.parTbName);
} else { } else {
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
// tqDebug("copy name:%s", pDataBlock->info.parTbName);
} }
} else { } else {
pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid); pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid);
// tqDebug("gen name from stbFullName:%s gid:%"PRId64, stbFullName, gid);
} }
} }
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock,
SStreamTask* pTask, int64_t suid) { SStreamTask* pTask, int64_t suid) {
tqDebug("s-task:%s build create table msg", pTask->id.idStr);
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
SArray* tagArray = taosArrayInit(4, sizeof(STagVal)); SArray* tagArray = taosArrayInit(4, sizeof(STagVal));
;
tqDebug("s-task:%s build create %d table(s) msg", pTask->id.idStr, rows);
int32_t code = 0; int32_t code = 0;
SVCreateTbBatchReq reqs = {0}; SVCreateTbBatchReq reqs = {0};
@ -671,7 +674,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
} else { } else {
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 && if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 &&
!isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) { !isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) {
buildCtbNameAddGruopId(dstTableName, groupId); tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
buildCtbNameAddGroupId(dstTableName, groupId);
} }
} }

View File

@ -129,7 +129,7 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
return code; return code;
} }
static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock) { static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SExecTaskInfo* pTaskInfo, SFilterInfo* pFilterInfo, SSDataBlock* pBlock) {
SResultRow* pResultRow = NULL; SResultRow* pResultRow = NULL;
for (int32_t i = 0; i < taosArrayGetSize(pCountSup->pWinStates); i++) { for (int32_t i = 0; i < taosArrayGetSize(pCountSup->pWinStates); i++) {
SCountWindowResult* pBuff = setCountWindowOutputBuff(pExprSup, pCountSup, &pResultRow); SCountWindowResult* pBuff = setCountWindowOutputBuff(pExprSup, pCountSup, &pResultRow);
@ -143,6 +143,7 @@ static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, S
clearWinStateBuff(pBuff); clearWinStateBuff(pBuff);
clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs); clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
} }
doFilter(pBlock, pFilterInfo, NULL);
} }
static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) { static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
@ -177,7 +178,7 @@ static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
if (pInfo->groupId == 0) { if (pInfo->groupId == 0) {
pInfo->groupId = pBlock->info.id.groupId; pInfo->groupId = pBlock->info.id.groupId;
} else if (pInfo->groupId != pBlock->info.id.groupId) { } else if (pInfo->groupId != pBlock->info.id.groupId) {
buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pRes); buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pRes);
pInfo->groupId = pBlock->info.id.groupId; pInfo->groupId = pBlock->info.id.groupId;
} }
@ -187,7 +188,7 @@ static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
} }
} }
buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pRes); buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pRes);
return pRes->info.rows == 0 ? NULL : pRes; return pRes->info.rows == 0 ? NULL : pRes;
} }
@ -246,6 +247,11 @@ SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
pInfo->countSup.stateIndex = 0; pInfo->countSup.stateIndex = 0;
code = filterInitFromNode((SNode*)pCountWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
setOperatorInfo(pOperator, "CountWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, true, OP_NOT_OPENED, pInfo, setOperatorInfo(pOperator, "CountWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, true, OP_NOT_OPENED, pInfo,

View File

@ -287,6 +287,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
} }
} }
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
curWin.winInfo.pStatePos->beUpdated = true;
SSessionKey key = {0}; SSessionKey key = {0};
getSessionHashKey(&curWin.winInfo.sessionWin, &key); getSessionHashKey(&curWin.winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
@ -344,7 +345,7 @@ int32_t doStreamCountEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOper
size_t keyLen = 0; size_t keyLen = 0;
int32_t iter = 0; int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen); void* key = tSimpleHashGetKey(pIte, &keyLen);
tlen += encodeSSessionKey(buf, key); tlen += encodeSSessionKey(buf, key);
tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
} }

View File

@ -1672,11 +1672,11 @@ static int32_t dataTypeComp(const SDataType* l, const SDataType* r) {
static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
if (isMultiResFunc(pOp->pLeft)) { if (isMultiResFunc(pOp->pLeft)) {
generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName); generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT, ((SExprNode*)(pOp->pLeft))->userAlias);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
if (isMultiResFunc(pOp->pRight)) { if (isMultiResFunc(pOp->pRight)) {
generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT, ((SExprNode*)(pOp->pRight))->userAlias);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }

View File

@ -192,6 +192,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Out of memory"; return "Out of memory";
case TSDB_CODE_PAR_ORDERBY_AMBIGUOUS: case TSDB_CODE_PAR_ORDERBY_AMBIGUOUS:
return "ORDER BY \"%s\" is ambiguous"; return "ORDER BY \"%s\" is ambiguous";
case TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT:
return "Operator not supported multi result: %s";
case TSDB_CODE_PAR_TAG_IS_PRIMARY_KEY: case TSDB_CODE_PAR_TAG_IS_PRIMARY_KEY:
return "tag %s can not be primary key"; return "tag %s can not be primary key";
case TSDB_CODE_PAR_SECOND_COL_PK: case TSDB_CODE_PAR_SECOND_COL_PK:

View File

@ -381,7 +381,7 @@ TEST_F(ParserSelectTest, semanticCheck) {
// TSDB_CODE_PAR_WRONG_VALUE_TYPE // TSDB_CODE_PAR_WRONG_VALUE_TYPE
run("SELECT timestamp '2010a' FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE); run("SELECT timestamp '2010a' FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE);
run("SELECT LAST(*) + SUM(c1) FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE); run("SELECT LAST(*) + SUM(c1) FROM t1", TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT);
run("SELECT CEIL(LAST(ts, c1)) FROM t1", TSDB_CODE_FUNC_FUNTION_PARA_NUM); run("SELECT CEIL(LAST(ts, c1)) FROM t1", TSDB_CODE_FUNC_FUNTION_PARA_NUM);

View File

@ -575,7 +575,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
!isAutoTableName(pDataBlock->info.parTbName) && !isAutoTableName(pDataBlock->info.parTbName) &&
!alreadyAddGroupId(pDataBlock->info.parTbName) && !alreadyAddGroupId(pDataBlock->info.parTbName) &&
groupId != 0){ groupId != 0){
buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId); buildCtbNameAddGroupId(pDataBlock->info.parTbName, groupId);
} }
} else { } else {
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);

View File

@ -494,6 +494,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pRange->range.maxVer = ver; pRange->range.maxVer = ver;
pRange->range.minVer = ver; pRange->range.minVer = ver;
} else { } else {
// the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
// is set at the mnode.
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
pChkInfo->checkpointVer = pRange->range.maxVer; pChkInfo->checkpointVer = pRange->range.maxVer;
pChkInfo->processedVer = pRange->range.maxVer; pChkInfo->processedVer = pRange->range.maxVer;
@ -502,6 +504,15 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pChkInfo->checkpointVer = pRange->range.minVer - 1; pChkInfo->checkpointVer = pRange->range.minVer - 1;
pChkInfo->processedVer = pRange->range.minVer - 1; pChkInfo->processedVer = pRange->range.minVer - 1;
pChkInfo->nextProcessVer = pRange->range.minVer; pChkInfo->nextProcessVer = pRange->range.minVer;
{ // for compatible purpose, remove it later
if (pRange->range.minVer == 0) {
pChkInfo->checkpointVer = 0;
pChkInfo->processedVer = 0;
pChkInfo->nextProcessVer = 1;
stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
}
}
} }
} }
} }

View File

@ -618,6 +618,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream quer
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_QUERY_MISMATCH, "Columns number mismatch with query result") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_QUERY_MISMATCH, "Columns number mismatch with query result")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is conflict with table") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is conflict with table")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT, "Operator not supported multi result")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_IS_PRIMARY_KEY, "tag can not be primary key") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_IS_PRIMARY_KEY, "tag can not be primary key")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SECOND_COL_PK, "primary key must be second column") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SECOND_COL_PK, "primary key must be second column")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")

View File

@ -175,5 +175,51 @@ if $data33 != 3 then
goto loop3 goto loop3
endi endi
print step3
print =============== create database
sql create database test3 vgroups 1;
sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql insert into t1 values(1648791213000,0,1,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.1);
sql insert into t1 values(1648791223002,0,3,3,2.1);
sql insert into t1 values(1648791223003,1,4,3,3.1);
sql insert into t1 values(1648791223004,1,5,3,4.1);
sql insert into t1 values(1648791223005,2,6,3,4.1);
$loop_count = 0
loop4:
sleep 300
print 1 sql select _wstart, count(*),max(b) from t1 count_window(3) having max(b) > 3;
sql select _wstart, count(*),max(b) from t1 count_window(3) having max(b) > 3;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 1 then
print ======rows=$rows
goto loop4
endi
print 1 sql select _wstart, count(*),max(b) from t1 count_window(3) having max(b) > 6;
sql select _wstart, count(*),max(b) from t1 count_window(3) having max(b) > 6;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
# row 0
if $rows != 0 then
print ======rows=$rows
return -1
endi
print query_count0 end print query_count0 end
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT