Merge remote-tracking branch 'origin/develop' into feature/crash_gen

This commit is contained in:
Steven Li 2020-09-29 23:17:41 +00:00
commit 7a66bf103c
71 changed files with 1675 additions and 1261 deletions

View File

@ -48,6 +48,7 @@ ENDIF ()
IF (TD_LINUX_64)
ADD_DEFINITIONS(-D_M_X64)
ADD_DEFINITIONS(-D_TD_LINUX_64)
MESSAGE(STATUS "linux64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ADD_DEFINITIONS(-DUSE_LIBICONV)
ENDIF ()
@ -55,30 +56,38 @@ ENDIF ()
IF (TD_LINUX_32)
ADD_DEFINITIONS(-D_TD_LINUX_32)
ADD_DEFINITIONS(-DUSE_LIBICONV)
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -munaligned-access -fpack-struct=8 -latomic -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
MESSAGE(STATUS "linux32 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -munaligned-access -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
IF (TD_ARM_64)
ADD_DEFINITIONS(-D_M_X64)
ADD_DEFINITIONS(-D_TD_ARM_64_)
ADD_DEFINITIONS(-D_TD_ARM_64)
ADD_DEFINITIONS(-D_TD_ARM_)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "arm64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
IF (TD_ARM_32)
ADD_DEFINITIONS(-D_TD_ARM_32_)
ADD_DEFINITIONS(-D_TD_ARM_32)
ADD_DEFINITIONS(-D_TD_ARM_)
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "arm32 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE -Wno-pointer-to-int-cast -Wno-int-to-pointer-cast -Wno-incompatible-pointer-types ")
ENDIF ()
IF (TD_MIPS_64)
ADD_DEFINITIONS(-D_TD_MIPS_64_)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "mips64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
IF (TD_MIPS_32)
ADD_DEFINITIONS(-D_TD_MIPS_32_)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "mips32 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
@ -86,6 +95,7 @@ IF (TD_APLHINE)
SET(COMMON_FLAGS "${COMMON_FLAGS} -largp")
link_libraries(/usr/lib/libargp.a)
ADD_DEFINITIONS(-D_ALPINE)
MESSAGE(STATUS "aplhine is defined")
ENDIF ()
IF (TD_LINUX)
@ -95,7 +105,7 @@ IF (TD_LINUX)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
IF (TD_NINGSI_60)
ADD_DEFINITIONS(-D_TD_NINGSI_60_)
ADD_DEFINITIONS(-D_TD_NINGSI_60)
MESSAGE(STATUS "set ningsi macro to true")
ENDIF ()
@ -118,6 +128,7 @@ IF (TD_DARWIN_64)
ADD_DEFINITIONS(-DDARWIN)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "darwin64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -Wno-missing-braces -fPIC -g -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0")
@ -147,11 +158,13 @@ IF (TD_WINDOWS_64)
ADD_DEFINITIONS(-D_M_X64)
ADD_DEFINITIONS(-D_TD_WINDOWS_64)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "windows64 is defined")
ENDIF ()
IF (TD_WINDOWS_32)
ADD_DEFINITIONS(-D_TD_WINDOWS_32)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "windows32 is defined")
ENDIF ()
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)

View File

@ -42,6 +42,12 @@ IF (DEFINED CPUTYPE)
ELSE ()
IF (TD_WINDOWS_32)
SET(TD_VER_CPUTYPE "x86")
ELSEIF (TD_LINUX_32)
SET(TD_VER_CPUTYPE "x86")
ELSEIF (TD_ARM_32)
SET(TD_VER_CPUTYPE "x86")
ELSEIF (TD_MIPS_32)
SET(TD_VER_CPUTYPE "x86")
ELSE ()
SET(TD_VER_CPUTYPE "x64")
ENDIF ()

View File

@ -68,7 +68,7 @@ systemctl status taosd
taos
```
如果TDengine终端接服务成功,将会打印出欢迎消息和版本信息。如果失败,则会打印错误消息出来(请参考[FAQ](https://www.taosdata.com/cn/faq/)来解决终端接服务端失败的问题。TDengine终端的提示符号如下
如果TDengine终端接服务成功,将会打印出欢迎消息和版本信息。如果失败,则会打印错误消息出来(请参考[FAQ](https://www.taosdata.com/cn/faq/)来解决终端接服务端失败的问题。TDengine终端的提示符号如下
```cmd
taos>
@ -99,8 +99,8 @@ Query OK, 2 row(s) in set (0.001700s)
- -c, --config-dir: 指定配置文件目录默认为_/etc/taos_
- -h, --host: 指定服务的IP地址默认为本地服务
- -s, --commands: 在不进入终端的情况下运行TDengine命令
- -u, -- user: 接TDengine服务器的用户名缺省为root
- -p, --password: 接TDengine服务器的密码缺省为taosdata
- -u, -- user: 接TDengine服务器的用户名缺省为root
- -p, --password: 接TDengine服务器的密码缺省为taosdata
- -?, --help: 打印出所有命令行参数
示例:

View File

@ -19,7 +19,7 @@ CREATE DATABASE power KEEP 365 DAYS 10 BLOCKS 4;
USE power;
```
就当前接里操作的库换为power否则对具体表操作前需要使用“库名.表名”来指定库的名字。
就当前接里操作的库换为power否则对具体表操作前需要使用“库名.表名”来指定库的名字。
**注意:**

View File

@ -196,7 +196,7 @@ TDengine是基于硬件、软件系统不可靠、一定会有故障的假设进
**对外服务地址**TDengine集群可以容纳单台、多台甚至几千台物理节点。应用只需要向集群中任何一个物理节点的publicIp发起连接即可。启动CLI应用taos时选项-h需要提供的就是publicIp。
**master/secondIp**每一个dnode都需要配置一个masterIp。dnode启动后将对配置的masterIp发起加入集群的连接请求。masterIp是已经创建的集群中的任何一个节点的privateIp对于集群中的第一个节点就是它自己的privateIp。为保证连接成功每个dnode还可配置secondIp, 该IP地址也是已创建的集群中的任何一个节点的privateIp。如果一个节点连接masterIp失败,它将试图接secondIp。
**master/secondIp**每一个dnode都需要配置一个masterIp。dnode启动后将对配置的masterIp发起加入集群的连接请求。masterIp是已经创建的集群中的任何一个节点的privateIp对于集群中的第一个节点就是它自己的privateIp。为保证连接成功每个dnode还可配置secondIp, 该IP地址也是已创建的集群中的任何一个节点的privateIp。如果一个节点连接masterIp失败,它将试图接secondIp。
dnode启动后会获知集群的mnode IP列表并且定时向mnode发送状态信息。
@ -245,4 +245,4 @@ vnode(虚拟数据节点)保存采集的时序数据,而且查询、计算都
**Note**目前集群功能仅仅限于企业版
**Note**目前集群功能仅仅限于企业版

View File

@ -130,9 +130,25 @@ TDengine集群中加入一个新的dnode时涉及集群相关的一些参数
- statusInterval: dnode向mnode报告状态时长。单位为秒默认值1。
- maxTablesPerVnode: 每个vnode中能够创建的最大表个数。默认值1000000。
- maxVgroupsPerDb: 每个数据库中能够使用的最大vnode个数。
- arbitrator: 系统中裁决器的end point缺省为空
- arbitrator: 系统中裁决器的end point缺省为空
- timezone、locale、charset 的配置见客户端配置。
为方便调试可通过SQL语句临时调整每个dnode的日志配置系统重启后会失效
```mysql
ALTER DNODE <dnode_id> <config>
```
- dnode_id: 可以通过SQL语句"SHOW DNODES"命令获取
- config: 要调整的日志参数,在如下列表中取值
> resetlog 截断旧日志文件,创建一个新日志文件
> debugFlag < 131 | 135 | 143 > 设置debugFlag为131、135或者143
例如:
```
alter dnode 1 debugFlag 135;
```
## 客户端配置
TDengine系统的前台交互客户端应用程序为taos它与taosd共享同一个配置文件taos.cfg。运行taos时使用参数-c指定配置文件目录如taos -c /home/cfg表示使用/home/cfg/目录下的taos.cfg配置文件中的参数缺省目录是/etc/taos。本节主要说明 taos 客户端应用在配置文件 taos.cfg 文件中使用到的参数。
@ -233,6 +249,12 @@ ALTER USER <user_name> PASS <'password'>;
修改用户密码, 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角
```
ALTER USER <user_name> PRIVILEDGE <'super'|'write'|'read'>;
```
修改用户权限为super/write/read。 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角
```
SHOW USERS;
```
@ -386,5 +408,5 @@ TDengine的所有可执行文件默认存放在 _/usr/local/taos/bin_ 目录下
您可以通过修改系统配置文件taos.cfg来配置不同的数据目录和日志目录。
##

View File

@ -84,17 +84,17 @@ TDengine 分布式架构的逻辑结构图如下:
**FQDN配置**一个数据节点有一个或多个FQDN可以在系统配置文件taos.cfg通过参数“fqdn"进行指定如果没有指定系统将自动获取FQDN。如果节点没有配置FQDN可以直接将该节点的配置参数fqdn设置为它的IP地址。但不建议使用IP因为IP地址可变一旦变化将让集群无法正常工作。一个数据节点的EP(End Point)由FQDN + Port组成。采用FQDN需要保证DNS服务正常工作或者在节点以及应用所在的节点配置好hosts文件。
**端口配置:**一个数据节点对外的端口由TDengine的系统配置参数serverPort决定对集群内部通讯的端口是serverPort+5。集群内数据节点之间的数据复制操作还占有一个TCP端口是serverPort+10. 为支持多线程高效的处理UDP数据每个对内和对外的UDP都需要占用5个连续的端口。因此一个数据节点总的端口范围为serverPort到serverPort + 10总共11个TCP/UDP端口。使用时需要确保防火墙将这些端口打开。每个数据节点可以配置不同的serverPort。
**端口配置:**一个数据节点对外的端口由TDengine的系统配置参数serverPort决定对集群内部通讯的端口是serverPort+5。集群内数据节点之间的数据复制操作还占有一个TCP端口是serverPort+10. 为支持多线程高效的处理UDP数据每个对内和对外的UDP都需要占用5个连续的端口。因此一个数据节点总的端口范围为serverPort到serverPort + 10总共11个TCP/UDP端口。使用时需要确保防火墙将这些端口打开。每个数据节点可以配置不同的serverPort。
**集群对外接:** TDengine集群可以容纳单个、多个甚至几千个数据节点。应用只需要向集群中任何一个数据节点发起连接即可接需要提供的网络参数是一数据节点的End Point(FQDN加配置的端口号。通过命令行CLI启动应用taos时可以通过选项-h来指定数据节点的FQDN, -P来指定其配置的端口号如果端口不配置将采用TDengine的系统配置参数serverPort。
**集群对外接:** TDengine集群可以容纳单个、多个甚至几千个数据节点。应用只需要向集群中任何一个数据节点发起连接即可接需要提供的网络参数是一数据节点的End Point(FQDN加配置的端口号。通过命令行CLI启动应用taos时可以通过选项-h来指定数据节点的FQDN, -P来指定其配置的端口号如果端口不配置将采用TDengine的系统配置参数serverPort。
**集群内部通讯**: 各个数据节点之间通过TCP/UDP进行接。一个数据节点启动时将获取mnode所在的dnode的EP信息然后与系统中的mnode建立起交换信息。获取mnode的EP信息有三步1检查mnodeEpList文件是否存在如果不存在或不能正常打开获得mnode EP信息进入第二步2检查系统配置文件taos.cfg, 获取mnode EP配置参数first, second如果不存在或者taos.cfg里没有这两个配置参数或无效进入第三步3将自己的EP设为mnode EP, 并独立运行起来。获取mnode EP列表后数据节点发起链接,如果链接成功则成功加入进工作的集群如果不成功则尝试mnode EP列表中的下一个。如果都尝试了接都仍然失败,则休眠几秒后,再进行尝试。
**集群内部通讯**: 各个数据节点之间通过TCP/UDP进行接。一个数据节点启动时将获取mnode所在的dnode的EP信息然后与系统中的mnode建立起交换信息。获取mnode的EP信息有三步1检查mnodeEpList文件是否存在如果不存在或不能正常打开获得mnode EP信息进入第二步2检查系统配置文件taos.cfg, 获取mnode EP配置参数first, second如果不存在或者taos.cfg里没有这两个配置参数或无效进入第三步3将自己的EP设为mnode EP, 并独立运行起来。获取mnode EP列表后数据节点发起连接,如果连接成功则成功加入进工作的集群如果不成功则尝试mnode EP列表中的下一个。如果都尝试了接都仍然失败,则休眠几秒后,再进行尝试。
**MNODE的选择:** TDengine逻辑上有管理节点但没有单独的执行代码服务器侧只有一套执行代码taosd。那么哪个数据节点会是管理节点呢这是系统自动决定的无需任何人工干预。原则如下一个数据节点启动时会检查自己的End Point, 并与获取的mnode EP List进行比对如果在其中该数据节点认为自己应该启动mnode模块成为mnode。如果自己的EP不在mnode EP List里则不启动mnode模块。在系统的运行过程中由于负载均衡、宕机等原因mnode有可能迁移至新的dnode但一切都是透明的无需人工干预配置参数的修改是mnode自己根据资源做出的决定。
**新数据节点的加入**系统有了一个数据节点后就已经成为一个工作的系统。添加新的节点进集群时有两个步骤第一步使用TDengine CLI接到现有工作的数据节点然后用命令”create dnode"将新的数据节点的End Point添加进去; 第二步在新的数据节点的系统配置参数文件taos.cfg里将first, second参数设置为现有集群中任意两个数据节点的EP即可。具体添加的详细步骤请见详细的用户手册。这样就把集群一步一步的建立起来。
**新数据节点的加入**系统有了一个数据节点后就已经成为一个工作的系统。添加新的节点进集群时有两个步骤第一步使用TDengine CLI接到现有工作的数据节点然后用命令”create dnode"将新的数据节点的End Point添加进去; 第二步在新的数据节点的系统配置参数文件taos.cfg里将first, second参数设置为现有集群中任意两个数据节点的EP即可。具体添加的详细步骤请见详细的用户手册。这样就把集群一步一步的建立起来。
**重定向**无论是dnode还是taosc最先都是要发起与mnode的但mnode是系统自动创建并维护的因此对于用户来说并不知道哪个dnode在运行mnode。TDengine只要求向系统中任何一个工作的dnode发起接即可。因为任何一个正在运行的dnode都维护有目前运行的mnode EP List。当收到一个来自新启动的dnode或taosc的接请求如果自己不是mnode则将mnode EP List回复给对方taosc或新启动的dnode收到这个list, 就重新尝试建立接。当mnode EP List发生改变通过节点之间的消息交互各个数据节点就很快获取最新列表并通知taosc。
**重定向**无论是dnode还是taosc最先都是要发起与mnode的但mnode是系统自动创建并维护的因此对于用户来说并不知道哪个dnode在运行mnode。TDengine只要求向系统中任何一个工作的dnode发起接即可。因为任何一个正在运行的dnode都维护有目前运行的mnode EP List。当收到一个来自新启动的dnode或taosc的接请求如果自己不是mnode则将mnode EP List回复给对方taosc或新启动的dnode收到这个list, 就重新尝试建立接。当mnode EP List发生改变通过节点之间的消息交互各个数据节点就很快获取最新列表并通知taosc。
### 一个典型的消息流程
为解释vnode, mnode, taosc和应用之间的关系以及各自扮演的角色下面对写入数据这个典型操作的流程进行剖析。
@ -197,7 +197,7 @@ Master Vnode遵循下面的写入流程
### 主从选择
Vnode会保持一个数据版本号(Version),对内存数据进行持久化存储时,对该版本号也进行持久化存储。每个数据更新操作,无论是采集的时序数据还是元数据,这个版本号将增一。
一个vnode启动时角色(master、slave) 是不定的数据是处于未同步状态它需要与虚拟节点组内其他节点建立TCP并互相交换status其中包括version和自己的角色。通过status的交换系统进入选主流程规则如下
一个vnode启动时角色(master、slave) 是不定的数据是处于未同步状态它需要与虚拟节点组内其他节点建立TCP并互相交换status其中包括version和自己的角色。通过status的交换系统进入选主流程规则如下
1. 如果只有一个副本该副本永远就是master
2. 所有副本都在线时版本最高的被选为master

View File

@ -142,7 +142,7 @@ C/C++的API类似于MySQL的C API。应用程序使用时需要包含TDengine
获取最近一次API调用失败的原因返回值为错误代码。
**注意**:对于单个数据库连接,在同一时刻只能有一个线程使用该接调用API否则会有未定义的行为出现并可能导致客户端crash。客户端应用可以通过建立多个连接进行多线程的数据写入或查询处理。
**注意**:对于单个数据库连接,在同一时刻只能有一个线程使用该接调用API否则会有未定义的行为出现并可能导致客户端crash。客户端应用可以通过建立多个连接进行多线程的数据写入或查询处理。
### 异步查询API

View File

@ -21,7 +21,7 @@
## 5. 遇到错误"Unable to establish connection", 我怎么办?
客户端遇到接故障,请按照下面的步骤进行检查:
客户端遇到接故障,请按照下面的步骤进行检查:
1. 检查网络环境
* 云服务器检查云服务器的安全组是否打开TCP/UDP 端口6030-6042的访问权限
@ -45,7 +45,7 @@
9. 如果仍不能排除连接故障请使用命令行工具nc来分别判断指定端口的TCP和UDP连接是否通畅
检查UDP端口连接是否工作`nc -vuz {hostIP} {port} `
检查服务器侧TCP端口连接是否工作`nc -l {port}`
检查客户端侧TCP端口接是否工作:`nc {hostIP} {port}`
检查客户端侧TCP端口接是否工作:`nc {hostIP} {port}`
10. 也可以使用taos程序内嵌的网络连通检测功能来验证服务器和客户端之间指定的端口连接是否通畅包括TCP和UDP[TDengine 内嵌网络检测工具使用指南](https://www.taosdata.com/blog/2020/09/08/1816.html)。
@ -57,7 +57,7 @@
1. 请检查连接的服务器的FQDN是否正确,FQDN配置参考[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)。
2. 如果网络配置有DNS server, 请检查是否正常工作
3. 如果网络没有配置DNS server, 请检查客户端所在机器的hosts文件查看该FQDN是否配置并是否有正确的IP地址。
4. 如果网络配置OK从客户端所在机器你需要能Ping该连接的FQDN否则客户端是无法接服务器的
4. 如果网络配置OK从客户端所在机器你需要能Ping该连接的FQDN否则客户端是无法接服务器的
## 7. 虽然语法正确,为什么我还是得到 "Invalid SQL" 错误
@ -108,4 +108,8 @@ Connection = DriverManager.getConnection(url, properties);
附上必要的问题描述,以及发生该问题的执行操作,出现问题的表征及大概的时间,在<a href='https://github.com/taosdata/TDengine'> GitHub</a>提交Issue。
为了保证有足够的debug信息如果问题能够重复请修改/etc/taos/taos.cfg文件最后面添加一行“debugFlag 135"(不带引号本身然后重启taosd, 重复问题然后再递交。但系统正常运行时请一定将debugFlag设置为131否则会产生大量的日志信息降低系统效率。
为了保证有足够的debug信息如果问题能够重复请修改/etc/taos/taos.cfg文件最后面添加一行“debugFlag 135"(不带引号本身然后重启taosd, 重复问题然后再递交。也可以通过如下SQL语句临时设置taosd的日志级别。
```
alter dnode <dnode_id> debugFlag 135;
```
但系统正常运行时请一定将debugFlag设置为131否则会产生大量的日志信息降低系统效率。

View File

@ -66,21 +66,21 @@ TDengine采取的是Master-Slave模式进行同步与流行的RAFT一致性
数据实时复制有三个主要流程:选主、数据转发、数据恢复。后续做详细讨论。
## 虚拟节点之间的网络
## 虚拟节点之间的网络
虚拟节点之间通过TCP进行链接节点之间的状态交换、数据包的转发都是通过这个TCP链接(peerFd)进行。为避免竞争两个虚拟节点之间的TCP链接总是由IP地址(UINT32)小的节点作为TCP客户端发起。一旦TCP链接被中断虚拟节点能通过TCP socket自动检测到将对方标为offline。如果监测到任何错误比如数据恢复流程虚拟节点将主动重置该接。
虚拟节点之间通过TCP进行连接节点之间的状态交换、数据包的转发都是通过这个TCP连接(peerFd)进行。为避免竞争两个虚拟节点之间的TCP连接总是由IP地址(UINT32)小的节点作为TCP客户端发起。一旦TCP连接被中断虚拟节点能通过TCP socket自动检测到将对方标为offline。如果监测到任何错误比如数据恢复流程虚拟节点将主动重置该接。
一旦作为客户端的节点链接不成或中断,它将周期性的每隔一秒钟去试图去链接一次。因为TCP本身有心跳机制虚拟节点之间不再另行提供心跳。
一旦作为客户端的节点连接不成或中断,它将周期性的每隔一秒钟去试图去连接一次。因为TCP本身有心跳机制虚拟节点之间不再另行提供心跳。
如果一个unsynced节点要发起数据恢复流程它与Master将建立起专有的TCP链接(syncFd)。数据恢复完成后,该链接会被关闭。而且为限制资源的使用,系统只容许一定数量(配置参数tsMaxSyncNum)的数据恢复的socket存在。如果超过这个数字系统会将新的数据恢复请求延后处理。
如果一个unsynced节点要发起数据恢复流程它与Master将建立起专有的TCP连接(syncFd)。数据恢复完成后,该连接会被关闭。而且为限制资源的使用,系统只容许一定数量(配置参数tsMaxSyncNum)的数据恢复的socket存在。如果超过这个数字系统会将新的数据恢复请求延后处理。
任意一个节点无论有多少虚拟节点都会启动而且只会启动一个TCP server, 来接受来自其他虚拟节点的上述两类TCP的接请求。当TCP socket建立起来客户端侧发送的消息体里会带有vgId全局唯一的vgroup ID), TCP 服务器侧会检查该vgId是否已经在该节点启动运行。如果已经启动运行就接受其请求。如果不存在就直接将接请求关闭。在TDengine代码里mnode group的vgId设置为1。
任意一个节点无论有多少虚拟节点都会启动而且只会启动一个TCP server, 来接受来自其他虚拟节点的上述两类TCP的接请求。当TCP socket建立起来客户端侧发送的消息体里会带有vgId全局唯一的vgroup ID), TCP 服务器侧会检查该vgId是否已经在该节点启动运行。如果已经启动运行就接受其请求。如果不存在就直接将接请求关闭。在TDengine代码里mnode group的vgId设置为1。
## 选主流程
当同一组的两个虚拟节点之间(vnode A, vnode B)建立连接后他们互换status消息。status消息里包含本地存储的同一虚拟节点组内所有虚拟节点的role和version。
如果一个虚拟节点(vnode A)检测到与同一虚拟节点组内另外一虚拟节点vnode B接中断vnode A将立即把vnode B的role设置为offline。无论是接收到另外一虚拟节点发来的status消息还是检测与另外一虚拟节点的接中断,该虚拟节点都将进入状态处理流程。状态处理流程的规则如下:
如果一个虚拟节点(vnode A)检测到与同一虚拟节点组内另外一虚拟节点vnode B接中断vnode A将立即把vnode B的role设置为offline。无论是接收到另外一虚拟节点发来的status消息还是检测与另外一虚拟节点的接中断,该虚拟节点都将进入状态处理流程。状态处理流程的规则如下:
1. 如果检测到在线的节点数没有超过一半则将自己的状态设置为unsynced.
2. 如果在线的虚拟节点数超过一半会检查master节点是否存在如果存在则会决定是否将自己状态改为slave或启动数据恢复流程
@ -118,7 +118,7 @@ TDengine采取的是Master-Slave模式进行同步与流行的RAFT一致性
9. 如果quorum为1上述678步不会发生。
10. 如果要等待slave的确认master会启动2秒的定时器可配置如果超时则认为失败。
对于回复确认sync模块提供的是异步回调函数因此APP在调用syncForwardToPeer之后无需等待可以处理下一个操作。在Master与Slave的TCP接管道里可能有多个Forward消息这些消息是严格按照应用提供的顺序排好的。对于Forward Response也是一样TCP管道里存在多个但都是排序好的。这个顺序SYNC模块并没有做特别的事情是由APP单线程顺序写来保证的(TDengine里每个vnode的写数据都是单线程
对于回复确认sync模块提供的是异步回调函数因此APP在调用syncForwardToPeer之后无需等待可以处理下一个操作。在Master与Slave的TCP接管道里可能有多个Forward消息这些消息是严格按照应用提供的顺序排好的。对于Forward Response也是一样TCP管道里存在多个但都是排序好的。这个顺序SYNC模块并没有做特别的事情是由APP单线程顺序写来保证的(TDengine里每个vnode的写数据都是单线程
## 数据恢复流程
@ -142,9 +142,9 @@ TDengine采取的是Master-Slave模式进行同步与流行的RAFT一致性
<center> <img src="../assets/replica-restore.png"> </center>
1. 通过已经建立的TCP发送sync req给master节点
2. master收到sync req后以client的身份向vnode B主动建立一新的专用于同步的TCPsyncFd)
3. 新的TCP接建立成功后master将开始retrieve流程对应的vnode B将同步启动restore流程
1. 通过已经建立的TCP发送sync req给master节点
2. master收到sync req后以client的身份向vnode B主动建立一新的专用于同步的TCPsyncFd)
3. 新的TCP接建立成功后master将开始retrieve流程对应的vnode B将同步启动restore流程
4. Retrieve/Restore流程里先处理所有archived data (vnode里的data, head, last文件后处理WAL data。
5. 对于archived datamaster将通过回调函数getFileInfo获取数据文件的基本信息包括文件名、magic以及文件大小。
6. master 将获得的文件名、magic以及文件大小发给vnode B
@ -157,7 +157,7 @@ TDengine采取的是Master-Slave模式进行同步与流行的RAFT一致性
1. master节点调用回调函数getWalInfo获取WAL的文件名。
2. 如果getWalInfo返回值大于0表示该文件还不是最后一个WAL因此master调用sendfile一下把该文件发送给vnode B
3. 如果getWalInfo返回时为0表示该文件是最后一个WAL因为文件可能还处于写的状态中sync模块要根据WAL Head的定义逐条读出记录然后发往vnode B。
4. vnode A读取TCP接传来的数据按照WAL Head逐条读取如果版本号比现有的大调用回调函数writeToCache交给应用处理。如果小直接扔掉。
4. vnode A读取TCP接传来的数据按照WAL Head逐条读取如果版本号比现有的大调用回调函数writeToCache交给应用处理。如果小直接扔掉。
5. 上述流程循环直到所有WAL文件都被处理完。处理完后master就会将新来的数据包通过Forward消息转发给slave。
从同步文件启动起sync模块会通过inotify监控所有处理过的file以及wal。一旦发现被处理过的文件有更新变化同步流程将中止会重新启动。因为有可能落盘操作正在进行比如历史数据导入内存数据落盘把已经处理过的文件进行了修改需要重新同步才行。
@ -194,15 +194,15 @@ sync模块通过inotify监控LastWal文件的更新和关闭操作。而且在
因为写入失败客户端会重新写入数据。但对于TDengine而言是OK的。因为时序数据都是有时间戳的时间戳相同的数据更新操作第一次会执行但第二次会自动扔掉。对于Meta Data(增加、删除库、表等等的操作也是OK的。一张表、库已经被创建或删除再创建或删除不会被执行的。
在TDengine的设计里虚拟节点与虚拟节点之间是一个TCP是一个pipeline数据块一个接一个按顺序在这个pipeline里等待处理。一旦某个数据块的处理失败这个接会被重置后续的数据块的处理都会失败。因此不会存在Pipeline里一个数据块更新失败但下一个数据块成功的可能。
在TDengine的设计里虚拟节点与虚拟节点之间是一个TCP是一个pipeline数据块一个接一个按顺序在这个pipeline里等待处理。一旦某个数据块的处理失败这个接会被重置后续的数据块的处理都会失败。因此不会存在Pipeline里一个数据块更新失败但下一个数据块成功的可能。
## Split Brain的问题
选举流程中有个强制要求那就是一定有超过半数的虚拟节点在线。但是如果replication正好是偶数这个时候完全可能存在splt brain问题。
为解决这个问题TDengine提供Arbitrator的解决方法。Arbitrator是一个节点它的任务就是接受任何虚拟节点的接请求,并保持它。
为解决这个问题TDengine提供Arbitrator的解决方法。Arbitrator是一个节点它的任务就是接受任何虚拟节点的接请求,并保持它。
在启动复制模块实例时在配置参数中应用可以提供Arbitrator的IP地址。如果是奇数个副本复制模块不会与这个arbitrator去建立链接,但如果是偶数个副本,就会主动去建立链接。
在启动复制模块实例时在配置参数中应用可以提供Arbitrator的IP地址。如果是奇数个副本复制模块不会与这个arbitrator去建立连接,但如果是偶数个副本,就会主动去建立连接。
Arbitrator的程序tarbitrator.c在复制模块的同一目录, 编译整个系统时会在bin目录生成。命令行参数“-”查看可以配置的参数比如绑定的IP地址监听的端口号。

View File

@ -13,7 +13,7 @@ taosd的启动入口是dnode模块dnode然后启动其他模块包括可
该模块负责taosd与taosc, 以及其他数据节点之间的通讯。TDengine没有采取标准的HTTP或gRPC等第三方工具而是实现了自己的通讯模块RPC。
考虑到物联网场景下数据写入的包一般不大因此除支持TCP链接之外RPC还支持UDP链接。当数据包小于15K时RPC将采用UDP方式进行链接否则将采用TCP链接。对于查询类的消息RPC不管包的大小总是采取TCP链接。对于UDP链RPC实现了自己的超时、重传、顺序检查等机制以保证数据可靠传输。
考虑到物联网场景下数据写入的包一般不大因此除支持TCP连接之外RPC还支持UDP连接。当数据包小于15K时RPC将采用UDP方式进行连接否则将采用TCP连接。对于查询类的消息RPC不管包的大小总是采取TCP连接。对于UDP连RPC实现了自己的超时、重传、顺序检查等机制以保证数据可靠传输。
RPC模块还提供数据压缩功能如果数据包的字节数超过系统配置参数compressMsgSize, RPC在传输中将自动压缩数据以节省带宽。
@ -25,7 +25,7 @@ RPC模块还提供数据压缩功能如果数据包的字节数超过系统
- 系统的初始化,包括
- 从文件taos.cfg读取系统配置参数从文件dnodeCfg.json读取数据节点的配置参数
- 启动RPC模块并建立起与taosc通讯的server链接与其他数据节点通讯的server链接;
- 启动RPC模块并建立起与taosc通讯的server连接与其他数据节点通讯的server连接;
- 启动并初始化dnode的内部管理, 该模块将扫描该数据节点已有的vnode并打开它们
- 初始化可配置的模块如mnode, http, monitor等。
- 数据节点的管理,包括

View File

@ -25,9 +25,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/TDengine-enterprise-arbitrator"
install_dir="${release_dir}/TDengine-enterprise-arbitrator-${version}"
else
install_dir="${release_dir}/TDengine-arbitrator"
install_dir="${release_dir}/TDengine-arbitrator-${version}"
fi
# Directories and files.
@ -48,9 +48,9 @@ mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1

View File

@ -25,9 +25,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/PowerDB-enterprise-arbitrator"
install_dir="${release_dir}/PowerDB-enterprise-arbitrator-${version}"
else
install_dir="${release_dir}/PowerDB-arbitrator"
install_dir="${release_dir}/PowerDB-arbitrator-${version}"
fi
# Directories and files.
@ -48,9 +48,9 @@ mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1

View File

@ -32,9 +32,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/TDengine-enterprise-client"
install_dir="${release_dir}/TDengine-enterprise-client-${version}"
else
install_dir="${release_dir}/TDengine-client"
install_dir="${release_dir}/TDengine-client-${version}"
fi
# Directories and files.
@ -125,9 +125,9 @@ fi
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1

View File

@ -32,9 +32,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/PowerDB-enterprise-client"
install_dir="${release_dir}/PowerDB-enterprise-client-${version}"
else
install_dir="${release_dir}/PowerDB-client"
install_dir="${release_dir}/PowerDB-client-${version}"
fi
# Directories and files.
@ -164,9 +164,9 @@ fi
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1

View File

@ -25,9 +25,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/TDengine-enterprise-server"
install_dir="${release_dir}/TDengine-enterprise-server-${version}"
else
install_dir="${release_dir}/TDengine-server"
install_dir="${release_dir}/TDengine-server-${version}"
fi
# Directories and files.
@ -138,9 +138,9 @@ fi
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1

View File

@ -25,9 +25,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/PowerDB-enterprise-server"
install_dir="${release_dir}/PowerDB-enterprise-server-${version}"
else
install_dir="${release_dir}/PowerDB-server"
install_dir="${release_dir}/PowerDB-server-${version}"
fi
# Directories and files.
@ -184,9 +184,9 @@ fi
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1

View File

@ -149,7 +149,7 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_initImp(JNIEnv *e
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setOptions(JNIEnv *env, jobject jobj, jint optionIndex,
jstring optionValue) {
if (optionValue == NULL) {
jniDebug("option index:%d value is null", optionIndex);
jniDebug("option index:%d value is null", (int32_t)optionIndex);
return 0;
}
@ -183,7 +183,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setOptions(JNIEnv
}
(*env)->ReleaseStringUTFChars(env, optionValue, tz1);
} else {
jniError("option index:%d is not found", optionIndex);
jniError("option index:%d is not found", (int32_t)optionIndex);
}
return res;
@ -227,10 +227,10 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEn
ret = (jlong)taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, (uint16_t)jport);
if (ret == 0) {
jniError("jobj:%p, conn:%p, connect to database failed, host=%s, user=%s, dbname=%s, port=%d", jobj, (void *)ret,
(char *)host, (char *)user, (char *)dbname, jport);
(char *)host, (char *)user, (char *)dbname, (int32_t)jport);
} else {
jniDebug("jobj:%p, conn:%p, connect to database succeed, host=%s, user=%s, dbname=%s, port=%d", jobj, (void *)ret,
(char *)host, (char *)user, (char *)dbname, jport);
(char *)host, (char *)user, (char *)dbname, (int32_t)jport);
}
if (host != NULL) (*env)->ReleaseStringUTFChars(env, jhost, host);
@ -385,7 +385,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsIm
}
jint ret = taos_affected_rows((SSqlObj *)res);
jniDebug("jobj:%p, conn:%p, sql:%p, res: %p, affect rows:%d", jobj, tscon, (void *)con, (void *)res, ret);
jniDebug("jobj:%p, conn:%p, sql:%p, res: %p, affect rows:%d", jobj, tscon, (void *)con, (void *)res, (int32_t)ret);
return ret;
}
@ -490,13 +490,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetBooleanFp, i, (jboolean)(*((char *)row[i]) == 1));
break;
case TSDB_DATA_TYPE_TINYINT:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteFp, i, (jbyte) * ((char *)row[i]));
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteFp, i, (jbyte) * ((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetShortFp, i, (jshort) * ((short *)row[i]));
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetShortFp, i, (jshort) * ((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetIntFp, i, (jint) * (int *)row[i]);
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetIntFp, i, (jint) * (int32_t *)row[i]);
break;
case TSDB_DATA_TYPE_BIGINT:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetLongFp, i, (jlong) * ((int64_t *)row[i]));

View File

@ -525,7 +525,7 @@ static void do_sum(SQLFunctionCtx *pCtx) {
*retVal += pCtx->preAggVals.statis.sum;
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
double *retVal = (double*) pCtx->aOutputBuf;
*retVal += GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.sum));
*retVal += GET_DOUBLE_VAL((const char*)&(pCtx->preAggVals.statis.sum));
}
} else { // computing based on the true data block
void *pData = GET_INPUT_CHAR(pCtx);
@ -768,7 +768,7 @@ static void avg_function(SQLFunctionCtx *pCtx) {
if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) {
*pVal += pCtx->preAggVals.statis.sum;
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
*pVal += GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.sum));
*pVal += GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.sum));
}
} else {
void *pData = GET_INPUT_CHAR(pCtx);
@ -3516,12 +3516,12 @@ static void spread_function(SQLFunctionCtx *pCtx) {
pInfo->max = (double)pCtx->preAggVals.statis.max;
}
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
if (pInfo->min > GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.min))) {
pInfo->min = GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.min));
if (pInfo->min > GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.min))) {
pInfo->min = GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.min));
}
if (pInfo->max < GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.max))) {
pInfo->max = GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.max));
if (pInfo->max < GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.max))) {
pInfo->max = GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.max));
}
}

View File

@ -306,16 +306,16 @@ static int32_t tscGetNthFieldResult(TAOS_ROW row, TAOS_FIELD* fields, int *lengt
switch (type) {
case TSDB_DATA_TYPE_BOOL:
sprintf(result, "%s", ((((int)(*((char *)val))) == 1) ? "true" : "false"));
sprintf(result, "%s", ((((int32_t)(*((char *)val))) == 1) ? "true" : "false"));
break;
case TSDB_DATA_TYPE_TINYINT:
sprintf(result, "%d", (int)(*((char *)val)));
sprintf(result, "%d", *((int8_t *)val));
break;
case TSDB_DATA_TYPE_SMALLINT:
sprintf(result, "%d", (int)(*((short *)val)));
sprintf(result, "%d", *((int16_t *)val));
break;
case TSDB_DATA_TYPE_INT:
sprintf(result, "%d", *((int *)val));
sprintf(result, "%d", *((int32_t *)val));
break;
case TSDB_DATA_TYPE_BIGINT:
sprintf(result, "%"PRId64, *((int64_t *)val));

View File

@ -406,7 +406,7 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start
return TSDB_CODE_SUCCESS;
}
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error,
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, SSqlCmd* pCmd,
int16_t timePrec, int32_t *code, char *tmpTokenBuf) {
int32_t index = 0;
SStrToken sToken = {0};
@ -426,12 +426,17 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
*str += index;
if (sToken.type == TK_QUESTION) {
if (pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
*code = tscSQLSyntaxErrMsg(pCmd->payload, "? only allowed in binding insertion", *str);
return -1;
}
uint32_t offset = (uint32_t)(start - pDataBlocks->pData);
if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) {
continue;
}
strcpy(error, "client out of memory");
strcpy(pCmd->payload, "client out of memory");
*code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1;
}
@ -439,8 +444,7 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
int16_t type = sToken.type;
if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || (sToken.n == 0) || (type == TK_RP)) {
tscSQLSyntaxErrMsg(error, "invalid data or symbol", sToken.z);
*code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
*code = tscSQLSyntaxErrMsg(pCmd->payload, "invalid data or symbol", sToken.z);
return -1;
}
@ -470,14 +474,14 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
}
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, error, str, isPrimaryKey, timePrec);
int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, pCmd->payload, str, isPrimaryKey, timePrec);
if (ret != TSDB_CODE_SUCCESS) {
*code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return -1; // NOTE: here 0 mean error!
}
if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) {
tscInvalidSQLErrMsg(error, "client time/server time can not be mixed up", sToken.z);
tscInvalidSQLErrMsg(pCmd->payload, "client time/server time can not be mixed up", sToken.z);
*code = TSDB_CODE_TSC_INVALID_TIME_STAMP;
return -1;
}
@ -522,7 +526,7 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) {
}
int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMeta, int maxRows,
SParsedDataColInfo *spd, char *error, int32_t *code, char *tmpTokenBuf) {
SParsedDataColInfo *spd, SSqlCmd* pCmd, int32_t *code, char *tmpTokenBuf) {
int32_t index = 0;
SStrToken sToken;
@ -534,8 +538,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe
int32_t precision = tinfo.precision;
if (spd->hasVal[0] == false) {
strcpy(error, "primary timestamp column can not be null");
*code = TSDB_CODE_TSC_INVALID_SQL;
*code = tscInvalidSQLErrMsg(pCmd->payload, "primary timestamp column can not be null", *str);
return -1;
}
@ -547,17 +550,17 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe
*str += index;
if (numOfRows >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) {
int32_t tSize;
int32_t retcode = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize);
if (retcode != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client
strcpy(error, "client out of memory");
*code = retcode;
*code = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize);
if (*code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client
strcpy(pCmd->payload, "client out of memory");
return -1;
}
ASSERT(tSize > maxRows);
maxRows = tSize;
}
int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision, code, tmpTokenBuf);
int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, pCmd, precision, code, tmpTokenBuf);
if (len <= 0) { // error message has been set in tsParseOneRowData
return -1;
}
@ -568,7 +571,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe
sToken = tStrGetToken(*str, &index, false, 0, NULL);
*str += index;
if (sToken.n == 0 || sToken.type != TK_RP) {
tscSQLSyntaxErrMsg(error, ") expected", *str);
tscSQLSyntaxErrMsg(pCmd->payload, ") expected", *str);
*code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return -1;
}
@ -577,7 +580,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe
}
if (numOfRows <= 0) {
strcpy(error, "no any data points");
strcpy(pCmd->payload, "no any data points");
*code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return -1;
} else {
@ -704,7 +707,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd->payload, &code, tmpTokenBuf);
int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd, &code, tmpTokenBuf);
free(tmpTokenBuf);
if (numOfRows <= 0) {
return code;
@ -724,10 +727,6 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st
dataBuf->vgId = pTableMeta->vgroupInfo.vgId;
dataBuf->numOfTables = 1;
/*
* the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS,
* which is actually returned from server.
*/
*totalNum += numOfRows;
return TSDB_CODE_SUCCESS;
}
@ -1458,8 +1457,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
char *lineptr = line;
strtolower(line, line);
int32_t len =
tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tokenBuf);
int32_t len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd, tinfo.precision, &code, tokenBuf);
if (len <= 0 || pTableDataBlock->numOfParams > 0) {
pSql->res.code = code;
break;

View File

@ -43,10 +43,6 @@ typedef struct SNormalStmt {
tVariant* params;
} SNormalStmt;
//typedef struct SInsertStmt {
//
//} SInsertStmt;
typedef struct STscStmt {
bool isInsert;
STscObj* taos;
@ -54,7 +50,6 @@ typedef struct STscStmt {
SNormalStmt normal;
} STscStmt;
static int normalStmtAddPart(SNormalStmt* stmt, bool isParam, char* str, uint32_t len) {
uint16_t size = stmt->numParts + 1;
if (size > stmt->sizeParts) {
@ -514,7 +509,6 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
SSqlObj* pSql = pStmt->pSql;
size_t sqlLen = strlen(sql);
//doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
pSql->param = (void*) pSql;

View File

@ -1699,7 +1699,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (pItem->pNode->pParam != NULL) {
tSQLExprItem* pParamElem = &pItem->pNode->pParam->a[0];
SStrToken* pToken = &pParamElem->pNode->colInfo;
short sqlOptr = pParamElem->pNode->nSQLOptr;
int16_t sqlOptr = pParamElem->pNode->nSQLOptr;
if ((pToken->z == NULL || pToken->n == 0)
&& (TK_INTEGER != sqlOptr)) /*select count(1) from table*/ {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);

View File

@ -234,9 +234,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
uint64_t handle = (uint64_t) rpcMsg->ahandle;
void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(uint64_t));
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE));
if (p == NULL) {
rpcFreeCont(rpcMsg->pCont);
return;
@ -642,14 +641,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
tscError("%p illegal value of numOfCols in query msg: %"PRIu64", table cols:%d", pSql, numOfSrcCols,
tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols,
tscGetNumOfColumns(pTableMeta));
return TSDB_CODE_TSC_INVALID_SQL;
}
if (pQueryInfo->interval.interval < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->interval.interval);
tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval);
return TSDB_CODE_TSC_INVALID_SQL;
}

View File

@ -719,15 +719,15 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT:
len += sprintf(str + len, "%d", *((char *)row[i]));
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
len += sprintf(str + len, "%d", *((short *)row[i]));
len += sprintf(str + len, "%d", *((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
len += sprintf(str + len, "%d", *((int *)row[i]));
len += sprintf(str + len, "%d", *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:

View File

@ -398,8 +398,8 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.interval < minIntervalTime) {
tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%" PRId64, pSql, pStream,
pQueryInfo->interval.interval, minIntervalTime);
tscWarn("%p stream:%p, original sample interval:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream,
(int64_t)pQueryInfo->interval.interval, minIntervalTime);
pQueryInfo->interval.interval = minIntervalTime;
}

View File

@ -141,7 +141,7 @@ void taos_init_imp(void) {
int64_t refreshTime = 10; // 10 seconds by default
if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj");
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeSqlObjInCache, "sqlObj");
}
tscDebug("client is initialized successfully");

View File

@ -1788,8 +1788,8 @@ void registerSqlObj(SSqlObj* pSql) {
int32_t ref = T_REF_INC(pSql->pTscObj);
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
uint64_t p = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &p, sizeof(uint64_t), DEFAULT_LIFE_TIME);
TSDB_CACHE_PTR_TYPE p = (TSDB_CACHE_PTR_TYPE) pSql;
pSql->self = taosCachePut(tscObjCache, &p, sizeof(TSDB_CACHE_PTR_TYPE), &p, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_LIFE_TIME);
}
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {

View File

@ -313,13 +313,13 @@ void dataColSetOffset(SDataCol *pCol, int nEle) {
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
if (pCols == NULL) {
uDebug("malloc failure, size:%"PRId64" failed, reason:%s", sizeof(SDataCols), strerror(errno));
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
return NULL;
}
pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
if (pCols->cols == NULL) {
uDebug("malloc failure, size:%"PRId64" failed, reason:%s", sizeof(SDataCol) * maxCols, strerror(errno));
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols, strerror(errno));
tdFreeDataCols(pCols);
return NULL;
}
@ -331,7 +331,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
pCols->buf = malloc(pCols->bufSize);
if (pCols->buf == NULL) {
uDebug("malloc failure, size:%"PRId64" failed, reason:%s", sizeof(SDataCol) * maxCols, strerror(errno));
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols, strerror(errno));
tdFreeDataCols(pCols);
return NULL;
}
@ -716,4 +716,4 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
return row;
}
}

View File

@ -219,7 +219,7 @@ static void getStatics_f(const TSKEY *primaryKey, const void *pData, int32_t num
}
float fv = 0;
fv = GET_FLOAT_VAL(&(data[i]));
fv = GET_FLOAT_VAL((const char*)&(data[i]));
dsum += fv;
if (fmin > fv) {
fmin = fv;
@ -233,17 +233,12 @@ static void getStatics_f(const TSKEY *primaryKey, const void *pData, int32_t num
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum = GET_DOUBLE_VAL((const char *)sum);
csum += dsum;
#ifdef _TD_ARM_32_
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &fmax);
SET_DOUBLE_VAL_ALIGN(min, &fmin);
#else
*(double*)sum = csum;
*(double*)max = fmax;
*(double*)min = fmin;
#endif
SET_DOUBLE_VAL(sum, csum);
SET_DOUBLE_VAL(max, fmax);
SET_DOUBLE_VAL(min, fmin);
}
static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
@ -264,7 +259,7 @@ static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t num
}
double dv = 0;
dv = GET_DOUBLE_VAL(&(data[i]));
dv = GET_DOUBLE_VAL((const char*)&(data[i]));
dsum += dv;
if (dmin > dv) {
dmin = dv;
@ -278,19 +273,12 @@ static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t num
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum = GET_DOUBLE_VAL((const char *)sum);
csum += dsum;
#ifdef _TD_ARM_32_
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &dmax);
SET_DOUBLE_VAL_ALIGN(min, &dmin);
#else
*(double*) sum = csum;
*(double*) max = dmax;
*(double*) min = dmin;
#endif
SET_DOUBLE_PTR(sum, &csum);
SET_DOUBLE_PTR(max, &dmax);
SET_DOUBLE_PTR(min, &dmin);
}
static void getStatics_bin(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
@ -493,46 +481,29 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) {
*((int32_t *)val) = GET_INT32_VAL(src);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
#ifdef _TD_ARM_32_
float fv = GET_FLOAT_VAL(src);
SET_FLOAT_VAL_ALIGN(val, &fv);
#else
*((float *)val) = GET_FLOAT_VAL(src);
#endif
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_VAL(val, GET_FLOAT_VAL(src));
break;
};
case TSDB_DATA_TYPE_DOUBLE: {
#ifdef _TD_ARM_32_
double dv = GET_DOUBLE_VAL(src);
SET_DOUBLE_VAL_ALIGN(val, &dv);
#else
*((double *)val) = GET_DOUBLE_VAL(src);
#endif
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_VAL(val, GET_DOUBLE_VAL(src));
break;
};
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: {
case TSDB_DATA_TYPE_BIGINT:
*((int64_t *)val) = GET_INT64_VAL(src);
break;
};
case TSDB_DATA_TYPE_SMALLINT: {
case TSDB_DATA_TYPE_SMALLINT:
*((int16_t *)val) = GET_INT16_VAL(src);
break;
};
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: {
case TSDB_DATA_TYPE_TINYINT:
*((int8_t *)val) = GET_INT8_VAL(src);
break;
};
case TSDB_DATA_TYPE_BINARY: {
case TSDB_DATA_TYPE_BINARY:
varDataCopy(val, src);
break;
};
case TSDB_DATA_TYPE_NCHAR: {
case TSDB_DATA_TYPE_NCHAR:
varDataCopy(val, src);
break;
};
default: {
memcpy(val, src, len);
break;
@ -579,4 +550,4 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
break;
}
}
}
}

View File

@ -709,46 +709,21 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
return -1;
}
#ifdef _TD_ARM_32_
//memcpy(&payload, &value, sizeof(float));
float fv = (float)value;
SET_FLOAT_VAL_ALIGN(payload, &fv);
#else
*((float *)payload) = (float)value;
#endif
SET_FLOAT_VAL(payload, value);
}
} else if (pVariant->nType >= TSDB_DATA_TYPE_BOOL && pVariant->nType <= TSDB_DATA_TYPE_BIGINT) {
#ifdef _TD_ARM_32_
//memcpy(&payload, &pVariant->i64Key, sizeof(float));
float fv = (float)pVariant->i64Key;
SET_FLOAT_VAL_ALIGN(payload, &fv);
#else
*((float *)payload) = (float)pVariant->i64Key;
#endif
SET_FLOAT_VAL(payload, pVariant->i64Key);
} else if (pVariant->nType == TSDB_DATA_TYPE_DOUBLE || pVariant->nType == TSDB_DATA_TYPE_FLOAT) {
#ifdef _TD_ARM_32_
//memcpy(&payload, &pVariant->dKey, sizeof(float));
float fv = (float)pVariant->dKey;
SET_FLOAT_VAL_ALIGN(payload, &fv);
#else
*((float *)payload) = (float)pVariant->dKey;
#endif
SET_FLOAT_VAL(payload, pVariant->dKey);
} else if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
return 0;
}
#ifdef _TD_ARM_32_
float fv = GET_FLOAT_VAL(payload);
if (isinf(fv) || isnan(fv) || fv > FLT_MAX || fv < -FLT_MAX) {
return -1;
}
#else
if (isinf(*((float *)payload)) || isnan(*((float *)payload)) || *((float *)payload) > FLT_MAX ||
*((float *)payload) < -FLT_MAX) {
return -1;
}
#endif
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
@ -765,42 +740,21 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
return -1;
}
#ifdef _TD_ARM_32_
SET_DOUBLE_VAL_ALIGN(payload, &value);
#else
*((double *)payload) = value;
#endif
SET_DOUBLE_VAL(payload, value);
}
} else if (pVariant->nType >= TSDB_DATA_TYPE_BOOL && pVariant->nType <= TSDB_DATA_TYPE_BIGINT) {
#ifdef _TD_ARM_32_
double dv = (double)(pVariant->i64Key);
SET_DOUBLE_VAL_ALIGN(payload, &dv);
#else
*((double *)payload) = (double)pVariant->i64Key;
#endif
SET_DOUBLE_VAL(payload, pVariant->i64Key);
} else if (pVariant->nType == TSDB_DATA_TYPE_DOUBLE || pVariant->nType == TSDB_DATA_TYPE_FLOAT) {
#ifdef _TD_ARM_32_
double dv = (double)(pVariant->dKey);
SET_DOUBLE_VAL_ALIGN(payload, &dv);
#else
*((double *)payload) = pVariant->dKey;
#endif
SET_DOUBLE_VAL(payload, pVariant->dKey);
} else if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
return 0;
}
#ifdef _TD_ARM_32_
double dv = GET_DOUBLE_VAL(payload);
if (isinf(dv) || isnan(dv) || dv > DBL_MAX || dv < -DBL_MAX) {
return -1;
}
#else
if (isinf(*((double *)payload)) || isnan(*((double *)payload)) || *((double *)payload) > DBL_MAX ||
*((double *)payload) < -DBL_MAX) {
return -1;
}
#endif
break;
}

@ -1 +1 @@
Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b
Subproject commit 06ec30a0f1762e8169bf6b9045c82bcaa52bcdf0

View File

@ -55,6 +55,7 @@
<version>4.13</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -67,14 +67,23 @@ public class DatabaseMetaDataResultSet implements ResultSet {
@Override
public boolean next() throws SQLException {
// boolean ret = false;
// if (rowDataList.size() > 0) {
// ret = rowDataList.iterator().hasNext();
// if (ret) {
// rowCursor = rowDataList.iterator().next();
// cursorRowNumber++;
// }
// }
// return ret;
/**** add by zyyang 2020-09-29 ****************/
boolean ret = false;
if (rowDataList.size() > 0) {
ret = rowDataList.iterator().hasNext();
if (ret) {
rowCursor = rowDataList.iterator().next();
cursorRowNumber++;
}
if (!rowDataList.isEmpty() && cursorRowNumber < rowDataList.size()) {
rowCursor = rowDataList.get(cursorRowNumber++);
ret = true;
}
return ret;
}
@ -91,7 +100,8 @@ public class DatabaseMetaDataResultSet implements ResultSet {
@Override
public String getString(int columnIndex) throws SQLException {
columnIndex--;
return rowCursor.getString(columnIndex, columnMetaDataList.get(columnIndex).getColType());
int colType = columnMetaDataList.get(columnIndex).getColType();
return rowCursor.getString(columnIndex, colType);
}
@Override

View File

@ -131,22 +131,31 @@ do { \
#define GET_INT16_VAL(x) (*(int16_t *)(x))
#define GET_INT32_VAL(x) (*(int32_t *)(x))
#define GET_INT64_VAL(x) (*(int64_t *)(x))
#ifdef _TD_ARM_32_
#define GET_FLOAT_VAL(x) taos_align_get_float(x)
#define GET_DOUBLE_VAL(x) taos_align_get_double(x)
float taos_align_get_float(const char* pBuf);
double taos_align_get_double(const char* pBuf);
#ifdef _TD_ARM_32
//#define __float_align_declear() float __underlyFloat = 0.0;
//#define __float_align_declear()
//#define GET_FLOAT_VAL_ALIGN(x) (*(int32_t*)&(__underlyFloat) = *(int32_t*)(x); __underlyFloat);
// notes: src must be float or double type variable !!!
#define SET_FLOAT_VAL_ALIGN(dst, src) (*(int32_t*) dst = *(int32_t*)src);
#define SET_DOUBLE_VAL_ALIGN(dst, src) (*(int64_t*) dst = *(int64_t*)src);
//#define SET_FLOAT_VAL_ALIGN(dst, src) (*(int32_t*) dst = *(int32_t*)src);
//#define SET_DOUBLE_VAL_ALIGN(dst, src) (*(int64_t*) dst = *(int64_t*)src);
float taos_align_get_float(const char* pBuf);
double taos_align_get_double(const char* pBuf);
#define GET_FLOAT_VAL(x) taos_align_get_float(x)
#define GET_DOUBLE_VAL(x) taos_align_get_double(x)
#define SET_FLOAT_VAL(x, y) { float z = (float)(y); (*(int32_t*) x = *(int32_t*)(&z)); }
#define SET_DOUBLE_VAL(x, y) { double z = (double)(y); (*(int64_t*) x = *(int64_t*)(&z)); }
#define SET_FLOAT_PTR(x, y) { (*(int32_t*) x = *(int32_t*)y); }
#define SET_DOUBLE_PTR(x, y) { (*(int64_t*) x = *(int64_t*)y); }
#else
#define GET_FLOAT_VAL(x) (*(float *)(x))
#define GET_DOUBLE_VAL(x) (*(double *)(x))
#define GET_FLOAT_VAL(x) (*(float *)(x))
#define GET_DOUBLE_VAL(x) (*(double *)(x))
#define SET_FLOAT_VAL(x, y) { (*(float *)(x)) = (float)(y); }
#define SET_DOUBLE_VAL(x, y) { (*(double *)(x)) = (double)(y); }
#define SET_FLOAT_PTR(x, y) { (*(float *)(x)) = (*(float *)(y)); }
#define SET_DOUBLE_PTR(x, y) { (*(double *)(x)) = (*(double *)(y)); }
#endif
typedef struct tDataTypeDescriptor {

View File

@ -246,6 +246,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync modul
// wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, 0, 0x1001, "WAL file is corrupted")
// http
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_SERVER_OFFLINE, 0, 0x1100, "http server is not onlin")

View File

@ -419,16 +419,16 @@ static void dumpFieldToFile(FILE* fp, const char* val, TAOS_FIELD* field, int32_
char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) {
case TSDB_DATA_TYPE_BOOL:
fprintf(fp, "%d", ((((int)(*((char *)val))) == 1) ? 1 : 0));
fprintf(fp, "%d", ((((int32_t)(*((char *)val))) == 1) ? 1 : 0));
break;
case TSDB_DATA_TYPE_TINYINT:
fprintf(fp, "%d", (int)(*((char *)val)));
fprintf(fp, "%d", *((int8_t *)val));
break;
case TSDB_DATA_TYPE_SMALLINT:
fprintf(fp, "%d", (int)(*((short *)val)));
fprintf(fp, "%d", *((int16_t *)val));
break;
case TSDB_DATA_TYPE_INT:
fprintf(fp, "%d", *((int *)val));
fprintf(fp, "%d", *((int32_t *)val));
break;
case TSDB_DATA_TYPE_BIGINT:
fprintf(fp, "%" PRId64, *((int64_t *)val));
@ -559,16 +559,16 @@ static void printField(const char* val, TAOS_FIELD* field, int width, int32_t le
char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) {
case TSDB_DATA_TYPE_BOOL:
printf("%*s", width, ((((int)(*((char *)val))) == 1) ? "true" : "false"));
printf("%*s", width, ((((int32_t)(*((char *)val))) == 1) ? "true" : "false"));
break;
case TSDB_DATA_TYPE_TINYINT:
printf("%*d", width, (int)(*((char *)val)));
printf("%*d", width, *((int8_t *)val));
break;
case TSDB_DATA_TYPE_SMALLINT:
printf("%*d", width, (int)(*((short *)val)));
printf("%*d", width, *((int16_t *)val));
break;
case TSDB_DATA_TYPE_INT:
printf("%*d", width, *((int *)val));
printf("%*d", width, *((int32_t *)val));
break;
case TSDB_DATA_TYPE_BIGINT:
printf("%*" PRId64, width, *((int64_t *)val));

View File

@ -185,7 +185,11 @@ static int32_t sdbInitWal() {
}
sdbInfo("open sdb wal for restore");
walRestore(tsSdbObj.wal, NULL, sdbWrite);
int code = walRestore(tsSdbObj.wal, NULL, sdbWrite);
if (code != TSDB_CODE_SUCCESS) {
sdbError("failed to open wal for restore, reason:%s", tstrerror(code));
return -1;
}
return 0;
}

View File

@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 5, true, mnodeFreeShowObj, "show");
tsMnodeShowCache = taosCacheInit(TSDB_CACHE_PTR_KEY, 5, true, mnodeFreeShowObj, "show");
return 0;
}
@ -378,8 +378,8 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) {
}
static bool mnodeAccquireShowObj(SShowObj *pShow) {
uint64_t handleVal = (uint64_t)pShow;
SShowObj **ppShow = taosCacheAcquireByKey(tsMnodeShowCache, &handleVal, sizeof(int64_t));
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow;
SShowObj **ppShow = taosCacheAcquireByKey(tsMnodeShowCache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE));
if (ppShow) {
mDebug("%p, show is accquired from cache, data:%p, index:%d", pShow, ppShow, pShow->index);
return true;
@ -393,8 +393,8 @@ static void* mnodePutShowObj(SShowObj *pShow) {
if (tsMnodeShowCache != NULL) {
pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1);
uint64_t handleVal = (uint64_t)pShow;
SShowObj **ppShow = taosCachePut(tsMnodeShowCache, &handleVal, sizeof(int64_t), &pShow, sizeof(int64_t), DEFAULT_SHOWHANDLE_LIFE_SPAN);
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow;
SShowObj **ppShow = taosCachePut(tsMnodeShowCache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &pShow, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_SHOWHANDLE_LIFE_SPAN);
pShow->ppShow = (void**)ppShow;
mDebug("%p, show is put into cache, data:%p index:%d", pShow, ppShow, pShow->index);
return pShow;

View File

@ -24,10 +24,14 @@ extern "C" {
#include "osDarwin.h"
#endif
#ifdef _TD_ARM_64_
#ifdef _TD_ARM_64
#include "osArm64.h"
#endif
#ifdef _TD_ARM_32
#include "osArm32.h"
#endif
#ifdef _TD_LINUX_64
#include "osLinux64.h"
#endif
@ -40,7 +44,7 @@ extern "C" {
#include "osAlpine.h"
#endif
#ifdef _TD_NINGSI_60_
#ifdef _TD_NINGSI_60
#include "osNingsi.h"
#endif

90
src/os/inc/osArm32.h Normal file
View File

@ -0,0 +1,90 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_OS_ARM32_H
#define TDENGINE_OS_ARM32_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdio.h>
#include <stdlib.h>
#include <argp.h>
#include <arpa/inet.h>
#include <assert.h>
#include <ctype.h>
#include <dirent.h>
#include <endian.h>
#include <errno.h>
#include <float.h>
#include <ifaddrs.h>
#include <libgen.h>
#include <limits.h>
#include <locale.h>
#include <math.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>
#include <pthread.h>
#include <pwd.h>
#include <regex.h>
#include <semaphore.h>
#include <signal.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <strings.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/file.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/sendfile.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/statvfs.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <syslog.h>
#include <termios.h>
#include <unistd.h>
#include <wchar.h>
#include <wordexp.h>
#include <wctype.h>
#include <inttypes.h>
#include <fcntl.h>
#include <sys/utsname.h>
#include <sys/resource.h>
#include <error.h>
#define TAOS_OS_FUNC_LZ4
#define BUILDIN_CLZL(val) __builtin_clzll(val)
#define BUILDIN_CTZL(val) __builtin_ctzll(val)
#define BUILDIN_CLZ(val) __builtin_clz(val)
#define BUILDIN_CTZ(val) __builtin_ctz(val)
#ifdef __cplusplus
}
#endif
#endif

View File

@ -20,6 +20,8 @@
extern "C" {
#endif
#include <sys/types.h>
// TAOS_OS_FUNC_DIR
void taosRemoveDir(char *rootDir);
int taosMkDir(const char *pathname, mode_t mode);

View File

@ -129,8 +129,6 @@ void* atomic_exchange_ptr_impl( void **ptr, void *val );
#define atomic_fetch_xor_64(ptr, val) __sync_fetch_and_xor((ptr), (val))
#define atomic_fetch_xor_ptr(ptr, val) __sync_fetch_and_xor((ptr), (val))
#ifdef __cplusplus
}
#endif

View File

@ -51,8 +51,6 @@
extern "C" {
#endif
#define TAOS_OS_FUNC_ATOMIC
#define TAOS_OS_FUNC_LZ4
int32_t BUILDIN_CLZL(uint64_t val);
int32_t BUILDIN_CLZ(uint32_t val);
@ -351,4 +349,4 @@ void wordfree(wordexp_t *pwordexp);
#ifdef __cplusplus
}
#endif
#endif
#endif

View File

@ -9,3 +9,7 @@ SET_SOURCE_FILES_PROPERTIES(osCoredump.c PROPERTIES COMPILE_FLAGS -w)
ADD_LIBRARY(osdetail ${SRC})
TARGET_LINK_LIBRARIES(osdetail os)
IF (TD_ARM_32 OR TD_LINUX_32)
TARGET_LINK_LIBRARIES(osdetail atomic)
ENDIF ()

View File

@ -569,7 +569,6 @@ int taosSystem(const char *cmd) {
}
}
int _sysctl(struct __sysctl_args *args );
void taosSetCoreDump() {
if (0 == tsEnableCoreFile) {
return;

View File

@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#ifdef _TD_NINGSI_60_
#ifdef _TD_NINGSI_60
void* atomic_exchange_ptr_impl(void** ptr, void* val ) {
void *old;
do {

View File

@ -67,7 +67,7 @@ static void httpDestroyContext(void *data) {
}
bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpDestroyContext, "restc");
tsHttpServer.contextCache = taosCacheInit(TSDB_CACHE_PTR_KEY, 2, true, httpDestroyContext, "restc");
if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache");
return false;
@ -117,8 +117,9 @@ HttpContext *httpCreateContext(int32_t fd) {
pContext->state = HTTP_CONTEXT_STATE_READY;
pContext->parser = httpCreateParser(pContext);
uint64_t handleVal = (uint64_t)pContext;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &handleVal, sizeof(int64_t), &pContext, sizeof(int64_t), 3000);
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pContext;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &pContext,
sizeof(TSDB_CACHE_PTR_TYPE), 3000);
pContext->ppContext = ppContext;
httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext);
@ -129,8 +130,8 @@ HttpContext *httpCreateContext(int32_t fd) {
}
HttpContext *httpGetContext(void *ptr) {
uint64_t handleVal = (uint64_t)ptr;
HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, &handleVal, sizeof(HttpContext *));
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)ptr;
HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE));
if (ppContext) {
HttpContext *pContext = *ppContext;

View File

@ -145,13 +145,13 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%d,", fields[i].name, *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%ld", fields[i].name, *((int64_t *)row[i]));
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%" PRId64, fields[i].name, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.5f", fields[i].name, *((float *)row[i]));
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.5f", fields[i].name, GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.9f", fields[i].name, *((double *)row[i]));
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.9f", fields[i].name, GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
@ -210,10 +210,10 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
httpJsonFloat(jsonBuf, *((float *)row[i]));
httpJsonFloat(jsonBuf, GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
httpJsonDouble(jsonBuf, *((double *)row[i]));
httpJsonDouble(jsonBuf, GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:

View File

@ -113,7 +113,7 @@ int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
httpTrace("context:%p, fd:%d, no data need dump", buf->pContext, buf->pContext->fd);
return 0; // there is no data to dump.
} else {
int32_t len = sprintf(sLen, "%lx\r\n", srcLen);
int32_t len = sprintf(sLen, "%" PRIx64 "\r\n", srcLen);
httpTrace("context:%p, fd:%d, write body, chunkSize:%" PRIu64 ", response:\n%s", buf->pContext, buf->pContext->fd,
srcLen, buf->buf);
httpWriteBufNoTrace(buf->pContext, sLen, len);
@ -267,9 +267,9 @@ void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us) {
ptm = localtime(&tt);
int32_t length = (int32_t) strftime(ts, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (us) {
length += snprintf(ts + length, 8, ".%06ld", t % precision);
length += snprintf(ts + length, 8, ".%06" PRId64, t % precision);
} else {
length += snprintf(ts + length, 5, ".%03ld", t % precision);
length += snprintf(ts + length, 5, ".%03" PRId64, t % precision);
}
httpJsonString(buf, ts, length);
@ -287,9 +287,9 @@ void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, bool us) {
ptm = localtime(&tt);
int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm);
if (us) {
length += snprintf(ts + length, 8, ".%06ld", t % precision);
length += snprintf(ts + length, 8, ".%06" PRId64, t % precision);
} else {
length += snprintf(ts + length, 5, ".%03ld", t % precision);
length += snprintf(ts + length, 5, ".%03" PRId64, t % precision);
}
length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm);

View File

@ -124,10 +124,10 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
httpJsonFloat(jsonBuf, *((float *)row[i]));
httpJsonFloat(jsonBuf, GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
httpJsonDouble(jsonBuf, *((double *)row[i]));
httpJsonDouble(jsonBuf, GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:

View File

@ -5050,8 +5050,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
qDebug(
"QInfo %p numOfTables:%"PRIu64", index:%d, numOfGroups:%" PRIzu ", %"PRId64" points returned, total:%"PRId64", offset:%" PRId64,
pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total,
"QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64 " points returned, total:%" PRId64 ", offset:%" PRId64,
pQInfo, (uint64_t)pQInfo->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total,
pQuery->limit.offset);
}
@ -7032,7 +7032,7 @@ void* qOpenQueryMgmt(int32_t vgId) {
return NULL;
}
pQueryMgmt->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
pQueryMgmt->qinfoPool = taosCacheInit(TSDB_CACHE_PTR_KEY, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
pQueryMgmt->closed = false;
pQueryMgmt->vgId = vgId;
@ -7101,23 +7101,23 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
qError("QInfo:%p failed to add qhandle into cache, since qMgmt is colsing", (void *)qInfo);
return NULL;
} else {
uint64_t handleVal = (uint64_t) qInfo;
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(int64_t), &qInfo, POINTER_BYTES, DEFAULT_QHANDLE_LIFE_SPAN);
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_QHANDLE_LIFE_SPAN);
// pthread_mutex_unlock(&pQueryMgmt->lock);
return handle;
}
}
void** qAcquireQInfo(void* pMgmt, uint64_t key) {
void** qAcquireQInfo(void* pMgmt, uint64_t _key) {
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL || pQueryMgmt->closed) {
return NULL;
}
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &key, sizeof(uint64_t));
TSDB_CACHE_PTR_TYPE key = (TSDB_CACHE_PTR_TYPE)_key;
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &key, sizeof(TSDB_CACHE_PTR_TYPE));
if (handle == NULL || *handle == NULL) {
return NULL;
} else {

View File

@ -240,7 +240,7 @@ void *rpcOpen(const SRpcInit *pInit) {
size_t size = sizeof(SRpcConn) * pRpc->sessions;
pRpc->connList = (SRpcConn *)calloc(1, size);
if (pRpc->connList == NULL) {
tError("%s failed to allocate memory for taos connections, size:%ld", pRpc->label, size);
tError("%s failed to allocate memory for taos connections, size:%" PRId64, pRpc->label, (int64_t)size);
rpcClose(pRpc);
return NULL;
}

View File

@ -211,7 +211,7 @@ static void *taosRecvUdpData(void *param) {
char *tmsg = malloc(dataLen + tsRpcOverhead);
if (NULL == tmsg) {
tError("%s failed to allocate memory, size:%ld", pConn->label, dataLen);
tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen);
continue;
} else {
tDebug("UDP malloc mem: %p", tmsg);

View File

@ -491,7 +491,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
if (ip == -1) return NULL;
SSyncPeer *pPeer = (SSyncPeer *)calloc(1, sizeof(SSyncPeer));
SSyncPeer *pPeer = calloc(1, sizeof(SSyncPeer));
if (pPeer == NULL) return NULL;
pPeer->nodeId = pInfo->nodeId;
@ -499,7 +499,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
pPeer->ip = ip;
pPeer->port = pInfo->nodePort;
pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0;
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%d", pNode->vgId, pPeer->fqdn, pPeer->port);
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%u", pNode->vgId, pPeer->fqdn, pPeer->port);
pPeer->peerFd = -1;
pPeer->syncFd = -1;

View File

@ -325,7 +325,7 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) {
// if all data up to fversion is read out, it is over
if (pPeer->sversion >= fversion && fversion > 0) {
code = 0;
sDebug("%s, data up to fversion:%ld has been read out, bytes:%d", pPeer->id, fversion, bytes);
sDebug("%s, data up to fversion:%" PRId64 " has been read out, bytes:%d", pPeer->id, fversion, bytes);
break;
}

View File

@ -109,7 +109,6 @@ int processRpcMsg(void *item) {
if (pCfg->quorum <= 1) {
rpcFreeCont(pMsg->pCont);
taosFreeQitem(item);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(msgSize);
@ -117,6 +116,7 @@ int processRpcMsg(void *item) {
rpcMsg.handle = pMsg->handle;
rpcMsg.code = code;
rpcSendResponse(&rpcMsg);
taosFreeQitem(item);
}
return code;
@ -386,7 +386,7 @@ int main(int argc, char *argv[]) {
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-w write]: write received data to file(0, 1, 2), default is:%d\n", commit);
printf(" [-v version]: initial node version, default is:%ld\n", syncInfo.version);
printf(" [-v version]: initial node version, default is:%" PRId64 "\n", syncInfo.version);
printf(" [-r replica]: replicacation number, default is:%d\n", pCfg->replica);
printf(" [-q quorum]: quorum, default is:%d\n", pCfg->quorum);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);

View File

@ -697,22 +697,41 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
if (pCheckInfo->pDataCols == NULL) {
tsdbError("%p failed to malloc buf, %p", pQueryHandle, pQueryHandle->qinfo);
tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return terrno;
goto _error;
}
}
STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj);
tdInitDataCols(pCheckInfo->pDataCols, pSchema);
tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema);
tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema);
int32_t code = tdInitDataCols(pCheckInfo->pDataCols, pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _error;
}
code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _error;
}
code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _error;
}
int16_t* colIds = pQueryHandle->defaultLoadColumn->pData;
int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pQueryHandle)));
if (ret != TSDB_CODE_SUCCESS) {
return terrno;
int32_t c = terrno;
assert(c != TSDB_CODE_SUCCESS);
goto _error;
}
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
@ -729,10 +748,16 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p
int64_t elapsedTime = (taosGetTimestampUs() - st);
pQueryHandle->cost.blockLoadTime += elapsedTime;
tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64" , rows:%d, elapsed time:%"PRId64 " us, %p",
tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %p",
pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pQueryHandle->qinfo);
return TSDB_CODE_SUCCESS;
_error:
pBlock->numOfRows = 0;
tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, %p",
pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pQueryHandle->qinfo);
return terrno;
}
static int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo);

View File

@ -24,6 +24,14 @@ extern "C" {
#include "tlockfree.h"
#include "hash.h"
#if defined(_TD_ARM_32)
#define TSDB_CACHE_PTR_KEY TSDB_DATA_TYPE_INT
#define TSDB_CACHE_PTR_TYPE int32_t
#else
#define TSDB_CACHE_PTR_KEY TSDB_DATA_TYPE_BIGINT
#define TSDB_CACHE_PTR_TYPE int64_t
#endif
typedef void (*__cache_free_fn_t)(void*);
typedef struct SCacheStatis {

View File

@ -738,7 +738,7 @@ void taosHashTableResize(SHashObj *pHashObj) {
int64_t et = taosGetTimestampUs();
uDebug("hash table resize completed, new capacity:%"PRId64", load factor:%f, elapsed time:%fms", pHashObj->capacity,
uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", (int32_t)pHashObj->capacity,
((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
}

View File

@ -385,9 +385,10 @@ static void walRelease(SWal *pWal) {
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
char *name = pWal->name;
int size = 1024 * 1024; // default 1M buffer size
terrno = 0;
char *buffer = malloc(1024000); // size for one record
char *buffer = malloc(size);
if (buffer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
@ -395,7 +396,7 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
SWalHead *pHead = (SWalHead *)buffer;
int fd = open(name, O_RDONLY);
int fd = open(name, O_RDWR);
if (fd < 0) {
wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
@ -405,29 +406,58 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
wDebug("wal:%s, start to restore", name);
size_t offset = 0;
while (1) {
int ret = taosTRead(fd, pHead, sizeof(SWalHead));
if ( ret == 0) break;
if (ret == 0) break;
if (ret != sizeof(SWalHead)) {
wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno));
if (ret < 0) {
wError("wal:%s, failed to read wal head part since %s", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < sizeof(SWalHead)) {
wError("wal:%s, failed to read head, ret:%d, skip the rest of file", name, ret);
taosFtruncate(fd, offset);
fsync(fd);
break;
}
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
terrno = TAOS_SYSTEM_ERROR(errno);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(false);
break;
}
}
if (pHead->len > size - sizeof(SWalHead)) {
size = sizeof(SWalHead) + pHead->len;
buffer = realloc(buffer, size);
if (buffer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
pHead = (SWalHead *)buffer;
}
ret = taosTRead(fd, pHead->cont, pHead->len);
if ( ret != pHead->len) {
wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, pHead->len, ret);
if (ret < 0) {
wError("wal:%s failed to read wal body part since %s", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < pHead->len) {
wError("wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", name, pHead->len, ret);
taosFtruncate(fd, offset);
fsync(fd);
break;
}
offset = offset + sizeof(SWalHead) + pHead->len;
if (pWal->keep) pWal->version = pHead->version;
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
}

View File

@ -71,7 +71,7 @@ int main(int argc, char *argv[]) {
printf(" [-t total]: total wal files, default is:%d\n", total);
printf(" [-r rows]: rows of records per wal file, default is:%d\n", rows);
printf(" [-k keep]: keep the wal after closing, default is:%d\n", keep);
printf(" [-v version]: initial version, default is:%ld\n", ver);
printf(" [-v version]: initial version, default is:%" PRId64 "\n", ver);
printf(" [-d debugFlag]: debug flag, default:%d\n", dDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
@ -97,7 +97,7 @@ int main(int argc, char *argv[]) {
exit(-1);
}
printf("version starts from:%ld\n", ver);
printf("version starts from:%" PRId64 "\n", ver);
int contLen = sizeof(SWalHead) + size;
SWalHead *pHead = (SWalHead *) malloc(contLen);

View File

@ -278,7 +278,7 @@ void writeData() {
free(threads);
printf("---- Spent %f seconds to insert %ld records, speed: %f Rows/Second\n", seconds, statis.totalRows, rs);
printf("---- Spent %f seconds to insert %" PRId64 " records, speed: %f Rows/Second\n", seconds, statis.totalRows, rs);
}
void readDataImp(void *param)

View File

@ -19,6 +19,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <taos.h> // TAOS header file
int main(int argc, char *argv[]) {
@ -67,7 +68,7 @@ int main(int argc, char *argv[]) {
// insert 10 records
int i = 0;
for (i = 0; i < 10; ++i) {
sprintf(qstr, "insert into m1 values (%ld, %d, %d, %d, %d, %f, %lf, '%s')", 1546300800000 + i * 1000, i, i, i, i*10000000, i*1.0, i*2.0, "hello");
sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", 1546300800000 + i * 1000, i, i, i, i*10000000, i*1.0, i*2.0, "hello");
printf("qstr: %s\n", qstr);
if (taos_query(taos, qstr)) {
printf("insert row: %i, reason:%s\n", i, taos_errstr(taos));

View File

@ -1,302 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"database/sql"
"time"
"log"
"fmt"
_ "github.com/taosdata/driver-go/taosSql"
)
func main() {
taosDriverName := "taosSql"
demodb := "demodb"
demot := "demot"
fmt.Printf("\n======== start demo test ========\n")
// open connect to taos server
db, err := sql.Open(taosDriverName, "root:taosdata@/tcp(127.0.0.1:0)/")
if err != nil {
log.Fatalf("Open database error: %s\n", err)
}
defer db.Close()
drop_database(db, demodb)
create_database(db, demodb)
use_database(db, demodb)
create_table(db, demot)
insert_data(db, demot)
select_data(db, demot)
fmt.Printf("\n======== start stmt mode test ========\n")
demodbStmt := "demodbStmt"
demotStmt := "demotStmt"
drop_database_stmt(db, demodbStmt)
create_database_stmt(db, demodbStmt)
use_database_stmt(db, demodbStmt)
create_table_stmt(db, demotStmt)
insert_data_stmt(db, demotStmt)
select_data_stmt(db, demotStmt)
fmt.Printf("\n======== end demo test ========\n")
}
func drop_database(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
res, err := db.Exec("drop database if exists " + demodb)
checkErr(err, "drop database if exists " + demodb)
affectd, err := res.RowsAffected()
checkErr(err, "drop db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("drop database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
}
func create_database(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
// create database
res, err := db.Exec("create database " + demodb)
checkErr(err, "create db, db.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "create db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("create database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
return
}
func use_database(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
// use database
res, err := db.Exec("use " + demodb) // notes: must no quote to db name
checkErr(err, "use db db.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "use db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("use database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
}
func create_table(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
// create table
res, err := db.Exec("create table " + demot + " (ts timestamp, id int, name binary(8), len tinyint, flag bool, notes binary(8), fv float, dv double)")
checkErr(err, "create table db.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "create table res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("create table result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
}
func insert_data(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
// insert data
res, err := db.Exec("insert into " + demot +
" values (now, 100, 'beijing', 10, true, 'one', 123.456, 123.456)" +
" (now+1s, 101, 'shanghai', 11, true, 'two', 789.123, 789.123)" +
" (now+2s, 102, 'shenzhen', 12, false, 'three', 456.789, 456.789)")
checkErr(err, "insert data, db.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "insert data res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("insert data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
}
func select_data(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
rows, err := db.Query("select * from ? " , demot) // go text mode
checkErr(err, "select db.Query")
fmt.Printf("%10s%s%8s %5s %9s%s %s %8s%s %7s%s %8s%s %4s%s %5s%s\n", " ","ts", " ", "id"," ", "name"," ","len", " ","flag"," ", "notes", " ", "fv", " ", " ", "dv")
var affectd int
for rows.Next() {
var ts string
var name string
var id int
var len int8
var flag bool
var notes string
var fv float32
var dv float64
err = rows.Scan(&ts, &id, &name, &len, &flag, &notes, &fv, &dv)
checkErr(err, "select rows.Scan")
fmt.Printf("%s\t", ts)
fmt.Printf("%d\t",id)
fmt.Printf("%10s\t",name)
fmt.Printf("%d\t",len)
fmt.Printf("%t\t",flag)
fmt.Printf("%s\t",notes)
fmt.Printf("%06.3f\t",fv)
fmt.Printf("%09.6f\n",dv)
affectd++
}
et := time.Now().Nanosecond()
fmt.Printf("insert data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
fmt.Printf("insert data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
}
func drop_database_stmt(db *sql.DB,demodb string) {
st := time.Now().Nanosecond()
// drop test db
res, err := db.Exec("drop database if exists " + demodb)
checkErr(err, "drop database " + demodb)
affectd, err := res.RowsAffected()
checkErr(err, "drop db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("drop database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
}
func create_database_stmt(db *sql.DB,demodb string) {
st := time.Now().Nanosecond()
// create database
//var stmt interface{}
stmt, err := db.Prepare("create database ?")
checkErr(err, "create db, db.Prepare")
//var res driver.Result
res, err := stmt.Exec(demodb)
checkErr(err, "create db, stmt.Exec")
//fmt.Printf("Query OK, %d row(s) affected()", res.RowsAffected())
affectd, err := res.RowsAffected()
checkErr(err, "create db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("create database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
}
func use_database_stmt (db *sql.DB,demodb string) {
st := time.Now().Nanosecond()
// create database
//var stmt interface{}
stmt, err := db.Prepare("use " + demodb)
checkErr(err, "use db, db.Prepare")
res, err := stmt.Exec()
checkErr(err, "use db, stmt.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "use db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("use database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
}
func create_table_stmt (db *sql.DB,demot string) {
st := time.Now().Nanosecond()
// create table
// (ts timestamp, id int, name binary(8), len tinyint, flag bool, notes binary(8), fv float, dv double)
stmt, err := db.Prepare("create table ? (? timestamp, ? int, ? binary(10), ? tinyint, ? bool, ? binary(8), ? float, ? double)")
checkErr(err, "create table db.Prepare")
res, err := stmt.Exec(demot, "ts", "id", "name", "len", "flag", "notes", "fv", "dv")
checkErr(err, "create table stmt.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "create table res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("create table result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
}
func insert_data_stmt(db *sql.DB,demot string) {
st := time.Now().Nanosecond()
// insert data into table
stmt, err := db.Prepare("insert into ? values(?, ?, ?, ?, ?, ?, ?, ?) (?, ?, ?, ?, ?, ?, ?, ?) (?, ?, ?, ?, ?, ?, ?, ?)")
checkErr(err, "insert db.Prepare")
res, err := stmt.Exec(demot, "now" , 1000, "'haidian'" , 6, true, "'AI world'", 6987.654, 321.987,
"now+1s", 1001, "'changyang'" , 7, false, "'DeepMode'", 12356.456, 128634.456,
"now+2s", 1002, "'chuangping'" , 8, true, "'database'", 3879.456, 65433478.456,)
checkErr(err, "insert data, stmt.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("insert data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
}
func select_data_stmt(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
stmt, err := db.Prepare("select ?, ?, ?, ?, ?, ?, ?, ? from ?" ) // go binary mode
checkErr(err, "db.Prepare")
rows, err := stmt.Query("ts", "id","name","len", "flag","notes", "fv", "dv", demot)
checkErr(err, "stmt.Query")
fmt.Printf("%10s%s%8s %5s %8s%s %s %10s%s %7s%s %8s%s %11s%s %14s%s\n", " ","ts", " ", "id"," ", "name"," ","len", " ","flag"," ", "notes", " ", "fv", " ", " ", "dv")
var affectd int
for rows.Next() {
var ts string
var name string
var id int
var len int8
var flag bool
var notes string
var fv float32
var dv float64
err = rows.Scan(&ts, &id, &name, &len, &flag, &notes, &fv, &dv)
//fmt.Println("start scan fields from row.rs, &fv:", &fv)
//err = rows.Scan(&fv)
checkErr(err, "rows.Scan")
fmt.Printf("%s\t", ts)
fmt.Printf("%d\t",id)
fmt.Printf("%10s\t",name)
fmt.Printf("%d\t",len)
fmt.Printf("%t\t",flag)
fmt.Printf("%s\t",notes)
fmt.Printf("%06.3f\t",fv)
fmt.Printf("%09.6f\n",dv)
affectd++
}
et := time.Now().Nanosecond()
fmt.Printf("insert data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
}
func checkErr(err error, prompt string) {
if err != nil {
fmt.Printf("%s\n", prompt)
panic(err)
}
}

View File

@ -0,0 +1,409 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/taosSql"
"os"
"sync"
"runtime"
"strconv"
"time"
"flag"
"math/rand"
//"golang.org/x/sys/unix"
)
const (
maxLocationSize = 32
maxSqlBufSize = 65480
)
var locations = [maxLocationSize]string {
"Beijing", "Shanghai", "Guangzhou", "Shenzhen",
"HangZhou", "Tianjin", "Wuhan", "Changsha",
"Nanjing", "Xian"}
type config struct {
hostName string
serverPort int
user string
password string
dbName string
supTblName string
tablePrefix string
numOftables int
numOfRecordsPerTable int
numOfRecordsPerReq int
numOfThreads int
startTimestamp string
startTs int64
keep int
days int
}
var configPara config
var taosDriverName = "taosSql"
var url string
func init() {
flag.StringVar(&configPara.hostName, "h", "127.0.0.1","The host to connect to TDengine server.")
flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.")
flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.")
flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.")
flag.StringVar(&configPara.dbName, "d", "test", "Destination database.")
flag.StringVar(&configPara.tablePrefix, "m", "d", "Table prefix name.")
flag.IntVar(&configPara.numOftables, "t", 2, "The number of tables.")
flag.IntVar(&configPara.numOfRecordsPerTable, "n", 10, "The number of records per table.")
flag.IntVar(&configPara.numOfRecordsPerReq, "r", 3, "The number of records per request.")
flag.IntVar(&configPara.numOfThreads, "T", 1, "The number of threads.")
flag.StringVar(&configPara.startTimestamp, "s", "2020-10-01 08:00:00", "The start timestamp for one table.")
flag.Parse()
configPara.keep = 365 * 20
configPara.days = 30
configPara.supTblName = "meters"
startTs, err := time.ParseInLocation("2006-01-02 15:04:05", configPara.startTimestamp, time.Local)
if err==nil {
configPara.startTs = startTs.UnixNano() / 1e6
}
}
func printAllArgs() {
fmt.Printf("\n============= args parse result: =============\n")
fmt.Printf("dbName: %v\n", configPara.hostName)
fmt.Printf("serverPort: %v\n", configPara.serverPort)
fmt.Printf("usr: %v\n", configPara.user)
fmt.Printf("password: %v\n", configPara.password)
fmt.Printf("dbName: %v\n", configPara.dbName)
fmt.Printf("tablePrefix: %v\n", configPara.tablePrefix)
fmt.Printf("numOftables: %v\n", configPara.numOftables)
fmt.Printf("numOfRecordsPerTable: %v\n", configPara.numOfRecordsPerTable)
fmt.Printf("numOfRecordsPerReq: %v\n", configPara.numOfRecordsPerReq)
fmt.Printf("numOfThreads: %v\n", configPara.numOfThreads)
fmt.Printf("startTimestamp: %v[%v]\n", configPara.startTimestamp, configPara.startTs)
fmt.Printf("================================================\n")
}
func main() {
printAllArgs()
fmt.Printf("Please press enter key to continue....\n")
fmt.Scanln()
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
//url = fmt.Sprintf("%s:%s@/tcp(%s:%d)/%s?interpolateParams=true", configPara.user, configPara.password, configPara.hostName, configPara.serverPort, configPara.dbName)
// open connect to taos server
//db, err := sql.Open(taosDriverName, url)
//if err != nil {
// fmt.Println("Open database error: %s\n", err)
// os.Exit(1)
//}
//defer db.Close()
createDatabase(configPara.dbName, configPara.supTblName)
fmt.Printf("======== create database success! ========\n\n")
//create_table(db, stblName)
multiThreadCreateTable(configPara.numOfThreads, configPara.numOftables, configPara.dbName, configPara.tablePrefix)
fmt.Printf("======== create super table and child tables success! ========\n\n")
//insert_data(db, demot)
multiThreadInsertData(configPara.numOfThreads, configPara.numOftables, configPara.dbName, configPara.tablePrefix)
fmt.Printf("======== insert data into child tables success! ========\n\n")
//select_data(db, demot)
selectTest(configPara.dbName, configPara.tablePrefix, configPara.supTblName)
fmt.Printf("======== select data success! ========\n\n")
fmt.Printf("======== end demo ========\n")
}
func createDatabase(dbName string, supTblName string) {
db, err := sql.Open(taosDriverName, url)
if err != nil {
fmt.Println("Open database error: %s\n", err)
os.Exit(1)
}
defer db.Close()
// drop database if exists
sqlStr := "drop database if exists " + dbName
_, err = db.Exec(sqlStr)
checkErr(err, sqlStr)
time.Sleep(time.Second)
// create database
sqlStr = "create database " + dbName + " keep " + strconv.Itoa(configPara.keep) + " days " + strconv.Itoa(configPara.days)
_, err = db.Exec(sqlStr)
checkErr(err, sqlStr)
// use database
//sqlStr = "use " + dbName
//_, err = db.Exec(sqlStr)
//checkErr(err, sqlStr)
sqlStr = "create table if not exists " + dbName + "." + supTblName + " (ts timestamp, current float, voltage int, phase float) tags(location binary(64), groupId int);"
_, err = db.Exec(sqlStr)
checkErr(err, sqlStr)
}
func multiThreadCreateTable(threads int, ntables int, dbName string, tablePrefix string) {
st := time.Now().UnixNano()
if (threads < 1) {
threads = 1;
}
a := ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
}
b := ntables % threads;
last := 0;
endTblId := 0
wg := sync.WaitGroup{}
for i := 0; i < threads; i++ {
startTblId := last
if (i < b ) {
endTblId = last + a
} else {
endTblId = last + a - 1
}
last = endTblId + 1
wg.Add(1)
go createTable(dbName, tablePrefix, startTblId, endTblId, &wg)
}
wg.Wait()
et := time.Now().UnixNano()
fmt.Printf("create tables spent duration: %6.6fs\n", (float32(et-st))/1e9)
}
func createTable(dbName string, childTblPrefix string, startTblId int, endTblId int, wg *sync.WaitGroup) {
//fmt.Printf("subThread[%d]: create table from %d to %d \n", unix.Gettid(), startTblId, endTblId)
// windows.GetCurrentThreadId()
db, err := sql.Open(taosDriverName, url)
if err != nil {
fmt.Println("Open database error: %s\n", err)
os.Exit(1)
}
defer db.Close()
for i := startTblId; i <= endTblId; i++ {
sqlStr := "create table if not exists " + dbName + "." + childTblPrefix + strconv.Itoa(i) + " using " + dbName + ".meters tags('" + locations[i%maxLocationSize] + "', " + strconv.Itoa(i) + ");"
//fmt.Printf("sqlStr: %v\n", sqlStr)
_, err = db.Exec(sqlStr)
checkErr(err, sqlStr)
}
wg.Done()
runtime.Goexit()
}
func generateRowData(ts int64) string {
voltage := rand.Int() % 1000
current := 200 + rand.Float32()
phase := rand.Float32()
values := "( " + strconv.FormatInt(ts, 10) + ", " + strconv.FormatFloat(float64(current), 'f', 6, 64) + ", " + strconv.Itoa(voltage) + ", " + strconv.FormatFloat(float64(phase), 'f', 6, 64) + " ) "
return values
}
func insertData(dbName string, childTblPrefix string, startTblId int, endTblId int, wg *sync.WaitGroup) {
//fmt.Printf("subThread[%d]: insert data to table from %d to %d \n", unix.Gettid(), startTblId, endTblId)
// windows.GetCurrentThreadId()
db, err := sql.Open(taosDriverName, url)
if err != nil {
fmt.Println("Open database error: %s\n", err)
os.Exit(1)
}
defer db.Close()
tmpTs := configPara.startTs;
//rand.New(rand.NewSource(time.Now().UnixNano()))
for tID := startTblId; tID <= endTblId; tID++{
totalNum := 0
for {
sqlStr := "insert into " + dbName + "." + childTblPrefix + strconv.Itoa(tID) + " values "
currRowNum := 0
for {
tmpTs += 1000
valuesOfRow := generateRowData(tmpTs)
currRowNum += 1
totalNum += 1
sqlStr = fmt.Sprintf("%s %s", sqlStr, valuesOfRow)
if (currRowNum >= configPara.numOfRecordsPerReq || totalNum >= configPara.numOfRecordsPerTable) {
break
}
}
res, err := db.Exec(sqlStr)
checkErr(err, sqlStr)
count, err := res.RowsAffected()
checkErr(err, "rows affected")
if (count != int64(currRowNum)) {
fmt.Printf("insert data, expect affected:%d, actual:%d\n", currRowNum, count)
os.Exit(1)
}
if (totalNum >= configPara.numOfRecordsPerTable) {
break
}
}
}
wg.Done()
runtime.Goexit()
}
func multiThreadInsertData(threads int, ntables int, dbName string, tablePrefix string) {
st := time.Now().UnixNano()
if (threads < 1) {
threads = 1;
}
a := ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
}
b := ntables % threads;
last := 0;
endTblId := 0
wg := sync.WaitGroup{}
for i := 0; i < threads; i++ {
startTblId := last
if (i < b ) {
endTblId = last + a
} else {
endTblId = last + a - 1
}
last = endTblId + 1
wg.Add(1)
go insertData(dbName, tablePrefix, startTblId , endTblId, &wg)
}
wg.Wait()
et := time.Now().UnixNano()
fmt.Printf("insert data spent duration: %6.6fs\n", (float32(et-st))/1e9)
}
func selectTest(dbName string, tbPrefix string, supTblName string){
db, err := sql.Open(taosDriverName, url)
if err != nil {
fmt.Println("Open database error: %s\n", err)
os.Exit(1)
}
defer db.Close()
// select sql 1
limit := 3
offset := 0
sqlStr := "select * from " + dbName + "." + supTblName + " limit " + strconv.Itoa(limit) + " offset " + strconv.Itoa(offset)
rows, err := db.Query(sqlStr)
checkErr(err, sqlStr)
defer rows.Close()
fmt.Printf("query sql: %s\n", sqlStr)
for rows.Next() {
var (
ts string
current float32
voltage int
phase float32
location string
groupid int
)
err := rows.Scan(&ts, &current, &voltage, &phase, &location, &groupid)
if err != nil {
checkErr(err, "rows scan fail")
}
fmt.Printf("ts:%s\t current:%f\t voltage:%d\t phase:%f\t location:%s\t groupid:%d\n", ts, current, voltage, phase, location, groupid)
}
// check iteration error
if rows.Err() != nil {
checkErr(err, "rows next iteration error")
}
// select sql 2
sqlStr = "select avg(voltage), min(voltage), max(voltage) from " + dbName + "." + tbPrefix + strconv.Itoa( rand.Int() % configPara.numOftables)
rows, err = db.Query(sqlStr)
checkErr(err, sqlStr)
defer rows.Close()
fmt.Printf("\nquery sql: %s\n", sqlStr)
for rows.Next() {
var (
voltageAvg float32
voltageMin int
voltageMax int
)
err := rows.Scan(&voltageAvg, &voltageMin, &voltageMax)
if err != nil {
checkErr(err, "rows scan fail")
}
fmt.Printf("avg(voltage):%f\t min(voltage):%d\t max(voltage):%d\n", voltageAvg, voltageMin, voltageMax)
}
// check iteration error
if rows.Err() != nil {
checkErr(err, "rows next iteration error")
}
// select sql 3
sqlStr = "select last(*) from " + dbName + "." + supTblName
rows, err = db.Query(sqlStr)
checkErr(err, sqlStr)
defer rows.Close()
fmt.Printf("\nquery sql: %s\n", sqlStr)
for rows.Next() {
var (
lastTs string
lastCurrent float32
lastVoltage int
lastPhase float32
)
err := rows.Scan(&lastTs, &lastCurrent, &lastVoltage, &lastPhase)
if err != nil {
checkErr(err, "rows scan fail")
}
fmt.Printf("last(ts):%s\t last(current):%f\t last(voltage):%d\t last(phase):%f\n", lastTs, lastCurrent, lastVoltage, lastPhase)
}
// check iteration error
if rows.Err() != nil {
checkErr(err, "rows next iteration error")
}
}
func checkErr(err error, prompt string) {
if err != nil {
fmt.Printf("%s\n", prompt)
panic(err)
}
}

View File

@ -23,35 +23,39 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1538548685000
self.ts = 1593548685000
def run(self):
tdSql.prepare()
tdSql.execute("create table st (ts timestamp, voltage int) tags (loc nchar(30))")
tdSql.execute("insert into t0 using st tags('beijing') values(now, 220) (now - 15d, 221) (now - 30d, 225) (now - 35d, 228) (now - 45d, 222)")
tdSql.execute("insert into t1 using st tags('shanghai') values(now, 220) (now - 60d, 221) (now - 50d, 225) (now - 40d, 228) (now - 20d, 222)")
tdSql.execute("insert into t0 using st tags('beijing') values(%d, 220) (%d, 221) (%d, 225) (%d, 228) (%d, 222)"
% (self.ts, self.ts + 1000000000, self.ts + 2000000000, self.ts + 3000000000, self.ts + 6000000000))
tdSql.execute("insert into t1 using st tags('shanghai') values(%d, 220) (%d, 221) (%d, 225) (%d, 228) (%d, 222)"
% (self.ts, self.ts + 2000000000, self.ts + 4000000000, self.ts + 5000000000, self.ts + 7000000000))
tdSql.query("select avg(voltage) from st interval(1n)")
tdSql.checkRows(3)
tdSql.checkData(0, 1, 223.0)
tdSql.checkData(1, 1, 225.0)
tdSql.checkData(2, 1, 220.333333)
tdSql.checkData(0, 1, 221.4)
tdSql.checkData(1, 1, 227.0)
tdSql.checkData(2, 1, 222.0)
tdSql.query("select avg(voltage) from st interval(1n, 15d)")
tdSql.checkRows(3)
tdSql.checkData(0, 1, 224.8)
tdSql.checkData(1, 1, 222.666666)
tdSql.checkData(2, 1, 220.0)
tdSql.checkRows(4)
tdSql.checkData(0, 1, 220.333333)
tdSql.checkData(1, 1, 224.666666)
tdSql.checkData(2, 1, 225.0)
tdSql.checkData(3, 1, 222.0)
tdSql.query("select avg(voltage) from st interval(1n, 15d) group by loc")
tdSql.checkRows(6)
tdSql.checkData(0, 1, 225.0)
tdSql.checkData(1, 1, 223.0)
tdSql.checkData(2, 1, 220.0)
tdSql.checkData(3, 1, 224.666666)
tdSql.checkData(4, 1, 222.0)
tdSql.checkData(5, 1, 220.0)
tdSql.checkRows(7)
tdSql.checkData(0, 1, 220.5)
tdSql.checkData(1, 1, 226.5)
tdSql.checkData(2, 1, 222.0)
tdSql.checkData(3, 1, 220.0)
tdSql.checkData(4, 1, 221.0)
tdSql.checkData(5, 1, 226.5)
tdSql.checkData(6, 1, 222.0)
def stop(self):
tdSql.close()

View File

@ -14,12 +14,14 @@ $i = 0
$db = $dbPrefix . $i
$stb = $stbPrefix . $i
print use $db
sql use $db
##### select first/last from table
## TBASE-331
print ====== select first/last from table
$tb = $tbPrefix . 0
print select first(*) from $tb
sql select first(*) from $tb
if $rows != 1 then
return -1
@ -58,6 +60,7 @@ if $data09 != NCHAR then
return -1
endi
print select last(*) from $tb
sql select last(*) from $tb
if $rows != 1 then
return -1

View File

@ -94,7 +94,11 @@ endi
## select specified columns
print select c1 from $mt
sql select c1 from $mt
print rows $rows
print totalNum $totalNum
if $rows != $totalNum then
return -1
endi

View File

@ -739,36 +739,22 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
((((int)(*((char *)row[i]))) == 1) ? "1" : "0"));
break;
case TSDB_DATA_TYPE_TINYINT:
sprintf(value, "%d", (int)(*((char *)row[i])));
sprintf(value, "%d", *((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
sprintf(value, "%d", (int)(*((short *)row[i])));
sprintf(value, "%d", *((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
sprintf(value, "%d", *((int *)row[i]));
sprintf(value, "%d", *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
sprintf(value, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:{
#ifdef _TD_ARM_32_
float fv = 0;
*(int32_t*)(&fv) = *(int32_t*)row[i];
sprintf(value, "%.5f", fv);
#else
sprintf(value, "%.5f", *((float *)row[i]));
#endif
}
case TSDB_DATA_TYPE_FLOAT:
sprintf(value, "%.5f", GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE: {
#ifdef _TD_ARM_32_
double dv = 0;
*(int64_t*)(&dv) = *(int64_t*)row[i];
sprintf(value, "%.9lf", dv);
#else
sprintf(value, "%.9lf", *((double *)row[i]));
#endif
}
case TSDB_DATA_TYPE_DOUBLE:
sprintf(value, "%.9lf", GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: