Merge branch '3.0' into fix/nullcheck

This commit is contained in:
Haojun Liao 2024-02-22 19:07:02 +08:00
commit 3498763a1b
32 changed files with 12374 additions and 631 deletions

View File

@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */ hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
hint: hint:
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT
select_list: select_list:
select_expr [, select_expr] ... select_expr [, select_expr] ...
@ -87,12 +87,13 @@ Hints are a means of user control over query optimization for individual stateme
The list of currently supported Hints is as follows: The list of currently supported Hints is as follows:
| **Hint** | **Params** | **Comment** | **Scopt** | | **Hint** | **Params** | **Comment** | **Scope** |
| :-----------: | -------------- | -------------------------- | -----------------------------------| | :-----------: | -------------- | -------------------------- | -----------------------------------|
| BATCH_SCAN | None | Batch table scan | JOIN statment for stable | | BATCH_SCAN | None | Batch table scan | JOIN statment for stable |
| NO_BATCH_SCAN | None | Sequential table scan | JOIN statment for stable | | NO_BATCH_SCAN | None | Sequential table scan | JOIN statment for stable |
| SORT_FOR_GROUP| None | Use sort for partition, conflict with PARTITION_FIRST | With normal column in partition by list | | SORT_FOR_GROUP| None | Use sort for partition, conflict with PARTITION_FIRST | With normal column in partition by list |
| PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list | | PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list |
| PARA_TABLES_SORT| None | When sorting the supertable rows by timestamp, No temporary disk space is used. When there are numerous tables, each with long rows, the corresponding algorithm associated with this prompt may consume a substantial amount of memory, potentially leading to an Out Of Memory (OOM) situation. | Sorting the supertable rows by timestamp |
For example: For example:
@ -100,6 +101,7 @@ For example:
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts; SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1; SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1; SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts;
``` ```
## Lists ## Lists

View File

