Merge branch 'develop' into feature/query

This commit is contained in:
Haojun Liao 2020-12-18 22:18:37 +08:00
commit 696ee1d9df
44 changed files with 792 additions and 500 deletions

View File

@ -3,6 +3,7 @@ os: Visual Studio 2015
environment: environment:
matrix: matrix:
- ARCH: amd64 - ARCH: amd64
- ARCH: x86
clone_folder: c:\dev\TDengine clone_folder: c:\dev\TDengine
clone_depth: 1 clone_depth: 1
@ -23,6 +24,7 @@ notifications:
- provider: Email - provider: Email
to: to:
- sangshuduo@gmail.com - sangshuduo@gmail.com
on_build_success: true on_build_success: true
on_build_failure: true on_build_failure: true
on_build_status_changed: true on_build_status_changed: true

Binary file not shown.

Before

Width:  |  Height:  |  Size: 76 KiB

After

Width:  |  Height:  |  Size: 85 KiB

View File

@ -2,37 +2,7 @@
## 快捷安装 ## 快捷安装
TDengine软件分为服务器、客户端和报警模块三部分目前2.0版服务器仅能在Linux系统上安装和运行后续会支持Windows、mac OS等系统。 TDengine软件分为服务器、客户端和报警模块三部分目前2.0版服务器仅能在Linux系统上安装和运行后续会支持Windows、mac OS等系统。客户端可以在Windows或Linux上安装和运行。任何OS的应用也可以选择RESTful接口连接服务器taosd。CPU支持X64/ARM64/MIPS64/Alpha64后续会支持ARM32、RISC-V等CPU架构。用户可根据需求选择通过[源码](https://www.taosdata.com/cn/getting-started/#通过源码安装)或者[安装包](https://www.taosdata.com/cn/getting-started/#通过安装包安装)来安装。
**应用驱动**
如果应用在Windows和Linux上运行可使用C/C++/C#/JAVA/Python/Go/Node.js接口连接服务器。如果应用在Mac上运行目前可以使用RESTful接口连接服务器。
**CPU**
CPU支持X64/ARM64/MIPS64/Alpha64后续会支持ARM32、RISC-V等CPU架构。用户可根据需求选择通过[源码](https://www.taosdata.com/cn/getting-started/#通过源码安装)或者[安装包](https://www.taosdata.com/cn/getting-started/#通过安装包安装)来安装。
**服务器**
目前TDengine服务器可以运行在以下平台上
| | **CentOS** **6/7/8** | **Ubuntu** **16/18/20** | **Other Linux** | **统信****UOS** | **银河****/****中标麒麟** | **凝思** **V60/V80** |
| -------------- | --------------------- | ------------------------ | --------------- | --------------- | ------------------------- | --------------------- |
| X64 | ● | ● | | ○ | ● | ● |
| 树莓派ARM32 | | ● | ● | | | |
| 龙芯MIPS64 | | | ● | | | |
| 鲲鹏 ARM64 | | ○ | ○ | | ● | |
| 申威 Alpha64 | | | ○ | ● | | |
| 飞腾ARM64 | | ○优麒麟 | | | | |
| 海光X64 | ● | ● | ● | ○ | ● | ● |
| 瑞芯微ARM64/32 | | | ○ | | | |
| 全志ARM64/32 | | | ○ | | | |
| 炬力ARM64/32 | | | ○ | | | |
| TI ARM32 | | | ○ | | | |
其中 ● 表示经过官方测试验证, ○ 表示非官方测试验证。
### 通过源码安装 ### 通过源码安装
@ -44,28 +14,11 @@ CPU支持X64/ARM64/MIPS64/Alpha64后续会支持ARM32、RISC-V等CPU架构。
### 通过安装包安装 ### 通过安装包安装
服务器部分,我们提供三种安装包,您可以根据需要选择。TDengine的安装非常简单从下载到安装成功仅仅只要几秒钟。 TDengine的安装非常简单从下载到安装成功仅仅只要几秒钟。服务端安装包包含客户端和连接器,我们提供三种安装包,您可以根据需要选择:
- TDengine-server-2.0.9.0-Linux-x64.rpm (4.2M) - TDengine-server-2.0.10.0-Linux-x64.rpm (4.2M)
- TDengine-server-2.0.9.0-Linux-x64.deb (2.7M) - TDengine-server-2.0.10.0-Linux-x64.deb (2.7M)
- TDengine-server-2.0.9.0-Linux-x64.tar.gz (4.5M) - TDengine-server-2.0.10.0-Linux-x64.tar.gz (4.5M)
客户端部分Linux安装包如下
- TDengine-client-2.0.9.0-Linux-x64.tar.gz(3.0M)
- TDengine-client-2.0.9.0-Windows-x64.exe(2.8M)
- TDengine-client-2.0.9.0-Windows-x86.exe(2.8M)
报警模块的Linux安装包如下请参考[报警模块的使用方法](https://github.com/taosdata/TDengine/blob/master/alert/README_cn.md)
- TDengine-alert-2.0.9.0-Linux-x64.tar.gz (8.1M)
目前TDengine 支持在使用[`systemd`](https://en.wikipedia.org/wiki/Systemd)做进程服务管理的linux系统上安装用`which systemctl`命令来检测系统中是否存在`systemd`包:
```cmd
which systemctl
```
具体的安装过程,请参见<a href="https://www.taosdata.com/blog/2019/08/09/566.html">TDengine多种安装包的安装和卸载</a> 具体的安装过程,请参见<a href="https://www.taosdata.com/blog/2019/08/09/566.html">TDengine多种安装包的安装和卸载</a>
@ -73,13 +26,13 @@ which systemctl
安装成功后,用户可使用`systemctl`命令来启动TDengine的服务进程。 安装成功后,用户可使用`systemctl`命令来启动TDengine的服务进程。
```cmd ```bash
systemctl start taosd $ systemctl start taosd
``` ```
检查服务是否正常工作。 检查服务是否正常工作。
```cmd ```bash
systemctl status taosd $ systemctl status taosd
``` ```
如果TDengine服务正常工作那么您可以通过TDengine的命令行程序`taos`来访问并体验TDengine。 如果TDengine服务正常工作那么您可以通过TDengine的命令行程序`taos`来访问并体验TDengine。
@ -88,15 +41,24 @@ systemctl status taosd
- systemctl命令需要 _root_ 权限来运行,如果您非 _root_ 用户,请在命令前添加 sudo - systemctl命令需要 _root_ 权限来运行,如果您非 _root_ 用户,请在命令前添加 sudo
- 为更好的获得产品反馈改善产品TDengine会采集基本的使用信息但您可以修改系统配置文件taos.cfg里的配置参数telemetryReporting, 将其设为0就可将其关闭。 - 为更好的获得产品反馈改善产品TDengine会采集基本的使用信息但您可以修改系统配置文件taos.cfg里的配置参数telemetryReporting, 将其设为0就可将其关闭。
- TDengine采用FQDN(一般就是hostname)作为节点的ID为保证正常运行需要给运行taosd的服务器配置好hostname在客户端应用运行的机器配置好DNS服务或hosts文件保证FQDN能够解析。
如果系统中不支持`systemd`,也可以用手动运行 /usr/local/taos/bin/taosd 方式启动 TDengine 服务。 * TDengine 支持在使用[`systemd`](https://en.wikipedia.org/wiki/Systemd)做进程服务管理的linux系统上安装用`which systemctl`命令来检测系统中是否存在`systemd`包:
```bash
$ which systemctl
```
如果系统中不支持systemd也可以用手动运行 /usr/local/taos/bin/taosd 方式启动 TDengine 服务。
## TDengine命令行程序 ## TDengine命令行程序
执行TDengine命令行程序您只要在Linux终端执行`taos`即可。 执行TDengine命令行程序您只要在Linux终端执行`taos`即可。
```cmd ```bash
taos $ taos
``` ```
如果TDengine终端连接服务成功将会打印出欢迎消息和版本信息。如果失败则会打印错误消息出来请参考[FAQ](https://www.taosdata.com/cn/faq/)来解决终端连接服务端失败的问题。TDengine终端的提示符号如下 如果TDengine终端连接服务成功将会打印出欢迎消息和版本信息。如果失败则会打印错误消息出来请参考[FAQ](https://www.taosdata.com/cn/faq/)来解决终端连接服务端失败的问题。TDengine终端的提示符号如下
@ -136,8 +98,8 @@ Query OK, 2 row(s) in set (0.001700s)
示例: 示例:
```cmd ```bash
taos -h 192.168.0.1 -s "use db; show tables;" $ taos -h 192.168.0.1 -s "use db; show tables;"
``` ```
### 运行SQL命令脚本 ### 运行SQL命令脚本
@ -159,8 +121,8 @@ taos> source <filename>;
启动TDengine的服务在Linux终端执行taosdemo 启动TDengine的服务在Linux终端执行taosdemo
``` ```bash
> taosdemo $ taosdemo
``` ```
该命令将在数据库test下面自动创建一张超级表meters该超级表下有1万张表表名为"t0" 到"t9999"每张表有10万条记录每条记录有 f1, f2 f3三个字段时间戳从"2017-07-14 10:40:00 000" 到"2017-07-14 10:41:39 999"每张表带有标签areaid和loc, areaid被设置为1到10, loc被设置为"beijing"或者“shanghai"。 该命令将在数据库test下面自动创建一张超级表meters该超级表下有1万张表表名为"t0" 到"t9999"每张表有10万条记录每条记录有 f1, f2 f3三个字段时间戳从"2017-07-14 10:40:00 000" 到"2017-07-14 10:41:39 999"每张表带有标签areaid和loc, areaid被设置为1到10, loc被设置为"beijing"或者“shanghai"。
@ -171,33 +133,92 @@ taos> source <filename>;
- 查询超级表下记录总条数: - 查询超级表下记录总条数:
``` ```mysql
taos>select count(*) from test.meters; taos> select count(*) from test.meters;
``` ```
- 查询10亿条记录的平均值、最大值、最小值等 - 查询10亿条记录的平均值、最大值、最小值等
``` ```mysql
taos>select avg(f1), max(f2), min(f3) from test.meters; taos> select avg(f1), max(f2), min(f3) from test.meters;
``` ```
- 查询loc="beijing"的记录总条数: - 查询loc="beijing"的记录总条数:
``` ```mysql
taos>select count(*) from test.meters where loc="beijing"; taos> select count(*) from test.meters where loc="beijing";
``` ```
- 查询areaid=10的所有记录的平均值、最大值、最小值等 - 查询areaid=10的所有记录的平均值、最大值、最小值等
``` ```mysql
taos>select avg(f1), max(f2), min(f3) from test.meters where areaid=10; taos> select avg(f1), max(f2), min(f3) from test.meters where areaid=10;
``` ```
- 对表t10按10s进行平均值、最大值和最小值聚合统计 - 对表t10按10s进行平均值、最大值和最小值聚合统计
``` ```mysql
taos>select avg(f1), max(f2), min(f3) from test.t10 interval(10s); taos> select avg(f1), max(f2), min(f3) from test.t10 interval(10s);
``` ```
**Note:** taosdemo命令本身带有很多选项配置表的数目、记录条数等等请执行 `taosdemo --help`详细列出。您可以设置不同参数进行体验。 **Note:** taosdemo命令本身带有很多选项配置表的数目、记录条数等等请执行 `taosdemo --help`详细列出。您可以设置不同参数进行体验。
## 客户端和报警模块
如果客户端和服务端运行在不同的电脑上可以单独安装客户端。Linux和Windows安装包如下
- TDengine-client-2.0.10.0-Linux-x64.tar.gz(3.0M)
- TDengine-client-2.0.10.0-Windows-x64.exe(2.8M)
- TDengine-client-2.0.10.0-Windows-x86.exe(2.8M)
报警模块的Linux安装包如下请参考[报警模块的使用方法](https://github.com/taosdata/TDengine/blob/master/alert/README_cn.md)
- TDengine-alert-2.0.10.0-Linux-x64.tar.gz (8.1M)
## **支持平台列表**
### TDengine服务器支持的平台列表
| | **CentOS** **6/7/8** | **Ubuntu** **16/18/20** | **Other Linux** | **统信****UOS** | **银河****/****中标麒麟** | **凝思** **V60/V80** |
| -------------- | --------------------- | ------------------------ | --------------- | --------------- | ------------------------- | --------------------- |
| X64 | ● | ● | | ○ | ● | ● |
| 树莓派ARM32 | | ● | ● | | | |
| 龙芯MIPS64 | | | ● | | | |
| 鲲鹏 ARM64 | | ○ | ○ | | ● | |
| 申威 Alpha64 | | | ○ | ● | | |
| 飞腾ARM64 | | ○优麒麟 | | | | |
| 海光X64 | ● | ● | ● | ○ | ● | ● |
| 瑞芯微ARM64/32 | | | ○ | | | |
| 全志ARM64/32 | | | ○ | | | |
| 炬力ARM64/32 | | | ○ | | | |
| TI ARM32 | | | ○ | | | |
注: ● 表示经过官方测试验证, ○ 表示非官方测试验证。
### TDengine客户端和连接器支持的平台列表
目前TDengine的连接器可支持的平台广泛目前包括X64/X86/ARM64/ARM32/MIPS/Alpha等硬件平台以及Linux/Win64/Win32等开发环境。
对照矩阵如下:
| **CPU** | **X64 64bit** | | | **X86 32bit** | **ARM64** | **ARM32** | **MIPS ** **龙芯** | **Alpha ** **申威** | **X64 ** **海光** |
| ----------- | --------------- | --------- | --------- | --------------- | --------- | --------- | ------------------- | -------------------- | ------------------ |
| **OS** | **Linux** | **Win64** | **Win32** | **Win32** | **Linux** | **Linux** | **Linux** | **Linux** | **Linux** |
| **C/C++** | ● | ● | ● | ○ | ● | ● | ● | ● | ● |
| **JDBC** | ● | ● | ● | ○ | ● | ● | ● | ● | ● |
| **Python** | ● | ● | ● | ○ | ● | ● | ● | -- | ● |
| **Go** | ● | ● | ● | ○ | ● | ● | ○ | -- | -- |
| **NodeJs** | ● | ● | ○ | ○ | ● | ● | ○ | -- | -- |
| **C#** | ○ | ● | ● | ○ | ○ | ○ | ○ | -- | -- |
| **RESTful** | ● | ● | ● | ● | ● | ● | ● | ● | ● |
注: ● 表示经过官方测试验证, ○ 表示非官方测试验证。
请跳转到 [连接器 ](https://www.taosdata.com/cn/documentation/connector)查看更详细的信息。

View File

@ -79,7 +79,7 @@ TDengine缺省的时间戳是毫秒精度但通过修改配置参数enableMic
- **使用数据库** - **使用数据库**
```mysql ```mysql
USE db_name; USE db_name;
``` ```
@ -1039,3 +1039,21 @@ SELECT AVG(current),MAX(current),LEASTSQUARES(current, start_val, step_val), PER
- 标签最多允许128个可以0个标签总长度不超过16k个字符 - 标签最多允许128个可以0个标签总长度不超过16k个字符
- SQL语句最大长度65480个字符但可通过系统配置参数maxSQLLength修改最长可配置为1M - SQL语句最大长度65480个字符但可通过系统配置参数maxSQLLength修改最长可配置为1M
- 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制 - 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制
## TAOS SQL其他约定
**group by的限制**
TAOS SQL支持对标签、tbname进行group by操作也支持普通列进行group by前提是仅限一列且该列的唯一值小于10万个。
**join操作的限制**
TAOS SQL支持表之间按主键时间戳来join两张表的列暂不支持两个表之间聚合后的四则运算。
**is not null与不为空的表达式适用范围**
is not null支持所有类型的列。不为空的表达式为 <>"",仅对非数值类型的列适用。

View File

@ -80,6 +80,12 @@ TDengine集群的节点数必须大于等于副本数否则创建表时将报
TDengine系统后台服务由taosd提供可以在配置文件taos.cfg里修改配置参数以满足不同场景的需求。配置文件的缺省位置在/etc/taos目录可以通过taosd命令行执行参数-c指定配置文件目录。比如taosd -c /home/user来指定配置文件位于/home/user这个目录。 TDengine系统后台服务由taosd提供可以在配置文件taos.cfg里修改配置参数以满足不同场景的需求。配置文件的缺省位置在/etc/taos目录可以通过taosd命令行执行参数-c指定配置文件目录。比如taosd -c /home/user来指定配置文件位于/home/user这个目录。
另外可以使用 “-C” 显示当前服务器配置参数:
```
taosd -C
```
下面仅仅列出一些重要的配置参数,更多的参数请看配置文件里的说明。各个参数的详细介绍及作用请看前述章节,而且这些参数的缺省配置都是工作的,一般无需设置。**注意:配置修改后,需要重启*taosd*服务才能生效。** 下面仅仅列出一些重要的配置参数,更多的参数请看配置文件里的说明。各个参数的详细介绍及作用请看前述章节,而且这些参数的缺省配置都是工作的,一般无需设置。**注意:配置修改后,需要重启*taosd*服务才能生效。**
- firstEp: taosd启动时主动连接的集群中首个dnode的end point, 默认值为localhost:6030。 - firstEp: taosd启动时主动连接的集群中首个dnode的end point, 默认值为localhost:6030。
@ -97,6 +103,7 @@ TDengine系统后台服务由taosd提供可以在配置文件taos.cfg里修
- telemetryReporting: 是否允许 TDengine 采集和上报基本使用信息0表示不允许1表示允许。 默认值1。 - telemetryReporting: 是否允许 TDengine 采集和上报基本使用信息0表示不允许1表示允许。 默认值1。
- stream: 是否启用连续查询流计算功能0表示不允许1表示允许。 默认值1。 - stream: 是否启用连续查询流计算功能0表示不允许1表示允许。 默认值1。
- queryBufferSize: 为所有并发查询占用保留的内存大小。计算规则可以根据实际应用可能的最大并发数和表的数字相乘,再乘 170 。单位为字节。 - queryBufferSize: 为所有并发查询占用保留的内存大小。计算规则可以根据实际应用可能的最大并发数和表的数字相乘,再乘 170 。单位为字节。
- ratioOfQueryCores: 设置查询线程的最大数量。最小值0 表示只有1个查询线程最大值2表示最大建立2倍CPU核数的查询线程。默认为1表示最大和CPU核数相等的查询线程。该值可以为小数即0.5表示最大建立CPU核数一半的查询线程。
**注意:**对于端口TDengine会使用从serverPort起13个连续的TCP和UDP端口号请务必在防火墙打开。因此如果是缺省配置需要打开从6030都6042共13个端口而且必须TCP和UDP都打开。 **注意:**对于端口TDengine会使用从serverPort起13个连续的TCP和UDP端口号请务必在防火墙打开。因此如果是缺省配置需要打开从6030都6042共13个端口而且必须TCP和UDP都打开。
@ -152,7 +159,13 @@ ALTER DNODE <dnode_id> <config>
## 客户端配置 ## 客户端配置
TDengine系统的前台交互客户端应用程序为taos它与taosd共享同一个配置文件taos.cfg。运行taos时使用参数-c指定配置文件目录如taos -c /home/cfg表示使用/home/cfg/目录下的taos.cfg配置文件中的参数缺省目录是/etc/taos。本节主要说明 taos 客户端应用在配置文件 taos.cfg 文件中使用到的参数。 TDengine系统的前台交互客户端应用程序为taos以及应用驱动它与taosd共享同一个配置文件taos.cfg。运行taos时使用参数-c指定配置文件目录如taos -c /home/cfg表示使用/home/cfg/目录下的taos.cfg配置文件中的参数缺省目录是/etc/taos。更多taos的使用方法请见[Shell命令行程序](https://www.taosdata.com/cn/documentation/administrator/#_TDengine_Shell命令行程序)。本节主要说明 taos 客户端应用在配置文件 taos.cfg 文件中使用到的参数。
**2.0.10.0 之后版本支持命令行以下参数显示当前客户端参数的配置**
```bash
taos -C 或 taos --dump-config
```
客户端配置参数 客户端配置参数
@ -215,15 +228,15 @@ TDengine系统的前台交互客户端应用程序为taos它与taosd共享同
均是合法的设置东八区时区的格式。 均是合法的设置东八区时区的格式。
时区的设置对于查询和写入SQL语句中非Unix时间戳的内容时间戳字符串、关键词now的解析产生影响。例如 时区的设置对于查询和写入SQL语句中非Unix时间戳的内容时间戳字符串、关键词now的解析产生影响。例如
``` ```sql
SELECT count(*) FROM table_name WHERE TS<'2019-04-11 12:01:08'; SELECT count(*) FROM table_name WHERE TS<'2019-04-11 12:01:08';
``` ```
在东八区SQL语句等效于 在东八区SQL语句等效于
``` ```sql
SELECT count(*) FROM table_name WHERE TS<1554955268000; SELECT count(*) FROM table_name WHERE TS<1554955268000;
``` ```
在UTC时区SQL语句等效于 在UTC时区SQL语句等效于
``` ```sql
SELECT count(*) FROM table_name WHERE TS<1554984068000; SELECT count(*) FROM table_name WHERE TS<1554984068000;
``` ```
为了避免使用字符串时间格式带来的不确定性也可以直接使用Unix时间戳。此外还可以在SQL语句中使用带有时区的时间戳字符串例如RFC3339格式的时间戳字符串2013-04-12T15:52:01.123+08:00或者ISO-8601格式时间戳字符串2013-04-12T15:52:01.123+0800。上述两个字符串转化为Unix时间戳不受系统所在时区的影响。 为了避免使用字符串时间格式带来的不确定性也可以直接使用Unix时间戳。此外还可以在SQL语句中使用带有时区的时间戳字符串例如RFC3339格式的时间戳字符串2013-04-12T15:52:01.123+08:00或者ISO-8601格式时间戳字符串2013-04-12T15:52:01.123+0800。上述两个字符串转化为Unix时间戳不受系统所在时区的影响。
@ -239,31 +252,31 @@ TDengine系统的前台交互客户端应用程序为taos它与taosd共享同
系统管理员可以在CLI界面里添加、删除用户也可以修改密码。CLI里SQL语法如下 系统管理员可以在CLI界面里添加、删除用户也可以修改密码。CLI里SQL语法如下
``` ```sql
CREATE USER <user_name> PASS <'password'>; CREATE USER <user_name> PASS <'password'>;
``` ```
创建用户,并指定用户名和密码,密码需要用单引号引起来,单引号为英文半角 创建用户,并指定用户名和密码,密码需要用单引号引起来,单引号为英文半角
``` ```sql
DROP USER <user_name>; DROP USER <user_name>;
``` ```
删除用户限root用户使用 删除用户限root用户使用
``` ```sql
ALTER USER <user_name> PASS <'password'>; ALTER USER <user_name> PASS <'password'>;
``` ```
修改用户密码, 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角 修改用户密码, 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角
``` ```sql
ALTER USER <user_name> PRIVILEGE <super|write|read>; ALTER USER <user_name> PRIVILEGE <super|write|read>;
``` ```
修改用户权限为super/write/read不需要添加单引号 修改用户权限为super/write/read不需要添加单引号
``` ```mysql
SHOW USERS; SHOW USERS;
``` ```
@ -316,12 +329,11 @@ taos> DESCRIBE d1001
``` ```
那么可以用如下命令导入数据 那么可以用如下命令导入数据
``` ```mysql
taos> insert into d1001 file '~/data.csv'; taos> insert into d1001 file '~/data.csv';
Query OK, 9 row(s) affected (0.004763s) Query OK, 9 row(s) affected (0.004763s)
``` ```
**taosdump工具导入** **taosdump工具导入**
TDengine提供了方便的数据库导入导出工具taosdump。用户可以将taosdump从一个系统导出的数据导入到其他系统中。具体使用方法请参见博客<a href='https://www.taosdata.com/blog/2020/03/09/1334.html'>TDengine DUMP工具使用指南</a> TDengine提供了方便的数据库导入导出工具taosdump。用户可以将taosdump从一个系统导出的数据导入到其他系统中。具体使用方法请参见博客<a href='https://www.taosdata.com/blog/2020/03/09/1334.html'>TDengine DUMP工具使用指南</a>
@ -334,7 +346,7 @@ TDengine提供了方便的数据库导入导出工具taosdump。用户可以将t
如果用户需要导出一个表或一个STable中的数据可在shell中运行 如果用户需要导出一个表或一个STable中的数据可在shell中运行
``` ```mysql
select * from <tb_name> >> data.csv; select * from <tb_name> >> data.csv;
``` ```
@ -348,37 +360,37 @@ TDengine提供了方便的数据库导出工具taosdump。用户可以根据需
系统管理员可以从CLI查询系统的连接、正在进行的查询、流式计算并且可以关闭连接、停止正在进行的查询和流式计算。CLI里SQL语法如下 系统管理员可以从CLI查询系统的连接、正在进行的查询、流式计算并且可以关闭连接、停止正在进行的查询和流式计算。CLI里SQL语法如下
``` ```mysql
SHOW CONNECTIONS; SHOW CONNECTIONS;
``` ```
显示数据库的连接其中一列显示ip:port, 为连接的IP地址和端口号。 显示数据库的连接其中一列显示ip:port, 为连接的IP地址和端口号。
``` ```mysql
KILL CONNECTION <connection-id>; KILL CONNECTION <connection-id>;
``` ```
强制关闭数据库连接其中的connection-id是SHOW CONNECTIONS中显示的第一列的数字。 强制关闭数据库连接其中的connection-id是SHOW CONNECTIONS中显示的第一列的数字。
``` ```mysql
SHOW QUERIES; SHOW QUERIES;
``` ```
显示数据查询其中第一列显示的以冒号隔开的两个数字为query-id为发起该query应用连接的connection-id和查询次数。 显示数据查询其中第一列显示的以冒号隔开的两个数字为query-id为发起该query应用连接的connection-id和查询次数。
``` ```mysql
KILL QUERY <query-id>; KILL QUERY <query-id>;
``` ```
强制关闭数据查询其中query-id是SHOW QUERIES中显示的 connection-id:query-no字串如“105:2”拷贝粘贴即可。 强制关闭数据查询其中query-id是SHOW QUERIES中显示的 connection-id:query-no字串如“105:2”拷贝粘贴即可。
``` ```mysql
SHOW STREAMS; SHOW STREAMS;
``` ```
显示流式计算其中第一列显示的以冒号隔开的两个数字为stream-id, 为启动该stream应用连接的connection-id和发起stream的次数。 显示流式计算其中第一列显示的以冒号隔开的两个数字为stream-id, 为启动该stream应用连接的connection-id和发起stream的次数。
``` ```mysql
KILL STREAM <stream-id>; KILL STREAM <stream-id>;
``` ```

View File

@ -137,6 +137,16 @@ DROP DNODE "fqdn:port";
其中fqdn是被删除的节点的FQDNport是其对外服务器的端口号 其中fqdn是被删除的节点的FQDNport是其对外服务器的端口号
<font color=green>**【注意】**</font>
- 一个数据节点一旦被drop之后不能重新加入集群。需要将此节点重新部署清空数据文件夹。集群在完成drop dnode操作之前会将该dnode的数据迁移走。
- 请注意 drop dnode 和 停止taosd进程是两个不同的概念不要混淆因为删除dnode之前要执行迁移数据的操作因此被删除的dnode必须保持在线状态。待删除操作结束之后才能停止taosd进程。
- 一个数据节点被drop之后其他节点都会感知到这个dnodeID的删除操作任何集群中的节点都不会再接收此dnodeID的请求。
- dnodeID的是集群自动分配的不得人工指定。它在生成时递增的不会重复。
### 查看数据节点 ### 查看数据节点
执行CLI程序taos,使用root账号登录进TDengine系统执行 执行CLI程序taos,使用root账号登录进TDengine系统执行

View File

@ -19,7 +19,10 @@ TDengine提供了丰富的应用程序开发接口其中包括C/C++、C# 、J
其中 ● 表示经过官方测试验证, ○ 表示非官方测试验证。 其中 ● 表示经过官方测试验证, ○ 表示非官方测试验证。
注意:所有执行 SQL 语句的 API例如 C/C++ Connector 中的 `tao_query`、`taos_query_a`、`taos_subscribe` 等以及其它语言中与它们对应的API每次都只能执行一条 SQL 语句,如果实际参数中包含了多条语句,它们的行为是未定义的。 注意:
* 所有执行 SQL 语句的 API例如 C/C++ Connector 中的 `tao_query`、`taos_query_a`、`taos_subscribe` 等以及其它语言中与它们对应的API每次都只能执行一条 SQL 语句,如果实际参数中包含了多条语句,它们的行为是未定义的。
* 升级到TDengine到2.0.8.0版本的用户必须更新JDBC连接TDengine必须升级taos-jdbcdriver到2.0.12及以上。
## C/C++ Connector ## C/C++ Connector

View File

@ -381,6 +381,7 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
taosScheduleTask(tscQhandle, &schedMsg); taosScheduleTask(tscQhandle, &schedMsg);
} }
void tscQueueAsyncRes(SSqlObj *pSql) { void tscQueueAsyncRes(SSqlObj *pSql) {
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
tscDebug("%p SqlObj is freed, not add into queue async res", pSql); tscDebug("%p SqlObj is freed, not add into queue async res", pSql);
@ -390,7 +391,10 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code)); tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
assert(pSql->fp != NULL && pSql->fetchFp != NULL);
if (pSql->fp == NULL || pSql->fetchFp == NULL){
return;
}
pSql->fp = pSql->fetchFp; pSql->fp = pSql->fetchFp;
(*pSql->fp)(pSql->param, pSql, pRes->code); (*pSql->fp)(pSql->param, pSql, pRes->code);

View File

@ -243,7 +243,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj; STscObj* pObj = pSql->pTscObj;
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
char *pMsg = rpcMallocCont(sizeof(SMsgVersion) + pCmd->payloadLen); char *pMsg = rpcMallocCont(pCmd->payloadLen);
if (NULL == pMsg) { if (NULL == pMsg) {
tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]); tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
@ -254,13 +254,12 @@ int tscSendMsgToServer(SSqlObj *pSql) {
tscDumpMgmtEpSet(pSql); tscDumpMgmtEpSet(pSql);
} }
tstrncpy(pMsg, version, sizeof(SMsgVersion)); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
memcpy(pMsg + sizeof(SMsgVersion), pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType, .msgType = pSql->cmd.msgType,
.pCont = pMsg, .pCont = pMsg,
.contLen = pSql->cmd.payloadLen + sizeof(SMsgVersion), .contLen = pSql->cmd.payloadLen,
.ahandle = (void*)pSql->self, .ahandle = (void*)pSql->self,
.handle = NULL, .handle = NULL,
.code = 0 .code = 0
@ -2146,6 +2145,10 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
if (pConnect->epSet.numOfEps > 0) { if (pConnect->epSet.numOfEps > 0) {
tscEpSetHtons(&pConnect->epSet); tscEpSetHtons(&pConnect->epSet);
tscUpdateMgmtEpSet(pSql, &pConnect->epSet); tscUpdateMgmtEpSet(pSql, &pConnect->epSet);
for (int i = 0; i < pConnect->epSet.numOfEps; ++i) {
tscDebug("%p epSet.fqdn[%d]: %s, pObj:%p", pSql, i, pConnect->epSet.fqdn[i], pObj);
}
} }
strcpy(pObj->sversion, pConnect->serverVersion); strcpy(pObj->sversion, pConnect->serverVersion);

View File

@ -124,6 +124,8 @@ void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) {
SMnodeMsg *pRead = mnodeCreateMsg(pMsg); SMnodeMsg *pRead = mnodeCreateMsg(pMsg);
taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead);
} }
rpcFreeCont(pMsg->pCont);
} }
static void dnodeFreeMReadMsg(SMnodeMsg *pRead) { static void dnodeFreeMReadMsg(SMnodeMsg *pRead) {

View File

@ -125,6 +125,8 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
} }
rpcFreeCont(pMsg->pCont);
} }
static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) { static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) {

View File

@ -127,20 +127,7 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
} else {} } else {}
if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { if ( dnodeProcessShellMsgFp[pMsg->msgType] ) {
SMsgVersion *pMsgVersion = pMsg->pCont;
if (taosCheckVersion(pMsgVersion->clientVersion, version, 3) != TSDB_CODE_SUCCESS) {
rpcMsg.code = TSDB_CODE_TSC_INVALID_VERSION;
rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont);
return; // todo change the error code
}
pMsg->pCont += sizeof(*pMsgVersion);
pMsg->contLen -= sizeof(*pMsgVersion);
(*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg);
//pMsg->contLen += sizeof(*pMsgVersion);
rpcFreeCont(pMsg->pCont - sizeof(*pMsgVersion));
} else { } else {
dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);
rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
@ -244,4 +231,4 @@ SStatisInfo dnodeGetStatisInfo() {
} }
return info; return info;
} }

