diff --git a/docs/en/07-develop/03-insert-data/20-kafka-writting.mdx b/docs/en/07-develop/03-insert-data/20-kafka-writting.mdx
new file mode 100644
index 0000000000..ffb969a8a6
--- /dev/null
+++ b/docs/en/07-develop/03-insert-data/20-kafka-writting.mdx
@@ -0,0 +1,46 @@
+---
+title: Write from Kafka
+---
+
+import Tabs from "@theme/Tabs";
+import TabItem from "@theme/TabItem";
+import PyKafka from "./_py_kafka.mdx";
+
+## About Kafka
+
+Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. For the key concepts of Kafka, please refer to the [Kafka documentation](https://kafka.apache.org/documentation/#gettingStarted).
+
+### Kafka topics
+
+Messages in Kafka are organized by topic, and a topic may have one or more partitions. Topics can be managed with Kafka's `kafka-topics.sh` tool.
+
+Create a topic named `kafka-events`:
+
+```bash
+bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092
+```
+
+Set the number of partitions of `kafka-events` to 3:
+
+```bash
+bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092
+```
+
+Show all topics and their partitions:
+
+```bash
+bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe
+```
+
+## Insert into TDengine
+
+Data can be written into TDengine with SQL or through the schemaless interface. For more information, please refer to [Insert Using SQL](/develop/insert-data/sql-writing/), [High Performance Writing](/develop/insert-data/high-volume/), or [Schemaless Writing](/reference/schemaless/).
+
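+As a quick orientation before the Kafka examples below, the following minimal sketch shows a single SQL insert through the TDengine Python connector. It is illustrative only: the host, credentials, and the `power` schema used here are assumptions, not requirements of this document.
+
+```py
+import taos
+
+# Assumption for illustration: a local TDengine server with default credentials.
+conn = taos.connect(host="localhost", user="root", password="taosdata")
+conn.execute("CREATE DATABASE IF NOT EXISTS power")
+conn.execute("USE power")
+conn.execute("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
+             "TAGS (location BINARY(64), groupId INT)")
+# Insert one row; the subtable d0 is created automatically from the super table.
+conn.execute("INSERT INTO d0 USING meters TAGS ('California.SanFrancisco', 2) VALUES (NOW, 10.3, 219, 0.31)")
+conn.close()
+```
+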
+## Examples
+
+<Tabs defaultValue="Python" groupId="lang">
+  <TabItem label="Python" value="Python">
+    <PyKafka />
+  </TabItem>
+</Tabs>
diff --git a/docs/en/07-develop/03-insert-data/02-influxdb-line.mdx b/docs/en/07-develop/03-insert-data/30-influxdb-line.mdx
similarity index 100%
rename from docs/en/07-develop/03-insert-data/02-influxdb-line.mdx
rename to docs/en/07-develop/03-insert-data/30-influxdb-line.mdx
diff --git a/docs/en/07-develop/03-insert-data/03-opentsdb-telnet.mdx b/docs/en/07-develop/03-insert-data/40-opentsdb-telnet.mdx
similarity index 100%
rename from docs/en/07-develop/03-insert-data/03-opentsdb-telnet.mdx
rename to docs/en/07-develop/03-insert-data/40-opentsdb-telnet.mdx
diff --git a/docs/en/07-develop/03-insert-data/04-opentsdb-json.mdx b/docs/en/07-develop/03-insert-data/50-opentsdb-json.mdx
similarity index 100%
rename from docs/en/07-develop/03-insert-data/04-opentsdb-json.mdx
rename to docs/en/07-develop/03-insert-data/50-opentsdb-json.mdx
diff --git a/docs/en/07-develop/03-insert-data/05-high-volume.md b/docs/en/07-develop/03-insert-data/60-high-volume.md
similarity index 100%
rename from docs/en/07-develop/03-insert-data/05-high-volume.md
rename to docs/en/07-develop/03-insert-data/60-high-volume.md
diff --git a/docs/en/07-develop/03-insert-data/_py_kafka.mdx b/docs/en/07-develop/03-insert-data/_py_kafka.mdx
new file mode 100644
index 0000000000..dc43a0d415
--- /dev/null
+++ b/docs/en/07-develop/03-insert-data/_py_kafka.mdx
@@ -0,0 +1,60 @@
+### Python Kafka client
+
+For Python Kafka clients, please refer to the [Kafka client list](https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python). This document uses [kafka-python](http://github.com/dpkp/kafka-python).
+
+### Consume from Kafka
+
+The simplest way to consume messages from Kafka is to read them one by one:
+
+```py
+from kafka import KafkaConsumer
+consumer = KafkaConsumer('my_favorite_topic')
+for msg in consumer:
+    print(msg)
+```
+
+For higher throughput, messages can be consumed from Kafka in batches:
+
+```py
+from kafka import KafkaConsumer
+consumer = KafkaConsumer('my_favorite_topic')
+while True:
+    msgs = consumer.poll(timeout_ms=500, max_records=1000)
+    if msgs:
+        print(msgs)
+```
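+
+If each polled batch is written to TDengine before the next poll, offsets can be committed manually so that a crash between poll and write does not lose data. A minimal sketch, assuming a hypothetical `write_to_tdengine` helper that performs the insert:
+
+```py
+from kafka import KafkaConsumer
+
+consumer = KafkaConsumer('my_favorite_topic',
+                         group_id='taos',
+                         enable_auto_commit=False)  # commit only after a successful write
+while True:
+    msgs = consumer.poll(timeout_ms=500, max_records=1000)
+    if msgs:
+        for records in msgs.values():
+            write_to_tdengine(records)  # hypothetical helper that inserts into TDengine
+        consumer.commit()  # mark the whole batch as processed
+```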
+
+### Multithreading
+
+For even higher throughput, messages from Kafka can be processed in multiple threads, for example with Python's `ThreadPoolExecutor`:
+
+```py
+from concurrent.futures import ThreadPoolExecutor
+pool = ThreadPoolExecutor(max_workers=10)
+pool.submit(...)
+```
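+
+Note that kafka-python's `KafkaConsumer` is not thread-safe, so a common arrangement is to poll from a single thread and hand the polled records to the pool. A sketch under that assumption (the `handle_batch` helper is hypothetical):
+
+```py
+from concurrent.futures import ThreadPoolExecutor
+from kafka import KafkaConsumer
+
+def handle_batch(records):
+    ...  # hypothetical: build an INSERT statement from the records and execute it
+
+consumer = KafkaConsumer('my_favorite_topic')
+pool = ThreadPoolExecutor(max_workers=10)
+while True:
+    msgs = consumer.poll(timeout_ms=500, max_records=1000)
+    for records in msgs.values():
+        pool.submit(handle_batch, records)  # pass the callable and its argument
+```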
+
+### Multiprocessing
+
+For still more throughput, multiprocessing is sometimes used. In that case, the number of Kafka consumers should be no greater than the number of partitions of the topic:
+
+```py
+from multiprocessing import Process
+
+ps = []
+for i in range(5):
+    p = Process(target=Consumer().consume)  # pass the method itself, don't call it
+    p.start()
+    ps.append(p)
+
+for p in ps:
+    p.join()
+```
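+
+In practice each process usually creates its own `KafkaConsumer` with a shared `group_id`, so that Kafka assigns every process a disjoint subset of the topic's partitions. A hedged sketch of that pattern:
+
+```py
+from multiprocessing import Process
+from kafka import KafkaConsumer
+
+def consume_loop():
+    # Each process owns its own consumer; the shared group_id lets Kafka
+    # balance the topic's partitions across the processes.
+    consumer = KafkaConsumer('kafka-events', group_id='taos')
+    for msg in consumer:
+        ...  # write msg.value to TDengine
+
+ps = [Process(target=consume_loop) for _ in range(3)]
+for p in ps:
+    p.start()
+for p in ps:
+    p.join()
+```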
+
+In addition to Python's built-in multithreading and multiprocessing libraries, the third-party library gunicorn can also be used.
+
+### Examples
+
+```py
+{{#include docs/examples/python/kafka_example.py}}
+```
diff --git a/docs/en/20-third-party/01-grafana.mdx b/docs/en/20-third-party/01-grafana.mdx
index e0fbefd5a8..ca32ce8afc 100644
--- a/docs/en/20-third-party/01-grafana.mdx
+++ b/docs/en/20-third-party/01-grafana.mdx
@@ -76,7 +76,7 @@ sudo -u grafana grafana-cli plugins install tdengine-datasource
You can also download zip files from [GitHub](https://github.com/taosdata/grafanaplugin/releases/tag/latest) or [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) and install manually. The commands are as follows:
```bash
-GF_VERSION=3.2.2
+GF_VERSION=3.2.7
# from GitHub
wget https://github.com/taosdata/grafanaplugin/releases/download/v$GF_VERSION/tdengine-datasource-$GF_VERSION.zip
# from Grafana
diff --git a/docs/en/25-application/01-telegraf.md b/docs/en/25-application/01-telegraf.md
index f700326449..65fb08ee67 100644
--- a/docs/en/25-application/01-telegraf.md
+++ b/docs/en/25-application/01-telegraf.md
@@ -38,15 +38,9 @@ Download the latest TDengine-server from the [Downloads](http://tdengine.com/en/
## Data Connection Setup
-### Download TDengine plug-in to grafana plug-in directory
+### Install Grafana Plugin and Configure Data Source
-```bash
-1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip
-2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/
-3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine
-4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini
-5. sudo systemctl restart grafana-server.service
-```
+Please refer to [Install Grafana Plugin and Configure Data Source](/third-party/grafana/#install-grafana-plugin-and-configure-data-source).
### Modify /etc/telegraf/telegraf.conf
diff --git a/docs/en/25-application/02-collectd.md b/docs/en/25-application/02-collectd.md
index 692cd8d929..97412b2309 100644
--- a/docs/en/25-application/02-collectd.md
+++ b/docs/en/25-application/02-collectd.md
@@ -41,15 +41,9 @@ Download the latest TDengine-server from the [Downloads](http://tdengine.com/en/
## Data Connection Setup
-### Copy the TDengine plugin to the grafana plugin directory
+### Install Grafana Plugin and Configure Data Source
-```bash
-1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip
-2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/
-3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine
-4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini
-5. sudo systemctl restart grafana-server.service
-```
+Please refer to [Install Grafana Plugin and Configure Data Source](/third-party/grafana/#install-grafana-plugin-and-configure-data-source).
### Configure collectd
diff --git a/docs/examples/python/kafka_example.py b/docs/examples/python/kafka_example.py
new file mode 100644
index 0000000000..735059eec0
--- /dev/null
+++ b/docs/examples/python/kafka_example.py
@@ -0,0 +1,192 @@
+# -*- coding: utf-8 -*-
+import json
+import time
+from json import JSONDecodeError
+from typing import Callable
+import logging
+from concurrent.futures import ThreadPoolExecutor, Future
+
+import taos
+from kafka import KafkaConsumer
+from kafka.consumer.fetcher import ConsumerRecord
+
+
+class Consumer(object):
+ DEFAULT_CONFIGS = {
+ 'kafka_brokers': 'localhost:9092',
+ 'kafka_topic': 'python_kafka',
+ 'kafka_group_id': 'taos',
+ 'taos_host': 'localhost',
+ 'taos_user': 'root',
+ 'taos_password': 'taosdata',
+ 'taos_database': 'power',
+ 'taos_port': 6030,
+ 'timezone': None,
+ 'clean_after_testing': False,
+        'batch_consume': True,
+ 'batch_size': 1000,
+ 'async_model': True,
+ 'workers': 10
+ }
+
+ LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
+ 'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale',
+ 'California.SantaClara', 'California.Cupertino']
+
+ CREATE_DATABASE_SQL = 'create database if not exists {} keep 365 duration 10 buffer 16 wal_level 1'
+ USE_DATABASE_SQL = 'use {}'
+ DROP_TABLE_SQL = 'drop table if exists meters'
+ DROP_DATABASE_SQL = 'drop database if exists {}'
+ CREATE_STABLE_SQL = 'create stable meters (ts timestamp, current float, voltage int, phase float) ' \
+ 'tags (location binary(64), groupId int)'
+ CREATE_TABLE_SQL = 'create table if not exists {} using meters tags (\'{}\', {})'
+ INSERT_SQL_HEADER = "insert into "
+ INSERT_PART_SQL = 'power.{} values (\'{}\', {}, {}, {})'
+
+ def __init__(self, **configs):
+        self.config: dict = dict(self.DEFAULT_CONFIGS)  # copy so the class-level defaults stay untouched
+ self.config.update(configs)
+ self.consumer = KafkaConsumer(
+ self.config.get('kafka_topic'), # topic
+ bootstrap_servers=self.config.get('kafka_brokers'),
+ group_id=self.config.get('kafka_group_id'),
+ )
+ self.taos = taos.connect(
+ host=self.config.get('taos_host'),
+ user=self.config.get('taos_user'),
+ password=self.config.get('taos_password'),
+ port=self.config.get('taos_port'),
+ timezone=self.config.get('timezone'),
+ )
+ if self.config.get('async_model'):
+ self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
+ self.tasks: list[Future] = []
+        # tag-to-table mapping. key: {location}_{groupId}, value: table name
+ self.tag_table_mapping = {}
+ i = 0
+ for location in self.LOCATIONS:
+ for j in range(1, 11):
+ table_name = 'd{}'.format(i)
+ self._cache_table(location=location, group_id=j, table_name=table_name)
+ i += 1
+
+ def init_env(self):
+ # create database and table
+ self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
+ self.taos.execute(self.CREATE_DATABASE_SQL.format(self.config.get('taos_database')))
+ self.taos.execute(self.USE_DATABASE_SQL.format(self.config.get('taos_database')))
+ self.taos.execute(self.DROP_TABLE_SQL)
+ self.taos.execute(self.CREATE_STABLE_SQL)
+ for tags, table_name in self.tag_table_mapping.items():
+ location, group_id = _get_location_and_group(tags)
+ self.taos.execute(self.CREATE_TABLE_SQL.format(table_name, location, group_id))
+
+ def consume(self):
+ logging.warning('## start consumer topic-[%s]', self.config.get('kafka_topic'))
+ try:
+            if self.config.get('batch_consume'):
+ self._run_batch(self._to_taos_batch)
+ else:
+ self._run(self._to_taos)
+ except KeyboardInterrupt:
+ logging.warning("## caught keyboard interrupt, stopping")
+ finally:
+ self.stop()
+
+ def stop(self):
+ # close consumer
+ if self.consumer is not None:
+ self.consumer.commit()
+ self.consumer.close()
+
+ # multi thread
+ if self.config.get('async_model'):
+            for task in self.tasks:
+                task.result()  # block until the submitted task finishes
+ if self.pool is not None:
+ self.pool.shutdown()
+
+ # clean data
+ if self.config.get('clean_after_testing'):
+ self.taos.execute(self.DROP_TABLE_SQL)
+ self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
+ # close taos
+ if self.taos is not None:
+ self.taos.close()
+
+ def _run(self, f: Callable[[ConsumerRecord], bool]):
+ for message in self.consumer:
+ if self.config.get('async_model'):
+                self.pool.submit(f, message)
+ else:
+ f(message)
+
+ def _run_batch(self, f: Callable[[list[list[ConsumerRecord]]], None]):
+ while True:
+ messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size'))
+ if messages:
+ if self.config.get('async_model'):
+                    self.pool.submit(f, list(messages.values()))
+ else:
+ f(list(messages.values()))
+ if not messages:
+ time.sleep(0.1)
+
+    def _to_taos(self, message: ConsumerRecord) -> bool:
+        sql = self._build_sql(message.value)
+        if len(sql) == 0:  # decode error, skip
+            return True
+        sql = self.INSERT_SQL_HEADER + sql
+        logging.info('## insert sql %s', sql)
+        return self.taos.execute(sql=sql) == 1
+
+ def _to_taos_batch(self, messages: list[list[ConsumerRecord]]):
+ sql = self._build_sql_batch(messages=messages)
+ if len(sql) == 0: # decode error, skip
+ return
+ self.taos.execute(sql=sql)
+
+ def _build_sql(self, msg_value: str) -> str:
+ try:
+ data = json.loads(msg_value)
+ except JSONDecodeError as e:
+            logging.error('## decode message [%s] error: %s', msg_value, e)
+ return ''
+ location = data.get('location')
+ group_id = data.get('groupId')
+ ts = data.get('ts')
+ current = data.get('current')
+ voltage = data.get('voltage')
+ phase = data.get('phase')
+
+ table_name = self._get_table_name(location=location, group_id=group_id)
+ return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)
+
+    def _build_sql_batch(self, messages: list[list[ConsumerRecord]]) -> str:
+        sql_list = []
+        for partition_messages in messages:
+            for message in partition_messages:
+                sql_part = self._build_sql(message.value)
+                if sql_part:  # skip messages that failed to decode
+                    sql_list.append(sql_part)
+        if not sql_list:
+            return ''
+        return self.INSERT_SQL_HEADER + ' '.join(sql_list)
+
+ def _cache_table(self, location: str, group_id: int, table_name: str):
+ self.tag_table_mapping[_tag_table_mapping_key(location=location, group_id=group_id)] = table_name
+
+ def _get_table_name(self, location: str, group_id: int) -> str:
+ return self.tag_table_mapping.get(_tag_table_mapping_key(location=location, group_id=group_id))
+
+
+def _tag_table_mapping_key(location: str, group_id: int):
+ return '{}_{}'.format(location, group_id)
+
+
+def _get_location_and_group(key: str) -> (str, int):
+    fields = key.split('_')
+    return fields[0], int(fields[1])
+
+
+if __name__ == '__main__':
+ consumer = Consumer(async_model=True)
+ consumer.init_env()
+ consumer.consume()
\ No newline at end of file
diff --git a/docs/zh/07-develop/03-insert-data/20-kafka-writting.mdx b/docs/zh/07-develop/03-insert-data/20-kafka-writting.mdx
new file mode 100644
index 0000000000..32d3c2e5cb
--- /dev/null
+++ b/docs/zh/07-develop/03-insert-data/20-kafka-writting.mdx
@@ -0,0 +1,47 @@
+---
+title: Write from Kafka
+---
+
+import Tabs from "@theme/Tabs";
+import TabItem from "@theme/TabItem";
+import PyKafka from "./_py_kafka.mdx";
+
+## About Kafka
+
+Apache Kafka is an open-source distributed event streaming platform that is widely used for high-performance data pipelines, streaming analytics, data integration, and event-driven applications. Kafka involves Producers, Consumers, and Topics: a Producer is a process that sends messages to Kafka, and a Consumer is a process that consumes messages from Kafka. For Kafka concepts, please refer to the [official documentation](https://kafka.apache.org/documentation/#gettingStarted).
+
+### Kafka topics
+
+Messages in Kafka are organized by topic, and each topic has one or more partitions. Topics can be managed with Kafka's `kafka-topics.sh` tool.
+
+Create a topic named `kafka-events`:
+
+```bash
+bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092
+```
+
+Set the number of partitions of `kafka-events` to 3:
+
+```bash
+bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092
+```
+
+Show all topics and their partitions:
+
+```bash
+bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe
+```
+
+## Insert into TDengine
+
+TDengine supports both SQL and schemaless data ingestion. For SQL writes, see [Insert Using SQL](/develop/insert-data/sql-writing/) and [High Performance Writing](/develop/insert-data/high-volume/); for schemaless writes, see [Schemaless Writing](/reference/schemaless/).
+
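+Besides SQL, data that already arrives in a schemaless format can be forwarded directly. The following minimal sketch writes one InfluxDB line-protocol record through the connector's schemaless API; the local server and the pre-created `power` database are assumptions for illustration:
+
+```py
+import taos
+
+conn = taos.connect(host="localhost", user="root", password="taosdata", database="power")
+# One line-protocol record, e.g. taken from a Kafka message value.
+lines = ['meters,location=California.SanFrancisco,groupid=2 current=10.3,voltage=219i,phase=0.31 1648432611249']
+conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS)
+conn.close()
+```
+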
+## Examples
+
+<Tabs defaultValue="Python" groupId="lang">
+  <TabItem label="Python" value="Python">
+    <PyKafka />
+  </TabItem>
+</Tabs>
diff --git a/docs/zh/07-develop/03-insert-data/02-influxdb-line.mdx b/docs/zh/07-develop/03-insert-data/30-influxdb-line.mdx
similarity index 100%
rename from docs/zh/07-develop/03-insert-data/02-influxdb-line.mdx
rename to docs/zh/07-develop/03-insert-data/30-influxdb-line.mdx
diff --git a/docs/zh/07-develop/03-insert-data/03-opentsdb-telnet.mdx b/docs/zh/07-develop/03-insert-data/40-opentsdb-telnet.mdx
similarity index 100%
rename from docs/zh/07-develop/03-insert-data/03-opentsdb-telnet.mdx
rename to docs/zh/07-develop/03-insert-data/40-opentsdb-telnet.mdx
diff --git a/docs/zh/07-develop/03-insert-data/04-opentsdb-json.mdx b/docs/zh/07-develop/03-insert-data/50-opentsdb-json.mdx
similarity index 100%
rename from docs/zh/07-develop/03-insert-data/04-opentsdb-json.mdx
rename to docs/zh/07-develop/03-insert-data/50-opentsdb-json.mdx
diff --git a/docs/zh/07-develop/03-insert-data/05-high-volume.md b/docs/zh/07-develop/03-insert-data/60-high-volume.md
similarity index 100%
rename from docs/zh/07-develop/03-insert-data/05-high-volume.md
rename to docs/zh/07-develop/03-insert-data/60-high-volume.md
diff --git a/docs/zh/07-develop/03-insert-data/_py_kafka.mdx b/docs/zh/07-develop/03-insert-data/_py_kafka.mdx
new file mode 100644
index 0000000000..cd7edf557d
--- /dev/null
+++ b/docs/zh/07-develop/03-insert-data/_py_kafka.mdx
@@ -0,0 +1,60 @@
+### Python Kafka client
+
+For Python Kafka clients, please refer to the [Kafka client list](https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python). Both [confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python) and [kafka-python](http://github.com/dpkp/kafka-python) are recommended; the examples below use [kafka-python](http://github.com/dpkp/kafka-python).
+
+### Consume from Kafka
+
+Kafka clients pull messages from Kafka, either one message at a time or in batches. An example of consuming messages one by one with [kafka-python](http://github.com/dpkp/kafka-python):
+
+```py
+from kafka import KafkaConsumer
+consumer = KafkaConsumer('my_favorite_topic')
+for msg in consumer:
+    print(msg)
+```
+
+Consuming one message at a time often becomes a bottleneck under heavy traffic and lets messages pile up in Kafka, so batch consumption is recommended. An example of batch consumption with [kafka-python](http://github.com/dpkp/kafka-python):
+
+```py
+from kafka import KafkaConsumer
+consumer = KafkaConsumer('my_favorite_topic')
+while True:
+    msgs = consumer.poll(timeout_ms=500, max_records=1000)
+    if msgs:
+        print(msgs)
+```
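+
+If offsets are committed manually (`enable_auto_commit=False`) only after a batch has been written to TDengine, a crash between poll and write will not lose messages; the complete example at the end of this page commits offsets when the consumer stops.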
+
+### Multithreading
+
+To improve write throughput, data is usually written in multiple threads; Python's thread pool `ThreadPoolExecutor` can be used for this. Sample code:
+
+```py
+from concurrent.futures import ThreadPoolExecutor
+pool = ThreadPoolExecutor(max_workers=10)
+pool.submit(...)
+```
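+
+Note that kafka-python's `KafkaConsumer` is not thread-safe: poll from a single thread and let the pool's workers perform only the TDengine writes, rather than sharing one consumer object across threads.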
+
+### Multiprocessing
+
+A single Python process cannot make full use of a multi-core CPU, so multiprocessing is sometimes chosen. Note that in this case the number of Kafka consumers should be no greater than the number of partitions of the topic. A multiprocessing example:
+
+```py
+from multiprocessing import Process
+
+ps = []
+for i in range(5):
+    p = Process(target=Consumer().consume)  # pass the method itself, don't call it
+    p.start()
+    ps.append(p)
+
+for p in ps:
+    p.join()
+```
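+
+Each process should create its own `KafkaConsumer` with a shared `group_id`, so that Kafka assigns every process a disjoint subset of the topic's partitions; consumers beyond the partition count would simply sit idle.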
+
+Besides Python's built-in multithreading and multiprocessing, the third-party library gunicorn can also be used for concurrency.
+
+### Complete example
+
+```py
+{{#include docs/examples/python/kafka_example.py}}
+```
diff --git a/docs/zh/25-application/01-telegraf.md b/docs/zh/25-application/01-telegraf.md
index 6338264d17..ec263d7792 100644
--- a/docs/zh/25-application/01-telegraf.md
+++ b/docs/zh/25-application/01-telegraf.md
@@ -39,15 +39,9 @@ IT operations monitoring data is usually time-sensitive, for example
## Data Connection Setup
-### Download TDengine plug-in to Grafana plug-in directory
+### Install Grafana Plugin and Configure Data Source
-```bash
-1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip
-2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/
-3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine
-4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini
-5. sudo systemctl restart grafana-server.service
-```
+Please refer to [Install Grafana Plugin and Configure Data Source](/third-party/grafana/#%E5%AE%89%E8%A3%85-grafana-plugin-%E5%B9%B6%E9%85%8D%E7%BD%AE%E6%95%B0%E6%8D%AE%E6%BA%90).
### Modify /etc/telegraf/telegraf.conf
diff --git a/docs/zh/25-application/02-collectd.md b/docs/zh/25-application/02-collectd.md
index c6230f48ab..8b39a6431d 100644
--- a/docs/zh/25-application/02-collectd.md
+++ b/docs/zh/25-application/02-collectd.md
@@ -41,15 +41,9 @@ IT operations monitoring data is usually time-sensitive, for example
## Data Connection Setup
-### Copy the TDengine plugin to the Grafana plugin directory
+### Install Grafana Plugin and Configure Data Source
-```bash
-1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip
-2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/
-3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine
-4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini
-5. sudo systemctl restart grafana-server.service
-```
+Please refer to [Install Grafana Plugin and Configure Data Source](/third-party/grafana/#%E5%AE%89%E8%A3%85-grafana-plugin-%E5%B9%B6%E9%85%8D%E7%BD%AE%E6%95%B0%E6%8D%AE%E6%BA%90).
### Configure collectd
diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index d210004760..412b4b4cf6 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -152,7 +152,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
* @param tinfo qhandle
* @return
*/
-int32_t qAsyncKillTask(qTaskInfo_t tinfo);
+int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
/**
* destroy query info structure
diff --git a/include/libs/function/function.h b/include/libs/function/function.h
index 13a8370dfd..1779af7605 100644
--- a/include/libs/function/function.h
+++ b/include/libs/function/function.h
@@ -163,6 +163,7 @@ typedef struct tExprNode {
int32_t functionId;
int32_t num;
struct SFunctionNode *pFunctNode;
+ int32_t functionType;
} _function;
struct {
diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h
index 81f63537e5..9ca6a7a9fa 100644
--- a/include/libs/function/functionMgt.h
+++ b/include/libs/function/functionMgt.h
@@ -130,6 +130,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_GROUP_KEY,
FUNCTION_TYPE_CACHE_LAST_ROW,
FUNCTION_TYPE_CACHE_LAST,
+ FUNCTION_TYPE_TABLE_COUNT,
// distributed splitting functions
FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,
diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h
index cd7cedb6c1..f94e0c98dc 100644
--- a/include/libs/nodes/nodes.h
+++ b/include/libs/nodes/nodes.h
@@ -60,6 +60,12 @@ extern "C" {
for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); \
(NULL != cell ? (node = &(cell->pNode), true) : (node = NULL, false)); cell = cell->pNext)
+#define NODES_DESTORY_NODE(node) \
+ do { \
+ nodesDestroyNode((node)); \
+ (node) = NULL; \
+ } while (0)
+
#define NODES_DESTORY_LIST(list) \
do { \
nodesDestroyList((list)); \
@@ -228,6 +234,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN,
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN,
+ QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT,
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN,
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h
index 6a6fd50f2a..574d41db58 100644
--- a/include/libs/nodes/plannodes.h
+++ b/include/libs/nodes/plannodes.h
@@ -62,7 +62,8 @@ typedef enum EScanType {
SCAN_TYPE_STREAM,
SCAN_TYPE_TABLE_MERGE,
SCAN_TYPE_BLOCK_INFO,
- SCAN_TYPE_LAST_ROW
+ SCAN_TYPE_LAST_ROW,
+ SCAN_TYPE_TABLE_COUNT
} EScanType;
typedef struct SScanLogicNode {
@@ -323,6 +324,8 @@ typedef struct SLastRowScanPhysiNode {
bool ignoreNull;
} SLastRowScanPhysiNode;
+typedef SLastRowScanPhysiNode STableCountScanPhysiNode;
+
typedef struct SSystemTableScanPhysiNode {
SScanPhysiNode scan;
SEpSet mgmtEpSet;
diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h
index 9d54ce6aee..4521b32409 100644
--- a/include/libs/qcom/query.h
+++ b/include/libs/qcom/query.h
@@ -260,7 +260,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
-#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
+#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED)
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later
diff --git a/include/util/taoserror.h b/include/util/taoserror.h
index b00d2d25fb..9e4ede9afb 100644
--- a/include/util/taoserror.h
+++ b/include/util/taoserror.h
@@ -380,6 +380,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0526)
#define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x0527)
#define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528)
+#define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529)
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c
index 1bfa1cd70f..5926936de7 100644
--- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c
+++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c
@@ -243,9 +243,9 @@ static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { rpcRegisterBrokenLin
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcReleaseHandle(pHandle, type); }
static bool rpcRfp(int32_t code, tmsg_t msgType) {
- if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
- code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_APP_IS_STARTING ||
- code == TSDB_CODE_MNODE_NOT_FOUND) {
+ if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_MNODE_NOT_FOUND ||
+ code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_RPC_BROKEN_LINK ||
+      code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) {
return false;
diff --git a/source/dnode/mnode/impl/inc/mndUser.h b/source/dnode/mnode/impl/inc/mndUser.h
index 970d1db7db..cf7deba397 100644
--- a/source/dnode/mnode/impl/inc/mndUser.h
+++ b/source/dnode/mnode/impl/inc/mndUser.h
@@ -31,6 +31,7 @@ void mndReleaseUser(SMnode *pMnode, SUserObj *pUser);
// for trans test
SSdbRaw *mndUserActionEncode(SUserObj *pUser);
SHashObj *mndDupDbHash(SHashObj *pOld);
+SHashObj *mndDupTopicHash(SHashObj *pOld);
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen);
diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c
index 20c2ebb0a4..ec2e844ba5 100644
--- a/source/dnode/mnode/impl/src/mndShow.c
+++ b/source/dnode/mnode/impl/src/mndShow.c
@@ -108,6 +108,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type = TSDB_MGMT_TABLE_APPS;
} else if (strncasecmp(name, TSDB_INS_TABLE_STREAM_TASKS, len) == 0) {
type = TSDB_MGMT_TABLE_STREAM_TASKS;
+ } else if (strncasecmp(name, TSDB_INS_TABLE_USER_PRIVILEGES, len) == 0) {
+ type = TSDB_MGMT_TABLE_PRIVILEGES;
} else {
// ASSERT(0);
}
diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c
index 5f34adce77..806ba0c98e 100644
--- a/source/dnode/mnode/impl/src/mndUser.c
+++ b/source/dnode/mnode/impl/src/mndUser.c
@@ -161,7 +161,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
char *topic = taosHashIterate(pUser->topics, NULL);
while (topic != NULL) {
SDB_SET_BINARY(pRaw, dataPos, topic, TSDB_TOPIC_FNAME_LEN, _OVER);
- db = taosHashIterate(pUser->topics, topic);
+ topic = taosHashIterate(pUser->topics, topic);
}
SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
@@ -446,7 +446,7 @@ static int32_t mndAlterUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SRpc
return 0;
}
-SHashObj *mndDupDbHash(SHashObj *pOld) {
+SHashObj *mndDupObjHash(SHashObj *pOld, int32_t dataLen) {
SHashObj *pNew =
taosHashInit(taosHashGetSize(pOld), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pNew == NULL) {
@@ -457,7 +457,7 @@ SHashObj *mndDupDbHash(SHashObj *pOld) {
char *db = taosHashIterate(pOld, NULL);
while (db != NULL) {
int32_t len = strlen(db) + 1;
- if (taosHashPut(pNew, db, len, db, TSDB_DB_FNAME_LEN) != 0) {
+ if (taosHashPut(pNew, db, len, db, dataLen) != 0) {
taosHashCancelIterate(pOld, db);
taosHashCleanup(pNew);
terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -469,6 +469,10 @@ SHashObj *mndDupDbHash(SHashObj *pOld) {
return pNew;
}
+SHashObj *mndDupDbHash(SHashObj *pOld) { return mndDupObjHash(pOld, TSDB_DB_FNAME_LEN); }
+
+SHashObj *mndDupTopicHash(SHashObj *pOld) { return mndDupObjHash(pOld, TSDB_TOPIC_FNAME_LEN); }
+
static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@@ -519,7 +523,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
taosRLockLatch(&pUser->lock);
newUser.readDbs = mndDupDbHash(pUser->readDbs);
newUser.writeDbs = mndDupDbHash(pUser->writeDbs);
- newUser.topics = mndDupDbHash(pUser->topics);
+ newUser.topics = mndDupTopicHash(pUser->topics);
taosRUnLockLatch(&pUser->lock);
if (newUser.readDbs == NULL || newUser.writeDbs == NULL || newUser.topics == NULL) {
@@ -832,6 +836,26 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
int32_t numOfTopics = taosHashGetSize(pUser->topics);
if (numOfRows + numOfReadDbs + numOfWriteDbs + numOfTopics >= rows) break;
+ if (pUser->superUser) {
+ cols = 0;
+ SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+ char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
+ STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes);
+ colDataAppend(pColInfo, numOfRows, (const char *)userName, false);
+
+ char privilege[20] = {0};
+ STR_WITH_MAXSIZE_TO_VARSTR(privilege, "all", pShow->pMeta->pSchemas[cols].bytes);
+ pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+ colDataAppend(pColInfo, numOfRows, (const char *)privilege, false);
+
+ char objName[20] = {0};
+ STR_WITH_MAXSIZE_TO_VARSTR(objName, "all", pShow->pMeta->pSchemas[cols].bytes);
+ pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+ colDataAppend(pColInfo, numOfRows, (const char *)objName, false);
+
+ numOfRows++;
+ }
+
char *db = taosHashIterate(pUser->readDbs, NULL);
while (db != NULL) {
cols = 0;
@@ -902,7 +926,7 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
numOfRows++;
- db = taosHashIterate(pUser->topics, topic);
+ topic = taosHashIterate(pUser->topics, topic);
}
sdbRelease(pSdb, pUser);
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index 7ef3207b4d..756e23deeb 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -106,14 +106,24 @@ int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList,
int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
-int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
-int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
-bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
-int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
- bool *acquired);
-int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
- int32_t payloadLen, double selectivityRatio);
-int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid);
+
+int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName);
+int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
+int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
+bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
+int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
+ bool *acquired);
+int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
+ int32_t payloadLen, double selectivityRatio);
+int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid);
+tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
+int64_t metaGetTbNum(SMeta *pMeta);
+int64_t metaGetNtbNum(SMeta *pMeta);
+typedef struct {
+ int64_t uid;
+ int64_t ctbNum;
+} SMetaStbStats;
+int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
typedef struct SMetaFltParam {
tb_uid_t suid;
diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h
index f229b3b127..6451d15969 100644
--- a/source/dnode/vnode/src/inc/vnodeInt.h
+++ b/source/dnode/vnode/src/inc/vnodeInt.h
@@ -116,8 +116,6 @@ int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, in
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
int metaAlterCache(SMeta* pMeta, int32_t nPage);
-tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
-int64_t metaGetTbNum(SMeta* pMeta);
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
@@ -144,12 +142,6 @@ typedef struct SMetaInfo {
} SMetaInfo;
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo, SMetaReader* pReader);
-typedef struct {
- int64_t uid;
- int64_t ctbNum;
-} SMetaStbStats;
-int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
-
// tsdb
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
int tsdbClose(STsdb** pTsdb);
diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c
index 0257aede3d..cfdb4ab8d1 100644
--- a/source/dnode/vnode/src/meta/metaQuery.c
+++ b/source/dnode/vnode/src/meta/metaQuery.c
@@ -223,6 +223,23 @@ int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName) {
return 0;
}
+
+int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
+ int code = 0;
+ SMetaReader mr = {0};
+ metaReaderInit(&mr, (SMeta *)meta, 0);
+ code = metaGetTableEntryByUid(&mr, uid);
+ if (code < 0) {
+ metaReaderClear(&mr);
+ return -1;
+ }
+ strncpy(tbName, mr.me.name, TSDB_TABLE_NAME_LEN);
+ metaReaderClear(&mr);
+
+ return 0;
+}
+
+
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid) {
int code = 0;
SMetaReader mr = {0};
@@ -739,6 +756,10 @@ int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
}
+int64_t metaGetNtbNum(SMeta *pMeta) {
+ return pMeta->pVnode->config.vndStats.numOfNTables;
+}
+
typedef struct {
SMeta *pMeta;
TBC *pCur;
diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h
index 4d0c5389e1..6acf19218d 100644
--- a/source/libs/command/inc/commandInt.h
+++ b/source/libs/command/inc/commandInt.h
@@ -34,6 +34,7 @@ extern "C" {
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s"
#define EXPLAIN_DISTBLK_SCAN_FORMAT "Block Dist Scan on %s"
#define EXPLAIN_LASTROW_SCAN_FORMAT "Last Row Scan on %s"
+#define EXPLAIN_TABLE_COUNT_SCAN_FORMAT "Table Count Row Scan on %s"
#define EXPLAIN_PROJECTION_FORMAT "Projection"
#define EXPLAIN_JOIN_FORMAT "%s"
#define EXPLAIN_AGG_FORMAT "Aggragate"
diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c
index 410e62a18b..03c7249294 100644
--- a/source/libs/command/src/explain.c
+++ b/source/libs/command/src/explain.c
@@ -19,6 +19,7 @@
#include "query.h"
#include "tcommon.h"
#include "tdatablock.h"
+#include "systable.h"
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel);
@@ -212,6 +213,11 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
pPhysiChildren = lastRowPhysiNode->scan.node.pChildren;
break;
}
+ case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: {
+ STableCountScanPhysiNode *tableCountPhysiNode = (STableCountScanPhysiNode *)pNode;
+ pPhysiChildren = tableCountPhysiNode->scan.node.pChildren;
+ break;
+ }
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: {
SGroupSortPhysiNode *groupSortPhysiNode = (SGroupSortPhysiNode *)pNode;
pPhysiChildren = groupSortPhysiNode->node.pChildren;
@@ -1355,6 +1361,48 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
break;
}
+ case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: {
+      STableCountScanPhysiNode *pTableCountNode = (STableCountScanPhysiNode *)pNode;
+      EXPLAIN_ROW_NEW(level, EXPLAIN_TABLE_COUNT_SCAN_FORMAT,
+                      ('\0' != pTableCountNode->scan.tableName.tname[0] ? pTableCountNode->scan.tableName.tname : TSDB_INS_TABLE_TABLES));
+      EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
+      if (pResNode->pExecInfo) {
+        QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
+        EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
+      }
+      EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, LIST_LENGTH(pTableCountNode->scan.pScanCols));
+      EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
+      if (pTableCountNode->scan.pScanPseudoCols) {
+        EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pTableCountNode->scan.pScanPseudoCols->length);
+        EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
+      }
+      EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTableCountNode->scan.node.pOutputDataBlockDesc->totalRowSize);
+      EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
+      EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
+      EXPLAIN_ROW_END();
+      QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
+
+      if (verbose) {
+        EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
+        EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
+                           nodesGetOutputNumFromSlotList(pTableCountNode->scan.node.pOutputDataBlockDesc->pSlots));
+        EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
+        EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTableCountNode->scan.node.pOutputDataBlockDesc->outputRowSize);
+        EXPLAIN_ROW_APPEND_LIMIT(pTableCountNode->scan.node.pLimit);
+        EXPLAIN_ROW_APPEND_SLIMIT(pTableCountNode->scan.node.pSlimit);
+        EXPLAIN_ROW_END();
+        QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
+
+        if (pTableCountNode->scan.node.pConditions) {
+          EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
+          QRY_ERR_RET(nodesNodeToSQL(pTableCountNode->scan.node.pConditions, tbuf + VARSTR_HEADER_SIZE,
+ TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
+ EXPLAIN_ROW_END();
+ QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
+ }
+ }
+ break;
+ }
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: {
SGroupSortPhysiNode *pSortNode = (SGroupSortPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_GROUP_SORT_FORMAT);
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index 7100be58e3..19915956bb 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -482,6 +482,34 @@ typedef struct {
SSnapContext* sContext;
} SStreamRawScanInfo;
+typedef struct STableCountScanSupp {
+ int16_t dbNameSlotId;
+ int16_t stbNameSlotId;
+ int16_t tbCountSlotId;
+
+ bool groupByDbName;
+ bool groupByStbName;
+ char dbName[TSDB_DB_NAME_LEN];
+ char stbName[TSDB_TABLE_NAME_LEN];
+
+} STableCountScanSupp;
+
+typedef struct STableCountScanOperatorInfo {
+ SReadHandle readHandle;
+ SSDataBlock* pRes;
+
+ SName tableName;
+
+ SNodeList* groupTags;
+ SNodeList* scanCols;
+ SNodeList* pseudoCols;
+
+ STableCountScanSupp supp;
+
+ int32_t currGrpIdx;
+ SArray* stbUidList; // when group by db_name and stable_name
+} STableCountScanOperatorInfo;
+
typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo;
SSDataBlock* pRes;
@@ -781,7 +809,7 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order,
int32_t checkForQueryBuf(size_t numOfTables);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
-void setTaskKilled(SExecTaskInfo* pTaskInfo);
+void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c
index 963a273290..53660d88e1 100644
--- a/source/libs/executor/src/exchangeoperator.c
+++ b/source/libs/executor/src/exchangeoperator.c
@@ -70,7 +70,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
- longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
+ longjmp(pTaskInfo->env, pTaskInfo->code);
}
for (int32_t i = 0; i < totalSources; ++i) {
@@ -573,7 +573,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
- longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
+ longjmp(pTaskInfo->env, pTaskInfo->code);
}
tsem_post(&pExchangeInfo->ready);
@@ -621,7 +621,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
- longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
+ longjmp(pTaskInfo->env, pTaskInfo->code);
}
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c
index 8a85885d98..d98db3e90d 100644
--- a/source/libs/executor/src/executil.c
+++ b/source/libs/executor/src/executil.c
@@ -1348,6 +1348,7 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
pExprNode->_function.functionId = pFuncNode->funcId;
pExprNode->_function.pFunctNode = pFuncNode;
+ pExprNode->_function.functionType = pFuncNode->funcType;
tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index 10ceb9ccee..ebd1afa855 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -688,7 +688,7 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
-int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
+int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
if (pTaskInfo == NULL) {
@@ -697,7 +697,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
- setTaskKilled(pTaskInfo);
+ setTaskKilled(pTaskInfo, rspCode);
qStopTaskOperators(pTaskInfo);
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index d0d8b42442..4fad307047 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -610,21 +610,10 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
}
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
- // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
- // abort current query execution.
- if (pTaskInfo->owner != 0 &&
- ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec())
- /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) {
- assert(pTaskInfo->cost.start != 0);
- // qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
- // ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
- // return true;
- }
-
- return false;
+ return (0 != pTaskInfo->code) ? true : false;
}
-void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
+void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; }
/////////////////////////////////////////////////////////////////////////////////////////////
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
@@ -1381,7 +1370,8 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN ||
- type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN) {
+ type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN ||
+ type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) {
*order = TSDB_ORDER_ASC;
*scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS;
@@ -1883,6 +1873,8 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
+SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* handle, STableCountScanPhysiNode* pNode,
+ SExecTaskInfo* pTaskInfo);
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0);
@@ -2073,6 +2065,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
+ } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
+ STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
+ pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index 7f0dec1959..16a32fcfc9 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -30,7 +30,6 @@
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
-#include "vnode.h"
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
@@ -629,7 +628,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) {
if (isTaskKilled(pTaskInfo)) {
- T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
+ T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
// process this data block based on the probabilities
@@ -2032,7 +2031,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
if (isTaskKilled(pTaskInfo)) {
- longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
+ longjmp(pTaskInfo->env, pTaskInfo->code);
}
int32_t rows = 0;
@@ -2529,7 +2528,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
STsdbReader* reader = pInfo->base.dataReader;
while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pTaskInfo)) {
- T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
+ T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
// process this data block based on the probabilities
@@ -2913,3 +2912,305 @@ _error:
taosMemoryFree(pOperator);
return NULL;
}
+
+// ====================================================================================================================
+// TableCountScanOperator
+static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
+static void destroyTableCountScanOperator(void* param);
+static const char* GROUP_TAG_DB_NAME = "db_name";
+static const char* GROUP_TAG_STABLE_NAME = "stable_name";
+
+int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
+ if (scanCols != NULL) {
+ SNode* pNode = NULL;
+ FOREACH(pNode, scanCols) {
+ if (nodeType(pNode) != QUERY_NODE_TARGET) {
+ return TSDB_CODE_QRY_SYS_ERROR;
+ }
+ STargetNode* targetNode = (STargetNode*)pNode;
+ if (nodeType(targetNode->pExpr) != QUERY_NODE_COLUMN) {
+ return TSDB_CODE_QRY_SYS_ERROR;
+ }
+ SColumnNode* colNode = (SColumnNode*)(targetNode->pExpr);
+ if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
+ supp->dbNameSlotId = targetNode->slotId;
+ } else if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
+ supp->stbNameSlotId = targetNode->slotId;
+ }
+ }
+ }
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
+ if (pseudoCols != NULL) {
+ SNode* pNode = NULL;
+ FOREACH(pNode, pseudoCols) {
+ if (nodeType(pNode) != QUERY_NODE_TARGET) {
+ return TSDB_CODE_QRY_SYS_ERROR;
+ }
+ STargetNode* targetNode = (STargetNode*)pNode;
+ if (nodeType(targetNode->pExpr) != QUERY_NODE_FUNCTION) {
+ return TSDB_CODE_QRY_SYS_ERROR;
+ }
+ SFunctionNode* funcNode = (SFunctionNode*)(targetNode->pExpr);
+ if (funcNode->funcType == FUNCTION_TYPE_TABLE_COUNT) {
+ supp->tbCountSlotId = targetNode->slotId;
+ }
+ }
+ }
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
+ if (groupTags != NULL) {
+ SNode* pNode = NULL;
+ FOREACH(pNode, groupTags) {
+ if (nodeType(pNode) != QUERY_NODE_COLUMN) {
+ return TSDB_CODE_QRY_SYS_ERROR;
+ }
+ SColumnNode* colNode = (SColumnNode*)pNode;
+ if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
+ supp->groupByDbName = true;
+ }
+ if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
+ supp->groupByStbName = true;
+ }
+ }
+ } else {
+ strncpy(supp->dbName, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
+ strncpy(supp->stbName, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
+ }
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
+ STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
+ int32_t code = 0;
+ code = tblCountScanGetInputs(groupTags, tableName, supp);
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
+ return code;
+ }
+ supp->dbNameSlotId = -1;
+ supp->stbNameSlotId = -1;
+ supp->tbCountSlotId = -1;
+
+ code = tblCountScanGetGroupTagsSlotId(scanCols, supp);
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
+ return code;
+ }
+ code = tblCountScanGetCountSlotId(pseudoCols, supp);
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
+ return code;
+ }
+ return code;
+}
+
+SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
+ SExecTaskInfo* pTaskInfo) {
+ int32_t code = TSDB_CODE_SUCCESS;
+
+ SScanPhysiNode* pScanNode = &pTblCountScanNode->scan;
+ STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
+ SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
+
+  if (!pInfo || !pOperator) {
+    code = TSDB_CODE_OUT_OF_MEMORY;
+    goto _error;
+  }
+
+ pInfo->readHandle = *readHandle;
+
+ SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
+ initResultSizeInfo(&pOperator->resultInfo, 1);
+ pInfo->pRes = createDataBlockFromDescNode(pDescNode);
+ blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
+
+ getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
+ pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
+ pTaskInfo);
+
+ setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
+ pInfo, pTaskInfo);
+ pOperator->fpSet =
+      createOperatorFpSet(operatorDummyOpenFn, doTableCountScan, NULL, destroyTableCountScanOperator, NULL);
+ return pOperator;
+
+_error:
+ if (pInfo != NULL) {
+    destroyTableCountScanOperator(pInfo);
+ }
+ taosMemoryFreeClear(pOperator);
+ pTaskInfo->code = code;
+ return NULL;
+}
+
+void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
+ SSDataBlock* pRes) {
+ if (pSupp->dbNameSlotId != -1) {
+ ASSERT(strlen(dbName));
+ SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);
+ char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ strncpy(varDataVal(varDbName), dbName, strlen(dbName));
+ varDataSetLen(varDbName, strlen(dbName));
+ colDataAppend(colInfoData, 0, varDbName, false);
+ }
+
+ if (pSupp->stbNameSlotId != -1) {
+ SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
+ if (strlen(stbName) != 0) {
+ char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ strncpy(varDataVal(varStbName), stbName, strlen(stbName));
+ varDataSetLen(varStbName, strlen(stbName));
+ colDataAppend(colInfoData, 0, varStbName, false);
+ } else {
+ colDataAppendNULL(colInfoData, 0);
+ }
+ }
+
+ if (pSupp->tbCountSlotId != -1) {
+ SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
+ colDataAppend(colInfoData, 0, (char*)&count, false);
+ }
+ pRes->info.rows = 1;
+}
+
+static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
+ STableCountScanSupp* pSupp = &pInfo->supp;
+ SSDataBlock* pRes = pInfo->pRes;
+
+ size_t infodbTableNum;
+ getInfosDbMeta(NULL, &infodbTableNum);
+ size_t perfdbTableNum;
+ getPerfDbMeta(NULL, &perfdbTableNum);
+
+ if (pSupp->groupByDbName) {
+ if (pInfo->currGrpIdx == 0) {
+ uint64_t groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
+ pRes->info.id.groupId = groupId;
+ fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
+ } else if (pInfo->currGrpIdx == 1) {
+ uint64_t groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
+ pRes->info.id.groupId = groupId;
+ fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
+ } else {
+ setOperatorCompleted(pOperator);
+ return NULL;
+ }
+ pInfo->currGrpIdx++;
+ return (pRes->info.rows > 0) ? pRes : NULL;
+ } else {
+ if (strcmp(pSupp->dbName, TSDB_INFORMATION_SCHEMA_DB) == 0) {
+ fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
+ } else if (strcmp(pSupp->dbName, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
+ fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
+ } else if (strlen(pSupp->dbName) == 0) {
+ fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
+ }
+ setOperatorCompleted(pOperator);
+ return (pRes->info.rows > 0) ? pRes : NULL;
+ }
+}
+
+static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ STableCountScanOperatorInfo* pInfo = pOperator->info;
+ STableCountScanSupp* pSupp = &pInfo->supp;
+ SSDataBlock* pRes = pInfo->pRes;
+ blockDataCleanup(pRes);
+
+ if (pOperator->status == OP_EXEC_DONE) {
+ return NULL;
+ }
+ if (pInfo->readHandle.mnd != NULL) {
+ return buildSysDbTableCount(pOperator, pInfo);
+ }
+
+ const char* db = NULL;
+ int32_t vgId = 0;
+ char dbName[TSDB_DB_NAME_LEN] = {0};
+
+ {
+ // get dbname
+ vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
+ SName sn = {0};
+ tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
+ tNameGetDbName(&sn, dbName);
+ }
+ if (pSupp->groupByDbName) {
+ if (pSupp->groupByStbName) {
+ if (pInfo->stbUidList == NULL) {
+ pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
+ if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
+ qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
+ }
+ }
+ if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
+ tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
+
+ char stbName[TSDB_TABLE_NAME_LEN] = {0};
+ metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName);
+
+ char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
+ snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
+ uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
+ pRes->info.id.groupId = groupId;
+
+ SMetaStbStats stats = {0};
+ metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats);
+ int64_t ctbNum = stats.ctbNum;
+
+ fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
+
+ pInfo->currGrpIdx++;
+ } else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
+ char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
+ snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
+ uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
+ pRes->info.id.groupId = groupId;
+ int64_t ntbNum = metaGetNtbNum(pInfo->readHandle.meta);
+ fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes);
+
+ pInfo->currGrpIdx++;
+ } else {
+ setOperatorCompleted(pOperator);
+ return NULL;
+ }
+ } else {
+ uint64_t groupId = calcGroupId(dbName, strlen(dbName));
+ pRes->info.id.groupId = groupId;
+ int64_t dbTableCount = metaGetTbNum(pInfo->readHandle.meta);
+ fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
+ setOperatorCompleted(pOperator);
+ }
+ } else {
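+    // no GROUP BY: a single row, narrowed by the optional db/stb conditions from the WHERE clause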
+ if (strlen(pSupp->dbName) != 0) {
+ if (strlen(pSupp->stbName) != 0) {
+ tb_uid_t uid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbName);
+ SMetaStbStats stats = {0};
+ metaGetStbStats(pInfo->readHandle.meta, uid, &stats);
+ int64_t ctbNum = stats.ctbNum;
+ fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbName, ctbNum, pRes);
+ } else {
+ int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
+ fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
+ }
+ } else {
+ int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
+ fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
+ }
+ setOperatorCompleted(pOperator);
+ }
+ return pRes->info.rows > 0 ? pRes : NULL;
+}
+
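+// Releases everything owned by the operator: the result block, the group tag
+// list, and the cached supertable uid array.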
+static void destoryTableCountScanOperator(void* param) {
+ STableCountScanOperatorInfo* pTableCountScanInfo = param;
+ blockDataDestroy(pTableCountScanInfo->pRes);
+
+ nodesDestroyList(pTableCountScanInfo->groupTags);
+ taosArrayDestroy(pTableCountScanInfo->stbUidList);
+ taosMemoryFreeClear(param);
+}
diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c
index fe010786eb..a2a827e7e5 100644
--- a/source/libs/function/src/builtins.c
+++ b/source/libs/function/src/builtins.c
@@ -2080,6 +2080,11 @@ static int32_t translateTagsPseudoColumn(SFunctionNode* pFunc, char* pErrBuf, in
return TSDB_CODE_SUCCESS;
}
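+// _table_count is produced by the table count scan operator; translation only
+// needs to fix its result type to BIGINT.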
+static int32_t translateTableCountPseudoColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
+ pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
+ return TSDB_CODE_SUCCESS;
+}
+
// clang-format off
const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
@@ -3241,6 +3246,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
+ {
+ .name = "_table_count",
+ .type = FUNCTION_TYPE_TABLE_COUNT,
+ .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC,
+ .translateFunc = translateTableCountPseudoColumn,
+ .getEnvFunc = NULL,
+ .initFunc = NULL,
+ .sprocessFunc = NULL,
+ .finalizeFunc = NULL
+ },
};
// clang-format on
diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c
index 50e410b339..352e78c2fc 100644
--- a/source/libs/nodes/src/nodesCodeFuncs.c
+++ b/source/libs/nodes/src/nodesCodeFuncs.c
@@ -221,6 +221,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiTableScan";
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
return "PhysiTableSeqScan";
+ case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
+ return "PhysiTableMergeScan";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
return "PhysiSreamScan";
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
@@ -229,8 +231,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiBlockDistScan";
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
return "PhysiLastRowScan";
- case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
- return "PhysiTableMergeScan";
+ case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
+ return "PhysiTableCountScan";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "PhysiProject";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
@@ -4646,6 +4648,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
return physiScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
+ case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
return physiLastRowScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
@@ -4800,6 +4803,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToLogicPlan(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
+ case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
return jsonToPhysiScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
return jsonToPhysiLastRowScanNode(pJson, pObj);
diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c
index 1e8ff8da1a..bf6cd33af7 100644
--- a/source/libs/nodes/src/nodesMsgFuncs.c
+++ b/source/libs/nodes/src/nodesMsgFuncs.c
@@ -3640,6 +3640,7 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
code = physiScanNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
+ case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
code = physiLastRowScanNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
@@ -3778,6 +3779,7 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
code = msgToPhysiScanNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
+ case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
code = msgToPhysiLastRowScanNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c
index 2f0f6b3432..f363cd0f10 100644
--- a/source/libs/nodes/src/nodesUtilFuncs.c
+++ b/source/libs/nodes/src/nodesUtilFuncs.c
@@ -494,6 +494,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SBlockDistScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
return makeNode(type, sizeof(SLastRowScanPhysiNode));
+ case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
+ return makeNode(type, sizeof(STableCountScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
@@ -1120,6 +1122,7 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
+ case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
destroyScanPhysiNode((SScanPhysiNode*)pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: {
diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h
index c53d3f9320..ce5a63f5d0 100644
--- a/source/libs/parser/inc/parUtil.h
+++ b/source/libs/parser/inc/parUtil.h
@@ -86,7 +86,7 @@ STableComInfo getTableInfo(const STableMeta* pTableMeta);
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
-int32_t getInsTagsTableTargetName(int32_t acctId, SNode* pWhere, SName* pName);
+int32_t getVnodeSysTableTargetName(int32_t acctId, SNode* pWhere, SName* pName);
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache);
diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c
index 3481eddba4..f90a42add3 100644
--- a/source/libs/parser/src/parAstParser.c
+++ b/source/libs/parser/src/parAstParser.c
@@ -140,7 +140,7 @@ static int32_t collectMetaKeyFromInsTagsImpl(SCollectMetaKeyCxt* pCxt, SName* pN
static int32_t collectMetaKeyFromInsTags(SCollectMetaKeyCxt* pCxt) {
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pStmt;
SName name = {0};
- int32_t code = getInsTagsTableTargetName(pCxt->pParseCxt->acctId, pSelect->pWhere, &name);
+ int32_t code = getVnodeSysTableTargetName(pCxt->pParseCxt->acctId, pSelect->pWhere, &name);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKeyFromInsTagsImpl(pCxt, &name);
}
@@ -165,7 +165,8 @@ static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, const c
if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES))) {
code = reserveDnodeRequiredInCache(pCxt->pMetaCache);
}
- if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)) &&
+ if (TSDB_CODE_SUCCESS == code &&
+ (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS) || 0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) &&
QUERY_NODE_SELECT_STMT == nodeType(pCxt->pStmt)) {
code = collectMetaKeyFromInsTags(pCxt);
}
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index d2d97f7b90..bca139664d 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -47,6 +47,7 @@ typedef struct STranslateContext {
SParseMetaCache* pMetaCache;
bool createStream;
bool stableQuery;
+ bool showRewrite;
} STranslateContext;
typedef struct SFullDatabaseName {
@@ -750,8 +751,8 @@ static bool isPrimaryKeyImpl(SNode* pExpr) {
return (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId);
} else if (QUERY_NODE_FUNCTION == nodeType(pExpr)) {
SFunctionNode* pFunc = (SFunctionNode*)pExpr;
- if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_FIRST == pFunc->funcType ||
- FUNCTION_TYPE_LAST == pFunc->funcType) {
+ if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pFunc->funcType ||
+ FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) {
return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0));
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType) {
return true;
@@ -2209,22 +2210,28 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
}
static bool sysTableFromVnode(const char* pTable) {
- return (0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) ||
- (0 == strcmp(pTable, TSDB_INS_TABLE_TABLE_DISTRIBUTED) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)));
+ return ((0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)));
}
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
-static int32_t getTagsTableVgroupListImpl(STranslateContext* pCxt, SName* pTargetName, SName* pName,
- SArray** pVgroupList) {
+static int32_t getVnodeSysTableVgroupListImpl(STranslateContext* pCxt, SName* pTargetName, SName* pName,
+ SArray** pVgroupList) {
if (0 == pTargetName->type) {
return getDBVgInfoImpl(pCxt, pName, pVgroupList);
}
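+  // a WHERE condition naming the system dbs themselves cannot be narrowed to
+  // user vgroups; reset the target so it no longer counts as a user-db condition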
+ if (0 == strcmp(pTargetName->dbname, TSDB_INFORMATION_SCHEMA_DB) ||
+ 0 == strcmp(pTargetName->dbname, TSDB_PERFORMANCE_SCHEMA_DB)) {
+ pTargetName->type = 0;
+ return TSDB_CODE_SUCCESS;
+ }
+
if (TSDB_DB_NAME_T == pTargetName->type) {
int32_t code = getDBVgInfoImpl(pCxt, pTargetName, pVgroupList);
- if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
- TSDB_CODE_MND_DB_IN_DROPPING == code) {
+ if (!pCxt->showRewrite && (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
+ TSDB_CODE_MND_DB_IN_DROPPING == code)) {
+      // a system table query should not fail just because the db is missing or being created/dropped
code = TSDB_CODE_SUCCESS;
}
return code;
@@ -2241,50 +2248,44 @@ static int32_t getTagsTableVgroupListImpl(STranslateContext* pCxt, SName* pTarge
}
} else if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
TSDB_CODE_MND_DB_IN_DROPPING == code) {
+    // a system table query should not fail just because the db is missing or being created/dropped
code = TSDB_CODE_SUCCESS;
}
return code;
}
-static int32_t getTagsTableVgroupList(STranslateContext* pCxt, SName* pName, SArray** pVgroupList) {
+static int32_t getVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName, SArray** pVgs, bool* pHasUserDbCond) {
if (!isSelectStmt(pCxt->pCurrStmt)) {
return TSDB_CODE_SUCCESS;
}
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
SName targetName = {0};
- int32_t code = getInsTagsTableTargetName(pCxt->pParseCxt->acctId, pSelect->pWhere, &targetName);
+ int32_t code = getVnodeSysTableTargetName(pCxt->pParseCxt->acctId, pSelect->pWhere, &targetName);
if (TSDB_CODE_SUCCESS == code) {
- code = getTagsTableVgroupListImpl(pCxt, &targetName, pName, pVgroupList);
+ code = getVnodeSysTableVgroupListImpl(pCxt, &targetName, pName, pVgs);
}
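+  // report whether the WHERE clause pinned a user db that actually resolved to vgroups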
+ *pHasUserDbCond = (0 != targetName.type && taosArrayGetSize(*pVgs) > 0);
return code;
}
static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
- int32_t code = TSDB_CODE_SUCCESS;
- SArray* vgroupList = NULL;
- if (0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS)) {
- code = getTagsTableVgroupList(pCxt, pName, &vgroupList);
- } else if ('\0' != pRealTable->qualDbName[0]) {
- if (0 != strcmp(pRealTable->qualDbName, TSDB_INFORMATION_SCHEMA_DB)) {
- code = getDBVgInfo(pCxt, pRealTable->qualDbName, &vgroupList);
- }
- } else {
- code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
- }
+ bool hasUserDbCond = false;
+ SArray* pVgs = NULL;
+ int32_t code = getVnodeSysTableVgroupList(pCxt, pName, &pVgs, &hasUserDbCond);
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) &&
- isSelectStmt(pCxt->pCurrStmt) && 0 == taosArrayGetSize(vgroupList)) {
+ isSelectStmt(pCxt->pCurrStmt) && 0 == taosArrayGetSize(pVgs)) {
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
}
- if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) {
- code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
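+  // without a user-db condition, ins_tables must also cover the system dbs,
+  // so the mnode is added as a scan target alongside the vnodes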
+ if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) && !hasUserDbCond) {
+ code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &pVgs);
}
if (TSDB_CODE_SUCCESS == code) {
- code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
+ code = toVgroupsInfo(pVgs, &pRealTable->pVgroupList);
}
- taosArrayDestroy(vgroupList);
+ taosArrayDestroy(pVgs);
return code;
}
@@ -2309,30 +2310,39 @@ static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRea
}
}
+static int32_t setSuperTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
+ SArray* vgroupList = NULL;
+ int32_t code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
+ }
+ taosArrayDestroy(vgroupList);
+ return code;
+}
+
+static int32_t setNormalTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
+ pRealTable->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
+ if (NULL == pRealTable->pVgroupList) {
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
+ pRealTable->pVgroupList->numOfVgroups = 1;
+ return getTableHashVgroupImpl(pCxt, pName, pRealTable->pVgroupList->vgroups);
+}
+
static int32_t setTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
if (pCxt->pParseCxt->topicQuery) {
return TSDB_CODE_SUCCESS;
}
- int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) {
- SArray* vgroupList = NULL;
- code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
- if (TSDB_CODE_SUCCESS == code) {
- code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
- }
- taosArrayDestroy(vgroupList);
- } else if (TSDB_SYSTEM_TABLE == pRealTable->pMeta->tableType) {
- code = setSysTableVgroupList(pCxt, pName, pRealTable);
- } else {
- pRealTable->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
- if (NULL == pRealTable->pVgroupList) {
- return TSDB_CODE_OUT_OF_MEMORY;
- }
- pRealTable->pVgroupList->numOfVgroups = 1;
- code = getTableHashVgroupImpl(pCxt, pName, pRealTable->pVgroupList->vgroups);
+ return setSuperTableVgroupList(pCxt, pName, pRealTable);
}
- return code;
+
+ if (TSDB_SYSTEM_TABLE == pRealTable->pMeta->tableType) {
+ return setSysTableVgroupList(pCxt, pName, pRealTable);
+ }
+
+ return setNormalTableVgroupList(pCxt, pName, pRealTable);
}
static uint8_t getStmtPrecision(SNode* pStmt) {
@@ -2366,7 +2376,6 @@ static bool isSingleTable(SRealTableNode* pRealTable) {
int8_t tableType = pRealTable->pMeta->tableType;
if (TSDB_SYSTEM_TABLE == tableType) {
return 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) &&
- 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLE_DISTRIBUTED) &&
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS);
}
return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType);
@@ -6296,6 +6305,7 @@ static int32_t rewriteShow(STranslateContext* pCxt, SQuery* pQuery) {
code = createShowCondition((SShowStmt*)pQuery->pRoot, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
+ pCxt->showRewrite = true;
pQuery->showRewrite = true;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pStmt;
@@ -6349,6 +6359,7 @@ static int32_t rewriteShowStableTags(STranslateContext* pCxt, SQuery* pQuery) {
code = createShowTableTagsProjections(&pSelect->pProjectionList, &pShow->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
+ pCxt->showRewrite = true;
pQuery->showRewrite = true;
pSelect->tagScan = true;
nodesDestroyNode(pQuery->pRoot);
@@ -6379,6 +6390,7 @@ static int32_t rewriteShowDnodeVariables(STranslateContext* pCxt, SQuery* pQuery
}
}
if (TSDB_CODE_SUCCESS == code) {
+ pCxt->showRewrite = true;
pQuery->showRewrite = true;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pSelect;
@@ -6398,6 +6410,7 @@ static int32_t rewriteShowVnodes(STranslateContext* pCxt, SQuery* pQuery) {
}
}
if (TSDB_CODE_SUCCESS == code) {
+ pCxt->showRewrite = true;
pQuery->showRewrite = true;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pStmt;
@@ -6439,6 +6452,7 @@ static int32_t rewriteShowTableDist(STranslateContext* pCxt, SQuery* pQuery) {
code = nodesListMakeStrictAppend(&pStmt->pProjectionList, createBlockDistFunc());
}
if (TSDB_CODE_SUCCESS == code) {
+ pCxt->showRewrite = true;
pQuery->showRewrite = true;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pStmt;
diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c
index 7a56b0e0fa..d9fc83c20f 100644
--- a/source/libs/parser/src/parUtil.c
+++ b/source/libs/parser/src/parUtil.c
@@ -474,7 +474,7 @@ static int32_t getInsTagsTableTargetNameFromCond(int32_t acctId, SLogicCondition
return TSDB_CODE_SUCCESS;
}
-int32_t getInsTagsTableTargetName(int32_t acctId, SNode* pWhere, SName* pName) {
+int32_t getVnodeSysTableTargetName(int32_t acctId, SNode* pWhere, SName* pName) {
if (NULL == pWhere) {
return TSDB_CODE_SUCCESS;
}
diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp
index de7487434c..cc51beb842 100644
--- a/source/libs/parser/test/mockCatalog.cpp
+++ b/source/libs/parser/test/mockCatalog.cpp
@@ -65,9 +65,10 @@ void generateInformationSchema(MockCatalogService* mcs) {
.addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
.addColumn("stable_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
.done();
- mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_TABLES, TSDB_SYSTEM_TABLE, 2)
- .addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
+ mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_TABLES, TSDB_SYSTEM_TABLE, 3)
.addColumn("table_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
+ .addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
+ .addColumn("stable_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
.done();
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_TABLE_DISTRIBUTED, TSDB_SYSTEM_TABLE, 2)
.addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
@@ -140,7 +141,7 @@ void generatePerformanceSchema(MockCatalogService* mcs) {
void generateTestTables(MockCatalogService* mcs, const std::string& db) {
mcs->createTableBuilder(db, "t1", TSDB_NORMAL_TABLE, 6)
.setPrecision(TSDB_TIME_PRECISION_MILLI)
- .setVgid(1)
+ .setVgid(2)
.addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP)
.addColumn("c1", TSDB_DATA_TYPE_INT)
.addColumn("c2", TSDB_DATA_TYPE_BINARY, 20)
@@ -182,9 +183,9 @@ void generateTestStables(MockCatalogService* mcs, const std::string& db) {
.addTag("tag2", TSDB_DATA_TYPE_BINARY, 20)
.addTag("tag3", TSDB_DATA_TYPE_TIMESTAMP);
builder.done();
- mcs->createSubTable(db, "st1", "st1s1", 1);
- mcs->createSubTable(db, "st1", "st1s2", 2);
- mcs->createSubTable(db, "st1", "st1s3", 1);
+ mcs->createSubTable(db, "st1", "st1s1", 2);
+ mcs->createSubTable(db, "st1", "st1s2", 3);
+ mcs->createSubTable(db, "st1", "st1s3", 2);
}
{
ITableBuilder& builder = mcs->createTableBuilder(db, "st2", TSDB_SUPER_TABLE, 3, 1)
@@ -194,8 +195,8 @@ void generateTestStables(MockCatalogService* mcs, const std::string& db) {
.addColumn("c2", TSDB_DATA_TYPE_BINARY, 20)
.addTag("jtag", TSDB_DATA_TYPE_JSON);
builder.done();
- mcs->createSubTable(db, "st2", "st2s1", 1);
- mcs->createSubTable(db, "st2", "st2s2", 2);
+ mcs->createSubTable(db, "st2", "st2s1", 2);
+ mcs->createSubTable(db, "st2", "st2s2", 3);
}
}
diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp
index be2e4b90b9..bed8f82d3a 100644
--- a/source/libs/parser/test/mockCatalogService.cpp
+++ b/source/libs/parser/test/mockCatalogService.cpp
@@ -20,15 +20,19 @@
#include