diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx
index df651eab96..a852e71ff4 100644
--- a/docs/zh/07-develop/07-tmq.mdx
+++ b/docs/zh/07-develop/07-tmq.mdx
@@ -91,6 +91,7 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
不同语言下, TMQ 订阅相关的 API 及数据结构如下(详细的接口说明可以参考连接器章节,注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer):
+
```c
@@ -101,17 +102,17 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param));
typedef enum tmq_conf_res_t {
- TMQ_CONF_UNKNOWN = -2,
- TMQ_CONF_INVALID = -1,
- TMQ_CONF_OK = 0,
- } tmq_conf_res_t;
+ TMQ_CONF_UNKNOWN = -2,
+ TMQ_CONF_INVALID = -1,
+ TMQ_CONF_OK = 0,
+ } tmq_conf_res_t;
typedef struct tmq_topic_assignment {
- int32_t vgId;
- int64_t currentOffset;
- int64_t begin;
- int64_t end;
- } tmq_topic_assignment;
+ int32_t vgId;
+ int64_t currentOffset;
+ int64_t begin;
+ int64_t end;
+ } tmq_topic_assignment;
DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
@@ -146,7 +147,6 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
DLL_EXPORT const char *tmq_err2str(int32_t code);
```
-
@@ -250,281 +250,280 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
+ futures::Stream<
Item = Result<(Self::Offset, MessageSet), Self::Error>,
>,
- >,
- >;
- async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
+ >,
+ >;
+ async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
- async fn unsubscribe(self);
- ```
+ async fn unsubscribe(self);
+ ```
- 可在 上查看详细 API 说明。
+ 可在 上查看详细 API 说明。
-
+
-
+
- ```js
- function TMQConsumer(config)
+ ```js
+ function TMQConsumer(config)
+
+ function subscribe(topic)
+
+ function consume(timeout)
+
+ function subscription()
+
+ function unsubscribe()
+
+ function commit(msg)
+
+ function close()
+ ```
- function subscribe(topic)
+
- function consume(timeout)
+
- function subscription()
+ ```csharp
+ class ConsumerBuilder
+
+ ConsumerBuilder(IEnumerable> config)
+
+ public IConsumer Build()
+
+ void Subscribe(IEnumerable topics)
+
+ void Subscribe(string topic)
+
+ ConsumeResult Consume(int millisecondsTimeout)
+
+ List Subscription()
+
+ void Unsubscribe()
+
+ List Commit()
+
+ void Close()
+ ```
+
+
- function unsubscribe()
+# 数据订阅示例
+## 写入数据
- function commit(msg)
+首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
- function close()
- ```
-
-
-
-
-
- ```csharp
- class ConsumerBuilder
-
- ConsumerBuilder(IEnumerable> config)
-
- public IConsumer Build()
-
- void Subscribe(IEnumerable topics)
-
- void Subscribe(string topic)
-
- ConsumeResult Consume(int millisecondsTimeout)
-
- List Subscription()
-
- void Unsubscribe()
-
- List Commit()
-
- void Close()
- ```
-
-
-
-
- # 数据订阅示例
- ## 写入数据
-
- 首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
-
- ```sql
- DROP DATABASE IF EXISTS tmqdb;
- CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600;
- CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
- CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
- CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
- INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
+```sql
+DROP DATABASE IF EXISTS tmqdb;
+CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600;
+CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
+CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
+CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
+INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
```
## 创建 topic
- 使用 SQL 创建一个 topic:
+使用 SQL 创建一个 topic:
- ```sql
- CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
- ```
+```sql
+CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
+```
- ## 创建消费者 *consumer*
+## 创建消费者 *consumer*
- 对于不同编程语言,其设置方式如下:
+对于不同编程语言,其设置方式如下:
-
-
+
- ```c
- /* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
- 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
- tmq_conf_t* conf = tmq_conf_new();
- tmq_conf_set(conf, "enable.auto.commit", "true");
- tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
- tmq_conf_set(conf, "group.id", "cgrpName");
- tmq_conf_set(conf, "td.connect.user", "root");
- tmq_conf_set(conf, "td.connect.pass", "taosdata");
- tmq_conf_set(conf, "auto.offset.reset", "latest");
- tmq_conf_set(conf, "msg.with.table.name", "true");
- tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
+
+ ```c
+ /* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
+ 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
+ tmq_conf_t* conf = tmq_conf_new();
+ tmq_conf_set(conf, "enable.auto.commit", "true");
+ tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
+ tmq_conf_set(conf, "group.id", "cgrpName");
+ tmq_conf_set(conf, "td.connect.user", "root");
+ tmq_conf_set(conf, "td.connect.pass", "taosdata");
+ tmq_conf_set(conf, "auto.offset.reset", "latest");
+ tmq_conf_set(conf, "msg.with.table.name", "true");
+ tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
- tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
- tmq_conf_destroy(conf);
- ```
-
-
-
-
- 对于 Java 程序,还可以使用如下配置项:
-
- | 参数名称 | 类型 | 参数说明 |
- | ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- |
- | `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" |
- | `bootstrap.servers` | string | 连接地址,如 `localhost:6030` |
- | `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
- | `value.deserializer.encoding` | string | 指定字符串解析的字符集 | |
-
- 需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。
-
- ```java
- Properties properties = new Properties();
- properties.setProperty("enable.auto.commit", "true");
- properties.setProperty("auto.commit.interval.ms", "1000");
- properties.setProperty("group.id", "cgrpName");
- properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
- properties.setProperty("td.connect.user", "root");
- properties.setProperty("td.connect.pass", "taosdata");
- properties.setProperty("auto.offset.reset", "latest");
- properties.setProperty("msg.with.table.name", "true");
- properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
-
- TaosConsumer consumer = new TaosConsumer<>(properties);
-
- /* value deserializer definition. */
- import com.taosdata.jdbc.tmq.ReferenceDeserializer;
-
- public class MetersDeserializer extends ReferenceDeserializer {
- }
- ```
-
-
-
-
-
- ```go
- conf := &tmq.ConfigMap{
- "group.id": "test",
- "auto.offset.reset": "latest",
- "td.connect.ip": "127.0.0.1",
- "td.connect.user": "root",
- "td.connect.pass": "taosdata",
- "td.connect.port": "6030",
- "client.id": "test_tmq_c",
- "enable.auto.commit": "false",
- "msg.with.table.name": "true",
- }
- consumer, err := NewConsumer(conf)
- ```
-
-
-
-
-
- ```rust
- let mut dsn: Dsn = "taos://".parse()?;
- dsn.set("group.id", "group1");
- dsn.set("client.id", "test");
- dsn.set("auto.offset.reset", "latest");
-
- let tmq = TmqBuilder::from_dsn(dsn)?;
-
- let mut consumer = tmq.build()?;
- ```
-
-
-
-
-
- Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例:
-
- ```python
- from taos.tmq import Consumer
-
- # Syntax: `consumer = Consumer(configs)`
- #
- # Example:
- consumer = Consumer(
- {
- "group.id": "local",
- "client.id": "1",
- "enable.auto.commit": "true",
- "auto.commit.interval.ms": "1000",
- "td.connect.ip": "127.0.0.1",
- "td.connect.user": "root",
- "td.connect.pass": "taosdata",
- "auto.offset.reset": "latest",
- "msg.with.table.name": "true",
- }
- )
- ```
-
-
-
-
-
- ```js
- // 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
- // 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数
-
- let consumer = taos.consumer({
-'enable.auto.commit': 'true',
- 'auto.commit.interval.ms','1000',
- 'group.id': 'tg2',
- 'td.connect.user': 'root',
- 'td.connect.pass': 'taosdata',
- 'auto.offset.reset','latest',
- 'msg.with.table.name': 'true',
- 'td.connect.ip','127.0.0.1',
- 'td.connect.port','6030'
- });
+ tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
+ tmq_conf_destroy(conf);
```
+
-
+
-
-
- ```csharp
- var cfg = new Dictionary()
- {
- { "group.id", "group1" },
- { "auto.offset.reset", "latest" },
- { "td.connect.ip", "127.0.0.1" },
- { "td.connect.user", "root" },
- { "td.connect.pass", "taosdata" },
- { "td.connect.port", "6030" },
- { "client.id", "tmq_example" },
- { "enable.auto.commit", "true" },
- { "msg.with.table.name", "false" },
- };
- var consumer = new ConsumerBuilder>(cfg).Build();
+ 对于 Java 程序,还可以使用如下配置项:
+
+ | 参数名称 | 类型 | 参数说明 |
+ | ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- |
+ | `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" |
+ | `bootstrap.servers` | string | 连接地址,如 `localhost:6030` |
+ | `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
+ | `value.deserializer.encoding` | string | 指定字符串解析的字符集 | |
+
+ 需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。
+
+ ```java
+ Properties properties = new Properties();
+ properties.setProperty("enable.auto.commit", "true");
+ properties.setProperty("auto.commit.interval.ms", "1000");
+ properties.setProperty("group.id", "cgrpName");
+ properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
+ properties.setProperty("td.connect.user", "root");
+ properties.setProperty("td.connect.pass", "taosdata");
+ properties.setProperty("auto.offset.reset", "latest");
+ properties.setProperty("msg.with.table.name", "true");
+ properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
+
+ TaosConsumer consumer = new TaosConsumer<>(properties);
+
+ /* value deserializer definition. */
+ import com.taosdata.jdbc.tmq.ReferenceDeserializer;
+
+ public class MetersDeserializer extends ReferenceDeserializer {
+ }
```
+
-
+
+
+ ```go
+ conf := &tmq.ConfigMap{
+ "group.id": "test",
+ "auto.offset.reset": "latest",
+ "td.connect.ip": "127.0.0.1",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "td.connect.port": "6030",
+ "client.id": "test_tmq_c",
+ "enable.auto.commit": "false",
+ "msg.with.table.name": "true",
+ }
+ consumer, err := NewConsumer(conf)
+ ```
+
+
-
+
+
+ ```rust
+ let mut dsn: Dsn = "taos://".parse()?;
+ dsn.set("group.id", "group1");
+ dsn.set("client.id", "test");
+ dsn.set("auto.offset.reset", "latest");
+
+ let tmq = TmqBuilder::from_dsn(dsn)?;
+
+ let mut consumer = tmq.build()?;
+ ```
+
+
- 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。
+
+
+ Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例:
+
+ ```python
+ from taos.tmq import Consumer
+
+ # Syntax: `consumer = Consumer(configs)`
+ #
+ # Example:
+ consumer = Consumer(
+ {
+ "group.id": "local",
+ "client.id": "1",
+ "enable.auto.commit": "true",
+ "auto.commit.interval.ms": "1000",
+ "td.connect.ip": "127.0.0.1",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "auto.offset.reset": "latest",
+ "msg.with.table.name": "true",
+ }
+ )
+ ```
+
+
- ## 订阅 *topics*
+
+
+ ```js
+ // 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
+ // 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数
+
+ let consumer = taos.consumer({
+ 'enable.auto.commit': 'true',
+ 'auto.commit.interval.ms','1000',
+ 'group.id': 'tg2',
+ 'td.connect.user': 'root',
+ 'td.connect.pass': 'taosdata',
+ 'auto.offset.reset','latest',
+ 'msg.with.table.name': 'true',
+ 'td.connect.ip','127.0.0.1',
+ 'td.connect.port','6030'
+ });
+ ```
+
+
- 一个 consumer 支持同时订阅多个 topic。
+
+
+ ```csharp
+ var cfg = new Dictionary()
+ {
+ { "group.id", "group1" },
+ { "auto.offset.reset", "latest" },
+ { "td.connect.ip", "127.0.0.1" },
+ { "td.connect.user", "root" },
+ { "td.connect.pass", "taosdata" },
+ { "td.connect.port", "6030" },
+ { "client.id", "tmq_example" },
+ { "enable.auto.commit", "true" },
+ { "msg.with.table.name", "false" },
+ };
+ var consumer = new ConsumerBuilder>(cfg).Build();
+ ```
+
+
+
+
+
+上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。
+
+## 订阅 *topics*
-
-
+一个 consumer 支持同时订阅多个 topic。
+
+
-```c
+
+
+ ```c
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
-
+
```
-
-
-
-
+
+
+
+
```java
List topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);
```
-
-
-
+
+
+
```go
err = consumer.Subscribe("example_tmq_topic", nil)
@@ -533,26 +532,26 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
}
```
-
-
+
+
-```rust
+ ```rust
consumer.subscribe(["tmq_meters"]).await?;
```
-
+
-
+
-```python
+ ```python
consumer.subscribe(['topic1', 'topic2'])
```
-
+
-
+
-```js
+ ```js
// 创建订阅 topics 列表
let topics = ['topic_test']
@@ -560,11 +559,11 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
consumer.subscribe(topics);
```
-
+
-
+
-```csharp
+ ```csharp
// 创建订阅 topics 列表
List topics = new List();
topics.add("tmq_topic");
@@ -572,18 +571,19 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
consumer.Subscribe(topics);
```
-
+
-
+
- ## 消费
+## 消费
- 以下代码展示了不同语言下如何对 TMQ 消息进行消费。
+以下代码展示了不同语言下如何对 TMQ 消息进行消费。
-
-
+
-```c
+
+
+ ```c
// 消费数据
while (running) {
TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
@@ -591,11 +591,11 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
}
```
-这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。
-
-
-
+ 这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。
+
+
+
```java
while(running){
ConsumerRecords meters = consumer.poll(Duration.ofMillis(100));
@@ -604,10 +604,10 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
}
}
```
+
+
-
-
-
+
```go
for {
@@ -625,9 +625,9 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
}
```
-
+
-
+
```rust
{
@@ -660,8 +660,8 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
}
```
-
-
+
+
```python
while True:
@@ -677,9 +677,9 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
print(block.fetchall())
```
-
+
-
+
```js
while(true){
@@ -691,11 +691,11 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
}
```
-
+
-
+
-```csharp
+ ```csharp
// 消费数据
while (true)
{
@@ -708,18 +708,19 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
}
```
-
+
-
+
- ## 结束消费
+## 结束消费
- 消费结束后,应当取消订阅。
+消费结束后,应当取消订阅。
-
-
+
+
+
-```c
+ ```c
/* 取消订阅 */
tmq_unsubscribe(tmq);
@@ -727,10 +728,10 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
tmq_consumer_close(tmq);
```
-
-
+
+
-```java
+ ```java
/* 取消订阅 */
consumer.unsubscribe();
@@ -738,11 +739,11 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
consumer.close();
```
-
+
+
+
-
-
-```go
+ ```go
/* Unsubscribe */
_ = consumer.Unsubscribe()
@@ -750,38 +751,38 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
_ = consumer.Close()
```
-
+
+
+
-
-
-```rust
+ ```rust
consumer.unsubscribe().await;
```
-
+
-
+
-```py
+ ```py
# 取消订阅
consumer.unsubscribe()
# 关闭消费
consumer.close()
```
-
-
+
+
-```js
+ ```js
consumer.unsubscribe();
consumer.close();
```
-
+
-
+
-```csharp
+ ```csharp
// 取消订阅
consumer.Unsubscribe();
@@ -789,7 +790,7 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
consumer.Close();
```
-
+
@@ -799,41 +800,41 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
-
-
-
+
+
+
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
-
-
-
-
+
+
+
+
+
+
+
#订阅高级功能
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index e24d4a71b0..88205581fe 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -1400,7 +1400,7 @@ void tFreeSCompactDbReq(SCompactDbReq* pReq);
typedef struct {
int32_t compactId;
- int8_t bAccepted;
+ int8_t bAccepted;
} SCompactDbRsp;
int32_t tSerializeSCompactDbRsp(void* buf, int32_t bufLen, SCompactDbRsp* pRsp);
@@ -1414,7 +1414,7 @@ typedef struct {
int32_t tSerializeSKillCompactReq(void* buf, int32_t bufLen, SKillCompactReq* pReq);
int32_t tDeserializeSKillCompactReq(void* buf, int32_t bufLen, SKillCompactReq* pReq);
-void tFreeSKillCompactReq(SKillCompactReq *pReq);
+void tFreeSKillCompactReq(SKillCompactReq* pReq);
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
@@ -1751,9 +1751,9 @@ int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq*
int32_t tDeserializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq);
typedef struct {
- int32_t compactId;
- int32_t vgId;
- int32_t dnodeId;
+ int32_t compactId;
+ int32_t vgId;
+ int32_t dnodeId;
} SVKillCompactReq;
int32_t tSerializeSVKillCompactReq(void* buf, int32_t bufLen, SVKillCompactReq* pReq);
@@ -1954,9 +1954,9 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN];
char tb[TSDB_TABLE_NAME_LEN];
char user[TSDB_USER_LEN];
- char filterTb[TSDB_TABLE_NAME_LEN]; // for ins_columns
+ char filterTb[TSDB_TABLE_NAME_LEN]; // for ins_columns
int64_t showId;
- int64_t compactId; // for compact
+ int64_t compactId; // for compact
} SRetrieveTableReq;
typedef struct SSysTableSchema {
@@ -3784,12 +3784,12 @@ typedef struct {
} SMqHbReq;
typedef struct {
- char topic[TSDB_TOPIC_FNAME_LEN];
- int8_t noPrivilege;
+ char topic[TSDB_TOPIC_FNAME_LEN];
+ int8_t noPrivilege;
} STopicPrivilege;
typedef struct {
- SArray* topicPrivileges; // SArray
+ SArray* topicPrivileges; // SArray
} SMqHbRsp;
typedef struct {
@@ -3927,8 +3927,8 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
#define SUBMIT_REQ_FROM_FILE 0x4
-#define SOURCE_NULL 0
-#define SOURCE_TAOSX 1
+#define SOURCE_NULL 0
+#define SOURCE_TAOSX 1
typedef struct {
int32_t flags;
@@ -3940,8 +3940,8 @@ typedef struct {
SArray* aRowP;
SArray* aCol;
};
- int64_t ctimeMs;
- int8_t source;
+ int64_t ctimeMs;
+ int8_t source;
} SSubmitTbData;
typedef struct {
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index d6d89d1d30..c9b48c9979 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -50,21 +50,21 @@ extern "C" {
(_t)->hTaskInfo.id.streamId = 0; \
} while (0)
-#define STREAM_EXEC_T_EXTRACT_WAL_DATA (-1)
-#define STREAM_EXEC_T_START_ALL_TASKS (-2)
-#define STREAM_EXEC_T_START_ONE_TASK (-3)
-#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
-#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
-#define STREAM_EXEC_T_RESUME_TASK (-6)
-#define STREAM_EXEC_T_UPDATE_TASK_EPSET (-7)
+#define STREAM_EXEC_T_EXTRACT_WAL_DATA (-1)
+#define STREAM_EXEC_T_START_ALL_TASKS (-2)
+#define STREAM_EXEC_T_START_ONE_TASK (-3)
+#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
+#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
+#define STREAM_EXEC_T_RESUME_TASK (-6)
+#define STREAM_EXEC_T_UPDATE_TASK_EPSET (-7)
typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;
typedef struct SStreamTaskSM SStreamTaskSM;
-#define SSTREAM_TASK_VER 3
-#define SSTREAM_TASK_INCOMPATIBLE_VER 1
-#define SSTREAM_TASK_NEED_CONVERT_VER 2
+#define SSTREAM_TASK_VER 3
+#define SSTREAM_TASK_INCOMPATIBLE_VER 1
+#define SSTREAM_TASK_NEED_CONVERT_VER 2
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
enum {
@@ -405,8 +405,8 @@ typedef struct SHistoryTaskInfo {
int32_t tickCount;
int32_t retryTimes;
int32_t waitInterval;
- int64_t haltVer; // offset in wal when halt the stream task
- bool operatorOpen; // false by default
+ int64_t haltVer; // offset in wal when halt the stream task
+ bool operatorOpen; // false by default
} SHistoryTaskInfo;
typedef struct STaskOutputInfo {
@@ -463,21 +463,22 @@ struct SStreamTask {
struct SStreamMeta* pMeta;
SSHashObj* pNameMap;
void* pBackend;
- char reserve[256];
+ int8_t subtableWithoutMd5;
+ char reserve[255];
};
typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*);
typedef struct STaskStartInfo {
- int64_t startTs;
- int64_t readyTs;
- int32_t tasksWillRestart;
- int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
- SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
- SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
- int64_t elapsedTime;
- int32_t restartCount; // restart task counter
- startComplete_fn_t completeFn; // complete callback function
+ int64_t startTs;
+ int64_t readyTs;
+ int32_t tasksWillRestart;
+ int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
+ SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
+ SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
+ int64_t elapsedTime;
+ int32_t restartCount; // restart task counter
+ startComplete_fn_t completeFn; // complete callback function
} STaskStartInfo;
typedef struct STaskUpdateInfo {
@@ -504,7 +505,7 @@ typedef struct SStreamMeta {
int32_t vgId;
int64_t stage;
int32_t role;
- bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower.
+ bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower.
STaskStartInfo startInfo;
TdThreadRwlock lock;
SScanWalInfo scanInfo;
@@ -532,7 +533,7 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo)
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
- SArray* pTaskList, bool hasFillhistory);
+ SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask);
@@ -678,18 +679,18 @@ typedef struct STaskStatusEntry {
int32_t statusLastDuration; // to record the last duration of current status
int64_t stage;
int32_t nodeId;
- int64_t verStart; // start version in WAL, only valid for source task
- int64_t verEnd; // end version in WAL, only valid for source task
- int64_t processedVer; // only valid for source task
- int64_t checkpointId; // current active checkpoint id
- int32_t chkpointTransId; // checkpoint trans id
- int8_t checkpointFailed; // denote if the checkpoint is failed or not
- bool inputQChanging; // inputQ is changing or not
+ int64_t verStart; // start version in WAL, only valid for source task
+ int64_t verEnd; // end version in WAL, only valid for source task
+ int64_t processedVer; // only valid for source task
+ int64_t checkpointId; // current active checkpoint id
+ int32_t chkpointTransId; // checkpoint trans id
+ int8_t checkpointFailed; // denote if the checkpoint is failed or not
+ bool inputQChanging; // inputQ is changing or not
int64_t inputQUnchangeCounter;
- double inputQUsed; // in MiB
+ double inputQUsed; // in MiB
double inputRate;
- double sinkQuota; // existed quota size for sink task
- double sinkDataSize; // sink to dst data size
+ double sinkQuota; // existed quota size for sink task
+ double sinkDataSize; // sink to dst data size
} STaskStatusEntry;
typedef struct SStreamHbMsg {
@@ -720,8 +721,8 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
typedef struct SStreamTaskState {
- ETaskStatus state;
- char* name;
+ ETaskStatus state;
+ char* name;
} SStreamTaskState;
typedef struct {
@@ -839,7 +840,8 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
// stream task meta
void streamMetaInit();
void streamMetaCleanup();
-SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, startComplete_fn_t fn);
+SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
+ startComplete_fn_t fn);
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
@@ -858,22 +860,22 @@ void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
- int64_t endTs, bool ready);
+ int64_t endTs, bool ready);
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
-void streamMetaRLock(SStreamMeta* pMeta);
-void streamMetaRUnLock(SStreamMeta* pMeta);
-void streamMetaWLock(SStreamMeta* pMeta);
-void streamMetaWUnLock(SStreamMeta* pMeta);
-void streamMetaResetStartInfo(STaskStartInfo* pMeta);
-SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
-void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
-int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
-int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
-int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
-int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
-bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
-tmr_h streamTimerGetInstance();
+void streamMetaRLock(SStreamMeta* pMeta);
+void streamMetaRUnLock(SStreamMeta* pMeta);
+void streamMetaWLock(SStreamMeta* pMeta);
+void streamMetaWUnLock(SStreamMeta* pMeta);
+void streamMetaResetStartInfo(STaskStartInfo* pMeta);
+SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
+void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
+int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
+int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
+int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
+int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
+bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
+tmr_h streamTimerGetInstance();
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
@@ -890,8 +892,8 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
void* streamDestroyStateMachine(SStreamTaskSM* pSM);
-int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req);
-void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp);
+int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq* req);
+void sendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp);
#ifdef __cplusplus
}
#endif
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index 19de4561d6..fc58fabad5 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -699,6 +699,7 @@ typedef struct {
int64_t checkpointId;
int32_t indexForMultiAggBalance;
+ int8_t subTableWithoutMd5;
char reserve[256];
} SStreamObj;
@@ -776,8 +777,8 @@ typedef enum {
GRANT_STATE_REASON_MAX,
} EGrantStateReason;
-#define GRANT_STATE_NUM 30
-#define GRANT_ACTIVE_NUM 10
+#define GRANT_STATE_NUM 30
+#define GRANT_ACTIVE_NUM 10
#define GRANT_ACTIVE_HEAD_LEN 30
typedef struct {
@@ -812,7 +813,7 @@ typedef struct {
int64_t id : 24;
};
};
- char machine[TSDB_MACHINE_ID_LEN + 1];
+ char machine[TSDB_MACHINE_ID_LEN + 1];
} SGrantMachine;
typedef struct {
diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h
index 4d1125a340..1084340dc2 100644
--- a/source/dnode/mnode/impl/inc/mndStream.h
+++ b/source/dnode/mnode/impl/inc/mndStream.h
@@ -24,7 +24,7 @@ extern "C" {
#endif
#define MND_STREAM_RESERVE_SIZE 64
-#define MND_STREAM_VER_NUMBER 4
+#define MND_STREAM_VER_NUMBER 5
#define MND_STREAM_CREATE_NAME "stream-create"
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c
index d59354286d..5be641d1c2 100644
--- a/source/dnode/mnode/impl/src/mndDef.c
+++ b/source/dnode/mnode/impl/src/mndDef.c
@@ -85,6 +85,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
// 3.0.50 ver = 3
if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1;
+ if (tEncodeI8(pEncoder, pObj->subTableWithoutMd5) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1) < 0) return -1;
@@ -168,6 +169,10 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
if (sver >= 3) {
if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1;
}
+
+ if (sver >= 5) {
+ if (tDecodeI8(pDecoder, &pObj->subTableWithoutMd5) < 0) return -1;
+ }
if (tDecodeCStrTo(pDecoder, pObj->reserve) < 0) return -1;
tEndDecode(pDecoder);
diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c
index 88d326a5c4..a8eaf7c711 100644
--- a/source/dnode/mnode/impl/src/mndScheduler.c
+++ b/source/dnode/mnode/impl/src/mndScheduler.c
@@ -14,13 +14,13 @@
*/
#include "mndScheduler.h"
-#include "tmisce.h"
-#include "mndMnode.h"
#include "mndDb.h"
+#include "mndMnode.h"
#include "mndSnode.h"
#include "mndVgroup.h"
#include "parser.h"
#include "tcompare.h"
+#include "tmisce.h"
#include "tname.h"
#include "tuuid.h"
@@ -189,7 +189,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
return NULL;
}
- if(pStream->indexForMultiAggBalance == -1){
+ if (pStream->indexForMultiAggBalance == -1) {
taosSeedRand(taosSafeRand());
pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
}
@@ -204,7 +204,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
sdbRelease(pMnode->pSdb, pVgroup);
continue;
}
- if (index++ == pStream->indexForMultiAggBalance){
+ if (index++ == pStream->indexForMultiAggBalance) {
pStream->indexForMultiAggBalance++;
pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
sdbCancelFetch(pMnode->pSdb, pIter);
@@ -217,12 +217,12 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
return pVgroup;
}
-static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup,
- SEpSet* pEpset, bool isFillhistory) {
- int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
+static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory) {
+ int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
- SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory);
+ SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList,
+ pStream->conf.fillHistory, pStream->subTableWithoutMd5);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
@@ -235,12 +235,12 @@ static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgrou
return mndSetSinkTaskInfo(pStream, pTask);
}
-static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj){
+static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj) {
int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, false);
if (code != 0) {
return code;
}
- if(pStream->conf.fillHistory){
+ if (pStream->conf.fillHistory) {
code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, true);
if (code != 0) {
return code;
@@ -267,7 +267,7 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet*
}
int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup);
- if(code != 0){
+ if (code != 0) {
sdbRelease(pSdb, pVgroup);
return code;
}
@@ -279,7 +279,7 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet*
}
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
- for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
+ for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SVgroupVer* pVer = taosArrayGet(pList, i);
if (pVer->vgId == vgId) {
return pVer->ver;
@@ -315,19 +315,18 @@ static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVe
pRange->range.minVer = latestVer + 1;
pRange->range.maxVer = INT64_MAX;
- mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
- pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
+ mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, pTask->id.taskId,
+ pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
}
}
-static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset,
- bool isFillhistory, bool useTriggerParam) {
+static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam) {
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
- SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset,
- isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
- *pTaskList, pStream->conf.fillHistory);
+ SStreamTask* pTask =
+ tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
+ *pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5);
if (pTask == NULL) {
return NULL;
}
@@ -335,7 +334,7 @@ static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset,
return pTask;
}
-static void addNewTaskList(SStreamObj* pStream){
+static void addNewTaskList(SStreamObj* pStream) {
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &pTaskList);
if (pStream->conf.fillHistory) {
@@ -364,11 +363,11 @@ static void setHTasksId(SStreamObj* pStream) {
}
}
-static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
- int64_t skey, SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam ){
+static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
+ SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) {
// new stream task
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
- if(pTask == NULL){
+ if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
@@ -377,15 +376,15 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
int32_t code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
- if(code != 0){
+ if (code != 0) {
terrno = code;
return terrno;
}
return TDB_CODE_SUCCESS;
}
-static SSubplan* getScanSubPlan(const SQueryPlan* pPlan){
- int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
+static SSubplan* getScanSubPlan(const SQueryPlan* pPlan) {
+ int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
if (LIST_LENGTH(inner->pNodeList) != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
@@ -400,7 +399,7 @@ static SSubplan* getScanSubPlan(const SQueryPlan* pPlan){
return plan;
}
-static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){
+static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
if (LIST_LENGTH(inner->pNodeList) != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
@@ -415,8 +414,8 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){
return plan;
}
-static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream,
- SEpSet* pEpset, int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
+static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
+ int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
addNewTaskList(pStream);
void* pIter = NULL;
@@ -433,15 +432,16 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream
continue;
}
- int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
- if(code != 0){
+ int code =
+ doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
+ if (code != 0) {
sdbRelease(pSdb, pVgroup);
return code;
}
if (pStream->conf.fillHistory) {
code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, true, useTriggerParam);
- if(code != 0){
+ if (code != 0) {
sdbRelease(pSdb, pVgroup);
return code;
}
@@ -461,9 +461,9 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
- SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory,
- useTriggerParam ? pStream->conf.triggerParam : 0,
- *pTaskList, pStream->conf.fillHistory);
+ SStreamTask* pAggTask =
+ tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
+ *pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5);
if (pAggTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
@@ -472,8 +472,8 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil
return pAggTask;
}
-static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset,
- SVgObj* pVgroup, SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam){
+static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup,
+ SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam) {
int32_t code = 0;
SStreamTask* pTask = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam);
if (pTask == NULL) {
@@ -490,7 +490,7 @@ static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan,
return code;
}
-static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam){
+static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam) {
SVgObj* pVgroup = NULL;
SSnodeObj* pSnode = NULL;
int32_t code = 0;
@@ -504,20 +504,20 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S
}
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false, useTriggerParam);
- if(code != 0){
+ if (code != 0) {
goto END;
}
if (pStream->conf.fillHistory) {
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true, useTriggerParam);
- if(code != 0){
+ if (code != 0) {
goto END;
}
setHTasksId(pStream);
}
- END:
+END:
if (pSnode != NULL) {
sdbRelease(pMnode->pSdb, pSnode);
} else {
@@ -526,7 +526,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S
return code;
}
-static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){
+static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
int32_t code = 0;
addNewTaskList(pStream);
@@ -548,9 +548,9 @@ static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){
return TDB_CODE_SUCCESS;
}
-static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task){
+static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) {
mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task);
- for(int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) {
+ for (int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k);
streamTaskSetUpstreamInfo(pSinkTask, task);
}
@@ -558,10 +558,10 @@ static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSin
}
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
- SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
+ SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
SArray** pAggTaskList = taosArrayGetLast(tasks);
- for(int i = 0; i < taosArrayGetSize(*pAggTaskList); i++){
+ for (int i = 0; i < taosArrayGetSize(*pAggTaskList); i++) {
SStreamTask* pAggTask = taosArrayGetP(*pAggTaskList, i);
bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask);
mDebug("bindAggSink taskId:%s to sink task list", pAggTask->id.idStr);
@@ -572,7 +572,7 @@ static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, b
SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL);
- for(int i = 0; i < taosArrayGetSize(pSourceTaskList); i++){
+ for (int i = 0; i < taosArrayGetSize(pSourceTaskList); i++) {
SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i);
mDebug("bindSourceSink taskId:%s to sink task list", pSourceTask->id.idStr);
@@ -591,8 +591,8 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
SArray* pUpTaskList = taosArrayGetP(tasks, size - 2);
SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList);
- end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList): end;
- for(int i = begin; i < end; i++){
+ end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList) : end;
+ for (int i = begin; i < end; i++) {
SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
pUpTask->info.selfChildId = i - begin;
streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
@@ -616,8 +616,8 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
sdbRelease(pSdb, pDbObj);
- mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s",
- numOfPlanLevel, externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
+ mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
+ externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
@@ -632,7 +632,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
pStream->totalLevel = numOfPlanLevel + hasExtraSink;
- SSubplan* plan = getScanSubPlan(pPlan); // source plan
+ SSubplan* plan = getScanSubPlan(pPlan); // source plan
if (plan == NULL) {
return terrno;
}
@@ -649,32 +649,32 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
return TDB_CODE_SUCCESS;
}
- if(numOfPlanLevel == 3){
+ if (numOfPlanLevel == 3) {
plan = getAggSubPlan(pPlan, 1); // middle agg plan
if (plan == NULL) {
return terrno;
}
- do{
+ do {
SArray** list = taosArrayGetLast(pStream->tasks);
- float size = (float)taosArrayGetSize(*list);
- size_t cnt = (size_t)ceil(size/tsStreamAggCnt);
- if(cnt <= 1) break;
+ float size = (float)taosArrayGetSize(*list);
+ size_t cnt = (size_t)ceil(size / tsStreamAggCnt);
+ if (cnt <= 1) break;
mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
addNewTaskList(pStream);
- for(int j = 0; j < cnt; j++){
+ for (int j = 0; j < cnt; j++) {
code = addAggTask(pStream, pMnode, plan, pEpset, false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- bindTwoLevel(pStream->tasks, j*tsStreamAggCnt, (j+1)*tsStreamAggCnt);
+ bindTwoLevel(pStream->tasks, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
if (pStream->conf.fillHistory) {
- bindTwoLevel(pStream->pHTasksList, j*tsStreamAggCnt, (j+1)*tsStreamAggCnt);
+ bindTwoLevel(pStream->pHTasksList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
}
}
- }while(1);
+ } while (1);
}
plan = getAggSubPlan(pPlan, 0);
@@ -684,7 +684,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
mDebug("doScheduleStream add final agg");
SArray** list = taosArrayGetLast(pStream->tasks);
- size_t size = taosArrayGetSize(*list);
+ size_t size = taosArrayGetSize(*list);
addNewTaskList(pStream);
code = addAggTask(pStream, pMnode, plan, pEpset, true);
if (code != TSDB_CODE_SUCCESS) {
diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c
index 1e92b1a181..05189d5a53 100644
--- a/source/dnode/mnode/impl/src/mndSma.c
+++ b/source/dnode/mnode/impl/src/mndSma.c
@@ -567,6 +567,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.conf.triggerParam = pCreate->maxDelay;
streamObj.ast = taosStrdup(smaObj.ast);
streamObj.indexForMultiAggBalance = -1;
+ streamObj.subTableWithoutMd5 = 1;
// check the maxDelay
if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
@@ -898,11 +899,11 @@ _OVER:
}
int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
- SSdb *pSdb = pMnode->pSdb;
- SSmaObj *pSma = NULL;
- void *pIter = NULL;
- SVgObj *pVgroup = NULL;
- int32_t code = -1;
+ SSdb *pSdb = pMnode->pSdb;
+ SSmaObj *pSma = NULL;
+ void *pIter = NULL;
+ SVgObj *pVgroup = NULL;
+ int32_t code = -1;
while (1) {
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 190b4f28ce..e4767d72d7 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -27,7 +27,7 @@
#include "tmisce.h"
#include "tname.h"
-#define MND_STREAM_MAX_NUM 60
+#define MND_STREAM_MAX_NUM 60
typedef struct SMStreamNodeCheckMsg {
int8_t placeHolder; // // to fix windows compile error, define place holder
@@ -192,7 +192,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
STREAM_DECODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
- char* p = (pStream == NULL) ? "null" : pStream->name;
+ char *p = (pStream == NULL) ? "null" : pStream->name;
mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
@@ -802,8 +802,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
}
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
- int64_t streamId, int32_t taskId, int32_t transId,
- int8_t mndTrigger) {
+ int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) {
SStreamCheckpointSourceReq req = {0};
req.checkpointId = checkpointId;
req.nodeId = nodeId;
@@ -878,11 +877,10 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
int32_t code = -1;
int64_t ts = taosGetTimestampMs();
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
-// mWarn("checkpoint interval less than the threshold, ignore it");
+ // mWarn("checkpoint interval less than the threshold, ignore it");
return -1;
}
-
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
if (conflict) {
mndAddtoCheckpointWaitingList(pStream, checkpointId);
@@ -1144,7 +1142,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
- STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_DROP_NAME, "drop stream");
+ STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_DROP_NAME, "drop stream");
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@@ -1570,7 +1568,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}
- STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_PAUSE_NAME, "pause the stream");
+ STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_PAUSE_NAME, "pause the stream");
if (pTrans == NULL) {
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@@ -1590,7 +1588,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
// pause stream
taosWLockLatch(&pStream->lock);
pStream->status = STREAM_STATUS__PAUSE;
- if (mndPersistTransLog(pStream, pTrans,SDB_STATUS_READY) < 0) {
+ if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY) < 0) {
taosWUnLockLatch(&pStream->lock);
sdbRelease(pMnode->pSdb, pStream);
@@ -1617,7 +1615,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
- if(grantCheckExpire(TSDB_GRANT_STREAMS) < 0){
+ if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) {
terrno = TSDB_CODE_GRANT_EXPIRED;
return -1;
}
@@ -1659,7 +1657,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return -1;
}
- STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream");
+ STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream");
if (pTrans == NULL) {
mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@@ -2106,10 +2104,10 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
-static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
+static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
int32_t num = taosArrayGetSize(pList);
- for(int32_t i = 0; i < num; ++i) {
- int32_t* pId = taosArrayGet(pList, i);
+ for (int32_t i = 0; i < num; ++i) {
+ int32_t *pId = taosArrayGet(pList, i);
if (taskId == *pId) {
return;
}
@@ -2122,7 +2120,7 @@ static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numO
}
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
- SMnode *pMnode = pReq->info.node;
+ SMnode *pMnode = pReq->info.node;
SStreamTaskCheckpointReq req = {0};
SDecoder decoder = {0};
@@ -2143,7 +2141,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
if (pStream == NULL) {
- mError("failed to find the stream:0x%"PRIx64" not handle the checkpoint req", req.streamId);
+ mError("failed to find the stream:0x%" PRIx64 " not handle the checkpoint req", req.streamId);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
taosThreadMutexUnlock(&execInfo.lock);
@@ -2151,13 +2149,13 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
}
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
- SArray **pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
+ SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
if (pReqTaskList == NULL) {
SArray *pList = taosArrayInit(4, sizeof(int32_t));
doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks);
taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
- pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
+ pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
} else {
doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks);
}
diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c
index b438b2dc0a..b56bf3e0fe 100644
--- a/source/dnode/vnode/src/tq/tqSink.c
+++ b/source/dnode/vnode/src/tq/tqSink.c
@@ -33,15 +33,18 @@ static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSData
int64_t suid);
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
-static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id);
+static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock,
+ const char* id);
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
const char* dstTableName, int64_t* uid);
-static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id);
+static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
+ const char* id);
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
-static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags);
+static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
+ int32_t numOfTags);
static SArray* createDefaultTagColName();
-static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
- int64_t gid, bool newSubTableRule);
+static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
+ int64_t gid, bool newSubTableRule);
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr, bool newSubTableRule) {
@@ -68,10 +71,7 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
if (varTbName != NULL && varTbName != (void*)-1) {
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
- if(newSubTableRule &&
- !isAutoTableName(name) &&
- !alreadyAddGroupId(name) &&
- groupId != 0) {
+ if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) {
buildCtbNameAddGruopId(name, groupId);
}
} else if (stbFullName) {
@@ -134,7 +134,7 @@ end:
return ret;
}
-static bool tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) {
+static bool tqGetTableInfo(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) {
void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
if (pVal) {
*pInfo = *(STableSinkInfo**)pVal;
@@ -149,7 +149,7 @@ static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
int32_t tlen = 0;
encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
- SRpcMsg msg = { .msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen };
+ SRpcMsg msg = {.msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqError("failed to put into write-queue since %s", terrstr());
}
@@ -181,14 +181,12 @@ SArray* createDefaultTagColName() {
void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid, bool newSubTableRule) {
if (pDataBlock->info.parTbName[0]) {
- if(newSubTableRule &&
- !isAutoTableName(pDataBlock->info.parTbName) &&
- !alreadyAddGroupId(pDataBlock->info.parTbName) &&
- gid != 0) {
+ if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
+ !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) {
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
buildCtbNameAddGruopId(pCreateTableReq->name, gid);
- }else{
+ } else {
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
}
} else {
@@ -196,17 +194,18 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa
}
}
-static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
- int64_t suid) {
+static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock,
+ SStreamTask* pTask, int64_t suid) {
tqDebug("s-task:%s build create table msg", pTask->id.idStr);
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
int32_t rows = pDataBlock->info.rows;
- SArray* tagArray = taosArrayInit(4, sizeof(STagVal));;
- int32_t code = 0;
+ SArray* tagArray = taosArrayInit(4, sizeof(STagVal));
+ ;
+ int32_t code = 0;
SVCreateTbBatchReq reqs = {0};
- SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
+ SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
if (NULL == reqs.pArray) {
tqError("s-task:%s failed to init create table msg, code:%s", pTask->id.idStr, tstrerror(terrno));
goto _end;
@@ -262,7 +261,8 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
ASSERT(gid == *(int64_t*)pGpIdData);
}
- setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
+ setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid,
+ pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
taosArrayPush(reqs.pArray, pCreateTbReq);
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
@@ -274,7 +274,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
tqError("s-task:%s failed to send create table msg", pTask->id.idStr);
}
- _end:
+_end:
taosArrayDestroy(tagArray);
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
return code;
@@ -361,7 +361,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
pExisted->aRowP = pFinal;
tqTrace("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
- (int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), (pNew->pCreateTbReq != NULL));
+ (int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL),
+ (pNew->pCreateTbReq != NULL));
tdDestroySVCreateTbReq(pNew->pCreateTbReq);
taosMemoryFree(pNew->pCreateTbReq);
@@ -373,7 +374,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr,
- pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
+ pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -416,8 +417,8 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam
}
if (pReader->me.ctbEntry.suid != suid) {
- tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64,
- vgId, ctbName, suid, pReader->me.ctbEntry.suid);
+ tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, vgId,
+ ctbName, suid, pReader->me.ctbEntry.suid);
terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
return false;
}
@@ -437,10 +438,10 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in
taosArrayClear(pTagArray);
initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
- STagVal tagVal = { .cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
+ STagVal tagVal = {.cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
taosArrayPush(pTagArray, &tagVal);
- tTagNew(pTagArray, 1, false, (STag**) &pCreateTbReq->ctb.pTag);
+ tTagNew(pTagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
if (pCreateTbReq->ctb.pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq);
@@ -513,13 +514,13 @@ int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, i
}
int32_t tsAscendingSortFn(const void* p1, const void* p2) {
- SRow* pRow1 = *(SRow**) p1;
- SRow* pRow2 = *(SRow**) p2;
+ SRow* pRow1 = *(SRow**)p1;
+ SRow* pRow2 = *(SRow**)p2;
if (pRow1->ts == pRow2->ts) {
return 0;
} else {
- return pRow1->ts > pRow2->ts? 1:-1;
+ return pRow1->ts > pRow2->ts ? 1 : -1;
}
}
@@ -563,7 +564,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
void* colData = colDataGetData(pColData, j);
if (IS_STR_DATA_TYPE(pCol->type)) {
// address copy, no value
- SValue sv = (SValue){.nData = varDataLen(colData), .pData = (uint8_t*) varDataVal(colData)};
+ SValue sv = (SValue){.nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)};
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
} else {
@@ -666,11 +667,9 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
if (dstTableName[0] == 0) {
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
- }else{
- if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER &&
- !isAutoTableName(dstTableName) &&
- !alreadyAddGroupId(dstTableName) &&
- groupId != 0) {
+ } else {
+ if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 &&
+ !isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) {
buildCtbNameAddGruopId(dstTableName, groupId);
}
}
@@ -693,7 +692,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
} else {
- tqTrace("s-task:%s set the dstTable uid from cache:%"PRId64, id, pTableData->uid);
+ tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
}
} else {
// The auto-create option will always set to be open for those submit messages, which arrive during the period
@@ -714,7 +713,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
pTableData->pCreateTbReq =
- buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
+ buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
+ pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
taosArrayDestroy(pTagArray);
if (pTableData->pCreateTbReq == NULL) {
@@ -746,12 +746,12 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
return TDB_CODE_SUCCESS;
}
-int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
- SSubmitTbData* pTableData, const char* id) {
+int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
+ SSubmitTbData* pTableData, const char* id) {
int32_t numOfRows = pDataBlock->info.rows;
- tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64,
- id, blockIndex + 1, numOfRows, suid);
+ tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id,
+ blockIndex + 1, numOfRows, suid);
char* dstTableName = pDataBlock->info.parTbName;
// convert all rows
@@ -767,14 +767,14 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_
}
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
- for(int32_t i = 0; i < numOfBlocks; ++i) {
+ for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* p = taosArrayGet(pBlocks, i);
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
return false;
}
}
- return true;
+ return true;
}
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
@@ -793,7 +793,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
numOfBlocks);
- for(int32_t i = 0; i < numOfBlocks; ++i) {
+ for (int32_t i = 0; i < numOfBlocks; ++i) {
if (streamTaskShouldStop(pTask)) {
return;
}
@@ -832,7 +832,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
}
} else {
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
- SHashObj* pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
+ SHashObj* pTableIndexMap =
+ taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
if (submitReq.aSubmitTbData == NULL) {
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index 98d9a29c87..626f7fc161 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -569,6 +569,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
if (pDataBlock->info.parTbName[0]) {
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER &&
+ pTask->subtableWithoutMd5 != 1 &&
!isAutoTableName(pDataBlock->info.parTbName) &&
!alreadyAddGroupId(pDataBlock->info.parTbName) &&
groupId != 0){
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index 9f08a55b21..db3d2729af 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -80,7 +80,7 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
}
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
- SArray* pTaskList, bool hasFillhistory) {
+ SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -96,6 +96,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset,
pTask->info.taskLevel = taskLevel;
pTask->info.fillHistory = fillHistory;
pTask->info.triggerParam = triggerParam;
+ pTask->subtableWithoutMd5 = subtableWithoutMd5;
pTask->status.pSM = streamCreateStateMachine(pTask);
if (pTask->status.pSM == NULL) {
@@ -205,6 +206,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1;
+ if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;
tEndEncode(pEncoder);
@@ -287,6 +289,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
+ if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;
tEndDecode(pDecoder);
@@ -482,7 +485,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->execInfo.created = taosGetTimestampMs();
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
- SDataRange* pRange = &pTask->dataRange;
+ SDataRange* pRange = &pTask->dataRange;
// only set the version info for stream tasks without fill-history task
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@@ -756,8 +759,8 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
}
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) {
- SStreamMeta* pMeta = pTask->pMeta;
- STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
+ SStreamMeta* pMeta = pTask->pMeta;
+ STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
if (pTask->info.fillHistory == 0) {
return 0;
}
@@ -864,7 +867,7 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
void streamTaskResume(SStreamTask* pTask) {
SStreamTaskState prevState = *streamTaskGetStatus(pTask);
- SStreamMeta* pMeta = pTask->pMeta;
+ SStreamMeta* pMeta = pTask->pMeta;
if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) {
streamTaskRestoreStatus(pTask);
@@ -881,9 +884,7 @@ void streamTaskResume(SStreamTask* pTask) {
}
}
-bool streamTaskIsSinkTask(const SStreamTask* pTask) {
- return pTask->info.taskLevel == TASK_LEVEL__SINK;
-}
+bool streamTaskIsSinkTask(const SStreamTask* pTask) { return pTask->info.taskLevel == TASK_LEVEL__SINK; }
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
int32_t code;