[td-255] merge the develop branch and resolve the conflict.

This commit is contained in:
Haojun Liao 2021-08-13 10:48:25 +08:00
commit 4fadbc2d19
55 changed files with 20948 additions and 665 deletions

View File

@ -15,7 +15,7 @@ steps:
- mkdir debug - mkdir debug
- cd debug - cd debug
- cmake .. - cmake ..
- make - make -j4
trigger: trigger:
event: event:
- pull_request - pull_request
@ -39,7 +39,7 @@ steps:
- mkdir debug - mkdir debug
- cd debug - cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null - cmake .. -DCPUTYPE=aarch64 > /dev/null
- make - make -j4
trigger: trigger:
event: event:
- pull_request - pull_request
@ -66,7 +66,7 @@ steps:
- mkdir debug - mkdir debug
- cd debug - cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null - cmake .. -DCPUTYPE=aarch64 > /dev/null
- make - make -j4
trigger: trigger:
event: event:
- pull_request - pull_request
@ -91,7 +91,7 @@ steps:
- mkdir debug - mkdir debug
- cd debug - cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null - cmake .. -DCPUTYPE=aarch64 > /dev/null
- make - make -j4
trigger: trigger:
event: event:
- pull_request - pull_request
@ -116,7 +116,7 @@ steps:
- mkdir debug - mkdir debug
- cd debug - cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null - cmake .. -DCPUTYPE=aarch64 > /dev/null
- make - make -j4
trigger: trigger:
event: event:
- pull_request - pull_request
@ -142,7 +142,7 @@ steps:
- mkdir debug - mkdir debug
- cd debug - cd debug
- cmake .. -DCPUTYPE=aarch32 > /dev/null - cmake .. -DCPUTYPE=aarch32 > /dev/null
- make - make -j4
trigger: trigger:
event: event:
- pull_request - pull_request
@ -168,7 +168,7 @@ steps:
- mkdir debug - mkdir debug
- cd debug - cd debug
- cmake .. - cmake ..
- make - make -j4
trigger: trigger:
event: event:
- pull_request - pull_request
@ -193,7 +193,7 @@ steps:
- mkdir debug - mkdir debug
- cd debug - cd debug
- cmake .. - cmake ..
- make - make -j4
trigger: trigger:
event: event:
- pull_request - pull_request
@ -218,7 +218,7 @@ steps:
- mkdir debug - mkdir debug
- cd debug - cd debug
- cmake .. - cmake ..
- make - make -j4
trigger: trigger:
event: event:
- pull_request - pull_request
@ -241,7 +241,7 @@ steps:
- mkdir debug - mkdir debug
- cd debug - cd debug
- cmake .. - cmake ..
- make - make -j4
trigger: trigger:
event: event:
- pull_request - pull_request

4
Jenkinsfile vendored
View File

