diff --git a/.appveyor.yml b/.appveyor.yml index 559431e2f9..fe4816688b 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -3,6 +3,7 @@ os: Visual Studio 2015 environment: matrix: - ARCH: amd64 + - ARCH: x86 clone_folder: c:\dev\TDengine clone_depth: 1 @@ -23,6 +24,7 @@ notifications: - provider: Email to: - sangshuduo@gmail.com + on_build_success: true on_build_failure: true on_build_status_changed: true diff --git a/documentation20/webdocs/assets/connector.png b/documentation20/webdocs/assets/connector.png index 1f8708ff19..6030bd73f5 100644 Binary files a/documentation20/webdocs/assets/connector.png and b/documentation20/webdocs/assets/connector.png differ diff --git a/documentation20/webdocs/markdowndocs/Getting Started-ch.md b/documentation20/webdocs/markdowndocs/Getting Started-ch.md index 9df501ea78..9f9d3a2ec2 100644 --- a/documentation20/webdocs/markdowndocs/Getting Started-ch.md +++ b/documentation20/webdocs/markdowndocs/Getting Started-ch.md @@ -2,37 +2,7 @@ ## 快捷安装 -TDengine软件分为服务器、客户端和报警模块三部分,目前2.0版服务器仅能在Linux系统上安装和运行,后续会支持Windows、mac OS等系统。 - -**应用驱动** - -如果应用在Windows和Linux上运行,可使用C/C++/C#/JAVA/Python/Go/Node.js接口连接服务器。如果应用在Mac上运行,目前可以使用RESTful接口连接服务器。 - -**CPU** - -CPU支持X64/ARM64/MIPS64/Alpha64,后续会支持ARM32、RISC-V等CPU架构。用户可根据需求选择通过[源码](https://www.taosdata.com/cn/getting-started/#通过源码安装)或者[安装包](https://www.taosdata.com/cn/getting-started/#通过安装包安装)来安装。 - -**服务器** - -目前TDengine服务器可以运行在以下平台上: - -| | **CentOS** **6/7/8** | **Ubuntu** **16/18/20** | **Other Linux** | **统信****UOS** | **银河****/****中标麒麟** | **凝思** **V60/V80** | -| -------------- | --------------------- | ------------------------ | --------------- | --------------- | ------------------------- | --------------------- | -| X64 | ● | ● | | ○ | ● | ● | -| 树莓派ARM32 | | ● | ● | | | | -| 龙芯MIPS64 | | | ● | | | | -| 鲲鹏 ARM64 | | ○ | ○ | | ● | | -| 申威 Alpha64 | | | ○ | ● | | | -| 飞腾ARM64 | | ○优麒麟 | | | | | -| 海光X64 | ● | ● | ● | ○ | ● | ● | -| 瑞芯微ARM64/32 | | | ○ | | | | -| 全志ARM64/32 | | | ○ | | | | -| 炬力ARM64/32 | | | ○ | | | | -| TI ARM32 | | | ○ | | | | - - 其中 ● 表示经过官方测试验证, ○ 表示非官方测试验证。 - - +TDengine软件分为服务器、客户端和报警模块三部分,目前2.0版服务器仅能在Linux系统上安装和运行,后续会支持Windows、mac OS等系统。客户端可以在Windows或Linux上安装和运行。任何OS的应用也可以选择RESTful接口连接服务器taosd。CPU支持X64/ARM64/MIPS64/Alpha64,后续会支持ARM32、RISC-V等CPU架构。用户可根据需求选择通过[源码](https://www.taosdata.com/cn/getting-started/#通过源码安装)或者[安装包](https://www.taosdata.com/cn/getting-started/#通过安装包安装)来安装。 ### 通过源码安装 @@ -44,28 +14,11 @@ CPU支持X64/ARM64/MIPS64/Alpha64,后续会支持ARM32、RISC-V等CPU架构。 ### 通过安装包安装 -服务器部分,我们提供三种安装包,您可以根据需要选择。TDengine的安装非常简单,从下载到安装成功仅仅只要几秒钟。 +TDengine的安装非常简单,从下载到安装成功仅仅只要几秒钟。服务端安装包包含客户端和连接器,我们提供三种安装包,您可以根据需要选择: -- TDengine-server-2.0.9.0-Linux-x64.rpm (4.2M) -- TDengine-server-2.0.9.0-Linux-x64.deb (2.7M) -- TDengine-server-2.0.9.0-Linux-x64.tar.gz (4.5M) - - -客户端部分,Linux安装包如下: - -- TDengine-client-2.0.9.0-Linux-x64.tar.gz(3.0M) -- TDengine-client-2.0.9.0-Windows-x64.exe(2.8M) -- TDengine-client-2.0.9.0-Windows-x86.exe(2.8M) - -报警模块的Linux安装包如下(请参考[报警模块的使用方法](https://github.com/taosdata/TDengine/blob/master/alert/README_cn.md)): - -- TDengine-alert-2.0.9.0-Linux-x64.tar.gz (8.1M) - -目前,TDengine 支持在使用[`systemd`](https://en.wikipedia.org/wiki/Systemd)做进程服务管理的linux系统上安装,用`which systemctl`命令来检测系统中是否存在`systemd`包: - -```cmd -which systemctl -``` +- TDengine-server-2.0.10.0-Linux-x64.rpm (4.2M) +- TDengine-server-2.0.10.0-Linux-x64.deb (2.7M) +- TDengine-server-2.0.10.0-Linux-x64.tar.gz (4.5M) 具体的安装过程,请参见TDengine多种安装包的安装和卸载。 @@ -73,13 +26,13 @@ which systemctl 安装成功后,用户可使用`systemctl`命令来启动TDengine的服务进程。 -```cmd -systemctl start taosd +```bash +$ systemctl start taosd ``` 检查服务是否正常工作。 -```cmd -systemctl status taosd +```bash +$ systemctl status taosd ``` 如果TDengine服务正常工作,那么您可以通过TDengine的命令行程序`taos`来访问并体验TDengine。 @@ -88,15 +41,24 @@ systemctl status taosd - systemctl命令需要 _root_ 权限来运行,如果您非 _root_ 用户,请在命令前添加 sudo - 为更好的获得产品反馈,改善产品,TDengine会采集基本的使用信息,但您可以修改系统配置文件taos.cfg里的配置参数telemetryReporting, 将其设为0,就可将其关闭。 +- TDengine采用FQDN(一般就是hostname)作为节点的ID,为保证正常运行,需要给运行taosd的服务器配置好hostname,在客户端应用运行的机器配置好DNS服务或hosts文件,保证FQDN能够解析。 -如果系统中不支持`systemd`,也可以用手动运行 /usr/local/taos/bin/taosd 方式启动 TDengine 服务。 +* TDengine 支持在使用[`systemd`](https://en.wikipedia.org/wiki/Systemd)做进程服务管理的linux系统上安装,用`which systemctl`命令来检测系统中是否存在`systemd`包: + + ```bash + $ which systemctl + ``` + + 如果系统中不支持systemd,也可以用手动运行 /usr/local/taos/bin/taosd 方式启动 TDengine 服务。 + + ## TDengine命令行程序 执行TDengine命令行程序,您只要在Linux终端执行`taos`即可。 -```cmd -taos +```bash +$ taos ``` 如果TDengine终端连接服务成功,将会打印出欢迎消息和版本信息。如果失败,则会打印错误消息出来(请参考[FAQ](https://www.taosdata.com/cn/faq/)来解决终端连接服务端失败的问题)。TDengine终端的提示符号如下: @@ -136,8 +98,8 @@ Query OK, 2 row(s) in set (0.001700s) 示例: -```cmd -taos -h 192.168.0.1 -s "use db; show tables;" +```bash +$ taos -h 192.168.0.1 -s "use db; show tables;" ``` ### 运行SQL命令脚本 @@ -159,8 +121,8 @@ taos> source ; 启动TDengine的服务,在Linux终端执行taosdemo -``` -> taosdemo +```bash +$ taosdemo ``` 该命令将在数据库test下面自动创建一张超级表meters,该超级表下有1万张表,表名为"t0" 到"t9999",每张表有10万条记录,每条记录有 (f1, f2, f3)三个字段,时间戳从"2017-07-14 10:40:00 000" 到"2017-07-14 10:41:39 999",每张表带有标签areaid和loc, areaid被设置为1到10, loc被设置为"beijing"或者“shanghai"。 @@ -171,33 +133,92 @@ taos> source ; - 查询超级表下记录总条数: -``` -taos>select count(*) from test.meters; +```mysql +taos> select count(*) from test.meters; ``` - 查询10亿条记录的平均值、最大值、最小值等: -``` -taos>select avg(f1), max(f2), min(f3) from test.meters; +```mysql +taos> select avg(f1), max(f2), min(f3) from test.meters; ``` - 查询loc="beijing"的记录总条数: -``` -taos>select count(*) from test.meters where loc="beijing"; +```mysql +taos> select count(*) from test.meters where loc="beijing"; ``` - 查询areaid=10的所有记录的平均值、最大值、最小值等: -``` -taos>select avg(f1), max(f2), min(f3) from test.meters where areaid=10; +```mysql +taos> select avg(f1), max(f2), min(f3) from test.meters where areaid=10; ``` - 对表t10按10s进行平均值、最大值和最小值聚合统计: -``` -taos>select avg(f1), max(f2), min(f3) from test.t10 interval(10s); +```mysql +taos> select avg(f1), max(f2), min(f3) from test.t10 interval(10s); ``` **Note:** taosdemo命令本身带有很多选项,配置表的数目、记录条数等等,请执行 `taosdemo --help`详细列出。您可以设置不同参数进行体验。 + + +## 客户端和报警模块 + +如果客户端和服务端运行在不同的电脑上,可以单独安装客户端。Linux和Windows安装包如下: + +- TDengine-client-2.0.10.0-Linux-x64.tar.gz(3.0M) +- TDengine-client-2.0.10.0-Windows-x64.exe(2.8M) +- TDengine-client-2.0.10.0-Windows-x86.exe(2.8M) + +报警模块的Linux安装包如下(请参考[报警模块的使用方法](https://github.com/taosdata/TDengine/blob/master/alert/README_cn.md)): + +- TDengine-alert-2.0.10.0-Linux-x64.tar.gz (8.1M) + + + +## **支持平台列表** + +### TDengine服务器支持的平台列表 + +| | **CentOS** **6/7/8** | **Ubuntu** **16/18/20** | **Other Linux** | **统信****UOS** | **银河****/****中标麒麟** | **凝思** **V60/V80** | +| -------------- | --------------------- | ------------------------ | --------------- | --------------- | ------------------------- | --------------------- | +| X64 | ● | ● | | ○ | ● | ● | +| 树莓派ARM32 | | ● | ● | | | | +| 龙芯MIPS64 | | | ● | | | | +| 鲲鹏 ARM64 | | ○ | ○ | | ● | | +| 申威 Alpha64 | | | ○ | ● | | | +| 飞腾ARM64 | | ○优麒麟 | | | | | +| 海光X64 | ● | ● | ● | ○ | ● | ● | +| 瑞芯微ARM64/32 | | | ○ | | | | +| 全志ARM64/32 | | | ○ | | | | +| 炬力ARM64/32 | | | ○ | | | | +| TI ARM32 | | | ○ | | | | + +注: ● 表示经过官方测试验证, ○ 表示非官方测试验证。 + + + +### TDengine客户端和连接器支持的平台列表 + +目前TDengine的连接器可支持的平台广泛,目前包括:X64/X86/ARM64/ARM32/MIPS/Alpha等硬件平台,以及Linux/Win64/Win32等开发环境。 + +对照矩阵如下: + +| **CPU** | **X64 64bit** | | | **X86 32bit** | **ARM64** | **ARM32** | **MIPS ** **龙芯** | **Alpha ** **申威** | **X64 ** **海光** | +| ----------- | --------------- | --------- | --------- | --------------- | --------- | --------- | ------------------- | -------------------- | ------------------ | +| **OS** | **Linux** | **Win64** | **Win32** | **Win32** | **Linux** | **Linux** | **Linux** | **Linux** | **Linux** | +| **C/C++** | ● | ● | ● | ○ | ● | ● | ● | ● | ● | +| **JDBC** | ● | ● | ● | ○ | ● | ● | ● | ● | ● | +| **Python** | ● | ● | ● | ○ | ● | ● | ● | -- | ● | +| **Go** | ● | ● | ● | ○ | ● | ● | ○ | -- | -- | +| **NodeJs** | ● | ● | ○ | ○ | ● | ● | ○ | -- | -- | +| **C#** | ○ | ● | ● | ○ | ○ | ○ | ○ | -- | -- | +| **RESTful** | ● | ● | ● | ● | ● | ● | ● | ● | ● | + +注: ● 表示经过官方测试验证, ○ 表示非官方测试验证。 + +请跳转到 [连接器 ](https://www.taosdata.com/cn/documentation/connector)查看更详细的信息。 + diff --git a/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md b/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md index adba39ec1f..8ed497fe21 100644 --- a/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md +++ b/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md @@ -79,7 +79,7 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic - **使用数据库** - + ```mysql USE db_name; ``` @@ -1039,3 +1039,21 @@ SELECT AVG(current),MAX(current),LEASTSQUARES(current, start_val, step_val), PER - 标签最多允许128个,可以0个,标签总长度不超过16k个字符 - SQL语句最大长度65480个字符,但可通过系统配置参数maxSQLLength修改,最长可配置为1M - 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制 + + + +## TAOS SQL其他约定 + +**group by的限制** + +TAOS SQL支持对标签、tbname进行group by操作,也支持普通列进行group by,前提是:仅限一列且该列的唯一值小于10万个。 + +**join操作的限制** + +TAOS SQL支持表之间按主键时间戳来join两张表的列,暂不支持两个表之间聚合后的四则运算。 + +**is not null与不为空的表达式适用范围** + +is not null支持所有类型的列。不为空的表达式为 <>"",仅对非数值类型的列适用。 + + diff --git a/documentation20/webdocs/markdowndocs/administrator-ch.md b/documentation20/webdocs/markdowndocs/administrator-ch.md index 7f40009e3c..2c470a270d 100644 --- a/documentation20/webdocs/markdowndocs/administrator-ch.md +++ b/documentation20/webdocs/markdowndocs/administrator-ch.md @@ -80,6 +80,12 @@ TDengine集群的节点数必须大于等于副本数,否则创建表时将报 TDengine系统后台服务由taosd提供,可以在配置文件taos.cfg里修改配置参数,以满足不同场景的需求。配置文件的缺省位置在/etc/taos目录,可以通过taosd命令行执行参数-c指定配置文件目录。比如taosd -c /home/user来指定配置文件位于/home/user这个目录。 +另外可以使用 “-C” 显示当前服务器配置参数: + +``` +taosd -C +``` + 下面仅仅列出一些重要的配置参数,更多的参数请看配置文件里的说明。各个参数的详细介绍及作用请看前述章节,而且这些参数的缺省配置都是工作的,一般无需设置。**注意:配置修改后,需要重启*taosd*服务才能生效。** - firstEp: taosd启动时,主动连接的集群中首个dnode的end point, 默认值为localhost:6030。 @@ -97,6 +103,7 @@ TDengine系统后台服务由taosd提供,可以在配置文件taos.cfg里修 - telemetryReporting: 是否允许 TDengine 采集和上报基本使用信息,0表示不允许,1表示允许。 默认值:1。 - stream: 是否启用连续查询(流计算功能),0表示不允许,1表示允许。 默认值:1。 - queryBufferSize: 为所有并发查询占用保留的内存大小。计算规则可以根据实际应用可能的最大并发数和表的数字相乘,再乘 170 。单位为字节。 +- ratioOfQueryCores: 设置查询线程的最大数量。最小值0 表示只有1个查询线程;最大值2表示最大建立2倍CPU核数的查询线程。默认为1,表示最大和CPU核数相等的查询线程。该值可以为小数,即0.5表示最大建立CPU核数一半的查询线程。 **注意:**对于端口,TDengine会使用从serverPort起13个连续的TCP和UDP端口号,请务必在防火墙打开。因此如果是缺省配置,需要打开从6030都6042共13个端口,而且必须TCP和UDP都打开。 @@ -152,7 +159,13 @@ ALTER DNODE ## 客户端配置 -TDengine系统的前台交互客户端应用程序为taos,它与taosd共享同一个配置文件taos.cfg。运行taos时,使用参数-c指定配置文件目录,如taos -c /home/cfg,表示使用/home/cfg/目录下的taos.cfg配置文件中的参数,缺省目录是/etc/taos。本节主要说明 taos 客户端应用在配置文件 taos.cfg 文件中使用到的参数。 +TDengine系统的前台交互客户端应用程序为taos,以及应用驱动,它与taosd共享同一个配置文件taos.cfg。运行taos时,使用参数-c指定配置文件目录,如taos -c /home/cfg,表示使用/home/cfg/目录下的taos.cfg配置文件中的参数,缺省目录是/etc/taos。更多taos的使用方法请见[Shell命令行程序](https://www.taosdata.com/cn/documentation/administrator/#_TDengine_Shell命令行程序)。本节主要说明 taos 客户端应用在配置文件 taos.cfg 文件中使用到的参数。 + +**2.0.10.0 之后版本支持命令行以下参数显示当前客户端参数的配置** + +```bash +taos -C 或 taos --dump-config +``` 客户端配置参数 @@ -215,15 +228,15 @@ TDengine系统的前台交互客户端应用程序为taos,它与taosd共享同 均是合法的设置东八区时区的格式。 时区的设置对于查询和写入SQL语句中非Unix时间戳的内容(时间戳字符串、关键词now的解析)产生影响。例如: - ``` + ```sql SELECT count(*) FROM table_name WHERE TS<'2019-04-11 12:01:08'; ``` 在东八区,SQL语句等效于 - ``` + ```sql SELECT count(*) FROM table_name WHERE TS<1554955268000; ``` 在UTC时区,SQL语句等效于 - ``` + ```sql SELECT count(*) FROM table_name WHERE TS<1554984068000; ``` 为了避免使用字符串时间格式带来的不确定性,也可以直接使用Unix时间戳。此外,还可以在SQL语句中使用带有时区的时间戳字符串,例如:RFC3339格式的时间戳字符串,2013-04-12T15:52:01.123+08:00或者ISO-8601格式时间戳字符串2013-04-12T15:52:01.123+0800。上述两个字符串转化为Unix时间戳不受系统所在时区的影响。 @@ -239,31 +252,31 @@ TDengine系统的前台交互客户端应用程序为taos,它与taosd共享同 系统管理员可以在CLI界面里添加、删除用户,也可以修改密码。CLI里SQL语法如下: -``` +```sql CREATE USER PASS <'password'>; ``` 创建用户,并指定用户名和密码,密码需要用单引号引起来,单引号为英文半角 -``` +```sql DROP USER ; ``` 删除用户,限root用户使用 -``` +```sql ALTER USER PASS <'password'>; ``` 修改用户密码, 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角 -``` +```sql ALTER USER PRIVILEGE ; ``` 修改用户权限为:super/write/read,不需要添加单引号 -``` +```mysql SHOW USERS; ``` @@ -316,12 +329,11 @@ taos> DESCRIBE d1001 ``` 那么可以用如下命令导入数据 -``` +```mysql taos> insert into d1001 file '~/data.csv'; Query OK, 9 row(s) affected (0.004763s) ``` - **taosdump工具导入** TDengine提供了方便的数据库导入导出工具taosdump。用户可以将taosdump从一个系统导出的数据,导入到其他系统中。具体使用方法,请参见博客:TDengine DUMP工具使用指南 @@ -334,7 +346,7 @@ TDengine提供了方便的数据库导入导出工具taosdump。用户可以将t 如果用户需要导出一个表或一个STable中的数据,可在shell中运行 -``` +```mysql select * from >> data.csv; ``` @@ -348,37 +360,37 @@ TDengine提供了方便的数据库导出工具taosdump。用户可以根据需 系统管理员可以从CLI查询系统的连接、正在进行的查询、流式计算,并且可以关闭连接、停止正在进行的查询和流式计算。CLI里SQL语法如下: -``` +```mysql SHOW CONNECTIONS; ``` 显示数据库的连接,其中一列显示ip:port, 为连接的IP地址和端口号。 -``` +```mysql KILL CONNECTION ; ``` 强制关闭数据库连接,其中的connection-id是SHOW CONNECTIONS中显示的第一列的数字。 -``` +```mysql SHOW QUERIES; ``` 显示数据查询,其中第一列显示的以冒号隔开的两个数字为query-id,为发起该query应用连接的connection-id和查询次数。 -``` +```mysql KILL QUERY ; ``` 强制关闭数据查询,其中query-id是SHOW QUERIES中显示的 connection-id:query-no字串,如“105:2”,拷贝粘贴即可。 -``` +```mysql SHOW STREAMS; ``` 显示流式计算,其中第一列显示的以冒号隔开的两个数字为stream-id, 为启动该stream应用连接的connection-id和发起stream的次数。 -``` +```mysql KILL STREAM ; ``` diff --git a/documentation20/webdocs/markdowndocs/cluster-ch.md b/documentation20/webdocs/markdowndocs/cluster-ch.md index db479417c5..60ac6e4c2e 100644 --- a/documentation20/webdocs/markdowndocs/cluster-ch.md +++ b/documentation20/webdocs/markdowndocs/cluster-ch.md @@ -137,6 +137,16 @@ DROP DNODE "fqdn:port"; 其中fqdn是被删除的节点的FQDN,port是其对外服务器的端口号 +**【注意】** + + - 一个数据节点一旦被drop之后,不能重新加入集群。需要将此节点重新部署(清空数据文件夹)。集群在完成drop dnode操作之前,会将该dnode的数据迁移走。 + + - 请注意 drop dnode 和 停止taosd进程是两个不同的概念,不要混淆:因为删除dnode之前要执行迁移数据的操作,因此被删除的dnode必须保持在线状态。待删除操作结束之后,才能停止taosd进程。 + + - 一个数据节点被drop之后,其他节点都会感知到这个dnodeID的删除操作,任何集群中的节点都不会再接收此dnodeID的请求。 + + - dnodeID的是集群自动分配的,不得人工指定。它在生成时递增的,不会重复。 + ### 查看数据节点 执行CLI程序taos,使用root账号登录进TDengine系统,执行: diff --git a/documentation20/webdocs/markdowndocs/connector-ch.md b/documentation20/webdocs/markdowndocs/connector-ch.md index 9017dfd663..c04dc9891e 100644 --- a/documentation20/webdocs/markdowndocs/connector-ch.md +++ b/documentation20/webdocs/markdowndocs/connector-ch.md @@ -19,7 +19,10 @@ TDengine提供了丰富的应用程序开发接口,其中包括C/C++、C# 、J 其中 ● 表示经过官方测试验证, ○ 表示非官方测试验证。 -注意:所有执行 SQL 语句的 API,例如 C/C++ Connector 中的 `tao_query`、`taos_query_a`、`taos_subscribe` 等,以及其它语言中与它们对应的API,每次都只能执行一条 SQL 语句,如果实际参数中包含了多条语句,它们的行为是未定义的。 +注意: + +* 所有执行 SQL 语句的 API,例如 C/C++ Connector 中的 `tao_query`、`taos_query_a`、`taos_subscribe` 等,以及其它语言中与它们对应的API,每次都只能执行一条 SQL 语句,如果实际参数中包含了多条语句,它们的行为是未定义的。 +* 升级到TDengine到2.0.8.0版本的用户,必须更新JDBC连接TDengine必须升级taos-jdbcdriver到2.0.12及以上。 ## C/C++ Connector diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 910a7b4112..97b962e53a 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -381,6 +381,7 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) { taosScheduleTask(tscQhandle, &schedMsg); } + void tscQueueAsyncRes(SSqlObj *pSql) { if (pSql == NULL || pSql->signature != pSql) { tscDebug("%p SqlObj is freed, not add into queue async res", pSql); @@ -390,7 +391,10 @@ void tscQueueAsyncRes(SSqlObj *pSql) { tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code)); SSqlRes *pRes = &pSql->res; - assert(pSql->fp != NULL && pSql->fetchFp != NULL); + + if (pSql->fp == NULL || pSql->fetchFp == NULL){ + return; + } pSql->fp = pSql->fetchFp; (*pSql->fp)(pSql->param, pSql, pRes->code); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 92afbbfd6f..8045ff5532 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -243,7 +243,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { STscObj* pObj = pSql->pTscObj; SSqlCmd* pCmd = &pSql->cmd; - char *pMsg = rpcMallocCont(sizeof(SMsgVersion) + pCmd->payloadLen); + char *pMsg = rpcMallocCont(pCmd->payloadLen); if (NULL == pMsg) { tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]); return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -254,13 +254,12 @@ int tscSendMsgToServer(SSqlObj *pSql) { tscDumpMgmtEpSet(pSql); } - tstrncpy(pMsg, version, sizeof(SMsgVersion)); - memcpy(pMsg + sizeof(SMsgVersion), pSql->cmd.payload, pSql->cmd.payloadLen); + memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); SRpcMsg rpcMsg = { .msgType = pSql->cmd.msgType, .pCont = pMsg, - .contLen = pSql->cmd.payloadLen + sizeof(SMsgVersion), + .contLen = pSql->cmd.payloadLen, .ahandle = (void*)pSql->self, .handle = NULL, .code = 0 @@ -2146,6 +2145,10 @@ int tscProcessConnectRsp(SSqlObj *pSql) { if (pConnect->epSet.numOfEps > 0) { tscEpSetHtons(&pConnect->epSet); tscUpdateMgmtEpSet(pSql, &pConnect->epSet); + + for (int i = 0; i < pConnect->epSet.numOfEps; ++i) { + tscDebug("%p epSet.fqdn[%d]: %s, pObj:%p", pSql, i, pConnect->epSet.fqdn[i], pObj); + } } strcpy(pObj->sversion, pConnect->serverVersion); diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index b32326f4c2..0fc6400d99 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -124,6 +124,8 @@ void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) { SMnodeMsg *pRead = mnodeCreateMsg(pMsg); taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); } + + rpcFreeCont(pMsg->pCont); } static void dnodeFreeMReadMsg(SMnodeMsg *pRead) { diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 9007b54d47..414b66653d 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -125,6 +125,8 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) { taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); } + + rpcFreeCont(pMsg->pCont); } static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) { diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 221e13d109..79cc70005b 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -127,20 +127,7 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } else {} if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { - SMsgVersion *pMsgVersion = pMsg->pCont; - if (taosCheckVersion(pMsgVersion->clientVersion, version, 3) != TSDB_CODE_SUCCESS) { - rpcMsg.code = TSDB_CODE_TSC_INVALID_VERSION; - rpcSendResponse(&rpcMsg); - rpcFreeCont(pMsg->pCont); - return; // todo change the error code - } - pMsg->pCont += sizeof(*pMsgVersion); - pMsg->contLen -= sizeof(*pMsgVersion); - (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg); - - //pMsg->contLen += sizeof(*pMsgVersion); - rpcFreeCont(pMsg->pCont - sizeof(*pMsgVersion)); } else { dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; @@ -244,4 +231,4 @@ SStatisInfo dnodeGetStatisInfo() { } return info; -} +} \ No newline at end of file diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 2995116ef5..3f31e49370 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -77,6 +77,8 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; rpcSendResponse(&rpcRsp); } + + rpcFreeCont(pMsg->pCont); } void *dnodeAllocVQueryQueue(void *pVnode) { diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 959789a6d2..a5ae8ac830 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -102,6 +102,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) { } vnodeRelease(pVnode); + rpcFreeCont(pRpcMsg->pCont); } void *dnodeAllocVWriteQueue(void *pVnode) { diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index c913b2cf2a..7c7e7ec31a 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -267,6 +267,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync modul TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TOO_MANY_FWDINFO, 0, 0x0904, "Too many sync fwd infos") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_PROTOCOL, 0, 0x0905, "Mismatched protocol") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_CLUSTERID, 0, 0x0906, "Mismatched clusterId") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, 0, 0x0907, "Mismatched signature") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, 0, 0x0908, "Invalid msg checksum") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, 0, 0x0909, "Invalid msg length") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, 0, 0x090A, "Invalid msg type") // wal TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b0c7c0895f..b7f0de54fe 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -198,11 +198,6 @@ typedef struct { int32_t numOfVnodes; } SMsgDesc; -typedef struct SMsgVersion { - char clientVersion[TSDB_VERSION_LEN]; - uint32_t crc; -} SMsgVersion; - typedef struct SMsgHead { int32_t contLen; int32_t vgId; diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 0ce2a1a495..4dae86bbed 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -119,10 +119,6 @@ int32_t syncGetNodesRole(int64_t rid, SNodesRole *); extern char *syncRole[]; //global configurable parameters -extern int32_t tsMaxSyncNum; -extern int32_t tsSyncTcpThreads; -extern int32_t tsSyncTimer; -extern int32_t tsMaxFwdInfo; extern int32_t sDebugFlag; extern char tsArbitrator[]; extern uint16_t tsSyncPort; diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 95f1d27b59..cbe64484b1 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -48,7 +48,7 @@ typedef struct { void * pVnode; SRpcMsg rpcMsg; SRspRet rspRet; - char reserveForSync[16]; + char reserveForSync[24]; SWalHead pHead[]; } SVWriteMsg; diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index 7e5ebb0596..2415695617 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -86,6 +86,6 @@ extern void set_terminal_mode(); extern int get_old_terminal_mode(struct termios* tio); extern void reset_terminal_mode(); extern SShellArguments args; -extern TAOS_RES* result; +extern int64_t result; #endif diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 7a9e242668..627d06ac2e 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -46,7 +46,7 @@ char CONTINUE_PROMPT[] = " -> "; int prompt_size = 6; #endif -TAOS_RES *result = NULL; +int64_t result = 0; SShellHistory history; #define DEFAULT_MAX_BINARY_DISPLAY_WIDTH 30 @@ -260,6 +260,14 @@ int32_t shellRunCommand(TAOS* con, char* command) { } +void freeResultWithRid(int64_t rid) { + SSqlObj* pSql = taosAcquireRef(tscObjRef, rid); + if(pSql){ + taos_free_result(pSql); + taosReleaseRef(tscObjRef, rid); + } +} + void shellRunCommandOnServer(TAOS *con, char command[]) { int64_t st, et; wordexp_t full_path; @@ -294,18 +302,22 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { st = taosGetTimestampUs(); - TAOS_RES* pSql = taos_query_h(con, command, &result); + TAOS_RES* tmpSql = NULL; + TAOS_RES* pSql = taos_query_h(con, command, &tmpSql); if (taos_errno(pSql)) { taos_error(pSql, st); return; } + atomic_store_64(&result, ((SSqlObj*)tmpSql)->self); + int64_t oresult = atomic_load_64(&result); + if (regex_match(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) { fprintf(stdout, "Database changed.\n\n"); fflush(stdout); - atomic_store_ptr(&result, 0); - taos_free_result(pSql); + atomic_store_64(&result, 0); + freeResultWithRid(oresult); return; } @@ -313,8 +325,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { int error_no = 0; int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode); if (numOfRows < 0) { - atomic_store_ptr(&result, 0); - taos_free_result(pSql); + atomic_store_64(&result, 0); + freeResultWithRid(oresult); return; } @@ -336,8 +348,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { wordfree(&full_path); } - atomic_store_ptr(&result, 0); - taos_free_result(pSql); + atomic_store_64(&result, 0); + freeResultWithRid(oresult); } /* Function to do regular expression check */ @@ -501,7 +513,7 @@ static int dumpResultToFile(const char* fname, TAOS_RES* tres) { row = taos_fetch_row(tres); } while( row != NULL); - result = NULL; + result = 0; fclose(fp); return numOfRows; diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 4f0c5e3f99..041ad71ccb 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -19,15 +19,31 @@ #include "tnettest.h" pthread_t pid; +static tsem_t cancelSem; void shellQueryInterruptHandler(int signum) { + tsem_post(&cancelSem); +} + +void *cancelHandler(void *arg) { + while(1) { + if (tsem_wait(&cancelSem) != 0) { + taosMsleep(10); + continue; + } + #ifdef LINUX - void* pResHandle = atomic_val_compare_exchange_64(&result, result, 0); - taos_stop_query(pResHandle); + int64_t rid = atomic_val_compare_exchange_64(&result, result, 0); + SSqlObj* pSql = taosAcquireRef(tscObjRef, rid); + taos_stop_query(pSql); + taosReleaseRef(tscObjRef, rid); #else - printf("\nReceive ctrl+c or other signal, quit shell.\n"); - exit(0); + printf("\nReceive ctrl+c or other signal, quit shell.\n"); + exit(0); #endif + } + + return NULL; } int checkVersion() { @@ -105,6 +121,14 @@ int main(int argc, char* argv[]) { exit(EXIT_FAILURE); } + if (tsem_init(&cancelSem, 0, 0) != 0) { + printf("failed to create cancel semphore\n"); + exit(EXIT_FAILURE); + } + + pthread_t spid; + pthread_create(&spid, NULL, cancelHandler, NULL); + /* Interrupt handler. */ struct sigaction act; memset(&act, 0, sizeof(struct sigaction)); diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index 31ea2da640..e4df562d81 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -59,7 +59,7 @@ typedef struct SSdbRow { SMnodeMsg *pMsg; int32_t (*fpReq)(SMnodeMsg *pMsg); int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code); - char reserveForSync[16]; + char reserveForSync[24]; SWalHead pHead[]; } SSdbRow; diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index b60b308cf8..8b3b2896ff 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -294,6 +294,11 @@ void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect) { *epSet = tsMEpForShell; mnodeMnodeUnLock(); + if (mnodeGetDnodesNum() <= 1) { + epSet->numOfEps = 0; + return; + } + mTrace("vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d", tsMEpForShell.numOfEps, tsMEpForShell.inUse); for (int32_t i = 0; i < epSet->numOfEps; ++i) { if (redirect && strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { diff --git a/src/query/inc/sql.y b/src/query/inc/sql.y index 199eea289b..0de6027006 100644 --- a/src/query/inc/sql.y +++ b/src/query/inc/sql.y @@ -346,8 +346,8 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). { A = tSetCreateSQLElems(NULL, NULL, S, TSQL_CREATE_STREAM); setSQLInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE); - U.n += Z.n; - setCreatedTableName(pInfo, &U, &V); + V.n += Z.n; + setCreatedTableName(pInfo, &V, &U); } %type column{TAOS_FIELD} diff --git a/src/query/src/sql.c b/src/query/src/sql.c index 566b6e4371..fe82db72a6 100644 --- a/src/query/src/sql.c +++ b/src/query/src/sql.c @@ -2428,8 +2428,8 @@ static void yy_reduce( yylhsminor.yy538 = tSetCreateSqlElems(NULL, NULL, yymsp[0].minor.yy84, TSQL_CREATE_STREAM); setSqlInfo(pInfo, yylhsminor.yy538, NULL, TSDB_SQL_CREATE_TABLE); - yymsp[-4].minor.yy0.n += yymsp[-2].minor.yy0.n; - setCreatedTableName(pInfo, &yymsp[-4].minor.yy0, &yymsp[-3].minor.yy0); + yymsp[-3].minor.yy0.n += yymsp[-2].minor.yy0.n; + setCreatedTableName(pInfo, &yymsp[-3].minor.yy0, &yymsp[-4].minor.yy0); } yymsp[-4].minor.yy538 = yylhsminor.yy538; break; diff --git a/src/sync/CMakeLists.txt b/src/sync/CMakeLists.txt index 60271c771c..aa38a56f38 100644 --- a/src/sync/CMakeLists.txt +++ b/src/sync/CMakeLists.txt @@ -5,12 +5,12 @@ INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) IF (TD_LINUX) - LIST(REMOVE_ITEM SRC src/tarbitrator.c) + LIST(REMOVE_ITEM SRC src/syncArbitrator.c) ADD_LIBRARY(sync ${SRC}) TARGET_LINK_LIBRARIES(sync tutil pthread common) - LIST(APPEND BIN_SRC src/tarbitrator.c) - LIST(APPEND BIN_SRC src/taosTcpPool.c) + LIST(APPEND BIN_SRC src/syncArbitrator.c) + LIST(APPEND BIN_SRC src/syncTcp.c) ADD_EXECUTABLE(tarbitrator ${BIN_SRC}) TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil) diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 2be25447c4..d855c651f9 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -13,12 +13,14 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_SYNCINT_H -#define TDENGINE_SYNCINT_H +#ifndef TDENGINE_SYNC_INT_H +#define TDENGINE_SYNC_INT_H #ifdef __cplusplus extern "C" { #endif +#include "syncMsg.h" +#include "twal.h" #define sFatal(...) { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); }} #define sError(...) { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); }} @@ -27,86 +29,22 @@ extern "C" { #define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} -typedef enum { - TAOS_SMSG_SYNC_DATA = 1, - TAOS_SMSG_FORWARD = 2, - TAOS_SMSG_FORWARD_RSP = 3, - TAOS_SMSG_SYNC_REQ = 4, - TAOS_SMSG_SYNC_RSP = 5, - TAOS_SMSG_SYNC_MUST = 6, - TAOS_SMSG_STATUS = 7, - TAOS_SMSG_SYNC_DATA_RSP = 8, -} ESyncMsgType; +#define SYNC_TCP_THREADS 2 +#define SYNC_MAX_NUM 2 #define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16) #define SYNC_RECV_BUFFER_SIZE (5*1024*1024) -#define SYNC_FWD_TIMER 300 -#define SYNC_ROLE_TIMER 10000 -#define SYNC_WAIT_AFTER_CHOOSE_MASTER 3 + +#define SYNC_MAX_FWDS 512 +#define SYNC_FWD_TIMER 300 +#define SYNC_ROLE_TIMER 15000 // ms +#define SYNC_CHECK_INTERVAL 1 // ms +#define SYNC_WAIT_AFTER_CHOOSE_MASTER 10 // ms #define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version #define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus -#pragma pack(push, 1) - -typedef struct { - char type; // msg type - char pversion; // protocol version - char reserved[6]; // not used - int32_t vgId; // vg ID - int32_t len; // content length, does not include head - // char cont[]; // message content starts from here -} SSyncHead; - -typedef struct { - SSyncHead syncHead; - uint16_t port; - uint16_t tranId; - char fqdn[TSDB_FQDN_LEN]; - int32_t sourceId; // only for arbitrator -} SFirstPkt; - -typedef struct { - int8_t sync; - int8_t reserved; - uint16_t tranId; -} SFirstPktRsp; - -typedef struct { - int8_t role; - uint64_t version; -} SPeerStatus; - -typedef struct { - int8_t role; - int8_t ack; - int8_t type; - int8_t reserved[3]; - uint16_t tranId; - uint64_t version; - SPeerStatus peersStatus[]; -} SPeersStatus; - -typedef struct { - char name[TSDB_FILENAME_LEN]; - uint32_t magic; - uint32_t index; - uint64_t fversion; - int64_t size; -} SFileInfo; - -typedef struct { - int8_t sync; -} SFileAck; - -typedef struct { - uint64_t version; - int32_t code; -} SFwdRsp; - -#pragma pack(pop) - typedef struct { char * buffer; int32_t bufferSize; @@ -190,7 +128,6 @@ void syncRestartConnection(SSyncPeer *pPeer); void syncBroadcastStatus(SSyncNode *pNode); void syncAddPeerRef(SSyncPeer *pPeer); int32_t syncDecPeerRef(SSyncPeer *pPeer); -uint16_t syncGenTranId(); #ifdef __cplusplus } diff --git a/src/sync/inc/syncMsg.h b/src/sync/inc/syncMsg.h new file mode 100644 index 0000000000..73f4223c88 --- /dev/null +++ b/src/sync/inc/syncMsg.h @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_SYNC_MSG_H +#define TDENGINE_SYNC_MSG_H + +#ifdef __cplusplus +extern "C" { +#endif +#include "tsync.h" + +typedef enum { + TAOS_SMSG_START = 0, + TAOS_SMSG_SYNC_DATA = 1, + TAOS_SMSG_SYNC_DATA_RSP = 2, + TAOS_SMSG_SYNC_FWD = 3, + TAOS_SMSG_SYNC_FWD_RSP = 4, + TAOS_SMSG_SYNC_REQ = 5, + TAOS_SMSG_SYNC_REQ_RSP = 6, + TAOS_SMSG_SYNC_MUST = 7, + TAOS_SMSG_SYNC_MUST_RSP = 8, + TAOS_SMSG_STATUS = 9, + TAOS_SMSG_STATUS_RSP = 10, + TAOS_SMSG_SETUP = 11, + TAOS_SMSG_SETUP_RSP = 12, + TAOS_SMSG_SYNC_FILE = 13, + TAOS_SMSG_SYNC_FILE_RSP = 14, + TAOS_SMSG_END = 15, +} ESyncMsgType; + +typedef enum { + SYNC_STATUS_BROADCAST, + SYNC_STATUS_BROADCAST_RSP, + SYNC_STATUS_SETUP_CONN, + SYNC_STATUS_SETUP_CONN_RSP, + SYNC_STATUS_EXCHANGE_DATA, + SYNC_STATUS_EXCHANGE_DATA_RSP, + SYNC_STATUS_CHECK_ROLE, + SYNC_STATUS_CHECK_ROLE_RSP +} ESyncStatusType; + +#pragma pack(push, 1) + +typedef struct { + int8_t type; // msg type + int8_t protocol; // protocol version + uint16_t signature; // fixed value + int32_t code; // + int32_t cId; // cluster Id + int32_t vgId; // vg ID + int32_t len; // content length, does not include head + uint32_t cksum; +} SSyncHead; + +typedef struct { + SSyncHead head; + uint16_t port; + uint16_t tranId; + int32_t sourceId; // only for arbitrator + char fqdn[TSDB_FQDN_LEN]; +} SSyncMsg; + +typedef struct { + SSyncHead head; + int8_t sync; + int8_t reserved; + uint16_t tranId; + int8_t reserverd[4]; +} SSyncRsp; + +typedef struct { + int8_t role; + uint64_t version; +} SPeerStatus; + +typedef struct { + SSyncHead head; + int8_t role; + int8_t ack; + int8_t type; + int8_t reserved[3]; + uint16_t tranId; + uint64_t version; + SPeerStatus peersStatus[TAOS_SYNC_MAX_REPLICA]; +} SPeersStatus; + +typedef struct { + SSyncHead head; + char name[TSDB_FILENAME_LEN]; + uint32_t magic; + uint32_t index; + uint64_t fversion; + int64_t size; +} SFileInfo; + +typedef struct { + SSyncHead head; + int8_t sync; +} SFileAck; + +typedef struct { + SSyncHead head; + uint64_t version; + int32_t code; +} SFwdRsp; + +#pragma pack(pop) + +#define SYNC_PROTOCOL_VERSION 1 +#define SYNC_SIGNATURE ((uint16_t)(0xCDEF)) + +extern char *statusType[]; + +uint16_t syncGenTranId(); +int32_t syncCheckHead(SSyncHead *pHead); + +void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len); +void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code); +void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId); +void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId); +void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId); +void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId); + +void syncBuildFileAck(SFileAck *pMsg, int32_t vgId); +void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_VNODEPEER_H diff --git a/src/sync/inc/taosTcpPool.h b/src/sync/inc/syncTcp.h similarity index 77% rename from src/sync/inc/taosTcpPool.h rename to src/sync/inc/syncTcp.h index 41043b0cd4..7db51f2a71 100644 --- a/src/sync/inc/taosTcpPool.h +++ b/src/sync/inc/syncTcp.h @@ -13,16 +13,13 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_TCP_POOL_H -#define TDENGINE_TCP_POOL_H +#ifndef TDENGINE_SYNC_TCP_POOL_H +#define TDENGINE_SYNC_TCP_POOL_H #ifdef __cplusplus extern "C" { #endif -typedef void *ttpool_h; -typedef void *tthread_h; - typedef struct { int32_t numOfThreads; uint32_t serverIp; @@ -33,10 +30,10 @@ typedef struct { void (*processIncomingConn)(int32_t fd, uint32_t ip); } SPoolInfo; -ttpool_h taosOpenTcpThreadPool(SPoolInfo *pInfo); -void taosCloseTcpThreadPool(ttpool_h); -void * taosAllocateTcpConn(void *, void *ahandle, int32_t connFd); -void taosFreeTcpConn(void *); +void *syncOpenTcpThreadPool(SPoolInfo *pInfo); +void syncCloseTcpThreadPool(void *); +void *syncAllocateTcpConn(void *, void *ahandle, int32_t connFd); +void syncFreeTcpConn(void *); #ifdef __cplusplus } diff --git a/src/sync/src/tarbitrator.c b/src/sync/src/syncArbitrator.c similarity index 82% rename from src/sync/src/tarbitrator.c rename to src/sync/src/syncArbitrator.c index 4016042de2..1cb2b8f302 100644 --- a/src/sync/src/tarbitrator.c +++ b/src/sync/src/syncArbitrator.c @@ -22,17 +22,17 @@ #include "tsocket.h" #include "tglobal.h" #include "taoserror.h" -#include "taosTcpPool.h" #include "twal.h" #include "tsync.h" #include "syncInt.h" +#include "syncTcp.h" -static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); -static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); -static void arbProcessBrokenLink(void *param); -static int32_t arbProcessPeerMsg(void *param, void *buffer); -static tsem_t tsArbSem; -static ttpool_h tsArbTcpPool; +static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); +static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); +static void arbProcessBrokenLink(void *param); +static int32_t arbProcessPeerMsg(void *param, void *buffer); +static tsem_t tsArbSem; +static void * tsArbTcpPool; typedef struct { char id[TSDB_EP_LEN + 24]; @@ -90,7 +90,7 @@ int32_t main(int32_t argc, char *argv[]) { info.processBrokenLink = arbProcessBrokenLink; info.processIncomingMsg = arbProcessPeerMsg; info.processIncomingConn = arbProcessIncommingConnection; - tsArbTcpPool = taosOpenTcpThreadPool(&info); + tsArbTcpPool = syncOpenTcpThreadPool(&info); if (tsArbTcpPool == NULL) { sDebug("failed to open TCP thread pool, exit..."); @@ -101,8 +101,8 @@ int32_t main(int32_t argc, char *argv[]) { tsem_wait(&tsArbSem); - taosCloseTcpThreadPool(tsArbTcpPool); - sInfo("TAOS arbitrator is shut down\n"); + syncCloseTcpThreadPool(tsArbTcpPool); + sInfo("TAOS arbitrator is shut down"); closelog(); return 0; @@ -113,9 +113,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { tinet_ntoa(ipstr, sourceIp); sDebug("peer TCP connection from ip:%s", ipstr); - SFirstPkt firstPkt; - if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { - sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno)); + SSyncMsg msg; + if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + sError("failed to read peer sync msg from ip:%s since %s", ipstr, strerror(errno)); taosCloseSocket(connFd); return; } @@ -127,9 +127,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { return; } - firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0; - snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port); - if (firstPkt.syncHead.vgId) { + msg.fqdn[TSDB_FQDN_LEN - 1] = 0; + snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", msg.sourceId, msg.fqdn, msg.port); + if (msg.head.vgId) { sDebug("%s, vgId in head is not zero, close the connection", pNode->id); tfree(pNode); taosCloseSocket(connFd); @@ -138,7 +138,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { sDebug("%s, arbitrator request is accepted", pNode->id); pNode->nodeFd = connFd; - pNode->pConn = taosAllocateTcpConn(tsArbTcpPool, pNode, connFd); + pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, pNode, connFd); return; } @@ -156,8 +156,8 @@ static int32_t arbProcessPeerMsg(void *param, void *buffer) { int32_t bytes = 0; char * cont = (char *)buffer; - int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head)); - if (hlen != sizeof(head)) { + int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(SSyncHead)); + if (hlen != sizeof(SSyncHead)) { sDebug("%s, failed to read msg, hlen:%d", pNode->id, hlen); return -1; } diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index aae5dab3cd..ac79b48606 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -23,26 +23,19 @@ #include "tsocket.h" #include "tglobal.h" #include "taoserror.h" -#include "taosTcpPool.h" #include "tqueue.h" #include "twal.h" #include "tsync.h" +#include "syncTcp.h" #include "syncInt.h" -// global configurable -int32_t tsMaxSyncNum = 2; -int32_t tsSyncTcpThreads = 2; -int32_t tsMaxFwdInfo = 512; -int32_t tsSyncTimer = 1; +int32_t tsSyncNum = 0; // number of sync in process in whole system +char tsNodeFqdn[TSDB_FQDN_LEN] = {0}; -// module global, not configurable -int32_t tsSyncNum; // number of sync in process in whole system -char tsNodeFqdn[TSDB_FQDN_LEN]; - -static ttpool_h tsTcpPool; -static void * tsSyncTmrCtrl = NULL; -static void * tsVgIdHash; -static int32_t tsSyncRefId = -1; +static void * tsTcpPool = NULL; +static void * tsSyncTmrCtrl = NULL; +static void * tsVgIdHash = NULL; +static int32_t tsSyncRefId = -1; // local functions static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); @@ -80,36 +73,10 @@ char *syncStatus[] = { "invalid" }; -typedef enum { - SYNC_STATUS_BROADCAST, - SYNC_STATUS_BROADCAST_RSP, - SYNC_STATUS_SETUP_CONN, - SYNC_STATUS_SETUP_CONN_RSP, - SYNC_STATUS_EXCHANGE_DATA, - SYNC_STATUS_EXCHANGE_DATA_RSP, - SYNC_STATUS_CHECK_ROLE, - SYNC_STATUS_CHECK_ROLE_RSP -} ESyncStatusType; - -char *statusType[] = { - "broadcast", - "broadcast-rsp", - "setup-conn", - "setup-conn-rsp", - "exchange-data", - "exchange-data-rsp", - "check-role", - "check-role-rsp" -}; - -uint16_t syncGenTranId() { - return taosRand() & 0XFFFF; -} - int32_t syncInit() { SPoolInfo info = {0}; - info.numOfThreads = tsSyncTcpThreads; + info.numOfThreads = SYNC_TCP_THREADS; info.serverIp = 0; info.port = tsSyncPort; info.bufferSize = SYNC_MAX_SIZE; @@ -117,7 +84,7 @@ int32_t syncInit() { info.processIncomingMsg = syncProcessPeerMsg; info.processIncomingConn = syncProcessIncommingConnection; - tsTcpPool = taosOpenTcpThreadPool(&info); + tsTcpPool = syncOpenTcpThreadPool(&info); if (tsTcpPool == NULL) { sError("failed to init tcpPool"); return -1; @@ -126,16 +93,16 @@ int32_t syncInit() { tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC"); if (tsSyncTmrCtrl == NULL) { sError("failed to init tmrCtrl"); - taosCloseTcpThreadPool(tsTcpPool); + syncCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; return -1; } tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (tsVgIdHash == NULL) { - sError("failed to init tsVgIdHash"); + sError("failed to init vgIdHash"); taosTmrCleanUp(tsSyncTmrCtrl); - taosCloseTcpThreadPool(tsTcpPool); + syncCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; tsSyncTmrCtrl = NULL; return -1; @@ -155,7 +122,7 @@ int32_t syncInit() { void syncCleanUp() { if (tsTcpPool) { - taosCloseTcpThreadPool(tsTcpPool); + syncCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; } @@ -236,7 +203,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]); - pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo * sizeof(SFwdInfo), 1); + pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + SYNC_MAX_FWDS * sizeof(SFwdInfo), 1); if (pNode->pSyncFwds == NULL) { sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); terrno = TAOS_SYSTEM_ERROR(errno); @@ -336,6 +303,11 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { newPeers[i] = pNode->peerInfo[j]; } + if (newPeers[i] == NULL) { + sError("vgId:%d, failed to reconfig", pNode->vgId); + return TSDB_CODE_SYN_INVALID_CONFIG; + } + if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) { pNode->selfIndex = i; } @@ -384,21 +356,13 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { SSyncPeer *pPeer = pNode->pMaster; if (pPeer && pNode->quorum > 1) { - char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; + SFwdRsp rsp; + syncBuildSyncFwdRsp(&rsp, pNode->vgId, version, code); - SSyncHead *pHead = (SSyncHead *)msg; - pHead->type = TAOS_SMSG_FORWARD_RSP; - pHead->len = sizeof(SFwdRsp); - - SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead)); - pFwdRsp->version = version; - pFwdRsp->code = code; - - int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); - if (taosWriteMsg(pPeer->peerFd, msg, msgLen) == msgLen) { - sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version); + if (taosWriteMsg(pPeer->peerFd, &rsp, sizeof(SFwdRsp)) == sizeof(SFwdRsp)) { + sTrace("%s, forward-rsp is sent, code:0x%x hver:%" PRIu64, pPeer->id, code, version); } else { - sDebug("%s, failed to send forward ack, restart", pPeer->id); + sDebug("%s, failed to send forward-rsp, restart", pPeer->id); syncRestartConnection(pPeer); } } @@ -509,7 +473,7 @@ static void syncClosePeerConn(SSyncPeer *pPeer) { taosClose(pPeer->syncFd); if (pPeer->peerFd >= 0) { pPeer->peerFd = -1; - taosFreeTcpConn(pPeer->pConn); + syncFreeTcpConn(pPeer->pConn); } } @@ -779,7 +743,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) { int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { sDebug("%s, check peer connection in 1000 ms", pPeer->id); - taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer); } } @@ -857,7 +821,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { taosTmrStopA(&pPeer->timer); // Ensure the sync of mnode not interrupted - if (pNode->vgId != 1 && tsSyncNum >= tsMaxSyncNum) { + if (pNode->vgId != 1 && tsSyncNum >= SYNC_MAX_NUM) { sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum); taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, tsSyncTmrCtrl, &pPeer->timer); return; @@ -865,56 +829,42 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { sDebug("%s, try to sync", pPeer->id); - SFirstPkt firstPkt; - memset(&firstPkt, 0, sizeof(firstPkt)); - firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ; - firstPkt.syncHead.vgId = pNode->vgId; - firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead); - firstPkt.tranId = syncGenTranId(); - tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); - firstPkt.port = tsSyncPort; - taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); + SSyncMsg msg; + syncBuildSyncReqMsg(&msg, pNode->vgId); - if (taosWriteMsg(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { + taosTmrReset(syncNotStarted, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer); + + if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { sError("%s, failed to send sync-req to peer", pPeer->id); } else { - sInfo("%s, sync-req is sent to peer, tranId:%u, sstatus:%s", pPeer->id, firstPkt.tranId, syncStatus[nodeSStatus]); + sInfo("%s, sync-req is sent to peer, tranId:%u, sstatus:%s", pPeer->id, msg.tranId, syncStatus[nodeSStatus]); } } -static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { +static void syncProcessFwdResponse(SFwdRsp *pFwdRsp, SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; - SFwdRsp * pFwdRsp = (SFwdRsp *)cont; SSyncFwds *pSyncFwds = pNode->pSyncFwds; SFwdInfo * pFwdInfo; sTrace("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version); SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first; - bool found = false; if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { // find the forwardInfo from first for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { - pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo; + pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % SYNC_MAX_FWDS; if (pFwdRsp->version == pFwdInfo->version) { - found = true; syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code); syncRemoveConfirmedFwdInfo(pNode); - break; + return; } } } - - if (!found) { - sTrace("%s, forward-rsp not found first:%d fwds:%d, code:%x hver:%" PRIu64, pPeer->id, pSyncFwds->first, - pSyncFwds->fwds, pFwdRsp->code, pFwdRsp->version); - syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code); - } } static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; - SWalHead * pHead = (SWalHead *)cont; + SWalHead * pHead = (SWalHead *)(cont + sizeof(SSyncHead)); sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len); @@ -931,9 +881,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { } } -static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { - SSyncNode * pNode = pPeer->pSyncNode; - SPeersStatus *pPeersStatus = (SPeersStatus *)cont; +static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) { + SSyncNode *pNode = pPeer->pSyncNode; sDebug("%s, status is received, self:%s:%s:%" PRIu64 ", peer:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d", pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeersStatus->role], @@ -947,23 +896,22 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { } } -static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { +static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead) { if (pPeer->peerFd < 0) return -1; int32_t hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead)); if (hlen != sizeof(SSyncHead)) { - sDebug("%s, failed to read msg, hlen:%d", pPeer->id, hlen); + sDebug("%s, failed to read msg since %s, hlen:%d", pPeer->id, tstrerror(errno), hlen); return -1; } - // head.len = htonl(head.len); - if (pHead->len < 0) { - sError("%s, invalid pkt length, hlen:%d", pPeer->id, pHead->len); + int32_t code = syncCheckHead(pHead); + if (code != 0) { + sError("%s, failed to check msg head since %s, type:%d", pPeer->id, tstrerror(code), pHead->type); return -1; } - assert(pHead->len <= TSDB_MAX_WAL_SIZE); - int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len); + int32_t bytes = taosReadMsg(pPeer->peerFd, (char *)pHead + sizeof(SSyncHead), pHead->len); if (bytes != pHead->len) { sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len); return -1; @@ -974,23 +922,22 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { static int32_t syncProcessPeerMsg(void *param, void *buffer) { SSyncPeer *pPeer = param; - SSyncHead head; - char * cont = buffer; - + SSyncHead *pHead = buffer; SSyncNode *pNode = pPeer->pSyncNode; + pthread_mutex_lock(&pNode->mutex); - int32_t code = syncReadPeerMsg(pPeer, &head, cont); + int32_t code = syncReadPeerMsg(pPeer, pHead); if (code == 0) { - if (head.type == TAOS_SMSG_FORWARD) { - syncProcessForwardFromPeer(cont, pPeer); - } else if (head.type == TAOS_SMSG_FORWARD_RSP) { - syncProcessFwdResponse(cont, pPeer); - } else if (head.type == TAOS_SMSG_SYNC_REQ) { - syncProcessSyncRequest(cont, pPeer); - } else if (head.type == TAOS_SMSG_STATUS) { - syncProcessPeersStatusMsg(cont, pPeer); + if (pHead->type == TAOS_SMSG_SYNC_FWD) { + syncProcessForwardFromPeer(buffer, pPeer); + } else if (pHead->type == TAOS_SMSG_SYNC_FWD_RSP) { + syncProcessFwdResponse(buffer, pPeer); + } else if (pHead->type == TAOS_SMSG_SYNC_REQ) { + syncProcessSyncRequest(buffer, pPeer); + } else if (pHead->type == TAOS_SMSG_STATUS) { + syncProcessPeersStatusMsg(buffer, pPeer); } } @@ -999,36 +946,30 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) { return code; } -#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA - static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) { - SSyncNode *pNode = pPeer->pSyncNode; - char msg[statusMsgLen] = {0}; - if (pPeer->peerFd < 0 || pPeer->ip == 0) return; - SSyncHead * pHead = (SSyncHead *)msg; - SPeersStatus *pPeersStatus = (SPeersStatus *)(msg + sizeof(SSyncHead)); + SSyncNode * pNode = pPeer->pSyncNode; + SPeersStatus msg; - pHead->type = TAOS_SMSG_STATUS; - pHead->len = statusMsgLen - sizeof(SSyncHead); + memset(&msg, 0, sizeof(SPeersStatus)); + syncBuildPeersStatus(&msg, pNode->vgId); - pPeersStatus->version = nodeVersion; - pPeersStatus->role = nodeRole; - pPeersStatus->ack = ack; - pPeersStatus->type = type; - pPeersStatus->tranId = tranId; + msg.role = nodeRole; + msg.ack = ack; + msg.type = type; + msg.tranId = tranId; + msg.version = nodeVersion; for (int32_t i = 0; i < pNode->replica; ++i) { - pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role; - pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version; + msg.peersStatus[i].role = pNode->peerInfo[i]->role; + msg.peersStatus[i].version = pNode->peerInfo[i]->version; } - if (taosWriteMsg(pPeer->peerFd, msg, statusMsgLen) == statusMsgLen) { + if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SPeersStatus)) == sizeof(SPeersStatus)) { sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d", pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role], - syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId, - statusType[pPeersStatus->type], pPeer->peerFd); + syncStatus[pPeer->sstatus], pPeer->version, ack, tranId, statusType[type], pPeer->peerFd); } else { sDebug("%s, failed to send status msg, restart", pPeer->id); syncRestartConnection(pPeer); @@ -1048,29 +989,23 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (connFd < 0) { sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno)); - taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer); return; } - SFirstPkt firstPkt; - memset(&firstPkt, 0, sizeof(firstPkt)); - firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId : 0; - firstPkt.syncHead.type = TAOS_SMSG_STATUS; - tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); - firstPkt.port = tsSyncPort; - firstPkt.tranId = syncGenTranId(); - firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId + SSyncMsg msg; + syncBuildSyncSetupMsg(&msg, pPeer->nodeId ? pNode->vgId : 0); - if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { - sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, firstPkt.tranId); + if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) == sizeof(SSyncMsg)) { + sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, msg.tranId); pPeer->peerFd = connFd; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; - pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); + pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd); syncAddPeerRef(pPeer); } else { sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno)); taosClose(connFd); - taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer); } } @@ -1116,14 +1051,21 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { tinet_ntoa(ipstr, sourceIp); sDebug("peer TCP connection from ip:%s", ipstr); - SFirstPkt firstPkt; - if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { - sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno)); + SSyncMsg msg; + if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + sError("failed to read peer sync msg from ip:%s since %s", ipstr, strerror(errno)); taosCloseSocket(connFd); return; } - int32_t vgId = firstPkt.syncHead.vgId; + int32_t code = syncCheckHead((SSyncHead *)(&msg)); + if (code != 0) { + sError("failed to check peer sync msg from ip:%s since %s", ipstr, strerror(code)); + taosCloseSocket(connFd); + return; + } + + int32_t vgId = msg.head.vgId; SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t)); if (ppNode == NULL || *ppNode == NULL) { sError("vgId:%d, vgId could not be found", vgId); @@ -1131,7 +1073,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { return; } - sDebug("vgId:%d, firstPkt is received, tranId:%u", vgId, firstPkt.tranId); + sDebug("vgId:%d, sync msg is received, tranId:%u", vgId, msg.tranId); SSyncNode *pNode = *ppNode; pthread_mutex_lock(&pNode->mutex); @@ -1139,27 +1081,27 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { SSyncPeer *pPeer; for (i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; - if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) break; + if (pPeer && (strcmp(pPeer->fqdn, msg.fqdn) == 0) && (pPeer->port == msg.port)) break; } pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL; if (pPeer == NULL) { - sError("vgId:%d, peer:%s:%u not configured", pNode->vgId, firstPkt.fqdn, firstPkt.port); + sError("vgId:%d, peer:%s:%u not configured", pNode->vgId, msg.fqdn, msg.port); taosCloseSocket(connFd); // syncSendVpeerCfgMsg(sync); } else { // first packet tells what kind of link - if (firstPkt.syncHead.type == TAOS_SMSG_SYNC_DATA) { + if (msg.head.type == TAOS_SMSG_SYNC_DATA) { pPeer->syncFd = connFd; nodeSStatus = TAOS_SYNC_STATUS_START; - sInfo("%s, sync-data pkt from master is received, tranId:%u, set sstatus:%s", pPeer->id, firstPkt.tranId, + sInfo("%s, sync-data msg from master is received, tranId:%u, set sstatus:%s", pPeer->id, msg.tranId, syncStatus[nodeSStatus]); syncCreateRestoreDataThread(pPeer); } else { sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd); syncClosePeerConn(pPeer); pPeer->peerFd = connFd; - pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); + pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd); syncAddPeerRef(pPeer); sDebug("%s, ready to exchange data", pPeer->id); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId()); @@ -1192,15 +1134,15 @@ static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle SSyncFwds *pSyncFwds = pNode->pSyncFwds; int64_t time = taosGetTimestampMs(); - if (pSyncFwds->fwds >= tsMaxFwdInfo) { - // pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; + if (pSyncFwds->fwds >= SYNC_MAX_FWDS) { + // pSyncFwds->first = (pSyncFwds->first + 1) % SYNC_MAX_FWDS; // pSyncFwds->fwds--; sError("vgId:%d, failed to save fwd info, hver:%" PRIu64 " fwds:%d", pNode->vgId, version, pSyncFwds->fwds); return TSDB_CODE_SYN_TOO_MANY_FWDINFO; } if (pSyncFwds->fwds > 0) { - pSyncFwds->last = (pSyncFwds->last + 1) % tsMaxFwdInfo; + pSyncFwds->last = (pSyncFwds->last + 1) % SYNC_MAX_FWDS; } SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last; @@ -1223,7 +1165,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first; if (pFwdInfo->confirmed == 0) break; - pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; + pSyncFwds->first = (pSyncFwds->first + 1) % SYNC_MAX_FWDS; pSyncFwds->fwds--; if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last; sTrace("vgId:%d, fwd info is removed, hver:%" PRIu64 " fwds:%d", pNode->vgId, pFwdInfo->version, pSyncFwds->fwds); @@ -1248,7 +1190,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code } if (confirm && pFwdInfo->confirmed == 0) { - sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); + sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:0x%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); (*pNode->confirmForward)(pNode->vgId, pFwdInfo->mhandle, pFwdInfo->code); pFwdInfo->confirmed = 1; } @@ -1289,7 +1231,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { if (pSyncFwds->fwds > 0) { pthread_mutex_lock(&pNode->mutex); for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { - SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; + SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS; if (ABS(time - pFwdInfo->time) < 2000) break; sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId, @@ -1334,14 +1276,12 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; - // only pkt from RPC or CQ can be forwarded + // only msg from RPC or CQ can be forwarded if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; // a hacker way to improve the performance pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); - pSyncHead->type = TAOS_SMSG_FORWARD; - pSyncHead->pversion = 0; - pSyncHead->len = sizeof(SWalHead) + pWalHead->len; + syncBuildSyncFwdMsg(pSyncHead, pNode->vgId, sizeof(SWalHead) + pWalHead->len); fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head pthread_mutex_lock(&pNode->mutex); @@ -1371,4 +1311,3 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle return code; } - diff --git a/src/sync/src/syncMsg.c b/src/sync/src/syncMsg.c new file mode 100644 index 0000000000..034f9a98a7 --- /dev/null +++ b/src/sync/src/syncMsg.c @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "tglobal.h" +#include "tchecksum.h" +#include "syncInt.h" + +char *statusType[] = { + "broadcast", + "broadcast-rsp", + "setup-conn", + "setup-conn-rsp", + "exchange-data", + "exchange-data-rsp", + "check-role", + "check-role-rsp" +}; + +uint16_t syncGenTranId() { + return taosRand() & 0XFFFF; +} + +static void syncBuildHead(SSyncHead *pHead) { + pHead->protocol = SYNC_PROTOCOL_VERSION; + pHead->signature = SYNC_SIGNATURE; + pHead->code = 0; + pHead->cId = 0; + taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead)); +} + +int32_t syncCheckHead(SSyncHead *pHead) { + if (pHead->protocol != SYNC_PROTOCOL_VERSION) return TSDB_CODE_SYN_MISMATCHED_PROTOCOL; + if (pHead->signature != SYNC_SIGNATURE) return TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + if (pHead->cId != 0) return TSDB_CODE_SYN_MISMATCHED_CLUSTERID; + if (pHead->len <= 0 || pHead->len > TSDB_MAX_WAL_SIZE) return TSDB_CODE_SYN_INVALID_MSGLEN; + if (pHead->type <= TAOS_SMSG_START || pHead->type >= TAOS_SMSG_END) return TSDB_CODE_SYN_INVALID_MSGTYPE; + if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SSyncHead))) return TSDB_CODE_SYN_INVALID_CHECKSUM; + + return TSDB_CODE_SUCCESS; +} + +void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len) { + pHead->type = TAOS_SMSG_SYNC_FWD; + pHead->vgId = vgId; + pHead->len = len; + syncBuildHead(pHead); +} + +void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code) { + pMsg->head.type = TAOS_SMSG_SYNC_FWD_RSP; + pMsg->head.vgId = vgId; + pMsg->head.len = sizeof(SFwdRsp) - sizeof(SSyncHead); + syncBuildHead(&pMsg->head); + + pMsg->version = version; + pMsg->code = code; +} + +static void syncBuildMsg(SSyncMsg *pMsg, int32_t vgId, ESyncMsgType type) { + pMsg->head.type = type; + pMsg->head.vgId = vgId; + pMsg->head.len = sizeof(SSyncMsg) - sizeof(SSyncHead); + syncBuildHead(&pMsg->head); + + pMsg->port = tsSyncPort; + pMsg->tranId = syncGenTranId(); + pMsg->sourceId = vgId; + tstrncpy(pMsg->fqdn, tsNodeFqdn, TSDB_FQDN_LEN); +} + +void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_REQ); } +void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_DATA); } +void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SETUP); } + +void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) { + pMsg->head.type = TAOS_SMSG_STATUS; + pMsg->head.vgId = vgId; + pMsg->head.len = sizeof(SPeersStatus) - sizeof(SSyncHead); + syncBuildHead(&pMsg->head); +} + +void syncBuildFileAck(SFileAck *pMsg, int32_t vgId) { + pMsg->head.type = TAOS_SMSG_SYNC_FILE_RSP; + pMsg->head.vgId = vgId; + pMsg->head.len = sizeof(SFileAck) - sizeof(SSyncHead); + syncBuildHead(&pMsg->head); +} + +void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId) { + pMsg->head.type = TAOS_SMSG_SYNC_FILE; + pMsg->head.vgId = vgId; + pMsg->head.len = sizeof(SFileInfo) - sizeof(SSyncHead); + syncBuildHead(&pMsg->head); +} \ No newline at end of file diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 4247468bca..8651879eb6 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -56,7 +56,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { SSyncNode *pNode = pPeer->pSyncNode; SFileInfo minfo; memset(&minfo, 0, sizeof(SFileInfo)); /* = {0}; */ SFileInfo sinfo; memset(&sinfo, 0, sizeof(SFileInfo)); /* = {0}; */ - SFileAck fileAck = {0}; + SFileAck fileAck; memset(&fileAck, 0, sizeof(SFileAck)); int32_t code = -1; char name[TSDB_FILENAME_LEN * 2] = {0}; uint32_t pindex = 0; // index in last restore @@ -69,7 +69,14 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { minfo.index = -1; int32_t ret = taosReadMsg(pPeer->syncFd, &minfo, sizeof(SFileInfo)); if (ret != sizeof(SFileInfo) || minfo.index == -1) { - sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno)); + sError("%s, failed to read fileinfo while restore file since %s", pPeer->id, strerror(errno)); + break; + } + + assert(ret == sizeof(SFileInfo)); + ret = syncCheckHead((SSyncHead *)(&minfo)); + if (ret != 0) { + sError("%s, failed to check fileinfo while restore file since %s", pPeer->id, strerror(ret)); break; } @@ -94,12 +101,13 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { &sinfo.fversion); // if file not there or magic is not the same, file shall be synced - memset(&fileAck, 0, sizeof(fileAck)); + memset(&fileAck, 0, sizeof(SFileAck)); + syncBuildFileAck(&fileAck, pNode->vgId); fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0; // send file ack - ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); - if (ret != sizeof(fileAck)) { + ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck)); + if (ret != sizeof(SFileAck)) { sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno)); break; } @@ -289,12 +297,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { uint64_t fversion = 0; sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); - SFirstPktRsp firstPktRsp = {.sync = 1, .tranId = syncGenTranId()}; - if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) { - sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno)); + SSyncRsp rsp = {.sync = 1, .tranId = syncGenTranId()}; + if (taosWriteMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) { + sError("%s, failed to send sync rsp since %s", pPeer->id, strerror(errno)); return -1; } - sDebug("%s, send firstPktRsp to peer, tranId:%u", pPeer->id, firstPktRsp.tranId); + sDebug("%s, send sync rsp to peer, tranId:%u", pPeer->id, rsp.tranId); sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); int32_t code = syncRestoreFile(pPeer, &fversion); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 82e3700c7a..02d990313e 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -88,7 +88,7 @@ static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) { static int32_t syncRetrieveFile(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo)); - SFileAck fileAck = {0}; + SFileAck fileAck; memset(&fileAck, 0, sizeof(SFileAck)); int32_t code = -1; char name[TSDB_FILENAME_LEN * 2] = {0}; @@ -103,11 +103,12 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { fileInfo.size = 0; fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion); + syncBuildFileInfo(&fileInfo, pNode->vgId); sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size); // send the file info - int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); - if (ret != sizeof(fileInfo)) { + int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(SFileInfo)); + if (ret != sizeof(SFileInfo)) { code = -1; sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; @@ -128,6 +129,13 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { break; } + ret = syncCheckHead((SSyncHead*)(&fileAck)); + if (ret != 0) { + code = -1; + sError("%s, failed to check file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(ret)); + break; + } + // set the peer sync version pPeer->sversion = fileInfo.fversion; @@ -405,27 +413,22 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; - SFirstPkt firstPkt; - memset(&firstPkt, 0, sizeof(firstPkt)); - firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA; - firstPkt.syncHead.vgId = pNode->vgId; - firstPkt.tranId = syncGenTranId(); - tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); - firstPkt.port = tsSyncPort; + SSyncMsg msg; + syncBuildSyncDataMsg(&msg, pNode->vgId); - if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { - sError("%s, failed to send sync firstPkt since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId); + if (taosWriteMsg(pPeer->syncFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + sError("%s, failed to send sync-data msg since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId); return -1; } - sDebug("%s, send sync-data pkt to peer, tranId:%u", pPeer->id, firstPkt.tranId); + sDebug("%s, send sync-data msg to peer, tranId:%u", pPeer->id, msg.tranId); - SFirstPktRsp firstPktRsp; - if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) { - sError("%s, failed to read sync firstPkt rsp since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId); + SSyncRsp rsp; + if (taosReadMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) { + sError("%s, failed to read sync-data rsp since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId); return -1; } - sDebug("%s, recv firstPktRsp from peer, tranId:%u", pPeer->id, firstPkt.tranId); + sDebug("%s, recv sync-data rsp from peer, tranId:%u rsp-tranId:%u", pPeer->id, msg.tranId, rsp.tranId); return 0; } diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/syncTcp.c similarity index 89% rename from src/sync/src/taosTcpPool.c rename to src/sync/src/syncTcp.c index eb05cf7c6f..7bfdc4e440 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/syncTcp.c @@ -19,10 +19,10 @@ #include "tutil.h" #include "tsocket.h" #include "taoserror.h" -#include "taosTcpPool.h" #include "twal.h" #include "tsync.h" #include "syncInt.h" +#include "syncTcp.h" typedef struct SThreadObj { pthread_t thread; @@ -47,12 +47,12 @@ typedef struct { int32_t closedByApp; } SConnObj; -static void *taosAcceptPeerTcpConnection(void *argv); -static void *taosProcessTcpData(void *param); -static void taosStopPoolThread(SThreadObj *pThread); -static SThreadObj *taosGetTcpThread(SPoolObj *pPool); +static void *syncAcceptPeerTcpConnection(void *argv); +static void *syncProcessTcpData(void *param); +static void syncStopPoolThread(SThreadObj *pThread); +static SThreadObj *syncGetTcpThread(SPoolObj *pPool); -void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { +void *syncOpenTcpThreadPool(SPoolInfo *pInfo) { pthread_attr_t thattr; SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); @@ -80,7 +80,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&(pPool->thread), &thattr, (void *)taosAcceptPeerTcpConnection, pPool) != 0) { + if (pthread_create(&(pPool->thread), &thattr, (void *)syncAcceptPeerTcpConnection, pPool) != 0) { sError("failed to create accept thread for TCP server since %s", strerror(errno)); close(pPool->acceptFd); tfree(pPool->pThread); @@ -94,7 +94,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { return pPool; } -void taosCloseTcpThreadPool(void *param) { +void syncCloseTcpThreadPool(void *param) { SPoolObj * pPool = param; SThreadObj *pThread; @@ -103,7 +103,7 @@ void taosCloseTcpThreadPool(void *param) { for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) { pThread = pPool->pThread[i]; - if (pThread) taosStopPoolThread(pThread); + if (pThread) syncStopPoolThread(pThread); } sDebug("%p TCP pool is closed", pPool); @@ -112,7 +112,7 @@ void taosCloseTcpThreadPool(void *param) { tfree(pPool); } -void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { +void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { struct epoll_event event; SPoolObj *pPool = param; @@ -122,7 +122,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { return NULL; } - SThreadObj *pThread = taosGetTcpThread(pPool); + SThreadObj *pThread = syncGetTcpThread(pPool); if (pThread == NULL) { tfree(pConn); return NULL; @@ -149,7 +149,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { return pConn; } -void taosFreeTcpConn(void *param) { +void syncFreeTcpConn(void *param) { SConnObj * pConn = param; SThreadObj *pThread = pConn->pThread; @@ -175,7 +175,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) { #define maxEvents 10 -static void *taosProcessTcpData(void *param) { +static void *syncProcessTcpData(void *param) { SThreadObj *pThread = (SThreadObj *)param; SPoolObj * pPool = pThread->pPool; SPoolInfo * pInfo = &pPool->info; @@ -222,7 +222,7 @@ static void *taosProcessTcpData(void *param) { if (pConn->closedByApp == 0) { if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) { - taosFreeTcpConn(pConn); + syncFreeTcpConn(pConn); continue; } } @@ -239,7 +239,7 @@ static void *taosProcessTcpData(void *param) { return NULL; } -static void *taosAcceptPeerTcpConnection(void *argv) { +static void *syncAcceptPeerTcpConnection(void *argv) { SPoolObj * pPool = (SPoolObj *)argv; SPoolInfo *pInfo = &pPool->info; @@ -268,7 +268,7 @@ static void *taosAcceptPeerTcpConnection(void *argv) { return NULL; } -static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { +static SThreadObj *syncGetTcpThread(SPoolObj *pPool) { SThreadObj *pThread = pPool->pThread[pPool->nextId]; if (pThread) return pThread; @@ -286,7 +286,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)taosProcessTcpData, pThread); + int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)syncProcessTcpData, pThread); pthread_attr_destroy(&thattr); if (ret != 0) { @@ -303,7 +303,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { return pThread; } -static void taosStopPoolThread(SThreadObj *pThread) { +static void syncStopPoolThread(SThreadObj *pThread) { pthread_t thread = pThread->thread; if (!taosCheckPthreadValid(thread)) { return; diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index 5a64a0a36a..161105d86c 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -100,7 +100,7 @@ int processRpcMsg(void *item) { pHead->msgType = pMsg->msgType; pHead->len = pMsg->contLen; - uDebug("ver:%" PRIu64 ", pkt from client processed", pHead->version); + uDebug("ver:%" PRIu64 ", rsp from client processed", pHead->version); writeIntoWal(pHead); syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC); @@ -275,7 +275,7 @@ int getWalInfo(int32_t vgId, char *name, int64_t *index) { int writeToCache(int32_t vgId, void *data, int type) { SWalHead *pHead = data; - uDebug("pkt from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type); + uDebug("rsp from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type); int msgSize = pHead->len + sizeof(SWalHead); void *pMsg = taosAllocateQitem(msgSize); diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 2253ad5c33..e67127d6e4 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -297,16 +297,14 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch } } - if (pHead->len > size - sizeof(SWalHead)) { - size = sizeof(SWalHead) + pHead->len; - buffer = realloc(buffer, size); - if (buffer == NULL) { - wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); - code = TAOS_SYSTEM_ERROR(errno); + if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) { + wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, + pHead->version, pHead->len, offset); + code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); + if (code != TSDB_CODE_SUCCESS) { + walFtruncate(pWal, tfd, offset); break; } - - pHead = buffer; } ret = tfRead(tfd, pHead->cont, pHead->len); diff --git a/src/wal/test/waltest.c b/src/wal/test/waltest.c index 7a473ed18c..9a52a2ca83 100644 --- a/src/wal/test/waltest.c +++ b/src/wal/test/waltest.c @@ -76,7 +76,7 @@ int main(int argc, char *argv[]) { taosInitLog("wal.log", 100000, 10); - SWalCfg walCfg; + SWalCfg walCfg = {0}; walCfg.walLevel = level; walCfg.keep = keep; diff --git a/tests/pytest/query/queryError.py b/tests/pytest/query/queryError.py index f1fd9c0dec..539ce5141f 100644 --- a/tests/pytest/query/queryError.py +++ b/tests/pytest/query/queryError.py @@ -56,6 +56,15 @@ class TDTestCase: # query .. order by non-time field tdSql.error("select * from st order by name") + # TD-2133 + tdSql.error("select diff(tagtype),bottom(tagtype,1) from dev_001") + + # TD-2190 + tdSql.error("select min(tagtype),max(tagtype) from dev_002 interval(1n) fill(prev)") + + # TD-2208 + tdSql.error("select diff(tagtype),top(tagtype,1) from dev_001") + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) diff --git a/tests/pytest/test.py b/tests/pytest/test.py index a2c35444b5..6be86fe3fd 100644 --- a/tests/pytest/test.py +++ b/tests/pytest/test.py @@ -111,13 +111,25 @@ if __name__ == "__main__": tdLog.info('stop All dnodes') sys.exit(0) - + tdDnodes.init(deployPath) tdDnodes.setTestCluster(testCluster) tdDnodes.setValgrind(valgrind) - tdDnodes.stopAll() - tdDnodes.deploy(1) + is_test_framework = 0 + key_word = 'tdCases.addLinux' + if key_word in open(fileName).read(): + is_test_framework = 1 + if is_test_framework: + moduleName = fileName.replace(".py", "").replace("/", ".") + uModule = importlib.import_module(moduleName) + try: + ucase = uModule.TDTestCase() + tdDnodes.deploy(1,ucase.updatecfgDict) + except : + tdDnodes.deploy(1,{}) + else: + tdDnodes.deploy(1,{}) tdDnodes.start(1) if masterIp == "": diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 38e9e01870..83cb4e8d99 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -108,6 +108,36 @@ class TDDnode: self.deployed = 0 self.testCluster = False self.valgrind = 0 + self.cfgDict = { + "numOfLogLines":"100000000", + "mnodeEqualVnodeNum":"0", + "walLevel":"2", + "fsync":"1000", + "statusInterval":"1", + "numOfMnodes":"3", + "numOfThreadsPerCore":"2.0", + "monitor":"0", + "maxVnodeConnections":"30000", + "maxMgmtConnections":"30000", + "maxMeterConnections":"30000", + "maxShellConns":"30000", + "locale":"en_US.UTF-8", + "charset":"UTF-8", + "asyncLog":"0", + "anyIp":"0", + "tsEnableTelemetryReporting":"0", + "dDebugFlag":"135", + "mDebugFlag":"135", + "sdbDebugFlag":"135", + "rpcDebugFlag":"135", + "tmrDebugFlag":"131", + "cDebugFlag":"135", + "httpDebugFlag":"135", + "monitorDebugFlag":"135", + "udebugFlag":"135", + "jnidebugFlag":"135", + "qdebugFlag":"135" + } def init(self, path): self.path = path @@ -131,7 +161,10 @@ class TDDnode: return totalSize - def deploy(self): + def addExtraCfg(self, option, value): + self.cfgDict.update({option: value}) + + def deploy(self, *updatecfgDict): self.logDir = "%s/sim/dnode%d/log" % (self.path, self.index) self.dataDir = "%s/sim/dnode%d/data" % (self.path, self.index) self.cfgDir = "%s/sim/dnode%d/cfg" % (self.path, self.index) @@ -175,36 +208,17 @@ class TDDnode: self.cfg("publicIp", "192.168.0.%d" % (self.index)) self.cfg("internalIp", "192.168.0.%d" % (self.index)) self.cfg("privateIp", "192.168.0.%d" % (self.index)) - self.cfg("dataDir", self.dataDir) - self.cfg("logDir", self.logDir) - self.cfg("numOfLogLines", "100000000") - self.cfg("mnodeEqualVnodeNum", "0") - self.cfg("walLevel", "2") - self.cfg("fsync", "1000") - self.cfg("statusInterval", "1") - self.cfg("numOfMnodes", "3") - self.cfg("numOfThreadsPerCore", "2.0") - self.cfg("monitor", "0") - self.cfg("maxVnodeConnections", "30000") - self.cfg("maxMgmtConnections", "30000") - self.cfg("maxMeterConnections", "30000") - self.cfg("maxShellConns", "30000") - self.cfg("locale", "en_US.UTF-8") - self.cfg("charset", "UTF-8") - self.cfg("asyncLog", "0") - self.cfg("anyIp", "0") - self.cfg("tsEnableTelemetryReporting", "0") - self.cfg("dDebugFlag", "135") - self.cfg("mDebugFlag", "135") - self.cfg("sdbDebugFlag", "135") - self.cfg("rpcDebugFlag", "135") - self.cfg("tmrDebugFlag", "131") - self.cfg("cDebugFlag", "135") - self.cfg("httpDebugFlag", "135") - self.cfg("monitorDebugFlag", "135") - self.cfg("udebugFlag", "135") - self.cfg("jnidebugFlag", "135") - self.cfg("qdebugFlag", "135") + + self.cfg("dataDir",self.dataDir) + self.cfg("logDir",self.logDir) + print(updatecfgDict) + if updatecfgDict[0] and updatecfgDict[0][0]: + print(updatecfgDict[0][0]) + for key,value in updatecfgDict[0][0].items(): + self.addExtraCfg(key,value) + for key, value in self.cfgDict.items(): + self.cfg(key, value) + self.deployed = 1 tdLog.debug( "dnode:%d is deployed and configured by %s" % @@ -260,6 +274,12 @@ class TDDnode: key = 'from offline to online' bkey = bytes(key,encoding="utf8") logFile = self.logDir + "/taosdlog.0" + i = 0 + while not os.path.exists(logFile): + sleep(0.1) + i += 1 + if i>50: + break popen = subprocess.Popen('tail -f ' + logFile, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) pid = popen.pid print('Popen.pid:' + str(pid)) @@ -273,6 +293,7 @@ class TDDnode: else: tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index)) time.sleep(5) + # time.sleep(5) @@ -454,7 +475,7 @@ class TDDnodes: def setValgrind(self, value): self.valgrind = value - def deploy(self, index): + def deploy(self, index, *updatecfgDict): self.sim.setTestCluster(self.testCluster) if (self.simDeployed == False): @@ -464,7 +485,7 @@ class TDDnodes: self.check(index) self.dnodes[index - 1].setTestCluster(self.testCluster) self.dnodes[index - 1].setValgrind(self.valgrind) - self.dnodes[index - 1].deploy() + self.dnodes[index - 1].deploy(updatecfgDict) def cfg(self, index, option, value): self.check(index) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 64a6c871fc..34b057e71b 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -154,6 +154,7 @@ cd ../../../debug; make ./test.sh -f general/parser/repeatAlter.sim ./test.sh -f general/parser/union.sim ./test.sh -f general/parser/topbot.sim +./test.sh -f general/parser/function.sim ./test.sh -f general/stable/disk.sim ./test.sh -f general/stable/dnode3.sim diff --git a/tests/script/unique/db/replica_add13.sim b/tests/script/unique/db/replica_add13.sim index 1df49ba658..defe306f2f 100644 --- a/tests/script/unique/db/replica_add13.sim +++ b/tests/script/unique/db/replica_add13.sim @@ -110,6 +110,7 @@ sql insert into d1.t1 values(1589529000012, 2) sql insert into d2.t2 values(1589529000022, 2) sql insert into d3.t3 values(1589529000032, 2) sql insert into d4.t4 values(1589529000042, 2) +sleep 1000 sql select * from d1.t1 if $rows != 2 then @@ -141,6 +142,7 @@ sql insert into d1.t1 values(1589529000013, 3) sql insert into d2.t2 values(1589529000023, 3) sql insert into d3.t3 values(1589529000033, 3) sql insert into d4.t4 values(1589529000043, 3) +sleep 1000 sql select * from d1.t1 if $rows != 3 then @@ -172,6 +174,7 @@ sql insert into d1.t1 values(1589529000014, 4) sql insert into d2.t2 values(1589529000024, 4) sql insert into d3.t3 values(1589529000034, 4) sql insert into d4.t4 values(1589529000044, 4) +sleep 1000 sql select * from d1.t1 print select * from d1.t1 $rows @@ -207,6 +210,7 @@ sql insert into d1.t1 values(1589529000015, 5) sql insert into d2.t2 values(1589529000025, 5) sql insert into d3.t3 values(1589529000035, 5) sql insert into d4.t4 values(1589529000045, 5) +sleep 1000 sql select * from d1.t1 if $rows != 5 then @@ -238,6 +242,7 @@ sql insert into d1.t1 values(1589529000016, 6) sql insert into d2.t2 values(1589529000026, 6) sql insert into d3.t3 values(1589529000036, 6) sql insert into d4.t4 values(1589529000046, 6) +sleep 1000 sql select * from d1.t1 if $rows != 6 then diff --git a/tests/script/unique/db/replica_add23.sim b/tests/script/unique/db/replica_add23.sim index 5da73cd117..d93894deb8 100644 --- a/tests/script/unique/db/replica_add23.sim +++ b/tests/script/unique/db/replica_add23.sim @@ -110,6 +110,7 @@ sql insert into d1.t1 values(1588262400002, 2) sql insert into d2.t2 values(1588262400002, 2) sql insert into d3.t3 values(1588262400002, 2) sql insert into d4.t4 values(1588262400002, 2) +sleep 1000 sql select * from d1.t1 if $rows != 2 then @@ -142,6 +143,7 @@ sql insert into d1.t1 values(1588262400003, 3) sql insert into d2.t2 values(1588262400003, 3) sql insert into d3.t3 values(1588262400003, 3) sql insert into d4.t4 values(1588262400003, 3) +sleep 1000 sql select * from d1.t1 if $rows != 3 then @@ -173,6 +175,7 @@ sql insert into d1.t1 values(1588262400004, 4) sql insert into d2.t2 values(1588262400004, 4) sql insert into d3.t3 values(1588262400004, 4) sql insert into d4.t4 values(1588262400004, 4) +sleep 1000 sql select * from d1.t1 if $rows != 4 then @@ -204,6 +207,7 @@ sql insert into d1.t1 values(1588262400005, 5) sql insert into d2.t2 values(1588262400005, 5) sql insert into d3.t3 values(1588262400005, 5) sql insert into d4.t4 values(1588262400005, 5) +sleep 1000 sql select * from d1.t1 if $rows != 5 then @@ -235,6 +239,7 @@ sql insert into d1.t1 values(1588262400006, 6) sql insert into d2.t2 values(1588262400006, 6) sql insert into d3.t3 values(1588262400006, 6) sql insert into d4.t4 values(1588262400006, 6) +sleep 1000 sql select * from d1.t1 if $rows != 6 then