@ -20,13 +20,13 @@ import CDemo from "./_sub_c.mdx";
# 介绍 # 介绍
## 主题 ## 主题
与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../12-taos-sql/13-tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度。 与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../taos-sql/tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度。
如下图,每个 topic 涉及到的数据表可能分布在多个 vnode相当于 kafka 里的 partition 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)。 如下图,每个 topic 涉及到的数据表可能分布在多个 vnode相当于 kafka 里的 partition 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)。
![img_5.png](img_5.png) ![img_5.png](img_5.png)
TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制,用户可以按需指定 WAL 文件保留的时间以及大小(详见 [CREATE DATABASE](../../12-taos-sql/02-database) 语句,由于消费是通过 WAL 实现的,所以应该根据写入消费速度来确定 WAL 的保存时长)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎。 TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制,用户可以按需指定 WAL 文件保留的时间以及大小(详见 [CREATE DATABASE](../../taos-sql/database) 语句,由于消费是通过 WAL 实现的,所以应该根据写入消费速度来确定 WAL 的保存时长)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎。
对于 `SELECT` 语句形式的 topic在消费时TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。 对于 `SELECT` 语句形式的 topic在消费时TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
@ -66,7 +66,7 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
# 语法说明 # 语法说明
具体的语法参见 [数据订阅](../../12-taos-sql/13-tmq) 具体的语法参见 [数据订阅](../../taos-sql/tmq)
# 消费参数 # 消费参数
@ -91,10 +91,9 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
不同语言下, TMQ 订阅相关的 API 及数据结构如下详细的接口说明可以参考连接器章节注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer 不同语言下, TMQ 订阅相关的 API 及数据结构如下详细的接口说明可以参考连接器章节注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
<TabItem value="c" label="C"> ```c
```c
typedef struct tmq_t tmq_t; typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t; typedef struct tmq_list_t tmq_list_t;
@ -146,42 +145,45 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
DLL_EXPORT const char *tmq_err2str(int32_t code); DLL_EXPORT const char *tmq_err2str(int32_t code);
``` ```
</TabItem>
<TabItem value="java" label="Java">
```java 下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
void subscribe(Collection<String> topics) throws SQLException;
void unsubscribe() throws SQLException; </TabItem>
<TabItem value="java" label="Java">
Set<String> subscription() throws SQLException; ```java
void subscribe(Collection<String> topics) throws SQLException;
ConsumerRecords<V> poll(Duration timeout) throws SQLException; void unsubscribe() throws SQLException;
Set<TopicPartition> assignment() throws SQLException; Set<String> subscription() throws SQLException;
long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long> position(String topic) throws SQLException;
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) throws SQLException;
void seek(TopicPartition partition, long offset) throws SQLException; ConsumerRecords<V> poll(Duration timeout) throws SQLException;
void seekToBeginning(Collection<TopicPartition> partitions) throws SQLException;
void seekToEnd(Collection<TopicPartition> partitions) throws SQLException;
void commitSync() throws SQLException; Set<TopicPartition> assignment() throws SQLException;
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) throws SQLException; long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long> position(String topic) throws SQLException;
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) throws SQLException;
void close() throws SQLException; void seek(TopicPartition partition, long offset) throws SQLException;
``` void seekToBeginning(Collection<TopicPartition> partitions) throws SQLException;
void seekToEnd(Collection<TopicPartition> partitions) throws SQLException;
</TabItem> void commitSync() throws SQLException;
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) throws SQLException;
<TabItem value="Python" label="Python"> void close() throws SQLException;
```
```python </TabItem>
class Consumer:
<TabItem value="Python" label="Python">
```python
class Consumer:
def subscribe(self, topics): def subscribe(self, topics):
pass pass
@ -202,41 +204,41 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
def commit(self, message): def commit(self, message):
pass pass
``` ```
</TabItem> </TabItem>
<TabItem label="Go" value="Go"> <TabItem label="Go" value="Go">
```go ```go
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error) func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
// 出于兼容目的保留 rebalanceCb 参数,当前未使用 // 出于兼容目的保留 rebalanceCb 参数,当前未使用
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
// 出于兼容目的保留 rebalanceCb 参数,当前未使用 // 出于兼容目的保留 rebalanceCb 参数,当前未使用
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
func (c *Consumer) Poll(timeoutMs int) tmq.Event func (c *Consumer) Poll(timeoutMs int) tmq.Event
// 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用 // 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用
func (c *Consumer) Commit() ([]tmq.TopicPartition, error) func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
func (c *Consumer) Unsubscribe() error func (c *Consumer) Unsubscribe() error
func (c *Consumer) Close() error func (c *Consumer) Close() error
``` ```
</TabItem> </TabItem>
<TabItem label="Rust" value="Rust"> <TabItem label="Rust" value="Rust">
```rust ```rust
impl TBuilder for TmqBuilder impl TBuilder for TmqBuilder
fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error> fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
fn build(&self) -> Result<Self::Target, Self::Error> fn build(&self) -> Result<Self::Target, Self::Error>
impl AsAsyncConsumer for Consumer impl AsAsyncConsumer for Consumer
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>( async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
&mut self, &mut self,
topics: I, topics: I,
@ -255,56 +257,57 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>; async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
async fn unsubscribe(self); async fn unsubscribe(self);
``` ```
可在 <https://docs.rs/taos> 上查看详细 API 说明。 可在 <https://docs.rs/taos> 上查看详细 API 说明。
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
```js ```js
function TMQConsumer(config) function TMQConsumer(config)
function subscribe(topic) function subscribe(topic)
function consume(timeout) function consume(timeout)
function subscription() function subscription()
function unsubscribe() function unsubscribe()
function commit(msg) function commit(msg)
function close() function close()
``` ```
</TabItem> </TabItem>
<TabItem value="C#" label="C#"> <TabItem value="C#" label="C#">
```csharp ```csharp
class ConsumerBuilder<TValue> class ConsumerBuilder<TValue>
ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config) ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
public IConsumer<TValue> Build() public IConsumer<TValue> Build()
void Subscribe(IEnumerable<string> topics) void Subscribe(IEnumerable<string> topics)
void Subscribe(string topic) void Subscribe(string topic)
ConsumeResult<TValue> Consume(int millisecondsTimeout) ConsumeResult<TValue> Consume(int millisecondsTimeout)
List<string> Subscription() List<string> Subscription()
void Unsubscribe() void Unsubscribe()
List<TopicPartitionOffset> Commit() List<TopicPartitionOffset> Commit()
void Close() void Close()
``` ```
</TabItem>
</TabItem>
</Tabs> </Tabs>
# 数据订阅示例 # 数据订阅示例
@ -334,65 +337,66 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
对于不同编程语言,其设置方式如下: 对于不同编程语言,其设置方式如下:
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
<TabItem value="c" label="C"> ```c
```c /* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */ 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName"); tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "latest"); tmq_conf_set(conf, "auto.offset.reset", "latest");
tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf); tmq_conf_destroy(conf);
``` ```
</TabItem>
<TabItem value="java" label="Java"> </TabItem>
<TabItem value="java" label="Java">
对于 Java 程序,还可以使用如下配置项: 对于 Java 程序,还可以使用如下配置项:
| 参数名称 | 类型 | 参数说明 | | 参数名称 | 类型 | 参数说明 |
| ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- | | ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- |
| `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" | | `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" |
| `bootstrap.servers` | string | 连接地址,如 `localhost:6030` | | `bootstrap.servers` | string | 连接地址,如 `localhost:6030` |
| `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 | | `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
| `value.deserializer.encoding` | string | 指定字符串解析的字符集 | | | `value.deserializer.encoding` | string | 指定字符串解析的字符集 | |
需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。 需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。
```java ```java
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "true"); properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000"); properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("group.id", "cgrpName"); properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root"); properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata"); properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "latest"); properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("msg.with.table.name", "true"); properties.setProperty("msg.with.table.name", "true");
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer"); properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
TaosConsumer<Meters> consumer = new TaosConsumer<>(properties); TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);
/* value deserializer definition. */ /* value deserializer definition. */
import com.taosdata.jdbc.tmq.ReferenceDeserializer; import com.taosdata.jdbc.tmq.ReferenceDeserializer;
public class MetersDeserializer extends ReferenceDeserializer<Meters> { public class MetersDeserializer extends ReferenceDeserializer<Meters> {
} }
``` ```
</TabItem>
<TabItem label="Go" value="Go"> </TabItem>
```go <TabItem label="Go" value="Go">
conf := &tmq.ConfigMap{
```go
conf := &tmq.ConfigMap{
"group.id": "test", "group.id": "test",
"auto.offset.reset": "latest", "auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1", "td.connect.ip": "127.0.0.1",
@ -402,38 +406,38 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
"client.id": "test_tmq_c", "client.id": "test_tmq_c",
"enable.auto.commit": "false", "enable.auto.commit": "false",
"msg.with.table.name": "true", "msg.with.table.name": "true",
} }
consumer, err := NewConsumer(conf) consumer, err := NewConsumer(conf)
``` ```
</TabItem> </TabItem>
<TabItem label="Rust" value="Rust"> <TabItem label="Rust" value="Rust">
```rust ```rust
let mut dsn: Dsn = "taos://".parse()?; let mut dsn: Dsn = "taos://".parse()?;
dsn.set("group.id", "group1"); dsn.set("group.id", "group1");
dsn.set("client.id", "test"); dsn.set("client.id", "test");
dsn.set("auto.offset.reset", "latest"); dsn.set("auto.offset.reset", "latest");
let tmq = TmqBuilder::from_dsn(dsn)?; let tmq = TmqBuilder::from_dsn(dsn)?;
let mut consumer = tmq.build()?; let mut consumer = tmq.build()?;
``` ```
</TabItem> </TabItem>
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例: Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例:
```python ```python
from taos.tmq import Consumer from taos.tmq import Consumer
# Syntax: `consumer = Consumer(configs)` # Syntax: `consumer = Consumer(configs)`
# #
# Example: # Example:
consumer = Consumer( consumer = Consumer(
{ {
"group.id": "local", "group.id": "local",
"client.id": "1", "client.id": "1",
@ -445,18 +449,18 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
"auto.offset.reset": "latest", "auto.offset.reset": "latest",
"msg.with.table.name": "true", "msg.with.table.name": "true",
} }
) )
``` ```
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
```js ```js
// 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 // 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
// 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 // 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数
let consumer = taos.consumer({ let consumer = taos.consumer({
'enable.auto.commit': 'true', 'enable.auto.commit': 'true',
'auto.commit.interval.ms','1000', 'auto.commit.interval.ms','1000',
'group.id': 'tg2', 'group.id': 'tg2',
@ -467,15 +471,15 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
'td.connect.ip','127.0.0.1', 'td.connect.ip','127.0.0.1',
'td.connect.port','6030' 'td.connect.port','6030'
}); });
``` ```
</TabItem> </TabItem>
<TabItem value="C#" label="C#"> <TabItem value="C#" label="C#">
```csharp ```csharp
var cfg = new Dictionary<string, string>() var cfg = new Dictionary<string, string>()
{ {
{ "group.id", "group1" }, { "group.id", "group1" },
{ "auto.offset.reset", "latest" }, { "auto.offset.reset", "latest" },
{ "td.connect.ip", "127.0.0.1" }, { "td.connect.ip", "127.0.0.1" },
@ -485,11 +489,11 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
{ "client.id", "tmq_example" }, { "client.id", "tmq_example" },
{ "enable.auto.commit", "true" }, { "enable.auto.commit", "true" },
{ "msg.with.table.name", "false" }, { "msg.with.table.name", "false" },
}; };
var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build(); var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
``` ```
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -500,78 +504,77 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
一个 consumer 支持同时订阅多个 topic。 一个 consumer 支持同时订阅多个 topic。
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
<TabItem value="c" label="C"> ```c
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
```c ```
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
``` </TabItem>
<TabItem value="java" label="Java">
</TabItem> ```java
<TabItem value="java" label="Java"> List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);
```
```java </TabItem>
List<String> topics = new ArrayList<>(); <TabItem value="Go" label="Go">
topics.add("tmq_topic");
consumer.subscribe(topics);
```
</TabItem> ```go
<TabItem value="Go" label="Go"> err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
```go
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err) panic(err)
} }
``` ```
</TabItem> </TabItem>
<TabItem value="Rust" label="Rust"> <TabItem value="Rust" label="Rust">
```rust ```rust
consumer.subscribe(["tmq_meters"]).await?; consumer.subscribe(["tmq_meters"]).await?;
``` ```
</TabItem> </TabItem>
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
```python ```python
consumer.subscribe(['topic1', 'topic2']) consumer.subscribe(['topic1', 'topic2'])
``` ```
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
```js ```js
// 创建订阅 topics 列表 // 创建订阅 topics 列表
let topics = ['topic_test'] let topics = ['topic_test']
// 启动订阅 // 启动订阅
consumer.subscribe(topics); consumer.subscribe(topics);
``` ```
</TabItem> </TabItem>
<TabItem value="C#" label="C#"> <TabItem value="C#" label="C#">
```csharp ```csharp
// 创建订阅 topics 列表 // 创建订阅 topics 列表
List<String> topics = new List<string>(); List<String> topics = new List<string>();
topics.add("tmq_topic"); topics.add("tmq_topic");
// 启动订阅 // 启动订阅
consumer.Subscribe(topics); consumer.Subscribe(topics);
``` ```
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -580,37 +583,36 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
以下代码展示了不同语言下如何对 TMQ 消息进行消费。 以下代码展示了不同语言下如何对 TMQ 消息进行消费。
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
<TabItem value="c" label="C"> ```c
// 消费数据
```c while (running) {
// 消费数据
while (running) {
TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut); TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
msg_process(msg); msg_process(msg);
} }
``` ```
这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。 这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。
</TabItem> </TabItem>
<TabItem value="java" label="Java"> <TabItem value="java" label="Java">
```java ```java
while(running){ while(running){
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100)); ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (Meters meter : meters) { for (Meters meter : meters) {
processMsg(meter); processMsg(meter);
} }
} }
``` ```
</TabItem> </TabItem>
<TabItem value="Go" label="Go"> <TabItem value="Go" label="Go">
```go ```go
for { for {
ev := consumer.Poll(0) ev := consumer.Poll(0)
if ev != nil { if ev != nil {
switch e := ev.(type) { switch e := ev.(type) {
@ -622,15 +624,15 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
} }
consumer.Commit() consumer.Commit()
} }
} }
``` ```
</TabItem> </TabItem>
<TabItem value="Rust" label="Rust"> <TabItem value="Rust" label="Rust">
```rust ```rust
{ {
let mut stream = consumer.stream(); let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? { while let Some((offset, message)) = stream.try_next().await? {
@ -657,14 +659,14 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
} }
consumer.commit(offset).await?; consumer.commit(offset).await?;
} }
} }
``` ```
</TabItem> </TabItem>
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
```python ```python
while True: while True:
res = consumer.poll(100) res = consumer.poll(100)
if not res: if not res:
continue continue
@ -675,40 +677,40 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
for block in val: for block in val:
print(block.fetchall()) print(block.fetchall())
``` ```
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
```js ```js
while(true){ while(true){
msg = consumer.consume(200); msg = consumer.consume(200);
// process message(consumeResult) // process message(consumeResult)
console.log(msg.topicPartition); console.log(msg.topicPartition);
console.log(msg.block); console.log(msg.block);
console.log(msg.fields) console.log(msg.fields)
} }
``` ```
</TabItem> </TabItem>
<TabItem value="C#" label="C#"> <TabItem value="C#" label="C#">
```csharp ```csharp
// 消费数据 // 消费数据
while (true) while (true)
{ {
using (var result = consumer.Consume(500)) using (var result = consumer.Consume(500))
{ {
if (result == null) continue; if (result == null) continue;
ProcessMsg(result); ProcessMsg(result);
consumer.Commit(); consumer.Commit();
} }
} }
``` ```
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -717,80 +719,79 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
消费结束后,应当取消订阅。 消费结束后,应当取消订阅。
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
<TabItem value="c" label="C"> ```c
/* 取消订阅 */
tmq_unsubscribe(tmq);
```c /* 关闭消费者对象 */
/* 取消订阅 */ tmq_consumer_close(tmq);
tmq_unsubscribe(tmq); ```
/* 关闭消费者对象 */ </TabItem>
tmq_consumer_close(tmq); <TabItem value="java" label="Java">
```
</TabItem> ```java
<TabItem value="java" label="Java"> /* 取消订阅 */
consumer.unsubscribe();
```java /* 关闭消费 */
/* 取消订阅 */ consumer.close();
consumer.unsubscribe(); ```
/* 关闭消费 */ </TabItem>
consumer.close();
```
</TabItem> <TabItem value="Go" label="Go">
<TabItem value="Go" label="Go"> ```go
/* Unsubscribe */
_ = consumer.Unsubscribe()
```go /* Close consumer */
/* Unsubscribe */ _ = consumer.Close()
_ = consumer.Unsubscribe() ```
/* Close consumer */ </TabItem>
_ = consumer.Close()
```
</TabItem> <TabItem value="Rust" label="Rust">
<TabItem value="Rust" label="Rust"> ```rust
consumer.unsubscribe().await;
```
```rust </TabItem>
consumer.unsubscribe().await;
```
</TabItem> <TabItem value="Python" label="Python">
<TabItem value="Python" label="Python"> ```py
# 取消订阅
consumer.unsubscribe()
# 关闭消费
consumer.close()
```
```py </TabItem>
# 取消订阅 <TabItem label="Node.JS" value="Node.JS">
consumer.unsubscribe()
# 关闭消费
consumer.close()
```
</TabItem> ```js
<TabItem label="Node.JS" value="Node.JS"> consumer.unsubscribe();
consumer.close();
```
```js </TabItem>
consumer.unsubscribe();
consumer.close();
```
</TabItem> <TabItem value="C#" label="C#">
<TabItem value="C#" label="C#"> ```csharp
// 取消订阅
consumer.Unsubscribe();
```csharp // 关闭消费
// 取消订阅 consumer.Close();
consumer.Unsubscribe(); ```
// 关闭消费 </TabItem>
consumer.Close();
```
</TabItem>
</Tabs> </Tabs>
@ -800,40 +801,40 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem label="C" value="c"> <TabItem label="C" value="c">
<CDemo /> <CDemo />
</TabItem> </TabItem>
<TabItem label="Java" value="java"> <TabItem label="Java" value="java">
<Tabs defaultValue="native"> <Tabs defaultValue="native">
<TabItem value="native" label="本地连接"> <TabItem value="native" label="本地连接">
<Java /> <Java />
</TabItem> </TabItem>
<TabItem value="ws" label="WebSocket 连接"> <TabItem value="ws" label="WebSocket 连接">
<JavaWS /> <JavaWS />
</TabItem> </TabItem>
</Tabs> </Tabs>
</TabItem> </TabItem>
<TabItem label="Go" value="Go"> <TabItem label="Go" value="Go">
<Go/> <Go/>
</TabItem> </TabItem>
<TabItem label="Rust" value="Rust"> <TabItem label="Rust" value="Rust">
<Rust /> <Rust />
</TabItem> </TabItem>
<TabItem label="Python" value="Python"> <TabItem label="Python" value="Python">
<Python /> <Python />
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
<Node/> <Node/>
</TabItem> </TabItem>
<TabItem label="C#" value="C#"> <TabItem label="C#" value="C#">
<CSharp/> <CSharp/>
</TabItem> </TabItem>
</Tabs> </Tabs>