@ -4,6 +4,9 @@ properties([pipelineTriggers([githubPush()])])
node { node {
git url: 'https://github.com/taosdata/TDengine.git' git url: 'https://github.com/taosdata/TDengine.git'
} }
def skipbuild=0
def abortPreviousBuilds() { def abortPreviousBuilds() {
def currentJobName = env.JOB_NAME def currentJobName = env.JOB_NAME
def currentBuildNumber = env.BUILD_NUMBER.toInteger() def currentBuildNumber = env.BUILD_NUMBER.toInteger()
@ -152,6 +155,7 @@ pipeline {
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
''' '''
script{ script{
skipbuild='2' skipbuild='2'
skipbuild=sh(script: "git log -2 --pretty=%B | fgrep -ie '[skip ci]' -e '[ci skip]' && echo 1 || echo 2", returnStdout:true) skipbuild=sh(script: "git log -2 --pretty=%B | fgrep -ie '[skip ci]' -e '[ci skip]' && echo 1 || echo 2", returnStdout:true)

View File

@ -1,6 +1,6 @@
# TDengine 集群安装、管理 # TDengine 集群安装、管理
多个TDengine服务器也就是多个taosd的运行实例可以组成一个集群以保证TDengine的高可靠运行并提供水平扩展能力。要了解TDengine 2.0的集群管理需要对集群的基本概念有所了解请看TDengine 2.0整体架构一章。而且在安装集群之前,先按照[《立即开始》](https://www.taosdata.com/cn/documentation/getting-started/)一章安装并体验单节点功能。 多个TDengine服务器也就是多个taosd的运行实例可以组成一个集群以保证TDengine的高可靠运行并提供水平扩展能力。要了解TDengine 2.0的集群管理,需要对集群的基本概念有所了解,请看TDengine整体架构一章。而且在安装集群之前,建议先按照[《立即开始》](https://www.taosdata.com/cn/documentation/getting-started/)一章安装并体验单节点功能。
集群的每个数据节点是由End Point来唯一标识的End Point是由FQDN(Fully Qualified Domain Name)外加Port组成比如 h1.taosdata.com:6030。一般FQDN就是服务器的hostname可通过Linux命令`hostname -f`获取如何配置FQDN请参考[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)。端口是这个数据节点对外服务的端口号缺省是6030但可以通过taos.cfg里配置参数serverPort进行修改。一个物理节点可能配置了多个hostname, TDengine会自动获取第一个但也可以通过taos.cfg里配置参数fqdn进行指定。如果习惯IP地址直接访问可以将参数fqdn设置为本节点的IP地址。 集群的每个数据节点是由End Point来唯一标识的End Point是由FQDN(Fully Qualified Domain Name)外加Port组成比如 h1.taosdata.com:6030。一般FQDN就是服务器的hostname可通过Linux命令`hostname -f`获取如何配置FQDN请参考[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)。端口是这个数据节点对外服务的端口号缺省是6030但可以通过taos.cfg里配置参数serverPort进行修改。一个物理节点可能配置了多个hostname, TDengine会自动获取第一个但也可以通过taos.cfg里配置参数fqdn进行指定。如果习惯IP地址直接访问可以将参数fqdn设置为本节点的IP地址。
@ -12,7 +12,7 @@ TDengine的集群管理极其简单除添加和删除节点需要人工干预
**第零步**规划集群所有物理节点的FQDN将规划好的FQDN分别添加到每个物理节点的/etc/hostname修改每个物理节点的/etc/hosts将所有集群物理节点的IP与FQDN的对应添加好。【如部署了DNS请联系网络管理员在DNS上做好相关配置】 **第零步**规划集群所有物理节点的FQDN将规划好的FQDN分别添加到每个物理节点的/etc/hostname修改每个物理节点的/etc/hosts将所有集群物理节点的IP与FQDN的对应添加好。【如部署了DNS请联系网络管理员在DNS上做好相关配置】
**第一步**如果搭建集群的物理节点中存有之前的测试数据、装过1.X的版本或者装过其他版本的TDengine请先将其删除并清空所有数据具体步骤请参考博客[《TDengine多种安装包的安装和卸载》](https://www.taosdata.com/blog/2019/08/09/566.html ) **第一步**如果搭建集群的物理节点中存有之前的测试数据、装过1.X的版本或者装过其他版本的TDengine请先将其删除并清空所有数据(如果需要保留原有数据,请联系涛思交付团队进行旧版本升级、数据迁移),具体步骤请参考博客[《TDengine多种安装包的安装和卸载》](https://www.taosdata.com/blog/2019/08/09/566.html )
**注意1**因为FQDN的信息会写进文件如果之前没有配置或者更改FQDN且启动了TDengine。请一定在确保数据无用或者备份的前提下清理一下之前的数据`rm -rf /var/lib/taos/*` **注意1**因为FQDN的信息会写进文件如果之前没有配置或者更改FQDN且启动了TDengine。请一定在确保数据无用或者备份的前提下清理一下之前的数据`rm -rf /var/lib/taos/*`
**注意2**客户端也需要配置确保它可以正确解析每个节点的FQDN配置不管是通过DNS服务还是 Host 文件。 **注意2**客户端也需要配置确保它可以正确解析每个节点的FQDN配置不管是通过DNS服务还是 Host 文件。
@ -23,23 +23,23 @@ TDengine的集群管理极其简单除添加和删除节点需要人工干预
**第四步**:检查所有数据节点,以及应用程序所在物理节点的网络设置: **第四步**:检查所有数据节点,以及应用程序所在物理节点的网络设置:
1. 每个物理节点上执行命令`hostname -f`查看和确认所有节点的hostname是不相同的(应用驱动所在节点无需做此项检查) 1. 每个物理节点上执行命令`hostname -f`查看和确认所有节点的hostname是不相同的(应用驱动所在节点无需做此项检查)
2. 每个物理节点上执行`ping host`, 其中host是其他物理节点的hostname, 看能否ping通其它物理节点; 如果不能ping通需要检查网络设置, 或/etc/hosts文件(Windows系统默认路径为C:\Windows\system32\drivers\etc\hosts)或DNS的配置。如果无法ping通是无法组成集群的 2. 每个物理节点上执行`ping host`其中host是其他物理节点的hostname看能否ping通其它物理节点如果不能ping通需要检查网络设置或/etc/hosts文件(Windows系统默认路径为C:\Windows\system32\drivers\etc\hosts)或DNS的配置。如果无法ping通是无法组成集群的
3. 从应用运行的物理节点ping taosd运行的数据节点如果无法ping通应用是无法连接taosd的请检查应用所在物理节点的DNS设置或hosts文件 3. 从应用运行的物理节点ping taosd运行的数据节点如果无法ping通应用是无法连接taosd的请检查应用所在物理节点的DNS设置或hosts文件
4. 每个数据节点的End Point就是输出的hostname外加端口号比如h1.taosdata.com:6030 4. 每个数据节点的End Point就是输出的hostname外加端口号比如h1.taosdata.com:6030
**第五步**修改TDengine的配置文件所有节点的文件/etc/taos/taos.cfg都需要修改。假设准备启动的第一个数据节点End Point为 h1.taosdata.com:6030, 其与集群配置相关参数如下: **第五步**修改TDengine的配置文件所有节点的文件/etc/taos/taos.cfg都需要修改。假设准备启动的第一个数据节点End Point为 h1.taosdata.com:6030其与集群配置相关参数如下:
``` ```
// firstEp 是每个数据节点首次启动后连接的第一个数据节点 // firstEp 是每个数据节点首次启动后连接的第一个数据节点
firstEp h1.taosdata.com:6030 firstEp h1.taosdata.com:6030
// 必须配置为本数据节点的FQDN如果本机只有一个hostname, 可注释掉本配置 // 必须配置为本数据节点的FQDN如果本机只有一个hostname, 可注释掉本
fqdn h1.taosdata.com fqdn h1.taosdata.com
// 配置本数据节点的端口号缺省是6030 // 配置本数据节点的端口号缺省是6030
serverPort 6030 serverPort 6030
// 使用场景请参考《Arbitrator的使用》的部分 // 副本数为偶数的时候,需要配置请参考《Arbitrator的使用》的部分
arbitrator ha.taosdata.com:6042 arbitrator ha.taosdata.com:6042
``` ```
@ -53,7 +53,7 @@ arbitrator ha.taosdata.com:6042
| 2 | mnodeEqualVnodeNum | 一个mnode等同于vnode消耗的个数 | | 2 | mnodeEqualVnodeNum | 一个mnode等同于vnode消耗的个数 |
| 3 | offlineThreshold | dnode离线阈值超过该时间将导致Dnode离线 | | 3 | offlineThreshold | dnode离线阈值超过该时间将导致Dnode离线 |
| 4 | statusInterval | dnode向mnode报告状态时长 | | 4 | statusInterval | dnode向mnode报告状态时长 |
| 5 | arbitrator | 系统中裁决器的end point | | 5 | arbitrator | 系统中裁决器的End Point |
| 6 | timezone | 时区 | | 6 | timezone | 时区 |
| 7 | balance | 是否启动负载均衡 | | 7 | balance | 是否启动负载均衡 |
| 8 | maxTablesPerVnode | 每个vnode中能够创建的最大表个数 | | 8 | maxTablesPerVnode | 每个vnode中能够创建的最大表个数 |
@ -87,7 +87,7 @@ taos>
1. 按照[《立即开始》](https://www.taosdata.com/cn/documentation/getting-started/)一章的方法在每个物理节点启动taosd注意每个物理节点都需要在 taos.cfg 文件中将 firstEP 参数配置为新集群首个节点的 End Point——在本例中是 h1.taos.com:6030 1. 按照[《立即开始》](https://www.taosdata.com/cn/documentation/getting-started/)一章的方法在每个物理节点启动taosd注意每个物理节点都需要在 taos.cfg 文件中将 firstEP 参数配置为新集群首个节点的 End Point——在本例中是 h1.taos.com:6030
2. 在第一个数据节点使用CLI程序taos, 登录进TDengine系统, 执行命令: 2. 在第一个数据节点使用CLI程序taos登录进TDengine系统执行命令
``` ```
CREATE DNODE "h2.taos.com:6030"; CREATE DNODE "h2.taos.com:6030";
@ -101,7 +101,7 @@ taos>
SHOW DNODES; SHOW DNODES;
``` ```
查看新节点是否被成功加入。如果该被加入的数据节点处于离线状态,请做两个检查 查看新节点是否被成功加入。如果该被加入的数据节点处于离线状态,请做两个检查
- 查看该数据节点的taosd是否正常工作如果没有正常运行需要先检查为什么 - 查看该数据节点的taosd是否正常工作如果没有正常运行需要先检查为什么
- 查看该数据节点taosd日志文件taosdlog.0里前面几行日志(一般在/var/log/taos目录)看日志里输出的该数据节点fqdn以及端口号是否为刚添加的End Point。如果不一致需要将正确的End Point添加进去。 - 查看该数据节点taosd日志文件taosdlog.0里前面几行日志(一般在/var/log/taos目录)看日志里输出的该数据节点fqdn以及端口号是否为刚添加的End Point。如果不一致需要将正确的End Point添加进去。
@ -121,7 +121,7 @@ taos>
### 添加数据节点 ### 添加数据节点
执行CLI程序taos, 使用root账号登录进系统, 执行: 执行CLI程序taos使用root账号登录进系统执行
``` ```
CREATE DNODE "fqdn:port"; CREATE DNODE "fqdn:port";
@ -131,13 +131,13 @@ CREATE DNODE "fqdn:port";
### 删除数据节点 ### 删除数据节点
执行CLI程序taos, 使用root账号登录进TDengine系统执行 执行CLI程序taos使用root账号登录进TDengine系统执行
``` ```mysql
DROP DNODE "fqdn:port"; DROP DNODE "fqdn:port | dnodeID";
``` ```
其中fqdn是被删除的节点的FQDNport是其对外服务器的端口号 通过"fqdn:port"或"dnodeID"来指定一个具体的节点都是可以的。其中fqdn是被删除的节点的FQDNport是其对外服务器的端口号dnodeID可以通过SHOW DNODES获得。
<font color=green>**【注意】**</font> <font color=green>**【注意】**</font>
@ -147,25 +147,41 @@ DROP DNODE "fqdn:port";
- 一个数据节点被drop之后其他节点都会感知到这个dnodeID的删除操作任何集群中的节点都不会再接收此dnodeID的请求。 - 一个数据节点被drop之后其他节点都会感知到这个dnodeID的删除操作任何集群中的节点都不会再接收此dnodeID的请求。
- dnodeID的是集群自动分配的不得人工指定。它在生成时递增的不会重复。 - dnodeID是集群自动分配的不得人工指定。它在生成时是递增的不会重复。
### 手动迁移数据节点
手动将某个vnode迁移到指定的dnode。
执行CLI程序taos使用root账号登录进TDengine系统执行
```mysql
ALTER DNODE <source-dnodeId> BALANCE "VNODE:<vgId>-DNODE:<dest-dnodeId>";
```
其中source-dnodeId是源dnodeId也就是待迁移的vnode所在的dnodeIDvgId可以通过SHOW VGROUPS获得列表的第一列dest-dnodeId是目标dnodeId。
<font color=green>**【注意】**</font>
- 只有在集群的自动负载均衡选项关闭时(balance设置为0),才允许手动迁移。
- 只有处于正常工作状态的vnode才能被迁移master/slave当处于offline/unsynced/syncing状态时是不能迁移的。
- 迁移前务必核实目标dnode的资源足够CPU、内存、硬盘。
### 查看数据节点 ### 查看数据节点
执行CLI程序taos,使用root账号登录进TDengine系统执行 执行CLI程序taos使用root账号登录进TDengine系统执行
```mysql
```
SHOW DNODES; SHOW DNODES;
``` ```
它将列出集群中所有的dnode,每个dnode的fqdn:port, 状态(ready, offline等vnode数目还未使用的vnode数目等信息。在添加或删除一个数据节点后可以使用该命令查看。 它将列出集群中所有的dnode每个dnode的IDend_point(fqdn:port)状态(ready, offline等vnode数目还未使用的vnode数目等信息。在添加或删除一个数据节点后可以使用该命令查看。
### 查看虚拟节点组 ### 查看虚拟节点组
为充分利用多核技术并提供scalability数据需要分片处理。因此TDengine会将一个DB的数据切分成多份存放在多个vnode里。这些vnode可能分布在多个数据节点dnode里这样就实现了水平扩展。一个vnode仅仅属于一个DB但一个DB可以有多个vnode。vnode的是mnode根据当前系统资源的情况自动进行分配的无需任何人工干预。 为充分利用多核技术并提供scalability数据需要分片处理。因此TDengine会将一个DB的数据切分成多份存放在多个vnode里。这些vnode可能分布在多个数据节点dnode里这样就实现了水平扩展。一个vnode仅仅属于一个DB但一个DB可以有多个vnode。vnode的是mnode根据当前系统资源的情况自动进行分配的无需任何人工干预。
执行CLI程序taos,使用root账号登录进TDengine系统执行 执行CLI程序taos使用root账号登录进TDengine系统执行
```mysql
```
SHOW VGROUPS; SHOW VGROUPS;
``` ```
@ -173,9 +189,9 @@ SHOW VGROUPS;
TDengine通过多副本的机制来提供系统的高可用性包括vnode和mnode的高可用性。 TDengine通过多副本的机制来提供系统的高可用性包括vnode和mnode的高可用性。
vnode的副本数是与DB关联的一个集群里可以有多个DB根据运营的需求每个DB可以配置不同的副本数。创建数据库时通过参数replica 指定副本数缺省为1。如果副本数为1系统的可靠性无法保证只要数据所在的节点宕机就将无法提供服务。集群的节点数必须大于等于副本数否则创建表时将返回错误more dnodes are needed"。比如下面的命令将创建副本数为3的数据库demo vnode的副本数是与DB关联的一个集群里可以有多个DB根据运营的需求每个DB可以配置不同的副本数。创建数据库时通过参数replica 指定副本数缺省为1。如果副本数为1系统的可靠性无法保证只要数据所在的节点宕机就将无法提供服务。集群的节点数必须大于等于副本数否则创建表时将返回错误"more dnodes are needed"。比如下面的命令将创建副本数为3的数据库demo
``` ```mysql
CREATE DATABASE demo replica 3; CREATE DATABASE demo replica 3;
``` ```
@ -183,20 +199,19 @@ CREATE DATABASE demo replica 3;
一个数据节点dnode里可能有多个DB的数据因此一个dnode离线时可能会影响到多个DB。如果一个vnode group里的一半或一半以上的vnode不工作那么该vnode group就无法对外服务无法插入或读取数据这样会影响到它所属的DB的一部分表的读写操作。 一个数据节点dnode里可能有多个DB的数据因此一个dnode离线时可能会影响到多个DB。如果一个vnode group里的一半或一半以上的vnode不工作那么该vnode group就无法对外服务无法插入或读取数据这样会影响到它所属的DB的一部分表的读写操作。
因为vnode的引入无法简单给出结论“集群中过半数据节点dnode工作集群就应该工作”。但是对于简单的情形很好下结论。比如副本数为3只有三个dnode那如果仅有一个节点不工作整个集群还是可以正常工作的但如果有两个数据节点不工作那整个集群就无法正常工作了。 因为vnode的引入无法简单给出结论“集群中过半数据节点dnode工作集群就应该工作”。但是对于简单的情形很好下结论。比如副本数为3只有三个dnode那如果仅有一个节点不工作整个集群还是可以正常工作的但如果有两个数据节点不工作那整个集群就无法正常工作了。
## <a class="anchor" id="mnode"></a>Mnode的高可用性 ## <a class="anchor" id="mnode"></a>Mnode的高可用性
TDengine集群是由mnode (taosd的一个模块管理节点) 负责管理的为保证mnode的高可用可以配置多个mnode副本副本数由系统配置参数numOfMnodes决定有效范围为1-3。为保证元数据的强一致性mnode副本之间是通过同步的方式进行数据复制的。 TDengine集群是由mnode (taosd的一个模块管理节点) 负责管理的为保证mnode的高可用可以配置多个mnode副本副本数由系统配置参数numOfMnodes决定有效范围为1-3。为保证元数据的强一致性mnode副本之间是通过同步的方式进行数据复制的。
一个集群有多个数据节点dnode, 但一个dnode至多运行一个mnode实例。多个dnode情况下哪个dnode可以作为mnode呢这是完全由系统根据整个系统资源情况自动指定的。用户可通过CLI程序taos在TDengine的console里执行如下命令 一个集群有多个数据节点dnode但一个dnode至多运行一个mnode实例。多个dnode情况下哪个dnode可以作为mnode呢这是完全由系统根据整个系统资源情况自动指定的。用户可通过CLI程序taos在TDengine的console里执行如下命令
``` ```mysql
SHOW MNODES; SHOW MNODES;
``` ```
来查看mnode列表该列表将列出mnode所处的dnode的End Point和角色(master, slave, unsynced 或offline)。 来查看mnode列表该列表将列出mnode所处的dnode的End Point和角色(master, slave, unsynced 或offline)。当集群中第一个数据节点启动时该数据节点一定会运行一个mnode实例否则该数据节点dnode无法正常工作因为一个系统是必须有至少一个mnode的。如果numOfMnodes配置为2启动第二个dnode时该dnode也将运行一个mnode实例。
当集群中第一个数据节点启动时该数据节点一定会运行一个mnode实例否则该数据节点dnode无法正常工作因为一个系统是必须有至少一个mnode的。如果numOfMnodes配置为2启动第二个dnode时该dnode也将运行一个mnode实例。
为保证mnode服务的高可用性numOfMnodes必须设置为2或更大。因为mnode保存的元数据必须是强一致的如果numOfMnodes大于2复制参数quorum自动设为2也就是说至少要保证有两个副本写入数据成功才通知客户端应用写入成功。 为保证mnode服务的高可用性numOfMnodes必须设置为2或更大。因为mnode保存的元数据必须是强一致的如果numOfMnodes大于2复制参数quorum自动设为2也就是说至少要保证有两个副本写入数据成功才通知客户端应用写入成功。
@ -210,7 +225,7 @@ SHOW MNODES;
- 当一个数据节点从集群中移除时,系统将自动把该数据节点上的数据转移到其他数据节点,无需任何人工干预。 - 当一个数据节点从集群中移除时,系统将自动把该数据节点上的数据转移到其他数据节点,无需任何人工干预。
- 如果一个数据节点过热数据量过大系统将自动进行负载均衡将该数据节点的一些vnode自动挪到其他节点。 - 如果一个数据节点过热数据量过大系统将自动进行负载均衡将该数据节点的一些vnode自动挪到其他节点。
当上述三种情况发生时,系统将启动各个数据节点的负载计算,从而决定如何挪动。 当上述三种情况发生时,系统将启动各个数据节点的负载计算,从而决定如何挪动。
**【提示】负载均衡由参数balance控制它决定是否启动自动负载均衡。** **【提示】负载均衡由参数balance控制它决定是否启动自动负载均衡。**
@ -225,7 +240,7 @@ SHOW MNODES;
## <a class="anchor" id="arbitrator"></a>Arbitrator的使用 ## <a class="anchor" id="arbitrator"></a>Arbitrator的使用
如果副本数为偶数,当一个 vnode group 里一半 vnode 不工作时,是无法从中选出 master 的。同理,一半 mnode 不工作时,是无法选出 mnode 的 master 的因为存在“split brain”问题。为解决这个问题TDengine 引入了 Arbitrator 的概念。Arbitrator 模拟一个 vnode 或 mnode 在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含 Arbitrator 在内,超过半数的 vnode 或 mnode 工作,那么该 vnode group 或 mnode 组就可以正常的提供数据插入或查询服务。比如对于副本数为 2 的情形,如果一个节点 A 离线,但另外一个节点 B 正常,而且能连接到 Arbitrator那么节点 B 就能正常工作。 如果副本数为偶数,当一个 vnode group 里一半或超过一半的 vnode 不工作时,是无法从中选出 master 的。同理,一半或超过一半的 mnode 不工作时,是无法选出 mnode 的 master 的因为存在“split brain”问题。为解决这个问题TDengine 引入了 Arbitrator 的概念。Arbitrator 模拟一个 vnode 或 mnode 在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含 Arbitrator 在内,超过半数的 vnode 或 mnode 工作,那么该 vnode group 或 mnode 组就可以正常的提供数据插入或查询服务。比如对于副本数为 2 的情形,如果一个节点 A 离线,但另外一个节点 B 正常,而且能连接到 Arbitrator那么节点 B 就能正常工作。
总之在目前版本下TDengine 建议在双副本环境要配置 Arbitrator以提升系统的可用性。 总之在目前版本下TDengine 建议在双副本环境要配置 Arbitrator以提升系统的可用性。
@ -235,3 +250,9 @@ Arbitrator 的执行程序名为 tarbitrator。该程序对系统资源几乎没
3. 修改每个 taosd 实例的配置文件,在 taos.cfg 里将参数 arbitrator 设置为 tarbitrator 程序所对应的 End Point。如果该参数配置了当副本数为偶数时系统将自动连接配置的 Arbitrator。如果副本数为奇数即使配置了 Arbitrator系统也不会去建立连接。 3. 修改每个 taosd 实例的配置文件,在 taos.cfg 里将参数 arbitrator 设置为 tarbitrator 程序所对应的 End Point。如果该参数配置了当副本数为偶数时系统将自动连接配置的 Arbitrator。如果副本数为奇数即使配置了 Arbitrator系统也不会去建立连接。
4. 在配置文件中配置了的 Arbitrator会出现在 `SHOW DNODES;` 指令的返回结果中,对应的 role 列的值会是“arb”。 4. 在配置文件中配置了的 Arbitrator会出现在 `SHOW DNODES;` 指令的返回结果中,对应的 role 列的值会是“arb”。
查看集群 Arbitrator 的状态【2.0.14.0 以后支持】
```mysql
SHOW DNODES;
```

View File

@ -953,6 +953,8 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
### 选择函数 ### 选择函数
在使用所有的选择函数的时候,可以同时指定输出 ts 列或标签列(包括 tbname这样就可以方便地知道被选出的值是源于哪个数据行的。
- **MIN** - **MIN**
```mysql ```mysql
SELECT MIN(field_name) FROM {tb_name | stb_name} [WHERE clause]; SELECT MIN(field_name) FROM {tb_name | stb_name} [WHERE clause];

View File

@ -657,9 +657,7 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
index = 0; index = 0;
sToken = tStrGetToken(*str, &index, false); sToken = tStrGetToken(*str, &index, false);
if (sToken.n == 0 || sToken.type != TK_RP) { if (sToken.n == 0 || sToken.type != TK_RP) {
tscSQLSyntaxErrMsg(pInsertParam->msg, ") expected", *str); return tscSQLSyntaxErrMsg(pInsertParam->msg, ") expected", *str);
code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return code;
} }
*str += index; *str += index;

View File

@ -5763,10 +5763,15 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
} }
int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema) { int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema) {
const char* msg0 = "only support order by primary timestamp"; const char* msg0 = "only one column allowed in orderby";
const char* msg1 = "invalid column name"; const char* msg1 = "invalid column name in orderby clause";
const char* msg2 = "order by primary timestamp, first tag or groupby column in groupby clause allowed"; const char* msg2 = "too many order by columns";
const char* msg3 = "invalid column in order by clause, only primary timestamp or first tag in groupby clause allowed"; const char* msg3 = "only primary timestamp/tbname/first tag in groupby clause allowed";
const char* msg4 = "only tag in groupby clause allowed in order clause";
const char* msg5 = "only primary timestamp/column in top/bottom function allowed as order column";
const char* msg6 = "only primary timestamp allowed as the second order column";
const char* msg7 = "only primary timestamp/column in groupby clause allowed as order column";
const char* msg8 = "only column in groupby clause allowed as order column";
setDefaultOrderInfo(pQueryInfo); setDefaultOrderInfo(pQueryInfo);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
@ -5791,7 +5796,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
} }
} else { } else {
if (size > 2) { if (size > 2) {
return invalidOperationMsg(pMsgBuf, msg3); return invalidOperationMsg(pMsgBuf, msg2);
} }
} }
@ -5820,7 +5825,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
// it is a tag column // it is a tag column
if (pQueryInfo->groupbyExpr.columnInfo == NULL) { if (pQueryInfo->groupbyExpr.columnInfo == NULL) {
return invalidOperationMsg(pMsgBuf, msg2); return invalidOperationMsg(pMsgBuf, msg4);
} }
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0); SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0);
if (relTagIndex == pColIndex->colIndex) { if (relTagIndex == pColIndex->colIndex) {
@ -5867,7 +5872,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
pExpr = tscExprGet(pQueryInfo, 1); pExpr = tscExprGet(pQueryInfo, 1);
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(pMsgBuf, msg2); return invalidOperationMsg(pMsgBuf, msg5);
} }
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
@ -5906,7 +5911,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
} }
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(pMsgBuf, msg2); return invalidOperationMsg(pMsgBuf, msg6);
} else { } else {
tVariantListItem* p1 = taosArrayGet(pSortOrder, 1); tVariantListItem* p1 = taosArrayGet(pSortOrder, 1);
pQueryInfo->order.order = p1->sortOrder; pQueryInfo->order.order = p1->sortOrder;
@ -5928,7 +5933,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
} }
if (!validOrder) { if (!validOrder) {
return invalidOperationMsg(pMsgBuf, msg2); return invalidOperationMsg(pMsgBuf, msg7);
} }
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
@ -5937,11 +5942,13 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
} }
if (isTopBottomQuery(pQueryInfo)) { if (isTopBottomQuery(pQueryInfo)) {
bool validOrder = false;
SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo; SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo;
if (columnInfo != NULL && taosArrayGetSize(columnInfo) > 0) { if (columnInfo != NULL && taosArrayGetSize(columnInfo) > 0) {
SColIndex* pColIndex = taosArrayGet(columnInfo, 0); SColIndex* pColIndex = taosArrayGet(columnInfo, 0);
validOrder = (pColIndex->colIndex == index.columnIndex);
if (pColIndex->colIndex == index.columnIndex) {
return invalidOperationMsg(pMsgBuf, msg8);
}
} else { } else {
/* order of top/bottom query in interval is not valid */ /* order of top/bottom query in interval is not valid */
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0); SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
@ -5949,14 +5956,8 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
pExpr = tscExprGet(pQueryInfo, 1); pExpr = tscExprGet(pQueryInfo, 1);
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(pMsgBuf, msg2); return invalidOperationMsg(pMsgBuf, msg5);
} }
validOrder = true;
}
if (!validOrder) {
return invalidOperationMsg(pMsgBuf, msg2);
} }
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0); tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);