View File

@ -77,6 +77,8 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
rpcFreeCont(pMsg->pCont);
} }
void *dnodeAllocVQueryQueue(void *pVnode) { void *dnodeAllocVQueryQueue(void *pVnode) {

View File

@ -102,6 +102,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
} }
vnodeRelease(pVnode); vnodeRelease(pVnode);
rpcFreeCont(pRpcMsg->pCont);
} }
void *dnodeAllocVWriteQueue(void *pVnode) { void *dnodeAllocVWriteQueue(void *pVnode) {

View File

@ -267,6 +267,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync modul
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TOO_MANY_FWDINFO, 0, 0x0904, "Too many sync fwd infos") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TOO_MANY_FWDINFO, 0, 0x0904, "Too many sync fwd infos")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_PROTOCOL, 0, 0x0905, "Mismatched protocol")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_CLUSTERID, 0, 0x0906, "Mismatched clusterId")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, 0, 0x0907, "Mismatched signature")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, 0, 0x0908, "Invalid msg checksum")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, 0, 0x0909, "Invalid msg length")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, 0, 0x090A, "Invalid msg type")
// wal // wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")

View File

@ -198,11 +198,6 @@ typedef struct {
int32_t numOfVnodes; int32_t numOfVnodes;
} SMsgDesc; } SMsgDesc;
typedef struct SMsgVersion {
char clientVersion[TSDB_VERSION_LEN];
uint32_t crc;
} SMsgVersion;
typedef struct SMsgHead { typedef struct SMsgHead {
int32_t contLen; int32_t contLen;
int32_t vgId; int32_t vgId;

View File

@ -119,10 +119,6 @@ int32_t syncGetNodesRole(int64_t rid, SNodesRole *);
extern char *syncRole[]; extern char *syncRole[];
//global configurable parameters //global configurable parameters
extern int32_t tsMaxSyncNum;
extern int32_t tsSyncTcpThreads;
extern int32_t tsSyncTimer;
extern int32_t tsMaxFwdInfo;
extern int32_t sDebugFlag; extern int32_t sDebugFlag;
extern char tsArbitrator[]; extern char tsArbitrator[];
extern uint16_t tsSyncPort; extern uint16_t tsSyncPort;

View File

@ -48,7 +48,7 @@ typedef struct {
void * pVnode; void * pVnode;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
SRspRet rspRet; SRspRet rspRet;
char reserveForSync[16]; char reserveForSync[24];
SWalHead pHead[]; SWalHead pHead[];
} SVWriteMsg; } SVWriteMsg;

View File

@ -86,6 +86,6 @@ extern void set_terminal_mode();
extern int get_old_terminal_mode(struct termios* tio); extern int get_old_terminal_mode(struct termios* tio);
extern void reset_terminal_mode(); extern void reset_terminal_mode();
extern SShellArguments args; extern SShellArguments args;
extern TAOS_RES* result; extern int64_t result;
#endif #endif

View File

@ -46,7 +46,7 @@ char CONTINUE_PROMPT[] = " -> ";
int prompt_size = 6; int prompt_size = 6;
#endif #endif
TAOS_RES *result = NULL; int64_t result = 0;
SShellHistory history; SShellHistory history;
#define DEFAULT_MAX_BINARY_DISPLAY_WIDTH 30 #define DEFAULT_MAX_BINARY_DISPLAY_WIDTH 30
@ -260,6 +260,14 @@ int32_t shellRunCommand(TAOS* con, char* command) {
} }
void freeResultWithRid(int64_t rid) {
SSqlObj* pSql = taosAcquireRef(tscObjRef, rid);
if(pSql){
taos_free_result(pSql);
taosReleaseRef(tscObjRef, rid);
}
}
void shellRunCommandOnServer(TAOS *con, char command[]) { void shellRunCommandOnServer(TAOS *con, char command[]) {
int64_t st, et; int64_t st, et;
wordexp_t full_path; wordexp_t full_path;
@ -294,18 +302,22 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
st = taosGetTimestampUs(); st = taosGetTimestampUs();
TAOS_RES* pSql = taos_query_h(con, command, &result); TAOS_RES* tmpSql = NULL;
TAOS_RES* pSql = taos_query_h(con, command, &tmpSql);
if (taos_errno(pSql)) { if (taos_errno(pSql)) {
taos_error(pSql, st); taos_error(pSql, st);
return; return;
} }
atomic_store_64(&result, ((SSqlObj*)tmpSql)->self);
int64_t oresult = atomic_load_64(&result);
if (regex_match(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) { if (regex_match(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
fprintf(stdout, "Database changed.\n\n"); fprintf(stdout, "Database changed.\n\n");
fflush(stdout); fflush(stdout);
atomic_store_ptr(&result, 0); atomic_store_64(&result, 0);
taos_free_result(pSql); freeResultWithRid(oresult);
return; return;
} }
@ -313,8 +325,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
int error_no = 0; int error_no = 0;
int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode); int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode);
if (numOfRows < 0) { if (numOfRows < 0) {
atomic_store_ptr(&result, 0); atomic_store_64(&result, 0);
taos_free_result(pSql); freeResultWithRid(oresult);
return; return;
} }
@ -336,8 +348,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
wordfree(&full_path); wordfree(&full_path);
} }
atomic_store_ptr(&result, 0); atomic_store_64(&result, 0);
taos_free_result(pSql); freeResultWithRid(oresult);
} }
/* Function to do regular expression check */ /* Function to do regular expression check */
@ -501,7 +513,7 @@ static int dumpResultToFile(const char* fname, TAOS_RES* tres) {
row = taos_fetch_row(tres); row = taos_fetch_row(tres);
} while( row != NULL); } while( row != NULL);
result = NULL; result = 0;
fclose(fp); fclose(fp);
return numOfRows; return numOfRows;

