Merge pull request #3743 from taosdata/develop

Merge from develop
This commit is contained in:
Shengliang Guan 2020-10-02 08:50:28 +08:00 committed by GitHub
commit 60688a0365
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
51 changed files with 1125 additions and 881 deletions

70
Jenkinsfile vendored Normal file
View File

@ -0,0 +1,70 @@
pipeline {
agent any
stages {
stage('build TDengine') {
steps {
sh '''cd ${WORKSPACE}
export TZ=Asia/Harbin
date
rm -rf ${WORKSPACE}/debug
mkdir debug
cd debug
cmake .. > /dev/null
make > /dev/null
cd ${WORKSPACE}/debug'''
}
}
stage('test_tsim') {
parallel {
stage('test') {
steps {
sh '''cd ${WORKSPACE}/tests
#./test-all.sh smoke
sudo ./test-all.sh full'''
}
}
stage('test_crash_gen') {
steps {
sh '''cd ${WORKSPACE}/tests/pytest
sudo ./crash_gen.sh -a -p -t 4 -s 2000'''
}
}
stage('test_valgrind') {
steps {
sh '''cd ${WORKSPACE}/tests/pytest
sudo ./valgrind-test.sh 2>&1 > mem-error-out.log
grep \'start to execute\\|ERROR SUMMARY\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-mem-error-out.log
for memError in `grep \'ERROR SUMMARY\' uniq-mem-error-out.log | awk \'{print $4}\'`
do
if [ -n "$memError" ]; then
if [ "$memError" -gt 12 ]; then
echo -e "${RED} ## Memory errors number valgrind reports is $memError.\\
More than our threshold! ## ${NC}"
travis_terminate $memError
fi
fi
done
grep \'start to execute\\|definitely lost:\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-definitely-lost-out.log
for defiMemError in `grep \'definitely lost:\' uniq-definitely-lost-out.log | awk \'{print $7}\'`
do
if [ -n "$defiMemError" ]; then
if [ "$defiMemError" -gt 13 ]; then
echo -e "${RED} ## Memory errors number valgrind reports \\
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"
travis_terminate $defiMemError
fi
fi
done'''
}
}
}
}
}
}

View File

@ -72,18 +72,21 @@ ENDIF ()
IF (TD_ARM_32)
ADD_DEFINITIONS(-D_TD_ARM_32)
ADD_DEFINITIONS(-D_TD_ARM_)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "arm32 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE -Wno-pointer-to-int-cast -Wno-int-to-pointer-cast -Wno-incompatible-pointer-types ")
ENDIF ()
IF (TD_MIPS_64)
ADD_DEFINITIONS(-D_TD_MIPS_64_)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "mips64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
IF (TD_MIPS_32)
ADD_DEFINITIONS(-D_TD_MIPS_32_)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "mips32 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()

View File

@ -68,7 +68,7 @@ systemctl status taosd
taos
```
如果TDengine终端接服务成功,将会打印出欢迎消息和版本信息。如果失败,则会打印错误消息出来(请参考[FAQ](https://www.taosdata.com/cn/faq/)来解决终端接服务端失败的问题。TDengine终端的提示符号如下
如果TDengine终端接服务成功,将会打印出欢迎消息和版本信息。如果失败,则会打印错误消息出来(请参考[FAQ](https://www.taosdata.com/cn/faq/)来解决终端接服务端失败的问题。TDengine终端的提示符号如下
```cmd
taos>
@ -99,8 +99,8 @@ Query OK, 2 row(s) in set (0.001700s)
- -c, --config-dir: 指定配置文件目录默认为_/etc/taos_
- -h, --host: 指定服务的IP地址默认为本地服务
- -s, --commands: 在不进入终端的情况下运行TDengine命令
- -u, -- user: 接TDengine服务器的用户名缺省为root
- -p, --password: 接TDengine服务器的密码缺省为taosdata
- -u, -- user: 接TDengine服务器的用户名缺省为root
- -p, --password: 接TDengine服务器的密码缺省为taosdata
- -?, --help: 打印出所有命令行参数
示例:

View File

@ -19,7 +19,7 @@ CREATE DATABASE power KEEP 365 DAYS 10 BLOCKS 4;
USE power;
```
就当前接里操作的库换为power否则对具体表操作前需要使用“库名.表名”来指定库的名字。
就当前接里操作的库换为power否则对具体表操作前需要使用“库名.表名”来指定库的名字。
**注意:**

View File

@ -196,7 +196,7 @@ TDengine是基于硬件、软件系统不可靠、一定会有故障的假设进
**对外服务地址**TDengine集群可以容纳单台、多台甚至几千台物理节点。应用只需要向集群中任何一个物理节点的publicIp发起连接即可。启动CLI应用taos时选项-h需要提供的就是publicIp。
**master/secondIp**每一个dnode都需要配置一个masterIp。dnode启动后将对配置的masterIp发起加入集群的连接请求。masterIp是已经创建的集群中的任何一个节点的privateIp对于集群中的第一个节点就是它自己的privateIp。为保证连接成功每个dnode还可配置secondIp, 该IP地址也是已创建的集群中的任何一个节点的privateIp。如果一个节点连接masterIp失败,它将试图接secondIp。
**master/secondIp**每一个dnode都需要配置一个masterIp。dnode启动后将对配置的masterIp发起加入集群的连接请求。masterIp是已经创建的集群中的任何一个节点的privateIp对于集群中的第一个节点就是它自己的privateIp。为保证连接成功每个dnode还可配置secondIp, 该IP地址也是已创建的集群中的任何一个节点的privateIp。如果一个节点连接masterIp失败,它将试图接secondIp。
dnode启动后会获知集群的mnode IP列表并且定时向mnode发送状态信息。
@ -245,4 +245,4 @@ vnode(虚拟数据节点)保存采集的时序数据,而且查询、计算都
**Note**目前集群功能仅仅限于企业版
**Note**目前集群功能仅仅限于企业版

View File

@ -130,9 +130,25 @@ TDengine集群中加入一个新的dnode时涉及集群相关的一些参数
- statusInterval: dnode向mnode报告状态时长。单位为秒默认值1。
- maxTablesPerVnode: 每个vnode中能够创建的最大表个数。默认值1000000。
- maxVgroupsPerDb: 每个数据库中能够使用的最大vnode个数。
- arbitrator: 系统中裁决器的end point缺省为空
- arbitrator: 系统中裁决器的end point缺省为空
- timezone、locale、charset 的配置见客户端配置。
为方便调试可通过SQL语句临时调整每个dnode的日志配置系统重启后会失效
```mysql
ALTER DNODE <dnode_id> <config>
```
- dnode_id: 可以通过SQL语句"SHOW DNODES"命令获取
- config: 要调整的日志参数,在如下列表中取值
> resetlog 截断旧日志文件,创建一个新日志文件
> debugFlag < 131 | 135 | 143 > 设置debugFlag为131、135或者143
例如:
```
alter dnode 1 debugFlag 135;
```
## 客户端配置
TDengine系统的前台交互客户端应用程序为taos它与taosd共享同一个配置文件taos.cfg。运行taos时使用参数-c指定配置文件目录如taos -c /home/cfg表示使用/home/cfg/目录下的taos.cfg配置文件中的参数缺省目录是/etc/taos。本节主要说明 taos 客户端应用在配置文件 taos.cfg 文件中使用到的参数。
@ -234,10 +250,10 @@ ALTER USER <user_name> PASS <'password'>;
修改用户密码, 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角
```
ALTER USER <user_name> PRIVILEDGE <'super'|'write'|'read'>;
ALTER USER <user_name> PRIVILEDGE <super|write|read>;
```
修改用户权限为super/write/read。 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角
修改用户权限为super/write/read,不需要添加单引号
```
SHOW USERS;
@ -392,5 +408,5 @@ TDengine的所有可执行文件默认存放在 _/usr/local/taos/bin_ 目录下
您可以通过修改系统配置文件taos.cfg来配置不同的数据目录和日志目录。
##

View File