View File

@ -2404,8 +2404,8 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SColumn* x = taosArrayGetP(pNewQueryInfo->colList, index1); SColumn* x = taosArrayGetP(pNewQueryInfo->colList, index1);
tscColumnCopy(x, pCol); tscColumnCopy(x, pCol);
} else { } else {
SColumn *p = tscColumnClone(pCol); SSchema ss = {.type = (uint8_t)pCol->info.type, .bytes = pCol->info.bytes, .colId = (int16_t)pCol->columnIndex};
taosArrayPush(pNewQueryInfo->colList, &p); tscColumnListInsert(pNewQueryInfo->colList, pCol->columnIndex, pCol->tableUid, &ss);
} }
} }
} }

View File

@ -41,6 +41,7 @@ extern char tsArbitrator[];
extern int8_t tsArbOnline; extern int8_t tsArbOnline;
extern int64_t tsArbOnlineTimestamp; extern int64_t tsArbOnlineTimestamp;
extern int32_t tsDnodeId; extern int32_t tsDnodeId;
extern int64_t tsDnodeStartTime;
// common // common
extern int tsRpcTimer; extern int tsRpcTimer;

View File

@ -46,6 +46,7 @@ int8_t tsArbOnline = 0;
int64_t tsArbOnlineTimestamp = TSDB_ARB_DUMMY_TIME; int64_t tsArbOnlineTimestamp = TSDB_ARB_DUMMY_TIME;
char tsEmail[TSDB_FQDN_LEN] = {0}; char tsEmail[TSDB_FQDN_LEN] = {0};
int32_t tsDnodeId = 0; int32_t tsDnodeId = 0;
int64_t tsDnodeStartTime = 0;
// common // common
int32_t tsRpcTimer = 300; int32_t tsRpcTimer = 300;

View File

@ -195,6 +195,7 @@ int32_t dnodeInitSystem() {
dnodeSetRunStatus(TSDB_RUN_STATUS_RUNING); dnodeSetRunStatus(TSDB_RUN_STATUS_RUNING);
moduleStart(); moduleStart();
tsDnodeStartTime = taosGetTimestampMs();
dnodeReportStep("TDengine", "initialized successfully", 1); dnodeReportStep("TDengine", "initialized successfully", 1);
dInfo("TDengine is initialized successfully"); dInfo("TDengine is initialized successfully");

View File

@ -42,6 +42,8 @@ int32_t main(int32_t argc, char *argv[]) {
} }
} else if (strcmp(argv[i], "-C") == 0) { } else if (strcmp(argv[i], "-C") == 0) {
dump_config = 1; dump_config = 1;
} else if (strcmp(argv[i], "--force-keep-file") == 0) {
tsdbForceKeepFile = true;
} else if (strcmp(argv[i], "--compact-mnode-wal") == 0) { } else if (strcmp(argv[i], "--compact-mnode-wal") == 0) {
tsCompactMnodeWal = 1; tsCompactMnodeWal = 1;
} else if (strcmp(argv[i], "-V") == 0) { } else if (strcmp(argv[i], "-V") == 0) {

View File

@ -471,6 +471,7 @@ typedef struct {
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag bool topBotQuery; // TODO used bitwise flag
bool interpQuery; // interp query or not
bool groupbyColumn; // denote if this is a groupby normal column query bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation bool timeWindowInterpo;// if the time window start/end required interpolation

View File

@ -41,9 +41,16 @@ typedef struct {
int64_t avail; int64_t avail;
} SFSMeta; } SFSMeta;
typedef struct {
int64_t size;
int64_t used;
int64_t free;
int16_t nAvailDisks; // # of Available disks
} STierMeta;
int tfsInit(SDiskCfg *pDiskCfg, int ndisk); int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
void tfsDestroy(); void tfsDestroy();
void tfsUpdateInfo(SFSMeta *pFSMeta); void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels);
void tfsGetMeta(SFSMeta *pMeta); void tfsGetMeta(SFSMeta *pMeta);
void tfsAllocDisk(int expLevel, int *level, int *id); void tfsAllocDisk(int expLevel, int *level, int *id);

File diff suppressed because it is too large Load Diff

View File

@ -253,6 +253,10 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
int32_t connId = htonl(pHBMsg->connId); int32_t connId = htonl(pHBMsg->connId);
SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort); SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort);
if (pConn == NULL) {
pHBMsg->pid = htonl(pHBMsg->pid);
pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pHBMsg->pid, pHBMsg->appName);
}
if (pConn == NULL) { if (pConn == NULL) {
// do not close existing links, otherwise // do not close existing links, otherwise

View File

@ -65,7 +65,14 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_MSG_NOT_PROCESSED; return TSDB_CODE_MND_MSG_NOT_PROCESSED;
} }
int32_t code = mnodeInitMsg(pMsg); int32_t code = grantCheck(TSDB_GRANT_TIME);
if (code != TSDB_CODE_SUCCESS) {
mError("msg:%p, app:%p type:%s not processed, reason:%s", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType],
tstrerror(code));
return code;
}
code = mnodeInitMsg(pMsg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mError("msg:%p, app:%p type:%s not processed, reason:%s", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], mError("msg:%p, app:%p type:%s not processed, reason:%s", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType],
tstrerror(code)); tstrerror(code));

View File