View File

@ -19,15 +19,31 @@
#include "tnettest.h" #include "tnettest.h"
pthread_t pid; pthread_t pid;
static tsem_t cancelSem;
void shellQueryInterruptHandler(int signum) { void shellQueryInterruptHandler(int signum) {
tsem_post(&cancelSem);
}
void *cancelHandler(void *arg) {
while(1) {
if (tsem_wait(&cancelSem) != 0) {
taosMsleep(10);
continue;
}
#ifdef LINUX #ifdef LINUX
void* pResHandle = atomic_val_compare_exchange_64(&result, result, 0); int64_t rid = atomic_val_compare_exchange_64(&result, result, 0);
taos_stop_query(pResHandle); SSqlObj* pSql = taosAcquireRef(tscObjRef, rid);
taos_stop_query(pSql);
taosReleaseRef(tscObjRef, rid);
#else #else
printf("\nReceive ctrl+c or other signal, quit shell.\n"); printf("\nReceive ctrl+c or other signal, quit shell.\n");
exit(0); exit(0);
#endif #endif
}
return NULL;
} }
int checkVersion() { int checkVersion() {
@ -105,6 +121,14 @@ int main(int argc, char* argv[]) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (tsem_init(&cancelSem, 0, 0) != 0) {
printf("failed to create cancel semphore\n");
exit(EXIT_FAILURE);
}
pthread_t spid;
pthread_create(&spid, NULL, cancelHandler, NULL);
/* Interrupt handler. */ /* Interrupt handler. */
struct sigaction act; struct sigaction act;
memset(&act, 0, sizeof(struct sigaction)); memset(&act, 0, sizeof(struct sigaction));

View File

@ -59,7 +59,7 @@ typedef struct SSdbRow {
SMnodeMsg *pMsg; SMnodeMsg *pMsg;
int32_t (*fpReq)(SMnodeMsg *pMsg); int32_t (*fpReq)(SMnodeMsg *pMsg);
int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code); int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code);
char reserveForSync[16]; char reserveForSync[24];
SWalHead pHead[]; SWalHead pHead[];
} SSdbRow; } SSdbRow;

