From aff1068952ebac3537474f932268f01feff7a8a9 Mon Sep 17 00:00:00 2001 From: liu0x54 Date: Thu, 6 Feb 2020 01:58:43 +0800 Subject: [PATCH 01/11] add readme --- minidevops/README.MD | 218 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 218 insertions(+) create mode 100644 minidevops/README.MD diff --git a/minidevops/README.MD b/minidevops/README.MD new file mode 100644 index 0000000000..bd2333763a --- /dev/null +++ b/minidevops/README.MD @@ -0,0 +1,218 @@ +# 一分钟快速搭建一个DevOps监控系统 +为了让更多的Devops领域的开发者快速体验TDengine的优秀特性,本文介绍了一种快速搭建Devops领域性能监控的demo,方便大家更方便的了解TDengine,并基于此文拓展Devops领域的应用。 +为了快速上手,本文用到的软件全部采用Docker容器方式部署,大家只需要安装Docker软件,就可以直接通过脚本运行所有软件,无需安装。这个Demo用到了以下Docker容器,都可以从Dockerhub上拉取相关镜像 +- tdengine/tdengine:1.6.4.5 TDengine开源版1.6.4.5.的镜像 +- tdengine/blm_telegraf:latest 用于telegraf写入TDengine的API,可以schemaless的将telegraf的数据写入TDengine +- tdengine/blm_prometheus:latest 用于Prometheus写入TDengine的API,可以schemaless的将Prometheus的数据写入TDengine +- grafana/grafana Grafana的镜像,一个广泛应用的开源可视化监控软件 +- telegraf:latest 一个广泛应用的开源数据采集程序 +- prom/prometheus:latest 一个广泛应用的k8s领域的开源数据采集程序 + +## 前提条件 +1. 一台linux服务器或运行linux操作系统的虚拟机或者运行MacOS的计算机 +2. 安装了Docker软件。Docker软件的安装方法请参考linux下安装Docker +3. sudo权限 +4. 下载本文用到的配置文件和脚本压缩包:[下载地址](http://www.taosdata.com/download/minidevops.tar.gz) + +压缩包下载下来后解压生成一个minidevops的文件夹,其结构如下 +```sh +minidevops$ tree +. +├── demodashboard.json +├── grafana +│   └── tdengine +│   ├── README.md +│   ├── css +│   │   └── query-editor.css +│   ├── datasource.js +│   ├── img +│   │   └── taosdata_logo.png +│   ├── module.js +│   ├── partials +│   │   ├── config.html +│   │   └── query.editor.html +│   ├── plugin.json +│   └── query_ctrl.js +├── prometheus +│   └── prometheus.yml +├── run.sh +└── telegraf + └── telegraf.conf +``` +`grafana`子文件夹里是TDengine的插件,用于在grafana中导入TDengine的数据源。 +`prometheus`子文件夹里是prometheus需要的配置文件。 +`run.sh`是运行脚本。 +`telegraf`子文件夹里是telegraf的配置文件。 +## 启动Docker镜像 +启动前,请确保系统里没有运行TDengine和Grafana,以及Telegraf和Prometheus,因为这些程序会占用docker所需的端口,造成脚本运行失败,建议先关闭这些程序。 +然后,只用在minidevops路径下执行 +```sh +sudo run.sh +``` +我们来看看`run.sh`里干了些什么: +```sh +#!/bin/bash + +LP=`pwd` + +#为了让脚本能够顺利执行,避免重复执行时出现错误, 首先将系统里所有docker容器停止了。请注意,如果该linux上已经运行了其他docker容器,也会被停止掉。 +docker rm -f `docker ps -a -q` + +#专门创建一个叫minidevops的虚拟网络,并指定了172.15.1.1~255这个地址段。 +docker network create --ip-range 172.15.1.255/24 --subnet 172.15.1.1/16 minidevops + +#启动grafana程序,并将tdengine插件文件所在路径绑定到容器中 +docker run -d --net minidevops --ip 172.15.1.11 -v $LP/grafana:/var/lib/grafana/plugins -p 3000:3000 grafana/grafana + +#启动tdengine的docker容器,并指定IP地址为172.15.1.6,绑定需要的端口 +docker run -d --net minidevops --ip 172.15.1.6 -p 6030:6030 -p 6020:6020 -p 6031:6031 -p 6032:6032 -p 6033:6033 -p 6034:6034 -p 6035:6035 -p 6036:6036 -p 6037:6037 -p 6038:6038 -p 6039:6039 tdengine/tdengine:1.6.4.5 + +#启动prometheus的写入代理程序,这个程序可以将prometheus发来的数据直接写入TDengine中,无需提前建立相关超级表和表,实现schemaless写入功能 +docker run -d --net minidevops --ip 172.15.1.7 -p 10203:10203 tdengine/blm_prometheus 172.15.1.6 + +#启动telegraf的写入代理程序,这个程序可以将telegraf发来的数据直接写入TDengine中,无需提前建立相关超级表和表,实现schemaless写入功能 +docker run -d --net minidevops --ip 172.15.1.8 -p 10202:10202 tdengine/blm_telegraf 172.15.1.6 + +#启动prometheus程序,并将配置文件所在路径绑定到容器中 +docker run -d --net minidevops --ip 172.15.1.9 -v $LP/prometheus:/etc/prometheus -p 9090:9090 prom/prometheus + +#启动telegraf程序,并将配置文件所在路径绑定到容器中 +docker run -d --net minidevops --ip 172.15.1.10 -v $LP/telegraf:/etc/telegraf -p 8092:8092 -p 8094:8094 -p 8125:8125 telegraf + +#通过Grafana的API,将TDengine配置成Grafana的datasources +curl -X POST http://localhost:3000/api/datasources --header "Content-Type:application/json" -u admin:admin -d '{"Name": "TDengine","Type": "tdengine","TypeLogoUrl": "public/plugins/tdengine/img/taosdata_logo.png","Access": "proxy","Url": "http://172.15.1.6:6020","BasicAuth": false,"isDefault": true,"jsonData": {},"readOnly": false}' + +#通过Grafana的API,配置一个示范的监控面板 +curl -X POST http://localhost:3000/api/dashboards/db --header "Content-Type:application/json" -u admin:admin -d '{"dashboard":{"annotations":{"list":[{"builtIn":1,"datasource":"-- Grafana --","enable":true,"hide":true,"iconColor":"rgba(0, 211, 255, 1)","name":"Annotations & Alerts","type":"dashboard"}]},"editable":true,"gnetId":null,"graphTooltip":0,"id":1,"links":[],"panels":[{"datasource":null,"gridPos":{"h":8,"w":6,"x":0,"y":0},"id":6,"options":{"fieldOptions":{"calcs":["mean"],"defaults":{"color":{"mode":"thresholds"},"links":[{"title":"","url":""}],"mappings":[],"max":100,"min":0,"thresholds":{"mode":"absolute","steps":[{"color":"green","value":null},{"color":"red","value":80}]},"unit":"percent"},"overrides":[],"values":false},"orientation":"auto","showThresholdLabels":false,"showThresholdMarkers":true},"pluginVersion":"6.6.0","targets":[{"refId":"A","sql":"select last_row(value) from telegraf.mem where field=\"used_percent\""}],"timeFrom":null,"timeShift":null,"title":"Memory used percent","type":"gauge"},{"aliasColors":{},"bars":false,"dashLength":10,"dashes":false,"datasource":null,"fill":1,"fillGradient":0,"gridPos":{"h":8,"w":12,"x":6,"y":0},"hiddenSeries":false,"id":8,"legend":{"avg":false,"current":false,"max":false,"min":false,"show":true,"total":false,"values":false},"lines":true,"linewidth":1,"nullPointMode":"null","options":{"dataLinks":[]},"percentage":false,"pointradius":2,"points":false,"renderer":"flot","seriesOverrides":[],"spaceLength":10,"stack":false,"steppedLine":false,"targets":[{"alias":"MEMUSED-PERCENT","refId":"A","sql":"select avg(value) from telegraf.mem where field=\"used_percent\" interval(1m)"}],"thresholds":[],"timeFrom":null,"timeRegions":[],"timeShift":null,"title":"Panel Title","tooltip":{"shared":true,"sort":0,"value_type":"individual"},"type":"graph","xaxis":{"buckets":null,"mode":"time","name":null,"show":true,"values":[]},"yaxes":[{"format":"short","label":null,"logBase":1,"max":null,"min":null,"show":true},{"format":"short","label":null,"logBase":1,"max":null,"min":null,"show":true}],"yaxis":{"align":false,"alignLevel":null}},{"datasource":null,"gridPos":{"h":9,"w":6,"x":0,"y":8},"id":10,"options":{"fieldOptions":{"calcs":["mean"],"defaults":{"mappings":[],"thresholds":{"mode":"absolute","steps":[{"color":"green","value":null}]},"unit":"percent"},"overrides":[],"values":false},"orientation":"auto","showThresholdLabels":false,"showThresholdMarkers":true},"pluginVersion":"6.6.0","targets":[{"alias":"CPU-SYS","refId":"A","sql":"select last_row(value) from telegraf.cpu where field=\"usage_system\""},{"alias":"CPU-IDLE","refId":"B","sql":"select last_row(value) from telegraf.cpu where field=\"usage_idle\""},{"alias":"CPU-USER","refId":"C","sql":"select last_row(value) from telegraf.cpu where field=\"usage_user\""}],"timeFrom":null,"timeShift":null,"title":"Panel Title","type":"gauge"},{"aliasColors":{},"bars":false,"dashLength":10,"dashes":false,"datasource":"TDengine","description":"General CPU monitor","fill":1,"fillGradient":0,"gridPos":{"h":9,"w":12,"x":6,"y":8},"hiddenSeries":false,"id":2,"legend":{"avg":false,"current":false,"max":false,"min":false,"show":true,"total":false,"values":false},"lines":true,"linewidth":1,"nullPointMode":"null","options":{"dataLinks":[]},"percentage":false,"pointradius":2,"points":false,"renderer":"flot","seriesOverrides":[],"spaceLength":10,"stack":false,"steppedLine":false,"targets":[{"alias":"CPU-USER","refId":"A","sql":"select avg(value) from telegraf.cpu where field=\"usage_user\" and cpu=\"cpu-total\" interval(1m)"},{"alias":"CPU-SYS","refId":"B","sql":"select avg(value) from telegraf.cpu where field=\"usage_system\" and cpu=\"cpu-total\" interval(1m)"},{"alias":"CPU-IDLE","refId":"C","sql":"select avg(value) from telegraf.cpu where field=\"usage_idle\" and cpu=\"cpu-total\" interval(1m)"}],"thresholds":[],"timeFrom":null,"timeRegions":[],"timeShift":null,"title":"CPU","tooltip":{"shared":true,"sort":0,"value_type":"individual"},"type":"graph","xaxis":{"buckets":null,"mode":"time","name":null,"show":true,"values":[]},"yaxes":[{"format":"short","label":null,"logBase":1,"max":null,"min":null,"show":true},{"format":"short","label":null,"logBase":1,"max":null,"min":null,"show":true}],"yaxis":{"align":false,"alignLevel":null}}],"refresh":"10s","schemaVersion":22,"style":"dark","tags":["demo"],"templating":{"list":[]},"time":{"from":"now-3h","to":"now"},"timepicker":{"refresh_intervals":["5s","10s","30s","1m","5m","15m","30m","1h","2h","1d"]},"timezone":"","title":"TDengineDashboardDemo","id":null,"uid":null,"version":0}}' +``` +执行以上脚本后,可以通过docker container ls命令来确认容器运行的状态: +```sh +$docker container ls +CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES +f875bd7d90d1 telegraf "/entrypoint.sh tele…" 6 hours ago Up 6 hours 0.0.0.0:8092->8092/tcp, 8092/udp, 0.0.0.0:8094->8094/tcp, 8125/udp, 0.0.0.0:8125->8125/tcp wonderful_antonelli +38ee2d5c3cb3 prom/prometheus "/bin/prometheus --c…" 6 hours ago Up 6 hours 0.0.0.0:9090->9090/tcp infallible_mestorf +1a1939386c07 tdengine/blm_telegraf "/root/blm_telegraf …" 6 hours ago Up 6 hours 0.0.0.0:10202->10202/tcp stupefied_hypatia +7063eb05caa4 tdengine/blm_prometheus "/root/blm_prometheu…" 6 hours ago Up 6 hours 0.0.0.0:10203->10203/tcp jovial_feynman +4a7b27931d21 tdengine/tdengine:1.6.4.5 "taosd" 6 hours ago Up 6 hours 0.0.0.0:6020->6020/tcp, 0.0.0.0:6030-6039->6030-6039/tcp, 6040-6050/tcp eager_kowalevski +ad2895760bc0 grafana/grafana "/run.sh" 6 hours ago Up 6 hours 0.0.0.0:3000->3000/tcp romantic_mccarthy +``` +当以上几个容器都已正常运行后,则我们的demo小系统已经开始工作了。 +## Grafana中进行配置 +打开浏览器,在地址栏输入服务器所在的IP地址 +`http://localhost:3000` +就可以访问到grafana的页面,如果不在本机打开浏览器,则将localhost改成server的ip地址即可。 +进入登录页面,用户名和密码都是缺省的admin,输入后,即可进入grafana的控制台输入用户名/密码后,会进入修改密码页面,选择skip,跳过这一步。进入Grafana后,可以在页面的左下角看到TDengineDashboardDemo已经创建好了,![](https://www.taosdata.com/blog/wp-content/uploads/2020/02/image2020-2-1_22-50-58-1024x465.png)对于有些浏览器打开时,可能会在home页面中没有TDengineDashboardDemo的选项,可以通过在Dashboard->Manage中选择![](https://www.taosdata.com/blog/wp-content/uploads/2020/02/2-1024x553.png)TDengineDashboardDemo。点击TDengineDashboardDemo进入示例监控面板。刚点进去页面时,监控曲线是空白的,因为监控数据还不够多,需要等待一段时间,让数据采集程序采集更多的数据。![](https://www.taosdata.com/blog/wp-content/uploads/2020/02/image-5-1024x853.png) + +如上两个监控面板分别监控了CPU和内存占用率。点击面板上的标题可以选择Edit进入编辑界面,新增监控数据。关于Grafana的监控面板设置,可以详细参考Grafana官网文档[Getting Started](https://grafana.com/docs/grafana/latest/guides/getting_started/)。 + +## 原理介绍 +按上面的操作,我们已经将监控系统搭建起来了,目前可以监控系统的CPU占有率了。下面介绍下这个Demo系统的工作原理。 +如下图所示,这个系统由数据采集功能(prometheus,telegraf),时序数据库功能(TDengine和适配程序),可视化功能(Grafana)组成。下面虚线框里的TDengine,blm_prometheus, blm_telegraf三个容器组成了一个schemaless写入的时序数据库,对于采用telegraf和prometheus作为采集程序的监控对象,可以直接将数据写入TDengine,并通过grafana进行可视化呈现。 +![architecture](https://www.taosdata.com/blog/wp-content/uploads/2020/02/image2020-1-29_21-22-6.png) +### 数据采集 +数据采集由Telegraf和Prometheus完成。Telegraf根据配置,从操作系统层面采集系统的相关统计值,并按配置上报给指定的URL,上报的数据json格式为 +```json +{ + "fields":{ + "usage_guest":0, + "usage_guest_nice":0, + "usage_idle":87.73726273726274, + "usage_iowait":0, + "usage_irq":0, + "usage_nice":0, + "usage_softirq":0, + "usage_steal":0, + "usage_system":2.6973026973026974, + "usage_user":9.565434565434565 + }, + "name":"cpu", + "tags":{ + "cpu":"cpu-total", + "host":"liutaodeMacBook-Pro.local" + }, + "timestamp":1571665100 +} +``` +其中name将被作为超级表的表名,tags作为普通表的tags,fields的名称也会作为一个tag用来描述普通表的标签。举个例子,一个普通表的结构如下,这是一个存储usage_softirq数据的普通表。 +![表结构](https://www.taosdata.com/blog/wp-content/uploads/2020/02/image2020-1-29_21-38-24.png) + +### Telegraf的配置 +对于使用telegraf作为数据采集程序的监控对象,可以在telegraf的配置文件telegraf.conf中将outputs.http部分的配置按以下配置修改,就可以直接将数据写入TDengine中了 +```toml +[[outputs.http]] +# ## URL is the address to send metrics to +url = "http://172.15.1.8:10202/telegraf" +# +# ## HTTP Basic Auth credentials +# # username = "username" +# # password = "pa$$word" +# + +data_format = "json" +json_timestamp_units = "1ms" +``` +可以打开HTTP basic Auth验证机制,本Demo为了简化没有打开验证功能。 +对于多个被监控对象,只需要在telegraf.conf文件中都写上以上的配置内容,就可以将数据写入TDengine中了。 + +### Telegraf数据在TDengine中的存储结构 +Telegraf的数据在TDengine中的存储,是以数据name为超级表名,以tags值加上监控对象的ip地址,以及field的属性名作为tag值,存入TDengine中的。 +以name为cpu的数据为例,telegraf产生的数据为: +```json +{ + "fields":{ + "usage_guest":0, + "usage_guest_nice":0, + "usage_idle":87.73726273726274, + "usage_iowait":0, + "usage_irq":0, + "usage_nice":0, + "usage_softirq":0, + "usage_steal":0, + "usage_system":2.6973026973026974, + "usage_user":9.565434565434565 + }, + "name":"cpu", + "tags":{ + "cpu":"cpu-total", + "host":"liutaodeMacBook-Pro.local" + }, + "timestamp":1571665100 +} +``` +则写入TDengine时会自动存入一个名为cpu的超级表中,这个表的结构如下 +![telegraf表结构](https://www.taosdata.com/blog/wp-content/uploads/2020/02/image2020-2-2_0-37-49.png) +这个超级表的tag字段有cpu,host,srcip,field;其中cpu,host是原始数据携带的tag,而srcip是监控对象的IP地址,field是监控对象cpu类型数据中的fields属性,取值空间为[usage_guest,usage_guest_nice,usage_idle,usage_iowait,usage_irq,usage_nice,usage_softirq,usage_steal,usage_system,usage_user],每个field值对应着一个具体含义的数据。 + +因此,在查询的时候,可以用这些tag来过滤数据,也可以用超级表来聚合数据。 +### Prometheus的配置 +对于使用Prometheus作为数据采集程序的监控对象,可以在Prometheus的配置文件prometheus.yaml文件中,将remote write部分的配置按以下配置修改,就可以直接将数据写入TDengine中了。 +```yaml +remote_write: + - url: "http://172.15.1.7:10203/receive" +``` +对于多个被监控对象,只需要在每个被监控对象的prometheus配置中增加以上配置内容,就可以将数据写入TDengine中了。 +### Prometheus数据在TDengine中的存储结构 +Prometheus的数据在TDengine中的存储,与telegraf类似,也是以数据的name字段为超级表名,以数据的label作为tag值,存入TDengine中 +以prometheus_engine_queries这个数据为例[prom表结构](https://www.taosdata.com/blog/wp-content/uploads/2020/02/image2020-2-2_0-51-4.png) +在TDengine中会自动创建一个prometheus_engine_queries的超级表,tag字段为t_instance,t_job,t_monitor。 +查询时,可以用这些tag来过滤数据,也可以用超级表来聚合数据。 + +## 数据查询 +我们可以登陆到TDengine的客户端命令,通过命令行看看TDengine里面都存储了些什么数据,顺便也能体验一下TDengine的高性能查询。如何才能登陆到TDengine的客户端,我们可以通过以下几步来完成。 +首先通过下面的命令查询一下tdengine的Docker ID +```sh +docker container ls +``` +然后再执行 +```sh +docker exec -it tdengine的containerID bash +``` +就可以进入TDengine容器的命令行,执行taos,就进入以下界面![](https://www.taosdata.com/blog/wp-content/uploads/2020/02/image2020-1-29_21-55-53.png) +Telegraf的数据写入时,自动创建了一个名为telegraf的database,可以通过 +``` +use telegraf; +``` +使用telegraf这个数据库。然后执行show tables,describe table等命令详细查询下telegraf这个库里保存了些什么数据。 +具体TDengine的查询语句可以参考[TDengine官方文档](https://www.taosdata.com/cn/documentation/taos-sql/) +## 接入多个监控对象 +就像前面原理介绍的,这个miniDevops的小系统,已经提供了一个时序数据库和可视化系统,对于多台机器的监控,只需要将每台机器的telegraf或prometheus配置按上面所述修改,就可以完成监控数据采集和可视化呈现了。 \ No newline at end of file From dafdb50fcf338daea483eab8830a84a4edffc3d6 Mon Sep 17 00:00:00 2001 From: tanxuefeng <1172915550@qq.com> Date: Mon, 10 Feb 2020 20:48:48 +0800 Subject: [PATCH 02/11] atomic type error --- src/rpc/src/trpc.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index 9e0b5dab0f..87506861b1 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -239,8 +239,8 @@ char *taosBuildReqHeader(void *param, char type, char *msg) { pHeader->spi = 0; pHeader->tcp = 0; pHeader->encrypt = 0; - pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1); - if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1); + pHeader->tranId = atomic_add_fetch_16(&pConn->tranId, 1); + if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_16(&pConn->tranId, 1); pHeader->sourceId = pConn->ownId; pHeader->destId = pConn->peerId; From 047acd91af70c358d44a1751cb2f761e294de47c Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 11 Feb 2020 17:06:59 +0800 Subject: [PATCH 03/11] fix tpercentile link issue on Mac (and Windows) --- src/util/CMakeLists.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index 76664ef9ec..d8f74f46f4 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -55,6 +55,7 @@ ELSEIF (TD_WINDOWS_64) LIST(APPEND SRC ./src/tmempool.c) LIST(APPEND SRC ./src/tmodule.c) LIST(APPEND SRC ./src/tnote.c) + LIST(APPEND SRC ./src/tpercentile.c) LIST(APPEND SRC ./src/tsched.c) LIST(APPEND SRC ./src/tskiplist.c) LIST(APPEND SRC ./src/tsocket.c) @@ -90,6 +91,7 @@ ELSEIF(TD_DARWIN_64) LIST(APPEND SRC ./src/tmempool.c) LIST(APPEND SRC ./src/tmodule.c) LIST(APPEND SRC ./src/tnote.c) + LIST(APPEND SRC ./src/tpercentile.c) LIST(APPEND SRC ./src/tsched.c) LIST(APPEND SRC ./src/tskiplist.c) LIST(APPEND SRC ./src/tsocket.c) From f455f9c280a2f1add012fc6c88bfaec4a54b3217 Mon Sep 17 00:00:00 2001 From: lihui Date: Wed, 12 Feb 2020 11:35:14 +0800 Subject: [PATCH 04/11] [#1212] --- documentation/webdocs/markdowndocs/Super Table-ch.md | 2 +- documentation/webdocs/markdowndocs/Super Table.md | 2 +- documentation/webdocs/markdowndocs/advanced features-ch.md | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/documentation/webdocs/markdowndocs/Super Table-ch.md b/documentation/webdocs/markdowndocs/Super Table-ch.md index e75a8d46c3..524fb51b17 100644 --- a/documentation/webdocs/markdowndocs/Super Table-ch.md +++ b/documentation/webdocs/markdowndocs/Super Table-ch.md @@ -72,7 +72,7 @@ STable从属于库,一个STable只属于一个库,但一个库可以有一 DROP TABLE ``` - Note: 删除STable不会级联删除通过STable创建的表;相反删除STable时要求通过该STable创建的表都已经被删除。 + Note: 删除STable时,所有通过该STable创建的表都将被删除。 - 查看属于某STable并满足查询条件的表 diff --git a/documentation/webdocs/markdowndocs/Super Table.md b/documentation/webdocs/markdowndocs/Super Table.md index 609dd11bd2..efc95c5f79 100644 --- a/documentation/webdocs/markdowndocs/Super Table.md +++ b/documentation/webdocs/markdowndocs/Super Table.md @@ -142,7 +142,7 @@ It lists the STable's schema and tags DROP TABLE ``` -To delete a STable, all the tables created via this STable shall be deleted first, otherwise, it will fail. +To delete a STable, all the tables created via this STable will be deleted first. ### List the Associated Tables of a STable diff --git a/documentation/webdocs/markdowndocs/advanced features-ch.md b/documentation/webdocs/markdowndocs/advanced features-ch.md index 4d01eaf364..cd7950b0ed 100644 --- a/documentation/webdocs/markdowndocs/advanced features-ch.md +++ b/documentation/webdocs/markdowndocs/advanced features-ch.md @@ -76,7 +76,7 @@ TDengine通过查询函数向用户提供毫秒级的数据获取能力。直接 TDengine分配固定大小的内存空间作为缓存空间,缓存空间可根据应用的需求和硬件资源配置。通过适当的设置缓存空间,TDengine可以提供极高性能的写入和查询的支持。TDengine中每个虚拟节点(virtual node)创建时分配独立的缓存池。每个虚拟节点管理自己的缓存池,不同虚拟节点间不共享缓存池。每个虚拟节点内部所属的全部表共享该虚拟节点的缓存池。 -TDengine将内存池按块划分进行管理,数据在内存块里按照列式存储。一个vnode的内存池是在vnode创建时按块分配好的,而且每个内存块按照先进先出的原则进行管理。一张表所需要的内存块是从vnode的内存池中进行分配的,块的大小由系统配置参数cache决定。每张表最大内存块的数目由配置参数tblocks决定,每张表平均的内存块的个数由配置参数ablocks决定。因此对于一个vnode, 总的内存大小为: cache*ablocks*tables。内存块参数cache不宜过小,一个cache block需要能存储至少几十条以上记录,才会有效率。参数ablocks最小为2,保证每张表平均至少能分配两个内存块。 +TDengine将内存池按块划分进行管理,数据在内存块里按照列式存储。一个vnode的内存池是在vnode创建时按块分配好的,而且每个内存块按照先进先出的原则进行管理。一张表所需要的内存块是从vnode的内存池中进行分配的,块的大小由系统配置参数cache决定。每张表最大内存块的数目由配置参数tblocks决定,每张表平均的内存块的个数由配置参数ablocks决定。因此对于一个vnode, 总的内存大小为: cache`*ablocks`*tables。内存块参数cache不宜过小,一个cache block需要能存储至少几十条以上记录,才会有效率。参数ablocks最小为2,保证每张表平均至少能分配两个内存块。 你可以通过函数last_row快速获取一张表或一张超级表的最后一条记录,这样很便于在大屏显示各设备的实时状态或采集值。例如: From 2bb78227e2d43cb6c2b207ae542e3f8ac9dad03d Mon Sep 17 00:00:00 2001 From: lihui Date: Wed, 12 Feb 2020 11:40:11 +0800 Subject: [PATCH 05/11] [#1212] --- documentation/webdocs/markdowndocs/advanced features-ch.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/documentation/webdocs/markdowndocs/advanced features-ch.md b/documentation/webdocs/markdowndocs/advanced features-ch.md index cd7950b0ed..7b8cf974d1 100644 --- a/documentation/webdocs/markdowndocs/advanced features-ch.md +++ b/documentation/webdocs/markdowndocs/advanced features-ch.md @@ -76,7 +76,7 @@ TDengine通过查询函数向用户提供毫秒级的数据获取能力。直接 TDengine分配固定大小的内存空间作为缓存空间,缓存空间可根据应用的需求和硬件资源配置。通过适当的设置缓存空间,TDengine可以提供极高性能的写入和查询的支持。TDengine中每个虚拟节点(virtual node)创建时分配独立的缓存池。每个虚拟节点管理自己的缓存池,不同虚拟节点间不共享缓存池。每个虚拟节点内部所属的全部表共享该虚拟节点的缓存池。 -TDengine将内存池按块划分进行管理,数据在内存块里按照列式存储。一个vnode的内存池是在vnode创建时按块分配好的,而且每个内存块按照先进先出的原则进行管理。一张表所需要的内存块是从vnode的内存池中进行分配的,块的大小由系统配置参数cache决定。每张表最大内存块的数目由配置参数tblocks决定,每张表平均的内存块的个数由配置参数ablocks决定。因此对于一个vnode, 总的内存大小为: cache`*ablocks`*tables。内存块参数cache不宜过小,一个cache block需要能存储至少几十条以上记录,才会有效率。参数ablocks最小为2,保证每张表平均至少能分配两个内存块。 +TDengine将内存池按块划分进行管理,数据在内存块里按照列式存储。一个vnode的内存池是在vnode创建时按块分配好的,而且每个内存块按照先进先出的原则进行管理。一张表所需要的内存块是从vnode的内存池中进行分配的,块的大小由系统配置参数cache决定。每张表最大内存块的数目由配置参数tblocks决定,每张表平均的内存块的个数由配置参数ablocks决定。因此对于一个vnode, 总的内存大小为: `cache * ablocks * tables`<200b>。内存块参数cache不宜过小,一个cache block需要能存储至少几十条以上记录,才会有效率。参数ablocks最小为2,保证每张表平均至少能分配两个内存块。 你可以通过函数last_row快速获取一张表或一张超级表的最后一条记录,这样很便于在大屏显示各设备的实时状态或采集值。例如: From 7e05c1bd8b51cd422014c7c4672da916beec0119 Mon Sep 17 00:00:00 2001 From: lihui Date: Wed, 12 Feb 2020 11:44:36 +0800 Subject: [PATCH 06/11] [#1212] --- documentation/webdocs/markdowndocs/advanced features-ch.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/documentation/webdocs/markdowndocs/advanced features-ch.md b/documentation/webdocs/markdowndocs/advanced features-ch.md index 7b8cf974d1..fc229e71e6 100644 --- a/documentation/webdocs/markdowndocs/advanced features-ch.md +++ b/documentation/webdocs/markdowndocs/advanced features-ch.md @@ -76,7 +76,7 @@ TDengine通过查询函数向用户提供毫秒级的数据获取能力。直接 TDengine分配固定大小的内存空间作为缓存空间,缓存空间可根据应用的需求和硬件资源配置。通过适当的设置缓存空间,TDengine可以提供极高性能的写入和查询的支持。TDengine中每个虚拟节点(virtual node)创建时分配独立的缓存池。每个虚拟节点管理自己的缓存池,不同虚拟节点间不共享缓存池。每个虚拟节点内部所属的全部表共享该虚拟节点的缓存池。 -TDengine将内存池按块划分进行管理,数据在内存块里按照列式存储。一个vnode的内存池是在vnode创建时按块分配好的,而且每个内存块按照先进先出的原则进行管理。一张表所需要的内存块是从vnode的内存池中进行分配的,块的大小由系统配置参数cache决定。每张表最大内存块的数目由配置参数tblocks决定,每张表平均的内存块的个数由配置参数ablocks决定。因此对于一个vnode, 总的内存大小为: `cache * ablocks * tables`<200b>。内存块参数cache不宜过小,一个cache block需要能存储至少几十条以上记录,才会有效率。参数ablocks最小为2,保证每张表平均至少能分配两个内存块。 +TDengine将内存池按块划分进行管理,数据在内存块里按照列式存储。一个vnode的内存池是在vnode创建时按块分配好的,而且每个内存块按照先进先出的原则进行管理。一张表所需要的内存块是从vnode的内存池中进行分配的,块的大小由系统配置参数cache决定。每张表最大内存块的数目由配置参数tblocks决定,每张表平均的内存块的个数由配置参数ablocks决定。因此对于一个vnode, 总的内存大小为: `cache * ablocks * tables`。内存块参数cache不宜过小,一个cache block需要能存储至少几十条以上记录,才会有效率。参数ablocks最小为2,保证每张表平均至少能分配两个内存块。 你可以通过函数last_row快速获取一张表或一张超级表的最后一条记录,这样很便于在大屏显示各设备的实时状态或采集值。例如: From 70462035571bd9513233aeb68d0b6c686799e19c Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 12 Feb 2020 12:46:01 +0800 Subject: [PATCH 07/11] split meter to table and stable --- src/mnode/inc/mgmtStable.h | 35 +++++++++++++++++++++++++++++++++++ src/mnode/src/mgmtStable.c | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 src/mnode/inc/mgmtStable.h create mode 100644 src/mnode/src/mgmtStable.c diff --git a/src/mnode/inc/mgmtStable.h b/src/mnode/inc/mgmtStable.h new file mode 100644 index 0000000000..62f71606f7 --- /dev/null +++ b/src/mnode/inc/mgmtStable.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TBASE_MNODE_STABLE_H +#define TBASE_MNODE_STABLE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "mnode.h" + + + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/mnode/src/mgmtStable.c b/src/mnode/src/mgmtStable.c new file mode 100644 index 0000000000..28ac58e23c --- /dev/null +++ b/src/mnode/src/mgmtStable.c @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" + +#include "mnode.h" +#include "mgmtAcct.h" +#include "mgmtGrant.h" +#include "mgmtUtil.h" +#include "mgmtDb.h" +#include "mgmtDnodeInt.h" +#include "mgmtVgroup.h" +#include "mgmtSupertableQuery.h" +#include "mgmtTable.h" +#include "taosmsg.h" +#include "tast.h" +#include "textbuffer.h" +#include "tschemautil.h" +#include "tscompression.h" +#include "tskiplist.h" +#include "tsqlfunction.h" +#include "ttime.h" +#include "tstatus.h" From f4dac146dcfae8276cf98aaf1e76b58336b67bde Mon Sep 17 00:00:00 2001 From: liu0x54 Date: Wed, 12 Feb 2020 14:23:15 +0800 Subject: [PATCH 08/11] modify the dockerfile and docker build script to adapt the change of software release script --- packaging/docker/Dockerfile | 2 +- packaging/docker/dockerbuild.sh | 10 ++-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/packaging/docker/Dockerfile b/packaging/docker/Dockerfile index b01f375db0..22c1bc1902 100644 --- a/packaging/docker/Dockerfile +++ b/packaging/docker/Dockerfile @@ -4,7 +4,7 @@ WORKDIR /root COPY tdengine.tar.gz /root/ RUN tar -zxf tdengine.tar.gz -WORKDIR /root/tdengine/ +WORKDIR /root/TDengine-server/ RUN sh install.sh diff --git a/packaging/docker/dockerbuild.sh b/packaging/docker/dockerbuild.sh index 280c27d7aa..aeea6a2a95 100755 --- a/packaging/docker/dockerbuild.sh +++ b/packaging/docker/dockerbuild.sh @@ -1,12 +1,6 @@ #!/bin/bash set -x $1 -tar -zxf $1 -DIR=`echo $1|awk -F . '{print($1"."$2"."$3"."$4)}'` -mv $DIR tdengine -tar -czf tdengine.tar.gz tdengine -TMP=`echo $1|awk -F . '{print($2"."$3"."$4)}'` -TAG="1."$TMP -docker build --rm -f "Dockerfile" -t tdengine/tdengine:$TAG "." +docker build --rm -f "Dockerfile" -t tdengine/tdengine:$1 "." docker login -u tdengine -p ******** #replace the docker registry username and password -docker push tdengine/tdengine:$TAG \ No newline at end of file +docker push tdengine/tdengine:$1 \ No newline at end of file From bad1e7e7b8e2fc71595874fbc056928466de0f04 Mon Sep 17 00:00:00 2001 From: liu0x54 Date: Wed, 12 Feb 2020 14:36:34 +0800 Subject: [PATCH 09/11] modify minidevops readme --- minidevops/README.MD | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/minidevops/README.MD b/minidevops/README.MD index 3645723ed5..9937ad04ad 100644 --- a/minidevops/README.MD +++ b/minidevops/README.MD @@ -7,12 +7,10 @@ - grafana/grafana Grafana的镜像,一个广泛应用的开源可视化监控软件 - telegraf:latest 一个广泛应用的开源数据采集程序 - prom/prometheus:latest 一个广泛应用的k8s领域的开源数据采集程序 -<<<<<<< HEAD -======= ## 说明 本文中的图片链接在Github上显示不出来,建议将MD文件下载后用vscode或其他md文件浏览工具进行查看 ->>>>>>> 740f82af58c4ecc2deecfa36fb1de4ef5ee55efc + ## 前提条件 1. 一台linux服务器或运行linux操作系统的虚拟机或者运行MacOS的计算机 2. 安装了Docker软件。Docker软件的安装方法请参考linux下安装Docker From 87efc517bf51616e5bc243097c7bfba4e16ed667 Mon Sep 17 00:00:00 2001 From: lihui Date: Wed, 12 Feb 2020 15:12:22 +0800 Subject: [PATCH 10/11] [TBASE-1462] --- src/client/src/tscFunctionImpl.c | 595 +++++++++++++++++++++- src/client/src/tscSQLParser.c | 35 +- src/inc/tsqlfunction.h | 11 +- src/system/detail/inc/vnodeQueryImpl.h | 1 + src/system/detail/src/vnodeQueryImpl.c | 23 +- src/system/detail/src/vnodeQueryProcess.c | 4 +- src/util/src/ttokenizer.c | 1 + 7 files changed, 642 insertions(+), 28 deletions(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index ed7d4dfc60..837a0ce005 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -138,6 +138,19 @@ typedef struct STSCompInfo { STSBuf *pTSBuf; } STSCompInfo; +typedef struct SRateInfo { + int64_t CorrectionValue; + int64_t firstValue; + TSKEY firstKey; + int64_t lastValue; + TSKEY lastKey; + int8_t hasResult; // flag to denote has value + bool isIRate; // true for IRate functions, false for Rate functions + int64_t num; // for sum/avg + double sum; // for sum/avg +} SRateInfo; + + int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int16_t *bytes, int16_t *intermediateResBytes, int16_t extLength, bool isSuperTable) { if (!isValidDataType(dataType, dataBytes)) { @@ -192,7 +205,12 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *type = TSDB_DATA_TYPE_BINARY; *bytes = sizeof(SAvgInfo); *intermediateResBytes = *bytes; + return TSDB_CODE_SUCCESS; + } else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) { + *type = TSDB_DATA_TYPE_DOUBLE; + *bytes = sizeof(SRateInfo); + *intermediateResBytes = sizeof(SRateInfo); return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { *type = TSDB_DATA_TYPE_BINARY; @@ -253,6 +271,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); *intermediateResBytes = sizeof(SAvgInfo); + } else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) { + *type = TSDB_DATA_TYPE_DOUBLE; + *bytes = sizeof(double); + *intermediateResBytes = sizeof(SRateInfo); } else if (functionId == TSDB_FUNC_STDDEV) { *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); @@ -4348,6 +4370,462 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } +////////////////////////////////////////////////////////////////////////////////////////////// +// RATE functions + +static double do_calc_rate(const SRateInfo* pRateInfo) { + if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey) || (pRateInfo->firstKey >= pRateInfo->lastKey)) { + return 0; + } + + int64_t diff = 0; + + if (pRateInfo->isIRate) { + diff = pRateInfo->lastValue; + if (diff >= pRateInfo->firstValue) { + diff -= pRateInfo->firstValue; + } + } else { + diff = pRateInfo->CorrectionValue + pRateInfo->lastValue - pRateInfo->firstValue; + if (diff <= 0) { + return 0; + } + } + + int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey; + duration = (duration + 500) / 1000; + + double resultVal = ((double)diff) / duration; + + pTrace("do_calc_rate() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " resultVal:%f", + pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, resultVal); + + return resultVal; +} + + +static bool rate_function_setup(SQLFunctionCtx *pCtx) { + if (!function_setup(pCtx)) { + return false; + } + + SResultInfo *pResInfo = GET_RES_INFO(pCtx); //->aOutputBuf + pCtx->outputBytes; + SRateInfo * pInfo = pResInfo->interResultBuf; + + pInfo->CorrectionValue = 0; + pInfo->firstKey = INT64_MIN; + pInfo->lastKey = INT64_MIN; + pInfo->firstValue = INT64_MIN; + pInfo->lastValue = INT64_MIN; + pInfo->num = 0; + pInfo->sum = 0; + + pInfo->hasResult = 0; + pInfo->isIRate = ((pCtx->functionId == TSDB_FUNC_IRATE) || (pCtx->functionId == TSDB_FUNC_SUM_IRATE) || (pCtx->functionId == TSDB_FUNC_AVG_IRATE)); + return true; +} + + +static void rate_function(SQLFunctionCtx *pCtx) { + + assert(IS_DATA_BLOCK_LOADED(pCtx->blockStatus)); + + int32_t notNullElems = 0; + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + TSKEY *primaryKey = pCtx->ptsList; + + pTrace("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); + + for (int32_t i = 0; i < pCtx->size; ++i) { + char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + pTrace("%p rate_function() index of null data:%d", pCtx, i); + continue; + } + + notNullElems++; + + int64_t v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (int64_t)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (int64_t)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (int64_t)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (int64_t)GET_INT64_VAL(pData); + break; + default: + assert(0); + } + + if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) { + pRateInfo->firstValue = v; + pRateInfo->firstKey = primaryKey[i]; + + pTrace("firstValue:%" PRId64 " firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey); + } + + if (INT64_MIN == pRateInfo->lastValue) { + pRateInfo->lastValue = v; + } else if (v < pRateInfo->lastValue) { + pRateInfo->CorrectionValue += pRateInfo->lastValue; + pTrace("CorrectionValue:%" PRId64, pRateInfo->CorrectionValue); + } + + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[i]; + pTrace("lastValue:%" PRId64 " lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey); + } + + if (!pCtx->hasNull) { + assert(pCtx->size == notNullElems); + } + + SET_VAL(pCtx, notNullElems, 1); + + if (notNullElems > 0) { + pRateInfo->hasResult = DATA_SET_FLAG; + pResInfo->hasResult = DATA_SET_FLAG; + } + + // keep the data into the final output buffer for super table query since this execution may be the last one + if (pResInfo->superTableQ) { + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + +static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) { + void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + return; + } + + // NOTE: keep the intermediate result into the interResultBuf + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + TSKEY *primaryKey = pCtx->ptsList; + + int64_t v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (int64_t)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (int64_t)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (int64_t)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (int64_t)GET_INT64_VAL(pData); + break; + default: + assert(0); + } + + if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) { + pRateInfo->firstValue = v; + pRateInfo->firstKey = primaryKey[index]; + } + + if (INT64_MIN == pRateInfo->lastValue) { + pRateInfo->lastValue = v; + } else if (v < pRateInfo->lastValue) { + pRateInfo->CorrectionValue += pRateInfo->lastValue; + } + + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[index]; + + pTrace("====%p rate_function_f() index:%d lastValue:%" PRId64 " lastKey:%" PRId64 " CorrectionValue:%" PRId64, pCtx, index, pRateInfo->lastValue, pRateInfo->lastKey, pRateInfo->CorrectionValue); + + SET_VAL(pCtx, 1, 1); + + // set has result flag + pRateInfo->hasResult = DATA_SET_FLAG; + pResInfo->hasResult = DATA_SET_FLAG; + + // keep the data into the final output buffer for super table query since this execution may be the last one + if (pResInfo->superTableQ) { + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + + + +static void rate_func_merge(SQLFunctionCtx *pCtx) { + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + assert(pResInfo->superTableQ); + + pTrace("rate_func_merge() size:%d", pCtx->size); + + //SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + SRateInfo *pBuf = (SRateInfo *)pCtx->aOutputBuf; + char *indicator = pCtx->aInputElemBuf; + + assert(1 == pCtx->size); + + int32_t numOfNotNull = 0; + for (int32_t i = 0; i < pCtx->size; ++i, indicator += sizeof(SRateInfo)) { + SRateInfo *pInput = (SRateInfo *)indicator; + if (DATA_SET_FLAG != pInput->hasResult) { + continue; + } + + numOfNotNull++; + memcpy(pBuf, pInput, sizeof(SRateInfo)); + pTrace("%p rate_func_merge() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64, + pCtx, pInput->isIRate, pInput->firstKey, pInput->lastKey, pInput->firstValue, pInput->lastValue, pInput->CorrectionValue); + } + + SET_VAL(pCtx, numOfNotNull, 1); + + if (numOfNotNull > 0) { + pBuf->hasResult = DATA_SET_FLAG; + } + + return; +} + + + +static void rate_func_copy(SQLFunctionCtx *pCtx) { + assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); + + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + memcpy(pResInfo->interResultBuf, pCtx->aInputElemBuf, (size_t)pCtx->inputBytes); + pResInfo->hasResult = ((SRateInfo*)pCtx->aInputElemBuf)->hasResult; + + SRateInfo* pRateInfo = (SRateInfo*)pCtx->aInputElemBuf; + pTrace("%p rate_func_second_merge() firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " hasResult:%d", + pCtx, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, pRateInfo->hasResult); +} + + + +static void rate_finalizer(SQLFunctionCtx *pCtx) { + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + + pTrace("%p isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " hasResult:%d", + pCtx, pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, pRateInfo->hasResult); + + if (pRateInfo->hasResult != DATA_SET_FLAG) { + setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); + return; + } + + *(double*)pCtx->aOutputBuf = do_calc_rate(pRateInfo); + + pTrace("rate_finalizer() output result:%f", *(double *)pCtx->aOutputBuf); + + // cannot set the numOfIteratedElems again since it is set during previous iteration + pResInfo->numOfRes = 1; + pResInfo->hasResult = DATA_SET_FLAG; + + doFinalizer(pCtx); +} + + +static void irate_function(SQLFunctionCtx *pCtx) { + + assert(IS_DATA_BLOCK_LOADED(pCtx->blockStatus)); + + int32_t notNullElems = 0; + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + TSKEY *primaryKey = pCtx->ptsList; + + pTrace("%p irate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); + + if (pCtx->size < 1) { + return; + } + + for (int32_t i = pCtx->size - 1; i >= 0; --i) { + char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + pTrace("%p irate_function() index of null data:%d", pCtx, i); + continue; + } + + notNullElems++; + + int64_t v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (int64_t)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (int64_t)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (int64_t)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (int64_t)GET_INT64_VAL(pData); + break; + default: + assert(0); + } + + // TODO: calc once if only call this function once ???? + if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->lastValue)) { + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[i]; + + pTrace("%p irate_function() lastValue:%" PRId64 " lastKey:%" PRId64, pCtx, pRateInfo->lastValue, pRateInfo->lastKey); + continue; + } + + if ((INT64_MIN == pRateInfo->firstKey) || (INT64_MIN == pRateInfo->firstValue)){ + pRateInfo->firstValue = v; + pRateInfo->firstKey = primaryKey[i]; + + pTrace("%p irate_function() firstValue:%" PRId64 " firstKey:%" PRId64, pCtx, pRateInfo->firstValue, pRateInfo->firstKey); + break; + } + } + + SET_VAL(pCtx, notNullElems, 1); + + if (notNullElems > 0) { + pRateInfo->hasResult = DATA_SET_FLAG; + pResInfo->hasResult = DATA_SET_FLAG; + } + + // keep the data into the final output buffer for super table query since this execution may be the last one + if (pResInfo->superTableQ) { + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + +static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) { + void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + return; + } + + // NOTE: keep the intermediate result into the interResultBuf + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + TSKEY *primaryKey = pCtx->ptsList; + + int64_t v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (int64_t)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (int64_t)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (int64_t)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (int64_t)GET_INT64_VAL(pData); + break; + default: + assert(0); + } + + pRateInfo->firstKey = pRateInfo->lastKey; + pRateInfo->firstValue = pRateInfo->lastValue; + + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[index]; + + pTrace("====%p irate_function_f() index:%d lastValue:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " firstKey:%" PRId64, pCtx, index, pRateInfo->lastValue, pRateInfo->lastKey, pRateInfo->firstValue , pRateInfo->firstKey); + + SET_VAL(pCtx, 1, 1); + + // set has result flag + pRateInfo->hasResult = DATA_SET_FLAG; + pResInfo->hasResult = DATA_SET_FLAG; + + // keep the data into the final output buffer for super table query since this execution may be the last one + if (pResInfo->superTableQ) { + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + +static void do_sumrate_merge(SQLFunctionCtx *pCtx) { + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + assert(pResInfo->superTableQ); + + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + char * input = GET_INPUT_CHAR(pCtx); + + for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { + SRateInfo *pInput = (SRateInfo *)input; + + pTrace("%p do_sumrate_merge() hasResult:%d input num:%" PRId64 " input sum:%f total num:%" PRId64 " total sum:%f", pCtx, pInput->hasResult, pInput->num, pInput->sum, pRateInfo->num, pRateInfo->sum); + + if (pInput->hasResult != DATA_SET_FLAG) { + continue; + } else if (pInput->num == 0) { + pRateInfo->sum += do_calc_rate(pInput); + pRateInfo->num++; + } else { + pRateInfo->sum += pInput->sum; + pRateInfo->num += pInput->num; + } + pRateInfo->hasResult = DATA_SET_FLAG; + } + + // if the data set hasResult is not set, the result is null + if (DATA_SET_FLAG == pRateInfo->hasResult) { + pResInfo->hasResult = DATA_SET_FLAG; + SET_VAL(pCtx, pRateInfo->num, 1); + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + +static void sumrate_func_merge(SQLFunctionCtx *pCtx) { + pTrace("%p sumrate_func_merge() process ...", pCtx); + do_sumrate_merge(pCtx); +} + +static void sumrate_func_second_merge(SQLFunctionCtx *pCtx) { + pTrace("%p sumrate_func_second_merge() process ...", pCtx); + do_sumrate_merge(pCtx); +} + +static void sumrate_finalizer(SQLFunctionCtx *pCtx) { + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + + pTrace("%p sumrate_finalizer() superTableQ:%d num:%" PRId64 " sum:%f hasResult:%d", pCtx, pResInfo->superTableQ, pRateInfo->num, pRateInfo->sum, pRateInfo->hasResult); + + if (pRateInfo->hasResult != DATA_SET_FLAG) { + setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); + return; + } + + if (pRateInfo->num == 0) { + // from meter + *(double*)pCtx->aOutputBuf = do_calc_rate(pRateInfo); + } else if (pCtx->functionId == TSDB_FUNC_SUM_RATE || pCtx->functionId == TSDB_FUNC_SUM_IRATE) { + *(double*)pCtx->aOutputBuf = pRateInfo->sum; + } else { + *(double*)pCtx->aOutputBuf = pRateInfo->sum / pRateInfo->num; + } + + pResInfo->numOfRes = 1; + pResInfo->hasResult = DATA_SET_FLAG; + doFinalizer(pCtx); +} + + +///////////////////////////////////////////////////////////////////////////////////////////// + + /* * function compatible list. * tag and ts are not involved in the compatibility check @@ -4359,23 +4837,18 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { * e.g., count/sum/avg/min/max/stddev/percentile/apercentile/first/last... * */ -int32_t funcCompatDefList[28] = { - /* - * count, sum, avg, min, max, stddev, percentile, apercentile, first, last - */ - 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, +int32_t funcCompatDefList[] = { + // count, sum, avg, min, max, stddev, percentile, apercentile, first, last + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + // last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_z + 4, -1, -1, 1, 1, 1, 1, 1, 1, -1, + // tag, colprj, tagprj, arithmetic, diff, first_dist, last_dist, interp rate irate + 1, 1, 1, 1, -1, 1, 1, 5, 1, 1, + // sum_rate, sum_irate, avg_rate, avg_irate + 1, 1, 1, 1, +}; - /* - * last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_z, tag - */ - 4, -1, -1, 1, 1, 1, 1, 1, 1, -1, 1, - - /* - * colprj, tagprj, arithmetic, diff, first_dist, last_dist, interp - */ - 1, 1, 1, -1, 1, 1, 5}; - -SQLAggFuncElem aAggs[28] = {{ +SQLAggFuncElem aAggs[] = {{ // 0, count function does not invoke the finalize function "count", TSDB_FUNC_COUNT, @@ -4798,4 +5271,94 @@ SQLAggFuncElem aAggs[28] = {{ noop1, copy_function, no_data_info, + }, + { + // 28 + "rate", + TSDB_FUNC_RATE, + TSDB_FUNC_RATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + rate_function, + rate_function_f, + no_next_step, + rate_finalizer, + rate_func_merge, + rate_func_copy, + data_req_load_info, + }, + { + // 29 + "irate", + TSDB_FUNC_IRATE, + TSDB_FUNC_IRATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + irate_function, + irate_function_f, + no_next_step, + rate_finalizer, + rate_func_merge, + rate_func_copy, + data_req_load_info, + }, + { + // 30 + "sum_rate", + TSDB_FUNC_SUM_RATE, + TSDB_FUNC_SUM_RATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + rate_function, + rate_function_f, + no_next_step, + sumrate_finalizer, + sumrate_func_merge, + sumrate_func_second_merge, + data_req_load_info, + }, + { + // 31 + "sum_irate", + TSDB_FUNC_SUM_IRATE, + TSDB_FUNC_SUM_IRATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + irate_function, + irate_function_f, + no_next_step, + sumrate_finalizer, + sumrate_func_merge, + sumrate_func_second_merge, + data_req_load_info, + }, + { + // 32 + "avg_rate", + TSDB_FUNC_AVG_RATE, + TSDB_FUNC_AVG_RATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + rate_function, + rate_function_f, + no_next_step, + sumrate_finalizer, + sumrate_func_merge, + sumrate_func_second_merge, + data_req_load_info, + }, + { + // 33 + "avg_irate", + TSDB_FUNC_AVG_IRATE, + TSDB_FUNC_AVG_IRATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + irate_function, + irate_function_f, + no_next_step, + sumrate_finalizer, + sumrate_func_merge, + sumrate_func_second_merge, + data_req_load_info, }}; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index da7e22fe1e..5652dbf58a 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1122,7 +1122,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel if (addProjectionExprAndResultField(pQueryInfo, pItem) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } - } else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_LAST_ROW) { + } else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_AVG_IRATE) { // sql function in selection clause, append sql function info in pSqlCmd structure sequentially if (addExprAndResultField(pQueryInfo, outputIndex, pItem) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; @@ -1504,6 +1504,12 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt } case TK_SUM: case TK_AVG: + case TK_RATE: + case TK_IRATE: + case TK_SUM_RATE: + case TK_SUM_IRATE: + case TK_AVG_RATE: + case TK_AVG_IRATE: case TK_TWA: case TK_MIN: case TK_MAX: @@ -1956,6 +1962,24 @@ int32_t changeFunctionID(int32_t optr, int16_t* functionId) { case TK_AVG: *functionId = TSDB_FUNC_AVG; break; + case TK_RATE: + *functionId = TSDB_FUNC_RATE; + break; + case TK_IRATE: + *functionId = TSDB_FUNC_IRATE; + break; + case TK_SUM_RATE: + *functionId = TSDB_FUNC_SUM_RATE; + break; + case TK_SUM_IRATE: + *functionId = TSDB_FUNC_SUM_IRATE; + break; + case TK_AVG_RATE: + *functionId = TSDB_FUNC_AVG_RATE; + break; + case TK_AVG_IRATE: + *functionId = TSDB_FUNC_AVG_IRATE; + break; case TK_MIN: *functionId = TSDB_FUNC_MIN; break; @@ -2149,7 +2173,8 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) { int16_t functionId = aAggs[pExpr->functionId].stableFuncId; if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) || - (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST)) { + (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST) || + (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { if (getResultDataInfo(pField->type, pField->bytes, functionId, pExpr->param[0].i64Key, &type, &bytes, &intermediateBytes, 0, true) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; @@ -2912,7 +2937,7 @@ static int32_t validateSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnL pList->ids[pList->num++] = index; } else if (pExpr->nSQLOptr == TK_FLOAT && (isnan(pExpr->val.dKey) || isinf(pExpr->val.dKey))) { return TSDB_CODE_INVALID_SQL; - } else if (pExpr->nSQLOptr >= TK_MIN && pExpr->nSQLOptr <= TK_LAST_ROW) { + } else if (pExpr->nSQLOptr >= TK_MIN && pExpr->nSQLOptr <= TK_AVG_IRATE) { return TSDB_CODE_INVALID_SQL; } @@ -2966,8 +2991,8 @@ static bool isValidExpr(tSQLExpr* pLeft, tSQLExpr* pRight, int32_t optr) { * * However, columnA < 4+12 is valid */ - if ((pLeft->nSQLOptr >= TK_COUNT && pLeft->nSQLOptr <= TK_LAST_ROW) || - (pRight->nSQLOptr >= TK_COUNT && pRight->nSQLOptr <= TK_LAST_ROW) || + if ((pLeft->nSQLOptr >= TK_COUNT && pLeft->nSQLOptr <= TK_AVG_IRATE) || + (pRight->nSQLOptr >= TK_COUNT && pRight->nSQLOptr <= TK_AVG_IRATE) || (pLeft->nSQLOptr >= TK_BOOL && pLeft->nSQLOptr <= TK_BINARY && pRight->nSQLOptr >= TK_BOOL && pRight->nSQLOptr <= TK_BINARY)) { return false; diff --git a/src/inc/tsqlfunction.h b/src/inc/tsqlfunction.h index 93f50cf4f3..2caecb6309 100644 --- a/src/inc/tsqlfunction.h +++ b/src/inc/tsqlfunction.h @@ -60,6 +60,13 @@ extern "C" { #define TSDB_FUNC_LAST_DST 26 #define TSDB_FUNC_INTERP 27 +#define TSDB_FUNC_RATE 28 +#define TSDB_FUNC_IRATE 29 +#define TSDB_FUNC_SUM_RATE 30 +#define TSDB_FUNC_SUM_IRATE 31 +#define TSDB_FUNC_AVG_RATE 32 +#define TSDB_FUNC_AVG_IRATE 33 + #define TSDB_FUNCSTATE_SO 0x1U // single output #define TSDB_FUNCSTATE_MO 0x2U // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM #define TSDB_FUNCSTATE_STREAM 0x4U // function avail for stream @@ -287,10 +294,10 @@ typedef struct STwaInfo { } STwaInfo; /* global sql function array */ -extern struct SQLAggFuncElem aAggs[28]; +extern struct SQLAggFuncElem aAggs[]; /* compatible check array list */ -extern int32_t funcCompatDefList[28]; +extern int32_t funcCompatDefList[]; void getStatistics(char *priData, char *data, int32_t size, int32_t numOfRow, int32_t type, int64_t *min, int64_t *max, int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull); diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 40b65aa163..cce66786fd 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -126,6 +126,7 @@ static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) { bool isQueryKilled(SQuery* pQuery); bool isFixedOutputQuery(SQuery* pQuery); bool isPointInterpoQuery(SQuery* pQuery); +bool isSumAvgRateQuery(SQuery *pQuery); bool isTopBottomQuery(SQuery* pQuery); bool isFirstLastRowQuery(SQuery* pQuery); bool isTSCompQuery(SQuery* pQuery); diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 33fb3fe760..dd86c6a35c 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -2438,8 +2438,9 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimes // store the first&last timestamp into the intermediate buffer [1], the true // value may be null but timestamp will never be null pCtx->ptsList = (int64_t *)(primaryColumnData + startOffset * TSDB_KEYSIZE); - } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TWA || - functionId == TSDB_FUNC_DIFF) { + } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || + functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_DIFF || + (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { /* * leastsquares function needs two columns of input, currently, the x value of linear equation is set to * timestamp column, and the y-value is the column specified in pQuery->pSelectExpr[i].colIdxInBuffer @@ -2723,6 +2724,22 @@ bool isPointInterpoQuery(SQuery *pQuery) { } // TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION +bool isSumAvgRateQuery(SQuery *pQuery) { + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; + if (functionId == TSDB_FUNC_TS) { + continue; + } + + if (functionId == TSDB_FUNC_SUM_RATE || functionId == TSDB_FUNC_SUM_IRATE || + functionId == TSDB_FUNC_AVG_RATE || functionId == TSDB_FUNC_AVG_IRATE) { + return true; + } + } + + return false; +} + bool isTopBottomQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; @@ -4584,7 +4601,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) initSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, 4096, type, pQuery->rowSize, pSupporter->pResult); } - if (pQuery->nAggTimeInterval != 0) { + if (pQuery->nAggTimeInterval != 0 || isSumAvgRateQuery(pQuery)) { // one page for each table at least ret = createResultBuf(&pRuntimeEnv->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize); if (ret != TSDB_CODE_SUCCESS) { diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 1b04806f7c..c243a78e83 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -398,7 +398,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo restoreIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); - if (pQuery->nAggTimeInterval == 0) { // normal query + if (pQuery->nAggTimeInterval == 0 && !isSumAvgRateQuery(pQuery)) { // normal query if ((pQuery->lastKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->lastKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { qTrace( @@ -964,7 +964,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { return; } - if (pQuery->nAggTimeInterval > 0) { + if (pQuery->nAggTimeInterval > 0 || isSumAvgRateQuery(pQuery)) { assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); if (mergeMetersResultToOneGroups(pSupporter) == TSDB_CODE_SUCCESS) { diff --git a/src/util/src/ttokenizer.c b/src/util/src/ttokenizer.c index d4f3bd6879..7cbb4552b4 100644 --- a/src/util/src/ttokenizer.c +++ b/src/util/src/ttokenizer.c @@ -231,6 +231,7 @@ static SKeyword keywordTable[] = { {"RATE", TK_RATE}, {"IRATE", TK_IRATE}, {"SUM_RATE", TK_SUM_RATE}, + {"SUM_IRATE", TK_SUM_IRATE}, {"AVG_RATE", TK_AVG_RATE}, {"AVG_IRATE", TK_AVG_IRATE}, }; From f9f0e72d1d03d3272ef2ad80a098e322ef864451 Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 12 Feb 2020 15:32:29 +0800 Subject: [PATCH 11/11] vnodePeer.h --- src/inc/taosmsg.h | 9 +- src/mnode/inc/mgmtStable.h | 2 +- src/mnode/src/mgmtStable.c | 1 + src/mnode/src/mgmtTable.c | 2 +- src/vnode/common/inc/vnodePeer.h | 56 +++++++++ .../src/vnodePeer.c} | 0 src/vnode/detail/inc/vnodePeer.h | 108 ------------------ 7 files changed, 66 insertions(+), 112 deletions(-) create mode 100644 src/vnode/common/inc/vnodePeer.h rename src/vnode/{detail/src/vnodePeer.spec.c => common/src/vnodePeer.c} (100%) delete mode 100644 src/vnode/detail/inc/vnodePeer.h diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index af60cc893a..5af7befd87 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -684,10 +684,15 @@ typedef struct { } SSecIe; typedef struct { - uint32_t ip; - uint32_t vnode; + int32_t dnode; //the ID of dnode + int32_t vnode; //the index of vnode } SVPeerDesc; +typedef struct { + int32_t numOfVPeers; + SVPeerDesc vpeerDesc[]; +} SVpeerDescArray; + typedef struct { int32_t vnode; SVnodeCfg cfg; diff --git a/src/mnode/inc/mgmtStable.h b/src/mnode/inc/mgmtStable.h index 62f71606f7..161fd816fe 100644 --- a/src/mnode/inc/mgmtStable.h +++ b/src/mnode/inc/mgmtStable.h @@ -25,7 +25,7 @@ extern "C" { #include #include "mnode.h" - +int32_t mgmtInitSTable(); #ifdef __cplusplus diff --git a/src/mnode/src/mgmtStable.c b/src/mnode/src/mgmtStable.c index 28ac58e23c..ba278f5457 100644 --- a/src/mnode/src/mgmtStable.c +++ b/src/mnode/src/mgmtStable.c @@ -34,3 +34,4 @@ #include "tsqlfunction.h" #include "ttime.h" #include "tstatus.h" + diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index e7a4aebb9c..27a5ffd4f9 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -97,7 +97,7 @@ int32_t mgmtMeterDropColumnByName(STabObj *pTable, const char *name); static int dropMeterImp(SDbObj *pDb, STabObj * pTable, SAcctObj *pAcct); static void dropAllMetersOfMetric(SDbObj *pDb, STabObj * pMetric, SAcctObj *pAcct); -void mgmtMeterActionInit() { +static void mgmtMeterActionInit() { mgmtMeterActionFp[SDB_TYPE_INSERT] = mgmtMeterActionInsert; mgmtMeterActionFp[SDB_TYPE_DELETE] = mgmtMeterActionDelete; mgmtMeterActionFp[SDB_TYPE_UPDATE] = mgmtMeterActionUpdate; diff --git a/src/vnode/common/inc/vnodePeer.h b/src/vnode/common/inc/vnodePeer.h new file mode 100644 index 0000000000..becf9bae39 --- /dev/null +++ b/src/vnode/common/inc/vnodePeer.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_VNODE_PEER_H +#define TDENGINE_VNODEPEER_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include "tsdb.h" + +/* + * Initialize the resources + */ +int32_t vnodeInitPeer(int numOfThreads); + +/* + * Free the resources + */ +void vnodeCleanUpPeers(); + +/* + * Start a vnode synchronization process + */ +int32_t vnodeOpenPeer(int32_t vnode); + +/* + * Update the peerinfo of vnode + */ +int32_t vnodeConfigPeer(SVpeerDescArray msg); + +/* + * Close a vnode synchronization process + */ +void vnodeCleanUpPeer(int32_t vnode); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_VNODEPEER_H diff --git a/src/vnode/detail/src/vnodePeer.spec.c b/src/vnode/common/src/vnodePeer.c similarity index 100% rename from src/vnode/detail/src/vnodePeer.spec.c rename to src/vnode/common/src/vnodePeer.c diff --git a/src/vnode/detail/inc/vnodePeer.h b/src/vnode/detail/inc/vnodePeer.h deleted file mode 100644 index 4f17e66a70..0000000000 --- a/src/vnode/detail/inc/vnodePeer.h +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_VNODEPEER_H -#define TDENGINE_VNODEPEER_H - -#include "os.h" - -#ifdef __cplusplus -extern "C" { -#endif - -#define TSDB_VMSG_SYNC_DATA 1 -#define TSDB_VMSG_FORWARD 2 -#define TSDB_VMSG_SYNC_REQ 3 -#define TSDB_VMSG_SYNC_RSP 4 -#define TSDB_VMSG_SYNC_MUST 5 -#define TSDB_VMSG_STATUS 6 - -#pragma pack(push, 1) - -typedef struct { - char type; - char version; - short sourceVid; - short destVid; -} SFirstPkt; - -typedef struct { - uint64_t lastCreate; - uint64_t lastRemove; - uint32_t fileId; - uint64_t fmagic[]; -} SSyncMsg; - -typedef struct { - char status; - uint64_t version; -} SPeerState; - -typedef struct { - char status : 6; - char ack : 2; - char commitInProcess; - int32_t fileId; // ID for corrupted file, 0 means no corrupted file - uint64_t version; - SPeerState peerStates[]; -} SPeerStatus; - -#pragma pack(pop) - -typedef struct _thread_obj { - pthread_t thread; - int threadId; - int pollFd; - int numOfFds; -} SThreadObj; - -typedef struct { - int numOfThreads; - SThreadObj **pThread; - pthread_t thread; - int threadId; -} SThreadPool; - -typedef struct _vnodePeer { - void * signature; - int ownId; // own vnode ID - uint32_t ip; - char ipstr[20]; // ip string - int vid; - int status; - int syncStatus; - int32_t fileId; // 0 means no corrupted file - uint64_t version; - int commitInProcess; - int syncFd; - int peerFd; // forward FD - void * hbTimer; - void * syncTimer; - SThreadObj *pThread; -} SVnodePeer; - -typedef struct { - SVnodePeer *pVPeer; - uint64_t lastCreate; - uint64_t lastRemove; - uint32_t fileId; - uint64_t fmagic[]; -} SSyncCmd; - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_VNODEPEER_H