@ -28,8 +28,11 @@ typedef struct {
int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize); int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize);
int32_t taosGetCpuCores();
void taosGetSystemInfo(); void taosGetSystemInfo();
bool taosReadProcIO(int64_t* rchars, int64_t* wchars);
bool taosGetProcIO(float *readKB, float *writeKB); bool taosGetProcIO(float *readKB, float *writeKB);
bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes);
bool taosGetBandSpeed(float *bandSpeedKb); bool taosGetBandSpeed(float *bandSpeedKb);
void taosGetDisk(); void taosGetDisk();
bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) ; bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) ;

View File

@ -164,6 +164,10 @@ void taosKillSystem() {
exit(0); exit(0);
} }
int32_t taosGetCpuCores() {
return sysconf(_SC_NPROCESSORS_ONLN);
}
void taosGetSystemInfo() { void taosGetSystemInfo() {
// taosGetProcInfos(); // taosGetProcInfos();
@ -185,12 +189,25 @@ void taosGetSystemInfo() {
taosGetSystemLocale(); taosGetSystemLocale();
} }
bool taosReadProcIO(int64_t *rchars, int64_t *wchars) {
if (rchars) *rchars = 0;
if (wchars) *wchars = 0;
return true;
}
bool taosGetProcIO(float *readKB, float *writeKB) { bool taosGetProcIO(float *readKB, float *writeKB) {
*readKB = 0; *readKB = 0;
*writeKB = 0; *writeKB = 0;
return true; return true;
} }
bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
if (bytes) *bytes = 0;
if (rbytes) *rbytes = 0;
if (tbytes) *tbytes = 0;
return true;
}
bool taosGetBandSpeed(float *bandSpeedKb) { bool taosGetBandSpeed(float *bandSpeedKb) {
*bandSpeedKb = 0; *bandSpeedKb = 0;
return true; return true;

View File

@ -277,7 +277,7 @@ static void taosGetSystemLocale() { // get and set default locale
} }
} }
static int32_t taosGetCpuCores() { return (int32_t)sysconf(_SC_NPROCESSORS_ONLN); } int32_t taosGetCpuCores() { return (int32_t)sysconf(_SC_NPROCESSORS_ONLN); }
bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) { bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) {
static uint64_t lastSysUsed = 0; static uint64_t lastSysUsed = 0;
@ -332,7 +332,7 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
} }
} }
static bool taosGetCardInfo(int64_t *bytes) { bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
*bytes = 0; *bytes = 0;
FILE *fp = fopen(tsSysNetFile, "r"); FILE *fp = fopen(tsSysNetFile, "r");
if (fp == NULL) { if (fp == NULL) {
@ -347,9 +347,9 @@ static bool taosGetCardInfo(int64_t *bytes) {
while (!feof(fp)) { while (!feof(fp)) {
memset(line, 0, len); memset(line, 0, len);
int64_t rbytes = 0; int64_t o_rbytes = 0;
int64_t rpackts = 0; int64_t rpackts = 0;
int64_t tbytes = 0; int64_t o_tbytes = 0;
int64_t tpackets = 0; int64_t tpackets = 0;
int64_t nouse1 = 0; int64_t nouse1 = 0;
int64_t nouse2 = 0; int64_t nouse2 = 0;
@ -374,8 +374,10 @@ static bool taosGetCardInfo(int64_t *bytes) {
sscanf(line, sscanf(line,
"%s %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 "%s %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64
" %" PRId64, " %" PRId64,
nouse0, &rbytes, &rpackts, &nouse1, &nouse2, &nouse3, &nouse4, &nouse5, &nouse6, &tbytes, &tpackets); nouse0, &o_rbytes, &rpackts, &nouse1, &nouse2, &nouse3, &nouse4, &nouse5, &nouse6, &o_tbytes, &tpackets);
*bytes += (rbytes + tbytes); if (rbytes) *rbytes = o_rbytes;
if (tbytes) *tbytes = o_tbytes;
*bytes += (o_rbytes + o_tbytes);
} }
tfree(line); tfree(line);
@ -390,7 +392,7 @@ bool taosGetBandSpeed(float *bandSpeedKb) {
int64_t curBytes = 0; int64_t curBytes = 0;
time_t curTime = time(NULL); time_t curTime = time(NULL);
if (!taosGetCardInfo(&curBytes)) { if (!taosGetCardInfo(&curBytes, NULL, NULL)) {
return false; return false;
} }
@ -420,7 +422,7 @@ bool taosGetBandSpeed(float *bandSpeedKb) {
return true; return true;
} }
static bool taosReadProcIO(int64_t *readbyte, int64_t *writebyte) { bool taosReadProcIO(int64_t *rchars, int64_t *wchars) {
FILE *fp = fopen(tsProcIOFile, "r"); FILE *fp = fopen(tsProcIOFile, "r");
if (fp == NULL) { if (fp == NULL) {
uError("open file:%s failed", tsProcIOFile); uError("open file:%s failed", tsProcIOFile);
@ -441,10 +443,10 @@ static bool taosReadProcIO(int64_t *readbyte, int64_t *writebyte) {
break; break;
} }
if (strstr(line, "rchar:") != NULL) { if (strstr(line, "rchar:") != NULL) {
sscanf(line, "%s %" PRId64, tmp, readbyte); sscanf(line, "%s %" PRId64, tmp, rchars);
readIndex++; readIndex++;
} else if (strstr(line, "wchar:") != NULL) { } else if (strstr(line, "wchar:") != NULL) {
sscanf(line, "%s %" PRId64, tmp, writebyte); sscanf(line, "%s %" PRId64, tmp, wchars);
readIndex++; readIndex++;
} else { } else {
} }

View File

@ -115,7 +115,7 @@ static void taosGetSystemLocale() {
} }
} }
static int32_t taosGetCpuCores() { int32_t taosGetCpuCores() {
SYSTEM_INFO info; SYSTEM_INFO info;
GetSystemInfo(&info); GetSystemInfo(&info);
return (int32_t)info.dwNumberOfProcessors; return (int32_t)info.dwNumberOfProcessors;
@ -146,6 +146,13 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
} }
} }
bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
if (bytes) *bytes = 0;
if (rbytes) *rbytes = 0;
if (tbytes) *tbytes = 0;
return true;
}
bool taosGetBandSpeed(float *bandSpeedKb) { bool taosGetBandSpeed(float *bandSpeedKb) {
*bandSpeedKb = 0; *bandSpeedKb = 0;
return true; return true;

View File

@ -0,0 +1,27 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTPMETRICSHANDLE_H
#define TDENGINE_HTTPMETRICSHANDLE_H
#include "http.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
void metricsInitHandle(HttpServer* httpServer);
bool metricsProcessRequest(struct HttpContext* httpContext);
#endif // TDENGINE_HTTPMETRICHANDLE_H

View File

@ -0,0 +1,184 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tfs.h"
#include "httpMetricsHandle.h"
#include "dnode.h"
#include "httpLog.h"
static HttpDecodeMethod metricsDecodeMethod = {"metrics", metricsProcessRequest};
void metricsInitHandle(HttpServer* pServer) {
httpAddMethod(pServer, &metricsDecodeMethod);
}
bool metricsProcessRequest(HttpContext* pContext) {
httpDebug("context:%p, fd:%d, user:%s, process admin grant msg", pContext, pContext->fd, pContext->user);
JsonBuf* jsonBuf = httpMallocJsonBuf(pContext);
if (jsonBuf == NULL) {
httpError("failed to allocate memory for metrics");
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_ENOUGH_MEMORY);
return false;
}
httpInitJsonBuf(jsonBuf, pContext);
httpWriteJsonBufHead(jsonBuf);
httpJsonToken(jsonBuf, JsonObjStt);
{
char* keyDisks = "tags";
httpJsonPairHead(jsonBuf, keyDisks, (int32_t)strlen(keyDisks));
httpJsonToken(jsonBuf, JsonArrStt);
{
httpJsonItemToken(jsonBuf);
httpJsonToken(jsonBuf, JsonObjStt);
char* keyTagName = "name";
char* keyTagValue = "value";
httpJsonPairOriginString(jsonBuf, keyTagName, (int32_t)strlen(keyTagName), "\"dnode_id\"",
(int32_t)strlen("\"dnode_id\""));
int32_t dnodeId = dnodeGetDnodeId();
httpJsonPairIntVal(jsonBuf, keyTagValue, (int32_t)strlen(keyTagValue), dnodeId);
httpJsonToken(jsonBuf, JsonObjEnd);
}
httpJsonToken(jsonBuf, JsonArrEnd);
}
{
if (tsDnodeStartTime != 0) {
int64_t now = taosGetTimestampMs();
int64_t upTime = now-tsDnodeStartTime;
char* keyUpTime = "up_time";
httpJsonPairInt64Val(jsonBuf, keyUpTime, (int32_t)strlen(keyUpTime), upTime);
}
}
{
int32_t cpuCores = taosGetCpuCores();
char* keyCpuCores = "cpu_cores";
httpJsonPairIntVal(jsonBuf, keyCpuCores, (int32_t)strlen(keyCpuCores), cpuCores);
float sysCpuUsage = 0;
float procCpuUsage = 0;
bool succeeded = taosGetCpuUsage(&sysCpuUsage, &procCpuUsage);
if (!succeeded) {
httpError("failed to get cpu usage");
} else {
if (sysCpuUsage <= procCpuUsage) {
sysCpuUsage = procCpuUsage + 0.1f;
}
char* keyCpuSystem = "cpu_system";
char* keyCpuEngine = "cpu_engine";
httpJsonPairFloatVal(jsonBuf, keyCpuSystem, (int32_t)strlen(keyCpuSystem), sysCpuUsage);
httpJsonPairFloatVal(jsonBuf, keyCpuEngine, (int32_t)strlen(keyCpuEngine), procCpuUsage);
}
}
{
float sysMemoryUsedMB = 0;
bool succeeded = taosGetSysMemory(&sysMemoryUsedMB);
if (!succeeded) {
httpError("failed to get sys memory info");
} else {
char* keyMemSystem = "mem_system";
httpJsonPairFloatVal(jsonBuf, keyMemSystem, (int32_t)strlen(keyMemSystem), sysMemoryUsedMB);
}
float procMemoryUsedMB = 0;
succeeded = taosGetProcMemory(&procMemoryUsedMB);
if (!succeeded) {
httpError("failed to get proc memory info");
} else {
char* keyMemEngine = "mem_engine";
httpJsonPairFloatVal(jsonBuf, keyMemEngine, (int32_t)strlen(keyMemEngine), procMemoryUsedMB);
}
}
{
int64_t bytes = 0, rbytes = 0, tbytes = 0;
bool succeeded = taosGetCardInfo(&bytes, &rbytes, &tbytes);
if (!succeeded) {
httpError("failed to get network info");
} else {
char* keyNetIn = "net_in";
char* keyNetOut = "net_out";
httpJsonPairInt64Val(jsonBuf, keyNetIn, (int32_t)strlen(keyNetIn), rbytes);
httpJsonPairInt64Val(jsonBuf, keyNetOut, (int32_t)strlen(keyNetOut), tbytes);
}
}
{
int64_t rchars = 0;
int64_t wchars = 0;
bool succeeded = taosReadProcIO(&rchars, &wchars);
if (!succeeded) {
httpError("failed to get io info");
} else {
char* keyIORead = "io_read";
char* keyIOWrite = "io_write";
httpJsonPairInt64Val(jsonBuf, keyIORead, (int32_t)strlen(keyIORead), rchars);
httpJsonPairInt64Val(jsonBuf, keyIOWrite, (int32_t)strlen(keyIOWrite), wchars);
}
}
{
const int8_t numTiers = 3;
SFSMeta fsMeta;
STierMeta* tierMetas = calloc(numTiers, sizeof(STierMeta));
tfsUpdateInfo(&fsMeta, tierMetas, numTiers);
{
char* keyDiskUsed = "disk_used";
char* keyDiskTotal = "disk_total";
httpJsonPairInt64Val(jsonBuf, keyDiskTotal, (int32_t)strlen(keyDiskTotal), fsMeta.tsize);
httpJsonPairInt64Val(jsonBuf, keyDiskUsed, (int32_t)strlen(keyDiskUsed), fsMeta.used);
char* keyDisks = "disks";
httpJsonPairHead(jsonBuf, keyDisks, (int32_t)strlen(keyDisks));
httpJsonToken(jsonBuf, JsonArrStt);
for (int i = 0; i < numTiers; ++i) {
httpJsonItemToken(jsonBuf);
httpJsonToken(jsonBuf, JsonObjStt);
char* keyDataDirLevelUsed = "datadir_used";
char* keyDataDirLevelTotal = "datadir_total";
httpJsonPairInt64Val(jsonBuf, keyDataDirLevelUsed, (int32_t)strlen(keyDataDirLevelUsed), tierMetas[i].used);
httpJsonPairInt64Val(jsonBuf, keyDataDirLevelTotal, (int32_t)strlen(keyDataDirLevelTotal), tierMetas[i].size);
httpJsonToken(jsonBuf, JsonObjEnd);
}
httpJsonToken(jsonBuf, JsonArrEnd);
}
free(tierMetas);
}
{
SStatisInfo info = dnodeGetStatisInfo();
{
char* keyReqHttp = "req_http";
char* keyReqSelect = "req_select";
char* keyReqInsert = "req_insert";
httpJsonPairInt64Val(jsonBuf, keyReqHttp, (int32_t)strlen(keyReqHttp), info.httpReqNum);
httpJsonPairInt64Val(jsonBuf, keyReqSelect, (int32_t)strlen(keyReqSelect), info.queryReqNum);
httpJsonPairInt64Val(jsonBuf, keyReqInsert, (int32_t)strlen(keyReqInsert), info.submitReqNum);
}
}
httpJsonToken(jsonBuf, JsonObjEnd);
httpWriteJsonBufEnd(jsonBuf);
pContext->reqType = HTTP_REQTYPE_OTHERS;
httpFreeJsonBuf(pContext);
return false;
}

View File

@ -30,6 +30,7 @@
#include "httpGcHandle.h" #include "httpGcHandle.h"
#include "httpRestHandle.h" #include "httpRestHandle.h"
#include "httpTgHandle.h" #include "httpTgHandle.h"
#include "httpMetricsHandle.h"
#ifndef _ADMIN #ifndef _ADMIN
void adminInitHandle(HttpServer* pServer) {} void adminInitHandle(HttpServer* pServer) {}
@ -52,7 +53,7 @@ int32_t httpInitSystem() {
gcInitHandle(&tsHttpServer); gcInitHandle(&tsHttpServer);
tgInitHandle(&tsHttpServer); tgInitHandle(&tsHttpServer);
opInitHandle(&tsHttpServer); opInitHandle(&tsHttpServer);
metricsInitHandle(&tsHttpServer);
return 0; return 0;
} }

View File

@ -333,7 +333,9 @@ enum OPERATOR_TYPE_E {
OP_Distinct = 20, OP_Distinct = 20,
OP_Join = 21, OP_Join = 21,
OP_StateWindow = 22, OP_StateWindow = 22,
OP_Order = 23, OP_AllTimeWindow = 23,
OP_AllMultiTableTimeInterval = 24,
OP_Order = 25,
}; };
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
@ -561,11 +563,13 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);

View File

@ -39,7 +39,6 @@
#define GET_QID(_r) (((SQInfo*)((_r)->qinfo))->qId) #define GET_QID(_r) (((SQInfo*)((_r)->qinfo))->qId)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex) #define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.param[0].i64:1)
int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr); int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr);
@ -60,6 +59,7 @@ SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t
void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr); void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr);
void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols); void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols);
int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable);
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
@ -70,7 +70,7 @@ static FORCE_INLINE char* getPosInResultPage(SQueryAttr* pQueryAttr, tFilePage*
int32_t offset) { int32_t offset) {
assert(rowOffset >= 0 && pQueryAttr != NULL); assert(rowOffset >= 0 && pQueryAttr != NULL);
int32_t numOfRows = (int32_t)GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery); int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return ((char *)page->data) + rowOffset + offset * numOfRows; return ((char *)page->data) + rowOffset + offset * numOfRows;
} }

View File

@ -3708,27 +3708,59 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
} }
} else { } else {
// no data generated yet // no data generated yet
if (pCtx->size == 1) { if (pCtx->size < 1) {
return; return;
} }
// check the timestamp in input buffer // check the timestamp in input buffer
TSKEY skey = GET_TS_DATA(pCtx, 0); TSKEY skey = GET_TS_DATA(pCtx, 0);
TSKEY ekey = GET_TS_DATA(pCtx, 1);
// no data generated yet
if (!(skey < pCtx->startTs && ekey > pCtx->startTs)) {
return;
}
assert(pCtx->start.key == INT64_MIN && skey < pCtx->startTs && ekey > pCtx->startTs);
if (type == TSDB_FILL_PREV) { if (type == TSDB_FILL_PREV) {
if (skey > pCtx->startTs) {
return;
}
if (pCtx->size > 1) {
TSKEY ekey = GET_TS_DATA(pCtx, 1);
if (ekey > skey && ekey <= pCtx->startTs) {
skey = ekey;
}
}
assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType); assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType);
} else if (type == TSDB_FILL_NEXT) { } else if (type == TSDB_FILL_NEXT) {
char* val = ((char*)pCtx->pInput) + pCtx->inputBytes; TSKEY ekey = skey;
char* val = NULL;
if (ekey < pCtx->startTs) {
if (pCtx->size > 1) {
ekey = GET_TS_DATA(pCtx, 1);
if (ekey < pCtx->startTs) {
return;
}
val = ((char*)pCtx->pInput) + pCtx->inputBytes;
} else {
return;
}
} else {
val = (char*)pCtx->pInput;
}
assignVal(pCtx->pOutput, val, pCtx->outputBytes, pCtx->inputType); assignVal(pCtx->pOutput, val, pCtx->outputBytes, pCtx->inputType);
} else if (type == TSDB_FILL_LINEAR) { } else if (type == TSDB_FILL_LINEAR) {
if (pCtx->size <= 1) {
return;
}
TSKEY ekey = GET_TS_DATA(pCtx, 1);
// no data generated yet
if (!(skey < pCtx->startTs && ekey > pCtx->startTs)) {
return;
}
assert(pCtx->start.key == INT64_MIN && skey < pCtx->startTs && ekey > pCtx->startTs);
char *start = GET_INPUT_DATA(pCtx, 0); char *start = GET_INPUT_DATA(pCtx, 0);
char *end = GET_INPUT_DATA(pCtx, 1); char *end = GET_INPUT_DATA(pCtx, 1);
@ -4030,12 +4062,15 @@ static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockD
pDist->maxRows = pSrc->maxRows; pDist->maxRows = pSrc->maxRows;
pDist->minRows = pSrc->minRows; pDist->minRows = pSrc->minRows;
int32_t numSteps = tsMaxRowsInFileBlock/TSDB_BLOCK_DIST_STEP_ROWS; int32_t maxSteps = TSDB_MAX_MAX_ROW_FBLOCK/TSDB_BLOCK_DIST_STEP_ROWS;
pDist->dataBlockInfos = taosArrayInit(numSteps, sizeof(SFileBlockInfo)); if (TSDB_MAX_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
taosArraySetSize(pDist->dataBlockInfos, numSteps); ++maxSteps;
}
pDist->dataBlockInfos = taosArrayInit(maxSteps, sizeof(SFileBlockInfo));
taosArraySetSize(pDist->dataBlockInfos, maxSteps);
} }
size_t steps = taosArrayGetSize(pDist->dataBlockInfos); size_t steps = taosArrayGetSize(pSrc->dataBlockInfos);
for (int32_t i = 0; i < steps; ++i) { for (int32_t i = 0; i < steps; ++i) {
int32_t srcNumBlocks = ((SFileBlockInfo*)taosArrayGet(pSrc->dataBlockInfos, i))->numBlocksOfStep; int32_t srcNumBlocks = ((SFileBlockInfo*)taosArrayGet(pSrc->dataBlockInfos, i))->numBlocksOfStep;
SFileBlockInfo* blockInfo = (SFileBlockInfo*)taosArrayGet(pDist->dataBlockInfos, i); SFileBlockInfo* blockInfo = (SFileBlockInfo*)taosArrayGet(pDist->dataBlockInfos, i);

View File

@ -447,6 +447,44 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim
pResultRowInfo->capacity = (int32_t)newCapacity; pResultRowInfo->capacity = (int32_t)newCapacity;
} }
static bool chkResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
int16_t bytes, bool masterscan, uint64_t uid) {
bool existed = false;
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
SResultRow **p1 =
(SResultRow **)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
// in case of repeat scan/reverse scan, no new time window added.
if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr)) {
if (!masterscan) { // the *p1 may be NULL in case of sliding+offset exists.
return p1 != NULL;
}
if (p1 != NULL) {
if (pResultRowInfo->size == 0) {
existed = false;
assert(pResultRowInfo->curPos == -1);
} else if (pResultRowInfo->size == 1) {
existed = (pResultRowInfo->pResult[0] == (*p1));
} else { // check if current pResultRowInfo contains the existed pResultRow
SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo);
int64_t* index = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
if (index != NULL) {
existed = true;
} else {
existed = false;
}
}
}
return existed;
}
return p1 != NULL;
}
static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid, static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid,
char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) { char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) {
bool existed = false; bool existed = false;
@ -591,6 +629,35 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
return w; return w;
} }
// get the correct time window according to the handled timestamp
static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) {
STimeWindow w = {0};
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
getInitialStartTimeWindow(pQueryAttr, ts, &w);
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') {
w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1;
} else {
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
}
} else {
w = getResultRow(pResultRowInfo, pResultRowInfo->curPos)->win;
}
/*
* query border check, skey should not be bounded by the query time range, since the value skey will
* be used as the time window index value. So we only change ekey of time window accordingly.
*/
if (w.ekey > pQueryAttr->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) {
w.ekey = pQueryAttr->window.ekey;
}
return w;
}
// a new buffer page for each table. Needs to opt this design // a new buffer page for each table. Needs to opt this design
static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t tid, uint32_t size) { static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t tid, uint32_t size) {
if (pWindowRes->pageId != -1) { if (pWindowRes->pageId != -1) {
@ -636,6 +703,14 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
return 0; return 0;
} }
static bool chkWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win,
bool masterscan, SResultRow **pResult, int64_t groupId, SQLFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
assert(win->skey <= win->ekey);
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
}
static int32_t setResultOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win, static int32_t setResultOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win,
bool masterscan, SResultRow **pResult, int64_t tableGroupId, SQLFunctionCtx* pCtx, bool masterscan, SResultRow **pResult, int64_t tableGroupId, SQLFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) { int32_t numOfOutput, int32_t* rowCellInfoOffset) {
@ -706,7 +781,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
} }
} }
assert(forwardStep > 0); assert(forwardStep >= 0);
return forwardStep; return forwardStep;
} }
@ -763,6 +838,8 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
pResultRowInfo->curPos = i + 1; // current not closed result object pResultRowInfo->curPos = i + 1; // current not closed result object
} }
} }
//pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey;
} }
static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQueryAttr* pQueryAttr, TSKEY lastKey) { static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQueryAttr* pQueryAttr, TSKEY lastKey) {
@ -812,7 +889,7 @@ static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
} }
} }
assert(num > 0); assert(num >= 0);
return num; return num;
} }
@ -970,6 +1047,11 @@ static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow *pNext
} }
} }
/* interp query with fill should not skip time window */
if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
return startPos;
}
/* /*
* This time window does not cover any data, try next time window, * This time window does not cover any data, try next time window,
* this case may happen when the time window is too small * this case may happen when the time window is too small
@ -1482,6 +1564,82 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
} }
static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
tsCols = (int64_t*) pColDataInfo->pData;
assert(tsCols[0] == pSDataBlock->info.window.skey &&
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
}
int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows);
STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SResultRow* pResult = NULL;
int32_t forwardStep = 0;
int32_t ret = 0;
while (1) {
// null data, failed to allocate more memory buffer
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult,
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
TSKEY ekey = reviseWindowEkey(pQueryAttr, &win);
forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos);
if (startPos < 0) {
if (win.skey <= pQueryAttr->window.ekey) {
int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
startPos = pSDataBlock->info.rows - 1;
// window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
}
break;
}
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
if (pQueryAttr->timeWindowInterpo) {
int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0;
saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
}
updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
}
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current; STableQueryInfo* item = pRuntimeEnv->current;
@ -1978,6 +2136,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break; break;
} }
case OP_AllMultiTableTimeInterval: {
pRuntimeEnv->proot =
createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break;
}
case OP_TimeWindow: { case OP_TimeWindow: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
@ -1987,6 +2151,15 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
break; break;
} }
case OP_AllTimeWindow: {
pRuntimeEnv->proot =
createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput && opType != OP_Join) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
}
break;
}
case OP_Groupby: { case OP_Groupby: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
@ -2533,7 +2706,7 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t MIN_ROWS_PER_PAGE = 4; int32_t MIN_ROWS_PER_PAGE = 4;
*rowsize = (int32_t)(pQueryAttr->resultRowSize * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); *rowsize = (int32_t)(pQueryAttr->resultRowSize * getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
int32_t overhead = sizeof(tFilePage); int32_t overhead = sizeof(tFilePage);
// one page contains at least two rows // one page contains at least two rows
@ -2910,6 +3083,8 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
// check if this data block is required to load // check if this data block is required to load
if ((*status) != BLK_DATA_ALL_NEEDED) { if ((*status) != BLK_DATA_ALL_NEEDED) {
bool needFilter = true;
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
@ -2919,10 +3094,16 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId, if (pQueryAttr->pointInterpQuery) {
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); pTableScanInfo->rowCellInfoOffset);
} else {
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
} }
} else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
@ -2930,7 +3111,11 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
pRuntimeEnv->current->groupIndex); pRuntimeEnv->current->groupIndex);
} }
(*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock); if (needFilter) {
(*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
} else {
(*status) = BLK_DATA_ALL_NEEDED;
}
} }
SDataBlockInfo* pBlockInfo = &pBlock->info; SDataBlockInfo* pBlockInfo = &pBlock->info;
@ -3439,7 +3624,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
// re-estabilish output buffer pointer. // re-estabilish output buffer pointer.
int32_t functionId = pBInfo->pCtx[i].functionId; int32_t functionId = pBInfo->pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[0].pOutput; pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
} }
} }
} }
@ -4540,6 +4725,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
pQueryAttr->tsdb = tsdb; pQueryAttr->tsdb = tsdb;
if (tsdb != NULL) { if (tsdb != NULL) {
int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery); int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -4832,6 +5018,9 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
tableBlockDist.numOfTables = (int32_t)pOperator->pRuntimeEnv->tableqinfoGroupInfo.numOfTables; tableBlockDist.numOfTables = (int32_t)pOperator->pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
int32_t numRowSteps = tsMaxRowsInFileBlock / TSDB_BLOCK_DIST_STEP_ROWS; int32_t numRowSteps = tsMaxRowsInFileBlock / TSDB_BLOCK_DIST_STEP_ROWS;
if (tsMaxRowsInFileBlock % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
++numRowSteps;
}
tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo)); tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo));
taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps); taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps);
tableBlockDist.maxRows = INT_MIN; tableBlockDist.maxRows = INT_MIN;
@ -4948,7 +5137,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->pCtx = pAggInfo->binfo.pCtx; pTableScanInfo->pCtx = pAggInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo; pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_TimeWindow) { } else if (pDownstream->operatorType == OP_TimeWindow || pDownstream->operatorType == OP_AllTimeWindow) {
STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info; STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info;
pTableScanInfo->pCtx = pIntervalInfo->pCtx; pTableScanInfo->pCtx = pIntervalInfo->pCtx;
@ -4962,7 +5151,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo; pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_MultiTableTimeInterval) { } else if (pDownstream->operatorType == OP_MultiTableTimeInterval || pDownstream->operatorType == OP_AllMultiTableTimeInterval) {
STableIntervalOperatorInfo *pInfo = pDownstream->info; STableIntervalOperatorInfo *pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->pCtx; pTableScanInfo->pCtx = pInfo->pCtx;
@ -5106,7 +5295,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
pInfo->resultRowFactor = pInfo->resultRowFactor =
(int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false)); (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false));
pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx
@ -5689,6 +5878,66 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes; return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes;
} }
static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pIntervalInfo->pRes;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order);
hashAllIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pBlock, 0);
}
// restore the value
pQueryAttr->order.order = order;
pQueryAttr->window = win;
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pIntervalInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes;
}
static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
@ -5740,6 +5989,61 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
doCloseAllTimeWindow(pRuntimeEnv); doCloseAllTimeWindow(pRuntimeEnv);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pIntervalInfo->pRes;
}
static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pIntervalInfo->pRes;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
// the pDataBlock are always the same one, no need to call this again
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex);
}
pOperator->status = OP_RES_TO_RETURN;
pQueryAttr->order.order = order; // TODO : restore the order
doCloseAllTimeWindow(pRuntimeEnv);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
@ -6134,7 +6438,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfRows = (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); int32_t numOfRows = (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
@ -6378,6 +6682,32 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "AllTimeIntervalAggOperator";
pOperator->operatorType = OP_AllTimeWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doAllIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator;
}
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo)); SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
pInfo->colIndex = -1; pInfo->colIndex = -1;
@ -6400,7 +6730,6 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo)); SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
@ -6452,6 +6781,32 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
return pOperator; return pOperator;
} }
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "AllMultiTableTimeIntervalOperator";
pOperator->operatorType = OP_AllMultiTableTimeInterval;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doAllSTableIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator;
}
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo));
pInfo->colIndex = -1; // group by column index pInfo->colIndex = -1; // group by column index
@ -6462,7 +6817,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
pQueryAttr->resultRowSize = (pQueryAttr->resultRowSize * pQueryAttr->resultRowSize = (pQueryAttr->resultRowSize *
(int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery))); (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);

View File

@ -206,6 +206,12 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
} else { } else {
assert(pFillInfo->currentKey == ts); assert(pFillInfo->currentKey == ts);
initBeforeAfterDataBuf(pFillInfo, prev); initBeforeAfterDataBuf(pFillInfo, prev);
if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
initBeforeAfterDataBuf(pFillInfo, next);
++pFillInfo->index;
copyCurrentRowIntoBuf(pFillInfo, srcData, *next);
--pFillInfo->index;
}
// assign rows to dst buffer // assign rows to dst buffer
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
@ -227,6 +233,12 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
} else if (pFillInfo->type == TSDB_FILL_LINEAR) { } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
assignVal(output, src, pCol->col.bytes, pCol->col.type); assignVal(output, src, pCol->col.bytes, pCol->col.type);
memcpy(*prev + pCol->col.offset, src, pCol->col.bytes); memcpy(*prev + pCol->col.offset, src, pCol->col.bytes);
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
if (*next) {
assignVal(output, *next + pCol->col.offset, pCol->col.bytes, pCol->col.type);
} else {
setNull(output, pCol->col.type, pCol->col.bytes);
}
} else { } else {
assignVal(output, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); assignVal(output, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
} }

View File

@ -566,10 +566,18 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
} }
} else if (pQueryAttr->interval.interval > 0) { } else if (pQueryAttr->interval.interval > 0) {
if (pQueryAttr->stableQuery) { if (pQueryAttr->stableQuery) {
op = OP_MultiTableTimeInterval; if (pQueryAttr->pointInterpQuery) {
op = OP_AllMultiTableTimeInterval;
} else {
op = OP_MultiTableTimeInterval;
}
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} else { } else {
op = OP_TimeWindow; if (pQueryAttr->pointInterpQuery) {
op = OP_AllTimeWindow;
} else {
op = OP_TimeWindow;
}
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) { if (pQueryAttr->pExpr2 != NULL) {
@ -577,7 +585,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} }
if (pQueryAttr->fillType != TSDB_FILL_NONE && (!pQueryAttr->pointInterpQuery)) { if (pQueryAttr->fillType != TSDB_FILL_NONE) {
op = OP_Fill; op = OP_Fill;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} }

View File

@ -30,6 +30,18 @@ typedef struct SCompSupporter {
int32_t order; int32_t order;
} SCompSupporter; } SCompSupporter;
int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable) {
if (pQueryAttr && (!stable)) {
for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM) {
return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64;
}
}
}
return 1;
}
int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr) { int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr) {
int32_t size = 0; int32_t size = 0;

View File

@ -397,7 +397,11 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
SThreadObj *pThreadObj = pClientObj->pThreadObj[index]; SThreadObj *pThreadObj = pClientObj->pThreadObj[index];
SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
if (fd == (SOCKET)-1) return NULL;
#else
if (fd <= 0) return NULL; if (fd <= 0) return NULL;
#endif
struct sockaddr_in sin; struct sockaddr_in sin;
uint16_t localPort = 0; uint16_t localPort = 0;

View File

@ -65,12 +65,7 @@ SDisk *tfsFreeDisk(SDisk *pDisk);
int tfsUpdateDiskInfo(SDisk *pDisk); int tfsUpdateDiskInfo(SDisk *pDisk);
// ttier.c ====================================================== // ttier.c ======================================================
typedef struct {
int64_t size;
int64_t used;
int64_t free;
int16_t nAvailDisks; // # of Available disks
} STierMeta;
typedef struct STier { typedef struct STier {
pthread_spinlock_t lock; pthread_spinlock_t lock;
int level; int level;

View File

@ -101,7 +101,7 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
return -1; return -1;
} }
tfsUpdateInfo(NULL); tfsUpdateInfo(NULL, NULL, 0);
for (int level = 0; level < TFS_NLEVEL(); level++) { for (int level = 0; level < TFS_NLEVEL(); level++) {
tfsPosNextId(TFS_TIER_AT(level)); tfsPosNextId(TFS_TIER_AT(level));
} }
@ -119,7 +119,7 @@ void tfsDestroy() {
} }
} }
void tfsUpdateInfo(SFSMeta *pFSMeta) { void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numTiers) {
SFSMeta fsMeta; SFSMeta fsMeta;
STierMeta tierMeta; STierMeta tierMeta;
@ -130,11 +130,16 @@ void tfsUpdateInfo(SFSMeta *pFSMeta) {
memset(pFSMeta, 0, sizeof(*pFSMeta)); memset(pFSMeta, 0, sizeof(*pFSMeta));
for (int level = 0; level < TFS_NLEVEL(); level++) { for (int level = 0; level < TFS_NLEVEL(); level++) {
STierMeta *pTierMeta = &tierMeta;
if (tierMetas && level < numTiers) {
pTierMeta = tierMetas + level;
}
STier *pTier = TFS_TIER_AT(level); STier *pTier = TFS_TIER_AT(level);
tfsUpdateTierInfo(pTier, &tierMeta); tfsUpdateTierInfo(pTier, pTierMeta);
pFSMeta->tsize += tierMeta.size; pFSMeta->tsize += pTierMeta->size;
pFSMeta->avail += tierMeta.free; pFSMeta->avail += pTierMeta->free;
pFSMeta->used += tierMeta.used; pFSMeta->used += pTierMeta->used;
} }
tfsLock(); tfsLock();
@ -595,7 +600,7 @@ void taosGetDisk() {
SFSMeta fsMeta; SFSMeta fsMeta;
if (tscEmbedded) { if (tscEmbedded) {
tfsUpdateInfo(&fsMeta); tfsUpdateInfo(&fsMeta, NULL, 0);
tsTotalDataDirGB = (float)(fsMeta.tsize / unit); tsTotalDataDirGB = (float)(fsMeta.tsize / unit);
tsUsedDataDirGB = (float)(fsMeta.used / unit); tsUsedDataDirGB = (float)(fsMeta.used / unit);
tsAvailDataDirGB = (float)(fsMeta.avail / unit); tsAvailDataDirGB = (float)(fsMeta.avail / unit);

View File

@ -24,8 +24,7 @@ typedef struct STable {
tstr* name; // NOTE: there a flexible string here tstr* name; // NOTE: there a flexible string here
uint64_t suid; uint64_t suid;
struct STable* pSuper; // super table pointer struct STable* pSuper; // super table pointer
uint8_t numOfSchemas; SArray* schema;
STSchema* schema[TSDB_MAX_TABLE_SCHEMAS];
STSchema* tagSchema; STSchema* tagSchema;
SKVRow tagVal; SKVRow tagVal;
SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
@ -107,10 +106,9 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock,
if (lock) TSDB_RLOCK_TABLE(pDTable); if (lock) TSDB_RLOCK_TABLE(pDTable);
if (_version < 0) { // get the latest version of schema if (_version < 0) { // get the latest version of schema
pTSchema = pDTable->schema[pDTable->numOfSchemas - 1]; pTSchema = *(STSchema **)taosArrayGetLast(pDTable->schema);
} else { // get the schema with version } else { // get the schema with version
void* ptr = taosbsearch(&_version, pDTable->schema, pDTable->numOfSchemas, sizeof(STSchema*), void* ptr = taosArraySearch(pDTable->schema, &_version, tsdbCompareSchemaVersion, TD_EQ);
tsdbCompareSchemaVersion, TD_EQ);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
goto _exit; goto _exit;

View File

@ -37,6 +37,8 @@ static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired);
static int tsdbProcessExpiredFS(STsdbRepo *pRepo); static int tsdbProcessExpiredFS(STsdbRepo *pRepo);
static int tsdbCreateMeta(STsdbRepo *pRepo); static int tsdbCreateMeta(STsdbRepo *pRepo);
// For backward compatibility
bool tsdbForceKeepFile = false;
// ================== CURRENT file header info // ================== CURRENT file header info
static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) { static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
int tlen = 0; int tlen = 0;
@ -1048,6 +1050,26 @@ static int tsdbRestoreMeta(STsdbRepo *pRepo) {
return -1; return -1;
} }
if (tsdbForceKeepFile) {
struct stat tfstat;
// Get real file size
if (fstat(pfs->cstatus->pmf->fd, &tfstat) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseMFile(pfs->cstatus->pmf);
tfsClosedir(tdir);
regfree(&regex);
return -1;
}
if (pfs->cstatus->pmf->info.size != tfstat.st_size) {
int64_t tfsize = pfs->cstatus->pmf->info.size;
pfs->cstatus->pmf->info.size = tfstat.st_size;
tsdbInfo("vgId:%d file %s header size is changed from %" PRId64 " to %" PRId64, REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), tfsize, pfs->cstatus->pmf->info.size);
}
}
tsdbCloseMFile(pfs->cstatus->pmf); tsdbCloseMFile(pfs->cstatus->pmf);
} }
} else if (code == REG_NOMATCH) { } else if (code == REG_NOMATCH) {
@ -1212,6 +1234,24 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
return -1; return -1;
} }
if (tsdbForceKeepFile) {
struct stat tfstat;
// Get real file size
if (fstat(pDFile->fd, &tfstat) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosArrayDestroy(fArray);
return -1;
}
if (pDFile->info.size != tfstat.st_size) {
int64_t tfsize = pDFile->info.size;
pDFile->info.size = tfstat.st_size;
tsdbInfo("vgId:%d file %s header size is changed from %" PRId64 " to %" PRId64, REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pDFile), tfsize, pDFile->info.size);
}
}
tsdbCloseDFile(pDFile); tsdbCloseDFile(pDFile);
index++; index++;
} }

View File

@ -43,6 +43,8 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable);
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable); static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid); static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid);
static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema); static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema);
static int tsdbAddSchema(STable *pTable, STSchema *pSchema);
static void tsdbFreeTableSchema(STable *pTable);
// ------------------ OUTER FUNCTIONS ------------------ // ------------------ OUTER FUNCTIONS ------------------
int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) { int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) {
@ -722,17 +724,10 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema,
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
STable *pCTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; STable *pCTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
ASSERT(schemaVersion(pSchema) > schemaVersion(pCTable->schema[pCTable->numOfSchemas - 1])); ASSERT(schemaVersion(pSchema) > schemaVersion(*(STSchema **)taosArrayGetLast(pCTable->schema)));
TSDB_WLOCK_TABLE(pCTable); TSDB_WLOCK_TABLE(pCTable);
if (pCTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) { tsdbAddSchema(pCTable, pSchema);
pCTable->schema[pCTable->numOfSchemas++] = pSchema;
} else {
ASSERT(pCTable->numOfSchemas == TSDB_MAX_TABLE_SCHEMAS);
tdFreeSchema(pCTable->schema[0]);
memmove(pCTable->schema, pCTable->schema + 1, sizeof(STSchema *) * (TSDB_MAX_TABLE_SCHEMAS - 1));
pCTable->schema[pCTable->numOfSchemas - 1] = pSchema;
}
if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema); if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema);
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema); if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
@ -828,9 +823,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pST
TABLE_TID(pTable) = -1; TABLE_TID(pTable) = -1;
TABLE_SUID(pTable) = -1; TABLE_SUID(pTable) = -1;
pTable->pSuper = NULL; pTable->pSuper = NULL;
pTable->numOfSchemas = 1; if (tsdbAddSchema(pTable, tdDupSchema(pCfg->schema)) < 0) {
pTable->schema[0] = tdDupSchema(pCfg->schema);
if (pTable->schema[0] == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
} }
@ -841,7 +834,8 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pST
} }
pTable->tagVal = NULL; pTable->tagVal = NULL;
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN); STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL, SL_ALLOW_DUP_KEY, getTagIndexKey); pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
SL_ALLOW_DUP_KEY, getTagIndexKey);
if (pTable->pIndex == NULL) { if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
@ -870,9 +864,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pST
} }
} else { } else {
TABLE_SUID(pTable) = -1; TABLE_SUID(pTable) = -1;
pTable->numOfSchemas = 1; if (tsdbAddSchema(pTable, tdDupSchema(pCfg->schema)) < 0) {
pTable->schema[0] = tdDupSchema(pCfg->schema);
if (pTable->schema[0] == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
} }
@ -906,9 +898,7 @@ static void tsdbFreeTable(STable *pTable) {
TABLE_UID(pTable)); TABLE_UID(pTable));
tfree(TABLE_NAME(pTable)); tfree(TABLE_NAME(pTable));
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) { if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
for (int i = 0; i < TSDB_MAX_TABLE_SCHEMAS; i++) { tsdbFreeTableSchema(pTable);
tdFreeSchema(pTable->schema[i]);
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tdFreeSchema(pTable->tagSchema); tdFreeSchema(pTable->tagSchema);
@ -1260,9 +1250,10 @@ static int tsdbEncodeTable(void **buf, STable *pTable) {
tlen += taosEncodeFixedU64(buf, TABLE_SUID(pTable)); tlen += taosEncodeFixedU64(buf, TABLE_SUID(pTable));
tlen += tdEncodeKVRow(buf, pTable->tagVal); tlen += tdEncodeKVRow(buf, pTable->tagVal);
} else { } else {
tlen += taosEncodeFixedU8(buf, pTable->numOfSchemas); tlen += taosEncodeFixedU8(buf, (uint8_t)taosArrayGetSize(pTable->schema));
for (int i = 0; i < pTable->numOfSchemas; i++) { for (int i = 0; i < taosArrayGetSize(pTable->schema); i++) {
tlen += tdEncodeSchema(buf, pTable->schema[i]); STSchema *pSchema = taosArrayGetP(pTable->schema, i);
tlen += tdEncodeSchema(buf, pSchema);
} }
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
@ -1293,9 +1284,12 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
buf = taosDecodeFixedU64(buf, &TABLE_SUID(pTable)); buf = taosDecodeFixedU64(buf, &TABLE_SUID(pTable));
buf = tdDecodeKVRow(buf, &(pTable->tagVal)); buf = tdDecodeKVRow(buf, &(pTable->tagVal));
} else { } else {
buf = taosDecodeFixedU8(buf, &(pTable->numOfSchemas)); uint8_t nSchemas;
for (int i = 0; i < pTable->numOfSchemas; i++) { buf = taosDecodeFixedU8(buf, &nSchemas);
buf = tdDecodeSchema(buf, &(pTable->schema[i])); for (int i = 0; i < nSchemas; i++) {
STSchema *pSchema;
buf = tdDecodeSchema(buf, &pSchema);
tsdbAddSchema(pTable, pSchema);
} }
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
@ -1457,3 +1451,38 @@ static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema) {
return 0; return 0;
} }
static int tsdbAddSchema(STable *pTable, STSchema *pSchema) {
ASSERT(TABLE_TYPE(pTable) != TSDB_CHILD_TABLE);
if (pTable->schema == NULL) {
pTable->schema = taosArrayInit(TSDB_MAX_TABLE_SCHEMAS, sizeof(SSchema *));
if (pTable->schema == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
ASSERT(taosArrayGetSize(pTable->schema) == 0 ||
schemaVersion(pSchema) > schemaVersion(*(STSchema **)taosArrayGetLast(pTable->schema)));
if (taosArrayPush(pTable->schema, &pSchema) == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
static void tsdbFreeTableSchema(STable *pTable) {
ASSERT(pTable != NULL);
if (pTable->schema) {
for (size_t i = 0; i < taosArrayGetSize(pTable->schema); i++) {
STSchema *pSchema = taosArrayGetP(pTable->schema, i);
tdFreeSchema(pSchema);
}
taosArrayDestroy(pTable->schema);
}
}

View File

@ -2693,7 +2693,7 @@ static void destroyHelper(void* param) {
free(param); free(param);
} }
static bool loadBlockOfActiveTable(STsdbQueryHandle* pQueryHandle) { static bool loadBlockOfActiveTable(STsdbQueryHandle* pQueryHandle) {
if (pQueryHandle->checkFiles) { if (pQueryHandle->checkFiles) {
// check if the query range overlaps with the file data block // check if the query range overlaps with the file data block
bool exists = true; bool exists = true;

View File

@ -81,6 +81,7 @@ typedef struct {
extern SGlobalCfg tsGlobalConfig[]; extern SGlobalCfg tsGlobalConfig[];
extern int32_t tsGlobalConfigNum; extern int32_t tsGlobalConfigNum;
extern char * tsCfgStatusStr[]; extern char * tsCfgStatusStr[];
extern bool tsdbForceKeepFile;
void taosReadGlobalLogCfg(); void taosReadGlobalLogCfg();
bool taosReadGlobalCfg(); bool taosReadGlobalCfg();

View File

@ -0,0 +1,67 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import random
import string
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def genColList(self):
'''
generate column list
'''
col_list = list()
for i in range(1, 18):
col_list.append(f'c{i}')
return col_list
def genIncreaseValue(self, input_value):
'''
add ', 1' to end of value every loop
'''
value_list = list(input_value)
value_list.insert(-1, ", 1")
return ''.join(value_list)
def insertAlter(self):
'''
after each alter and insert, when execute 'select * from {tbname};' taosd will coredump
'''
tbname = ''.join(random.choice(string.ascii_letters.lower()) for i in range(7))
input_value = '(now, 1)'
tdSql.execute(f'create table {tbname} (ts timestamp, c0 int);')
tdSql.execute(f'insert into {tbname} values {input_value};')
for col in self.genColList():
input_value = self.genIncreaseValue(input_value)
tdSql.execute(f'alter table {tbname} add column {col} int;')
tdSql.execute(f'insert into {tbname} values {input_value};')
tdSql.query(f'select * from {tbname};')
tdSql.checkRows(18)
def run(self):
tdSql.prepare()
self.insertAlter()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -386,6 +386,7 @@ python3 ./test.py -f query/querySession.py
python3 test.py -f alter/alter_create_exception.py python3 test.py -f alter/alter_create_exception.py
python3 ./test.py -f insert/flushwhiledrop.py python3 ./test.py -f insert/flushwhiledrop.py
python3 ./test.py -f insert/schemalessInsert.py python3 ./test.py -f insert/schemalessInsert.py
python3 ./test.py -f alter/alterColMultiTimes.py
#======================p4-end=============== #======================p4-end===============

View File

@ -0,0 +1,3 @@
1
2
3
1 1
2 2
3 3

File diff suppressed because it is too large Load Diff

View File

@ -47,7 +47,6 @@ class TDTestCase:
else: else:
tdLog.info("taosd found in %s" % buildPath) tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath + "/build/bin/" binPath = buildPath + "/build/bin/"
# insert: create one or mutiple tables per sql and insert multiple rows per sql # insert: create one or mutiple tables per sql and insert multiple rows per sql
# insert data from a special timestamp # insert data from a special timestamp
# check stable stb0 # check stable stb0
@ -90,7 +89,6 @@ class TDTestCase:
os.system( os.system(
"%staosdemo -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestNanoDatabaseNow.json -y " % "%staosdemo -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestNanoDatabaseNow.json -y " %
binPath) binPath)
tdSql.execute("use nsdb2") tdSql.execute("use nsdb2")
tdSql.query("show stables") tdSql.query("show stables")
tdSql.checkData(0, 4, 100) tdSql.checkData(0, 4, 100)

View File

@ -0,0 +1,45 @@
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "192.168.1.103",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 1,
"result_file": "1174.out",
"confirm_parameter_prompt": "no",
"num_of_records_per_req": 51,
"databases": [
{
"dbinfo": {
"name": "gdse",
"drop": "yes"
},
"super_tables": [{
"name": "model_1174",
"child_table_exists":"no",
"childtable_count": 1,
"childtable_prefix": "model_1174_",
"auto_create_table": "no",
"batch_create_tbl_num": 0,
"data_source": "sample",
"insert_mode": "stmt",
"insert_rate": 0,
"insert_rows": 2592000,
"interlace_rows": 1,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2021-05-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "tools/taosdemoAllTest/1174.csv",
"tags_file": "tools/taosdemoAllTest/1174-tag.csv",
"columns": [{"type": "FLOAT", "count": 109}, {"type": "INT", "count": 4}, {"type": "FLOAT", "count": 8}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 5}, {"type": "INT", "count": 47}, {"type": "BOOL", "count": 103}, {"type": "INT", "count": 2}, {"type": "TIMESTAMP", "count": 3}, {"type": "BOOL", "count": 28}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 6}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 7}, {"type": "BOOL", "count": 7}, {"type": "FLOAT", "count": 2}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 3}, {"type": "INT", "count": 3}, {"type": "BOOL", "count": 1}],
"tags": [{"type": "INT", "count": 1}]
}]
}]
}

View File

@ -0,0 +1,45 @@
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "192.168.1.103",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 1,
"result_file": "1174.out",
"confirm_parameter_prompt": "no",
"num_of_records_per_req": 51,
"databases": [
{
"dbinfo": {
"name": "gdse",
"drop": "yes"
},
"super_tables": [{
"name": "model_1174",
"child_table_exists":"no",
"childtable_count": 1,
"childtable_prefix": "model_1174_",
"auto_create_table": "no",
"batch_create_tbl_num": 0,
"data_source": "sample",
"insert_mode": "taosc",
"insert_rate": 0,
"insert_rows": 2592000,
"interlace_rows": 1,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2021-05-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "tools/taosdemoAllTest/1174.csv",
"tags_file": "tools/taosdemoAllTest/1174-tag.csv",
"columns": [{"type": "FLOAT", "count": 109}, {"type": "INT", "count": 4}, {"type": "FLOAT", "count": 8}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 5}, {"type": "INT", "count": 47}, {"type": "BOOL", "count": 103}, {"type": "INT", "count": 2}, {"type": "TIMESTAMP", "count": 3}, {"type": "BOOL", "count": 28}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 6}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 7}, {"type": "BOOL", "count": 7}, {"type": "FLOAT", "count": 2}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 3}, {"type": "INT", "count": 3}, {"type": "BOOL", "count": 1}],
"tags": [{"type": "INT", "count": 1}]
}]
}]
}

View File

@ -0,0 +1,45 @@
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "192.168.1.103",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 1,
"result_file": "1174.out",
"confirm_parameter_prompt": "no",
"num_of_records_per_req": 51,
"databases": [
{
"dbinfo": {
"name": "gdse",
"drop": "yes"
},
"super_tables": [{
"name": "model_1174",
"child_table_exists":"no",
"childtable_count": 1,
"childtable_prefix": "model_1174_",
"auto_create_table": "no",
"batch_create_tbl_num": 0,
"data_source": "rand",
"insert_mode": "stmt",
"insert_rate": 0,
"insert_rows": 259200,
"interlace_rows": 1,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2021-05-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "tools/taosdemoAllTest/1174.csv",
"tags_file": "tools/taosdemoAllTest/1174-tag.csv",
"columns": [{"type": "FLOAT", "count": 109}, {"type": "INT", "count": 4}, {"type": "FLOAT", "count": 8}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 5}, {"type": "INT", "count": 47}, {"type": "BOOL", "count": 103}, {"type": "INT", "count": 2}, {"type": "TIMESTAMP", "count": 3}, {"type": "BOOL", "count": 28}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 6}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 7}, {"type": "BOOL", "count": 7}, {"type": "FLOAT", "count": 2}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 3}, {"type": "INT", "count": 3}, {"type": "BOOL", "count": 1}],
"tags": [{"type": "INT", "count": 1}]
}]
}]
}

View File

@ -0,0 +1,45 @@
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "192.168.1.103",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 1,
"result_file": "1174.out",
"confirm_parameter_prompt": "no",
"num_of_records_per_req": 51,
"databases": [
{
"dbinfo": {
"name": "gdse",
"drop": "yes"
},
"super_tables": [{
"name": "model_1174",
"child_table_exists":"no",
"childtable_count": 1,
"childtable_prefix": "model_1174_",
"auto_create_table": "no",
"batch_create_tbl_num": 0,
"data_source": "sample",
"insert_mode": "stmt",
"insert_rate": 0,
"insert_rows": 259200,
"interlace_rows": 1,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2021-05-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "tools/taosdemoAllTest/1174.csv",
"tags_file": "tools/taosdemoAllTest/1174-tag.csv",
"columns": [{"type": "FLOAT", "count": 109}, {"type": "INT", "count": 4}, {"type": "FLOAT", "count": 8}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 5}, {"type": "INT", "count": 47}, {"type": "BOOL", "count": 103}, {"type": "INT", "count": 2}, {"type": "TIMESTAMP", "count": 3}, {"type": "BOOL", "count": 28}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 6}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 7}, {"type": "BOOL", "count": 7}, {"type": "FLOAT", "count": 2}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 3}, {"type": "INT", "count": 3}, {"type": "BOOL", "count": 1}],
"tags": [{"type": "INT", "count": 1}]
}]
}]
}

View File

@ -0,0 +1,45 @@
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "192.168.1.103",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 1,
"result_file": "1174.out",
"confirm_parameter_prompt": "no",
"num_of_records_per_req": 51,
"databases": [
{
"dbinfo": {
"name": "gdse",
"drop": "yes"
},
"super_tables": [{
"name": "model_1174",
"child_table_exists":"no",
"childtable_count": 1,
"childtable_prefix": "model_1174_",
"auto_create_table": "no",
"batch_create_tbl_num": 0,
"data_source": "sample",
"insert_mode": "taosc",
"insert_rate": 0,
"insert_rows": 259200,
"interlace_rows": 1,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2021-05-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "tools/taosdemoAllTest/1174.csv",
"tags_file": "tools/taosdemoAllTest/1174-tag.csv",
"columns": [{"type": "FLOAT", "count": 109}, {"type": "INT", "count": 4}, {"type": "FLOAT", "count": 8}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 5}, {"type": "INT", "count": 47}, {"type": "BOOL", "count": 103}, {"type": "INT", "count": 2}, {"type": "TIMESTAMP", "count": 3}, {"type": "BOOL", "count": 28}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 6}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 7}, {"type": "BOOL", "count": 7}, {"type": "FLOAT", "count": 2}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 3}, {"type": "INT", "count": 3}, {"type": "BOOL", "count": 1}],
"tags": [{"type": "INT", "count": 1}]
}]
}]
}

View File

@ -0,0 +1,92 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
# tdSql.init(conn.cursor(), logSql)
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def run(self):
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/"
# insert: create one or mutiple tables per sql and insert multiple rows per sql
os.system("%staosdemo -f tools/taosdemoAllTest/stmt/1174-small-stmt-random.json -y " % binPath)
# sleep(60)
# os.system("%staosdemo -f tools/taosdemoAllTest/stmt/1174-small-taosc.json -y " % binPath)
# sleep(60)
# os.system("%staosdemo -f tools/taosdemoAllTest/stmt/1174-small-stmt.json -y " % binPath)
# sleep(60)
# os.system("%staosdemo -f tools/taosdemoAllTest/stmt/1174-large-taosc.json -y " % binPath)
# sleep(60)
# os.system("%staosdemo -f tools/taosdemoAllTest/stmt/1174-large-stmt.json -y " % binPath)
# tdSql.execute("use db")
# tdSql.query("select count (tbname) from stb0")
# tdSql.checkData(0, 0, 1000)
# tdSql.query("select count (tbname) from stb1")
# tdSql.checkData(0, 0, 1000)
# tdSql.query("select count(*) from stb00_0")
# tdSql.checkData(0, 0, 100)
# tdSql.query("select count(*) from stb0")
# tdSql.checkData(0, 0, 100000)
# tdSql.query("select count(*) from stb01_1")
# tdSql.checkData(0, 0, 200)
# tdSql.query("select count(*) from stb1")
# tdSql.checkData(0, 0, 200000)
testcaseFilename = os.path.split(__file__)[-1]
os.system("rm -rf ./insert_res.txt")
os.system("rm -rf tools/taosdemoAllTest/%s.sql" % testcaseFilename )
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -103,7 +103,6 @@ class TDTestCase:
os.system("cat subscribe_res0.txt* > all_subscribe_res0.txt") os.system("cat subscribe_res0.txt* > all_subscribe_res0.txt")
subTimes0 = self.subTimes("all_subscribe_res0.txt") subTimes0 = self.subTimes("all_subscribe_res0.txt")
print("pass")
self.assertCheck("all_subscribe_res0.txt",subTimes0 ,202) self.assertCheck("all_subscribe_res0.txt",subTimes0 ,202)

View File

@ -1050,6 +1050,27 @@ sql_error select min(c3) from m_fl_mt0 interval(10w) fill(value, 20)
sql_error select max(c3) from m_fl_mt0 interval(1n) fill(prev) sql_error select max(c3) from m_fl_mt0 interval(1n) fill(prev)
sql_error select min(c3) from m_fl_mt0 interval(1y) fill(value, 20) sql_error select min(c3) from m_fl_mt0 interval(1y) fill(value, 20)
sql create table nexttb1 (ts timestamp, f1 int);
sql insert into nexttb1 values ('2021-08-08 1:1:1', NULL);
sql insert into nexttb1 values ('2021-08-08 1:1:5', 3);
sql select last(*) from nexttb1 where ts >= '2021-08-08 1:1:1' and ts < '2021-08-08 1:1:10' interval(1s) fill(next);
if $rows != 9 then
return -1
endi
if $data00 != @21-08-08 01:01:01.000@ then
return -1
endi
if $data01 != @21-08-08 01:01:01.000@ then
return -1
endi
if $data02 != 3 then
return -1
endi
print =============== clear print =============== clear
#sql drop database $db #sql drop database $db
#sql show databases #sql show databases

View File

@ -1148,3 +1148,21 @@ endi
sql select derivative(test_column_alias_name, 1s, 0) from (select avg(k) test_column_alias_name from t1 interval(1s)); sql select derivative(test_column_alias_name, 1s, 0) from (select avg(k) test_column_alias_name from t1 interval(1s));
sql create table smeters (ts timestamp, current float, voltage int);
sql insert into smeters values ('2021-08-08 10:10:10', 10, 1);
sql insert into smeters values ('2021-08-08 10:10:12', 10, 2);
sql select stddev(voltage) from smeters where ts>='2021-08-08 10:10:10.000' and ts < '2021-08-08 10:10:20.000' and current=10 interval(1000a);
if $rows != 2 then
return -1
endi
if $data00 != @21-08-08 10:10:10.000@ then
return -1
endi
if $data10 != @21-08-08 10:10:12.000@ then
return -1
endi

View File

@ -55,6 +55,9 @@ while $i < $halfNum
endw endw
print ====== tables created print ====== tables created
sql create table ap1 (ts timestamp, pav float);
sql INSERT INTO ap1 VALUES ('2021-07-25 02:19:54.100',1) ('2021-07-25 02:19:54.200',2) ('2021-07-25 02:19:54.300',3) ('2021-07-25 02:19:56.500',4) ('2021-07-25 02:19:57.500',5) ('2021-07-25 02:19:57.600',6) ('2021-07-25 02:19:57.900',7) ('2021-07-25 02:19:58.100',8) ('2021-07-25 02:19:58.300',9) ('2021-07-25 02:19:59.100',10) ('2021-07-25 02:19:59.300',11) ('2021-07-25 02:19:59.500',12) ('2021-07-25 02:19:59.700',13) ('2021-07-25 02:19:59.900',14) ('2021-07-25 02:20:05.000', 20) ('2021-07-25 02:25:00.000', 10000);
run general/parser/interp_test.sim run general/parser/interp_test.sim
print ================== restart server to commit data into disk print ================== restart server to commit data into disk
@ -65,4 +68,4 @@ print ================== server restart completed
run general/parser/interp_test.sim run general/parser/interp_test.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT #system sh/exec.sh -n dnode1 -s stop -x SIGINT

File diff suppressed because it is too large Load Diff