View File

@ -294,6 +294,11 @@ void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect) {
*epSet = tsMEpForShell; *epSet = tsMEpForShell;
mnodeMnodeUnLock(); mnodeMnodeUnLock();
if (mnodeGetDnodesNum() <= 1) {
epSet->numOfEps = 0;
return;
}
mTrace("vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d", tsMEpForShell.numOfEps, tsMEpForShell.inUse); mTrace("vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d", tsMEpForShell.numOfEps, tsMEpForShell.inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (redirect && strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { if (redirect && strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {

View File

@ -346,8 +346,8 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). {
A = tSetCreateSQLElems(NULL, NULL, S, TSQL_CREATE_STREAM); A = tSetCreateSQLElems(NULL, NULL, S, TSQL_CREATE_STREAM);
setSQLInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE); setSQLInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
U.n += Z.n; V.n += Z.n;
setCreatedTableName(pInfo, &U, &V); setCreatedTableName(pInfo, &V, &U);
} }
%type column{TAOS_FIELD} %type column{TAOS_FIELD}

View File

@ -2428,8 +2428,8 @@ static void yy_reduce(
yylhsminor.yy538 = tSetCreateSqlElems(NULL, NULL, yymsp[0].minor.yy84, TSQL_CREATE_STREAM); yylhsminor.yy538 = tSetCreateSqlElems(NULL, NULL, yymsp[0].minor.yy84, TSQL_CREATE_STREAM);
setSqlInfo(pInfo, yylhsminor.yy538, NULL, TSDB_SQL_CREATE_TABLE); setSqlInfo(pInfo, yylhsminor.yy538, NULL, TSDB_SQL_CREATE_TABLE);
yymsp[-4].minor.yy0.n += yymsp[-2].minor.yy0.n; yymsp[-3].minor.yy0.n += yymsp[-2].minor.yy0.n;
setCreatedTableName(pInfo, &yymsp[-4].minor.yy0, &yymsp[-3].minor.yy0); setCreatedTableName(pInfo, &yymsp[-3].minor.yy0, &yymsp[-4].minor.yy0);
} }
yymsp[-4].minor.yy538 = yylhsminor.yy538; yymsp[-4].minor.yy538 = yylhsminor.yy538;
break; break;

View File

@ -5,12 +5,12 @@ INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX) IF (TD_LINUX)
LIST(REMOVE_ITEM SRC src/tarbitrator.c) LIST(REMOVE_ITEM SRC src/syncArbitrator.c)
ADD_LIBRARY(sync ${SRC}) ADD_LIBRARY(sync ${SRC})
TARGET_LINK_LIBRARIES(sync tutil pthread common) TARGET_LINK_LIBRARIES(sync tutil pthread common)
LIST(APPEND BIN_SRC src/tarbitrator.c) LIST(APPEND BIN_SRC src/syncArbitrator.c)
LIST(APPEND BIN_SRC src/taosTcpPool.c) LIST(APPEND BIN_SRC src/syncTcp.c)
ADD_EXECUTABLE(tarbitrator ${BIN_SRC}) ADD_EXECUTABLE(tarbitrator ${BIN_SRC})
TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil) TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil)

View File

@ -13,12 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_SYNCINT_H #ifndef TDENGINE_SYNC_INT_H
#define TDENGINE_SYNCINT_H #define TDENGINE_SYNC_INT_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "syncMsg.h"
#include "twal.h"
#define sFatal(...) { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); }} #define sFatal(...) { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); }}
#define sError(...) { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); }} #define sError(...) { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); }}
@ -27,86 +29,22 @@ extern "C" {
#define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
typedef enum { #define SYNC_TCP_THREADS 2
TAOS_SMSG_SYNC_DATA = 1, #define SYNC_MAX_NUM 2
TAOS_SMSG_FORWARD = 2,
TAOS_SMSG_FORWARD_RSP = 3,
TAOS_SMSG_SYNC_REQ = 4,
TAOS_SMSG_SYNC_RSP = 5,
TAOS_SMSG_SYNC_MUST = 6,
TAOS_SMSG_STATUS = 7,
TAOS_SMSG_SYNC_DATA_RSP = 8,
} ESyncMsgType;
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16) #define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024) #define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 10000 #define SYNC_MAX_FWDS 512
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 3 #define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 15000 // ms
#define SYNC_CHECK_INTERVAL 1 // ms
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 10 // ms
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
#define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus #define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus
#pragma pack(push, 1)
typedef struct {
char type; // msg type
char pversion; // protocol version
char reserved[6]; // not used
int32_t vgId; // vg ID
int32_t len; // content length, does not include head
// char cont[]; // message content starts from here
} SSyncHead;
typedef struct {
SSyncHead syncHead;
uint16_t port;
uint16_t tranId;
char fqdn[TSDB_FQDN_LEN];
int32_t sourceId; // only for arbitrator
} SFirstPkt;
typedef struct {
int8_t sync;
int8_t reserved;
uint16_t tranId;
} SFirstPktRsp;
typedef struct {
int8_t role;
uint64_t version;
} SPeerStatus;
typedef struct {
int8_t role;
int8_t ack;
int8_t type;
int8_t reserved[3];
uint16_t tranId;
uint64_t version;
SPeerStatus peersStatus[];
} SPeersStatus;
typedef struct {
char name[TSDB_FILENAME_LEN];
uint32_t magic;
uint32_t index;
uint64_t fversion;
int64_t size;
} SFileInfo;
typedef struct {
int8_t sync;
} SFileAck;
typedef struct {
uint64_t version;
int32_t code;
} SFwdRsp;
#pragma pack(pop)
typedef struct { typedef struct {
char * buffer; char * buffer;
int32_t bufferSize; int32_t bufferSize;
@ -190,7 +128,6 @@ void syncRestartConnection(SSyncPeer *pPeer);
void syncBroadcastStatus(SSyncNode *pNode); void syncBroadcastStatus(SSyncNode *pNode);
void syncAddPeerRef(SSyncPeer *pPeer); void syncAddPeerRef(SSyncPeer *pPeer);
int32_t syncDecPeerRef(SSyncPeer *pPeer); int32_t syncDecPeerRef(SSyncPeer *pPeer);
uint16_t syncGenTranId();
#ifdef __cplusplus #ifdef __cplusplus
} }

143
src/sync/inc/syncMsg.h Normal file
View File

@ -0,0 +1,143 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_SYNC_MSG_H
#define TDENGINE_SYNC_MSG_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tsync.h"
typedef enum {
TAOS_SMSG_START = 0,
TAOS_SMSG_SYNC_DATA = 1,
TAOS_SMSG_SYNC_DATA_RSP = 2,
TAOS_SMSG_SYNC_FWD = 3,
TAOS_SMSG_SYNC_FWD_RSP = 4,
TAOS_SMSG_SYNC_REQ = 5,
TAOS_SMSG_SYNC_REQ_RSP = 6,
TAOS_SMSG_SYNC_MUST = 7,
TAOS_SMSG_SYNC_MUST_RSP = 8,
TAOS_SMSG_STATUS = 9,
TAOS_SMSG_STATUS_RSP = 10,
TAOS_SMSG_SETUP = 11,
TAOS_SMSG_SETUP_RSP = 12,
TAOS_SMSG_SYNC_FILE = 13,
TAOS_SMSG_SYNC_FILE_RSP = 14,
TAOS_SMSG_END = 15,
} ESyncMsgType;
typedef enum {
SYNC_STATUS_BROADCAST,
SYNC_STATUS_BROADCAST_RSP,
SYNC_STATUS_SETUP_CONN,
SYNC_STATUS_SETUP_CONN_RSP,
SYNC_STATUS_EXCHANGE_DATA,
SYNC_STATUS_EXCHANGE_DATA_RSP,
SYNC_STATUS_CHECK_ROLE,
SYNC_STATUS_CHECK_ROLE_RSP
} ESyncStatusType;
#pragma pack(push, 1)
typedef struct {
int8_t type; // msg type
int8_t protocol; // protocol version
uint16_t signature; // fixed value
int32_t code; //
int32_t cId; // cluster Id
int32_t vgId; // vg ID
int32_t len; // content length, does not include head
uint32_t cksum;
} SSyncHead;
typedef struct {
SSyncHead head;
uint16_t port;
uint16_t tranId;
int32_t sourceId; // only for arbitrator
char fqdn[TSDB_FQDN_LEN];
} SSyncMsg;
typedef struct {
SSyncHead head;
int8_t sync;
int8_t reserved;
uint16_t tranId;
int8_t reserverd[4];
} SSyncRsp;
typedef struct {
int8_t role;
uint64_t version;
} SPeerStatus;
typedef struct {
SSyncHead head;
int8_t role;
int8_t ack;
int8_t type;
int8_t reserved[3];
uint16_t tranId;
uint64_t version;
SPeerStatus peersStatus[TAOS_SYNC_MAX_REPLICA];
} SPeersStatus;
typedef struct {
SSyncHead head;
char name[TSDB_FILENAME_LEN];
uint32_t magic;
uint32_t index;
uint64_t fversion;
int64_t size;
} SFileInfo;
typedef struct {
SSyncHead head;
int8_t sync;
} SFileAck;
typedef struct {
SSyncHead head;
uint64_t version;
int32_t code;
} SFwdRsp;
#pragma pack(pop)
#define SYNC_PROTOCOL_VERSION 1
#define SYNC_SIGNATURE ((uint16_t)(0xCDEF))
extern char *statusType[];
uint16_t syncGenTranId();
int32_t syncCheckHead(SSyncHead *pHead);
void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len);
void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code);
void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId);
void syncBuildFileAck(SFileAck *pMsg, int32_t vgId);
void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEPEER_H

View File

@ -13,16 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_TCP_POOL_H #ifndef TDENGINE_SYNC_TCP_POOL_H
#define TDENGINE_TCP_POOL_H #define TDENGINE_SYNC_TCP_POOL_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef void *ttpool_h;
typedef void *tthread_h;
typedef struct { typedef struct {
int32_t numOfThreads; int32_t numOfThreads;
uint32_t serverIp; uint32_t serverIp;
@ -33,10 +30,10 @@ typedef struct {
void (*processIncomingConn)(int32_t fd, uint32_t ip); void (*processIncomingConn)(int32_t fd, uint32_t ip);
} SPoolInfo; } SPoolInfo;
ttpool_h taosOpenTcpThreadPool(SPoolInfo *pInfo); void *syncOpenTcpThreadPool(SPoolInfo *pInfo);
void taosCloseTcpThreadPool(ttpool_h); void syncCloseTcpThreadPool(void *);
void * taosAllocateTcpConn(void *, void *ahandle, int32_t connFd); void *syncAllocateTcpConn(void *, void *ahandle, int32_t connFd);
void taosFreeTcpConn(void *); void syncFreeTcpConn(void *);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -22,17 +22,17 @@
#include "tsocket.h" #include "tsocket.h"
#include "tglobal.h" #include "tglobal.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosTcpPool.h"
#include "twal.h" #include "twal.h"
#include "tsync.h" #include "tsync.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncTcp.h"
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp);
static void arbProcessBrokenLink(void *param); static void arbProcessBrokenLink(void *param);
static int32_t arbProcessPeerMsg(void *param, void *buffer); static int32_t arbProcessPeerMsg(void *param, void *buffer);
static tsem_t tsArbSem; static tsem_t tsArbSem;
static ttpool_h tsArbTcpPool; static void * tsArbTcpPool;
typedef struct { typedef struct {
char id[TSDB_EP_LEN + 24]; char id[TSDB_EP_LEN + 24];
@ -90,7 +90,7 @@ int32_t main(int32_t argc, char *argv[]) {
info.processBrokenLink = arbProcessBrokenLink; info.processBrokenLink = arbProcessBrokenLink;
info.processIncomingMsg = arbProcessPeerMsg; info.processIncomingMsg = arbProcessPeerMsg;
info.processIncomingConn = arbProcessIncommingConnection; info.processIncomingConn = arbProcessIncommingConnection;
tsArbTcpPool = taosOpenTcpThreadPool(&info); tsArbTcpPool = syncOpenTcpThreadPool(&info);
if (tsArbTcpPool == NULL) { if (tsArbTcpPool == NULL) {
sDebug("failed to open TCP thread pool, exit..."); sDebug("failed to open TCP thread pool, exit...");
@ -101,8 +101,8 @@ int32_t main(int32_t argc, char *argv[]) {
tsem_wait(&tsArbSem); tsem_wait(&tsArbSem);
taosCloseTcpThreadPool(tsArbTcpPool); syncCloseTcpThreadPool(tsArbTcpPool);
sInfo("TAOS arbitrator is shut down\n"); sInfo("TAOS arbitrator is shut down");
closelog(); closelog();
return 0; return 0;
@ -113,9 +113,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
tinet_ntoa(ipstr, sourceIp); tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr); sDebug("peer TCP connection from ip:%s", ipstr);
SFirstPkt firstPkt; SSyncMsg msg;
if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno)); sError("failed to read peer sync msg from ip:%s since %s", ipstr, strerror(errno));
taosCloseSocket(connFd); taosCloseSocket(connFd);
return; return;
} }
@ -127,9 +127,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
return; return;
} }
firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0; msg.fqdn[TSDB_FQDN_LEN - 1] = 0;
snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port); snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", msg.sourceId, msg.fqdn, msg.port);
if (firstPkt.syncHead.vgId) { if (msg.head.vgId) {
sDebug("%s, vgId in head is not zero, close the connection", pNode->id); sDebug("%s, vgId in head is not zero, close the connection", pNode->id);
tfree(pNode); tfree(pNode);
taosCloseSocket(connFd); taosCloseSocket(connFd);
@ -138,7 +138,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
sDebug("%s, arbitrator request is accepted", pNode->id); sDebug("%s, arbitrator request is accepted", pNode->id);
pNode->nodeFd = connFd; pNode->nodeFd = connFd;
pNode->pConn = taosAllocateTcpConn(tsArbTcpPool, pNode, connFd); pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, pNode, connFd);
return; return;
} }
@ -156,8 +156,8 @@ static int32_t arbProcessPeerMsg(void *param, void *buffer) {
int32_t bytes = 0; int32_t bytes = 0;
char * cont = (char *)buffer; char * cont = (char *)buffer;
int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head)); int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(SSyncHead));
if (hlen != sizeof(head)) { if (hlen != sizeof(SSyncHead)) {
sDebug("%s, failed to read msg, hlen:%d", pNode->id, hlen); sDebug("%s, failed to read msg, hlen:%d", pNode->id, hlen);
return -1; return -1;
} }

View File

