Merge branch '3.0' into enh/TD-20891

This commit is contained in:
Shengliang Guan 2022-12-02 16:44:46 +08:00
commit 24ee4b4289
65 changed files with 1468 additions and 271 deletions

View File

@ -0,0 +1,46 @@
---
title: Write from Kafka
---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import PyKafka from "./_py_kafka.mdx";
## About Kafka
Apache Kafka is an open-source distributed event streaming platform, used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. For the key concepts of kafka, please refer to [kafka documentation](https://kafka.apache.org/documentation/#gettingStarted).
### kafka topic
Messages in Kafka are organized by topics. A topic may have one or more partitions. We can manage kafka topics through `kafka-topics`.
create a topic named `kafka-events`:
```
bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092
```
Alter `kafka-events` topic to set partitions to 3:
```
bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092
```
Show all topics and partitions in Kafka:
```
bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe
```
## Insert into TDengine
We can write data into TDengine via SQL or Schemaless. For more information, please refer to [Insert Using SQL](/develop/insert-data/sql-writing/) or [High Performance Writing](/develop/insert-data/high-volume/) or [Schemaless Writing](/reference/schemaless/).
## Examples
<Tabs defaultValue="Python" groupId="lang">
<TabItem label="Python" value="Python">
<PyKafka />
</TabItem>
</Tabs>

View File

@ -0,0 +1,60 @@
### python Kafka 客户端
For python kafka client, please refer to [kafka client](https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python). In this document, we use [kafka-python](http://github.com/dpkp/kafka-python).
### consume from Kafka
The simple way to consume messages from Kafka is to read messages one by one. The demo is as follows:
```
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
print (msg)
```
For higher performance, we can consume message from kafka in batch. The demo is as follows:
```
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
while True:
msgs = consumer.poll(timeout_ms=500, max_records=1000)
if msgs:
print (msgs)
```
### multi-threading
For more higher performance we can process data from kafka in multi-thread. We can use python's ThreadPoolExecutor to achieve multithreading. The demo is as follows:
```
from concurrent.futures import ThreadPoolExecutor, Future
pool = ThreadPoolExecutor(max_workers=10)
pool.submit(...)
```
### multi-process
For more higher performance, sometimes we use multiprocessing. In this case, the number of Kafka Consumers should not be greater than the number of Kafka Topic Partitions. The demo is as follows:
```
from multiprocessing import Process
ps = []
for i in range(5):
p = Process(target=Consumer().consume())
p.start()
ps.append(p)
for p in ps:
p.join()
```
In addition to python's built-in multithreading and multiprocessing library, we can also use the third-party library gunicorn.
### Examples
```py
{{#include docs/examples/python/kafka_example.py}}
```

View File

@ -76,7 +76,7 @@ sudo -u grafana grafana-cli plugins install tdengine-datasource
You can also download zip files from [GitHub](https://github.com/taosdata/grafanaplugin/releases/tag/latest) or [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) and install manually. The commands are as follows:
```bash
GF_VERSION=3.2.2
GF_VERSION=3.2.7
# from GitHub
wget https://github.com/taosdata/grafanaplugin/releases/download/v$GF_VERSION/tdengine-datasource-$GF_VERSION.zip
# from Grafana

View File

@ -38,15 +38,9 @@ Download the latest TDengine-server from the [Downloads](http://tdengine.com/en/
## Data Connection Setup
### Download TDengine plug-in to grafana plug-in directory
### Install Grafana Plugin and Configure Data Source
```bash
1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip
2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/
3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine
4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini
5. sudo systemctl restart grafana-server.service
```
Please refer to [Install Grafana Plugin and Configure Data Source](/third-party/grafana/#install-grafana-plugin-and-configure-data-source)
### Modify /etc/telegraf/telegraf.conf

View File

@ -41,15 +41,9 @@ Download the latest TDengine-server from the [Downloads](http://tdengine.com/en/
## Data Connection Setup
### Copy the TDengine plugin to the grafana plugin directory
### Install Grafana Plugin and Configure Data Source
```bash
1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip
2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/
3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine
4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini
5. sudo systemctl restart grafana-server.service
```
Please refer to [Install Grafana Plugin and Configure Data Source](/third-party/grafana/#install-grafana-plugin-and-configure-data-source)
### Configure collectd

View File

@ -0,0 +1,192 @@
#! encoding = utf-8
import json
import time
from json import JSONDecodeError
from typing import Callable
import logging
from concurrent.futures import ThreadPoolExecutor, Future
import taos
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord
class Consumer(object):
DEFAULT_CONFIGS = {
'kafka_brokers': 'localhost:9092',
'kafka_topic': 'python_kafka',
'kafka_group_id': 'taos',
'taos_host': 'localhost',
'taos_user': 'root',
'taos_password': 'taosdata',
'taos_database': 'power',
'taos_port': 6030,
'timezone': None,
'clean_after_testing': False,
'bath_consume': True,
'batch_size': 1000,
'async_model': True,
'workers': 10
}
LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale',
'California.SantaClara', 'California.Cupertino']
CREATE_DATABASE_SQL = 'create database if not exists {} keep 365 duration 10 buffer 16 wal_level 1'
USE_DATABASE_SQL = 'use {}'
DROP_TABLE_SQL = 'drop table if exists meters'
DROP_DATABASE_SQL = 'drop database if exists {}'
CREATE_STABLE_SQL = 'create stable meters (ts timestamp, current float, voltage int, phase float) ' \
'tags (location binary(64), groupId int)'
CREATE_TABLE_SQL = 'create table if not exists {} using meters tags (\'{}\', {})'
INSERT_SQL_HEADER = "insert into "
INSERT_PART_SQL = 'power.{} values (\'{}\', {}, {}, {})'
def __init__(self, **configs):
self.config: dict = self.DEFAULT_CONFIGS
self.config.update(configs)
self.consumer = KafkaConsumer(
self.config.get('kafka_topic'), # topic
bootstrap_servers=self.config.get('kafka_brokers'),
group_id=self.config.get('kafka_group_id'),
)
self.taos = taos.connect(
host=self.config.get('taos_host'),
user=self.config.get('taos_user'),
password=self.config.get('taos_password'),
port=self.config.get('taos_port'),
timezone=self.config.get('timezone'),
)
if self.config.get('async_model'):
self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
self.tasks: list[Future] = []
# tags and table mapping # key: {location}_{groupId} value:
self.tag_table_mapping = {}
i = 0
for location in self.LOCATIONS:
for j in range(1, 11):
table_name = 'd{}'.format(i)
self._cache_table(location=location, group_id=j, table_name=table_name)
i += 1
def init_env(self):
# create database and table
self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
self.taos.execute(self.CREATE_DATABASE_SQL.format(self.config.get('taos_database')))
self.taos.execute(self.USE_DATABASE_SQL.format(self.config.get('taos_database')))
self.taos.execute(self.DROP_TABLE_SQL)
self.taos.execute(self.CREATE_STABLE_SQL)
for tags, table_name in self.tag_table_mapping.items():
location, group_id = _get_location_and_group(tags)
self.taos.execute(self.CREATE_TABLE_SQL.format(table_name, location, group_id))
def consume(self):
logging.warning('## start consumer topic-[%s]', self.config.get('kafka_topic'))
try:
if self.config.get('bath_consume'):
self._run_batch(self._to_taos_batch)
else:
self._run(self._to_taos)
except KeyboardInterrupt:
logging.warning("## caught keyboard interrupt, stopping")
finally:
self.stop()
def stop(self):
# close consumer
if self.consumer is not None:
self.consumer.commit()
self.consumer.close()
# multi thread
if self.config.get('async_model'):
for task in self.tasks:
while not task.done():
pass
if self.pool is not None:
self.pool.shutdown()
# clean data
if self.config.get('clean_after_testing'):
self.taos.execute(self.DROP_TABLE_SQL)
self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
# close taos
if self.taos is not None:
self.taos.close()
def _run(self, f: Callable[[ConsumerRecord], bool]):
for message in self.consumer:
if self.config.get('async_model'):
self.pool.submit(f(message))
else:
f(message)
def _run_batch(self, f: Callable[[list[list[ConsumerRecord]]], None]):
while True:
messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size'))
if messages:
if self.config.get('async_model'):
self.pool.submit(f, messages.values())
else:
f(list(messages.values()))
if not messages:
time.sleep(0.1)
def _to_taos(self, message: ConsumerRecord) -> bool:
sql = self.INSERT_SQL_HEADER + self._build_sql(message.value)
if len(sql) == 0: # decode error, skip
return True
logging.info('## insert sql %s', sql)
return self.taos.execute(sql=sql) == 1
def _to_taos_batch(self, messages: list[list[ConsumerRecord]]):
sql = self._build_sql_batch(messages=messages)
if len(sql) == 0: # decode error, skip
return
self.taos.execute(sql=sql)
def _build_sql(self, msg_value: str) -> str:
try:
data = json.loads(msg_value)
except JSONDecodeError as e:
logging.error('## decode message [%s] error ', msg_value, e)
return ''
location = data.get('location')
group_id = data.get('groupId')
ts = data.get('ts')
current = data.get('current')
voltage = data.get('voltage')
phase = data.get('phase')
table_name = self._get_table_name(location=location, group_id=group_id)
return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)
def _build_sql_batch(self, messages: list[list[ConsumerRecord]]) -> str:
sql_list = []
for partition_messages in messages:
for message in partition_messages:
sql_list.append(self._build_sql(message.value))
return self.INSERT_SQL_HEADER + ' '.join(sql_list)
def _cache_table(self, location: str, group_id: int, table_name: str):
self.tag_table_mapping[_tag_table_mapping_key(location=location, group_id=group_id)] = table_name
def _get_table_name(self, location: str, group_id: int) -> str:
return self.tag_table_mapping.get(_tag_table_mapping_key(location=location, group_id=group_id))
def _tag_table_mapping_key(location: str, group_id: int):
return '{}_{}'.format(location, group_id)
def _get_location_and_group(key: str) -> (str, int):
fields = key.split('_')
return fields[0], fields[1]
if __name__ == '__main__':
consumer = Consumer(async_model=True)
consumer.init_env()
consumer.consume()

View File

@ -0,0 +1,47 @@
---
title: 从 Kafka 写入
---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import PyKafka from "./_py_kafka.mdx";
## Kafka 介绍
Apache Kafka 是开源的分布式消息分发平台被广泛应用于高性能数据管道、流式数据分析、数据集成和事件驱动类型的应用程序。Kafka 包含 Producer、Consumer 和 Topic其中 Producer 是向 Kafka 发送消息的进程Consumer 是从 Kafka 消费消息的进程。Kafka 相关概念可以参考[官方文档](https://kafka.apache.org/documentation/#gettingStarted)。
### kafka topic
Kafka 的消息按 topic 组织,每个 topic 会有一到多个 partition。可以通过 kafka 的 `kafka-topics` 管理 topic。
创建名为 `kafka-events` 的topic:
```
bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092
```
修改 `kafka-events` 的 partition 数量为 3:
```
bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092
```
展示所有的 topic 和 partition:
```
bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe
```
## 写入 TDengine
TDengine 支持 Sql 方式和 Schemaless 方式的数据写入Sql 方式数据写入可以参考 [TDengine SQL 写入](/develop/insert-data/sql-writing/) 和 [TDengine 高效写入](/develop/insert-data/high-volume/)。Schemaless 方式数据写入可以参考 [TDengine Schemaless 写入](/reference/schemaless/) 文档。
## 示例代码
<Tabs defaultValue="Python" groupId="lang">
<TabItem label="Python" value="Python">
<PyKafka />
</TabItem>
</Tabs>

View File

@ -0,0 +1,60 @@
### python Kafka 客户端
Kafka 的 python 客户端可以参考文档 [kafka client](https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python)。推荐使用 [confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python) 和 [kafka-python](http://github.com/dpkp/kafka-python)。以下示例以 [kafka-python](http://github.com/dpkp/kafka-python) 为例。
### 从 Kafka 消费数据
Kafka 客户端采用 pull 的方式从 Kafka 消费数据,可以采用单条消费的方式或批量消费的方式读取数据。使用 [kafka-python](http://github.com/dpkp/kafka-python) 客户端单条消费数据的示例如下:
```
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
print (msg)
```
单条消费的方式在数据流量大的情况下往往存在性能瓶颈,导致 Kafka 消息积压,更推荐使用批量消费的方式消费数据。使用 [kafka-python](http://github.com/dpkp/kafka-python) 客户端批量消费数据的示例如下:
```
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
while True:
msgs = consumer.poll(timeout_ms=500, max_records=1000)
if msgs:
print (msgs)
```
### Python 多线程
为了提高数据写入效率,通常采用多线程的方式写入数据,可以使用 python 线程池 ThreadPoolExecutor 实现多线程。示例代码如下:
```
from concurrent.futures import ThreadPoolExecutor, Future
pool = ThreadPoolExecutor(max_workers=10)
pool.submit(...)
```
### Python 多进程
单个python进程不能充分发挥多核 CPU 的性能有时候我们会选择多进程的方式。在多进程的情况下需要注意Kafka Consumer 的数量应该小于等于 Kafka Topic Partition 数量。Python 多进程示例代码如下:
```
from multiprocessing import Process
ps = []
for i in range(5):
p = Process(target=Consumer().consume())
p.start()
ps.append(p)
for p in ps:
p.join()
```
除了 Python 内置的多线程和多进程方式,还可以通过第三方库 gunicorn 实现并发。
### 完整示例
```py
{{#include docs/examples/python/kafka_example.py}}
```

View File

@ -39,15 +39,9 @@ IT 运维监测数据通常都是对时间特性比较敏感的数据,例如
## 数据链路设置
### 下载 TDengine 插件到 Grafana 插件目录
### 安装 Grafana Plugin 并配置数据源
```bash
1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip
2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/
3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine
4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini
5. sudo systemctl restart grafana-server.service
```
请参考[安装 Grafana Plugin 并配置数据源](/third-party/grafana/#%E5%AE%89%E8%A3%85-grafana-plugin-%E5%B9%B6%E9%85%8D%E7%BD%AE%E6%95%B0%E6%8D%AE%E6%BA%90)。
### 修改 /etc/telegraf/telegraf.conf

View File

@ -41,15 +41,9 @@ IT 运维监测数据通常都是对时间特性比较敏感的数据,例如
## 数据链路设置
### 复制 TDengine 插件到 grafana 插件目录
### 安装 Grafana Plugin 并配置数据源
```bash
1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip
2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/
3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine
4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini
5. sudo systemctl restart grafana-server.service
```
请参考[安装 Grafana Plugin 并配置数据源](/third-party/grafana/#%E5%AE%89%E8%A3%85-grafana-plugin-%E5%B9%B6%E9%85%8D%E7%BD%AE%E6%95%B0%E6%8D%AE%E6%BA%90)。
### 配置 collectd

View File

@ -152,7 +152,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
* @param tinfo qhandle
* @return
*/
int32_t qAsyncKillTask(qTaskInfo_t tinfo);
int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
/**
* destroy query info structure

View File

@ -163,6 +163,7 @@ typedef struct tExprNode {
int32_t functionId;
int32_t num;
struct SFunctionNode *pFunctNode;
int32_t functionType;
} _function;
struct {

View File

@ -130,6 +130,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_GROUP_KEY,
FUNCTION_TYPE_CACHE_LAST_ROW,
FUNCTION_TYPE_CACHE_LAST,
FUNCTION_TYPE_TABLE_COUNT,
// distributed splitting functions
FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,

View File

@ -60,6 +60,12 @@ extern "C" {
for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); \
(NULL != cell ? (node = &(cell->pNode), true) : (node = NULL, false)); cell = cell->pNext)
#define NODES_DESTORY_NODE(node) \
do { \
nodesDestroyNode((node)); \
(node) = NULL; \
} while (0)
#define NODES_DESTORY_LIST(list) \
do { \
nodesDestroyList((list)); \
@ -228,6 +234,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN,
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT,
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN,
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,

View File

@ -62,7 +62,8 @@ typedef enum EScanType {
SCAN_TYPE_STREAM,
SCAN_TYPE_TABLE_MERGE,
SCAN_TYPE_BLOCK_INFO,
SCAN_TYPE_LAST_ROW
SCAN_TYPE_LAST_ROW,
SCAN_TYPE_TABLE_COUNT
} EScanType;
typedef struct SScanLogicNode {
@ -323,6 +324,8 @@ typedef struct SLastRowScanPhysiNode {
bool ignoreNull;
} SLastRowScanPhysiNode;
typedef SLastRowScanPhysiNode STableCountScanPhysiNode;
typedef struct SSystemTableScanPhysiNode {
SScanPhysiNode scan;
SEpSet mgmtEpSet;

View File

@ -260,7 +260,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED)
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later

View File

@ -380,6 +380,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0526)
#define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x0527)
#define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528)
#define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529)
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)

View File

@ -243,9 +243,9 @@ static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { rpcRegisterBrokenLin
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcReleaseHandle(pHandle, type); }
static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_APP_IS_STARTING ||
code == TSDB_CODE_MNODE_NOT_FOUND) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_MNODE_NOT_FOUND ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_RPC_BROKEN_LINK ||
code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING || TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) {
return false;

View File

@ -31,6 +31,7 @@ void mndReleaseUser(SMnode *pMnode, SUserObj *pUser);
// for trans test
SSdbRaw *mndUserActionEncode(SUserObj *pUser);
SHashObj *mndDupDbHash(SHashObj *pOld);
SHashObj *mndDupTopicHash(SHashObj *pOld);
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen);

View File

@ -108,6 +108,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type = TSDB_MGMT_TABLE_APPS;
} else if (strncasecmp(name, TSDB_INS_TABLE_STREAM_TASKS, len) == 0) {
type = TSDB_MGMT_TABLE_STREAM_TASKS;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_PRIVILEGES, len) == 0) {
type = TSDB_MGMT_TABLE_PRIVILEGES;
} else {
// ASSERT(0);
}

View File

@ -161,7 +161,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
char *topic = taosHashIterate(pUser->topics, NULL);
while (topic != NULL) {
SDB_SET_BINARY(pRaw, dataPos, topic, TSDB_TOPIC_FNAME_LEN, _OVER);
db = taosHashIterate(pUser->topics, topic);
topic = taosHashIterate(pUser->topics, topic);
}
SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
@ -446,7 +446,7 @@ static int32_t mndAlterUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SRpc
return 0;
}
SHashObj *mndDupDbHash(SHashObj *pOld) {
SHashObj *mndDupObjHash(SHashObj *pOld, int32_t dataLen) {
SHashObj *pNew =
taosHashInit(taosHashGetSize(pOld), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pNew == NULL) {
@ -457,7 +457,7 @@ SHashObj *mndDupDbHash(SHashObj *pOld) {
char *db = taosHashIterate(pOld, NULL);
while (db != NULL) {
int32_t len = strlen(db) + 1;
if (taosHashPut(pNew, db, len, db, TSDB_DB_FNAME_LEN) != 0) {
if (taosHashPut(pNew, db, len, db, dataLen) != 0) {
taosHashCancelIterate(pOld, db);
taosHashCleanup(pNew);
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -469,6 +469,10 @@ SHashObj *mndDupDbHash(SHashObj *pOld) {
return pNew;
}
SHashObj *mndDupDbHash(SHashObj *pOld) { return mndDupObjHash(pOld, TSDB_DB_FNAME_LEN); }
SHashObj *mndDupTopicHash(SHashObj *pOld) { return mndDupObjHash(pOld, TSDB_TOPIC_FNAME_LEN); }
static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -519,7 +523,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
taosRLockLatch(&pUser->lock);
newUser.readDbs = mndDupDbHash(pUser->readDbs);
newUser.writeDbs = mndDupDbHash(pUser->writeDbs);
newUser.topics = mndDupDbHash(pUser->topics);
newUser.topics = mndDupTopicHash(pUser->topics);
taosRUnLockLatch(&pUser->lock);
if (newUser.readDbs == NULL || newUser.writeDbs == NULL || newUser.topics == NULL) {
@ -832,6 +836,26 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
int32_t numOfTopics = taosHashGetSize(pUser->topics);
if (numOfRows + numOfReadDbs + numOfWriteDbs + numOfTopics >= rows) break;
if (pUser->superUser) {
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes);
colDataAppend(pColInfo, numOfRows, (const char *)userName, false);
char privilege[20] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(privilege, "all", pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)privilege, false);
char objName[20] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(objName, "all", pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)objName, false);
numOfRows++;
}
char *db = taosHashIterate(pUser->readDbs, NULL);
while (db != NULL) {
cols = 0;
@ -902,7 +926,7 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
numOfRows++;
db = taosHashIterate(pUser->topics, topic);
topic = taosHashIterate(pUser->topics, topic);
}
sdbRelease(pSdb, pUser);

View File

@ -106,14 +106,24 @@ int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList,
int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
bool *acquired);
int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen, double selectivityRatio);
int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid);
int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName);
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
bool *acquired);
int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen, double selectivityRatio);
int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid);
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
int64_t metaGetTbNum(SMeta *pMeta);
int64_t metaGetNtbNum(SMeta *pMeta);
typedef struct {
int64_t uid;
int64_t ctbNum;
} SMetaStbStats;
int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
typedef struct SMetaFltParam {
tb_uid_t suid;

View File

@ -116,8 +116,6 @@ int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, in
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
int metaAlterCache(SMeta* pMeta, int32_t nPage);
tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
int64_t metaGetTbNum(SMeta* pMeta);
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
@ -144,12 +142,6 @@ typedef struct SMetaInfo {
} SMetaInfo;
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo, SMetaReader* pReader);
typedef struct {
int64_t uid;
int64_t ctbNum;
} SMetaStbStats;
int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
// tsdb
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
int tsdbClose(STsdb** pTsdb);

View File

@ -223,6 +223,23 @@ int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName) {
return 0;
}
int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0);
code = metaGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
return -1;
}
strncpy(tbName, mr.me.name, TSDB_TABLE_NAME_LEN);
metaReaderClear(&mr);
return 0;
}
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid) {
int code = 0;
SMetaReader mr = {0};
@ -739,6 +756,10 @@ int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
}
int64_t metaGetNtbNum(SMeta *pMeta) {
return pMeta->pVnode->config.vndStats.numOfNTables;
}
typedef struct {
SMeta *pMeta;
TBC *pCur;

View File

@ -34,6 +34,7 @@ extern "C" {
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s"
#define EXPLAIN_DISTBLK_SCAN_FORMAT "Block Dist Scan on %s"
#define EXPLAIN_LASTROW_SCAN_FORMAT "Last Row Scan on %s"
#define EXPLAIN_TABLE_COUNT_SCAN_FORMAT "Table Count Row Scan on %s"
#define EXPLAIN_PROJECTION_FORMAT "Projection"
#define EXPLAIN_JOIN_FORMAT "%s"
#define EXPLAIN_AGG_FORMAT "Aggragate"

View File

@ -19,6 +19,7 @@
#include "query.h"
#include "tcommon.h"
#include "tdatablock.h"
#include "systable.h"
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel);
@ -212,6 +213,11 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
pPhysiChildren = lastRowPhysiNode->scan.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: {
STableCountScanPhysiNode *tableCountPhysiNode = (STableCountScanPhysiNode *)pNode;
pPhysiChildren = tableCountPhysiNode->scan.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: {
SGroupSortPhysiNode *groupSortPhysiNode = (SGroupSortPhysiNode *)pNode;
pPhysiChildren = groupSortPhysiNode->node.pChildren;
@ -1355,6 +1361,48 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: {
STableCountScanPhysiNode *pLastRowNode = (STableCountScanPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_TABLE_COUNT_SCAN_FORMAT,
('\0' != pLastRowNode->scan.tableName.tname[0] ? pLastRowNode->scan.tableName.tname : TSDB_INS_TABLE_TABLES));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, LIST_LENGTH(pLastRowNode->scan.pScanCols));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
if (pLastRowNode->scan.pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pLastRowNode->scan.pScanPseudoCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pLastRowNode->scan.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pLastRowNode->scan.node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pLastRowNode->scan.node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_APPEND_LIMIT(pLastRowNode->scan.node.pLimit);
EXPLAIN_ROW_APPEND_SLIMIT(pLastRowNode->scan.node.pSlimit);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pLastRowNode->scan.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pLastRowNode->scan.node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: {
SGroupSortPhysiNode *pSortNode = (SGroupSortPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_GROUP_SORT_FORMAT);

View File

@ -482,6 +482,34 @@ typedef struct {
SSnapContext* sContext;
} SStreamRawScanInfo;
typedef struct STableCountScanSupp {
int16_t dbNameSlotId;
int16_t stbNameSlotId;
int16_t tbCountSlotId;
bool groupByDbName;
bool groupByStbName;
char dbName[TSDB_DB_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
} STableCountScanSupp;
typedef struct STableCountScanOperatorInfo {
SReadHandle readHandle;
SSDataBlock* pRes;
SName tableName;
SNodeList* groupTags;
SNodeList* scanCols;
SNodeList* pseudoCols;
STableCountScanSupp supp;
int32_t currGrpIdx;
SArray* stbUidList; // when group by db_name and stable_name
} STableCountScanOperatorInfo;
typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo;
SSDataBlock* pRes;
@ -781,7 +809,7 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order,
int32_t checkForQueryBuf(size_t numOfTables);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);

View File

@ -70,7 +70,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
longjmp(pTaskInfo->env, pTaskInfo->code);
}
for (int32_t i = 0; i < totalSources; ++i) {
@ -573,7 +573,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
longjmp(pTaskInfo->env, pTaskInfo->code);
}
tsem_post(&pExchangeInfo->ready);
@ -621,7 +621,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
longjmp(pTaskInfo->env, pTaskInfo->code);
}
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);

View File

@ -1348,6 +1348,7 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
pExprNode->_function.functionId = pFuncNode->funcId;
pExprNode->_function.pFunctNode = pFuncNode;
pExprNode->_function.functionType = pFuncNode->funcType;
tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));

View File

@ -688,7 +688,7 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
if (pTaskInfo == NULL) {
@ -697,7 +697,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo);
setTaskKilled(pTaskInfo, rspCode);
qStopTaskOperators(pTaskInfo);

View File

@ -610,21 +610,10 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
}
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
// abort current query execution.
if (pTaskInfo->owner != 0 &&
((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec())
/*(!needBuildResAfterQueryComplete(pTaskInfo))*/) {
assert(pTaskInfo->cost.start != 0);
// qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
// ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
// return true;
}
return false;
return (0 != pTaskInfo->code) ? true : false;
}
void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; }
/////////////////////////////////////////////////////////////////////////////////////////////
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
@ -1381,7 +1370,8 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN ||
type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN) {
type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN ||
type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) {
*order = TSDB_ORDER_ASC;
*scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS;
@ -1883,6 +1873,8 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* handle, STableCountScanPhysiNode* pNode,
SExecTaskInfo* pTaskInfo);
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0);
@ -2073,6 +2065,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;

View File

@ -30,7 +30,6 @@
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
@ -629,7 +628,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) {
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
// process this data block based on the probabilities
@ -2032,7 +2031,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
longjmp(pTaskInfo->env, pTaskInfo->code);
}
int32_t rows = 0;
@ -2529,7 +2528,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
STsdbReader* reader = pInfo->base.dataReader;
while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
// process this data block based on the probabilities
@ -2913,3 +2912,305 @@ _error:
taosMemoryFree(pOperator);
return NULL;
}
// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
static void destoryTableCountScanOperator(void* param);
static const char* GROUP_TAG_DB_NAME = "db_name";
static const char* GROUP_TAG_STABLE_NAME = "stable_name";
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
if (scanCols != NULL) {
SNode* pNode = NULL;
FOREACH(pNode, scanCols) {
if (nodeType(pNode) != QUERY_NODE_TARGET) {
return TSDB_CODE_QRY_SYS_ERROR;
}
STargetNode* targetNode = (STargetNode*)pNode;
if (nodeType(targetNode->pExpr) != QUERY_NODE_COLUMN) {
return TSDB_CODE_QRY_SYS_ERROR;
}
SColumnNode* colNode = (SColumnNode*)(targetNode->pExpr);
if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
supp->dbNameSlotId = targetNode->slotId;
} else if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
supp->stbNameSlotId = targetNode->slotId;
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
if (pseudoCols != NULL) {
SNode* pNode = NULL;
FOREACH(pNode, pseudoCols) {
if (nodeType(pNode) != QUERY_NODE_TARGET) {
return TSDB_CODE_QRY_SYS_ERROR;
}
STargetNode* targetNode = (STargetNode*)pNode;
if (nodeType(targetNode->pExpr) != QUERY_NODE_FUNCTION) {
return TSDB_CODE_QRY_SYS_ERROR;
}
SFunctionNode* funcNode = (SFunctionNode*)(targetNode->pExpr);
if (funcNode->funcType == FUNCTION_TYPE_TABLE_COUNT) {
supp->tbCountSlotId = targetNode->slotId;
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
if (groupTags != NULL) {
SNode* pNode = NULL;
FOREACH(pNode, groupTags) {
if (nodeType(pNode) != QUERY_NODE_COLUMN) {
return TSDB_CODE_QRY_SYS_ERROR;
}
SColumnNode* colNode = (SColumnNode*)pNode;
if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
supp->groupByDbName = true;
}
if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
supp->groupByStbName = true;
}
}
} else {
strncpy(supp->dbName, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
strncpy(supp->stbName, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
}
return TSDB_CODE_SUCCESS;
}
int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
int32_t code = 0;
code = tblCountScanGetInputs(groupTags, tableName, supp);
if (code != TSDB_CODE_SUCCESS) {
qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
return code;
}
supp->dbNameSlotId = -1;
supp->stbNameSlotId = -1;
supp->tbCountSlotId = -1;
code = tblCountScanGetGroupTagsSlotId(scanCols, supp);
if (code != TSDB_CODE_SUCCESS) {
qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
return code;
}
code = tblCountScanGetCountSlotId(pseudoCols, supp);
if (code != TSDB_CODE_SUCCESS) {
qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
return code;
}
return code;
}
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SScanPhysiNode* pScanNode = &pTblCountScanNode->scan;
STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (!pInfo || !pOperator) {
goto _error;
}
pInfo->readHandle = *readHandle;
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
initResultSizeInfo(&pOperator->resultInfo, 1);
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
pTaskInfo);
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, NULL);
return pOperator;
_error:
if (pInfo != NULL) {
destoryTableCountScanOperator(pInfo);
}
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
SSDataBlock* pRes) {
if (pSupp->dbNameSlotId != -1) {
ASSERT(strlen(dbName));
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);
char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
strncpy(varDataVal(varDbName), dbName, strlen(dbName));
varDataSetLen(varDbName, strlen(dbName));
colDataAppend(colInfoData, 0, varDbName, false);
}
if (pSupp->stbNameSlotId != -1) {
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
if (strlen(stbName) != 0) {
char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
strncpy(varDataVal(varStbName), stbName, strlen(stbName));
varDataSetLen(varStbName, strlen(stbName));
colDataAppend(colInfoData, 0, varStbName, false);
} else {
colDataAppendNULL(colInfoData, 0);
}
}
if (pSupp->tbCountSlotId != -1) {
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
colDataAppend(colInfoData, 0, (char*)&count, false);
}
pRes->info.rows = 1;
}
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
STableCountScanSupp* pSupp = &pInfo->supp;
SSDataBlock* pRes = pInfo->pRes;
size_t infodbTableNum;
getInfosDbMeta(NULL, &infodbTableNum);
size_t perfdbTableNum;
getPerfDbMeta(NULL, &perfdbTableNum);
if (pSupp->groupByDbName) {
if (pInfo->currGrpIdx == 0) {
uint64_t groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
pRes->info.id.groupId = groupId;
fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
} else if (pInfo->currGrpIdx == 1) {
uint64_t groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
pRes->info.id.groupId = groupId;
fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
} else {
setOperatorCompleted(pOperator);
return NULL;
}
pInfo->currGrpIdx++;
return (pRes->info.rows > 0) ? pRes : NULL;
} else {
if (strcmp(pSupp->dbName, TSDB_INFORMATION_SCHEMA_DB) == 0) {
fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
} else if (strcmp(pSupp->dbName, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
} else if (strlen(pSupp->dbName) == 0) {
fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
}
setOperatorCompleted(pOperator);
return (pRes->info.rows > 0) ? pRes : NULL;
}
}
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableCountScanOperatorInfo* pInfo = pOperator->info;
STableCountScanSupp* pSupp = &pInfo->supp;
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
if (pInfo->readHandle.mnd != NULL) {
return buildSysDbTableCount(pOperator, pInfo);
}
const char* db = NULL;
int32_t vgId = 0;
char dbName[TSDB_DB_NAME_LEN] = {0};
{
// get dbname
vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
SName sn = {0};
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&sn, dbName);
}
if (pSupp->groupByDbName) {
if (pSupp->groupByStbName) {
if (pInfo->stbUidList == NULL) {
pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
}
}
if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
char stbName[TSDB_TABLE_NAME_LEN] = {0};
metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName);
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
pRes->info.id.groupId = groupId;
SMetaStbStats stats = {0};
metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats);
int64_t ctbNum = stats.ctbNum;
fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
pInfo->currGrpIdx++;
} else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
pRes->info.id.groupId = groupId;
int64_t ntbNum = metaGetNtbNum(pInfo->readHandle.meta);
fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes);
pInfo->currGrpIdx++;
} else {
setOperatorCompleted(pOperator);
return NULL;
}
} else {
uint64_t groupId = calcGroupId(dbName, strlen(dbName));
pRes->info.id.groupId = groupId;
int64_t dbTableCount = metaGetTbNum(pInfo->readHandle.meta);
fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
setOperatorCompleted(pOperator);
}
} else {
if (strlen(pSupp->dbName) != 0) {
if (strlen(pSupp->stbName) != 0) {
tb_uid_t uid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbName);
SMetaStbStats stats = {0};
metaGetStbStats(pInfo->readHandle.meta, uid, &stats);
int64_t ctbNum = stats.ctbNum;
fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbName, ctbNum, pRes);
} else {
int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
}
} else {
int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
}
setOperatorCompleted(pOperator);
}
return pRes->info.rows > 0 ? pRes : NULL;
}
static void destoryTableCountScanOperator(void* param) {
STableCountScanOperatorInfo* pTableCountScanInfo = param;
blockDataDestroy(pTableCountScanInfo->pRes);
nodesDestroyList(pTableCountScanInfo->groupTags);
taosArrayDestroy(pTableCountScanInfo->stbUidList);
taosMemoryFreeClear(param);
}

View File

@ -2080,6 +2080,11 @@ static int32_t translateTagsPseudoColumn(SFunctionNode* pFunc, char* pErrBuf, in
return TSDB_CODE_SUCCESS;
}
static int32_t translateTableCountPseudoColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS;
}
// clang-format off
const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
@ -3241,6 +3246,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
{
.name = "_table_count",
.type = FUNCTION_TYPE_TABLE_COUNT,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC,
.translateFunc = translateTableCountPseudoColumn,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
};
// clang-format on

View File

@ -221,6 +221,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiTableScan";
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
return "PhysiTableSeqScan";
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
return "PhysiTableMergeScan";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
return "PhysiSreamScan";
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
@ -229,8 +231,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiBlockDistScan";
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
return "PhysiLastRowScan";
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
return "PhysiTableMergeScan";
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
return "PhysiTableCountScan";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "PhysiProject";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
@ -4646,6 +4648,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
return physiScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
return physiLastRowScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
@ -4800,6 +4803,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToLogicPlan(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
return jsonToPhysiScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
return jsonToPhysiLastRowScanNode(pJson, pObj);

View File

@ -3640,6 +3640,7 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
code = physiScanNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
code = physiLastRowScanNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
@ -3778,6 +3779,7 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
code = msgToPhysiScanNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
code = msgToPhysiLastRowScanNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:

View File

@ -494,6 +494,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SBlockDistScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
return makeNode(type, sizeof(SLastRowScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
return makeNode(type, sizeof(STableCountScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
@ -1120,6 +1122,7 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
destroyScanPhysiNode((SScanPhysiNode*)pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: {

View File

@ -86,7 +86,7 @@ STableComInfo getTableInfo(const STableMeta* pTableMeta);
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
int32_t getInsTagsTableTargetName(int32_t acctId, SNode* pWhere, SName* pName);
int32_t getVnodeSysTableTargetName(int32_t acctId, SNode* pWhere, SName* pName);
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache);

View File

@ -140,7 +140,7 @@ static int32_t collectMetaKeyFromInsTagsImpl(SCollectMetaKeyCxt* pCxt, SName* pN
static int32_t collectMetaKeyFromInsTags(SCollectMetaKeyCxt* pCxt) {
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pStmt;
SName name = {0};
int32_t code = getInsTagsTableTargetName(pCxt->pParseCxt->acctId, pSelect->pWhere, &name);
int32_t code = getVnodeSysTableTargetName(pCxt->pParseCxt->acctId, pSelect->pWhere, &name);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKeyFromInsTagsImpl(pCxt, &name);
}
@ -165,7 +165,8 @@ static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, const c
if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES))) {
code = reserveDnodeRequiredInCache(pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)) &&
if (TSDB_CODE_SUCCESS == code &&
(0 == strcmp(pTable, TSDB_INS_TABLE_TAGS) || 0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) &&
QUERY_NODE_SELECT_STMT == nodeType(pCxt->pStmt)) {
code = collectMetaKeyFromInsTags(pCxt);
}

View File

@ -47,6 +47,7 @@ typedef struct STranslateContext {
SParseMetaCache* pMetaCache;
bool createStream;
bool stableQuery;
bool showRewrite;
} STranslateContext;
typedef struct SFullDatabaseName {
@ -750,8 +751,8 @@ static bool isPrimaryKeyImpl(SNode* pExpr) {
return (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId);
} else if (QUERY_NODE_FUNCTION == nodeType(pExpr)) {
SFunctionNode* pFunc = (SFunctionNode*)pExpr;
if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_FIRST == pFunc->funcType ||
FUNCTION_TYPE_LAST == pFunc->funcType) {
if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pFunc->funcType ||
FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) {
return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0));
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType) {
return true;
@ -2209,22 +2210,28 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
}
static bool sysTableFromVnode(const char* pTable) {
return (0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) ||
(0 == strcmp(pTable, TSDB_INS_TABLE_TABLE_DISTRIBUTED) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)));
return ((0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)));
}
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
static int32_t getTagsTableVgroupListImpl(STranslateContext* pCxt, SName* pTargetName, SName* pName,
SArray** pVgroupList) {
static int32_t getVnodeSysTableVgroupListImpl(STranslateContext* pCxt, SName* pTargetName, SName* pName,
SArray** pVgroupList) {
if (0 == pTargetName->type) {
return getDBVgInfoImpl(pCxt, pName, pVgroupList);
}
if (0 == strcmp(pTargetName->dbname, TSDB_INFORMATION_SCHEMA_DB) ||
0 == strcmp(pTargetName->dbname, TSDB_PERFORMANCE_SCHEMA_DB)) {
pTargetName->type = 0;
return TSDB_CODE_SUCCESS;
}
if (TSDB_DB_NAME_T == pTargetName->type) {
int32_t code = getDBVgInfoImpl(pCxt, pTargetName, pVgroupList);
if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
TSDB_CODE_MND_DB_IN_DROPPING == code) {
if (!pCxt->showRewrite && (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
TSDB_CODE_MND_DB_IN_DROPPING == code)) {
// system table query should not report errors
code = TSDB_CODE_SUCCESS;
}
return code;
@ -2241,50 +2248,44 @@ static int32_t getTagsTableVgroupListImpl(STranslateContext* pCxt, SName* pTarge
}
} else if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
TSDB_CODE_MND_DB_IN_DROPPING == code) {
// system table query should not report errors
code = TSDB_CODE_SUCCESS;
}
return code;
}
static int32_t getTagsTableVgroupList(STranslateContext* pCxt, SName* pName, SArray** pVgroupList) {
static int32_t getVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName, SArray** pVgs, bool* pHasUserDbCond) {
if (!isSelectStmt(pCxt->pCurrStmt)) {
return TSDB_CODE_SUCCESS;
}
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
SName targetName = {0};
int32_t code = getInsTagsTableTargetName(pCxt->pParseCxt->acctId, pSelect->pWhere, &targetName);
int32_t code = getVnodeSysTableTargetName(pCxt->pParseCxt->acctId, pSelect->pWhere, &targetName);
if (TSDB_CODE_SUCCESS == code) {
code = getTagsTableVgroupListImpl(pCxt, &targetName, pName, pVgroupList);
code = getVnodeSysTableVgroupListImpl(pCxt, &targetName, pName, pVgs);
}
*pHasUserDbCond = (0 != targetName.type && taosArrayGetSize(*pVgs) > 0);
return code;
}
static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* vgroupList = NULL;
if (0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS)) {
code = getTagsTableVgroupList(pCxt, pName, &vgroupList);
} else if ('\0' != pRealTable->qualDbName[0]) {
if (0 != strcmp(pRealTable->qualDbName, TSDB_INFORMATION_SCHEMA_DB)) {
code = getDBVgInfo(pCxt, pRealTable->qualDbName, &vgroupList);
}
} else {
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
}
bool hasUserDbCond = false;
SArray* pVgs = NULL;
int32_t code = getVnodeSysTableVgroupList(pCxt, pName, &pVgs, &hasUserDbCond);
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) &&
isSelectStmt(pCxt->pCurrStmt) && 0 == taosArrayGetSize(vgroupList)) {
isSelectStmt(pCxt->pCurrStmt) && 0 == taosArrayGetSize(pVgs)) {
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
}
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) {
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) && !hasUserDbCond) {
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &pVgs);
}
if (TSDB_CODE_SUCCESS == code) {
code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
code = toVgroupsInfo(pVgs, &pRealTable->pVgroupList);
}
taosArrayDestroy(vgroupList);
taosArrayDestroy(pVgs);
return code;
}
@ -2309,30 +2310,39 @@ static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRea
}
}
static int32_t setSuperTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
SArray* vgroupList = NULL;
int32_t code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
if (TSDB_CODE_SUCCESS == code) {
code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
}
taosArrayDestroy(vgroupList);
return code;
}
static int32_t setNormalTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
pRealTable->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
if (NULL == pRealTable->pVgroupList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pRealTable->pVgroupList->numOfVgroups = 1;
return getTableHashVgroupImpl(pCxt, pName, pRealTable->pVgroupList->vgroups);
}
static int32_t setTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
if (pCxt->pParseCxt->topicQuery) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) {
SArray* vgroupList = NULL;
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
if (TSDB_CODE_SUCCESS == code) {
code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
}
taosArrayDestroy(vgroupList);
} else if (TSDB_SYSTEM_TABLE == pRealTable->pMeta->tableType) {
code = setSysTableVgroupList(pCxt, pName, pRealTable);
} else {
pRealTable->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
if (NULL == pRealTable->pVgroupList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pRealTable->pVgroupList->numOfVgroups = 1;
code = getTableHashVgroupImpl(pCxt, pName, pRealTable->pVgroupList->vgroups);
return setSuperTableVgroupList(pCxt, pName, pRealTable);
}
return code;
if (TSDB_SYSTEM_TABLE == pRealTable->pMeta->tableType) {
return setSysTableVgroupList(pCxt, pName, pRealTable);
}
return setNormalTableVgroupList(pCxt, pName, pRealTable);
}
static uint8_t getStmtPrecision(SNode* pStmt) {
@ -2366,7 +2376,6 @@ static bool isSingleTable(SRealTableNode* pRealTable) {
int8_t tableType = pRealTable->pMeta->tableType;
if (TSDB_SYSTEM_TABLE == tableType) {
return 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) &&
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLE_DISTRIBUTED) &&
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS);
}
return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType);
@ -6296,6 +6305,7 @@ static int32_t rewriteShow(STranslateContext* pCxt, SQuery* pQuery) {
code = createShowCondition((SShowStmt*)pQuery->pRoot, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
pCxt->showRewrite = true;
pQuery->showRewrite = true;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pStmt;
@ -6349,6 +6359,7 @@ static int32_t rewriteShowStableTags(STranslateContext* pCxt, SQuery* pQuery) {
code = createShowTableTagsProjections(&pSelect->pProjectionList, &pShow->pTags);
}
if (TSDB_CODE_SUCCESS == code) {
pCxt->showRewrite = true;
pQuery->showRewrite = true;
pSelect->tagScan = true;
nodesDestroyNode(pQuery->pRoot);
@ -6379,6 +6390,7 @@ static int32_t rewriteShowDnodeVariables(STranslateContext* pCxt, SQuery* pQuery
}
}
if (TSDB_CODE_SUCCESS == code) {
pCxt->showRewrite = true;
pQuery->showRewrite = true;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pSelect;
@ -6398,6 +6410,7 @@ static int32_t rewriteShowVnodes(STranslateContext* pCxt, SQuery* pQuery) {
}
}
if (TSDB_CODE_SUCCESS == code) {
pCxt->showRewrite = true;
pQuery->showRewrite = true;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pStmt;
@ -6439,6 +6452,7 @@ static int32_t rewriteShowTableDist(STranslateContext* pCxt, SQuery* pQuery) {
code = nodesListMakeStrictAppend(&pStmt->pProjectionList, createBlockDistFunc());
}
if (TSDB_CODE_SUCCESS == code) {
pCxt->showRewrite = true;
pQuery->showRewrite = true;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pStmt;

View File

@ -474,7 +474,7 @@ static int32_t getInsTagsTableTargetNameFromCond(int32_t acctId, SLogicCondition
return TSDB_CODE_SUCCESS;
}
int32_t getInsTagsTableTargetName(int32_t acctId, SNode* pWhere, SName* pName) {
int32_t getVnodeSysTableTargetName(int32_t acctId, SNode* pWhere, SName* pName) {
if (NULL == pWhere) {
return TSDB_CODE_SUCCESS;
}

View File

@ -65,9 +65,10 @@ void generateInformationSchema(MockCatalogService* mcs) {
.addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
.addColumn("stable_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
.done();
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_TABLES, TSDB_SYSTEM_TABLE, 2)
.addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_TABLES, TSDB_SYSTEM_TABLE, 3)
.addColumn("table_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
.addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
.addColumn("stable_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
.done();
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_TABLE_DISTRIBUTED, TSDB_SYSTEM_TABLE, 2)
.addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
@ -140,7 +141,7 @@ void generatePerformanceSchema(MockCatalogService* mcs) {
void generateTestTables(MockCatalogService* mcs, const std::string& db) {
mcs->createTableBuilder(db, "t1", TSDB_NORMAL_TABLE, 6)
.setPrecision(TSDB_TIME_PRECISION_MILLI)
.setVgid(1)
.setVgid(2)
.addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP)
.addColumn("c1", TSDB_DATA_TYPE_INT)
.addColumn("c2", TSDB_DATA_TYPE_BINARY, 20)
@ -182,9 +183,9 @@ void generateTestStables(MockCatalogService* mcs, const std::string& db) {
.addTag("tag2", TSDB_DATA_TYPE_BINARY, 20)
.addTag("tag3", TSDB_DATA_TYPE_TIMESTAMP);
builder.done();
mcs->createSubTable(db, "st1", "st1s1", 1);
mcs->createSubTable(db, "st1", "st1s2", 2);
mcs->createSubTable(db, "st1", "st1s3", 1);
mcs->createSubTable(db, "st1", "st1s1", 2);
mcs->createSubTable(db, "st1", "st1s2", 3);
mcs->createSubTable(db, "st1", "st1s3", 2);
}
{
ITableBuilder& builder = mcs->createTableBuilder(db, "st2", TSDB_SUPER_TABLE, 3, 1)
@ -194,8 +195,8 @@ void generateTestStables(MockCatalogService* mcs, const std::string& db) {
.addColumn("c2", TSDB_DATA_TYPE_BINARY, 20)
.addTag("jtag", TSDB_DATA_TYPE_JSON);
builder.done();
mcs->createSubTable(db, "st2", "st2s1", 1);
mcs->createSubTable(db, "st2", "st2s2", 2);
mcs->createSubTable(db, "st2", "st2s1", 2);
mcs->createSubTable(db, "st2", "st2s2", 3);
}
}

View File

@ -20,15 +20,19 @@
#include <map>
#include <set>
#include "systable.h"
#include "tdatablock.h"
#include "tmisce.h"
#include "tname.h"
#include "ttypes.h"
#include "tmisce.h"
using std::string;
std::unique_ptr<MockCatalogService> g_mockCatalogService;
class TableBuilder : public ITableBuilder {
public:
virtual TableBuilder& addColumn(const std::string& name, int8_t type, int32_t bytes) {
virtual TableBuilder& addColumn(const string& name, int8_t type, int32_t bytes) {
assert(colId_ <= schema()->tableInfo.numOfTags + schema()->tableInfo.numOfColumns);
SSchema* col = schema()->schema + (colId_ - 1);
col->type = type;
@ -142,27 +146,16 @@ class MockCatalogServiceImpl {
}
int32_t catalogGetDBVgList(const char* pDbFName, SArray** pVgList) const {
std::string dbFName(pDbFName);
DbMetaCache::const_iterator it = meta_.find(dbFName.substr(std::string(pDbFName).find_last_of('.') + 1));
if (meta_.end() == it) {
return TSDB_CODE_FAILED;
string dbName(string(pDbFName).substr(string(pDbFName).find_last_of('.') + 1));
if (0 == dbName.compare(TSDB_INFORMATION_SCHEMA_DB) || 0 == dbName.compare(TSDB_PERFORMANCE_SCHEMA_DB)) {
return catalogGetAllDBVgList(pVgList);
}
std::set<int32_t> vgSet;
*pVgList = taosArrayInit(it->second.size(), sizeof(SVgroupInfo));
for (const auto& vgs : it->second) {
for (const auto& vg : vgs.second->vgs) {
if (0 == vgSet.count(vg.vgId)) {
taosArrayPush(*pVgList, &vg);
vgSet.insert(vg.vgId);
}
}
}
return TSDB_CODE_SUCCESS;
return catalogGetDBVgListImpl(dbName, pVgList);
}
int32_t catalogGetDBCfg(const char* pDbFName, SDbCfgInfo* pDbCfg) const {
std::string dbFName(pDbFName);
DbCfgCache::const_iterator it = dbCfg_.find(dbFName.substr(std::string(pDbFName).find_last_of('.') + 1));
string dbFName(pDbFName);
DbCfgCache::const_iterator it = dbCfg_.find(dbFName.substr(string(pDbFName).find_last_of('.') + 1));
if (dbCfg_.end() == it) {
return TSDB_CODE_FAILED;
}
@ -171,7 +164,7 @@ class MockCatalogServiceImpl {
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
int32_t catalogGetUdfInfo(const string& funcName, SFuncInfo* pInfo) const {
auto it = udf_.find(funcName);
if (udf_.end() == it) {
return TSDB_CODE_FAILED;
@ -236,15 +229,15 @@ class MockCatalogServiceImpl {
return code;
}
TableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType,
int32_t numOfColumns, int32_t numOfTags) {
TableBuilder& createTableBuilder(const string& db, const string& tbname, int8_t tableType, int32_t numOfColumns,
int32_t numOfTags) {
builder_ = TableBuilder::createTableBuilder(tableType, numOfColumns, numOfTags);
meta_[db][tbname] = builder_->table();
meta_[db][tbname]->schema->uid = getNextId();
return *(builder_.get());
}
void createSubTable(const std::string& db, const std::string& stbname, const std::string& tbname, int16_t vgid) {
void createSubTable(const string& db, const string& stbname, const string& tbname, int16_t vgid) {
std::unique_ptr<STableMeta> table;
if (TSDB_CODE_SUCCESS != copyTableSchemaMeta(db, stbname, &table)) {
throw std::runtime_error("copyTableSchemaMeta failed");
@ -274,13 +267,13 @@ class MockCatalogServiceImpl {
// string field length
#define SFL 20
// string field header
#define SH(h) CA(SFL, std::string(h))
#define SH(h) CA(SFL, string(h))
// string field
#define SF(n) CA(SFL, n)
// integer field length
#define IFL 10
// integer field header
#define IH(i) CA(IFL, std::string(i))
#define IH(i) CA(IFL, string(i))
// integer field
#define IF(i) CA(IFL, std::to_string(i))
// split line
@ -308,7 +301,7 @@ class MockCatalogServiceImpl {
int16_t numOfFields = numOfColumns + schema->tableInfo.numOfTags;
for (int16_t i = 0; i < numOfFields; ++i) {
const SSchema* col = schema->schema + i;
std::cout << SF(std::string(col->name)) << SH(ftToString(i, numOfColumns)) << SH(dtToString(col->type))
std::cout << SF(string(col->name)) << SH(ftToString(i, numOfColumns)) << SH(dtToString(col->type))
<< IF(col->bytes) << std::endl;
}
std::cout << std::endl;
@ -316,7 +309,7 @@ class MockCatalogServiceImpl {
}
}
void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize) {
void createFunction(const string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize) {
std::shared_ptr<SFuncInfo> info(new SFuncInfo);
strcpy(info->name, func.c_str());
info->funcType = funcType;
@ -342,19 +335,19 @@ class MockCatalogServiceImpl {
info.expr = strdup(pReq->expr);
auto it = index_.find(pReq->stb);
if (index_.end() == it) {
index_.insert(std::make_pair(std::string(pReq->stb), std::vector<STableIndexInfo>{info}));
index_.insert(std::make_pair(string(pReq->stb), std::vector<STableIndexInfo>{info}));
} else {
it->second.push_back(info);
}
}
void createDnode(int32_t dnodeId, const std::string& host, int16_t port) {
void createDnode(int32_t dnodeId, const string& host, int16_t port) {
SEpSet epSet = {0};
addEpIntoEpSet(&epSet, host.c_str(), port);
dnode_.insert(std::make_pair(dnodeId, epSet));
}
void createDatabase(const std::string& db, bool rollup, int8_t cacheLast) {
void createDatabase(const string& db, bool rollup, int8_t cacheLast) {
SDbCfgInfo cfg = {0};
if (rollup) {
cfg.pRetensions = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SRetention));
@ -364,12 +357,12 @@ class MockCatalogServiceImpl {
}
private:
typedef std::map<std::string, std::shared_ptr<MockTableMeta>> TableMetaCache;
typedef std::map<std::string, TableMetaCache> DbMetaCache;
typedef std::map<std::string, std::shared_ptr<SFuncInfo>> UdfMetaCache;
typedef std::map<std::string, std::vector<STableIndexInfo>> IndexMetaCache;
typedef std::map<int32_t, SEpSet> DnodeCache;
typedef std::map<std::string, SDbCfgInfo> DbCfgCache;
typedef std::map<string, std::shared_ptr<MockTableMeta>> TableMetaCache;
typedef std::map<string, TableMetaCache> DbMetaCache;
typedef std::map<string, std::shared_ptr<SFuncInfo>> UdfMetaCache;
typedef std::map<string, std::vector<STableIndexInfo>> IndexMetaCache;
typedef std::map<int32_t, SEpSet> DnodeCache;
typedef std::map<string, SDbCfgInfo> DbCfgCache;
uint64_t getNextId() { return id_++; }
@ -386,15 +379,15 @@ class MockCatalogServiceImpl {
return pDst;
}
std::string toDbname(const std::string& dbFullName) const {
std::string::size_type n = dbFullName.find(".");
if (n == std::string::npos) {
string toDbname(const string& dbFullName) const {
string::size_type n = dbFullName.find(".");
if (n == string::npos) {
return dbFullName;
}
return dbFullName.substr(n + 1);
}
std::string ttToString(int8_t tableType) const {
string ttToString(int8_t tableType) const {
switch (tableType) {
case TSDB_SUPER_TABLE:
return "super table";
@ -407,7 +400,7 @@ class MockCatalogServiceImpl {
}
}
std::string pToString(uint8_t precision) const {
string pToString(uint8_t precision) const {
switch (precision) {
case TSDB_TIME_PRECISION_MILLI:
return "millisecond";
@ -420,19 +413,18 @@ class MockCatalogServiceImpl {
}
}
std::string dtToString(int8_t type) const { return tDataTypes[type].name; }
string dtToString(int8_t type) const { return tDataTypes[type].name; }
std::string ftToString(int16_t colid, int16_t numOfColumns) const {
string ftToString(int16_t colid, int16_t numOfColumns) const {
return (0 == colid ? "column" : (colid < numOfColumns ? "column" : "tag"));
}
STableMeta* getTableSchemaMeta(const std::string& db, const std::string& tbname) const {
STableMeta* getTableSchemaMeta(const string& db, const string& tbname) const {
std::shared_ptr<MockTableMeta> table = getTableMeta(db, tbname);
return table ? table->schema : nullptr;
}
int32_t copyTableSchemaMeta(const std::string& db, const std::string& tbname,
std::unique_ptr<STableMeta>* dst) const {
int32_t copyTableSchemaMeta(const string& db, const string& tbname, std::unique_ptr<STableMeta>* dst) const {
STableMeta* src = getTableSchemaMeta(db, tbname);
if (nullptr == src) {
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
@ -446,7 +438,7 @@ class MockCatalogServiceImpl {
return TSDB_CODE_SUCCESS;
}
int32_t copyTableVgroup(const std::string& db, const std::string& tbname, SVgroupInfo* vg) const {
int32_t copyTableVgroup(const string& db, const string& tbname, SVgroupInfo* vg) const {
std::shared_ptr<MockTableMeta> table = getTableMeta(db, tbname);
if (table->vgs.empty()) {
return TSDB_CODE_SUCCESS;
@ -455,7 +447,7 @@ class MockCatalogServiceImpl {
return TSDB_CODE_SUCCESS;
}
int32_t copyTableVgroup(const std::string& db, const std::string& tbname, SArray** vgList) const {
int32_t copyTableVgroup(const string& db, const string& tbname, SArray** vgList) const {
std::shared_ptr<MockTableMeta> table = getTableMeta(db, tbname);
if (table->vgs.empty()) {
return TSDB_CODE_SUCCESS;
@ -467,7 +459,7 @@ class MockCatalogServiceImpl {
return TSDB_CODE_SUCCESS;
}
std::shared_ptr<MockTableMeta> getTableMeta(const std::string& db, const std::string& tbname) const {
std::shared_ptr<MockTableMeta> getTableMeta(const string& db, const string& tbname) const {
DbMetaCache::const_iterator it = meta_.find(db);
if (meta_.end() == it) {
return std::shared_ptr<MockTableMeta>();
@ -527,6 +519,40 @@ class MockCatalogServiceImpl {
return code;
}
int32_t catalogGetDBVgListImpl(const string& dbName, SArray** pVgList) const {
DbMetaCache::const_iterator it = meta_.find(dbName);
if (meta_.end() == it) {
return TSDB_CODE_FAILED;
}
std::set<int32_t> vgSet;
*pVgList = taosArrayInit(it->second.size(), sizeof(SVgroupInfo));
for (const auto& vgs : it->second) {
for (const auto& vg : vgs.second->vgs) {
if (0 == vgSet.count(vg.vgId)) {
taosArrayPush(*pVgList, &vg);
vgSet.insert(vg.vgId);
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetAllDBVgList(SArray** pVgList) const {
std::set<int32_t> vgSet;
*pVgList = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVgroupInfo));
for (const auto& db : meta_) {
for (const auto& vgs : db.second) {
for (const auto& vg : vgs.second->vgs) {
if (0 == vgSet.count(vg.vgId)) {
taosArrayPush(*pVgList, &vg);
vgSet.insert(vg.vgId);
}
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t getAllDbCfg(SArray* pDbCfgReq, SArray** pDbCfgData) const {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pDbCfgReq) {
@ -634,30 +660,29 @@ MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {
MockCatalogService::~MockCatalogService() {}
ITableBuilder& MockCatalogService::createTableBuilder(const std::string& db, const std::string& tbname,
int8_t tableType, int32_t numOfColumns, int32_t numOfTags) {
ITableBuilder& MockCatalogService::createTableBuilder(const string& db, const string& tbname, int8_t tableType,
int32_t numOfColumns, int32_t numOfTags) {
return impl_->createTableBuilder(db, tbname, tableType, numOfColumns, numOfTags);
}
void MockCatalogService::createSubTable(const std::string& db, const std::string& stbname, const std::string& tbname,
int16_t vgid) {
void MockCatalogService::createSubTable(const string& db, const string& stbname, const string& tbname, int16_t vgid) {
impl_->createSubTable(db, stbname, tbname, vgid);
}
void MockCatalogService::showTables() const { impl_->showTables(); }
void MockCatalogService::createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen,
void MockCatalogService::createFunction(const string& func, int8_t funcType, int8_t outputType, int32_t outputLen,
int32_t bufSize) {
impl_->createFunction(func, funcType, outputType, outputLen, bufSize);
}
void MockCatalogService::createSmaIndex(const SMCreateSmaReq* pReq) { impl_->createSmaIndex(pReq); }
void MockCatalogService::createDnode(int32_t dnodeId, const std::string& host, int16_t port) {
void MockCatalogService::createDnode(int32_t dnodeId, const string& host, int16_t port) {
impl_->createDnode(dnodeId, host, port);
}
void MockCatalogService::createDatabase(const std::string& db, bool rollup, int8_t cacheLast) {
void MockCatalogService::createDatabase(const string& db, bool rollup, int8_t cacheLast) {
impl_->createDatabase(db, rollup, cacheLast);
}
@ -683,7 +708,7 @@ int32_t MockCatalogService::catalogGetDBCfg(const char* pDbFName, SDbCfgInfo* pD
return impl_->catalogGetDBCfg(pDbFName, pDbCfg);
}
int32_t MockCatalogService::catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
int32_t MockCatalogService::catalogGetUdfInfo(const string& funcName, SFuncInfo* pInfo) const {
return impl_->catalogGetUdfInfo(funcName, pInfo);
}

View File

@ -250,11 +250,12 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
}
if (NULL == pScanCols) {
return NULL == pScanPseudoCols
? SCAN_TYPE_TABLE
: ((FUNCTION_TYPE_BLOCK_DIST_INFO == ((SFunctionNode*)nodesListGetNode(pScanPseudoCols, 0))->funcType)
? SCAN_TYPE_BLOCK_INFO
: SCAN_TYPE_TABLE);
if (NULL == pScanPseudoCols) {
return SCAN_TYPE_TABLE;
}
return FUNCTION_TYPE_BLOCK_DIST_INFO == ((SFunctionNode*)nodesListGetNode(pScanPseudoCols, 0))->funcType
? SCAN_TYPE_BLOCK_INFO
: SCAN_TYPE_TABLE;
}
return SCAN_TYPE_TABLE;
@ -333,6 +334,8 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT
return TSDB_CODE_SUCCESS;
}
static bool needScanDefaultCol(EScanType scanType) { return SCAN_TYPE_TABLE_COUNT != scanType; }
static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable,
SLogicNode** pLogicNode) {
SScanLogicNode* pScan = NULL;
@ -367,7 +370,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pScan->hasNormalCols = true;
}
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code && needScanDefaultCol(pScan->scanType)) {
code = addDefaultScanCol(pRealTable->pMeta, &pScan->pScanCols);
}

View File

@ -16,6 +16,7 @@
#include "filter.h"
#include "functionMgt.h"
#include "planInt.h"
#include "systable.h"
#include "tglobal.h"
#include "ttime.h"
@ -64,6 +65,7 @@ typedef enum ECondAction {
} ECondAction;
typedef bool (*FMayBeOptimized)(SLogicNode* pNode);
typedef bool (*FShouldBeOptimized)(SLogicNode* pNode, void* pInfo);
static SLogicNode* optFindPossibleNode(SLogicNode* pNode, FMayBeOptimized func) {
if (func(pNode)) {
@ -79,6 +81,19 @@ static SLogicNode* optFindPossibleNode(SLogicNode* pNode, FMayBeOptimized func)
return NULL;
}
static bool optFindEligibleNode(SLogicNode* pNode, FShouldBeOptimized func, void* pInfo) {
if (func(pNode, pInfo)) {
return true;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
if (optFindEligibleNode((SLogicNode*)pChild, func, pInfo)) {
return true;
}
}
return false;
}
static void optResetParent(SLogicNode* pNode) {
SNode* pChild = NULL;
FOREACH(pChild, pNode->pChildren) { ((SLogicNode*)pChild)->pParent = pNode; }
@ -2454,6 +2469,243 @@ static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
return TSDB_CODE_SUCCESS;
}
typedef struct STbCntScanOptInfo {
SAggLogicNode* pAgg;
SScanLogicNode* pScan;
SName table;
} STbCntScanOptInfo;
static bool tbCntScanOptIsEligibleGroupKeys(SNodeList* pGroupKeys) {
if (NULL == pGroupKeys) {
return true;
}
SNode* pGroupKey = NULL;
FOREACH(pGroupKey, pGroupKeys) {
SNode* pKey = nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0);
if (QUERY_NODE_COLUMN != nodeType(pKey)) {
return false;
}
SColumnNode* pCol = (SColumnNode*)pKey;
if (0 != strcmp(pCol->colName, "db_name") && 0 != strcmp(pCol->colName, "stable_name")) {
return false;
}
}
return true;
}
static bool tbCntScanOptNotNullableExpr(SNode* pNode) {
if (QUERY_NODE_COLUMN != nodeType(pNode)) {
return false;
}
const char* pColName = ((SColumnNode*)pNode)->colName;
return 0 == strcmp(pColName, "*") || 0 == strcmp(pColName, "db_name") || 0 == strcmp(pColName, "stable_name") ||
0 == strcmp(pColName, "table_name");
}
static bool tbCntScanOptIsEligibleAggFuncs(SNodeList* pAggFuncs) {
SNode* pNode = NULL;
FOREACH(pNode, pAggFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)nodesListGetNode(pAggFuncs, 0);
if (FUNCTION_TYPE_COUNT != pFunc->funcType ||
!tbCntScanOptNotNullableExpr(nodesListGetNode(pFunc->pParameterList, 0))) {
return false;
}
}
return true;
}
static bool tbCntScanOptIsEligibleAgg(SAggLogicNode* pAgg) {
return tbCntScanOptIsEligibleGroupKeys(pAgg->pGroupKeys) && tbCntScanOptIsEligibleAggFuncs(pAgg->pAggFuncs);
}
static bool tbCntScanOptGetColValFromCond(SOperatorNode* pOper, SColumnNode** pCol, SValueNode** pVal) {
if (OP_TYPE_EQUAL != pOper->opType) {
return false;
}
*pCol = NULL;
*pVal = NULL;
if (QUERY_NODE_COLUMN == nodeType(pOper->pLeft)) {
*pCol = (SColumnNode*)pOper->pLeft;
} else if (QUERY_NODE_VALUE == nodeType(pOper->pLeft)) {
*pVal = (SValueNode*)pOper->pLeft;
}
if (QUERY_NODE_COLUMN == nodeType(pOper->pRight)) {
*pCol = (SColumnNode*)pOper->pRight;
} else if (QUERY_NODE_VALUE == nodeType(pOper->pRight)) {
*pVal = (SValueNode*)pOper->pRight;
}
return NULL != *pCol && NULL != *pVal;
}
static bool tbCntScanOptIsEligibleLogicCond(STbCntScanOptInfo* pInfo, SLogicConditionNode* pCond) {
if (LOGIC_COND_TYPE_AND != pCond->condType) {
return false;
}
bool hasDbCond = false;
bool hasStbCond = false;
SColumnNode* pCol = NULL;
SValueNode* pVal = NULL;
SNode* pNode = NULL;
FOREACH(pNode, pCond->pParameterList) {
if (QUERY_NODE_OPERATOR != nodeType(pNode) || !tbCntScanOptGetColValFromCond((SOperatorNode*)pNode, &pCol, &pVal)) {
return false;
}
if (!hasDbCond && 0 == strcmp(pCol->colName, "db_name")) {
hasDbCond = true;
strcpy(pInfo->table.dbname, pVal->literal);
} else if (!hasStbCond && 0 == strcmp(pCol->colName, "stable_name")) {
hasStbCond = true;
strcpy(pInfo->table.tname, pVal->literal);
} else {
return false;
}
}
return hasDbCond;
}
static bool tbCntScanOptIsEligibleOpCond(SOperatorNode* pCond) {
SColumnNode* pCol = NULL;
SValueNode* pVal = NULL;
if (!tbCntScanOptGetColValFromCond(pCond, &pCol, &pVal)) {
return false;
}
return 0 == strcmp(pCol->colName, "db_name");
}
static bool tbCntScanOptIsEligibleConds(STbCntScanOptInfo* pInfo, SNode* pConditions) {
if (NULL == pConditions) {
return true;
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pConditions)) {
return tbCntScanOptIsEligibleLogicCond(pInfo, (SLogicConditionNode*)pConditions);
}
if (QUERY_NODE_OPERATOR == nodeType(pConditions)) {
return tbCntScanOptIsEligibleOpCond((SOperatorNode*)pConditions);
}
return false;
}
static bool tbCntScanOptIsEligibleScan(STbCntScanOptInfo* pInfo) {
if (0 != strcmp(pInfo->pScan->tableName.dbname, TSDB_INFORMATION_SCHEMA_DB) ||
0 != strcmp(pInfo->pScan->tableName.tname, TSDB_INS_TABLE_TABLES) || NULL != pInfo->pScan->pGroupTags) {
return false;
}
if (1 == pInfo->pScan->pVgroupList->numOfVgroups && MNODE_HANDLE == pInfo->pScan->pVgroupList->vgroups[0].vgId) {
return false;
}
return tbCntScanOptIsEligibleConds(pInfo, pInfo->pScan->node.pConditions);
}
static bool tbCntScanOptShouldBeOptimized(SLogicNode* pNode, STbCntScanOptInfo* pInfo) {
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
return false;
}
pInfo->pAgg = (SAggLogicNode*)pNode;
pInfo->pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
return tbCntScanOptIsEligibleAgg(pInfo->pAgg) && tbCntScanOptIsEligibleScan(pInfo);
}
static SNode* tbCntScanOptCreateTableCountFunc() {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFunc) {
return NULL;
}
strcpy(pFunc->functionName, "_table_count");
strcpy(pFunc->node.aliasName, "_table_count");
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc, NULL, 0)) {
nodesDestroyNode((SNode*)pFunc);
return NULL;
}
return (SNode*)pFunc;
}
static int32_t tbCntScanOptRewriteScan(STbCntScanOptInfo* pInfo) {
pInfo->pScan->scanType = SCAN_TYPE_TABLE_COUNT;
strcpy(pInfo->pScan->tableName.dbname, pInfo->table.dbname);
strcpy(pInfo->pScan->tableName.tname, pInfo->table.tname);
NODES_DESTORY_LIST(pInfo->pScan->node.pTargets);
NODES_DESTORY_LIST(pInfo->pScan->pScanCols);
NODES_DESTORY_NODE(pInfo->pScan->node.pConditions);
NODES_DESTORY_LIST(pInfo->pScan->pScanPseudoCols);
int32_t code = nodesListMakeStrictAppend(&pInfo->pScan->pScanPseudoCols, tbCntScanOptCreateTableCountFunc());
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExpr(nodesListGetNode(pInfo->pScan->pScanPseudoCols, 0), &pInfo->pScan->node.pTargets);
}
SNode* pGroupKey = NULL;
FOREACH(pGroupKey, pInfo->pAgg->pGroupKeys) {
SNode* pGroupCol = nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0);
code = nodesListMakeStrictAppend(&pInfo->pScan->pGroupTags, nodesCloneNode(pGroupCol));
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pScan->pScanCols, nodesCloneNode(pGroupCol));
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pScan->node.pTargets, nodesCloneNode(pGroupCol));
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
static int32_t tbCntScanOptCreateSumFunc(SFunctionNode* pCntFunc, SNode* pParam, SNode** pOutput) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFunc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pFunc->functionName, "sum");
strcpy(pFunc->node.aliasName, pCntFunc->node.aliasName);
int32_t code = createColumnByRewriteExpr(pParam, &pFunc->pParameterList);
if (TSDB_CODE_SUCCESS == code) {
code = fmGetFuncInfo(pFunc, NULL, 0);
}
if (TSDB_CODE_SUCCESS == code) {
*pOutput = (SNode*)pFunc;
} else {
nodesDestroyNode((SNode*)pFunc);
}
return code;
}
static int32_t tbCntScanOptRewriteAgg(SAggLogicNode* pAgg) {
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0);
SNode* pSum = NULL;
int32_t code = tbCntScanOptCreateSumFunc((SFunctionNode*)nodesListGetNode(pAgg->pAggFuncs, 0),
nodesListGetNode(pScan->pScanPseudoCols, 0), &pSum);
if (TSDB_CODE_SUCCESS == code) {
NODES_DESTORY_LIST(pAgg->pAggFuncs);
code = nodesListMakeStrictAppend(&pAgg->pAggFuncs, pSum);
}
if (TSDB_CODE_SUCCESS == code) {
code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, 0, pAgg);
}
NODES_DESTORY_LIST(pAgg->pGroupKeys);
return code;
}
static int32_t tableCountScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
STbCntScanOptInfo info = {0};
if (!optFindEligibleNode(pLogicSubplan->pNode, (FShouldBeOptimized)tbCntScanOptShouldBeOptimized, &info)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = tbCntScanOptRewriteScan(&info);
if (TSDB_CODE_SUCCESS == code) {
code = tbCntScanOptRewriteAgg(info.pAgg);
}
return code;
}
// clang-format off
static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "ScanPath", .optimizeFunc = scanPathOptimize},
@ -2468,7 +2720,8 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize},
{.pName = "TagScan", .optimizeFunc = tagScanOptimize},
{.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize}
{.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize},
{.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize},
};
// clang-format on

View File

@ -489,6 +489,8 @@ static ENodeType getScanOperatorType(EScanType scanType) {
return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
case SCAN_TYPE_BLOCK_INFO:
return QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
case SCAN_TYPE_TABLE_COUNT:
return QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN;
default:
break;
}
@ -524,6 +526,24 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu
pScan->ignoreNull = pScanLogicNode->igLastNull;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
}
static int32_t createTableCountScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
STableCountScanPhysiNode* pScan = (STableCountScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags);
if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) {
nodesDestroyNode((SNode*)pScan);
return TSDB_CODE_OUT_OF_MEMORY;
}
pScan->groupSort = pScanLogicNode->groupSort;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
@ -589,7 +609,6 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
pScan->accountId = pCxt->pPlanCxt->acctId;
pScan->sysInfo = pCxt->pPlanCxt->sysInfo;
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TABLES) ||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TABLE_DISTRIBUTED) ||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TAGS)) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
} else {
@ -624,6 +643,8 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
case SCAN_TYPE_TAG:
case SCAN_TYPE_BLOCK_INFO:
return createSimpleScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
case SCAN_TYPE_TABLE_COUNT:
return createTableCountScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
case SCAN_TYPE_LAST_ROW:
return createLastRowScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
case SCAN_TYPE_TABLE:

View File

@ -292,6 +292,20 @@ static bool stbSplNeedSplitJoin(bool streamQuery, SJoinLogicNode* pJoin) {
return true;
}
static bool stbSplIsTableCountQuery(SLogicNode* pNode) {
if (1 != LIST_LENGTH(pNode->pChildren)) {
return false;
}
SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild)) {
if (1 != LIST_LENGTH(((SLogicNode*)pChild)->pChildren)) {
return false;
}
pChild = nodesListGetNode(((SLogicNode*)pChild)->pChildren, 0);
}
return QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && SCAN_TYPE_TABLE_COUNT == ((SScanLogicNode*)pChild)->scanType;
}
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return ((SScanLogicNode*)pNode)->pGroupTags;
@ -340,7 +354,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
case QUERY_NODE_LOGIC_PLAN_AGG:
return (!stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) ||
stbSplIsPartTableAgg((SAggLogicNode*)pNode)) &&
stbSplHasMultiTbScan(streamQuery, pNode);
stbSplHasMultiTbScan(streamQuery, pNode) && !stbSplIsTableCountQuery(pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return stbSplNeedSplitWindow(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_SORT:

View File

@ -20,13 +20,6 @@ using namespace std;
class PlanSysTableTest : public PlannerTestBase {};
TEST_F(PlanSysTableTest, show) {
useDb("root", "test");
run("show tables");
run("show stables");
}
TEST_F(PlanSysTableTest, informationSchema) {
useDb("root", "information_schema");
@ -38,3 +31,17 @@ TEST_F(PlanSysTableTest, withAgg) {
run("SELECT COUNT(1) FROM ins_users");
}
TEST_F(PlanSysTableTest, tableCount) {
useDb("root", "information_schema");
run("SELECT COUNT(*) FROM ins_tables");
run("SELECT COUNT(*) FROM ins_tables WHERE db_name = 'test'");
run("SELECT COUNT(*) FROM ins_tables WHERE db_name = 'test' AND stable_name = 'st1'");
run("SELECT db_name, COUNT(*) FROM ins_tables GROUP BY db_name");
run("SELECT db_name, stable_name, COUNT(*) FROM ins_tables GROUP BY db_name, stable_name");
}

View File

@ -363,7 +363,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx);
int32_t qwKillTaskHandle(SQWTaskCtx *ctx);
int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode);
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status);
int32_t qwDropTask(QW_FPARAMS_DEF);
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);

View File

@ -336,7 +336,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
}
if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
taosMemoryFree(msg);
rpcFreeCont(msg);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}

View File

@ -279,14 +279,14 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) {
}
}
int32_t qwKillTaskHandle(SQWTaskCtx *ctx) {
int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) {
int32_t code = 0;
// Note: free/kill may in RC
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
qDebug("start to kill task");
code = qAsyncKillTask(taskHandle);
code = qAsyncKillTask(taskHandle, rspCode);
atomic_store_ptr(&ctx->taskHandle, taskHandle);
}

View File

@ -411,7 +411,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
// qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
QW_ERR_JRET(ctx->rspCode);
}
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
@ -420,7 +420,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
case QW_PHASE_PRE_FETCH: {
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
QW_ERR_JRET(ctx->rspCode);
}
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
@ -442,7 +442,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
case QW_PHASE_PRE_CQUERY: {
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
QW_ERR_JRET(ctx->rspCode);
}
if (ctx->rspCode) {
@ -456,7 +456,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
// qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
QW_ERR_JRET(ctx->rspCode);
}
break;
@ -502,7 +502,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
QW_ERR_JRET(ctx->rspCode);
}
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
@ -515,7 +515,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
QW_ERR_JRET(ctx->rspCode);
}
if (ctx->rspCode) {
@ -861,7 +861,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(ctx));
QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
} else {
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
@ -869,6 +869,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (!dropped) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_TSC_QUERY_CANCELLED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
}
@ -1195,8 +1196,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
}
if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx);
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
} else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
}

View File

@ -302,7 +302,7 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
return 0;
}
int32_t qwtKillTask(qTaskInfo_t qinfo) { return 0; }
int32_t qwtKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return 0; }
void qwtDestroyTask(qTaskInfo_t qHandle) {}

View File

@ -101,6 +101,11 @@ typedef int32_t (*filter_desc_compare_func)(const void *, const void *);
typedef bool (*filter_exec_func)(void *, int32_t, SColumnInfoData *, SColumnDataAgg *, int16_t, int32_t *);
typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char *, void **);
typedef struct SFilterDataInfo {
int32_t idx;
void* addr;
} SFilterDataInfo;
typedef struct SFilterRangeCompare {
int64_t s;
int64_t e;
@ -294,9 +299,9 @@ struct SFilterInfo {
#define CHK_OR_OPTR(ctx) ((ctx)->isnull == true && (ctx)->notnull == true)
#define CHK_AND_OPTR(ctx) ((ctx)->isnull == true && (((ctx)->notnull == true) || ((ctx)->isrange == true)))
#define FILTER_GET_FLAG(st, f) (st & f)
#define FILTER_SET_FLAG(st, f) st |= (f)
#define FILTER_CLR_FLAG(st, f) st &= (~f)
#define FILTER_GET_FLAG(st, f) ((st) & (f))
#define FILTER_SET_FLAG(st, f) (st) |= (f)
#define FILTER_CLR_FLAG(st, f) (st) &= (~f)
#define SIMPLE_COPY_VALUES(dst, src) *((int64_t *)dst) = *((int64_t *)src)
#define FLT_PACKAGE_UNIT_HASH_KEY(v, op1, op2, lidx, ridx, ridx2) \

View File

@ -963,16 +963,17 @@ int32_t filterGetFiledByDesc(SFilterFields *fields, int32_t type, void *v) {
return -1;
}
int32_t filterGetFiledByData(SFilterInfo *info, int32_t type, void *v, int32_t dataLen) {
int32_t filterGetFiledByData(SFilterInfo *info, int32_t type, void *v, int32_t dataLen, bool *sameBuf) {
if (type == FLD_TYPE_VALUE) {
if (info->pctx.valHash == false) {
qError("value hash is empty");
return -1;
}
void *hv = taosHashGet(info->pctx.valHash, v, dataLen);
if (hv) {
return *(int32_t *)hv;
SFilterDataInfo *dInfo = taosHashGet(info->pctx.valHash, v, dataLen);
if (dInfo) {
*sameBuf = (dInfo->addr == v);
return dInfo->idx;
}
}
@ -982,9 +983,10 @@ int32_t filterGetFiledByData(SFilterInfo *info, int32_t type, void *v, int32_t d
// In the params, we should use void *data instead of void **data, there is no need to use taosMemoryFreeClear(*data) to
// set *data = 0 Besides, fields data value is a pointer, so dataLen should be POINTER_BYTES for better.
int32_t filterAddField(SFilterInfo *info, void *desc, void **data, int32_t type, SFilterFieldId *fid, int32_t dataLen,
bool freeIfExists) {
bool freeIfExists, int16_t *srcFlag) {
int32_t idx = -1;
uint32_t *num;
bool sameBuf = false;
num = &info->fields[type].num;
@ -992,7 +994,7 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void **data, int32_t type,
if (type == FLD_TYPE_COLUMN) {
idx = filterGetFiledByDesc(&info->fields[type], type, desc);
} else if (data && (*data) && dataLen > 0 && FILTER_GET_FLAG(info->options, FLT_OPTION_NEED_UNIQE)) {
idx = filterGetFiledByData(info, type, *data, dataLen);
idx = filterGetFiledByData(info, type, *data, dataLen, &sameBuf);
}
}
@ -1020,11 +1022,17 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void **data, int32_t type,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
}
taosHashPut(info->pctx.valHash, *data, dataLen, &idx, sizeof(idx));
SFilterDataInfo dInfo = {idx, *data};
taosHashPut(info->pctx.valHash, *data, dataLen, &dInfo, sizeof(dInfo));
if (srcFlag) {
FILTER_SET_FLAG(*srcFlag, FLD_DATA_NO_FREE);
}
}
} else {
if (data && freeIfExists) {
} else if (type != FLD_TYPE_COLUMN && data) {
if (freeIfExists) {
taosMemoryFreeClear(*data);
} else if (sameBuf) {
FILTER_SET_FLAG(*srcFlag, FLD_DATA_NO_FREE);
}
}
@ -1035,7 +1043,7 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void **data, int32_t type,
}
static FORCE_INLINE int32_t filterAddColFieldFromField(SFilterInfo *info, SFilterField *field, SFilterFieldId *fid) {
filterAddField(info, field->desc, &field->data, FILTER_GET_TYPE(field->flag), fid, 0, false);
filterAddField(info, field->desc, &field->data, FILTER_GET_TYPE(field->flag), fid, 0, false, NULL);
FILTER_SET_FLAG(field->flag, FLD_DATA_NO_FREE);
@ -1064,7 +1072,7 @@ int32_t filterAddFieldFromNode(SFilterInfo *info, SNode *node, SFilterFieldId *f
v = node;
}
filterAddField(info, v, NULL, type, fid, 0, true);
filterAddField(info, v, NULL, type, fid, 0, true, NULL);
return TSDB_CODE_SUCCESS;
}
@ -1195,7 +1203,7 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode *tree, SArray *group) {
len = tDataTypes[type].bytes;
filterAddField(info, NULL, (void **)&out.columnData->pData, FLD_TYPE_VALUE, &right, len, true);
filterAddField(info, NULL, (void **)&out.columnData->pData, FLD_TYPE_VALUE, &right, len, true, NULL);
out.columnData->pData = NULL;
} else {
void *data = taosMemoryCalloc(1, tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes); // reserved space for simple_copy
@ -1203,7 +1211,7 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode *tree, SArray *group) {
FLT_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
memcpy(data, nodesGetValueFromNode(valueNode), tDataTypes[type].bytes);
filterAddField(info, NULL, (void **)&data, FLD_TYPE_VALUE, &right, len, true);
filterAddField(info, NULL, (void **)&data, FLD_TYPE_VALUE, &right, len, true, NULL);
}
filterAddUnit(info, OP_TYPE_EQUAL, &left, &right, &uidx);
@ -1234,28 +1242,26 @@ int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit *u
uint8_t type = FILTER_UNIT_DATA_TYPE(u);
uint16_t flag = 0;
filterAddField(dst, FILTER_UNIT_COL_DESC(src, u), NULL, FLD_TYPE_COLUMN, &left, 0, false);
filterAddField(dst, FILTER_UNIT_COL_DESC(src, u), NULL, FLD_TYPE_COLUMN, &left, 0, false, NULL);
SFilterField *t = FILTER_UNIT_LEFT_FIELD(src, u);
if (u->right.type == FLD_TYPE_VALUE) {
void *data = FILTER_UNIT_VAL_DATA(src, u);
SFilterField *rField = FILTER_UNIT_RIGHT_FIELD(src, u);
if (IS_VAR_DATA_TYPE(type)) {
if (FILTER_UNIT_OPTR(u) == OP_TYPE_IN) {
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, POINTER_BYTES,
false); // POINTER_BYTES should be sizeof(SHashObj), but POINTER_BYTES is also right.
false, &rField->flag); // POINTER_BYTES should be sizeof(SHashObj), but POINTER_BYTES is also right.
t = FILTER_GET_FIELD(dst, right);
FILTER_SET_FLAG(t->flag, FLD_DATA_IS_HASH);
} else {
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, varDataTLen(data), false);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, varDataTLen(data), false, &rField->flag);
}
} else {
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, false);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, false, &rField->flag);
}
flag = FLD_DATA_NO_FREE;
t = FILTER_UNIT_RIGHT_FIELD(src, u);
FILTER_SET_FLAG(t->flag, flag);
} else {
pright = NULL;
}
@ -1313,17 +1319,17 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan
if (func(&ra->s, &ra->e) == 0) {
void *data = taosMemoryMalloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &ra->s);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL);
filterAddUnit(dst, OP_TYPE_EQUAL, &left, &right, &uidx);
filterAddUnitToGroup(g, uidx);
return TSDB_CODE_SUCCESS;
} else {
void *data = taosMemoryMalloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &ra->s);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL);
void *data2 = taosMemoryMalloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data2, &ra->e);
filterAddField(dst, NULL, &data2, FLD_TYPE_VALUE, &right2, tDataTypes[type].bytes, true);
filterAddField(dst, NULL, &data2, FLD_TYPE_VALUE, &right2, tDataTypes[type].bytes, true, NULL);
filterAddUnitImpl(
dst, FILTER_GET_FLAG(ra->sflag, RANGE_FLG_EXCLUDE) ? OP_TYPE_GREATER_THAN : OP_TYPE_GREATER_EQUAL, &left,
@ -1337,7 +1343,7 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan
if (!FILTER_GET_FLAG(ra->sflag, RANGE_FLG_NULL)) {
void *data = taosMemoryMalloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &ra->s);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL);
filterAddUnit(dst, FILTER_GET_FLAG(ra->sflag, RANGE_FLG_EXCLUDE) ? OP_TYPE_GREATER_THAN : OP_TYPE_GREATER_EQUAL,
&left, &right, &uidx);
filterAddUnitToGroup(g, uidx);
@ -1346,7 +1352,7 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan
if (!FILTER_GET_FLAG(ra->eflag, RANGE_FLG_NULL)) {
void *data = taosMemoryMalloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &ra->e);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL);
filterAddUnit(dst, FILTER_GET_FLAG(ra->eflag, RANGE_FLG_EXCLUDE) ? OP_TYPE_LOWER_THAN : OP_TYPE_LOWER_EQUAL,
&left, &right, &uidx);
filterAddUnitToGroup(g, uidx);
@ -1393,16 +1399,16 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan
if (func(&r->ra.s, &r->ra.e) == 0) {
void *data = taosMemoryMalloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &r->ra.s);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL);
filterAddUnit(dst, OP_TYPE_EQUAL, &left, &right, &uidx);
filterAddUnitToGroup(g, uidx);
} else {
void *data = taosMemoryMalloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &r->ra.s);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL);
void *data2 = taosMemoryMalloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data2, &r->ra.e);
filterAddField(dst, NULL, &data2, FLD_TYPE_VALUE, &right2, tDataTypes[type].bytes, true);
filterAddField(dst, NULL, &data2, FLD_TYPE_VALUE, &right2, tDataTypes[type].bytes, true, NULL);
filterAddUnitImpl(
dst, FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_EXCLUDE) ? OP_TYPE_GREATER_THAN : OP_TYPE_GREATER_EQUAL, &left,
@ -1421,7 +1427,7 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan
if (!FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_NULL)) {
void *data = taosMemoryMalloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &r->ra.s);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL);
filterAddUnit(dst, FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_EXCLUDE) ? OP_TYPE_GREATER_THAN : OP_TYPE_GREATER_EQUAL,
&left, &right, &uidx);
filterAddUnitToGroup(g, uidx);
@ -1430,7 +1436,7 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan
if (!FILTER_GET_FLAG(r->ra.eflag, RANGE_FLG_NULL)) {
void *data = taosMemoryMalloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &r->ra.e);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true, NULL);
filterAddUnit(dst, FILTER_GET_FLAG(r->ra.eflag, RANGE_FLG_EXCLUDE) ? OP_TYPE_LOWER_THAN : OP_TYPE_LOWER_EQUAL,
&left, &right, &uidx);
filterAddUnitToGroup(g, uidx);

View File

@ -391,6 +391,8 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet)
int64_t leftTime = tsMaxRetryWaitTime - lastTime;
pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs;
pCtx->roundTimes = 0;
goto _return;
}
@ -407,7 +409,7 @@ _return:
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0;
SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));
if (NULL == pData) {
pTask->retryTimes = 0;
@ -430,15 +432,15 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (pData && pData->pEpSet) {
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
} else if (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(rspCode)) {
} else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
} else {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SCH_SWITCH_EPSET(addr);
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
} else {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d", addr->nodeId, addr->epSet.inUse,
addr->epSet.numOfEps, pEp->fqdn, pEp->port);
}
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {

View File

@ -1519,7 +1519,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
transFreeMsg(pResp->pCont);
transUnrefCliHandle(pConn);
} else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT) {
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_VND_STOPPED) {
tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, true);
transFreeMsg(pResp->pCont);

View File

@ -330,6 +330,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_NOT_EXISTS, "Table column not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subscribed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped")
// tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")