View File

@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */ hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
hint: hint:
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT
select_list: select_list:
select_expr [, select_expr] ... select_expr [, select_expr] ...
@ -93,13 +93,14 @@ Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适
| NO_BATCH_SCAN | 无 | 采用顺序读表的方式 | 超级表 JOIN 语句 | | NO_BATCH_SCAN | 无 | 采用顺序读表的方式 | 超级表 JOIN 语句 |
| SORT_FOR_GROUP| 无 | 采用sort方式进行分组, 与PARTITION_FIRST冲突 | partition by 列表有普通列时 | | SORT_FOR_GROUP| 无 | 采用sort方式进行分组, 与PARTITION_FIRST冲突 | partition by 列表有普通列时 |
| PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 | | PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 |
| PARA_TABLES_SORT| 无 | 超级表的数据按时间戳排序时, 不使用临时磁盘空间, 只使用内存。当子表数量多, 行长比较大时候, 会使用大量内存, 可能发生OOM | 超级表的数据按时间戳排序时 |
举例: 举例:
```sql ```sql
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts; SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1; SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1; SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts;
``` ```
## 列表 ## 列表

View File

@ -379,6 +379,7 @@
#define TK_NO_BATCH_SCAN 607 #define TK_NO_BATCH_SCAN 607
#define TK_SORT_FOR_GROUP 608 #define TK_SORT_FOR_GROUP 608
#define TK_PARTITION_FIRST 609 #define TK_PARTITION_FIRST 609
#define TK_PARA_TABLES_SORT 610
#define TK_NK_NIL 65535 #define TK_NK_NIL 65535