@ -84,17 +84,17 @@ TDengine 分布式架构的逻辑结构图如下:
**FQDN配置**一个数据节点有一个或多个FQDN可以在系统配置文件taos.cfg通过参数“fqdn"进行指定如果没有指定系统将自动获取FQDN。如果节点没有配置FQDN可以直接将该节点的配置参数fqdn设置为它的IP地址。但不建议使用IP因为IP地址可变一旦变化将让集群无法正常工作。一个数据节点的EP(End Point)由FQDN + Port组成。采用FQDN需要保证DNS服务正常工作或者在节点以及应用所在的节点配置好hosts文件。
**端口配置:**一个数据节点对外的端口由TDengine的系统配置参数serverPort决定对集群内部通讯的端口是serverPort+5。集群内数据节点之间的数据复制操作还占有一个TCP端口是serverPort+10. 为支持多线程高效的处理UDP数据每个对内和对外的UDP都需要占用5个连续的端口。因此一个数据节点总的端口范围为serverPort到serverPort + 10总共11个TCP/UDP端口。使用时需要确保防火墙将这些端口打开。每个数据节点可以配置不同的serverPort。
**端口配置:**一个数据节点对外的端口由TDengine的系统配置参数serverPort决定对集群内部通讯的端口是serverPort+5。集群内数据节点之间的数据复制操作还占有一个TCP端口是serverPort+10. 为支持多线程高效的处理UDP数据每个对内和对外的UDP都需要占用5个连续的端口。因此一个数据节点总的端口范围为serverPort到serverPort + 10总共11个TCP/UDP端口。使用时需要确保防火墙将这些端口打开。每个数据节点可以配置不同的serverPort。
**集群对外接:** TDengine集群可以容纳单个、多个甚至几千个数据节点。应用只需要向集群中任何一个数据节点发起连接即可接需要提供的网络参数是一数据节点的End Point(FQDN加配置的端口号。通过命令行CLI启动应用taos时可以通过选项-h来指定数据节点的FQDN, -P来指定其配置的端口号如果端口不配置将采用TDengine的系统配置参数serverPort。
**集群对外接:** TDengine集群可以容纳单个、多个甚至几千个数据节点。应用只需要向集群中任何一个数据节点发起连接即可接需要提供的网络参数是一数据节点的End Point(FQDN加配置的端口号。通过命令行CLI启动应用taos时可以通过选项-h来指定数据节点的FQDN, -P来指定其配置的端口号如果端口不配置将采用TDengine的系统配置参数serverPort。
**集群内部通讯**: 各个数据节点之间通过TCP/UDP进行接。一个数据节点启动时将获取mnode所在的dnode的EP信息然后与系统中的mnode建立起交换信息。获取mnode的EP信息有三步1检查mnodeEpList文件是否存在如果不存在或不能正常打开获得mnode EP信息进入第二步2检查系统配置文件taos.cfg, 获取mnode EP配置参数first, second如果不存在或者taos.cfg里没有这两个配置参数或无效进入第三步3将自己的EP设为mnode EP, 并独立运行起来。获取mnode EP列表后数据节点发起链接,如果链接成功则成功加入进工作的集群如果不成功则尝试mnode EP列表中的下一个。如果都尝试了接都仍然失败,则休眠几秒后,再进行尝试。
**集群内部通讯**: 各个数据节点之间通过TCP/UDP进行接。一个数据节点启动时将获取mnode所在的dnode的EP信息然后与系统中的mnode建立起交换信息。获取mnode的EP信息有三步1检查mnodeEpList文件是否存在如果不存在或不能正常打开获得mnode EP信息进入第二步2检查系统配置文件taos.cfg, 获取mnode EP配置参数first, second如果不存在或者taos.cfg里没有这两个配置参数或无效进入第三步3将自己的EP设为mnode EP, 并独立运行起来。获取mnode EP列表后数据节点发起连接,如果连接成功则成功加入进工作的集群如果不成功则尝试mnode EP列表中的下一个。如果都尝试了接都仍然失败,则休眠几秒后,再进行尝试。
**MNODE的选择:** TDengine逻辑上有管理节点但没有单独的执行代码服务器侧只有一套执行代码taosd。那么哪个数据节点会是管理节点呢这是系统自动决定的无需任何人工干预。原则如下一个数据节点启动时会检查自己的End Point, 并与获取的mnode EP List进行比对如果在其中该数据节点认为自己应该启动mnode模块成为mnode。如果自己的EP不在mnode EP List里则不启动mnode模块。在系统的运行过程中由于负载均衡、宕机等原因mnode有可能迁移至新的dnode但一切都是透明的无需人工干预配置参数的修改是mnode自己根据资源做出的决定。
**新数据节点的加入**系统有了一个数据节点后就已经成为一个工作的系统。添加新的节点进集群时有两个步骤第一步使用TDengine CLI接到现有工作的数据节点然后用命令”create dnode"将新的数据节点的End Point添加进去; 第二步在新的数据节点的系统配置参数文件taos.cfg里将first, second参数设置为现有集群中任意两个数据节点的EP即可。具体添加的详细步骤请见详细的用户手册。这样就把集群一步一步的建立起来。
**新数据节点的加入**系统有了一个数据节点后就已经成为一个工作的系统。添加新的节点进集群时有两个步骤第一步使用TDengine CLI接到现有工作的数据节点然后用命令”create dnode"将新的数据节点的End Point添加进去; 第二步在新的数据节点的系统配置参数文件taos.cfg里将first, second参数设置为现有集群中任意两个数据节点的EP即可。具体添加的详细步骤请见详细的用户手册。这样就把集群一步一步的建立起来。
**重定向**无论是dnode还是taosc最先都是要发起与mnode的但mnode是系统自动创建并维护的因此对于用户来说并不知道哪个dnode在运行mnode。TDengine只要求向系统中任何一个工作的dnode发起接即可。因为任何一个正在运行的dnode都维护有目前运行的mnode EP List。当收到一个来自新启动的dnode或taosc的接请求如果自己不是mnode则将mnode EP List回复给对方taosc或新启动的dnode收到这个list, 就重新尝试建立接。当mnode EP List发生改变通过节点之间的消息交互各个数据节点就很快获取最新列表并通知taosc。
**重定向**无论是dnode还是taosc最先都是要发起与mnode的但mnode是系统自动创建并维护的因此对于用户来说并不知道哪个dnode在运行mnode。TDengine只要求向系统中任何一个工作的dnode发起接即可。因为任何一个正在运行的dnode都维护有目前运行的mnode EP List。当收到一个来自新启动的dnode或taosc的接请求如果自己不是mnode则将mnode EP List回复给对方taosc或新启动的dnode收到这个list, 就重新尝试建立接。当mnode EP List发生改变通过节点之间的消息交互各个数据节点就很快获取最新列表并通知taosc。
### 一个典型的消息流程
为解释vnode, mnode, taosc和应用之间的关系以及各自扮演的角色下面对写入数据这个典型操作的流程进行剖析。
@ -197,7 +197,7 @@ Master Vnode遵循下面的写入流程
### 主从选择
Vnode会保持一个数据版本号(Version),对内存数据进行持久化存储时,对该版本号也进行持久化存储。每个数据更新操作,无论是采集的时序数据还是元数据,这个版本号将增一。
一个vnode启动时角色(master、slave) 是不定的数据是处于未同步状态它需要与虚拟节点组内其他节点建立TCP并互相交换status其中包括version和自己的角色。通过status的交换系统进入选主流程规则如下
一个vnode启动时角色(master、slave) 是不定的数据是处于未同步状态它需要与虚拟节点组内其他节点建立TCP并互相交换status其中包括version和自己的角色。通过status的交换系统进入选主流程规则如下
1. 如果只有一个副本该副本永远就是master
2. 所有副本都在线时版本最高的被选为master

View File

@ -142,7 +142,7 @@ C/C++的API类似于MySQL的C API。应用程序使用时需要包含TDengine
获取最近一次API调用失败的原因返回值为错误代码。
**注意**:对于单个数据库连接,在同一时刻只能有一个线程使用该接调用API否则会有未定义的行为出现并可能导致客户端crash。客户端应用可以通过建立多个连接进行多线程的数据写入或查询处理。
**注意**:对于单个数据库连接,在同一时刻只能有一个线程使用该接调用API否则会有未定义的行为出现并可能导致客户端crash。客户端应用可以通过建立多个连接进行多线程的数据写入或查询处理。
### 异步查询API

View File

@ -21,7 +21,7 @@
## 5. 遇到错误"Unable to establish connection", 我怎么办?
客户端遇到接故障,请按照下面的步骤进行检查:
客户端遇到接故障,请按照下面的步骤进行检查:
1. 检查网络环境
* 云服务器检查云服务器的安全组是否打开TCP/UDP 端口6030-6042的访问权限
@ -45,7 +45,7 @@
9. 如果仍不能排除连接故障请使用命令行工具nc来分别判断指定端口的TCP和UDP连接是否通畅
检查UDP端口连接是否工作`nc -vuz {hostIP} {port} `
检查服务器侧TCP端口连接是否工作`nc -l {port}`
检查客户端侧TCP端口接是否工作:`nc {hostIP} {port}`
检查客户端侧TCP端口接是否工作:`nc {hostIP} {port}`
10. 也可以使用taos程序内嵌的网络连通检测功能来验证服务器和客户端之间指定的端口连接是否通畅包括TCP和UDP[TDengine 内嵌网络检测工具使用指南](https://www.taosdata.com/blog/2020/09/08/1816.html)。
@ -57,7 +57,7 @@
1. 请检查连接的服务器的FQDN是否正确,FQDN配置参考[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)。
2. 如果网络配置有DNS server, 请检查是否正常工作
3. 如果网络没有配置DNS server, 请检查客户端所在机器的hosts文件查看该FQDN是否配置并是否有正确的IP地址。
4. 如果网络配置OK从客户端所在机器你需要能Ping该连接的FQDN否则客户端是无法接服务器的
4. 如果网络配置OK从客户端所在机器你需要能Ping该连接的FQDN否则客户端是无法接服务器的
## 7. 虽然语法正确,为什么我还是得到 "Invalid SQL" 错误
@ -108,4 +108,8 @@ Connection = DriverManager.getConnection(url, properties);
附上必要的问题描述,以及发生该问题的执行操作,出现问题的表征及大概的时间,在<a href='https://github.com/taosdata/TDengine'> GitHub</a>提交Issue。
为了保证有足够的debug信息如果问题能够重复请修改/etc/taos/taos.cfg文件最后面添加一行“debugFlag 135"(不带引号本身然后重启taosd, 重复问题然后再递交。但系统正常运行时请一定将debugFlag设置为131否则会产生大量的日志信息降低系统效率。
为了保证有足够的debug信息如果问题能够重复请修改/etc/taos/taos.cfg文件最后面添加一行“debugFlag 135"(不带引号本身然后重启taosd, 重复问题然后再递交。也可以通过如下SQL语句临时设置taosd的日志级别。
```
alter dnode <dnode_id> debugFlag 135;
```
但系统正常运行时请一定将debugFlag设置为131否则会产生大量的日志信息降低系统效率。

View File

@ -66,21 +66,21 @@ TDengine采取的是Master-Slave模式进行同步与流行的RAFT一致性
数据实时复制有三个主要流程:选主、数据转发、数据恢复。后续做详细讨论。
## 虚拟节点之间的网络
## 虚拟节点之间的网络
虚拟节点之间通过TCP进行链接节点之间的状态交换、数据包的转发都是通过这个TCP链接(peerFd)进行。为避免竞争两个虚拟节点之间的TCP链接总是由IP地址(UINT32)小的节点作为TCP客户端发起。一旦TCP链接被中断虚拟节点能通过TCP socket自动检测到将对方标为offline。如果监测到任何错误比如数据恢复流程虚拟节点将主动重置该接。
虚拟节点之间通过TCP进行连接节点之间的状态交换、数据包的转发都是通过这个TCP连接(peerFd)进行。为避免竞争两个虚拟节点之间的TCP连接总是由IP地址(UINT32)小的节点作为TCP客户端发起。一旦TCP连接被中断虚拟节点能通过TCP socket自动检测到将对方标为offline。如果监测到任何错误比如数据恢复流程虚拟节点将主动重置该接。
一旦作为客户端的节点链接不成或中断,它将周期性的每隔一秒钟去试图去链接一次。因为TCP本身有心跳机制虚拟节点之间不再另行提供心跳。
一旦作为客户端的节点连接不成或中断,它将周期性的每隔一秒钟去试图去连接一次。因为TCP本身有心跳机制虚拟节点之间不再另行提供心跳。
如果一个unsynced节点要发起数据恢复流程它与Master将建立起专有的TCP链接(syncFd)。数据恢复完成后,该链接会被关闭。而且为限制资源的使用,系统只容许一定数量(配置参数tsMaxSyncNum)的数据恢复的socket存在。如果超过这个数字系统会将新的数据恢复请求延后处理。
如果一个unsynced节点要发起数据恢复流程它与Master将建立起专有的TCP连接(syncFd)。数据恢复完成后,该连接会被关闭。而且为限制资源的使用,系统只容许一定数量(配置参数tsMaxSyncNum)的数据恢复的socket存在。如果超过这个数字系统会将新的数据恢复请求延后处理。
任意一个节点无论有多少虚拟节点都会启动而且只会启动一个TCP server, 来接受来自其他虚拟节点的上述两类TCP的接请求。当TCP socket建立起来客户端侧发送的消息体里会带有vgId全局唯一的vgroup ID), TCP 服务器侧会检查该vgId是否已经在该节点启动运行。如果已经启动运行就接受其请求。如果不存在就直接将接请求关闭。在TDengine代码里mnode group的vgId设置为1。
任意一个节点无论有多少虚拟节点都会启动而且只会启动一个TCP server, 来接受来自其他虚拟节点的上述两类TCP的接请求。当TCP socket建立起来客户端侧发送的消息体里会带有vgId全局唯一的vgroup ID), TCP 服务器侧会检查该vgId是否已经在该节点启动运行。如果已经启动运行就接受其请求。如果不存在就直接将接请求关闭。在TDengine代码里mnode group的vgId设置为1。
## 选主流程
当同一组的两个虚拟节点之间(vnode A, vnode B)建立连接后他们互换status消息。status消息里包含本地存储的同一虚拟节点组内所有虚拟节点的role和version。
如果一个虚拟节点(vnode A)检测到与同一虚拟节点组内另外一虚拟节点vnode B接中断vnode A将立即把vnode B的role设置为offline。无论是接收到另外一虚拟节点发来的status消息还是检测与另外一虚拟节点的接中断,该虚拟节点都将进入状态处理流程。状态处理流程的规则如下:
如果一个虚拟节点(vnode A)检测到与同一虚拟节点组内另外一虚拟节点vnode B接中断vnode A将立即把vnode B的role设置为offline。无论是接收到另外一虚拟节点发来的status消息还是检测与另外一虚拟节点的接中断,该虚拟节点都将进入状态处理流程。状态处理流程的规则如下:
1. 如果检测到在线的节点数没有超过一半则将自己的状态设置为unsynced.
2. 如果在线的虚拟节点数超过一半会检查master节点是否存在如果存在则会决定是否将自己状态改为slave或启动数据恢复流程
@ -118,7 +118,7 @@ TDengine采取的是Master-Slave模式进行同步与流行的RAFT一致性
9. 如果quorum为1上述678步不会发生。
10. 如果要等待slave的确认master会启动2秒的定时器可配置如果超时则认为失败。
对于回复确认sync模块提供的是异步回调函数因此APP在调用syncForwardToPeer之后无需等待可以处理下一个操作。在Master与Slave的TCP接管道里可能有多个Forward消息这些消息是严格按照应用提供的顺序排好的。对于Forward Response也是一样TCP管道里存在多个但都是排序好的。这个顺序SYNC模块并没有做特别的事情是由APP单线程顺序写来保证的(TDengine里每个vnode的写数据都是单线程
对于回复确认sync模块提供的是异步回调函数因此APP在调用syncForwardToPeer之后无需等待可以处理下一个操作。在Master与Slave的TCP接管道里可能有多个Forward消息这些消息是严格按照应用提供的顺序排好的。对于Forward Response也是一样TCP管道里存在多个但都是排序好的。这个顺序SYNC模块并没有做特别的事情是由APP单线程顺序写来保证的(TDengine里每个vnode的写数据都是单线程
## 数据恢复流程
@ -142,9 +142,9 @@ TDengine采取的是Master-Slave模式进行同步与流行的RAFT一致性
<center> <img src="../assets/replica-restore.png"> </center>
1. 通过已经建立的TCP发送sync req给master节点
2. master收到sync req后以client的身份向vnode B主动建立一新的专用于同步的TCPsyncFd)
3. 新的TCP接建立成功后master将开始retrieve流程对应的vnode B将同步启动restore流程
1. 通过已经建立的TCP发送sync req给master节点
2. master收到sync req后以client的身份向vnode B主动建立一新的专用于同步的TCPsyncFd)
3. 新的TCP接建立成功后master将开始retrieve流程对应的vnode B将同步启动restore流程
4. Retrieve/Restore流程里先处理所有archived data (vnode里的data, head, last文件后处理WAL data。
5. 对于archived datamaster将通过回调函数getFileInfo获取数据文件的基本信息包括文件名、magic以及文件大小。
6. master 将获得的文件名、magic以及文件大小发给vnode B
@ -157,7 +157,7 @@ TDengine采取的是Master-Slave模式进行同步与流行的RAFT一致性
1. master节点调用回调函数getWalInfo获取WAL的文件名。
2. 如果getWalInfo返回值大于0表示该文件还不是最后一个WAL因此master调用sendfile一下把该文件发送给vnode B
3. 如果getWalInfo返回时为0表示该文件是最后一个WAL因为文件可能还处于写的状态中sync模块要根据WAL Head的定义逐条读出记录然后发往vnode B。
4. vnode A读取TCP接传来的数据按照WAL Head逐条读取如果版本号比现有的大调用回调函数writeToCache交给应用处理。如果小直接扔掉。
4. vnode A读取TCP接传来的数据按照WAL Head逐条读取如果版本号比现有的大调用回调函数writeToCache交给应用处理。如果小直接扔掉。
5. 上述流程循环直到所有WAL文件都被处理完。处理完后master就会将新来的数据包通过Forward消息转发给slave。
从同步文件启动起sync模块会通过inotify监控所有处理过的file以及wal。一旦发现被处理过的文件有更新变化同步流程将中止会重新启动。因为有可能落盘操作正在进行比如历史数据导入内存数据落盘把已经处理过的文件进行了修改需要重新同步才行。
@ -194,15 +194,15 @@ sync模块通过inotify监控LastWal文件的更新和关闭操作。而且在
因为写入失败客户端会重新写入数据。但对于TDengine而言是OK的。因为时序数据都是有时间戳的时间戳相同的数据更新操作第一次会执行但第二次会自动扔掉。对于Meta Data(增加、删除库、表等等的操作也是OK的。一张表、库已经被创建或删除再创建或删除不会被执行的。
在TDengine的设计里虚拟节点与虚拟节点之间是一个TCP是一个pipeline数据块一个接一个按顺序在这个pipeline里等待处理。一旦某个数据块的处理失败这个接会被重置后续的数据块的处理都会失败。因此不会存在Pipeline里一个数据块更新失败但下一个数据块成功的可能。
在TDengine的设计里虚拟节点与虚拟节点之间是一个TCP是一个pipeline数据块一个接一个按顺序在这个pipeline里等待处理。一旦某个数据块的处理失败这个接会被重置后续的数据块的处理都会失败。因此不会存在Pipeline里一个数据块更新失败但下一个数据块成功的可能。
## Split Brain的问题
选举流程中有个强制要求那就是一定有超过半数的虚拟节点在线。但是如果replication正好是偶数这个时候完全可能存在splt brain问题。
为解决这个问题TDengine提供Arbitrator的解决方法。Arbitrator是一个节点它的任务就是接受任何虚拟节点的接请求,并保持它。
为解决这个问题TDengine提供Arbitrator的解决方法。Arbitrator是一个节点它的任务就是接受任何虚拟节点的接请求,并保持它。
在启动复制模块实例时在配置参数中应用可以提供Arbitrator的IP地址。如果是奇数个副本复制模块不会与这个arbitrator去建立链接,但如果是偶数个副本,就会主动去建立链接。
在启动复制模块实例时在配置参数中应用可以提供Arbitrator的IP地址。如果是奇数个副本复制模块不会与这个arbitrator去建立连接,但如果是偶数个副本,就会主动去建立连接。
Arbitrator的程序tarbitrator.c在复制模块的同一目录, 编译整个系统时会在bin目录生成。命令行参数“-”查看可以配置的参数比如绑定的IP地址监听的端口号。

View File

@ -13,7 +13,7 @@ taosd的启动入口是dnode模块dnode然后启动其他模块包括可
该模块负责taosd与taosc, 以及其他数据节点之间的通讯。TDengine没有采取标准的HTTP或gRPC等第三方工具而是实现了自己的通讯模块RPC。
考虑到物联网场景下数据写入的包一般不大因此除支持TCP链接之外RPC还支持UDP链接。当数据包小于15K时RPC将采用UDP方式进行链接否则将采用TCP链接。对于查询类的消息RPC不管包的大小总是采取TCP链接。对于UDP链RPC实现了自己的超时、重传、顺序检查等机制以保证数据可靠传输。
考虑到物联网场景下数据写入的包一般不大因此除支持TCP连接之外RPC还支持UDP连接。当数据包小于15K时RPC将采用UDP方式进行连接否则将采用TCP连接。对于查询类的消息RPC不管包的大小总是采取TCP连接。对于UDP连RPC实现了自己的超时、重传、顺序检查等机制以保证数据可靠传输。
RPC模块还提供数据压缩功能如果数据包的字节数超过系统配置参数compressMsgSize, RPC在传输中将自动压缩数据以节省带宽。
@ -25,7 +25,7 @@ RPC模块还提供数据压缩功能如果数据包的字节数超过系统
- 系统的初始化,包括
- 从文件taos.cfg读取系统配置参数从文件dnodeCfg.json读取数据节点的配置参数
- 启动RPC模块并建立起与taosc通讯的server链接与其他数据节点通讯的server链接;
- 启动RPC模块并建立起与taosc通讯的server连接与其他数据节点通讯的server连接;
- 启动并初始化dnode的内部管理, 该模块将扫描该数据节点已有的vnode并打开它们
- 初始化可配置的模块如mnode, http, monitor等。
- 数据节点的管理,包括

View File

@ -25,9 +25,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/TDengine-enterprise-arbitrator"
install_dir="${release_dir}/TDengine-enterprise-arbitrator-${version}"
else
install_dir="${release_dir}/TDengine-arbitrator"
install_dir="${release_dir}/TDengine-arbitrator-${version}"
fi
# Directories and files.
@ -48,9 +48,9 @@ mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1

View File

@ -25,9 +25,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/PowerDB-enterprise-arbitrator"
install_dir="${release_dir}/PowerDB-enterprise-arbitrator-${version}"
else
install_dir="${release_dir}/PowerDB-arbitrator"
install_dir="${release_dir}/PowerDB-arbitrator-${version}"
fi
# Directories and files.
@ -48,9 +48,9 @@ mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1

View File

@ -32,9 +32,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/TDengine-enterprise-client"
install_dir="${release_dir}/TDengine-enterprise-client-${version}"
else
install_dir="${release_dir}/TDengine-client"
install_dir="${release_dir}/TDengine-client-${version}"
fi
# Directories and files.
@ -125,9 +125,9 @@ fi
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1

View File

@ -32,9 +32,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/PowerDB-enterprise-client"
install_dir="${release_dir}/PowerDB-enterprise-client-${version}"
else
install_dir="${release_dir}/PowerDB-client"
install_dir="${release_dir}/PowerDB-client-${version}"
fi
# Directories and files.
@ -164,9 +164,9 @@ fi
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1

View File

@ -25,9 +25,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/TDengine-enterprise-server"
install_dir="${release_dir}/TDengine-enterprise-server-${version}"
else
install_dir="${release_dir}/TDengine-server"
install_dir="${release_dir}/TDengine-server-${version}"
fi
# Directories and files.
@ -138,9 +138,9 @@ fi
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1

View File

@ -25,9 +25,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/PowerDB-enterprise-server"
install_dir="${release_dir}/PowerDB-enterprise-server-${version}"
else
install_dir="${release_dir}/PowerDB-server"
install_dir="${release_dir}/PowerDB-server-${version}"
fi
# Directories and files.
@ -184,9 +184,9 @@ fi
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1

View File

@ -490,13 +490,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetBooleanFp, i, (jboolean)(*((char *)row[i]) == 1));
break;
case TSDB_DATA_TYPE_TINYINT:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteFp, i, (jbyte) * ((char *)row[i]));
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteFp, i, (jbyte) * ((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetShortFp, i, (jshort) * ((short *)row[i]));
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetShortFp, i, (jshort) * ((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetIntFp, i, (jint) * (int *)row[i]);
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetIntFp, i, (jint) * (int32_t *)row[i]);
break;
case TSDB_DATA_TYPE_BIGINT:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetLongFp, i, (jlong) * ((int64_t *)row[i]));

View File

@ -525,7 +525,7 @@ static void do_sum(SQLFunctionCtx *pCtx) {
*retVal += pCtx->preAggVals.statis.sum;
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
double *retVal = (double*) pCtx->aOutputBuf;
*retVal += GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.sum));
*retVal += GET_DOUBLE_VAL((const char*)&(pCtx->preAggVals.statis.sum));
}
} else { // computing based on the true data block
void *pData = GET_INPUT_CHAR(pCtx);
@ -768,7 +768,7 @@ static void avg_function(SQLFunctionCtx *pCtx) {
if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) {
*pVal += pCtx->preAggVals.statis.sum;
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
*pVal += GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.sum));
*pVal += GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.sum));
}
} else {
void *pData = GET_INPUT_CHAR(pCtx);
@ -3516,12 +3516,12 @@ static void spread_function(SQLFunctionCtx *pCtx) {
pInfo->max = (double)pCtx->preAggVals.statis.max;
}
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
if (pInfo->min > GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.min))) {
pInfo->min = GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.min));
if (pInfo->min > GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.min))) {
pInfo->min = GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.min));
}
if (pInfo->max < GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.max))) {
pInfo->max = GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.max));
if (pInfo->max < GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.max))) {
pInfo->max = GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.max));
}
}