@ -23,26 +23,19 @@
#include "tsocket.h" #include "tsocket.h"
#include "tglobal.h" #include "tglobal.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosTcpPool.h"
#include "tqueue.h" #include "tqueue.h"
#include "twal.h" #include "twal.h"
#include "tsync.h" #include "tsync.h"
#include "syncTcp.h"
#include "syncInt.h" #include "syncInt.h"
// global configurable int32_t tsSyncNum = 0; // number of sync in process in whole system
int32_t tsMaxSyncNum = 2; char tsNodeFqdn[TSDB_FQDN_LEN] = {0};
int32_t tsSyncTcpThreads = 2;
int32_t tsMaxFwdInfo = 512;
int32_t tsSyncTimer = 1;
// module global, not configurable static void * tsTcpPool = NULL;
int32_t tsSyncNum; // number of sync in process in whole system static void * tsSyncTmrCtrl = NULL;
char tsNodeFqdn[TSDB_FQDN_LEN]; static void * tsVgIdHash = NULL;
static int32_t tsSyncRefId = -1;
static ttpool_h tsTcpPool;
static void * tsSyncTmrCtrl = NULL;
static void * tsVgIdHash;
static int32_t tsSyncRefId = -1;
// local functions // local functions
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
@ -80,36 +73,10 @@ char *syncStatus[] = {
"invalid" "invalid"
}; };
typedef enum {
SYNC_STATUS_BROADCAST,
SYNC_STATUS_BROADCAST_RSP,
SYNC_STATUS_SETUP_CONN,
SYNC_STATUS_SETUP_CONN_RSP,
SYNC_STATUS_EXCHANGE_DATA,
SYNC_STATUS_EXCHANGE_DATA_RSP,
SYNC_STATUS_CHECK_ROLE,
SYNC_STATUS_CHECK_ROLE_RSP
} ESyncStatusType;
char *statusType[] = {
"broadcast",
"broadcast-rsp",
"setup-conn",
"setup-conn-rsp",
"exchange-data",
"exchange-data-rsp",
"check-role",
"check-role-rsp"
};
uint16_t syncGenTranId() {
return taosRand() & 0XFFFF;
}
int32_t syncInit() { int32_t syncInit() {
SPoolInfo info = {0}; SPoolInfo info = {0};
info.numOfThreads = tsSyncTcpThreads; info.numOfThreads = SYNC_TCP_THREADS;
info.serverIp = 0; info.serverIp = 0;
info.port = tsSyncPort; info.port = tsSyncPort;
info.bufferSize = SYNC_MAX_SIZE; info.bufferSize = SYNC_MAX_SIZE;
@ -117,7 +84,7 @@ int32_t syncInit() {
info.processIncomingMsg = syncProcessPeerMsg; info.processIncomingMsg = syncProcessPeerMsg;
info.processIncomingConn = syncProcessIncommingConnection; info.processIncomingConn = syncProcessIncommingConnection;
tsTcpPool = taosOpenTcpThreadPool(&info); tsTcpPool = syncOpenTcpThreadPool(&info);
if (tsTcpPool == NULL) { if (tsTcpPool == NULL) {
sError("failed to init tcpPool"); sError("failed to init tcpPool");
return -1; return -1;
@ -126,16 +93,16 @@ int32_t syncInit() {
tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC"); tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC");
if (tsSyncTmrCtrl == NULL) { if (tsSyncTmrCtrl == NULL) {
sError("failed to init tmrCtrl"); sError("failed to init tmrCtrl");
taosCloseTcpThreadPool(tsTcpPool); syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
return -1; return -1;
} }
tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVgIdHash == NULL) { if (tsVgIdHash == NULL) {
sError("failed to init tsVgIdHash"); sError("failed to init vgIdHash");
taosTmrCleanUp(tsSyncTmrCtrl); taosTmrCleanUp(tsSyncTmrCtrl);
taosCloseTcpThreadPool(tsTcpPool); syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
tsSyncTmrCtrl = NULL; tsSyncTmrCtrl = NULL;
return -1; return -1;
@ -155,7 +122,7 @@ int32_t syncInit() {
void syncCleanUp() { void syncCleanUp() {
if (tsTcpPool) { if (tsTcpPool) {
taosCloseTcpThreadPool(tsTcpPool); syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
} }
@ -236,7 +203,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum,
syncRole[nodeRole]); syncRole[nodeRole]);
pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo * sizeof(SFwdInfo), 1); pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + SYNC_MAX_FWDS * sizeof(SFwdInfo), 1);
if (pNode->pSyncFwds == NULL) { if (pNode->pSyncFwds == NULL) {
sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -336,6 +303,11 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
newPeers[i] = pNode->peerInfo[j]; newPeers[i] = pNode->peerInfo[j];
} }
if (newPeers[i] == NULL) {
sError("vgId:%d, failed to reconfig", pNode->vgId);
return TSDB_CODE_SYN_INVALID_CONFIG;
}
if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) { if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) {
pNode->selfIndex = i; pNode->selfIndex = i;
} }
@ -384,21 +356,13 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
SSyncPeer *pPeer = pNode->pMaster; SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) { if (pPeer && pNode->quorum > 1) {
char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; SFwdRsp rsp;
syncBuildSyncFwdRsp(&rsp, pNode->vgId, version, code);
SSyncHead *pHead = (SSyncHead *)msg; if (taosWriteMsg(pPeer->peerFd, &rsp, sizeof(SFwdRsp)) == sizeof(SFwdRsp)) {
pHead->type = TAOS_SMSG_FORWARD_RSP; sTrace("%s, forward-rsp is sent, code:0x%x hver:%" PRIu64, pPeer->id, code, version);
pHead->len = sizeof(SFwdRsp);
SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead));
pFwdRsp->version = version;
pFwdRsp->code = code;
int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
if (taosWriteMsg(pPeer->peerFd, msg, msgLen) == msgLen) {
sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
} else { } else {
sDebug("%s, failed to send forward ack, restart", pPeer->id); sDebug("%s, failed to send forward-rsp, restart", pPeer->id);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
} }
} }
@ -509,7 +473,7 @@ static void syncClosePeerConn(SSyncPeer *pPeer) {
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
if (pPeer->peerFd >= 0) { if (pPeer->peerFd >= 0) {
pPeer->peerFd = -1; pPeer->peerFd = -1;
taosFreeTcpConn(pPeer->pConn); syncFreeTcpConn(pPeer->pConn);
} }
} }
@ -779,7 +743,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) {
sDebug("%s, check peer connection in 1000 ms", pPeer->id); sDebug("%s, check peer connection in 1000 ms", pPeer->id);
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
} }
} }
@ -857,7 +821,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
// Ensure the sync of mnode not interrupted // Ensure the sync of mnode not interrupted
if (pNode->vgId != 1 && tsSyncNum >= tsMaxSyncNum) { if (pNode->vgId != 1 && tsSyncNum >= SYNC_MAX_NUM) {
sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum); sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum);
taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, tsSyncTmrCtrl, &pPeer->timer);
return; return;
@ -865,56 +829,42 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
sDebug("%s, try to sync", pPeer->id); sDebug("%s, try to sync", pPeer->id);
SFirstPkt firstPkt; SSyncMsg msg;
memset(&firstPkt, 0, sizeof(firstPkt)); syncBuildSyncReqMsg(&msg, pNode->vgId);
firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ;
firstPkt.syncHead.vgId = pNode->vgId;
firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
firstPkt.tranId = syncGenTranId();
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
if (taosWriteMsg(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { taosTmrReset(syncNotStarted, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("%s, failed to send sync-req to peer", pPeer->id); sError("%s, failed to send sync-req to peer", pPeer->id);
} else { } else {
sInfo("%s, sync-req is sent to peer, tranId:%u, sstatus:%s", pPeer->id, firstPkt.tranId, syncStatus[nodeSStatus]); sInfo("%s, sync-req is sent to peer, tranId:%u, sstatus:%s", pPeer->id, msg.tranId, syncStatus[nodeSStatus]);
} }
} }
static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { static void syncProcessFwdResponse(SFwdRsp *pFwdRsp, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFwdRsp * pFwdRsp = (SFwdRsp *)cont;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
SFwdInfo * pFwdInfo; SFwdInfo * pFwdInfo;
sTrace("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version); sTrace("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version);
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first; SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
bool found = false;
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
// find the forwardInfo from first // find the forwardInfo from first
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo; pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % SYNC_MAX_FWDS;
if (pFwdRsp->version == pFwdInfo->version) { if (pFwdRsp->version == pFwdInfo->version) {
found = true;
syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code); syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
syncRemoveConfirmedFwdInfo(pNode); syncRemoveConfirmedFwdInfo(pNode);
break; return;
} }
} }
} }
if (!found) {
sTrace("%s, forward-rsp not found first:%d fwds:%d, code:%x hver:%" PRIu64, pPeer->id, pSyncFwds->first,
pSyncFwds->fwds, pFwdRsp->code, pFwdRsp->version);
syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
}
} }
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SWalHead * pHead = (SWalHead *)cont; SWalHead * pHead = (SWalHead *)(cont + sizeof(SSyncHead));
sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len); sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
@ -931,9 +881,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
} }
} }
static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
sDebug("%s, status is received, self:%s:%s:%" PRIu64 ", peer:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d", sDebug("%s, status is received, self:%s:%s:%" PRIu64 ", peer:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeersStatus->role], pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeersStatus->role],
@ -947,23 +896,22 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
} }
} }
static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead) {
if (pPeer->peerFd < 0) return -1; if (pPeer->peerFd < 0) return -1;
int32_t hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead)); int32_t hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead));
if (hlen != sizeof(SSyncHead)) { if (hlen != sizeof(SSyncHead)) {
sDebug("%s, failed to read msg, hlen:%d", pPeer->id, hlen); sDebug("%s, failed to read msg since %s, hlen:%d", pPeer->id, tstrerror(errno), hlen);
return -1; return -1;
} }
// head.len = htonl(head.len); int32_t code = syncCheckHead(pHead);
if (pHead->len < 0) { if (code != 0) {
sError("%s, invalid pkt length, hlen:%d", pPeer->id, pHead->len); sError("%s, failed to check msg head since %s, type:%d", pPeer->id, tstrerror(code), pHead->type);
return -1; return -1;
} }
assert(pHead->len <= TSDB_MAX_WAL_SIZE); int32_t bytes = taosReadMsg(pPeer->peerFd, (char *)pHead + sizeof(SSyncHead), pHead->len);
int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
if (bytes != pHead->len) { if (bytes != pHead->len) {
sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len); sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len);
return -1; return -1;
@ -974,23 +922,22 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
static int32_t syncProcessPeerMsg(void *param, void *buffer) { static int32_t syncProcessPeerMsg(void *param, void *buffer) {
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncHead head; SSyncHead *pHead = buffer;
char * cont = buffer;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
int32_t code = syncReadPeerMsg(pPeer, &head, cont); int32_t code = syncReadPeerMsg(pPeer, pHead);
if (code == 0) { if (code == 0) {
if (head.type == TAOS_SMSG_FORWARD) { if (pHead->type == TAOS_SMSG_SYNC_FWD) {
syncProcessForwardFromPeer(cont, pPeer); syncProcessForwardFromPeer(buffer, pPeer);
} else if (head.type == TAOS_SMSG_FORWARD_RSP) { } else if (pHead->type == TAOS_SMSG_SYNC_FWD_RSP) {
syncProcessFwdResponse(cont, pPeer); syncProcessFwdResponse(buffer, pPeer);
} else if (head.type == TAOS_SMSG_SYNC_REQ) { } else if (pHead->type == TAOS_SMSG_SYNC_REQ) {
syncProcessSyncRequest(cont, pPeer); syncProcessSyncRequest(buffer, pPeer);
} else if (head.type == TAOS_SMSG_STATUS) { } else if (pHead->type == TAOS_SMSG_STATUS) {
syncProcessPeersStatusMsg(cont, pPeer); syncProcessPeersStatusMsg(buffer, pPeer);
} }
} }
@ -999,36 +946,30 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) {
return code; return code;
} }
#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) { static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) {
SSyncNode *pNode = pPeer->pSyncNode;
char msg[statusMsgLen] = {0};
if (pPeer->peerFd < 0 || pPeer->ip == 0) return; if (pPeer->peerFd < 0 || pPeer->ip == 0) return;
SSyncHead * pHead = (SSyncHead *)msg; SSyncNode * pNode = pPeer->pSyncNode;
SPeersStatus *pPeersStatus = (SPeersStatus *)(msg + sizeof(SSyncHead)); SPeersStatus msg;
pHead->type = TAOS_SMSG_STATUS; memset(&msg, 0, sizeof(SPeersStatus));
pHead->len = statusMsgLen - sizeof(SSyncHead); syncBuildPeersStatus(&msg, pNode->vgId);
pPeersStatus->version = nodeVersion; msg.role = nodeRole;
pPeersStatus->role = nodeRole; msg.ack = ack;
pPeersStatus->ack = ack; msg.type = type;
pPeersStatus->type = type; msg.tranId = tranId;
pPeersStatus->tranId = tranId; msg.version = nodeVersion;
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role; msg.peersStatus[i].role = pNode->peerInfo[i]->role;
pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version; msg.peersStatus[i].version = pNode->peerInfo[i]->version;
} }
if (taosWriteMsg(pPeer->peerFd, msg, statusMsgLen) == statusMsgLen) { if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SPeersStatus)) == sizeof(SPeersStatus)) {
sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d", sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role], pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role],
syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId, syncStatus[pPeer->sstatus], pPeer->version, ack, tranId, statusType[type], pPeer->peerFd);
statusType[pPeersStatus->type], pPeer->peerFd);
} else { } else {
sDebug("%s, failed to send status msg, restart", pPeer->id); sDebug("%s, failed to send status msg, restart", pPeer->id);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
@ -1048,29 +989,23 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (connFd < 0) { if (connFd < 0) {
sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno)); sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno));
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
return; return;
} }
SFirstPkt firstPkt; SSyncMsg msg;
memset(&firstPkt, 0, sizeof(firstPkt)); syncBuildSyncSetupMsg(&msg, pPeer->nodeId ? pNode->vgId : 0);
firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId : 0;
firstPkt.syncHead.type = TAOS_SMSG_STATUS;
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
firstPkt.tranId = syncGenTranId();
firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId
if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) == sizeof(SSyncMsg)) {
sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, firstPkt.tranId); sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, msg.tranId);
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer); syncAddPeerRef(pPeer);
} else { } else {
sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno)); sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno));
taosClose(connFd); taosClose(connFd);
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
} }
} }
@ -1116,14 +1051,21 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
tinet_ntoa(ipstr, sourceIp); tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr); sDebug("peer TCP connection from ip:%s", ipstr);
SFirstPkt firstPkt; SSyncMsg msg;
if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno)); sError("failed to read peer sync msg from ip:%s since %s", ipstr, strerror(errno));
taosCloseSocket(connFd); taosCloseSocket(connFd);
return; return;
} }
int32_t vgId = firstPkt.syncHead.vgId; int32_t code = syncCheckHead((SSyncHead *)(&msg));
if (code != 0) {
sError("failed to check peer sync msg from ip:%s since %s", ipstr, strerror(code));
taosCloseSocket(connFd);
return;
}
int32_t vgId = msg.head.vgId;
SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t)); SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t));
if (ppNode == NULL || *ppNode == NULL) { if (ppNode == NULL || *ppNode == NULL) {
sError("vgId:%d, vgId could not be found", vgId); sError("vgId:%d, vgId could not be found", vgId);
@ -1131,7 +1073,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
return; return;
} }
sDebug("vgId:%d, firstPkt is received, tranId:%u", vgId, firstPkt.tranId); sDebug("vgId:%d, sync msg is received, tranId:%u", vgId, msg.tranId);
SSyncNode *pNode = *ppNode; SSyncNode *pNode = *ppNode;
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
@ -1139,27 +1081,27 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
SSyncPeer *pPeer; SSyncPeer *pPeer;
for (i = 0; i < pNode->replica; ++i) { for (i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) break; if (pPeer && (strcmp(pPeer->fqdn, msg.fqdn) == 0) && (pPeer->port == msg.port)) break;
} }
pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL; pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL;
if (pPeer == NULL) { if (pPeer == NULL) {
sError("vgId:%d, peer:%s:%u not configured", pNode->vgId, firstPkt.fqdn, firstPkt.port); sError("vgId:%d, peer:%s:%u not configured", pNode->vgId, msg.fqdn, msg.port);
taosCloseSocket(connFd); taosCloseSocket(connFd);
// syncSendVpeerCfgMsg(sync); // syncSendVpeerCfgMsg(sync);
} else { } else {
// first packet tells what kind of link // first packet tells what kind of link
if (firstPkt.syncHead.type == TAOS_SMSG_SYNC_DATA) { if (msg.head.type == TAOS_SMSG_SYNC_DATA) {
pPeer->syncFd = connFd; pPeer->syncFd = connFd;
nodeSStatus = TAOS_SYNC_STATUS_START; nodeSStatus = TAOS_SYNC_STATUS_START;
sInfo("%s, sync-data pkt from master is received, tranId:%u, set sstatus:%s", pPeer->id, firstPkt.tranId, sInfo("%s, sync-data msg from master is received, tranId:%u, set sstatus:%s", pPeer->id, msg.tranId,
syncStatus[nodeSStatus]); syncStatus[nodeSStatus]);
syncCreateRestoreDataThread(pPeer); syncCreateRestoreDataThread(pPeer);
} else { } else {
sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd); sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd);
syncClosePeerConn(pPeer); syncClosePeerConn(pPeer);
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer); syncAddPeerRef(pPeer);
sDebug("%s, ready to exchange data", pPeer->id); sDebug("%s, ready to exchange data", pPeer->id);
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId()); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId());
@ -1192,15 +1134,15 @@ static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
int64_t time = taosGetTimestampMs(); int64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds >= tsMaxFwdInfo) { if (pSyncFwds->fwds >= SYNC_MAX_FWDS) {
// pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; // pSyncFwds->first = (pSyncFwds->first + 1) % SYNC_MAX_FWDS;
// pSyncFwds->fwds--; // pSyncFwds->fwds--;
sError("vgId:%d, failed to save fwd info, hver:%" PRIu64 " fwds:%d", pNode->vgId, version, pSyncFwds->fwds); sError("vgId:%d, failed to save fwd info, hver:%" PRIu64 " fwds:%d", pNode->vgId, version, pSyncFwds->fwds);
return TSDB_CODE_SYN_TOO_MANY_FWDINFO; return TSDB_CODE_SYN_TOO_MANY_FWDINFO;
} }
if (pSyncFwds->fwds > 0) { if (pSyncFwds->fwds > 0) {
pSyncFwds->last = (pSyncFwds->last + 1) % tsMaxFwdInfo; pSyncFwds->last = (pSyncFwds->last + 1) % SYNC_MAX_FWDS;
} }
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last;
@ -1223,7 +1165,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first;
if (pFwdInfo->confirmed == 0) break; if (pFwdInfo->confirmed == 0) break;
pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; pSyncFwds->first = (pSyncFwds->first + 1) % SYNC_MAX_FWDS;
pSyncFwds->fwds--; pSyncFwds->fwds--;
if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last; if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last;
sTrace("vgId:%d, fwd info is removed, hver:%" PRIu64 " fwds:%d", pNode->vgId, pFwdInfo->version, pSyncFwds->fwds); sTrace("vgId:%d, fwd info is removed, hver:%" PRIu64 " fwds:%d", pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
@ -1248,7 +1190,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
} }
if (confirm && pFwdInfo->confirmed == 0) { if (confirm && pFwdInfo->confirmed == 0) {
sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:0x%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
(*pNode->confirmForward)(pNode->vgId, pFwdInfo->mhandle, pFwdInfo->code); (*pNode->confirmForward)(pNode->vgId, pFwdInfo->mhandle, pFwdInfo->code);
pFwdInfo->confirmed = 1; pFwdInfo->confirmed = 1;
} }
@ -1289,7 +1231,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
if (pSyncFwds->fwds > 0) { if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS;
if (ABS(time - pFwdInfo->time) < 2000) break; if (ABS(time - pFwdInfo->time) < 2000) break;
sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId, sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId,
@ -1334,14 +1276,12 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
// only pkt from RPC or CQ can be forwarded // only msg from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
// a hacker way to improve the performance // a hacker way to improve the performance
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
pSyncHead->type = TAOS_SMSG_FORWARD; syncBuildSyncFwdMsg(pSyncHead, pNode->vgId, sizeof(SWalHead) + pWalHead->len);
pSyncHead->pversion = 0;
pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
@ -1371,4 +1311,3 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
return code; return code;
} }

109
src/sync/src/syncMsg.c Normal file
View File

@ -0,0 +1,109 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tglobal.h"
#include "tchecksum.h"
#include "syncInt.h"
char *statusType[] = {
"broadcast",
"broadcast-rsp",
"setup-conn",
"setup-conn-rsp",
"exchange-data",
"exchange-data-rsp",
"check-role",
"check-role-rsp"
};
uint16_t syncGenTranId() {
return taosRand() & 0XFFFF;
}
static void syncBuildHead(SSyncHead *pHead) {
pHead->protocol = SYNC_PROTOCOL_VERSION;
pHead->signature = SYNC_SIGNATURE;
pHead->code = 0;
pHead->cId = 0;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead));
}
int32_t syncCheckHead(SSyncHead *pHead) {
if (pHead->protocol != SYNC_PROTOCOL_VERSION) return TSDB_CODE_SYN_MISMATCHED_PROTOCOL;
if (pHead->signature != SYNC_SIGNATURE) return TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
if (pHead->cId != 0) return TSDB_CODE_SYN_MISMATCHED_CLUSTERID;
if (pHead->len <= 0 || pHead->len > TSDB_MAX_WAL_SIZE) return TSDB_CODE_SYN_INVALID_MSGLEN;
if (pHead->type <= TAOS_SMSG_START || pHead->type >= TAOS_SMSG_END) return TSDB_CODE_SYN_INVALID_MSGTYPE;
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SSyncHead))) return TSDB_CODE_SYN_INVALID_CHECKSUM;
return TSDB_CODE_SUCCESS;
}
void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len) {
pHead->type = TAOS_SMSG_SYNC_FWD;
pHead->vgId = vgId;
pHead->len = len;
syncBuildHead(pHead);
}
void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code) {
pMsg->head.type = TAOS_SMSG_SYNC_FWD_RSP;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SFwdRsp) - sizeof(SSyncHead);
syncBuildHead(&pMsg->head);
pMsg->version = version;
pMsg->code = code;
}
static void syncBuildMsg(SSyncMsg *pMsg, int32_t vgId, ESyncMsgType type) {
pMsg->head.type = type;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
syncBuildHead(&pMsg->head);
pMsg->port = tsSyncPort;
pMsg->tranId = syncGenTranId();
pMsg->sourceId = vgId;
tstrncpy(pMsg->fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
}
void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_REQ); }
void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_DATA); }
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SETUP); }
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) {
pMsg->head.type = TAOS_SMSG_STATUS;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SPeersStatus) - sizeof(SSyncHead);
syncBuildHead(&pMsg->head);
}
void syncBuildFileAck(SFileAck *pMsg, int32_t vgId) {
pMsg->head.type = TAOS_SMSG_SYNC_FILE_RSP;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SFileAck) - sizeof(SSyncHead);
syncBuildHead(&pMsg->head);
}
void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId) {
pMsg->head.type = TAOS_SMSG_SYNC_FILE;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SFileInfo) - sizeof(SSyncHead);
syncBuildHead(&pMsg->head);
}