View File

@ -121,6 +121,7 @@ typedef struct SScanLogicNode {
bool filesetDelimited; // returned blocks delimited by fileset bool filesetDelimited; // returned blocks delimited by fileset
bool isCountByTag; // true if selectstmt hasCountFunc & part by tag/tbname bool isCountByTag; // true if selectstmt hasCountFunc & part by tag/tbname
SArray* pFuncTypes; // for last, last_row SArray* pFuncTypes; // for last, last_row
bool paraTablesSort; // for table merge scan
} SScanLogicNode; } SScanLogicNode;
typedef struct SJoinLogicNode { typedef struct SJoinLogicNode {
@ -443,6 +444,7 @@ typedef struct STableScanPhysiNode {
int8_t igCheckUpdate; int8_t igCheckUpdate;
bool filesetDelimited; bool filesetDelimited;
bool needCountEmptyTable; bool needCountEmptyTable;
bool paraTablesSort;
} STableScanPhysiNode; } STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode;

View File

@ -128,6 +128,7 @@ typedef enum EHintOption {
HINT_BATCH_SCAN, HINT_BATCH_SCAN,
HINT_SORT_FOR_GROUP, HINT_SORT_FOR_GROUP,
HINT_PARTITION_FIRST, HINT_PARTITION_FIRST,
HINT_PARA_TABLES_SORT
} EHintOption; } EHintOption;
typedef struct SHintNode { typedef struct SHintNode {

View File

@ -631,7 +631,6 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
bool isNull = false; bool isNull = false;
if (pBlock->pBlockAgg == NULL) { if (pBlock->pBlockAgg == NULL) {

View File

@ -119,6 +119,7 @@ int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
void destroyStreamTaskIter(SStreamTaskIter *pIter); void destroyStreamTaskIter(SStreamTaskIter *pIter);

View File

@ -1545,6 +1545,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
} }
} }
mInfo("stream:%s,%"PRId64 " start to pause stream", pauseReq.name, pStream->uid);
if (pStream->status == STREAM_STATUS__PAUSE) { if (pStream->status == STREAM_STATUS__PAUSE) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return 0; return 0;

View File

@ -69,7 +69,7 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI
taosArrayPush(pList, pInfo); taosArrayPush(pList, pInfo);
} }
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint"); STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
if (pTrans == NULL) { if (pTrans == NULL) {
return terrno; return terrno;
@ -119,7 +119,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in
} else { } else {
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name, mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
pStream->uid, transId); pStream->uid, transId);
code = createStreamResetStatusTrans(pMnode, pStream); code = mndCreateStreamResetStatusTrans(pMnode, pStream);
} }
} }
@ -215,7 +215,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
}; };
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
mInfo("receive pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name); mInfo("receive pause stream:%s, %s, %"PRId64 ", because grant expired", pStream->name, reqPause.name, pStream->uid);
} }
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);

View File

@ -261,22 +261,30 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg
return mndTransAppendRedoAction(pTrans, &action); return mndTransAppendRedoAction(pTrans, &action);
} }
static bool identicalName(const char* pDb, const char* pParam, int32_t len) {
return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0);
}
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
// data in the hash table will be removed automatically, no need to remove it here. void *pIter = NULL;
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
if (pTransInfo == NULL) {
return TSDB_CODE_SUCCESS;
}
// not checkpoint trans, ignore while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) { if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId); continue;
return TSDB_CODE_SUCCESS;
} }
char *pDupDBName = strndup(pDBName, len); SStreamObj *pStream = mndGetStreamObj(pMnode, pTransInfo->streamId);
mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName); if (pStream != NULL) {
taosMemoryFree(pDupDBName); if (identicalName(pStream->sourceDb, pDBName, len)) {
mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb);
} else if (identicalName(pStream->targetDb, pDBName, len)) {
mndKillTransImpl(pMnode, pTransInfo->transId, pStream->targetDb);
}
mndReleaseStream(pMnode, pStream);
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -231,6 +231,8 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return -1;
} }
mDebug("set the resume action for trans:%d", pTrans->id);
return 0; return 0;
} }

File diff suppressed because one or more lines are too long

View File

@ -283,6 +283,42 @@ typedef struct STableScanInfo {
bool needCountEmptyTable; bool needCountEmptyTable;
} STableScanInfo; } STableScanInfo;
typedef enum ESubTableInputType {
SUB_TABLE_MEM_BLOCK,
SUB_TABLE_EXT_PAGES,
} ESubTableInputType;
typedef struct STmsSubTableInput {
STsdbReader* pReader;
SQueryTableDataCond tblCond;
STableKeyInfo* pKeyInfo;
bool bInMemReader;
ESubTableInputType type;
SSDataBlock* pReaderBlock;
SArray* aBlockPages;
SSDataBlock* pPageBlock;
int32_t pageIdx;
int32_t rowIdx;
int64_t* aTs;
} STmsSubTableInput;
typedef struct SBlockOrderInfo SBlockOrderInfo;
typedef struct STmsSubTablesMergeInfo {
SBlockOrderInfo* pOrderInfo;
int32_t numSubTables;
STmsSubTableInput* aInputs;
SMultiwayMergeTreeInfo* pTree;
int32_t numSubTablesCompleted;
int32_t numTableBlocksInMem;
SDiskbasedBuf* pBlocksBuf;
int32_t numInMemReaders;
} STmsSubTablesMergeInfo;
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
int32_t tableStartIndex; int32_t tableStartIndex;
int32_t tableEndIndex; int32_t tableEndIndex;
@ -296,7 +332,6 @@ typedef struct STableMergeScanInfo {
SSDataBlock* pSortInputBlock; SSDataBlock* pSortInputBlock;
SSDataBlock* pReaderBlock; SSDataBlock* pReaderBlock;
int64_t startTs; // sort start time int64_t startTs; // sort start time
SArray* sortSourceParams;
SLimitInfo limitInfo; SLimitInfo limitInfo;
int64_t numOfRows; int64_t numOfRows;
SScanInfo scanInfo; SScanInfo scanInfo;
@ -317,6 +352,8 @@ typedef struct STableMergeScanInfo {
SSDataBlock* nextDurationBlocks[2]; SSDataBlock* nextDurationBlocks[2];
bool rtnNextDurationBlocks; bool rtnNextDurationBlocks;
int32_t nextDurationBlocksIdx; int32_t nextDurationBlocksIdx;
STmsSubTablesMergeInfo* pSubTablesMergeInfo;
} STableMergeScanInfo; } STableMergeScanInfo;
typedef struct STagScanFilterContext { typedef struct STagScanFilterContext {

View File

@ -3421,6 +3421,414 @@ _error:
return NULL; return NULL;
} }
// table merge scan operator
// table merge scan operator
static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) {
int32_t left = *(int32_t*)pLeft;
int32_t right = *(int32_t*)pRight;
STmsSubTablesMergeInfo* pInfo = (STmsSubTablesMergeInfo*)param;
int32_t leftIdx = pInfo->aInputs[left].rowIdx;
int32_t rightIdx = pInfo->aInputs[right].rowIdx;
if (leftIdx == -1) {
return 1;
} else if (rightIdx == -1) {
return -1;
}
int64_t leftTs = pInfo->aInputs[left].aTs[leftIdx];
int64_t rightTs = pInfo->aInputs[right].aTs[rightIdx];
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
if (pInfo->pOrderInfo->order == TSDB_ORDER_DESC) {
ret = -1 * ret;
}
return ret;
}
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
for (int i = 0; i < src->numOfCols; i++) {
dst->colList[i] = src->colList[i];
}
return 0;
}
static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSubTableInput* pInput, bool* pSubTableHasBlock) {
int32_t code = 0;
STableMergeScanInfo* pInfo = pOperator->info;
SReadHandle* pHandle = &pInfo->base.readHandle;
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
const SStorageAPI* pAPI= &pTaskInfo->storageAPI;
blockDataCleanup(pInput->pReaderBlock);
if (!pInput->bInMemReader) {
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, pInput->pKeyInfo, 1, pInput->pReaderBlock,
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
}
pInfo->base.dataReader = pInput->pReader;
while (true) {
bool hasNext = false;
int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext);
if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, code);
}
if (!hasNext || isTaskKilled(pTaskInfo)) {
if (isTaskKilled(pTaskInfo)) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
*pSubTableHasBlock = false;
break;
}
if (pInput->tblCond.order == TSDB_ORDER_ASC) {
pInput->tblCond.twindows.skey = pInput->pReaderBlock->info.window.ekey + 1;
} else {
pInput->tblCond.twindows.ekey = pInput->pReaderBlock->info.window.skey - 1;
}
uint32_t status = 0;
code = loadDataBlock(pOperator, &pInfo->base, pInput->pReaderBlock, &status);
if (code != 0) {
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, code);
}
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
*pSubTableHasBlock = false;
break;
}
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pReaderBlock->info.rows == 0) {
continue;
}
*pSubTableHasBlock = true;
break;
}
if (*pSubTableHasBlock) {
pInput->pReaderBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pReaderBlock->info.id.uid);
pOperator->resultInfo.totalRows += pInput->pReaderBlock->info.rows;
}
if (!pInput->bInMemReader || !*pSubTableHasBlock) {
pAPI->tsdReader.tsdReaderClose(pInput->pReader);
pInput->pReader = NULL;
}
pInfo->base.dataReader = NULL;
return TSDB_CODE_SUCCESS;
}
static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
pInfo->bGroupProcessed = false;
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
if (tableKeyInfo->groupId != pInfo->groupId) {
break;
}
}
pInfo->tableEndIndex = i - 1;
}
static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput * pInput = pSubTblsInfo->aInputs + i;
if (pInput->rowIdx == -1) {
continue;
}
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
pInput->rowIdx = 0;
pInput->pageIdx = -1;
}
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
pInput->aTs = (int64_t*)col->pData;
}
tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, subTblRowCompareFn);
return TSDB_CODE_SUCCESS;
}
static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
setGroupStartEndIndex(pInfo);
STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo));
if (pSubTblsInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubTblsInfo->pOrderInfo = taosArrayGet(pInfo->pSortInfo, 0);
pSubTblsInfo->numSubTables = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
pSubTblsInfo->aInputs = taosMemoryCalloc(pSubTblsInfo->numSubTables, sizeof(STmsSubTableInput));
if (pSubTblsInfo->aInputs == NULL) {
taosMemoryFree(pSubTblsInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t bufPageSize = pInfo->bufPageSize;
int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize;
int32_t code =
createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pSubTblsInfo->aInputs);
taosMemoryFree(pSubTblsInfo);
return code;
}
pSubTblsInfo->numTableBlocksInMem = pSubTblsInfo->numSubTables;
pSubTblsInfo->numInMemReaders = pSubTblsInfo->numSubTables;
pInfo->pSubTablesMergeInfo = pSubTblsInfo;
return TSDB_CODE_SUCCESS;
}
static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* pInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SReadHandle* pHandle = &pInfo->base.readHandle;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
pInput->type = SUB_TABLE_MEM_BLOCK;
dumpQueryTableCond(&pInfo->base.cond, &pInput->tblCond);
pInput->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
pInput->pPageBlock = createOneDataBlock(pInfo->pResBlock, false);
STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex);
pInput->pKeyInfo = keyInfo;
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
if (i + 1 < pSubTblsInfo->numInMemReaders) {
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, keyInfo, 1, pInput->pReaderBlock,
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
pInput->bInMemReader = true;
} else {
pInput->pReader = NULL;
pInput->bInMemReader = false;
}
bool hasNext = true;
fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext);
if (!hasNext) {
pInput->rowIdx = -1;
++pSubTblsInfo->numSubTablesCompleted;
continue;
} else {
pInput->rowIdx = 0;
pInput->pageIdx = -1;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
STableMergeScanInfo* pInfo = pOperatorInfo->info;
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
bool hasNext = true;
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
if (!hasNext) {
pInput->rowIdx = -1;
++pSubTblsInfo->numSubTablesCompleted;
} else {
pInput->rowIdx = 0;
}
return TSDB_CODE_SUCCESS;
}
static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
STableMergeScanInfo* pInfo = pOperatorInfo->info;
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
if (pInput->rowIdx < pInputBlock->info.rows - 1) {
++pInput->rowIdx;
} else if (pInput->rowIdx == pInputBlock->info.rows -1 ) {
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
adjustSubTableFromMemBlock(pOperatorInfo, pSubTblsInfo);
}
if (pInput->rowIdx != -1) {
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
pInput->aTs = (int64_t*)col->pData;
}
}
tMergeTreeAdjust(pSubTblsInfo->pTree, tMergeTreeGetAdjustIndex(pSubTblsInfo->pTree));
return TSDB_CODE_SUCCESS;
}
static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo, SSDataBlock* pBlock) {
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrcColInfo = taosArrayGet(pInputBlock->pDataBlock, i);
bool isNull = colDataIsNull(pSrcColInfo, pInputBlock->info.rows, pInput->rowIdx, NULL);
if (isNull) {
colDataSetVal(pColInfo, pBlock->info.rows, NULL, true);
} else {
if (pSrcColInfo->pData != NULL) {
char* pData = colDataGetData(pSrcColInfo, pInput->rowIdx);
colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
}
}
}
pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = pInputBlock->info.scanFlag;
pBlock->info.rows += 1;
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pResBlock, int32_t capacity) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
blockDataCleanup(pResBlock);
bool finished = false;
while (true) {
while (1) {
if (pSubTblsInfo->numSubTablesCompleted >= pSubTblsInfo->numSubTables) {
finished = true;
break;
}
appendChosenRowToDataBlock(pSubTblsInfo, pResBlock);
adjustSubTableForNextRow(pOperator, pSubTblsInfo);
if (pResBlock->info.rows >= capacity) {
break;
}
}
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pOperator->pTaskInfo->env, pTaskInfo->code);
}
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
if (finished || limitReached || pResBlock->info.rows > 0) {
break;
}
}
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
}
static int32_t startSubTablesTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
initSubTablesMergeInfo(pInfo);
initSubTableInputs(pOperator, pInfo);
openSubTablesMergeSort(pInfo->pSubTablesMergeInfo);
return TSDB_CODE_SUCCESS;
}
static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
if (pSubTblsInfo != NULL) {
tMergeTreeDestroy(&pSubTblsInfo->pTree);
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
taosMemoryFree(pInput->tblCond.colList);
blockDataDestroy(pInput->pReaderBlock);
blockDataDestroy(pInput->pPageBlock);
taosArrayDestroy(pInput->aBlockPages);
pInfo->base.readerAPI.tsdReaderClose(pInput->pReader);
pInput->pReader = NULL;
}
destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf);
taosMemoryFree(pSubTblsInfo->aInputs);
taosMemoryFree(pSubTblsInfo);
pInfo->pSubTablesMergeInfo = NULL;
}
taosMemoryTrim(0);
return TSDB_CODE_SUCCESS;
}
SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableMergeScanInfo* pInfo = pOperator->info;
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
if (tableListSize == 0) {
setOperatorCompleted(pOperator);
return NULL;
}
pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
startSubTablesTableMergeScan(pOperator);
}
SSDataBlock* pBlock = NULL;
while (pInfo->tableStartIndex < tableListSize) {
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity);
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
}
if (pBlock != NULL) {
pBlock->info.id.groupId = pInfo->groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->bGroupProcessed = true;
return pBlock;
} else {
// Data of this group are all dumped, let's try the next group
stopSubTablesTableMergeScan(pInfo);
if (pInfo->tableEndIndex >= tableListSize - 1) {
setOperatorCompleted(pOperator);
break;
}
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
startSubTablesTableMergeScan(pOperator);
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
}
return pBlock;
}
static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) { static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) {
STableMergeScanInfo* pInfo = pTableMergeScanInfo; STableMergeScanInfo* pInfo = pTableMergeScanInfo;
if (pInfo->mSkipTables == NULL) { if (pInfo->mSkipTables == NULL) {
@ -3575,15 +3983,6 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
return pList; return pList;
} }
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
for (int i = 0; i < src->numOfCols; i++) {
dst->colList[i] = src->colList[i];
}
return 0;
}
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
STableMergeScanInfo* pTmsInfo = param; STableMergeScanInfo* pTmsInfo = param;
if (type == TSD_READER_NOTIFY_DURATION_START) { if (type == TSD_READER_NOTIFY_DURATION_START) {
@ -3671,8 +4070,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableStartIdx = pInfo->tableStartIndex;
int32_t tableEndIdx = pInfo->tableEndIndex; int32_t tableEndIdx = pInfo->tableEndIndex;
tSimpleHashClear(pInfo->mTableNumRows);
int32_t numOfTable = tableEndIdx - tableStartIdx + 1; int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
@ -3823,10 +4220,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
void destroyTableMergeScanOperatorInfo(void* param) { void destroyTableMergeScanOperatorInfo(void* param) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams);
// start one reader variable
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL; pTableScanInfo->base.dataReader = NULL;
@ -3837,18 +4232,22 @@ void destroyTableMergeScanOperatorInfo(void* param) {
} }
} }
taosArrayDestroy(pTableScanInfo->sortSourceParams);
tsortDestroySortHandle(pTableScanInfo->pSortHandle); tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL; pTableScanInfo->pSortHandle = NULL;
taosHashCleanup(pTableScanInfo->mSkipTables); taosHashCleanup(pTableScanInfo->mSkipTables);
pTableScanInfo->mSkipTables = NULL; pTableScanInfo->mSkipTables = NULL;
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
// end one reader variable
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock); pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock);
taosArrayDestroy(pTableScanInfo->pSortInfo); taosArrayDestroy(pTableScanInfo->pSortInfo);
stopSubTablesTableMergeScan(pTableScanInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
@ -3922,14 +4321,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
goto _error; goto _error;
} }
initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
pInfo->mergeLimit = -1; pInfo->mergeLimit = -1;
@ -3938,24 +4329,37 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset; pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
pInfo->mSkipTables = NULL; pInfo->mSkipTables = NULL;
} }
initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false); pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
int32_t rowSize = pInfo->pResBlock->info.rowSize; int32_t rowSize = pInfo->pResBlock->info.rowSize;
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock); uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols); pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
//start one reader variable
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
if (!tsExperimental) { if (!tsExperimental) {
pInfo->filesetDelimited = false; pInfo->filesetDelimited = false;
} else { } else {
pInfo->filesetDelimited = pTableScanNode->filesetDelimited; pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
} }
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable; // end one reader variable
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo, pOperator->fpSet = createOperatorFpSet(
optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn, NULL); optrDummyOpenFn, pTableScanNode->paraTablesSort ? doTableMergeScanParaSubTables : doTableMergeScan, NULL,
destroyTableMergeScanOperatorInfo, optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn,
NULL);
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;