View File

@ -306,16 +306,16 @@ static int32_t tscGetNthFieldResult(TAOS_ROW row, TAOS_FIELD* fields, int *lengt
switch (type) {
case TSDB_DATA_TYPE_BOOL:
sprintf(result, "%s", ((((int)(*((char *)val))) == 1) ? "true" : "false"));
sprintf(result, "%s", ((((int32_t)(*((char *)val))) == 1) ? "true" : "false"));
break;
case TSDB_DATA_TYPE_TINYINT:
sprintf(result, "%d", (int)(*((char *)val)));
sprintf(result, "%d", *((int8_t *)val));
break;
case TSDB_DATA_TYPE_SMALLINT:
sprintf(result, "%d", (int)(*((short *)val)));
sprintf(result, "%d", *((int16_t *)val));
break;
case TSDB_DATA_TYPE_INT:
sprintf(result, "%d", *((int *)val));
sprintf(result, "%d", *((int32_t *)val));
break;
case TSDB_DATA_TYPE_BIGINT:
sprintf(result, "%"PRId64, *((int64_t *)val));

View File

@ -1688,7 +1688,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (pItem->pNode->pParam != NULL) {
tSQLExprItem* pParamElem = &pItem->pNode->pParam->a[0];
SStrToken* pToken = &pParamElem->pNode->colInfo;
short sqlOptr = pParamElem->pNode->nSQLOptr;
int16_t sqlOptr = pParamElem->pNode->nSQLOptr;
if ((pToken->z == NULL || pToken->n == 0)
&& (TK_INTEGER != sqlOptr)) /*select count(1) from table*/ {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);

View File

@ -245,9 +245,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
uint64_t handle = (uint64_t) rpcMsg->ahandle;
void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(uint64_t));
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE));
if (p == NULL) {
rpcFreeCont(rpcMsg->pCont);
return;

View File

@ -720,15 +720,15 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT:
len += sprintf(str + len, "%d", *((char *)row[i]));
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
len += sprintf(str + len, "%d", *((short *)row[i]));
len += sprintf(str + len, "%d", *((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
len += sprintf(str + len, "%d", *((int *)row[i]));
len += sprintf(str + len, "%d", *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:

View File

@ -141,7 +141,7 @@ void taos_init_imp(void) {
int64_t refreshTime = 10; // 10 seconds by default
if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj");
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeSqlObjInCache, "sqlObj");
}
tscDebug("client is initialized successfully");

View File

@ -1788,8 +1788,8 @@ void registerSqlObj(SSqlObj* pSql) {
int32_t ref = T_REF_INC(pSql->pTscObj);
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
uint64_t p = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &p, sizeof(uint64_t), DEFAULT_LIFE_TIME);
TSDB_CACHE_PTR_TYPE p = (TSDB_CACHE_PTR_TYPE) pSql;
pSql->self = taosCachePut(tscObjCache, &p, sizeof(TSDB_CACHE_PTR_TYPE), &p, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_LIFE_TIME);
}
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {

View File

@ -219,7 +219,7 @@ static void getStatics_f(const TSKEY *primaryKey, const void *pData, int32_t num
}
float fv = 0;
fv = GET_FLOAT_VAL(&(data[i]));
fv = GET_FLOAT_VAL((const char*)&(data[i]));
dsum += fv;
if (fmin > fv) {
fmin = fv;
@ -233,17 +233,12 @@ static void getStatics_f(const TSKEY *primaryKey, const void *pData, int32_t num
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum = GET_DOUBLE_VAL((const char *)sum);
csum += dsum;
#ifdef _TD_ARM_32
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &fmax);
SET_DOUBLE_VAL_ALIGN(min, &fmin);
#else
*(double*)sum = csum;
*(double*)max = fmax;
*(double*)min = fmin;
#endif
SET_DOUBLE_VAL(sum, csum);
SET_DOUBLE_VAL(max, fmax);
SET_DOUBLE_VAL(min, fmin);
}
static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
@ -264,7 +259,7 @@ static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t num
}
double dv = 0;
dv = GET_DOUBLE_VAL(&(data[i]));
dv = GET_DOUBLE_VAL((const char*)&(data[i]));
dsum += dv;
if (dmin > dv) {
dmin = dv;
@ -278,19 +273,12 @@ static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t num
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum = GET_DOUBLE_VAL((const char *)sum);
csum += dsum;
#ifdef _TD_ARM_32
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &dmax);
SET_DOUBLE_VAL_ALIGN(min, &dmin);
#else
*(double*) sum = csum;
*(double*) max = dmax;
*(double*) min = dmin;
#endif
SET_DOUBLE_PTR(sum, &csum);
SET_DOUBLE_PTR(max, &dmax);
SET_DOUBLE_PTR(min, &dmin);
}
static void getStatics_bin(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
@ -493,46 +481,29 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) {
*((int32_t *)val) = GET_INT32_VAL(src);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
#ifdef _TD_ARM_32
float fv = GET_FLOAT_VAL(src);
SET_FLOAT_VAL_ALIGN(val, &fv);
#else
*((float *)val) = GET_FLOAT_VAL(src);
#endif
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_VAL(val, GET_FLOAT_VAL(src));
break;
};
case TSDB_DATA_TYPE_DOUBLE: {
#ifdef _TD_ARM_32
double dv = GET_DOUBLE_VAL(src);
SET_DOUBLE_VAL_ALIGN(val, &dv);
#else
*((double *)val) = GET_DOUBLE_VAL(src);
#endif
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_VAL(val, GET_DOUBLE_VAL(src));
break;
};
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: {
case TSDB_DATA_TYPE_BIGINT:
*((int64_t *)val) = GET_INT64_VAL(src);
break;
};
case TSDB_DATA_TYPE_SMALLINT: {
case TSDB_DATA_TYPE_SMALLINT:
*((int16_t *)val) = GET_INT16_VAL(src);
break;
};
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: {
case TSDB_DATA_TYPE_TINYINT:
*((int8_t *)val) = GET_INT8_VAL(src);
break;
};
case TSDB_DATA_TYPE_BINARY: {
case TSDB_DATA_TYPE_BINARY:
varDataCopy(val, src);
break;
};
case TSDB_DATA_TYPE_NCHAR: {
case TSDB_DATA_TYPE_NCHAR:
varDataCopy(val, src);
break;
};
default: {
memcpy(val, src, len);
break;
@ -579,4 +550,4 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
break;
}
}
}
}