View File

@ -56,7 +56,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo minfo; memset(&minfo, 0, sizeof(SFileInfo)); /* = {0}; */ SFileInfo minfo; memset(&minfo, 0, sizeof(SFileInfo)); /* = {0}; */
SFileInfo sinfo; memset(&sinfo, 0, sizeof(SFileInfo)); /* = {0}; */ SFileInfo sinfo; memset(&sinfo, 0, sizeof(SFileInfo)); /* = {0}; */
SFileAck fileAck = {0}; SFileAck fileAck; memset(&fileAck, 0, sizeof(SFileAck));
int32_t code = -1; int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0}; char name[TSDB_FILENAME_LEN * 2] = {0};
uint32_t pindex = 0; // index in last restore uint32_t pindex = 0; // index in last restore
@ -69,7 +69,14 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
minfo.index = -1; minfo.index = -1;
int32_t ret = taosReadMsg(pPeer->syncFd, &minfo, sizeof(SFileInfo)); int32_t ret = taosReadMsg(pPeer->syncFd, &minfo, sizeof(SFileInfo));
if (ret != sizeof(SFileInfo) || minfo.index == -1) { if (ret != sizeof(SFileInfo) || minfo.index == -1) {
sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno)); sError("%s, failed to read fileinfo while restore file since %s", pPeer->id, strerror(errno));
break;
}
assert(ret == sizeof(SFileInfo));
ret = syncCheckHead((SSyncHead *)(&minfo));
if (ret != 0) {
sError("%s, failed to check fileinfo while restore file since %s", pPeer->id, strerror(ret));
break; break;
} }
@ -94,12 +101,13 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
&sinfo.fversion); &sinfo.fversion);
// if file not there or magic is not the same, file shall be synced // if file not there or magic is not the same, file shall be synced
memset(&fileAck, 0, sizeof(fileAck)); memset(&fileAck, 0, sizeof(SFileAck));
syncBuildFileAck(&fileAck, pNode->vgId);
fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0; fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0;
// send file ack // send file ack
ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck));
if (ret != sizeof(fileAck)) { if (ret != sizeof(SFileAck)) {
sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno)); sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno));
break; break;
} }
@ -289,12 +297,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
uint64_t fversion = 0; uint64_t fversion = 0;
sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
SFirstPktRsp firstPktRsp = {.sync = 1, .tranId = syncGenTranId()}; SSyncRsp rsp = {.sync = 1, .tranId = syncGenTranId()};
if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) { if (taosWriteMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) {
sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno)); sError("%s, failed to send sync rsp since %s", pPeer->id, strerror(errno));
return -1; return -1;
} }
sDebug("%s, send firstPktRsp to peer, tranId:%u", pPeer->id, firstPktRsp.tranId); sDebug("%s, send sync rsp to peer, tranId:%u", pPeer->id, rsp.tranId);
sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
int32_t code = syncRestoreFile(pPeer, &fversion); int32_t code = syncRestoreFile(pPeer, &fversion);

View File

@ -88,7 +88,7 @@ static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
static int32_t syncRetrieveFile(SSyncPeer *pPeer) { static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo)); SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo));
SFileAck fileAck = {0}; SFileAck fileAck; memset(&fileAck, 0, sizeof(SFileAck));
int32_t code = -1; int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0}; char name[TSDB_FILENAME_LEN * 2] = {0};
@ -103,11 +103,12 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo.size = 0; fileInfo.size = 0;
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
&fileInfo.size, &fileInfo.fversion); &fileInfo.size, &fileInfo.fversion);
syncBuildFileInfo(&fileInfo, pNode->vgId);
sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size); sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
// send the file info // send the file info
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(SFileInfo));
if (ret != sizeof(fileInfo)) { if (ret != sizeof(SFileInfo)) {
code = -1; code = -1;
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break; break;
@ -128,6 +129,13 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
break; break;
} }
ret = syncCheckHead((SSyncHead*)(&fileAck));
if (ret != 0) {
code = -1;
sError("%s, failed to check file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(ret));
break;
}
// set the peer sync version // set the peer sync version
pPeer->sversion = fileInfo.fversion; pPeer->sversion = fileInfo.fversion;
@ -405,27 +413,22 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) { static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFirstPkt firstPkt; SSyncMsg msg;
memset(&firstPkt, 0, sizeof(firstPkt)); syncBuildSyncDataMsg(&msg, pNode->vgId);
firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA;
firstPkt.syncHead.vgId = pNode->vgId;
firstPkt.tranId = syncGenTranId();
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { if (taosWriteMsg(pPeer->syncFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("%s, failed to send sync firstPkt since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId); sError("%s, failed to send sync-data msg since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId);
return -1; return -1;
} }
sDebug("%s, send sync-data pkt to peer, tranId:%u", pPeer->id, firstPkt.tranId); sDebug("%s, send sync-data msg to peer, tranId:%u", pPeer->id, msg.tranId);
SFirstPktRsp firstPktRsp; SSyncRsp rsp;
if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) { if (taosReadMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) {
sError("%s, failed to read sync firstPkt rsp since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId); sError("%s, failed to read sync-data rsp since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId);
return -1; return -1;
} }
sDebug("%s, recv firstPktRsp from peer, tranId:%u", pPeer->id, firstPkt.tranId); sDebug("%s, recv sync-data rsp from peer, tranId:%u rsp-tranId:%u", pPeer->id, msg.tranId, rsp.tranId);
return 0; return 0;
} }

View File

@ -19,10 +19,10 @@
#include "tutil.h" #include "tutil.h"
#include "tsocket.h" #include "tsocket.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosTcpPool.h"
#include "twal.h" #include "twal.h"
#include "tsync.h" #include "tsync.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncTcp.h"
typedef struct SThreadObj { typedef struct SThreadObj {
pthread_t thread; pthread_t thread;
@ -47,12 +47,12 @@ typedef struct {
int32_t closedByApp; int32_t closedByApp;
} SConnObj; } SConnObj;
static void *taosAcceptPeerTcpConnection(void *argv); static void *syncAcceptPeerTcpConnection(void *argv);
static void *taosProcessTcpData(void *param); static void *syncProcessTcpData(void *param);
static void taosStopPoolThread(SThreadObj *pThread); static void syncStopPoolThread(SThreadObj *pThread);
static SThreadObj *taosGetTcpThread(SPoolObj *pPool); static SThreadObj *syncGetTcpThread(SPoolObj *pPool);
void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { void *syncOpenTcpThreadPool(SPoolInfo *pInfo) {
pthread_attr_t thattr; pthread_attr_t thattr;
SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); SPoolObj *pPool = calloc(sizeof(SPoolObj), 1);
@ -80,7 +80,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pPool->thread), &thattr, (void *)taosAcceptPeerTcpConnection, pPool) != 0) { if (pthread_create(&(pPool->thread), &thattr, (void *)syncAcceptPeerTcpConnection, pPool) != 0) {
sError("failed to create accept thread for TCP server since %s", strerror(errno)); sError("failed to create accept thread for TCP server since %s", strerror(errno));
close(pPool->acceptFd); close(pPool->acceptFd);
tfree(pPool->pThread); tfree(pPool->pThread);
@ -94,7 +94,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
return pPool; return pPool;
} }
void taosCloseTcpThreadPool(void *param) { void syncCloseTcpThreadPool(void *param) {
SPoolObj * pPool = param; SPoolObj * pPool = param;
SThreadObj *pThread; SThreadObj *pThread;
@ -103,7 +103,7 @@ void taosCloseTcpThreadPool(void *param) {
for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) { for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) {
pThread = pPool->pThread[i]; pThread = pPool->pThread[i];
if (pThread) taosStopPoolThread(pThread); if (pThread) syncStopPoolThread(pThread);
} }
sDebug("%p TCP pool is closed", pPool); sDebug("%p TCP pool is closed", pPool);
@ -112,7 +112,7 @@ void taosCloseTcpThreadPool(void *param) {
tfree(pPool); tfree(pPool);
} }
void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
struct epoll_event event; struct epoll_event event;
SPoolObj *pPool = param; SPoolObj *pPool = param;
@ -122,7 +122,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
return NULL; return NULL;
} }
SThreadObj *pThread = taosGetTcpThread(pPool); SThreadObj *pThread = syncGetTcpThread(pPool);
if (pThread == NULL) { if (pThread == NULL) {
tfree(pConn); tfree(pConn);
return NULL; return NULL;
@ -149,7 +149,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
return pConn; return pConn;
} }
void taosFreeTcpConn(void *param) { void syncFreeTcpConn(void *param) {
SConnObj * pConn = param; SConnObj * pConn = param;
SThreadObj *pThread = pConn->pThread; SThreadObj *pThread = pConn->pThread;
@ -175,7 +175,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
#define maxEvents 10 #define maxEvents 10
static void *taosProcessTcpData(void *param) { static void *syncProcessTcpData(void *param) {
SThreadObj *pThread = (SThreadObj *)param; SThreadObj *pThread = (SThreadObj *)param;
SPoolObj * pPool = pThread->pPool; SPoolObj * pPool = pThread->pPool;
SPoolInfo * pInfo = &pPool->info; SPoolInfo * pInfo = &pPool->info;
@ -222,7 +222,7 @@ static void *taosProcessTcpData(void *param) {
if (pConn->closedByApp == 0) { if (pConn->closedByApp == 0) {
if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) { if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) {
taosFreeTcpConn(pConn); syncFreeTcpConn(pConn);
continue; continue;
} }
} }
@ -239,7 +239,7 @@ static void *taosProcessTcpData(void *param) {
return NULL; return NULL;
} }
static void *taosAcceptPeerTcpConnection(void *argv) { static void *syncAcceptPeerTcpConnection(void *argv) {
SPoolObj * pPool = (SPoolObj *)argv; SPoolObj * pPool = (SPoolObj *)argv;
SPoolInfo *pInfo = &pPool->info; SPoolInfo *pInfo = &pPool->info;
@ -268,7 +268,7 @@ static void *taosAcceptPeerTcpConnection(void *argv) {
return NULL; return NULL;
} }
static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
SThreadObj *pThread = pPool->pThread[pPool->nextId]; SThreadObj *pThread = pPool->pThread[pPool->nextId];
if (pThread) return pThread; if (pThread) return pThread;
@ -286,7 +286,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)taosProcessTcpData, pThread); int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)syncProcessTcpData, pThread);
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (ret != 0) { if (ret != 0) {
@ -303,7 +303,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
return pThread; return pThread;
} }
static void taosStopPoolThread(SThreadObj *pThread) { static void syncStopPoolThread(SThreadObj *pThread) {
pthread_t thread = pThread->thread; pthread_t thread = pThread->thread;
if (!taosCheckPthreadValid(thread)) { if (!taosCheckPthreadValid(thread)) {
return; return;

View File

@ -100,7 +100,7 @@ int processRpcMsg(void *item) {
pHead->msgType = pMsg->msgType; pHead->msgType = pMsg->msgType;
pHead->len = pMsg->contLen; pHead->len = pMsg->contLen;
uDebug("ver:%" PRIu64 ", pkt from client processed", pHead->version); uDebug("ver:%" PRIu64 ", rsp from client processed", pHead->version);
writeIntoWal(pHead); writeIntoWal(pHead);
syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC); syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC);
@ -275,7 +275,7 @@ int getWalInfo(int32_t vgId, char *name, int64_t *index) {
int writeToCache(int32_t vgId, void *data, int type) { int writeToCache(int32_t vgId, void *data, int type) {
SWalHead *pHead = data; SWalHead *pHead = data;
uDebug("pkt from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type); uDebug("rsp from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type);
int msgSize = pHead->len + sizeof(SWalHead); int msgSize = pHead->len + sizeof(SWalHead);
void *pMsg = taosAllocateQitem(msgSize); void *pMsg = taosAllocateQitem(msgSize);

View File

@ -297,16 +297,14 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
} }
} }
if (pHead->len > size - sizeof(SWalHead)) { if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) {
size = sizeof(SWalHead) + pHead->len; wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
buffer = realloc(buffer, size); pHead->version, pHead->len, offset);
if (buffer == NULL) { code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); if (code != TSDB_CODE_SUCCESS) {
code = TAOS_SYSTEM_ERROR(errno); walFtruncate(pWal, tfd, offset);
break; break;
} }
pHead = buffer;
} }
ret = tfRead(tfd, pHead->cont, pHead->len); ret = tfRead(tfd, pHead->cont, pHead->len);

View File

@ -76,7 +76,7 @@ int main(int argc, char *argv[]) {
taosInitLog("wal.log", 100000, 10); taosInitLog("wal.log", 100000, 10);
SWalCfg walCfg; SWalCfg walCfg = {0};
walCfg.walLevel = level; walCfg.walLevel = level;
walCfg.keep = keep; walCfg.keep = keep;

View File

@ -56,6 +56,15 @@ class TDTestCase:
# query .. order by non-time field # query .. order by non-time field
tdSql.error("select * from st order by name") tdSql.error("select * from st order by name")
# TD-2133
tdSql.error("select diff(tagtype),bottom(tagtype,1) from dev_001")
# TD-2190
tdSql.error("select min(tagtype),max(tagtype) from dev_002 interval(1n) fill(prev)")
# TD-2208
tdSql.error("select diff(tagtype),top(tagtype,1) from dev_001")
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)

View File

@ -111,13 +111,25 @@ if __name__ == "__main__":
tdLog.info('stop All dnodes') tdLog.info('stop All dnodes')
sys.exit(0) sys.exit(0)
tdDnodes.init(deployPath) tdDnodes.init(deployPath)
tdDnodes.setTestCluster(testCluster) tdDnodes.setTestCluster(testCluster)
tdDnodes.setValgrind(valgrind) tdDnodes.setValgrind(valgrind)
tdDnodes.stopAll() tdDnodes.stopAll()
tdDnodes.deploy(1) is_test_framework = 0
key_word = 'tdCases.addLinux'
if key_word in open(fileName).read():
is_test_framework = 1
if is_test_framework:
moduleName = fileName.replace(".py", "").replace("/", ".")
uModule = importlib.import_module(moduleName)
try:
ucase = uModule.TDTestCase()
tdDnodes.deploy(1,ucase.updatecfgDict)
except :
tdDnodes.deploy(1,{})
else:
tdDnodes.deploy(1,{})
tdDnodes.start(1) tdDnodes.start(1)
if masterIp == "": if masterIp == "":

View File

@ -108,6 +108,36 @@ class TDDnode:
self.deployed = 0 self.deployed = 0
self.testCluster = False self.testCluster = False
self.valgrind = 0 self.valgrind = 0
self.cfgDict = {
"numOfLogLines":"100000000",
"mnodeEqualVnodeNum":"0",
"walLevel":"2",
"fsync":"1000",
"statusInterval":"1",
"numOfMnodes":"3",
"numOfThreadsPerCore":"2.0",
"monitor":"0",
"maxVnodeConnections":"30000",
"maxMgmtConnections":"30000",
"maxMeterConnections":"30000",
"maxShellConns":"30000",
"locale":"en_US.UTF-8",
"charset":"UTF-8",
"asyncLog":"0",
"anyIp":"0",
"tsEnableTelemetryReporting":"0",
"dDebugFlag":"135",
"mDebugFlag":"135",
"sdbDebugFlag":"135",
"rpcDebugFlag":"135",
"tmrDebugFlag":"131",
"cDebugFlag":"135",
"httpDebugFlag":"135",
"monitorDebugFlag":"135",
"udebugFlag":"135",
"jnidebugFlag":"135",
"qdebugFlag":"135"
}
def init(self, path): def init(self, path):
self.path = path self.path = path
@ -131,7 +161,10 @@ class TDDnode:
return totalSize return totalSize
def deploy(self): def addExtraCfg(self, option, value):
self.cfgDict.update({option: value})
def deploy(self, *updatecfgDict):
self.logDir = "%s/sim/dnode%d/log" % (self.path, self.index) self.logDir = "%s/sim/dnode%d/log" % (self.path, self.index)
self.dataDir = "%s/sim/dnode%d/data" % (self.path, self.index) self.dataDir = "%s/sim/dnode%d/data" % (self.path, self.index)
self.cfgDir = "%s/sim/dnode%d/cfg" % (self.path, self.index) self.cfgDir = "%s/sim/dnode%d/cfg" % (self.path, self.index)
@ -175,36 +208,17 @@ class TDDnode:
self.cfg("publicIp", "192.168.0.%d" % (self.index)) self.cfg("publicIp", "192.168.0.%d" % (self.index))
self.cfg("internalIp", "192.168.0.%d" % (self.index)) self.cfg("internalIp", "192.168.0.%d" % (self.index))
self.cfg("privateIp", "192.168.0.%d" % (self.index)) self.cfg("privateIp", "192.168.0.%d" % (self.index))
self.cfg("dataDir", self.dataDir)
self.cfg("logDir", self.logDir) self.cfg("dataDir",self.dataDir)
self.cfg("numOfLogLines", "100000000") self.cfg("logDir",self.logDir)
self.cfg("mnodeEqualVnodeNum", "0") print(updatecfgDict)
self.cfg("walLevel", "2") if updatecfgDict[0] and updatecfgDict[0][0]:
self.cfg("fsync", "1000") print(updatecfgDict[0][0])
self.cfg("statusInterval", "1") for key,value in updatecfgDict[0][0].items():
self.cfg("numOfMnodes", "3") self.addExtraCfg(key,value)
self.cfg("numOfThreadsPerCore", "2.0") for key, value in self.cfgDict.items():
self.cfg("monitor", "0") self.cfg(key, value)
self.cfg("maxVnodeConnections", "30000")
self.cfg("maxMgmtConnections", "30000")
self.cfg("maxMeterConnections", "30000")
self.cfg("maxShellConns", "30000")
self.cfg("locale", "en_US.UTF-8")
self.cfg("charset", "UTF-8")
self.cfg("asyncLog", "0")
self.cfg("anyIp", "0")
self.cfg("tsEnableTelemetryReporting", "0")
self.cfg("dDebugFlag", "135")
self.cfg("mDebugFlag", "135")
self.cfg("sdbDebugFlag", "135")
self.cfg("rpcDebugFlag", "135")
self.cfg("tmrDebugFlag", "131")
self.cfg("cDebugFlag", "135")
self.cfg("httpDebugFlag", "135")
self.cfg("monitorDebugFlag", "135")
self.cfg("udebugFlag", "135")
self.cfg("jnidebugFlag", "135")
self.cfg("qdebugFlag", "135")
self.deployed = 1 self.deployed = 1
tdLog.debug( tdLog.debug(
"dnode:%d is deployed and configured by %s" % "dnode:%d is deployed and configured by %s" %
@ -260,6 +274,12 @@ class TDDnode:
key = 'from offline to online' key = 'from offline to online'
bkey = bytes(key,encoding="utf8") bkey = bytes(key,encoding="utf8")
logFile = self.logDir + "/taosdlog.0" logFile = self.logDir + "/taosdlog.0"
i = 0
while not os.path.exists(logFile):
sleep(0.1)
i += 1
if i>50:
break
popen = subprocess.Popen('tail -f ' + logFile, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) popen = subprocess.Popen('tail -f ' + logFile, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
pid = popen.pid pid = popen.pid
print('Popen.pid:' + str(pid)) print('Popen.pid:' + str(pid))
@ -273,6 +293,7 @@ class TDDnode:
else: else:
tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index)) tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index))
time.sleep(5) time.sleep(5)
# time.sleep(5) # time.sleep(5)
@ -454,7 +475,7 @@ class TDDnodes:
def setValgrind(self, value): def setValgrind(self, value):
self.valgrind = value self.valgrind = value
def deploy(self, index): def deploy(self, index, *updatecfgDict):
self.sim.setTestCluster(self.testCluster) self.sim.setTestCluster(self.testCluster)
if (self.simDeployed == False): if (self.simDeployed == False):
@ -464,7 +485,7 @@ class TDDnodes:
self.check(index) self.check(index)
self.dnodes[index - 1].setTestCluster(self.testCluster) self.dnodes[index - 1].setTestCluster(self.testCluster)
self.dnodes[index - 1].setValgrind(self.valgrind) self.dnodes[index - 1].setValgrind(self.valgrind)
self.dnodes[index - 1].deploy() self.dnodes[index - 1].deploy(updatecfgDict)
def cfg(self, index, option, value): def cfg(self, index, option, value):
self.check(index) self.check(index)

View File

@ -154,6 +154,7 @@ cd ../../../debug; make
./test.sh -f general/parser/repeatAlter.sim ./test.sh -f general/parser/repeatAlter.sim
./test.sh -f general/parser/union.sim ./test.sh -f general/parser/union.sim
./test.sh -f general/parser/topbot.sim ./test.sh -f general/parser/topbot.sim
./test.sh -f general/parser/function.sim
./test.sh -f general/stable/disk.sim ./test.sh -f general/stable/disk.sim
./test.sh -f general/stable/dnode3.sim ./test.sh -f general/stable/dnode3.sim

View File

@ -110,6 +110,7 @@ sql insert into d1.t1 values(1589529000012, 2)
sql insert into d2.t2 values(1589529000022, 2) sql insert into d2.t2 values(1589529000022, 2)
sql insert into d3.t3 values(1589529000032, 2) sql insert into d3.t3 values(1589529000032, 2)
sql insert into d4.t4 values(1589529000042, 2) sql insert into d4.t4 values(1589529000042, 2)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 2 then if $rows != 2 then
@ -141,6 +142,7 @@ sql insert into d1.t1 values(1589529000013, 3)
sql insert into d2.t2 values(1589529000023, 3) sql insert into d2.t2 values(1589529000023, 3)
sql insert into d3.t3 values(1589529000033, 3) sql insert into d3.t3 values(1589529000033, 3)
sql insert into d4.t4 values(1589529000043, 3) sql insert into d4.t4 values(1589529000043, 3)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 3 then if $rows != 3 then
@ -172,6 +174,7 @@ sql insert into d1.t1 values(1589529000014, 4)
sql insert into d2.t2 values(1589529000024, 4) sql insert into d2.t2 values(1589529000024, 4)
sql insert into d3.t3 values(1589529000034, 4) sql insert into d3.t3 values(1589529000034, 4)
sql insert into d4.t4 values(1589529000044, 4) sql insert into d4.t4 values(1589529000044, 4)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
print select * from d1.t1 $rows print select * from d1.t1 $rows
@ -207,6 +210,7 @@ sql insert into d1.t1 values(1589529000015, 5)
sql insert into d2.t2 values(1589529000025, 5) sql insert into d2.t2 values(1589529000025, 5)
sql insert into d3.t3 values(1589529000035, 5) sql insert into d3.t3 values(1589529000035, 5)
sql insert into d4.t4 values(1589529000045, 5) sql insert into d4.t4 values(1589529000045, 5)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 5 then if $rows != 5 then
@ -238,6 +242,7 @@ sql insert into d1.t1 values(1589529000016, 6)
sql insert into d2.t2 values(1589529000026, 6) sql insert into d2.t2 values(1589529000026, 6)
sql insert into d3.t3 values(1589529000036, 6) sql insert into d3.t3 values(1589529000036, 6)
sql insert into d4.t4 values(1589529000046, 6) sql insert into d4.t4 values(1589529000046, 6)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 6 then if $rows != 6 then

View File

@ -110,6 +110,7 @@ sql insert into d1.t1 values(1588262400002, 2)
sql insert into d2.t2 values(1588262400002, 2) sql insert into d2.t2 values(1588262400002, 2)
sql insert into d3.t3 values(1588262400002, 2) sql insert into d3.t3 values(1588262400002, 2)
sql insert into d4.t4 values(1588262400002, 2) sql insert into d4.t4 values(1588262400002, 2)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 2 then if $rows != 2 then
@ -142,6 +143,7 @@ sql insert into d1.t1 values(1588262400003, 3)
sql insert into d2.t2 values(1588262400003, 3) sql insert into d2.t2 values(1588262400003, 3)
sql insert into d3.t3 values(1588262400003, 3) sql insert into d3.t3 values(1588262400003, 3)
sql insert into d4.t4 values(1588262400003, 3) sql insert into d4.t4 values(1588262400003, 3)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 3 then if $rows != 3 then
@ -173,6 +175,7 @@ sql insert into d1.t1 values(1588262400004, 4)
sql insert into d2.t2 values(1588262400004, 4) sql insert into d2.t2 values(1588262400004, 4)
sql insert into d3.t3 values(1588262400004, 4) sql insert into d3.t3 values(1588262400004, 4)
sql insert into d4.t4 values(1588262400004, 4) sql insert into d4.t4 values(1588262400004, 4)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 4 then if $rows != 4 then
@ -204,6 +207,7 @@ sql insert into d1.t1 values(1588262400005, 5)
sql insert into d2.t2 values(1588262400005, 5) sql insert into d2.t2 values(1588262400005, 5)
sql insert into d3.t3 values(1588262400005, 5) sql insert into d3.t3 values(1588262400005, 5)
sql insert into d4.t4 values(1588262400005, 5) sql insert into d4.t4 values(1588262400005, 5)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 5 then if $rows != 5 then
@ -235,6 +239,7 @@ sql insert into d1.t1 values(1588262400006, 6)
sql insert into d2.t2 values(1588262400006, 6) sql insert into d2.t2 values(1588262400006, 6)
sql insert into d3.t3 values(1588262400006, 6) sql insert into d3.t3 values(1588262400006, 6)
sql insert into d4.t4 values(1588262400006, 6) sql insert into d4.t4 values(1588262400006, 6)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 6 then if $rows != 6 then