View File

@ -164,7 +164,7 @@ void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, ESt
} else { } else {
pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, pKey); pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, pKey);
} }
SSessionKey tmpKey = {0}; SSessionKey tmpKey = {.groupId = pKey->groupId, .win.ekey = INT64_MIN, .win.skey = INT64_MIN};
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0); int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pAggSup->stateStore.streamStateFreeCur(pCur); pAggSup->stateStore.streamStateFreeCur(pCur);

View File

@ -459,7 +459,7 @@ static void idxInterRsltDestroy(SArray* results) {
static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) { static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
// refactor, merge interResults into fResults by oType // refactor, merge interResults into fResults by oType
for (int i = 0; i < taosArrayGetSize(in); i--) { for (int i = 0; i < taosArrayGetSize(in); i++) {
SArray* t = taosArrayGetP(in, i); SArray* t = taosArrayGetP(in, i);
taosArraySort(t, uidCompare); taosArraySort(t, uidCompare);
taosArrayRemoveDuplicate(t, uidCompare, NULL); taosArrayRemoveDuplicate(t, uidCompare, NULL);

View File

@ -17,6 +17,7 @@
#include "index.h" #include "index.h"
#include "indexComm.h" #include "indexComm.h"
#include "indexInt.h" #include "indexInt.h"
#include "indexUtil.h"
#include "nodes.h" #include "nodes.h"
#include "querynodes.h" #include "querynodes.h"
#include "scalar.h" #include "scalar.h"
@ -669,6 +670,10 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
if (sifSetFltParam(left, right, &typedata, &param) != 0) return -1; if (sifSetFltParam(left, right, &typedata, &param) != 0) return -1;
} }
ret = left->api.metaFilterTableIds(arg->metaEx, &param, output->result); ret = left->api.metaFilterTableIds(arg->metaEx, &param, output->result);
if (ret == 0) {
taosArraySort(output->result, uidCompare);
taosArrayRemoveDuplicate(output->result, uidCompare, NULL);
}
} }
return ret; return ret;
} }
@ -875,8 +880,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
} else if (node->condType == LOGIC_COND_TYPE_NOT) { } else if (node->condType == LOGIC_COND_TYPE_NOT) {
// taosArrayAddAll(output->result, params[m].result); // taosArrayAddAll(output->result, params[m].result);
} }
taosArraySort(output->result, idxUidCompare); taosArraySort(output->result, uidCompare);
taosArrayRemoveDuplicate(output->result, idxUidCompare, NULL); taosArrayRemoveDuplicate(output->result, uidCompare, NULL);
} }
} else { } else {
for (int32_t m = 0; m < node->pParameterList->length; m++) { for (int32_t m = 0; m < node->pParameterList->length; m++) {
@ -1016,7 +1021,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
return code; return code;
} }
static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status, SMetaDataFilterAPI* pAPI) { static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status, SMetaDataFilterAPI *pAPI) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pNode == NULL) { if (pNode == NULL) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
@ -1054,7 +1059,8 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status, SMetaDataFilte
return code; return code;
} }
int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status, SMetaDataFilterAPI* pAPI) { int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status,
SMetaDataFilterAPI *pAPI) {
SIdxFltStatus st = idxGetFltStatus(pFilterNode, pAPI); SIdxFltStatus st = idxGetFltStatus(pFilterNode, pAPI);
if (st == SFLT_NOT_INDEX) { if (st == SFLT_NOT_INDEX) {
*status = st; *status = st;
@ -1081,7 +1087,7 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SIdxFltStatus idxGetFltStatus(SNode *pFilterNode, SMetaDataFilterAPI* pAPI) { SIdxFltStatus idxGetFltStatus(SNode *pFilterNode, SMetaDataFilterAPI *pAPI) {
SIdxFltStatus st = SFLT_NOT_INDEX; SIdxFltStatus st = SFLT_NOT_INDEX;
if (pFilterNode == NULL) { if (pFilterNode == NULL) {
return SFLT_NOT_INDEX; return SFLT_NOT_INDEX;

View File

@ -456,6 +456,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(filesetDelimited); COPY_SCALAR_FIELD(filesetDelimited);
COPY_SCALAR_FIELD(isCountByTag); COPY_SCALAR_FIELD(isCountByTag);
CLONE_OBJECT_FIELD(pFuncTypes, functParamClone); CLONE_OBJECT_FIELD(pFuncTypes, functParamClone);
COPY_SCALAR_FIELD(paraTablesSort);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -688,6 +689,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
COPY_SCALAR_FIELD(igExpired); COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(filesetDelimited); COPY_SCALAR_FIELD(filesetDelimited);
COPY_SCALAR_FIELD(needCountEmptyTable); COPY_SCALAR_FIELD(needCountEmptyTable);
COPY_SCALAR_FIELD(paraTablesSort);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -698,6 +698,7 @@ static const char* jkScanLogicPlanTagCond = "TagCond";
static const char* jkScanLogicPlanGroupTags = "GroupTags"; static const char* jkScanLogicPlanGroupTags = "GroupTags";
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx"; static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited"; static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
static const char* jkScanLogicPlanParaTablesSort = "ParaTablesSort";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj; const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
@ -745,6 +746,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited); code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanParaTablesSort, pNode->paraTablesSort);
}
return code; return code;
} }
@ -795,6 +799,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited); code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkScanLogicPlanParaTablesSort, &pNode->paraTablesSort);
}
return code; return code;
} }
@ -1888,6 +1895,7 @@ static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate"; static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited"; static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable"; static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable";
static const char* jkTableScanPhysiPlanParaTablesSort = "ParaTablesSort";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
@ -1962,6 +1970,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable); code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanParaTablesSort, pNode->paraTablesSort);
}
return code; return code;
} }
@ -2038,6 +2049,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable); code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanParaTablesSort, &pNode->paraTablesSort);
}
return code; return code;
} }