View File

@ -709,46 +709,21 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
return -1;
}
#ifdef _TD_ARM_32
//memcpy(&payload, &value, sizeof(float));
float fv = (float)value;
SET_FLOAT_VAL_ALIGN(payload, &fv);
#else
*((float *)payload) = (float)value;
#endif
SET_FLOAT_VAL(payload, value);
}
} else if (pVariant->nType >= TSDB_DATA_TYPE_BOOL && pVariant->nType <= TSDB_DATA_TYPE_BIGINT) {
#ifdef _TD_ARM_32
//memcpy(&payload, &pVariant->i64Key, sizeof(float));
float fv = (float)pVariant->i64Key;
SET_FLOAT_VAL_ALIGN(payload, &fv);
#else
*((float *)payload) = (float)pVariant->i64Key;
#endif
SET_FLOAT_VAL(payload, pVariant->i64Key);
} else if (pVariant->nType == TSDB_DATA_TYPE_DOUBLE || pVariant->nType == TSDB_DATA_TYPE_FLOAT) {
#ifdef _TD_ARM_32
//memcpy(&payload, &pVariant->dKey, sizeof(float));
float fv = (float)pVariant->dKey;
SET_FLOAT_VAL_ALIGN(payload, &fv);
#else
*((float *)payload) = (float)pVariant->dKey;
#endif
SET_FLOAT_VAL(payload, pVariant->dKey);
} else if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
return 0;
}
#ifdef _TD_ARM_32
float fv = GET_FLOAT_VAL(payload);
if (isinf(fv) || isnan(fv) || fv > FLT_MAX || fv < -FLT_MAX) {
return -1;
}
#else
if (isinf(*((float *)payload)) || isnan(*((float *)payload)) || *((float *)payload) > FLT_MAX ||
*((float *)payload) < -FLT_MAX) {
return -1;
}
#endif
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
@ -765,42 +740,21 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
return -1;
}
#ifdef _TD_ARM_32
SET_DOUBLE_VAL_ALIGN(payload, &value);
#else
*((double *)payload) = value;
#endif
SET_DOUBLE_VAL(payload, value);
}
} else if (pVariant->nType >= TSDB_DATA_TYPE_BOOL && pVariant->nType <= TSDB_DATA_TYPE_BIGINT) {
#ifdef _TD_ARM_32
double dv = (double)(pVariant->i64Key);
SET_DOUBLE_VAL_ALIGN(payload, &dv);
#else
*((double *)payload) = (double)pVariant->i64Key;
#endif
SET_DOUBLE_VAL(payload, pVariant->i64Key);
} else if (pVariant->nType == TSDB_DATA_TYPE_DOUBLE || pVariant->nType == TSDB_DATA_TYPE_FLOAT) {
#ifdef _TD_ARM_32
double dv = (double)(pVariant->dKey);
SET_DOUBLE_VAL_ALIGN(payload, &dv);
#else
*((double *)payload) = pVariant->dKey;
#endif
SET_DOUBLE_VAL(payload, pVariant->dKey);
} else if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
return 0;
}
#ifdef _TD_ARM_32
double dv = GET_DOUBLE_VAL(payload);
if (isinf(dv) || isnan(dv) || dv > DBL_MAX || dv < -DBL_MAX) {
return -1;
}
#else
if (isinf(*((double *)payload)) || isnan(*((double *)payload)) || *((double *)payload) > DBL_MAX ||
*((double *)payload) < -DBL_MAX) {
return -1;
}
#endif
break;
}

View File

@ -55,6 +55,7 @@
<version>4.13</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -67,14 +67,23 @@ public class DatabaseMetaDataResultSet implements ResultSet {
@Override
public boolean next() throws SQLException {
// boolean ret = false;
// if (rowDataList.size() > 0) {
// ret = rowDataList.iterator().hasNext();
// if (ret) {
// rowCursor = rowDataList.iterator().next();
// cursorRowNumber++;
// }
// }
// return ret;
/**** add by zyyang 2020-09-29 ****************/
boolean ret = false;
if (rowDataList.size() > 0) {
ret = rowDataList.iterator().hasNext();
if (ret) {
rowCursor = rowDataList.iterator().next();
cursorRowNumber++;
}
if (!rowDataList.isEmpty() && cursorRowNumber < rowDataList.size()) {
rowCursor = rowDataList.get(cursorRowNumber++);
ret = true;
}
return ret;
}
@ -91,7 +100,8 @@ public class DatabaseMetaDataResultSet implements ResultSet {
@Override
public String getString(int columnIndex) throws SQLException {
columnIndex--;
return rowCursor.getString(columnIndex, columnMetaDataList.get(columnIndex).getColType());
int colType = columnMetaDataList.get(columnIndex).getColType();
return rowCursor.getString(columnIndex, colType);
}
@Override

View File

@ -464,7 +464,14 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse);
for (int i = 0; i < pEpSet->numOfEps; ++i) {
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i])
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
if (!mnodeIsRunning()) {
if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) {
dInfo("mnode index:%d %s:%u should work as master", i, pEpSet->fqdn[i], pEpSet->port[i]);
sdbUpdateSync();
}
}
}
tsDMnodeEpSet = *pEpSet;

View File

@ -64,7 +64,7 @@ extern const int32_t TYPE_BYTES[11];
// TODO: replace and remove code below
#define CHAR_BYTES sizeof(char)
#define SHORT_BYTES sizeof(int16_t)
#define INT_BYTES sizeof(int)
#define INT_BYTES sizeof(int32_t)
#define LONG_BYTES sizeof(int64_t)
#define FLOAT_BYTES sizeof(float)
#define DOUBLE_BYTES sizeof(double)
@ -73,7 +73,7 @@ extern const int32_t TYPE_BYTES[11];
#define TSDB_DATA_BOOL_NULL 0x02
#define TSDB_DATA_TINYINT_NULL 0x80
#define TSDB_DATA_SMALLINT_NULL 0x8000
#define TSDB_DATA_INT_NULL 0x80000000
#define TSDB_DATA_INT_NULL 0x80000000L
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L
#define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
@ -132,21 +132,30 @@ do { \
#define GET_INT32_VAL(x) (*(int32_t *)(x))
#define GET_INT64_VAL(x) (*(int64_t *)(x))
#ifdef _TD_ARM_32
#define GET_FLOAT_VAL(x) taos_align_get_float(x)
#define GET_DOUBLE_VAL(x) taos_align_get_double(x)
float taos_align_get_float(const char* pBuf);
double taos_align_get_double(const char* pBuf);
//#define __float_align_declear() float __underlyFloat = 0.0;
//#define __float_align_declear()
//#define GET_FLOAT_VAL_ALIGN(x) (*(int32_t*)&(__underlyFloat) = *(int32_t*)(x); __underlyFloat);
// notes: src must be float or double type variable !!!
#define SET_FLOAT_VAL_ALIGN(dst, src) (*(int32_t*) dst = *(int32_t*)src);
#define SET_DOUBLE_VAL_ALIGN(dst, src) (*(int64_t*) dst = *(int64_t*)src);
//#define SET_FLOAT_VAL_ALIGN(dst, src) (*(int32_t*) dst = *(int32_t*)src);
//#define SET_DOUBLE_VAL_ALIGN(dst, src) (*(int64_t*) dst = *(int64_t*)src);
float taos_align_get_float(const char* pBuf);
double taos_align_get_double(const char* pBuf);
#define GET_FLOAT_VAL(x) taos_align_get_float(x)
#define GET_DOUBLE_VAL(x) taos_align_get_double(x)
#define SET_FLOAT_VAL(x, y) { float z = (float)(y); (*(int32_t*) x = *(int32_t*)(&z)); }
#define SET_DOUBLE_VAL(x, y) { double z = (double)(y); (*(int64_t*) x = *(int64_t*)(&z)); }
#define SET_FLOAT_PTR(x, y) { (*(int32_t*) x = *(int32_t*)y); }
#define SET_DOUBLE_PTR(x, y) { (*(int64_t*) x = *(int64_t*)y); }
#else
#define GET_FLOAT_VAL(x) (*(float *)(x))
#define GET_DOUBLE_VAL(x) (*(double *)(x))
#define GET_FLOAT_VAL(x) (*(float *)(x))
#define GET_DOUBLE_VAL(x) (*(double *)(x))
#define SET_FLOAT_VAL(x, y) { (*(float *)(x)) = (float)(y); }
#define SET_DOUBLE_VAL(x, y) { (*(double *)(x)) = (double)(y); }
#define SET_FLOAT_PTR(x, y) { (*(float *)(x)) = (*(float *)(y)); }
#define SET_DOUBLE_PTR(x, y) { (*(double *)(x)) = (*(double *)(y)); }
#endif
typedef struct tDataTypeDescriptor {
@ -295,7 +304,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_MIN_VNODES 64
#define TSDB_MAX_VNODES 2048
#define TSDB_MIN_VNODES_PER_DB 2
#define TSDB_MAX_VNODES_PER_DB 16
#define TSDB_MAX_VNODES_PER_DB 64
#define TSDB_DNODE_ROLE_ANY 0
#define TSDB_DNODE_ROLE_MGMT 1

View File

@ -248,6 +248,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync modul
// wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, 0, 0x1001, "WAL file is corrupted")
// http
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_SERVER_OFFLINE, 0, 0x1100, "http server is not onlin")

View File

@ -419,16 +419,16 @@ static void dumpFieldToFile(FILE* fp, const char* val, TAOS_FIELD* field, int32_
char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) {
case TSDB_DATA_TYPE_BOOL:
fprintf(fp, "%d", ((((int)(*((char *)val))) == 1) ? 1 : 0));
fprintf(fp, "%d", ((((int32_t)(*((char *)val))) == 1) ? 1 : 0));
break;
case TSDB_DATA_TYPE_TINYINT:
fprintf(fp, "%d", (int)(*((char *)val)));
fprintf(fp, "%d", *((int8_t *)val));
break;
case TSDB_DATA_TYPE_SMALLINT:
fprintf(fp, "%d", (int)(*((short *)val)));
fprintf(fp, "%d", *((int16_t *)val));
break;
case TSDB_DATA_TYPE_INT:
fprintf(fp, "%d", *((int *)val));
fprintf(fp, "%d", *((int32_t *)val));
break;
case TSDB_DATA_TYPE_BIGINT:
fprintf(fp, "%" PRId64, *((int64_t *)val));
@ -559,16 +559,16 @@ static void printField(const char* val, TAOS_FIELD* field, int width, int32_t le
char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) {
case TSDB_DATA_TYPE_BOOL:
printf("%*s", width, ((((int)(*((char *)val))) == 1) ? "true" : "false"));
printf("%*s", width, ((((int32_t)(*((char *)val))) == 1) ? "true" : "false"));
break;
case TSDB_DATA_TYPE_TINYINT:
printf("%*d", width, (int)(*((char *)val)));
printf("%*d", width, *((int8_t *)val));
break;
case TSDB_DATA_TYPE_SMALLINT:
printf("%*d", width, (int)(*((short *)val)));
printf("%*d", width, *((int16_t *)val));
break;
case TSDB_DATA_TYPE_INT:
printf("%*d", width, *((int *)val));
printf("%*d", width, *((int32_t *)val));
break;
case TSDB_DATA_TYPE_BIGINT:
printf("%*" PRId64, width, *((int64_t *)val));

View File

@ -185,7 +185,11 @@ static int32_t sdbInitWal() {
}
sdbInfo("open sdb wal for restore");
walRestore(tsSdbObj.wal, NULL, sdbWrite);
int code = walRestore(tsSdbObj.wal, NULL, sdbWrite);
if (code != TSDB_CODE_SUCCESS) {
sdbError("failed to open wal for restore, reason:%s", tstrerror(code));
return -1;
}
return 0;
}

View File

@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 5, true, mnodeFreeShowObj, "show");
tsMnodeShowCache = taosCacheInit(TSDB_CACHE_PTR_KEY, 5, true, mnodeFreeShowObj, "show");
return 0;
}
@ -378,8 +378,8 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) {
}
static bool mnodeAccquireShowObj(SShowObj *pShow) {
uint64_t handleVal = (uint64_t)pShow;
SShowObj **ppShow = taosCacheAcquireByKey(tsMnodeShowCache, &handleVal, sizeof(int64_t));
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow;
SShowObj **ppShow = taosCacheAcquireByKey(tsMnodeShowCache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE));
if (ppShow) {
mDebug("%p, show is accquired from cache, data:%p, index:%d", pShow, ppShow, pShow->index);
return true;
@ -393,8 +393,8 @@ static void* mnodePutShowObj(SShowObj *pShow) {
if (tsMnodeShowCache != NULL) {
pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1);
uint64_t handleVal = (uint64_t)pShow;
SShowObj **ppShow = taosCachePut(tsMnodeShowCache, &handleVal, sizeof(int64_t), &pShow, sizeof(int64_t), DEFAULT_SHOWHANDLE_LIFE_SPAN);
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow;
SShowObj **ppShow = taosCachePut(tsMnodeShowCache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &pShow, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_SHOWHANDLE_LIFE_SPAN);
pShow->ppShow = (void**)ppShow;
mDebug("%p, show is put into cache, data:%p index:%d", pShow, ppShow, pShow->index);
return pShow;

View File

@ -20,6 +20,8 @@
extern "C" {
#endif
#include <sys/types.h>
// TAOS_OS_FUNC_DIR
void taosRemoveDir(char *rootDir);
int taosMkDir(const char *pathname, mode_t mode);

View File

@ -148,10 +148,10 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%" PRId64, fields[i].name, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.5f", fields[i].name, *((float *)row[i]));
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.5f", fields[i].name, GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.9f", fields[i].name, *((double *)row[i]));
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.9f", fields[i].name, GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
@ -210,10 +210,10 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
httpJsonFloat(jsonBuf, *((float *)row[i]));
httpJsonFloat(jsonBuf, GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
httpJsonDouble(jsonBuf, *((double *)row[i]));
httpJsonDouble(jsonBuf, GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:

View File

@ -124,10 +124,10 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
httpJsonFloat(jsonBuf, *((float *)row[i]));
httpJsonFloat(jsonBuf, GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
httpJsonDouble(jsonBuf, *((double *)row[i]));
httpJsonDouble(jsonBuf, GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:

View File

@ -7046,7 +7046,7 @@ void* qOpenQueryMgmt(int32_t vgId) {
return NULL;
}
pQueryMgmt->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
pQueryMgmt->qinfoPool = taosCacheInit(TSDB_CACHE_PTR_KEY, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
pQueryMgmt->closed = false;
pQueryMgmt->vgId = vgId;
@ -7115,23 +7115,23 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
qError("QInfo:%p failed to add qhandle into cache, since qMgmt is colsing", (void *)qInfo);
return NULL;
} else {
uint64_t handleVal = (uint64_t) qInfo;
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(int64_t), &qInfo, POINTER_BYTES, DEFAULT_QHANDLE_LIFE_SPAN);
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_QHANDLE_LIFE_SPAN);
// pthread_mutex_unlock(&pQueryMgmt->lock);
return handle;
}
}
void** qAcquireQInfo(void* pMgmt, uint64_t key) {
void** qAcquireQInfo(void* pMgmt, uint64_t _key) {
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL || pQueryMgmt->closed) {
return NULL;
}
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &key, sizeof(uint64_t));
TSDB_CACHE_PTR_TYPE key = (TSDB_CACHE_PTR_TYPE)_key;
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &key, sizeof(TSDB_CACHE_PTR_TYPE));
if (handle == NULL || *handle == NULL) {
return NULL;
} else {

View File

@ -216,6 +216,7 @@ static void rpcCleanConnCache(void *handle, void *tmrId) {
if (pCache == NULL || pCache->maxSessions == 0) return;
if (pCache->pTimer != tmrId) return;
pthread_mutex_lock(&pCache->mutex);
uint64_t time = taosGetTimestampMs();
for (hash = 0; hash < pCache->maxSessions; ++hash) {
@ -227,6 +228,7 @@ static void rpcCleanConnCache(void *handle, void *tmrId) {
// tTrace("timer, total connections in cache:%d", pCache->total);
taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer);
pthread_mutex_unlock(&pCache->mutex);
}
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) {

View File

@ -491,7 +491,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
if (ip == -1) return NULL;
SSyncPeer *pPeer = (SSyncPeer *)calloc(1, sizeof(SSyncPeer));
SSyncPeer *pPeer = calloc(1, sizeof(SSyncPeer));
if (pPeer == NULL) return NULL;
pPeer->nodeId = pInfo->nodeId;
@ -499,7 +499,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
pPeer->ip = ip;
pPeer->port = pInfo->nodePort;
pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0;
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%d", pNode->vgId, pPeer->fqdn, pPeer->port);
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%u", pNode->vgId, pPeer->fqdn, pPeer->port);
pPeer->peerFd = -1;
pPeer->syncFd = -1;

View File

@ -109,7 +109,6 @@ int processRpcMsg(void *item) {
if (pCfg->quorum <= 1) {
rpcFreeCont(pMsg->pCont);
taosFreeQitem(item);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(msgSize);
@ -117,6 +116,7 @@ int processRpcMsg(void *item) {
rpcMsg.handle = pMsg->handle;
rpcMsg.code = code;
rpcSendResponse(&rpcMsg);
taosFreeQitem(item);
}
return code;

View File

@ -26,7 +26,7 @@ extern "C" {
#define COMP_OVERFLOW_BYTES 2
#define BITS_PER_BYTE 8
// Masks
#define INT64MASK(_x) ((1ul << _x) - 1)
#define INT64MASK(_x) ((((uint64_t)1) << _x) - 1)
#define INT32MASK(_x) (((uint32_t)1 << _x) - 1)
#define INT8MASK(_x) (((uint8_t)1 << _x) - 1)
// Compression algorithm

View File

@ -385,9 +385,10 @@ static void walRelease(SWal *pWal) {
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
char *name = pWal->name;
int size = 1024 * 1024; // default 1M buffer size
terrno = 0;
char *buffer = malloc(1024000); // size for one record
char *buffer = malloc(size);
if (buffer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
@ -395,7 +396,7 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
SWalHead *pHead = (SWalHead *)buffer;
int fd = open(name, O_RDONLY);
int fd = open(name, O_RDWR);
if (fd < 0) {
wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
@ -405,29 +406,58 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
wDebug("wal:%s, start to restore", name);
size_t offset = 0;
while (1) {
int ret = taosTRead(fd, pHead, sizeof(SWalHead));
if ( ret == 0) break;
if (ret == 0) break;
if (ret != sizeof(SWalHead)) {
wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno));
if (ret < 0) {
wError("wal:%s, failed to read wal head part since %s", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < sizeof(SWalHead)) {
wError("wal:%s, failed to read head, ret:%d, skip the rest of file", name, ret);
taosFtruncate(fd, offset);
fsync(fd);
break;
}
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
terrno = TAOS_SYSTEM_ERROR(errno);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(false);
break;
}
}
if (pHead->len > size - sizeof(SWalHead)) {
size = sizeof(SWalHead) + pHead->len;
buffer = realloc(buffer, size);
if (buffer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
pHead = (SWalHead *)buffer;
}
ret = taosTRead(fd, pHead->cont, pHead->len);
if ( ret != pHead->len) {
wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, pHead->len, ret);
if (ret < 0) {
wError("wal:%s failed to read wal body part since %s", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < pHead->len) {
wError("wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", name, pHead->len, ret);
taosFtruncate(fd, offset);
fsync(fd);
break;
}
offset = offset + sizeof(SWalHead) + pHead->len;
if (pWal->keep) pWal->version = pHead->version;
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
}

View File

@ -23,35 +23,39 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1538548685000
self.ts = 1593548685000
def run(self):
tdSql.prepare()
tdSql.execute("create table st (ts timestamp, voltage int) tags (loc nchar(30))")
tdSql.execute("insert into t0 using st tags('beijing') values(now, 220) (now - 15d, 221) (now - 30d, 225) (now - 35d, 228) (now - 45d, 222)")
tdSql.execute("insert into t1 using st tags('shanghai') values(now, 220) (now - 60d, 221) (now - 50d, 225) (now - 40d, 228) (now - 20d, 222)")
tdSql.execute("insert into t0 using st tags('beijing') values(%d, 220) (%d, 221) (%d, 225) (%d, 228) (%d, 222)"
% (self.ts, self.ts + 1000000000, self.ts + 2000000000, self.ts + 3000000000, self.ts + 6000000000))
tdSql.execute("insert into t1 using st tags('shanghai') values(%d, 220) (%d, 221) (%d, 225) (%d, 228) (%d, 222)"
% (self.ts, self.ts + 2000000000, self.ts + 4000000000, self.ts + 5000000000, self.ts + 7000000000))
tdSql.query("select avg(voltage) from st interval(1n)")
tdSql.checkRows(3)
tdSql.checkData(0, 1, 223.0)
tdSql.checkData(1, 1, 225.0)
tdSql.checkData(2, 1, 220.333333)
tdSql.checkData(0, 1, 221.4)
tdSql.checkData(1, 1, 227.0)
tdSql.checkData(2, 1, 222.0)
tdSql.query("select avg(voltage) from st interval(1n, 15d)")
tdSql.checkRows(3)
tdSql.checkData(0, 1, 224.8)
tdSql.checkData(1, 1, 222.666666)
tdSql.checkData(2, 1, 220.0)
tdSql.checkRows(4)
tdSql.checkData(0, 1, 220.333333)
tdSql.checkData(1, 1, 224.666666)
tdSql.checkData(2, 1, 225.0)
tdSql.checkData(3, 1, 222.0)
tdSql.query("select avg(voltage) from st interval(1n, 15d) group by loc")
tdSql.checkRows(6)
tdSql.checkData(0, 1, 225.0)
tdSql.checkData(1, 1, 223.0)
tdSql.checkData(2, 1, 220.0)
tdSql.checkData(3, 1, 224.666666)
tdSql.checkData(4, 1, 222.0)
tdSql.checkData(5, 1, 220.0)
tdSql.checkRows(7)
tdSql.checkData(0, 1, 220.5)
tdSql.checkData(1, 1, 226.5)
tdSql.checkData(2, 1, 222.0)
tdSql.checkData(3, 1, 220.0)
tdSql.checkData(4, 1, 221.0)
tdSql.checkData(5, 1, 226.5)
tdSql.checkData(6, 1, 222.0)
def stop(self):
tdSql.close()

View File

@ -14,12 +14,14 @@ $i = 0
$db = $dbPrefix . $i
$stb = $stbPrefix . $i
print use $db
sql use $db
##### select first/last from table
## TBASE-331
print ====== select first/last from table
$tb = $tbPrefix . 0
print select first(*) from $tb
sql select first(*) from $tb
if $rows != 1 then
return -1
@ -58,6 +60,7 @@ if $data09 != NCHAR then
return -1
endi
print select last(*) from $tb
sql select last(*) from $tb
if $rows != 1 then
return -1

View File

@ -94,7 +94,11 @@ endi
## select specified columns
print select c1 from $mt
sql select c1 from $mt
print rows $rows
print totalNum $totalNum
if $rows != $totalNum then
return -1
endi

View File

@ -303,8 +303,8 @@ cd ../../../debug; make
./test.sh -f unique/mnode/mgmt22.sim
./test.sh -f unique/mnode/mgmt23.sim
./test.sh -f unique/mnode/mgmt24.sim
#./test.sh -f unique/mnode/mgmt25.sim
#./test.sh -f unique/mnode/mgmt26.sim
./test.sh -f unique/mnode/mgmt25.sim
./test.sh -f unique/mnode/mgmt26.sim
./test.sh -f unique/mnode/mgmt33.sim
./test.sh -f unique/mnode/mgmt34.sim
./test.sh -f unique/mnode/mgmtr2.sim

View File

@ -65,7 +65,7 @@ endi
print ============== step4
sql drop dnode $hostname2
sleep 8000
sleep 16000
sql show mnodes
$dnode1Role = $data2_1

View File

@ -739,36 +739,22 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
((((int)(*((char *)row[i]))) == 1) ? "1" : "0"));
break;
case TSDB_DATA_TYPE_TINYINT:
sprintf(value, "%d", (int)(*((char *)row[i])));
sprintf(value, "%d", *((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
sprintf(value, "%d", (int)(*((short *)row[i])));
sprintf(value, "%d", *((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
sprintf(value, "%d", *((int *)row[i]));
sprintf(value, "%d", *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
sprintf(value, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:{
#ifdef _TD_ARM_32
float fv = 0;
*(int32_t*)(&fv) = *(int32_t*)row[i];
sprintf(value, "%.5f", fv);
#else
sprintf(value, "%.5f", *((float *)row[i]));
#endif
}
case TSDB_DATA_TYPE_FLOAT:
sprintf(value, "%.5f", GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE: {
#ifdef _TD_ARM_32
double dv = 0;
*(int64_t*)(&dv) = *(int64_t*)row[i];
sprintf(value, "%.9lf", dv);
#else
sprintf(value, "%.9lf", *((double *)row[i]));
#endif
}
case TSDB_DATA_TYPE_DOUBLE:
sprintf(value, "%.9lf", GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: