From ac9582b24a6f01ac442162478b32d3f239ca962d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 19 Feb 2024 13:58:58 +0800 Subject: [PATCH] fix:[TD-28567] do not add md5 to the end of subtable if create stream by sma --- docs/zh/07-develop/07-tmq.mdx | 667 +++++++++++---------- include/common/tmsg.h | 28 +- include/libs/stream/tstream.h | 108 ++-- source/dnode/mnode/impl/inc/mndDef.h | 7 +- source/dnode/mnode/impl/inc/mndStream.h | 2 +- source/dnode/mnode/impl/src/mndDef.c | 5 + source/dnode/mnode/impl/src/mndScheduler.c | 124 ++-- source/dnode/mnode/impl/src/mndSma.c | 11 +- source/dnode/mnode/impl/src/mndStream.c | 34 +- source/dnode/vnode/src/tq/tqSink.c | 97 +-- source/libs/stream/src/streamDispatch.c | 1 + source/libs/stream/src/streamTask.c | 17 +- 12 files changed, 556 insertions(+), 545 deletions(-) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index df651eab96..a852e71ff4 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -91,6 +91,7 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提 不同语言下, TMQ 订阅相关的 API 及数据结构如下(详细的接口说明可以参考连接器章节,注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer): + ```c @@ -101,17 +102,17 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提 typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param)); typedef enum tmq_conf_res_t { - TMQ_CONF_UNKNOWN = -2, - TMQ_CONF_INVALID = -1, - TMQ_CONF_OK = 0, - } tmq_conf_res_t; + TMQ_CONF_UNKNOWN = -2, + TMQ_CONF_INVALID = -1, + TMQ_CONF_OK = 0, + } tmq_conf_res_t; typedef struct tmq_topic_assignment { - int32_t vgId; - int64_t currentOffset; - int64_t begin; - int64_t end; - } tmq_topic_assignment; + int32_t vgId; + int64_t currentOffset; + int64_t begin; + int64_t end; + } tmq_topic_assignment; DLL_EXPORT tmq_conf_t *tmq_conf_new(); DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); @@ -146,7 +147,6 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提 DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); DLL_EXPORT const char *tmq_err2str(int32_t code); ``` - @@ -250,281 +250,280 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提 + futures::Stream< Item = Result<(Self::Offset, MessageSet), Self::Error>, >, - >, - >; - async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>; + >, + >; + async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>; - async fn unsubscribe(self); - ``` + async fn unsubscribe(self); + ``` - 可在 上查看详细 API 说明。 + 可在 上查看详细 API 说明。 - + - + - ```js - function TMQConsumer(config) + ```js + function TMQConsumer(config) + + function subscribe(topic) + + function consume(timeout) + + function subscription() + + function unsubscribe() + + function commit(msg) + + function close() + ``` - function subscribe(topic) + - function consume(timeout) + - function subscription() + ```csharp + class ConsumerBuilder + + ConsumerBuilder(IEnumerable> config) + + public IConsumer Build() + + void Subscribe(IEnumerable topics) + + void Subscribe(string topic) + + ConsumeResult Consume(int millisecondsTimeout) + + List Subscription() + + void Unsubscribe() + + List Commit() + + void Close() + ``` + + - function unsubscribe() +# 数据订阅示例 +## 写入数据 - function commit(msg) +首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如: - function close() - ``` - - - - - - ```csharp - class ConsumerBuilder - - ConsumerBuilder(IEnumerable> config) - - public IConsumer Build() - - void Subscribe(IEnumerable topics) - - void Subscribe(string topic) - - ConsumeResult Consume(int millisecondsTimeout) - - List Subscription() - - void Unsubscribe() - - List Commit() - - void Close() - ``` - - - - - # 数据订阅示例 - ## 写入数据 - - 首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如: - - ```sql - DROP DATABASE IF EXISTS tmqdb; - CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600; - CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16)); - CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0"); - CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1"); - INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); +```sql +DROP DATABASE IF EXISTS tmqdb; +CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600; +CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16)); +CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0"); +CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1"); +INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); ``` ## 创建 topic - 使用 SQL 创建一个 topic: +使用 SQL 创建一个 topic: - ```sql - CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; - ``` +```sql +CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; +``` - ## 创建消费者 *consumer* +## 创建消费者 *consumer* - 对于不同编程语言,其设置方式如下: +对于不同编程语言,其设置方式如下: - - + - ```c - /* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 - 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */ - tmq_conf_t* conf = tmq_conf_new(); - tmq_conf_set(conf, "enable.auto.commit", "true"); - tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - tmq_conf_set(conf, "group.id", "cgrpName"); - tmq_conf_set(conf, "td.connect.user", "root"); - tmq_conf_set(conf, "td.connect.pass", "taosdata"); - tmq_conf_set(conf, "auto.offset.reset", "latest"); - tmq_conf_set(conf, "msg.with.table.name", "true"); - tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + + ```c + /* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 + 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */ + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); + tmq_conf_set(conf, "group.id", "cgrpName"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "latest"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); - tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); - tmq_conf_destroy(conf); - ``` - - - - - 对于 Java 程序,还可以使用如下配置项: - - | 参数名称 | 类型 | 参数说明 | - | ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- | - | `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" | - | `bootstrap.servers` | string | 连接地址,如 `localhost:6030` | - | `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 | - | `value.deserializer.encoding` | string | 指定字符串解析的字符集 | | - - 需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。 - - ```java - Properties properties = new Properties(); - properties.setProperty("enable.auto.commit", "true"); - properties.setProperty("auto.commit.interval.ms", "1000"); - properties.setProperty("group.id", "cgrpName"); - properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); - properties.setProperty("td.connect.user", "root"); - properties.setProperty("td.connect.pass", "taosdata"); - properties.setProperty("auto.offset.reset", "latest"); - properties.setProperty("msg.with.table.name", "true"); - properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer"); - - TaosConsumer consumer = new TaosConsumer<>(properties); - - /* value deserializer definition. */ - import com.taosdata.jdbc.tmq.ReferenceDeserializer; - - public class MetersDeserializer extends ReferenceDeserializer { - } - ``` - - - - - - ```go - conf := &tmq.ConfigMap{ - "group.id": "test", - "auto.offset.reset": "latest", - "td.connect.ip": "127.0.0.1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "td.connect.port": "6030", - "client.id": "test_tmq_c", - "enable.auto.commit": "false", - "msg.with.table.name": "true", - } - consumer, err := NewConsumer(conf) - ``` - - - - - - ```rust - let mut dsn: Dsn = "taos://".parse()?; - dsn.set("group.id", "group1"); - dsn.set("client.id", "test"); - dsn.set("auto.offset.reset", "latest"); - - let tmq = TmqBuilder::from_dsn(dsn)?; - - let mut consumer = tmq.build()?; - ``` - - - - - - Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例: - - ```python - from taos.tmq import Consumer - - # Syntax: `consumer = Consumer(configs)` - # - # Example: - consumer = Consumer( - { - "group.id": "local", - "client.id": "1", - "enable.auto.commit": "true", - "auto.commit.interval.ms": "1000", - "td.connect.ip": "127.0.0.1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.offset.reset": "latest", - "msg.with.table.name": "true", - } - ) - ``` - - - - - - ```js - // 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 - // 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 - - let consumer = taos.consumer({ -'enable.auto.commit': 'true', - 'auto.commit.interval.ms','1000', - 'group.id': 'tg2', - 'td.connect.user': 'root', - 'td.connect.pass': 'taosdata', - 'auto.offset.reset','latest', - 'msg.with.table.name': 'true', - 'td.connect.ip','127.0.0.1', - 'td.connect.port','6030' - }); + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); ``` + - + - - - ```csharp - var cfg = new Dictionary() - { - { "group.id", "group1" }, - { "auto.offset.reset", "latest" }, - { "td.connect.ip", "127.0.0.1" }, - { "td.connect.user", "root" }, - { "td.connect.pass", "taosdata" }, - { "td.connect.port", "6030" }, - { "client.id", "tmq_example" }, - { "enable.auto.commit", "true" }, - { "msg.with.table.name", "false" }, - }; - var consumer = new ConsumerBuilder>(cfg).Build(); + 对于 Java 程序,还可以使用如下配置项: + + | 参数名称 | 类型 | 参数说明 | + | ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- | + | `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" | + | `bootstrap.servers` | string | 连接地址,如 `localhost:6030` | + | `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 | + | `value.deserializer.encoding` | string | 指定字符串解析的字符集 | | + + 需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。 + + ```java + Properties properties = new Properties(); + properties.setProperty("enable.auto.commit", "true"); + properties.setProperty("auto.commit.interval.ms", "1000"); + properties.setProperty("group.id", "cgrpName"); + properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); + properties.setProperty("td.connect.user", "root"); + properties.setProperty("td.connect.pass", "taosdata"); + properties.setProperty("auto.offset.reset", "latest"); + properties.setProperty("msg.with.table.name", "true"); + properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer"); + + TaosConsumer consumer = new TaosConsumer<>(properties); + + /* value deserializer definition. */ + import com.taosdata.jdbc.tmq.ReferenceDeserializer; + + public class MetersDeserializer extends ReferenceDeserializer { + } ``` + - + + + ```go + conf := &tmq.ConfigMap{ + "group.id": "test", + "auto.offset.reset": "latest", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "client.id": "test_tmq_c", + "enable.auto.commit": "false", + "msg.with.table.name": "true", + } + consumer, err := NewConsumer(conf) + ``` + + - + + + ```rust + let mut dsn: Dsn = "taos://".parse()?; + dsn.set("group.id", "group1"); + dsn.set("client.id", "test"); + dsn.set("auto.offset.reset", "latest"); + + let tmq = TmqBuilder::from_dsn(dsn)?; + + let mut consumer = tmq.build()?; + ``` + + - 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 + + + Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例: + + ```python + from taos.tmq import Consumer + + # Syntax: `consumer = Consumer(configs)` + # + # Example: + consumer = Consumer( + { + "group.id": "local", + "client.id": "1", + "enable.auto.commit": "true", + "auto.commit.interval.ms": "1000", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "latest", + "msg.with.table.name": "true", + } + ) + ``` + + - ## 订阅 *topics* + + + ```js + // 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 + // 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 + + let consumer = taos.consumer({ + 'enable.auto.commit': 'true', + 'auto.commit.interval.ms','1000', + 'group.id': 'tg2', + 'td.connect.user': 'root', + 'td.connect.pass': 'taosdata', + 'auto.offset.reset','latest', + 'msg.with.table.name': 'true', + 'td.connect.ip','127.0.0.1', + 'td.connect.port','6030' + }); + ``` + + - 一个 consumer 支持同时订阅多个 topic。 + + + ```csharp + var cfg = new Dictionary() + { + { "group.id", "group1" }, + { "auto.offset.reset", "latest" }, + { "td.connect.ip", "127.0.0.1" }, + { "td.connect.user", "root" }, + { "td.connect.pass", "taosdata" }, + { "td.connect.port", "6030" }, + { "client.id", "tmq_example" }, + { "enable.auto.commit", "true" }, + { "msg.with.table.name", "false" }, + }; + var consumer = new ConsumerBuilder>(cfg).Build(); + ``` + + + + + +上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 + +## 订阅 *topics* - - +一个 consumer 支持同时订阅多个 topic。 + + -```c + + + ```c // 创建订阅 topics 列表 tmq_list_t* topicList = tmq_list_new(); tmq_list_append(topicList, "topicName"); // 启动订阅 tmq_subscribe(tmq, topicList); tmq_list_destroy(topicList); - + ``` - - - - + + + + ```java List topics = new ArrayList<>(); topics.add("tmq_topic"); consumer.subscribe(topics); ``` - - - + + + ```go err = consumer.Subscribe("example_tmq_topic", nil) @@ -533,26 +532,26 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); } ``` - - + + -```rust + ```rust consumer.subscribe(["tmq_meters"]).await?; ``` - + - + -```python + ```python consumer.subscribe(['topic1', 'topic2']) ``` - + - + -```js + ```js // 创建订阅 topics 列表 let topics = ['topic_test'] @@ -560,11 +559,11 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); consumer.subscribe(topics); ``` - + - + -```csharp + ```csharp // 创建订阅 topics 列表 List topics = new List(); topics.add("tmq_topic"); @@ -572,18 +571,19 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); consumer.Subscribe(topics); ``` - + - + - ## 消费 +## 消费 - 以下代码展示了不同语言下如何对 TMQ 消息进行消费。 +以下代码展示了不同语言下如何对 TMQ 消息进行消费。 - - + -```c + + + ```c // 消费数据 while (running) { TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut); @@ -591,11 +591,11 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); } ``` -这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。 - - - + 这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。 + + + ```java while(running){ ConsumerRecords meters = consumer.poll(Duration.ofMillis(100)); @@ -604,10 +604,10 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); } } ``` + + - - - + ```go for { @@ -625,9 +625,9 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); } ``` - + - + ```rust { @@ -660,8 +660,8 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); } ``` - - + + ```python while True: @@ -677,9 +677,9 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); print(block.fetchall()) ``` - + - + ```js while(true){ @@ -691,11 +691,11 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); } ``` - + - + -```csharp + ```csharp // 消费数据 while (true) { @@ -708,18 +708,19 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); } ``` - + - + - ## 结束消费 +## 结束消费 - 消费结束后,应当取消订阅。 +消费结束后,应当取消订阅。 - - + + + -```c + ```c /* 取消订阅 */ tmq_unsubscribe(tmq); @@ -727,10 +728,10 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); tmq_consumer_close(tmq); ``` - - + + -```java + ```java /* 取消订阅 */ consumer.unsubscribe(); @@ -738,11 +739,11 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); consumer.close(); ``` - + + + - - -```go + ```go /* Unsubscribe */ _ = consumer.Unsubscribe() @@ -750,38 +751,38 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); _ = consumer.Close() ``` - + + + - - -```rust + ```rust consumer.unsubscribe().await; ``` - + - + -```py + ```py # 取消订阅 consumer.unsubscribe() # 关闭消费 consumer.close() ``` - - + + -```js + ```js consumer.unsubscribe(); consumer.close(); ``` - + - + -```csharp + ```csharp // 取消订阅 consumer.Unsubscribe(); @@ -789,7 +790,7 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); consumer.Close(); ``` - + @@ -799,41 +800,41 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); - - - + + + - - - - - - - - - - + + + + + + + + + + - - - + + + - - - + + + - - - + + + - - - - - - - + + + + + + + #订阅高级功能 diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e24d4a71b0..88205581fe 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1400,7 +1400,7 @@ void tFreeSCompactDbReq(SCompactDbReq* pReq); typedef struct { int32_t compactId; - int8_t bAccepted; + int8_t bAccepted; } SCompactDbRsp; int32_t tSerializeSCompactDbRsp(void* buf, int32_t bufLen, SCompactDbRsp* pRsp); @@ -1414,7 +1414,7 @@ typedef struct { int32_t tSerializeSKillCompactReq(void* buf, int32_t bufLen, SKillCompactReq* pReq); int32_t tDeserializeSKillCompactReq(void* buf, int32_t bufLen, SKillCompactReq* pReq); -void tFreeSKillCompactReq(SKillCompactReq *pReq); +void tFreeSKillCompactReq(SKillCompactReq* pReq); typedef struct { char name[TSDB_FUNC_NAME_LEN]; @@ -1751,9 +1751,9 @@ int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* int32_t tDeserializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq); typedef struct { - int32_t compactId; - int32_t vgId; - int32_t dnodeId; + int32_t compactId; + int32_t vgId; + int32_t dnodeId; } SVKillCompactReq; int32_t tSerializeSVKillCompactReq(void* buf, int32_t bufLen, SVKillCompactReq* pReq); @@ -1954,9 +1954,9 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; char tb[TSDB_TABLE_NAME_LEN]; char user[TSDB_USER_LEN]; - char filterTb[TSDB_TABLE_NAME_LEN]; // for ins_columns + char filterTb[TSDB_TABLE_NAME_LEN]; // for ins_columns int64_t showId; - int64_t compactId; // for compact + int64_t compactId; // for compact } SRetrieveTableReq; typedef struct SSysTableSchema { @@ -3784,12 +3784,12 @@ typedef struct { } SMqHbReq; typedef struct { - char topic[TSDB_TOPIC_FNAME_LEN]; - int8_t noPrivilege; + char topic[TSDB_TOPIC_FNAME_LEN]; + int8_t noPrivilege; } STopicPrivilege; typedef struct { - SArray* topicPrivileges; // SArray + SArray* topicPrivileges; // SArray } SMqHbRsp; typedef struct { @@ -3927,8 +3927,8 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); #define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 #define SUBMIT_REQ_FROM_FILE 0x4 -#define SOURCE_NULL 0 -#define SOURCE_TAOSX 1 +#define SOURCE_NULL 0 +#define SOURCE_TAOSX 1 typedef struct { int32_t flags; @@ -3940,8 +3940,8 @@ typedef struct { SArray* aRowP; SArray* aCol; }; - int64_t ctimeMs; - int8_t source; + int64_t ctimeMs; + int8_t source; } SSubmitTbData; typedef struct { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d6d89d1d30..c9b48c9979 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -50,21 +50,21 @@ extern "C" { (_t)->hTaskInfo.id.streamId = 0; \ } while (0) -#define STREAM_EXEC_T_EXTRACT_WAL_DATA (-1) -#define STREAM_EXEC_T_START_ALL_TASKS (-2) -#define STREAM_EXEC_T_START_ONE_TASK (-3) -#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4) -#define STREAM_EXEC_T_STOP_ALL_TASKS (-5) -#define STREAM_EXEC_T_RESUME_TASK (-6) -#define STREAM_EXEC_T_UPDATE_TASK_EPSET (-7) +#define STREAM_EXEC_T_EXTRACT_WAL_DATA (-1) +#define STREAM_EXEC_T_START_ALL_TASKS (-2) +#define STREAM_EXEC_T_START_ONE_TASK (-3) +#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4) +#define STREAM_EXEC_T_STOP_ALL_TASKS (-5) +#define STREAM_EXEC_T_RESUME_TASK (-6) +#define STREAM_EXEC_T_UPDATE_TASK_EPSET (-7) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; typedef struct SStreamTaskSM SStreamTaskSM; -#define SSTREAM_TASK_VER 3 -#define SSTREAM_TASK_INCOMPATIBLE_VER 1 -#define SSTREAM_TASK_NEED_CONVERT_VER 2 +#define SSTREAM_TASK_VER 3 +#define SSTREAM_TASK_INCOMPATIBLE_VER 1 +#define SSTREAM_TASK_NEED_CONVERT_VER 2 #define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 enum { @@ -405,8 +405,8 @@ typedef struct SHistoryTaskInfo { int32_t tickCount; int32_t retryTimes; int32_t waitInterval; - int64_t haltVer; // offset in wal when halt the stream task - bool operatorOpen; // false by default + int64_t haltVer; // offset in wal when halt the stream task + bool operatorOpen; // false by default } SHistoryTaskInfo; typedef struct STaskOutputInfo { @@ -463,21 +463,22 @@ struct SStreamTask { struct SStreamMeta* pMeta; SSHashObj* pNameMap; void* pBackend; - char reserve[256]; + int8_t subtableWithoutMd5; + char reserve[255]; }; typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*); typedef struct STaskStartInfo { - int64_t startTs; - int64_t readyTs; - int32_t tasksWillRestart; - int32_t taskStarting; // restart flag, sentinel to guard the restart procedure. - SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing - SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed - int64_t elapsedTime; - int32_t restartCount; // restart task counter - startComplete_fn_t completeFn; // complete callback function + int64_t startTs; + int64_t readyTs; + int32_t tasksWillRestart; + int32_t taskStarting; // restart flag, sentinel to guard the restart procedure. + SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing + SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed + int64_t elapsedTime; + int32_t restartCount; // restart task counter + startComplete_fn_t completeFn; // complete callback function } STaskStartInfo; typedef struct STaskUpdateInfo { @@ -504,7 +505,7 @@ typedef struct SStreamMeta { int32_t vgId; int64_t stage; int32_t role; - bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower. + bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower. STaskStartInfo startInfo; TdThreadRwlock lock; SScanWalInfo scanInfo; @@ -532,7 +533,7 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, - SArray* pTaskList, bool hasFillhistory); + SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask); @@ -678,18 +679,18 @@ typedef struct STaskStatusEntry { int32_t statusLastDuration; // to record the last duration of current status int64_t stage; int32_t nodeId; - int64_t verStart; // start version in WAL, only valid for source task - int64_t verEnd; // end version in WAL, only valid for source task - int64_t processedVer; // only valid for source task - int64_t checkpointId; // current active checkpoint id - int32_t chkpointTransId; // checkpoint trans id - int8_t checkpointFailed; // denote if the checkpoint is failed or not - bool inputQChanging; // inputQ is changing or not + int64_t verStart; // start version in WAL, only valid for source task + int64_t verEnd; // end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task + int64_t checkpointId; // current active checkpoint id + int32_t chkpointTransId; // checkpoint trans id + int8_t checkpointFailed; // denote if the checkpoint is failed or not + bool inputQChanging; // inputQ is changing or not int64_t inputQUnchangeCounter; - double inputQUsed; // in MiB + double inputQUsed; // in MiB double inputRate; - double sinkQuota; // existed quota size for sink task - double sinkDataSize; // sink to dst data size + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dst data size } STaskStatusEntry; typedef struct SStreamHbMsg { @@ -720,8 +721,8 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg); typedef struct SStreamTaskState { - ETaskStatus state; - char* name; + ETaskStatus state; + char* name; } SStreamTaskState; typedef struct { @@ -839,7 +840,8 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st); // stream task meta void streamMetaInit(); void streamMetaCleanup(); -SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, startComplete_fn_t fn); +SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, + startComplete_fn_t fn); void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); @@ -858,22 +860,22 @@ void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, - int64_t endTs, bool ready); + int64_t endTs, bool ready); int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta); -void streamMetaRLock(SStreamMeta* pMeta); -void streamMetaRUnLock(SStreamMeta* pMeta); -void streamMetaWLock(SStreamMeta* pMeta); -void streamMetaWUnLock(SStreamMeta* pMeta); -void streamMetaResetStartInfo(STaskStartInfo* pMeta); -SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta); -void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader); -int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); -int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); -int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); -int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); -bool streamMetaAllTasksReady(const SStreamMeta* pMeta); -tmr_h streamTimerGetInstance(); +void streamMetaRLock(SStreamMeta* pMeta); +void streamMetaRUnLock(SStreamMeta* pMeta); +void streamMetaWLock(SStreamMeta* pMeta); +void streamMetaWUnLock(SStreamMeta* pMeta); +void streamMetaResetStartInfo(STaskStartInfo* pMeta); +SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta); +void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader); +int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); +int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); +int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); +int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); +bool streamMetaAllTasksReady(const SStreamMeta* pMeta); +tmr_h streamTimerGetInstance(); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); @@ -890,8 +892,8 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); void* streamDestroyStateMachine(SStreamTaskSM* pSM); -int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req); -void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp); +int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq* req); +void sendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp); #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 19de4561d6..fc58fabad5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -699,6 +699,7 @@ typedef struct { int64_t checkpointId; int32_t indexForMultiAggBalance; + int8_t subTableWithoutMd5; char reserve[256]; } SStreamObj; @@ -776,8 +777,8 @@ typedef enum { GRANT_STATE_REASON_MAX, } EGrantStateReason; -#define GRANT_STATE_NUM 30 -#define GRANT_ACTIVE_NUM 10 +#define GRANT_STATE_NUM 30 +#define GRANT_ACTIVE_NUM 10 #define GRANT_ACTIVE_HEAD_LEN 30 typedef struct { @@ -812,7 +813,7 @@ typedef struct { int64_t id : 24; }; }; - char machine[TSDB_MACHINE_ID_LEN + 1]; + char machine[TSDB_MACHINE_ID_LEN + 1]; } SGrantMachine; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 4d1125a340..1084340dc2 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -24,7 +24,7 @@ extern "C" { #endif #define MND_STREAM_RESERVE_SIZE 64 -#define MND_STREAM_VER_NUMBER 4 +#define MND_STREAM_VER_NUMBER 5 #define MND_STREAM_CREATE_NAME "stream-create" #define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index d59354286d..5be641d1c2 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -85,6 +85,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { // 3.0.50 ver = 3 if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->subTableWithoutMd5) < 0) return -1; if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1) < 0) return -1; @@ -168,6 +169,10 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) { if (sver >= 3) { if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1; } + + if (sver >= 5) { + if (tDecodeI8(pDecoder, &pObj->subTableWithoutMd5) < 0) return -1; + } if (tDecodeCStrTo(pDecoder, pObj->reserve) < 0) return -1; tEndDecode(pDecoder); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 88d326a5c4..a8eaf7c711 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -14,13 +14,13 @@ */ #include "mndScheduler.h" -#include "tmisce.h" -#include "mndMnode.h" #include "mndDb.h" +#include "mndMnode.h" #include "mndSnode.h" #include "mndVgroup.h" #include "parser.h" #include "tcompare.h" +#include "tmisce.h" #include "tname.h" #include "tuuid.h" @@ -189,7 +189,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) { return NULL; } - if(pStream->indexForMultiAggBalance == -1){ + if (pStream->indexForMultiAggBalance == -1) { taosSeedRand(taosSafeRand()); pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups; } @@ -204,7 +204,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) { sdbRelease(pMnode->pSdb, pVgroup); continue; } - if (index++ == pStream->indexForMultiAggBalance){ + if (index++ == pStream->indexForMultiAggBalance) { pStream->indexForMultiAggBalance++; pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups; sdbCancelFetch(pMnode->pSdb, pIter); @@ -217,12 +217,12 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) { return pVgroup; } -static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, - SEpSet* pEpset, bool isFillhistory) { - int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; +static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory) { + int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); - SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory); + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, + pStream->conf.fillHistory, pStream->subTableWithoutMd5); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; @@ -235,12 +235,12 @@ static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgrou return mndSetSinkTaskInfo(pStream, pTask); } -static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj){ +static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj) { int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, false); if (code != 0) { return code; } - if(pStream->conf.fillHistory){ + if (pStream->conf.fillHistory) { code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, true); if (code != 0) { return code; @@ -267,7 +267,7 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* } int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup); - if(code != 0){ + if (code != 0) { sdbRelease(pSdb, pVgroup); return code; } @@ -279,7 +279,7 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* } static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) { - for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { SVgroupVer* pVer = taosArrayGet(pList, i); if (pVer->vgId == vgId) { return pVer->ver; @@ -315,19 +315,18 @@ static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVe pRange->range.minVer = latestVer + 1; pRange->range.maxVer = INT64_MAX; - mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, - pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer); + mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, pTask->id.taskId, + pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer); } } -static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, - bool isFillhistory, bool useTriggerParam) { +static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam) { uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); - SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, - isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0, - *pTaskList, pStream->conf.fillHistory); + SStreamTask* pTask = + tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0, + *pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5); if (pTask == NULL) { return NULL; } @@ -335,7 +334,7 @@ static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, return pTask; } -static void addNewTaskList(SStreamObj* pStream){ +static void addNewTaskList(SStreamObj* pStream) { SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); taosArrayPush(pStream->tasks, &pTaskList); if (pStream->conf.fillHistory) { @@ -364,11 +363,11 @@ static void setHTasksId(SStreamObj* pStream) { } } -static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, - int64_t skey, SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam ){ +static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey, + SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) { // new stream task SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam); - if(pTask == NULL){ + if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } @@ -377,15 +376,15 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId); int32_t code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); - if(code != 0){ + if (code != 0) { terrno = code; return terrno; } return TDB_CODE_SUCCESS; } -static SSubplan* getScanSubPlan(const SQueryPlan* pPlan){ - int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); +static SSubplan* getScanSubPlan(const SQueryPlan* pPlan) { + int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1); if (LIST_LENGTH(inner->pNodeList) != 1) { terrno = TSDB_CODE_QRY_INVALID_INPUT; @@ -400,7 +399,7 @@ static SSubplan* getScanSubPlan(const SQueryPlan* pPlan){ return plan; } -static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){ +static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) { SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index); if (LIST_LENGTH(inner->pNodeList) != 1) { terrno = TSDB_CODE_QRY_INVALID_INPUT; @@ -415,8 +414,8 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){ return plan; } -static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, - SEpSet* pEpset, int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) { +static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, + int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) { addNewTaskList(pStream); void* pIter = NULL; @@ -433,15 +432,16 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream continue; } - int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam); - if(code != 0){ + int code = + doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam); + if (code != 0) { sdbRelease(pSdb, pVgroup); return code; } if (pStream->conf.fillHistory) { code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, true, useTriggerParam); - if(code != 0){ + if (code != 0) { sdbRelease(pSdb, pVgroup); return code; } @@ -461,9 +461,9 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); - SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, - useTriggerParam ? pStream->conf.triggerParam : 0, - *pTaskList, pStream->conf.fillHistory); + SStreamTask* pAggTask = + tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0, + *pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5); if (pAggTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -472,8 +472,8 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil return pAggTask; } -static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, - SVgObj* pVgroup, SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam){ +static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup, + SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam) { int32_t code = 0; SStreamTask* pTask = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam); if (pTask == NULL) { @@ -490,7 +490,7 @@ static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, return code; } -static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam){ +static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam) { SVgObj* pVgroup = NULL; SSnodeObj* pSnode = NULL; int32_t code = 0; @@ -504,20 +504,20 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S } code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false, useTriggerParam); - if(code != 0){ + if (code != 0) { goto END; } if (pStream->conf.fillHistory) { code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true, useTriggerParam); - if(code != 0){ + if (code != 0) { goto END; } setHTasksId(pStream); } - END: +END: if (pSnode != NULL) { sdbRelease(pMnode->pSdb, pSnode); } else { @@ -526,7 +526,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S return code; } -static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){ +static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) { int32_t code = 0; addNewTaskList(pStream); @@ -548,9 +548,9 @@ static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){ return TDB_CODE_SUCCESS; } -static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task){ +static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) { mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task); - for(int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) { + for (int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) { SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k); streamTaskSetUpstreamInfo(pSinkTask, task); } @@ -558,10 +558,10 @@ static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSin } static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) { - SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL); + SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL); SArray** pAggTaskList = taosArrayGetLast(tasks); - for(int i = 0; i < taosArrayGetSize(*pAggTaskList); i++){ + for (int i = 0; i < taosArrayGetSize(*pAggTaskList); i++) { SStreamTask* pAggTask = taosArrayGetP(*pAggTaskList, i); bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask); mDebug("bindAggSink taskId:%s to sink task list", pAggTask->id.idStr); @@ -572,7 +572,7 @@ static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, b SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL); SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL); - for(int i = 0; i < taosArrayGetSize(pSourceTaskList); i++){ + for (int i = 0; i < taosArrayGetSize(pSourceTaskList); i++) { SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i); mDebug("bindSourceSink taskId:%s to sink task list", pSourceTask->id.idStr); @@ -591,8 +591,8 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { SArray* pUpTaskList = taosArrayGetP(tasks, size - 2); SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList); - end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList): end; - for(int i = begin; i < end; i++){ + end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList) : end; + for (int i = begin; i < end; i++) { SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i); pUpTask->info.selfChildId = i - begin; streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask); @@ -616,8 +616,8 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* bool multiTarget = (pDbObj->cfg.numOfVgroups > 1); sdbRelease(pSdb, pDbObj); - mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", - numOfPlanLevel, externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan); + mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel, + externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan); pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES); pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES); @@ -632,7 +632,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pStream->totalLevel = numOfPlanLevel + hasExtraSink; - SSubplan* plan = getScanSubPlan(pPlan); // source plan + SSubplan* plan = getScanSubPlan(pPlan); // source plan if (plan == NULL) { return terrno; } @@ -649,32 +649,32 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* return TDB_CODE_SUCCESS; } - if(numOfPlanLevel == 3){ + if (numOfPlanLevel == 3) { plan = getAggSubPlan(pPlan, 1); // middle agg plan if (plan == NULL) { return terrno; } - do{ + do { SArray** list = taosArrayGetLast(pStream->tasks); - float size = (float)taosArrayGetSize(*list); - size_t cnt = (size_t)ceil(size/tsStreamAggCnt); - if(cnt <= 1) break; + float size = (float)taosArrayGetSize(*list); + size_t cnt = (size_t)ceil(size / tsStreamAggCnt); + if (cnt <= 1) break; mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt); addNewTaskList(pStream); - for(int j = 0; j < cnt; j++){ + for (int j = 0; j < cnt; j++) { code = addAggTask(pStream, pMnode, plan, pEpset, false); if (code != TSDB_CODE_SUCCESS) { return code; } - bindTwoLevel(pStream->tasks, j*tsStreamAggCnt, (j+1)*tsStreamAggCnt); + bindTwoLevel(pStream->tasks, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt); if (pStream->conf.fillHistory) { - bindTwoLevel(pStream->pHTasksList, j*tsStreamAggCnt, (j+1)*tsStreamAggCnt); + bindTwoLevel(pStream->pHTasksList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt); } } - }while(1); + } while (1); } plan = getAggSubPlan(pPlan, 0); @@ -684,7 +684,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* mDebug("doScheduleStream add final agg"); SArray** list = taosArrayGetLast(pStream->tasks); - size_t size = taosArrayGetSize(*list); + size_t size = taosArrayGetSize(*list); addNewTaskList(pStream); code = addAggTask(pStream, pMnode, plan, pEpset, true); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 1e92b1a181..05189d5a53 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -567,6 +567,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.conf.triggerParam = pCreate->maxDelay; streamObj.ast = taosStrdup(smaObj.ast); streamObj.indexForMultiAggBalance = -1; + streamObj.subTableWithoutMd5 = 1; // check the maxDelay if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) { @@ -898,11 +899,11 @@ _OVER: } int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { - SSdb *pSdb = pMnode->pSdb; - SSmaObj *pSma = NULL; - void *pIter = NULL; - SVgObj *pVgroup = NULL; - int32_t code = -1; + SSdb *pSdb = pMnode->pSdb; + SSmaObj *pSma = NULL; + void *pIter = NULL; + SVgObj *pVgroup = NULL; + int32_t code = -1; while (1) { pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 190b4f28ce..e4767d72d7 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -27,7 +27,7 @@ #include "tmisce.h" #include "tname.h" -#define MND_STREAM_MAX_NUM 60 +#define MND_STREAM_MAX_NUM 60 typedef struct SMStreamNodeCheckMsg { int8_t placeHolder; // // to fix windows compile error, define place holder @@ -192,7 +192,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { STREAM_DECODE_OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { - char* p = (pStream == NULL) ? "null" : pStream->name; + char *p = (pStream == NULL) ? "null" : pStream->name; mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; @@ -802,8 +802,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { } static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, - int64_t streamId, int32_t taskId, int32_t transId, - int8_t mndTrigger) { + int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) { SStreamCheckpointSourceReq req = {0}; req.checkpointId = checkpointId; req.nodeId = nodeId; @@ -878,11 +877,10 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre int32_t code = -1; int64_t ts = taosGetTimestampMs(); if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { -// mWarn("checkpoint interval less than the threshold, ignore it"); + // mWarn("checkpoint interval less than the threshold, ignore it"); return -1; } - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); if (conflict) { mndAddtoCheckpointWaitingList(pStream, checkpointId); @@ -1144,7 +1142,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_DROP_NAME, "drop stream"); + STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_DROP_NAME, "drop stream"); if (pTrans == NULL) { mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1570,7 +1568,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } - STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_PAUSE_NAME, "pause the stream"); + STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_PAUSE_NAME, "pause the stream"); if (pTrans == NULL) { mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1590,7 +1588,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { // pause stream taosWLockLatch(&pStream->lock); pStream->status = STREAM_STATUS__PAUSE; - if (mndPersistTransLog(pStream, pTrans,SDB_STATUS_READY) < 0) { + if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY) < 0) { taosWUnLockLatch(&pStream->lock); sdbRelease(pMnode->pSdb, pStream); @@ -1617,7 +1615,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; - if(grantCheckExpire(TSDB_GRANT_STREAMS) < 0){ + if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) { terrno = TSDB_CODE_GRANT_EXPIRED; return -1; } @@ -1659,7 +1657,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return -1; } - STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream"); + STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream"); if (pTrans == NULL) { mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -2106,10 +2104,10 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } -static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) { +static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) { int32_t num = taosArrayGetSize(pList); - for(int32_t i = 0; i < num; ++i) { - int32_t* pId = taosArrayGet(pList, i); + for (int32_t i = 0; i < num; ++i) { + int32_t *pId = taosArrayGet(pList, i); if (taskId == *pId) { return; } @@ -2122,7 +2120,7 @@ static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numO } int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; + SMnode *pMnode = pReq->info.node; SStreamTaskCheckpointReq req = {0}; SDecoder decoder = {0}; @@ -2143,7 +2141,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); if (pStream == NULL) { - mError("failed to find the stream:0x%"PRIx64" not handle the checkpoint req", req.streamId); + mError("failed to find the stream:0x%" PRIx64 " not handle the checkpoint req", req.streamId); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; taosThreadMutexUnlock(&execInfo.lock); @@ -2151,13 +2149,13 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { } int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); - SArray **pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); + SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { SArray *pList = taosArrayInit(4, sizeof(int32_t)); doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks); taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *)); - pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); + pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); } else { doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks); } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index b438b2dc0a..b56bf3e0fe 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -33,15 +33,18 @@ static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSData int64_t suid); static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks); static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen); -static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id); +static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, + const char* id); static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo, const char* dstTableName, int64_t* uid); -static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id); +static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, + const char* id); static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid); -static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags); +static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, + int32_t numOfTags); static SArray* createDefaultTagColName(); -static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, - int64_t gid, bool newSubTableRule); +static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, + int64_t gid, bool newSubTableRule); int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr, bool newSubTableRule) { @@ -68,10 +71,7 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p if (varTbName != NULL && varTbName != (void*)-1) { name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); - if(newSubTableRule && - !isAutoTableName(name) && - !alreadyAddGroupId(name) && - groupId != 0) { + if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) { buildCtbNameAddGruopId(name, groupId); } } else if (stbFullName) { @@ -134,7 +134,7 @@ end: return ret; } -static bool tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) { +static bool tqGetTableInfo(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) { void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t)); if (pVal) { *pInfo = *(STableSinkInfo**)pVal; @@ -149,7 +149,7 @@ static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { int32_t tlen = 0; encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen); - SRpcMsg msg = { .msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen }; + SRpcMsg msg = {.msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen}; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { tqError("failed to put into write-queue since %s", terrstr()); } @@ -181,14 +181,12 @@ SArray* createDefaultTagColName() { void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, int64_t gid, bool newSubTableRule) { if (pDataBlock->info.parTbName[0]) { - if(newSubTableRule && - !isAutoTableName(pDataBlock->info.parTbName) && - !alreadyAddGroupId(pDataBlock->info.parTbName) && - gid != 0) { + if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) && + !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) { pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); buildCtbNameAddGruopId(pCreateTableReq->name, gid); - }else{ + } else { pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); } } else { @@ -196,17 +194,18 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa } } -static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, - int64_t suid) { +static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, + SStreamTask* pTask, int64_t suid) { tqDebug("s-task:%s build create table msg", pTask->id.idStr); STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; int32_t rows = pDataBlock->info.rows; - SArray* tagArray = taosArrayInit(4, sizeof(STagVal));; - int32_t code = 0; + SArray* tagArray = taosArrayInit(4, sizeof(STagVal)); + ; + int32_t code = 0; SVCreateTbBatchReq reqs = {0}; - SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); + SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); if (NULL == reqs.pArray) { tqError("s-task:%s failed to init create table msg, code:%s", pTask->id.idStr, tstrerror(terrno)); goto _end; @@ -262,7 +261,8 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S ASSERT(gid == *(int64_t*)pGpIdData); } - setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER); + setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, + pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); taosArrayPush(reqs.pArray, pCreateTbReq); tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name); @@ -274,7 +274,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S tqError("s-task:%s failed to send create table msg", pTask->id.idStr); } - _end: +_end: taosArrayDestroy(tagArray); taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); return code; @@ -361,7 +361,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c pExisted->aRowP = pFinal; tqTrace("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id, - (int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), (pNew->pCreateTbReq != NULL)); + (int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), + (pNew->pCreateTbReq != NULL)); tdDestroySVCreateTbReq(pNew->pCreateTbReq); taosMemoryFree(pNew->pCreateTbReq); @@ -373,7 +374,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, - pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER); + pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -416,8 +417,8 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam } if (pReader->me.ctbEntry.suid != suid) { - tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, - vgId, ctbName, suid, pReader->me.ctbEntry.suid); + tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, vgId, + ctbName, suid, pReader->me.ctbEntry.suid); terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE; return false; } @@ -437,10 +438,10 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in taosArrayClear(pTagArray); initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1); - STagVal tagVal = { .cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; + STagVal tagVal = {.cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; taosArrayPush(pTagArray, &tagVal); - tTagNew(pTagArray, 1, false, (STag**) &pCreateTbReq->ctb.pTag); + tTagNew(pTagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag); if (pCreateTbReq->ctb.pTag == NULL) { tdDestroySVCreateTbReq(pCreateTbReq); @@ -513,13 +514,13 @@ int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, i } int32_t tsAscendingSortFn(const void* p1, const void* p2) { - SRow* pRow1 = *(SRow**) p1; - SRow* pRow2 = *(SRow**) p2; + SRow* pRow1 = *(SRow**)p1; + SRow* pRow2 = *(SRow**)p2; if (pRow1->ts == pRow2->ts) { return 0; } else { - return pRow1->ts > pRow2->ts? 1:-1; + return pRow1->ts > pRow2->ts ? 1 : -1; } } @@ -563,7 +564,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat void* colData = colDataGetData(pColData, j); if (IS_STR_DATA_TYPE(pCol->type)) { // address copy, no value - SValue sv = (SValue){.nData = varDataLen(colData), .pData = (uint8_t*) varDataVal(colData)}; + SValue sv = (SValue){.nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)}; SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); taosArrayPush(pVals, &cv); } else { @@ -666,11 +667,9 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (dstTableName[0] == 0) { memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); - }else{ - if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && - !isAutoTableName(dstTableName) && - !alreadyAddGroupId(dstTableName) && - groupId != 0) { + } else { + if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 && + !isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) { buildCtbNameAddGruopId(dstTableName, groupId); } } @@ -693,7 +692,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid); } else { - tqTrace("s-task:%s set the dstTable uid from cache:%"PRId64, id, pTableData->uid); + tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid); } } else { // The auto-create option will always set to be open for those submit messages, which arrive during the period @@ -714,7 +713,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; pTableData->pCreateTbReq = - buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER); + buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray, + pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); taosArrayDestroy(pTagArray); if (pTableData->pCreateTbReq == NULL) { @@ -746,12 +746,12 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat return TDB_CODE_SUCCESS; } -int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, - SSubmitTbData* pTableData, const char* id) { +int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, + SSubmitTbData* pTableData, const char* id) { int32_t numOfRows = pDataBlock->info.rows; - tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, - id, blockIndex + 1, numOfRows, suid); + tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id, + blockIndex + 1, numOfRows, suid); char* dstTableName = pDataBlock->info.parTbName; // convert all rows @@ -767,14 +767,14 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_ } bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) { - for(int32_t i = 0; i < numOfBlocks; ++i) { + for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* p = taosArrayGet(pBlocks, i); if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) { return false; } } - return true; + return true; } void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { @@ -793,7 +793,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id, numOfBlocks); - for(int32_t i = 0; i < numOfBlocks; ++i) { + for (int32_t i = 0; i < numOfBlocks; ++i) { if (streamTaskShouldStop(pTask)) { return; } @@ -832,7 +832,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { } } else { tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks); - SHashObj* pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + SHashObj* pTableIndexMap = + taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; if (submitReq.aSubmitTbData == NULL) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 98d9a29c87..626f7fc161 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -569,6 +569,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; if (pDataBlock->info.parTbName[0]) { if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && + pTask->subtableWithoutMd5 != 1 && !isAutoTableName(pDataBlock->info.parTbName) && !alreadyAddGroupId(pDataBlock->info.parTbName) && groupId != 0){ diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 9f08a55b21..db3d2729af 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -80,7 +80,7 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { } SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, - SArray* pTaskList, bool hasFillhistory) { + SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5) { SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -96,6 +96,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, pTask->info.taskLevel = taskLevel; pTask->info.fillHistory = fillHistory; pTask->info.triggerParam = triggerParam; + pTask->subtableWithoutMd5 = subtableWithoutMd5; pTask->status.pSM = streamCreateStateMachine(pTask); if (pTask->status.pSM == NULL) { @@ -205,6 +206,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; } if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1; if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1; tEndEncode(pEncoder); @@ -287,6 +289,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; } if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; tEndDecode(pDecoder); @@ -482,7 +485,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->execInfo.created = taosGetTimestampMs(); SCheckpointInfo* pChkInfo = &pTask->chkInfo; - SDataRange* pRange = &pTask->dataRange; + SDataRange* pRange = &pTask->dataRange; // only set the version info for stream tasks without fill-history task if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -756,8 +759,8 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) { } int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) { - SStreamMeta* pMeta = pTask->pMeta; - STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId}; + SStreamMeta* pMeta = pTask->pMeta; + STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId}; if (pTask->info.fillHistory == 0) { return 0; } @@ -864,7 +867,7 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) { void streamTaskResume(SStreamTask* pTask) { SStreamTaskState prevState = *streamTaskGetStatus(pTask); - SStreamMeta* pMeta = pTask->pMeta; + SStreamMeta* pMeta = pTask->pMeta; if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) { streamTaskRestoreStatus(pTask); @@ -881,9 +884,7 @@ void streamTaskResume(SStreamTask* pTask) { } } -bool streamTaskIsSinkTask(const SStreamTask* pTask) { - return pTask->info.taskLevel == TASK_LEVEL__SINK; -} +bool streamTaskIsSinkTask(const SStreamTask* pTask) { return pTask->info.taskLevel == TASK_LEVEL__SINK; } int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { int32_t code;