View File

@ -2185,6 +2185,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->needCountEmptyTable); code = tlvEncodeValueBool(pEncoder, pNode->needCountEmptyTable);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->paraTablesSort);
}
return code; return code;
} }
@ -2269,6 +2272,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj)
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->needCountEmptyTable); code = tlvDecodeValueBool(pDecoder, &pNode->needCountEmptyTable);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->paraTablesSort);
}
return code; return code;
} }

View File

@ -401,6 +401,9 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt
case HINT_PARTITION_FIRST: case HINT_PARTITION_FIRST:
if (paramNum > 0 || hasHint(*ppHintList, HINT_SORT_FOR_GROUP)) return true; if (paramNum > 0 || hasHint(*ppHintList, HINT_SORT_FOR_GROUP)) return true;
break; break;
case HINT_PARA_TABLES_SORT:
if (paramNum > 0 || hasHint(*ppHintList, HINT_PARA_TABLES_SORT)) return true;
break;
default: default:
return true; return true;
} }
@ -479,6 +482,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) {
} }
opt = HINT_PARTITION_FIRST; opt = HINT_PARTITION_FIRST;
break; break;
case TK_PARA_TABLES_SORT:
lastComma = false;
if (0 != opt || inParamList) {
quit = true;
break;
}
opt = HINT_PARA_TABLES_SORT;
break;
case TK_NK_LP: case TK_NK_LP:
lastComma = false; lastComma = false;
if (0 == opt || inParamList) { if (0 == opt || inParamList) {

View File

@ -173,6 +173,7 @@ static SKeyword keywordTable[] = {
{"OUTPUTTYPE", TK_OUTPUTTYPE}, {"OUTPUTTYPE", TK_OUTPUTTYPE},
{"PAGES", TK_PAGES}, {"PAGES", TK_PAGES},
{"PAGESIZE", TK_PAGESIZE}, {"PAGESIZE", TK_PAGESIZE},
{"PARA_TABLES_SORT", TK_PARA_TABLES_SORT},
{"PARTITION", TK_PARTITION}, {"PARTITION", TK_PARTITION},
{"PARTITION_FIRST", TK_PARTITION_FIRST}, {"PARTITION_FIRST", TK_PARTITION_FIRST},
{"PASS", TK_PASS}, {"PASS", TK_PASS},

View File

@ -47,6 +47,7 @@ int32_t validateQueryPlan(SPlanContext* pCxt, SQueryPlan* pPlan);
bool getBatchScanOptionFromHint(SNodeList* pList); bool getBatchScanOptionFromHint(SNodeList* pList);
bool getSortForGroupOptHint(SNodeList* pList); bool getSortForGroupOptHint(SNodeList* pList);
bool getparaTablesSortOptHint(SNodeList* pList);
bool getOptHint(SNodeList* pList, EHintOption hint); bool getOptHint(SNodeList* pList, EHintOption hint);
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr); SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr);
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes); int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);

View File

@ -501,7 +501,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
} else { } else {
nodesDestroyNode((SNode*)pScan); nodesDestroyNode((SNode*)pScan);
} }
pScan->paraTablesSort = getparaTablesSortOptHint(pSelect->pHint);
pCxt->hasScan = true; pCxt->hasScan = true;
return code; return code;

View File

@ -651,6 +651,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false; pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited; pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
pTableScan->needCountEmptyTable = pScanLogicNode->isCountByTag; pTableScan->needCountEmptyTable = pScanLogicNode->isCountByTag;
pTableScan->paraTablesSort = pScanLogicNode->paraTablesSort;
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode); int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {

View File

@ -466,6 +466,18 @@ bool getOptHint(SNodeList* pList, EHintOption hint) {
return false; return false;
} }
bool getparaTablesSortOptHint(SNodeList* pList) {
if (!pList) return false;
SNode* pNode;
FOREACH(pNode, pList) {
SHintNode* pHint = (SHintNode*)pNode;
if (pHint->option == HINT_PARA_TABLES_SORT) {
return true;
}
}
return false;
}
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) { int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SLogicNode* pCurr = (SLogicNode*)pNode; SLogicNode* pCurr = (SLogicNode*)pNode;

View File

@ -961,7 +961,7 @@ static void cliSendCb(uv_write_t* req, int status) {
tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost); tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
} }
} }
if (pMsg->msg.contLen == 0 && pMsg->msg.pCont != 0) { if (pMsg != NULL && pMsg->msg.contLen == 0 && pMsg->msg.pCont != 0) {
rpcFreeCont(pMsg->msg.pCont); rpcFreeCont(pMsg->msg.pCont);
pMsg->msg.pCont = 0; pMsg->msg.pCont = 0;
} }

View File

@ -57,6 +57,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/para_tms.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/para_tms2.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py

View File

@ -656,7 +656,9 @@ if $data31 != 4 then
endi endi
sql_error select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,c; sql_error select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,c;
print select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname,t1,t2 interval(1m) sliding(15s) order by tbname;
sql select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname,t1,t2 interval(1m) sliding(15s) order by tbname; sql select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname,t1,t2 interval(1m) sliding(15s) order by tbname;
print $rows
if $rows != 40 then if $rows != 40 then
return -1 return -1
endi endi

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff