diff --git a/alert/cmd/alert/main.go b/alert/cmd/alert/main.go
index f934a11d42..f4c30e156a 100644
--- a/alert/cmd/alert/main.go
+++ b/alert/cmd/alert/main.go
@@ -119,7 +119,7 @@ WantedBy=multi-user.target
return nil
}
-const version = "TDengine alert v2.0.0.0"
+const version = "TDengine alert v2.0.0.1"
func main() {
var (
diff --git a/documentation20/webdocs/markdowndocs/administrator-ch.md b/documentation20/webdocs/markdowndocs/administrator-ch.md
index d6fc649b90..50b388650b 100644
--- a/documentation20/webdocs/markdowndocs/administrator-ch.md
+++ b/documentation20/webdocs/markdowndocs/administrator-ch.md
@@ -131,23 +131,83 @@ TDengine集群中加入一个新的dnode时,涉及集群相关的一些参数
- maxTablesPerVnode: 每个vnode中能够创建的最大表个数。默认值:1000000。
- maxVgroupsPerDb: 每个数据库中能够使用的最大vnode个数。
- arbitrator: 系统中裁决器的end point,缺省为空
-- timezone:时区。从系统中动态获取当前的时区设置。
-- locale:系统区位信息及编码格式。系统中动态获取,如果自动获取失败,需要用户在配置文件设置或通过API设置。
-- charset:字符集编码。系统中动态获取,如果自动获取失败,需要用户在配置文件设置或通过API设置。
+- timezone、locale、charset 的配置见客户端配置。
## 客户端配置
-TDengine系统的前台交互客户端应用程序为taos,它与taosd共享同一个配置文件taos.cfg。运行taos时,使用参数-c指定配置文件目录,如taos -c /home/cfg,表示使用/home/cfg/目录下的taos.cfg配置文件中的参数,缺省目录是/etc/taos。更多taos的使用方法请见[Shell命令行程序](#_TDengine_Shell命令行程序)。本节主要讲解taos客户端应用在配置文件taos.cfg文件中使用到的参数。
+TDengine系统的前台交互客户端应用程序为taos,它与taosd共享同一个配置文件taos.cfg。运行taos时,使用参数-c指定配置文件目录,如taos -c /home/cfg,表示使用/home/cfg/目录下的taos.cfg配置文件中的参数,缺省目录是/etc/taos。本节主要说明 taos 客户端应用在配置文件 taos.cfg 文件中使用到的参数。
-客户端配置参数列表及解释
+客户端配置参数
- firstEp: taos启动时,主动连接的集群中第一个taosd实例的end point, 缺省值为 localhost:6030。
- secondEp: taos启动时,如果first连接不上,尝试连接集群中第二个taosd实例的end point, 缺省值为空。
-- charset:字符集编码。系统中动态获取,如果自动获取失败,需要用户在配置文件设置或通过API设置。
-- locale:系统区位信息及编码格式。系统中动态获取,如果自动获取失败,需要用户在配置文件设置或通过API设置。
-- maxBinaryDisplayWidth:Shell中binary 和 nchar字段的显示宽度上限,超过此限制的部分将被隐藏。默认值:30。可在 shell 中通过命令 set max_binary_display_width *nn* 动态修改此选项。
+- locale
-日志的配置参数,与server的配置参数完全一样。
+ > 默认值:系统中动态获取,如果自动获取失败,需要用户在配置文件设置或通过API设置
+
+TDengine为存储中文、日文、韩文等非ASCII编码的宽字符,提供一种专门的字段类型nchar。写入nchar字段的数据将统一采用UCS4-LE格式进行编码并发送到服务器。需要注意的是,编码正确性是客户端来保证。因此,如果用户想要正常使用nchar字段来存储诸如中文、日文、韩文等非ASCII字符,需要正确设置客户端的编码格式。
+
+客户端的输入的字符均采用操作系统当前默认的编码格式,在Linux系统上多为UTF-8,部分中文系统编码则可能是GB18030或GBK等。在docker环境中默认的编码是POSIX。在中文版Windows系统中,编码则是CP936。客户端需要确保正确设置自己所使用的字符集,即客户端运行的操作系统当前编码字符集,才能保证nchar中的数据正确转换为UCS4-LE编码格式。
+
+在 Linux 中 locale 的命名规则为: <语言>_<地区>.<字符集编码> 如:zh_CN.UTF-8,zh代表中文,CN代表大陆地区,UTF-8表示字符集。字符集编码为客户端正确解析本地字符串提供编码转换的说明。Linux系统与 Mac OSX 系统可以通过设置locale来确定系统的字符编码,由于Windows使用的locale中不是POSIX标准的locale格式,因此在Windows下需要采用另一个配置参数charset来指定字符编码。在Linux 系统中也可以使用charset来指定字符编码。
+
+- charset
+
+ > 默认值:系统中动态获取,如果自动获取失败,需要用户在配置文件设置或通过API设置
+
+如果配置文件中不设置charset,在Linux系统中,taos在启动时候,自动读取系统当前的locale信息,并从locale信息中解析提取charset编码格式。如果自动读取locale信息失败,则尝试读取charset配置,如果读取charset配置也失败,则中断启动过程。
+
+在Linux系统中,locale信息包含了字符编码信息,因此正确设置了Linux系统locale以后可以不用再单独设置charset。例如:
+```
+ locale zh_CN.UTF-8
+```
+在Windows系统中,无法从locale获取系统当前编码。如果无法从配置文件中读取字符串编码信息,taos默认设置为字符编码为CP936。其等效在配置文件中添加如下配置:
+```
+ charset CP936
+```
+如果需要调整字符编码,请查阅当前操作系统使用的编码,并在配置文件中正确设置。
+
+在Linux系统中,如果用户同时设置了locale和字符集编码charset,并且locale和charset的不一致,后设置的值将覆盖前面设置的值。
+```
+ locale zh_CN.UTF-8
+ charset GBK
+```
+则charset的有效值是GBK。
+```
+ charset GBK
+ locale zh_CN.UTF-8
+```
+charset的有效值是UTF-8。
+
+日志的配置参数,与server 的配置参数完全一样。
+
+- timezone
+
+ 默认值:从系统中动态获取当前的时区设置
+
+客户端运行系统所在的时区。为应对多时区的数据写入和查询问题,TDengine 采用 Unix 时间戳(Unix Timestamp)来记录和存储时间戳。Unix 时间戳的特点决定了任一时刻不论在任何时区,产生的时间戳均一致。需要注意的是,Unix时间戳是在客户端完成转换和记录。为了确保客户端其他形式的时间转换为正确的 Unix 时间戳,需要设置正确的时区。
+
+在Linux系统中,客户端会自动读取系统设置的时区信息。用户也可以采用多种方式在配置文件设置时区。例如:
+```
+ timezone UTC-8
+ timezone GMT-8
+ timezone Asia/Shanghai
+```
+均是合法的设置东八区时区的格式。
+
+时区的设置对于查询和写入SQL语句中非Unix时间戳的内容(时间戳字符串、关键词now的解析)产生影响。例如:
+```
+ SELECT count(*) FROM table_name WHERE TS<'2019-04-11 12:01:08';
+```
+在东八区,SQL语句等效于
+```
+ SELECT count(*) FROM table_name WHERE TS<1554955268000;
+```
+在UTC时区,SQL语句等效于
+```
+ SELECT count(*) FROM table_name WHERE TS<1554984068000;
+```
+为了避免使用字符串时间格式带来的不确定性,也可以直接使用Unix时间戳。此外,还可以在SQL语句中使用带有时区的时间戳字符串,例如:RFC3339格式的时间戳字符串,2013-04-12T15:52:01.123+08:00或者ISO-8601格式时间戳字符串2013-04-12T15:52:01.123+0800。上述两个字符串转化为Unix时间戳不受系统所在时区的影响。
启动taos时,也可以从命令行指定一个taosd实例的end point,否则就从taos.cfg读取。
diff --git a/documentation20/webdocs/markdowndocs/cluster-ch.md b/documentation20/webdocs/markdowndocs/cluster-ch.md
index 097433a18a..a1ac1d6fd6 100644
--- a/documentation20/webdocs/markdowndocs/cluster-ch.md
+++ b/documentation20/webdocs/markdowndocs/cluster-ch.md
@@ -2,13 +2,15 @@
多个taosd的运行实例可以组成一个集群,以保证TDengine的高可靠运行,并提供水平扩展能力。要了解TDengine 2.0的集群管理,需要对集群的基本概念有所了解,请看TDengine 2.0整体架构一章。而且在安装集群之前,请按照[《立即开始》](https://www.taosdata.com/cn/getting-started20/)一章安装并体验过单节点功能。
-集群的每个节点是由End Point来唯一标识的,End Point是由FQDN(Fully Qualified Domain Name)外加Port组成,比如 h1.taosdata.com:6030。一般FQDN就是服务器的hostname,可通过Linux命令`hostname -f`获取。端口是这个节点对外服务的端口号,缺省是6030,但可以通过taos.cfg里配置参数serverPort进行修改。一个节点可能配置了多个hostname, TDengine会自动获取第一个,但也可以通过taos.cfg里配置参数fqdn进行指定。如果习惯IP地址直接访问,可以将参数fqdn设置为本节点的IP地址。
+集群的每个节点是由End Point来唯一标识的,End Point是由FQDN(Fully Qualified Domain Name)外加Port组成,比如 h1.taosdata.com:6030。一般FQDN就是服务器的hostname,可通过Linux命令`hostname -f`获取,FQDN配置参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)。端口是这个节点对外服务的端口号,缺省是6030,但可以通过taos.cfg里配置参数serverPort进行修改。一个节点可能配置了多个hostname, TDengine会自动获取第一个,但也可以通过taos.cfg里配置参数fqdn进行指定。如果习惯IP地址直接访问,可以将参数fqdn设置为本节点的IP地址。
TDengine的集群管理极其简单,除添加和删除节点需要人工干预之外,其他全部是自动完成,最大程度的降低了运维的工作量。本章对集群管理的操作做详细的描述。
## 准备工作
**第一步**:如果搭建集群的节点中,存有之前的测试数据、装过1.X的版本,或者装过其他版本的TDengine,请先将其删除,并清空所有数据,具体步骤请参考博客[《TDengine多种安装包的安装和卸载》](https://www.taosdata.com/blog/2019/08/09/566.html )
+**注意1:**因为FQDN的信息会写进文件,如果之前没有配置或者更改FQDN,且启动了TDengine。请一定在确保数据无用或者备份的前提下,清理一下之前的数据(rm -rf /var/lib/taos/);
+**注意2:**客户端也需要配置,确保它可以正确解析每个节点的FQDN配置,不管是通过DNS服务,还是 Host 文件。
**第二步**:建议关闭防火墙,至少保证端口:6030 - 6042的TCP和UDP端口都是开放的。**强烈建议**先关闭防火墙,集群搭建完毕之后,再来配置端口;
@@ -23,7 +25,7 @@ TDengine的集群管理极其简单,除添加和删除节点需要人工干预
**第五步**:修改TDengine的配置文件(所有节点的文件/etc/taos/taos.cfg都需要修改)。假设准备启动的第一个节点End Point为 h1.taosdata.com:6030, 那么以下几个参数与集群相关:
```
-// firstEp 是每个节点启动后连接的第一个节点
+// firstEp 集群中所有节点的配置都是一致的,对其第一次访问后,就获得了整个集群的信息
firstEp h1.taosdata.com:6030
// 配置本节点的FQDN,如果本机只有一个hostname, 无需配置
@@ -32,7 +34,7 @@ fqdn h1.taosdata.com
// 配置本节点的端口号,缺省是6030
serverPort 6030
-// 副本数为偶数的时候,需要配置,请参考《Arbitrator的使用》的部分
+// 服务端节点数为偶数的时候,需要配置,请参考《Arbitrator的使用》的部分
arbitrator ha.taosdata.com:6042
```
diff --git a/documentation20/webdocs/markdowndocs/faq-ch.md b/documentation20/webdocs/markdowndocs/faq-ch.md
index e4bb920dd3..7bbf7531c8 100644
--- a/documentation20/webdocs/markdowndocs/faq-ch.md
+++ b/documentation20/webdocs/markdowndocs/faq-ch.md
@@ -32,7 +32,7 @@
3. 在服务器,执行 `systemctl status taosd` 检查*taosd*运行状态。如果没有运行,启动*taosd*
-4. 确认客户端连接时指定了正确的服务器FQDN (Fully Qualified Domain Name(可在服务器上执行Linux命令hostname -f获得)
+4. 确认客户端连接时指定了正确的服务器FQDN (Fully Qualified Domain Name(可在服务器上执行Linux命令hostname -f获得)),FQDN配置参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)。
5. ping服务器FQDN,如果没有反应,请检查你的网络,DNS设置,或客户端所在计算机的系统hosts文件
@@ -54,7 +54,7 @@
## 6. 遇到错误“Unexpected generic error in RPC”或者"TDengine Error: Unable to resolve FQDN", 我怎么办?
产生这个错误,是由于客户端或数据节点无法解析FQDN(Fully Qualified Domain Name)导致。对于TAOS Shell或客户端应用,请做如下检查:
-1. 请检查连接的服务器的FQDN是否正确
+1. 请检查连接的服务器的FQDN是否正确,FQDN配置参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)。
2. 如果网络配置有DNS server, 请检查是否正常工作
3. 如果网络没有配置DNS server, 请检查客户端所在机器的hosts文件,查看该FQDN是否配置,并是否有正确的IP地址。
4. 如果网络配置OK,从客户端所在机器,你需要能Ping该连接的FQDN,否则客户端是无法链接服务器的
diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c
index ae2013cd2b..c9fe5fbd2c 100644
--- a/src/client/src/tscServer.c
+++ b/src/client/src/tscServer.c
@@ -2090,6 +2090,10 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
+ if (pRetrieve == NULL) {
+ pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
+ return pRes->code;
+ }
pRes->numOfRows = htonl(pRetrieve->numOfRows);
pRes->precision = htons(pRetrieve->precision);
diff --git a/src/connector/jdbc/deploy-pom.xml b/src/connector/jdbc/deploy-pom.xml
index b73bb010e0..766a58f9ba 100755
--- a/src/connector/jdbc/deploy-pom.xml
+++ b/src/connector/jdbc/deploy-pom.xml
@@ -93,14 +93,13 @@
3.6.1
UTF-8
- 11
- 11
+ 8
+ 8
true
true
-
org.apache.maven.plugins
maven-source-plugin
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java
index 86d179eae4..d8df2fc0d3 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java
@@ -57,9 +57,9 @@ public class TSDBConnection implements Connection {
File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
File cfgFile = cfgDir.listFiles((dir, name) -> "taos.cfg".equalsIgnoreCase(name))[0];
List endpoints = loadConfigEndpoints(cfgFile);
- if (!endpoints.isEmpty()){
- info.setProperty(TSDBDriver.PROPERTY_KEY_HOST,endpoints.get(0).split(":")[0]);
- info.setProperty(TSDBDriver.PROPERTY_KEY_PORT,endpoints.get(0).split(":")[1]);
+ if (!endpoints.isEmpty()) {
+ info.setProperty(TSDBDriver.PROPERTY_KEY_HOST, endpoints.get(0).split(":")[0]);
+ info.setProperty(TSDBDriver.PROPERTY_KEY_PORT, endpoints.get(0).split(":")[1]);
}
//load taos.cfg end
@@ -69,15 +69,15 @@ public class TSDBConnection implements Connection {
info.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD));
}
- private List loadConfigEndpoints(File cfgFile){
+ private List loadConfigEndpoints(File cfgFile) {
List endpoints = new ArrayList<>();
- try(BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) {
+ try (BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) {
String line = null;
- while ((line = reader.readLine())!=null){
- if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")){
- endpoints.add(line.substring(line.indexOf('p')+1).trim());
+ while ((line = reader.readLine()) != null) {
+ if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")) {
+ endpoints.add(line.substring(line.indexOf('p') + 1).trim());
}
- if (endpoints.size()>1)
+ if (endpoints.size() > 1)
break;
}
} catch (FileNotFoundException e) {
@@ -91,7 +91,7 @@ public class TSDBConnection implements Connection {
/**
* @param cfgDirPath
* @return return the config dir
- * **/
+ **/
private File loadConfigDir(String cfgDirPath) {
if (cfgDirPath == null)
return loadDefaultConfigDir();
@@ -103,8 +103,8 @@ public class TSDBConnection implements Connection {
/**
* @return search the default config dir, if the config dir is not exist will return null
- * */
- private File loadDefaultConfigDir(){
+ */
+ private File loadDefaultConfigDir() {
File cfgDir;
File cfgDir_linux = new File("/etc/taos");
cfgDir = cfgDir_linux.exists() ? cfgDir_linux : null;
@@ -132,7 +132,9 @@ public class TSDBConnection implements Connection {
public Statement createStatement() throws SQLException {
if (!this.connector.isClosed()) {
- return new TSDBStatement(this.connector);
+ TSDBStatement statement = new TSDBStatement(this, this.connector);
+ statement.setConnection(this);
+ return statement;
} else {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
@@ -153,7 +155,7 @@ public class TSDBConnection implements Connection {
public PreparedStatement prepareStatement(String sql) throws SQLException {
if (!this.connector.isClosed()) {
- return new TSDBPreparedStatement(this.connector, sql);
+ return new TSDBPreparedStatement(this, this.connector, sql);
} else {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBPreparedStatement.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBPreparedStatement.java
index 6097ad9c43..230943fd53 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBPreparedStatement.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBPreparedStatement.java
@@ -42,8 +42,8 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
private SavedPreparedStatement savedPreparedStatement;
- TSDBPreparedStatement(TSDBJNIConnector connecter, String sql) {
- super(connecter);
+ TSDBPreparedStatement(TSDBConnection connection, TSDBJNIConnector connecter, String sql) {
+ super(connection, connecter);
init(sql);
}
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java
index 06ae449596..a8d6ceb713 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java
@@ -19,153 +19,164 @@ import java.util.ArrayList;
import java.util.List;
public class TSDBStatement implements Statement {
- private TSDBJNIConnector connecter = null;
+ private TSDBJNIConnector connecter = null;
- /** To store batched commands */
- protected List batchedArgs;
+ /**
+ * To store batched commands
+ */
+ protected List batchedArgs;
- /** Timeout for a query */
- protected int queryTimeout = 0;
+ /**
+ * Timeout for a query
+ */
+ protected int queryTimeout = 0;
- private Long pSql = 0l;
+ private Long pSql = 0l;
/**
* Status of current statement
*/
- private boolean isClosed = true;
- private int affectedRows = 0;
+ private boolean isClosed = true;
+ private int affectedRows = 0;
- TSDBStatement(TSDBJNIConnector connecter) {
- this.connecter = connecter;
- this.isClosed = false;
- }
+ private TSDBConnection connection;
- public T unwrap(Class iface) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public void setConnection(TSDBConnection connection) {
+ this.connection = connection;
+ }
- public boolean isWrapperFor(Class> iface) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ TSDBStatement(TSDBConnection connection, TSDBJNIConnector connecter) {
+ this.connection = connection;
+ this.connecter = connecter;
+ this.isClosed = false;
+ }
- public ResultSet executeQuery(String sql) throws SQLException {
+ public T unwrap(Class iface) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
+
+ public boolean isWrapperFor(Class> iface) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
+
+ public ResultSet executeQuery(String sql) throws SQLException {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
// TODO make sure it is not a update query
- pSql = this.connecter.executeQuery(sql);
+ pSql = this.connecter.executeQuery(sql);
- long resultSetPointer = this.connecter.getResultSet();
+ long resultSetPointer = this.connecter.getResultSet();
- if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
- this.connecter.freeResultSet(pSql);
- throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
- }
+ if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
+ this.connecter.freeResultSet(pSql);
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
// create/insert/update/delete/alter
- if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
- this.connecter.freeResultSet(pSql);
- return null;
- }
+ if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
+ this.connecter.freeResultSet(pSql);
+ return null;
+ }
- if (!this.connecter.isUpdateQuery(pSql)) {
- return new TSDBResultSet(this.connecter, resultSetPointer);
- } else {
- this.connecter.freeResultSet(pSql);
- return null;
- }
+ if (!this.connecter.isUpdateQuery(pSql)) {
+ return new TSDBResultSet(this.connecter, resultSetPointer);
+ } else {
+ this.connecter.freeResultSet(pSql);
+ return null;
+ }
- }
+ }
- public int executeUpdate(String sql) throws SQLException {
+ public int executeUpdate(String sql) throws SQLException {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
// TODO check if current query is update query
- pSql = this.connecter.executeQuery(sql);
- long resultSetPointer = this.connecter.getResultSet();
+ pSql = this.connecter.executeQuery(sql);
+ long resultSetPointer = this.connecter.getResultSet();
- if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
- this.connecter.freeResultSet(pSql);
- throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
- }
+ if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
+ this.connecter.freeResultSet(pSql);
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
- this.affectedRows = this.connecter.getAffectedRows(pSql);
- this.connecter.freeResultSet(pSql);
+ this.affectedRows = this.connecter.getAffectedRows(pSql);
+ this.connecter.freeResultSet(pSql);
- return this.affectedRows;
- }
+ return this.affectedRows;
+ }
- public String getErrorMsg(long pSql) {
- return this.connecter.getErrMsg(pSql);
- }
+ public String getErrorMsg(long pSql) {
+ return this.connecter.getErrMsg(pSql);
+ }
- public void close() throws SQLException {
+ public void close() throws SQLException {
if (!isClosed) {
if (!this.connecter.isResultsetClosed()) {
this.connecter.freeResultSet();
}
isClosed = true;
}
- }
+ }
- public int getMaxFieldSize() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public int getMaxFieldSize() throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public void setMaxFieldSize(int max) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public void setMaxFieldSize(int max) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public int getMaxRows() throws SQLException {
- // always set maxRows to zero, meaning unlimitted rows in a resultSet
- return 0;
- }
+ public int getMaxRows() throws SQLException {
+ // always set maxRows to zero, meaning unlimitted rows in a resultSet
+ return 0;
+ }
- public void setMaxRows(int max) throws SQLException {
- // always set maxRows to zero, meaning unlimitted rows in a resultSet
- }
+ public void setMaxRows(int max) throws SQLException {
+ // always set maxRows to zero, meaning unlimitted rows in a resultSet
+ }
- public void setEscapeProcessing(boolean enable) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public void setEscapeProcessing(boolean enable) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public int getQueryTimeout() throws SQLException {
- return queryTimeout;
- }
+ public int getQueryTimeout() throws SQLException {
+ return queryTimeout;
+ }
- public void setQueryTimeout(int seconds) throws SQLException {
- this.queryTimeout = seconds;
- }
+ public void setQueryTimeout(int seconds) throws SQLException {
+ this.queryTimeout = seconds;
+ }
- public void cancel() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public void cancel() throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public SQLWarning getWarnings() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public SQLWarning getWarnings() throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public void clearWarnings() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public void clearWarnings() throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public void setCursorName(String name) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public void setCursorName(String name) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public boolean execute(String sql) throws SQLException {
+ public boolean execute(String sql) throws SQLException {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
- boolean res = true;
- pSql = this.connecter.executeQuery(sql);
- long resultSetPointer = this.connecter.getResultSet();
+ boolean res = true;
+ pSql = this.connecter.executeQuery(sql);
+ long resultSetPointer = this.connecter.getResultSet();
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
- this.connecter.freeResultSet(pSql);
+ this.connecter.freeResultSet(pSql);
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
// no result set is retrieved
@@ -173,145 +184,147 @@ public class TSDBStatement implements Statement {
res = false;
}
- return res;
- }
+ return res;
+ }
- public ResultSet getResultSet() throws SQLException {
+ public ResultSet getResultSet() throws SQLException {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
- long resultSetPointer = connecter.getResultSet();
- TSDBResultSet resSet = null;
+ long resultSetPointer = connecter.getResultSet();
+ TSDBResultSet resSet = null;
if (resultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
resSet = new TSDBResultSet(connecter, resultSetPointer);
}
- return resSet;
- }
+ return resSet;
+ }
- public int getUpdateCount() throws SQLException {
+ public int getUpdateCount() throws SQLException {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
- return this.affectedRows;
- }
+ return this.affectedRows;
+ }
- public boolean getMoreResults() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public boolean getMoreResults() throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public void setFetchDirection(int direction) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public void setFetchDirection(int direction) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public int getFetchDirection() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public int getFetchDirection() throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- /*
- * used by spark
- */
- public void setFetchSize(int rows) throws SQLException {
- }
+ /*
+ * used by spark
+ */
+ public void setFetchSize(int rows) throws SQLException {
+ }
- /*
- * used by spark
- */
- public int getFetchSize() throws SQLException {
- return 4096;
- }
+ /*
+ * used by spark
+ */
+ public int getFetchSize() throws SQLException {
+ return 4096;
+ }
- public int getResultSetConcurrency() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public int getResultSetConcurrency() throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public int getResultSetType() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public int getResultSetType() throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public void addBatch(String sql) throws SQLException {
- if (batchedArgs == null) {
- batchedArgs = new ArrayList();
- }
- batchedArgs.add(sql);
- }
+ public void addBatch(String sql) throws SQLException {
+ if (batchedArgs == null) {
+ batchedArgs = new ArrayList();
+ }
+ batchedArgs.add(sql);
+ }
- public void clearBatch() throws SQLException {
- batchedArgs.clear();
- }
+ public void clearBatch() throws SQLException {
+ batchedArgs.clear();
+ }
- public int[] executeBatch() throws SQLException {
+ public int[] executeBatch() throws SQLException {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
- if (batchedArgs == null) {
- throw new SQLException(TSDBConstants.WrapErrMsg("Batch is empty!"));
- } else {
- int[] res = new int[batchedArgs.size()];
- for (int i = 0; i < batchedArgs.size(); i++) {
- res[i] = executeUpdate(batchedArgs.get(i));
- }
- return res;
- }
- }
+ if (batchedArgs == null) {
+ throw new SQLException(TSDBConstants.WrapErrMsg("Batch is empty!"));
+ } else {
+ int[] res = new int[batchedArgs.size()];
+ for (int i = 0; i < batchedArgs.size(); i++) {
+ res[i] = executeUpdate(batchedArgs.get(i));
+ }
+ return res;
+ }
+ }
- public Connection getConnection() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public Connection getConnection() throws SQLException {
+ if (this.connecter != null)
+ return this.connection;
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public boolean getMoreResults(int current) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public boolean getMoreResults(int current) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public ResultSet getGeneratedKeys() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public ResultSet getGeneratedKeys() throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public int executeUpdate(String sql, String[] columnNames) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public boolean execute(String sql, int[] columnIndexes) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public boolean execute(String sql, String[] columnNames) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public boolean execute(String sql, String[] columnNames) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public int getResultSetHoldability() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public int getResultSetHoldability() throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public boolean isClosed() throws SQLException {
- return isClosed;
- }
+ public boolean isClosed() throws SQLException {
+ return isClosed;
+ }
- public void setPoolable(boolean poolable) throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public void setPoolable(boolean poolable) throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public boolean isPoolable() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public boolean isPoolable() throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public void closeOnCompletion() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public void closeOnCompletion() throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
- public boolean isCloseOnCompletion() throws SQLException {
- throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
- }
+ public boolean isCloseOnCompletion() throws SQLException {
+ throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
+ }
}
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/SqlSyntaxValidator.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/SqlSyntaxValidator.java
index 7e144cbe0f..066dfad5d5 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/SqlSyntaxValidator.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/SqlSyntaxValidator.java
@@ -23,6 +23,7 @@ import java.sql.SQLException;
public class SqlSyntaxValidator {
private TSDBConnection tsdbConnection;
+
public SqlSyntaxValidator(Connection connection) {
this.tsdbConnection = (TSDBConnection) connection;
}
diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/BaseTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/BaseTest.java
index 6c3437186f..b793a47c99 100644
--- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/BaseTest.java
+++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/BaseTest.java
@@ -8,8 +8,7 @@ import org.junit.BeforeClass;
public class BaseTest {
private static boolean testCluster = false;
- private static TDNodes nodes = new TDNodes();
-
+ private static TDNodes nodes = new TDNodes();
@BeforeClass
public static void setupEnv() {
@@ -19,11 +18,9 @@ public class BaseTest {
nodes.getTDNode(1).setRunning(1);
nodes.stop(1);
}
-
nodes.setTestCluster(testCluster);
nodes.deploy(1);
- nodes.start(1);
-
+ nodes.start(1);
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/BatchInsertTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/BatchInsertTest.java
index c49293c96b..7d96cbb538 100644
--- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/BatchInsertTest.java
+++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/BatchInsertTest.java
@@ -7,13 +7,11 @@ import org.junit.Test;
import java.sql.*;
import java.util.Properties;
import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
-import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.*;
-
-import static org.junit.Assert.assertTrue;
public class BatchInsertTest extends BaseTest {
diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SelectTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SelectTest.java
index 1844a92b47..6c75860e0f 100644
--- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SelectTest.java
+++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SelectTest.java
@@ -34,7 +34,6 @@ public class SelectTest extends BaseTest {
statement.executeUpdate("drop database if exists " + dbName);
statement.executeUpdate("create database if not exists " + dbName);
statement.executeUpdate("create table if not exists " + dbName + "." + tName + " (ts timestamp, k int, v int)");
-
}
@Test
@@ -66,6 +65,5 @@ public class SelectTest extends BaseTest {
statement.close();
connection.close();
Thread.sleep(10);
-
}
}
diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java
index 07b43d1227..0a71c77d1d 100644
--- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java
+++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java
@@ -10,8 +10,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
-import static org.junit.Assert.assertTrue;
-
public class SubscribeTest extends BaseTest {
Connection connection = null;
Statement statement = null;
diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/BatchInsertTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/BatchInsertTest.java
new file mode 100644
index 0000000000..9608c4985d
--- /dev/null
+++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/BatchInsertTest.java
@@ -0,0 +1,108 @@
+package com.taosdata.jdbc.cases;
+
+import com.taosdata.jdbc.lib.TSDBCommon;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class BatchInsertTest {
+
+ static String host = "localhost";
+ static String dbName = "test";
+ static String stbName = "meters";
+ static int numOfTables = 30;
+ final static int numOfRecordsPerTable = 1000;
+ static long ts = 1496732686000l;
+ final static String tablePrefix = "t";
+
+ private Connection connection;
+
+ @Before
+ public void before() {
+ try {
+ connection = TSDBCommon.getConn(host);
+ TSDBCommon.createDatabase(connection, dbName);
+ TSDBCommon.createStable(connection, stbName);
+ TSDBCommon.createTables(connection, numOfTables, stbName, tablePrefix);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testBatchInsert(){
+ ExecutorService executorService = Executors.newFixedThreadPool(numOfTables);
+ for (int i = 0; i < numOfTables; i++) {
+ final int index = i;
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ long startTime = System.currentTimeMillis();
+ Statement statement = connection.createStatement(); // get statement
+ StringBuilder sb = new StringBuilder();
+ sb.append("INSERT INTO " + tablePrefix + index + " VALUES");
+ Random rand = new Random();
+ for (int j = 1; j <= numOfRecordsPerTable; j++) {
+ sb.append("(" + (ts + j) + ", ");
+ sb.append(rand.nextInt(100) + ", ");
+ sb.append(rand.nextInt(100) + ", ");
+ sb.append(rand.nextInt(100) + ")");
+ }
+ statement.addBatch(sb.toString());
+ statement.executeBatch();
+ long endTime = System.currentTimeMillis();
+ System.out.println("Thread " + index + " takes " + (endTime - startTime) + " microseconds");
+ connection.commit();
+ statement.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ try{
+ Statement statement = connection.createStatement();
+ ResultSet rs = statement.executeQuery("select * from meters");
+ int num = 0;
+ while (rs.next()) {
+ num++;
+ }
+ assertEquals(num, numOfTables * numOfRecordsPerTable);
+ rs.close();
+ }catch (Exception e){
+ e.printStackTrace();
+ }
+ }
+
+ @After
+ public void after() {
+ try {
+ if (connection != null)
+ connection.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+}
diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/lib/TSDBCommon.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/lib/TSDBCommon.java
new file mode 100644
index 0000000000..0e2613d617
--- /dev/null
+++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/lib/TSDBCommon.java
@@ -0,0 +1,47 @@
+package com.taosdata.jdbc.lib;
+
+import com.taosdata.jdbc.TSDBDriver;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+public class TSDBCommon {
+
+ public static Connection getConn(String host) throws SQLException, ClassNotFoundException {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ Properties properties = new Properties();
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
+ return DriverManager.getConnection("jdbc:TAOS://" + host + ":0/", properties);
+ }
+
+ public static void createDatabase(Connection connection, String dbName) throws SQLException {
+ Statement statement = connection.createStatement();
+ statement.executeUpdate("drop database if exists " + dbName);
+ statement.executeUpdate("create database if not exists " + dbName);
+ statement.executeUpdate("use " + dbName);
+ statement.close();
+ }
+
+ public static void createStable(Connection connection, String stbName) throws SQLException {
+ Statement statement = connection.createStatement();
+ String createTableSql = "create table " + stbName + "(ts timestamp, f1 int, f2 int, f3 int) tags(areaid int, loc binary(20))";
+ statement.executeUpdate(createTableSql);
+ statement.close();
+ }
+
+ public static void createTables(Connection connection, int numOfTables, String stbName,String tablePrefix) throws SQLException {
+ Statement statement = connection.createStatement();
+ for(int i = 0; i < numOfTables; i++) {
+ String loc = i % 2 == 0 ? "beijing" : "shanghai";
+ String createSubTalbesSql = "create table " + tablePrefix + i + " using " + stbName + " tags(" + i + ", '" + loc + "')";
+ statement.executeUpdate(createSubTalbesSql);
+ }
+ statement.close();
+ }
+}
diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c
index e1d298089c..0a5b9b550c 100644
--- a/src/dnode/src/dnodeModule.c
+++ b/src/dnode/src/dnodeModule.c
@@ -16,11 +16,13 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
+#include "taosmsg.h"
#include "tglobal.h"
#include "mnode.h"
#include "http.h"
#include "tmqtt.h"
#include "monitor.h"
+#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeModule.h"
@@ -129,17 +131,34 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
for (int32_t module = TSDB_MOD_MNODE; module < TSDB_MOD_HTTP; ++module) {
bool enableModule = moduleStatus & (1 << module);
if (!tsModule[module].enable && enableModule) {
- dInfo("module status:%u is received, start %s module", tsModuleStatus, tsModule[module].name);
+ dInfo("module status:%u is set, start %s module", moduleStatus, tsModule[module].name);
tsModule[module].enable = true;
dnodeSetModuleStatus(module);
(*tsModule[module].startFp)();
}
if (tsModule[module].enable && !enableModule) {
- dInfo("module status:%u is received, stop %s module", tsModuleStatus, tsModule[module].name);
+ dInfo("module status:%u is set, stop %s module", moduleStatus, tsModule[module].name);
tsModule[module].enable = false;
dnodeUnSetModuleStatus(module);
(*tsModule[module].stopFp)();
}
}
}
+
+bool dnodeCheckMnodeStarting() {
+ if (tsModuleStatus & TSDB_MOD_MNODE) return false;
+
+ SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
+ for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
+ SDMMnodeInfo *node = &mnodes->nodeInfos[i];
+ if (node->nodeId == dnodeGetDnodeId()) {
+ uint32_t moduleStatus = tsModuleStatus | (1 << TSDB_MOD_MNODE);;
+ dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus);
+ dnodeProcessModuleStatus(moduleStatus);
+ return true;
+ }
+ }
+
+ return false;
+}
diff --git a/src/inc/dnode.h b/src/inc/dnode.h
index 093ce93205..fda9c1c1dd 100644
--- a/src/inc/dnode.h
+++ b/src/inc/dnode.h
@@ -43,6 +43,7 @@ void dnodeGetMnodeEpSetForPeer(void *epSet);
void dnodeGetMnodeEpSetForShell(void *epSet);
void * dnodeGetMnodeInfos();
int32_t dnodeGetDnodeId();
+bool dnodeCheckMnodeStarting();
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
diff --git a/src/inc/tsync.h b/src/inc/tsync.h
index ff9c9901bd..ca0f70d104 100644
--- a/src/inc/tsync.h
+++ b/src/inc/tsync.h
@@ -103,6 +103,9 @@ typedef struct {
typedef void* tsync_h;
+int32_t syncInit();
+void syncCleanUp();
+
tsync_h syncStart(const SSyncInfo *);
void syncStop(tsync_h shandle);
int32_t syncReconfig(tsync_h shandle, const SSyncCfg *);
diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c
index 8928a6622d..646c17b2b8 100644
--- a/src/mnode/src/mnodeSdb.c
+++ b/src/mnode/src/mnodeSdb.c
@@ -22,6 +22,7 @@
#include "tqueue.h"
#include "twal.h"
#include "tsync.h"
+#include "ttimer.h"
#include "tglobal.h"
#include "dnode.h"
#include "mnode.h"
@@ -64,6 +65,7 @@ typedef struct _SSdbTable {
int32_t (*encodeFp)(SSdbOper *pOper);
int32_t (*destroyFp)(SSdbOper *pOper);
int32_t (*restoredFp)();
+ pthread_mutex_t mutex;
} SSdbTable;
typedef struct {
@@ -88,6 +90,8 @@ typedef struct {
SSdbWriteWorker *writeWorker;
} SSdbWriteWorkerPool;
+extern void * tsMnodeTmr;
+static void * tsUpdateSyncTmr;
static SSdbObject tsSdbObj = {0};
static taos_qset tsSdbWriteQset;
static taos_qall tsSdbWriteQall;
@@ -290,11 +294,17 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
taosFreeQitem(pOper);
}
+static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(); }
+
void sdbUpdateSync() {
if (!mnodeIsRunning()) {
mDebug("mnode not start yet, update sync info later");
+ if (dnodeCheckMnodeStarting()) {
+ taosTmrReset(sdbUpdateSyncTmrFp, 1000, NULL, tsMnodeTmr, &tsUpdateSyncTmr);
+ }
return;
}
+ mDebug("update sync info in sdb");
SSyncCfg syncCfg = {0};
int32_t index = 0;
@@ -387,8 +397,6 @@ int32_t sdbInit() {
tsSdbObj.role = TAOS_SYNC_ROLE_MASTER;
}
- sdbUpdateSync();
-
tsSdbObj.status = SDB_STATUS_SERVING;
return TSDB_CODE_SUCCESS;
}
@@ -448,8 +456,9 @@ static void *sdbGetRowMeta(SSdbTable *pTable, void *key) {
}
void **ppRow = (void **)taosHashGet(pTable->iHandle, key, keySize);
- if (ppRow == NULL) return NULL;
- return *ppRow;
+ if (ppRow != NULL) return *ppRow;
+
+ return NULL;
}
static void *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) {
@@ -457,13 +466,14 @@ static void *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) {
}
void *sdbGetRow(void *handle, void *key) {
+ SSdbTable *pTable = handle;
+
+ pthread_mutex_lock(&pTable->mutex);
void *pRow = sdbGetRowMeta(handle, key);
- if (pRow) {
- sdbIncRef(handle, pRow);
- return pRow;
- } else {
- return NULL;
- }
+ if (pRow) sdbIncRef(handle, pRow);
+ pthread_mutex_unlock(&pTable->mutex);
+
+ return pRow;
}
static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) {
@@ -478,7 +488,9 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
keySize = strlen((char *)key);
}
+ pthread_mutex_lock(&pTable->mutex);
taosHashPut(pTable->iHandle, key, keySize, &pOper->pObj, sizeof(int64_t));
+ pthread_mutex_unlock(&pTable->mutex);
sdbIncRef(pTable, pOper->pObj);
atomic_add_fetch_32(&pTable->numOfRows, 1);
@@ -519,7 +531,10 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
keySize = strlen((char *)key);
}
+ pthread_mutex_lock(&pTable->mutex);
taosHashRemove(pTable->iHandle, key, keySize);
+ pthread_mutex_unlock(&pTable->mutex);
+
atomic_sub_fetch_32(&pTable->numOfRows, 1);
sdbDebug("table:%s, delete record:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName,
@@ -861,6 +876,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
if (pTable == NULL) return NULL;
+ pthread_mutex_init(&pTable->mutex, NULL);
tstrncpy(pTable->tableName, pDesc->tableName, SDB_TABLE_LEN);
pTable->keyType = pDesc->keyType;
pTable->tableId = pDesc->tableId;
@@ -908,6 +924,7 @@ void sdbCloseTable(void *handle) {
taosHashDestroyIter(pIter);
taosHashCleanup(pTable->iHandle);
+ pthread_mutex_destroy(&pTable->mutex);
sdbDebug("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbObj.numOfTables);
free(pTable);
diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c
index da40160bc1..f96b902efd 100644
--- a/src/sync/src/syncMain.c
+++ b/src/sync/src/syncMain.c
@@ -30,23 +30,19 @@
#include "syncInt.h"
// global configurable
-int tsMaxSyncNum = 2;
-int tsSyncTcpThreads = 2;
-int tsMaxWatchFiles = 500;
-int tsMaxFwdInfo = 200;
-int tsSyncTimer = 1;
-//int sDebugFlag = 135;
-//char tsArbitrator[TSDB_FQDN_LEN] = {0};
+int tsMaxSyncNum = 2;
+int tsSyncTcpThreads = 2;
+int tsMaxWatchFiles = 500;
+int tsMaxFwdInfo = 200;
+int tsSyncTimer = 1;
// module global, not configurable
-int tsSyncNum; // number of sync in process in whole system
-char tsNodeFqdn[TSDB_FQDN_LEN];
+int tsSyncNum; // number of sync in process in whole system
+char tsNodeFqdn[TSDB_FQDN_LEN];
-static int tsNodeNum; // number of nodes in system
-static ttpool_h tsTcpPool;
-static void *syncTmrCtrl = NULL;
-static void *vgIdHash;
-static pthread_once_t syncModuleInit = PTHREAD_ONCE_INIT;
+static ttpool_h tsTcpPool;
+static void * syncTmrCtrl = NULL;
+static void * vgIdHash;
// local functions
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
@@ -75,7 +71,7 @@ char* syncRole[] = {
"master"
};
-static void syncModuleInitFunc() {
+int32_t syncInit() {
SPoolInfo info;
info.numOfThreads = tsSyncTcpThreads;
@@ -87,25 +83,52 @@ static void syncModuleInitFunc() {
info.processIncomingConn = syncProcessIncommingConnection;
tsTcpPool = taosOpenTcpThreadPool(&info);
- if (tsTcpPool == NULL) return;
+ if (tsTcpPool == NULL) {
+ sError("failed to init tcpPool");
+ return -1;
+ }
syncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC");
if (syncTmrCtrl == NULL) {
+ sError("failed to init tmrCtrl");
taosCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL;
- return;
+ return -1;
}
-
+
vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
if (vgIdHash == NULL) {
+ sError("failed to init vgIdHash");
taosTmrCleanUp(syncTmrCtrl);
taosCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL;
syncTmrCtrl = NULL;
- return;
- }
+ return -1;
+ }
tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn));
+ sInfo("sync module initialized successfully");
+
+ return 0;
+}
+
+void syncCleanUp() {
+ if (tsTcpPool) {
+ taosCloseTcpThreadPool(tsTcpPool);
+ tsTcpPool = NULL;
+ }
+
+ if (syncTmrCtrl) {
+ taosTmrCleanUp(syncTmrCtrl);
+ syncTmrCtrl = NULL;
+ }
+
+ if (vgIdHash) {
+ taosHashCleanup(vgIdHash);
+ vgIdHash = NULL;
+ }
+
+ sInfo("sync module is cleaned up");
}
void *syncStart(const SSyncInfo *pInfo) {
@@ -118,15 +141,6 @@ void *syncStart(const SSyncInfo *pInfo) {
return NULL;
}
- pthread_once(&syncModuleInit, syncModuleInitFunc);
- if (tsTcpPool == NULL) {
- free(pNode);
- syncModuleInit = PTHREAD_ONCE_INIT;
- sError("failed to init sync module(%s)", tstrerror(errno));
- return NULL;
- }
-
- atomic_add_fetch_32(&tsNodeNum, 1);
tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path));
pthread_mutex_init(&pNode->mutex, NULL);
@@ -138,7 +152,7 @@ void *syncStart(const SSyncInfo *pInfo) {
pNode->confirmForward = pInfo->confirmForward;
pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl;
pNode->notifyFileSynced = pInfo->notifyFileSynced;
-
+
pNode->selfIndex = -1;
pNode->vgId = pInfo->vgId;
pNode->replica = pCfg->replica;
@@ -148,8 +162,9 @@ void *syncStart(const SSyncInfo *pInfo) {
for (int i = 0; i < pCfg->replica; ++i) {
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i;
pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo);
- if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort))
+ if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) {
pNode->selfIndex = i;
+ }
}
if (pNode->selfIndex < 0) {
@@ -181,16 +196,17 @@ void *syncStart(const SSyncInfo *pInfo) {
syncAddArbitrator(pNode);
syncAddNodeRef(pNode);
taosHashPut(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *));
-
- if (pNode->notifyRole)
- (*pNode->notifyRole)(pNode->ahandle, nodeRole);
+
+ if (pNode->notifyRole) {
+ (*pNode->notifyRole)(pNode->ahandle, nodeRole);
+ }
return pNode;
}
void syncStop(void *param) {
- SSyncNode * pNode = param;
- SSyncPeer *pPeer;
+ SSyncNode *pNode = param;
+ SSyncPeer *pPeer;
if (pNode == NULL) return;
sInfo("vgId:%d, cleanup sync", pNode->vgId);
@@ -199,7 +215,7 @@ void syncStop(void *param) {
for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
- if (pPeer) syncRemovePeer(pPeer);
+ if (pPeer) syncRemovePeer(pPeer);
}
pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
@@ -214,19 +230,19 @@ void syncStop(void *param) {
}
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
- SSyncNode * pNode = param;
- int i, j;
+ SSyncNode *pNode = param;
+ int i, j;
if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
- sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole],
- pNewCfg->replica, pNode->replica);
+ sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
+ pNode->replica);
pthread_mutex_lock(&(pNode->mutex));
for (i = 0; i < pNode->replica; ++i) {
for (j = 0; j < pNewCfg->replica; ++j) {
- if ((strcmp(pNode->peerInfo[i]->fqdn, pNewCfg->nodeInfo[j].nodeFqdn) == 0) &&
- (pNode->peerInfo[i]->port == pNewCfg->nodeInfo[j].nodePort))
+ if ((strcmp(pNode->peerInfo[i]->fqdn, pNewCfg->nodeInfo[j].nodeFqdn) == 0) &&
+ (pNode->peerInfo[i]->port == pNewCfg->nodeInfo[j].nodePort))
break;
}
@@ -241,8 +257,8 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
const SNodeInfo *pNewNode = &pNewCfg->nodeInfo[i];
for (j = 0; j < pNode->replica; ++j) {
- if (pNode->peerInfo[j] && (strcmp(pNode->peerInfo[j]->fqdn, pNewNode->nodeFqdn) == 0) &&
- (pNode->peerInfo[j]->port == pNewNode->nodePort))
+ if (pNode->peerInfo[j] && (strcmp(pNode->peerInfo[j]->fqdn, pNewNode->nodeFqdn) == 0) &&
+ (pNode->peerInfo[j]->port == pNewNode->nodePort))
break;
}
@@ -252,8 +268,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
newPeers[i] = pNode->peerInfo[j];
}
- if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort))
+ if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) {
pNode->selfIndex = i;
+ }
}
pNode->replica = pNewCfg->replica;
@@ -261,8 +278,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;
memcpy(pNode->peerInfo, newPeers, sizeof(SSyncPeer *) * pNewCfg->replica);
- for (i = pNewCfg->replica; i < TAOS_SYNC_MAX_REPLICA; ++i)
+ for (i = pNewCfg->replica; i < TAOS_SYNC_MAX_REPLICA; ++i) {
pNode->peerInfo[i] = NULL;
+ }
syncAddArbitrator(pNode);
@@ -274,43 +292,44 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
pthread_mutex_unlock(&(pNode->mutex));
- sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]);
+ sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum,
+ syncRole[nodeRole]);
syncBroadcastStatus(pNode);
return 0;
}
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
- SSyncNode * pNode = param;
- SSyncPeer * pPeer;
- SSyncHead *pSyncHead;
- SWalHead *pWalHead = data;
- int fwdLen;
- int code = 0;
+ SSyncNode *pNode = param;
+ SSyncPeer *pPeer;
+ SSyncHead *pSyncHead;
+ SWalHead * pWalHead = data;
+ int fwdLen;
+ int code = 0;
if (pNode == NULL) return 0;
// always update version
nodeVersion = pWalHead->version;
- if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER ) return 0;
+ if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
// only pkt from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
// a hacker way to improve the performance
- pSyncHead = (SSyncHead *) ( ((char *)pWalHead) - sizeof(SSyncHead));
+ pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
pSyncHead->type = TAOS_SMSG_FORWARD;
pSyncHead->pversion = 0;
- pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
- fwdLen = pSyncHead->len + sizeof(SSyncHead); //include the WAL and SYNC head
+ pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
+ fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
- if (pPeer == NULL || pPeer->peerFd <0) continue;
- if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
-
+ if (pPeer == NULL || pPeer->peerFd < 0) continue;
+ if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
+
if (pNode->quorum > 1 && code == 0) {
syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
code = 1;
@@ -335,12 +354,12 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) {
if (pNode == NULL) return;
if (pNode->quorum <= 1) return;
- SSyncPeer *pPeer = pNode->pMaster;
+ SSyncPeer *pPeer = pNode->pMaster;
if (pPeer == NULL) return;
char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0};
- SSyncHead *pHead = (SSyncHead *) msg;
+ SSyncHead *pHead = (SSyncHead *)msg;
pHead->type = TAOS_SMSG_FORWARD_RSP;
pHead->len = sizeof(SFwdRsp);
@@ -363,7 +382,7 @@ void syncRecover(void *param) {
SSyncNode *pNode = param;
SSyncPeer *pPeer;
- // to do: add a few lines to check if recover is OK
+ // to do: add a few lines to check if recover is OK
// if take this node to unsync state, the whole system may not work
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
@@ -373,7 +392,7 @@ void syncRecover(void *param) {
pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pNode->replica; ++i) {
- pPeer = (SSyncPeer *) pNode->peerInfo[i];
+ pPeer = (SSyncPeer *)pNode->peerInfo[i];
if (pPeer->peerFd >= 0) {
syncRestartConnection(pPeer);
}
@@ -386,7 +405,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) {
SSyncNode *pNode = param;
pNodesRole->selfIndex = pNode->selfIndex;
- for (int i=0; ireplica; ++i) {
+ for (int i = 0; i < pNode->replica; ++i) {
pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId;
pNodesRole->role[i] = pNode->peerInfo[i]->role;
}
@@ -410,7 +429,7 @@ static void syncAddArbitrator(SSyncNode *pNode) {
if (-1 == ret) {
nodeInfo.nodePort = tsArbitratorPort;
}
-
+
if (pPeer) {
if ((strcmp(nodeInfo.nodeFqdn, pPeer->fqdn) == 0) && (nodeInfo.nodePort == pPeer->port)) {
return;
@@ -418,39 +437,26 @@ static void syncAddArbitrator(SSyncNode *pNode) {
syncRemovePeer(pPeer);
pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = NULL;
}
- }
+ }
pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo);
}
-static void syncAddNodeRef(SSyncNode *pNode)
-{
- atomic_add_fetch_8(&pNode->refCount, 1);
+static void syncAddNodeRef(SSyncNode *pNode) {
+ atomic_add_fetch_8(&pNode->refCount, 1);
}
-static void syncDecNodeRef(SSyncNode *pNode)
-{
+static void syncDecNodeRef(SSyncNode *pNode) {
if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) {
pthread_mutex_destroy(&pNode->mutex);
taosTFree(pNode->pRecv);
taosTFree(pNode->pSyncFwds);
taosTFree(pNode);
-
- if (atomic_sub_fetch_32(&tsNodeNum, 1) == 0) {
- if (tsTcpPool) taosCloseTcpThreadPool(tsTcpPool);
- if (syncTmrCtrl) taosTmrCleanUp(syncTmrCtrl);
- if (vgIdHash) taosHashCleanup(vgIdHash);
- syncTmrCtrl = NULL;
- tsTcpPool = NULL;
- vgIdHash = NULL;
- syncModuleInit = PTHREAD_ONCE_INIT;
- sDebug("sync module is cleaned up");
- }
}
}
void syncAddPeerRef(SSyncPeer *pPeer) {
- atomic_add_fetch_8(&pPeer->refCount, 1);
+ atomic_add_fetch_8(&pPeer->refCount, 1);
}
int syncDecPeerRef(SSyncPeer *pPeer) {
@@ -486,8 +492,8 @@ static void syncRemovePeer(SSyncPeer *pPeer) {
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
if (ip == -1) return NULL;
-
- SSyncPeer *pPeer = (SSyncPeer *) calloc(1, sizeof(SSyncPeer));
+
+ SSyncPeer *pPeer = (SSyncPeer *)calloc(1, sizeof(SSyncPeer));
if (pPeer == NULL) return NULL;
pPeer->nodeId = pInfo->nodeId;
@@ -506,9 +512,11 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
sDebug("%s, start to check peer connection", pPeer->id);
- taosTmrReset(syncCheckPeerConnection, 100 + (pNode->vgId*10)%100, pPeer, syncTmrCtrl, &pPeer->timer);
+ int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
+ if (pNode->vgId) checkMs = tsStatusInterval * 2000 + 100;
+ taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer);
}
-
+
syncAddNodeRef(pNode);
return pPeer;
}
@@ -542,16 +550,18 @@ static void syncChooseMaster(SSyncNode *pNode) {
sDebug("vgId:%d, choose master", pNode->vgId);
for (int i = 0; i < pNode->replica; ++i) {
- if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE)
+ if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) {
onlineNum++;
+ }
}
if (onlineNum == pNode->replica) {
// if all peers are online, peer with highest version shall be master
index = 0;
for (int i = 1; i < pNode->replica; ++i) {
- if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version)
+ if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version) {
index = i;
+ }
}
}
@@ -568,8 +578,9 @@ static void syncChooseMaster(SSyncNode *pNode) {
//slave with highest version shall be master
pPeer = pNode->peerInfo[i];
if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) {
- if (index < 0 || pPeer->version > pNode->peerInfo[index]->version)
+ if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) {
index = i;
+ }
}
}
}
@@ -595,8 +606,9 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
int replica = pNode->replica;
for (int i = 0; i < pNode->replica; ++i) {
- if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE)
+ if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) {
onlineNum++;
+ }
}
// add arbitrator connection
@@ -644,7 +656,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
code = -1;
for (int i = 0; i < pNode->replica; ++i) {
- if ( i == pNode->selfIndex ) continue;
+ if (i == pNode->selfIndex) continue;
syncRestartPeer(pNode->peerInfo[i]);
}
}
@@ -661,12 +673,11 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
pNode->peerInfo[pNode->selfIndex]->version = nodeVersion;
pPeer->role = newRole;
- sDebug("%s, own role:%s, new peer role:%s", pPeer->id,
- syncRole[nodeRole], syncRole[pPeer->role]);
+ sDebug("%s, own role:%s, new peer role:%s", pPeer->id, syncRole[nodeRole], syncRole[pPeer->role]);
SSyncPeer *pMaster = syncCheckMaster(pNode);
- if ( pMaster ) {
+ if (pMaster) {
// master is there
pNode->pMaster = pMaster;
sDebug("%s, it is the master, ver:%" PRIu64, pMaster->id, pMaster->version);
@@ -691,27 +702,30 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
for (i = 0; i < pNode->replica; ++i) {
SSyncPeer *pTemp = pNode->peerInfo[i];
if (pTemp->role != peersStatus[i].role) break;
- if ((pTemp->role != TAOS_SYNC_ROLE_OFFLINE) && (pTemp->version != peersStatus[i].version)) break;
+ if ((pTemp->role != TAOS_SYNC_ROLE_OFFLINE) && (pTemp->version != peersStatus[i].version)) break;
}
-
+
if (i >= pNode->replica) consistent = 1;
} else {
if (pNode->replica == 2) consistent = 1;
}
- if (consistent)
+ if (consistent) {
syncChooseMaster(pNode);
+ }
}
if (syncRequired) {
syncRecoverFromMaster(pMaster);
}
- if (peerOldRole != newRole || nodeRole != selfOldRole)
+ if (peerOldRole != newRole || nodeRole != selfOldRole) {
syncBroadcastStatus(pNode);
+ }
- if (nodeRole != TAOS_SYNC_ROLE_MASTER)
+ if (nodeRole != TAOS_SYNC_ROLE_MASTER) {
syncResetFlowCtrl(pNode);
+ }
}
static void syncRestartPeer(SSyncPeer *pPeer) {
@@ -722,8 +736,9 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
- if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort))
+ if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) {
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
+ }
}
void syncRestartConnection(SSyncPeer *pPeer) {
@@ -747,13 +762,13 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
if (pPeer->sstatus != TAOS_SYNC_STATUS_INIT) {
sDebug("%s, sync is already started", pPeer->id);
- return; // already started
+ return; // already started
}
// start a new thread to retrieve the data
syncAddPeerRef(pPeer);
- pthread_attr_t thattr;
- pthread_t thread;
+ pthread_attr_t thattr;
+ pthread_t thread;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
int ret = pthread_create(&thread, &thattr, syncRetrieveData, pPeer);
@@ -780,8 +795,8 @@ static void syncNotStarted(void *param, void *tmrId) {
}
static void syncTryRecoverFromMaster(void *param, void *tmrId) {
- SSyncPeer *pPeer = param;
- SSyncNode *pNode = pPeer->pSyncNode;
+ SSyncPeer *pPeer = param;
+ SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex));
syncRecoverFromMaster(pPeer);
@@ -805,7 +820,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
return;
}
- sDebug("%s, try to sync", pPeer->id)
+ sDebug("%s, try to sync", pPeer->id);
SFirstPkt firstPkt;
memset(&firstPkt, 0, sizeof(firstPkt));
@@ -814,49 +829,47 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
- taosTmrReset(syncNotStarted, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer);
+ taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
- if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt) ) {
+ if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
sError("%s, failed to send sync-req to peer", pPeer->id);
} else {
nodeSStatus = TAOS_SYNC_STATUS_START;
sInfo("%s, sync-req is sent", pPeer->id);
}
-
- return;
}
static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
- SSyncNode * pNode = pPeer->pSyncNode;
- SFwdRsp *pFwdRsp = (SFwdRsp *) cont;
- SSyncFwds *pSyncFwds = pNode->pSyncFwds;
- SFwdInfo *pFwdInfo;
+ SSyncNode *pNode = pPeer->pSyncNode;
+ SFwdRsp * pFwdRsp = (SFwdRsp *)cont;
+ SSyncFwds *pSyncFwds = pNode->pSyncFwds;
+ SFwdInfo * pFwdInfo;
sDebug("%s, forward-rsp is received, ver:%" PRIu64, pPeer->id, pFwdRsp->version);
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
// find the forwardInfo from first
- for (int i=0; ifwds; ++i) {
- pFwdInfo = pSyncFwds->fwdInfo + (i+pSyncFwds->first)%tsMaxFwdInfo;
+ for (int i = 0; i < pSyncFwds->fwds; ++i) {
+ pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo;
if (pFwdRsp->version == pFwdInfo->version) break;
}
-
+
syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
syncRemoveConfirmedFwdInfo(pNode);
}
}
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
- SSyncNode * pNode = pPeer->pSyncNode;
- SWalHead *pHead = (SWalHead *)cont;
+ SSyncNode *pNode = pPeer->pSyncNode;
+ SWalHead * pHead = (SWalHead *)cont;
sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version);
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
- //nodeVersion = pHead->version;
+ // nodeVersion = pHead->version;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD);
- } else {
+ } else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead);
} else {
@@ -877,12 +890,13 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
pPeer->version = pPeersStatus->version;
syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role);
- if (pPeersStatus->ack)
+ if (pPeersStatus->ack) {
syncSendPeersStatusMsgToPeer(pPeer, 0);
+ }
}
static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
- if (pPeer->peerFd <0) return -1;
+ if (pPeer->peerFd < 0) return -1;
int hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead));
if (hlen != sizeof(SSyncHead)) {
@@ -906,9 +920,9 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
}
static int syncProcessPeerMsg(void *param, void *buffer) {
- SSyncPeer * pPeer = param;
- SSyncHead head;
- char *cont = (char *)buffer;
+ SSyncPeer *pPeer = param;
+ SSyncHead head;
+ char * cont = (char *)buffer;
SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex));
@@ -932,16 +946,16 @@ static int syncProcessPeerMsg(void *param, void *buffer) {
return code;
}
-#define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA
+#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
SSyncNode *pNode = pPeer->pSyncNode;
char msg[statusMsgLen] = {0};
- if (pPeer->peerFd <0 || pPeer->ip ==0) return;
+ if (pPeer->peerFd < 0 || pPeer->ip == 0) return;
- SSyncHead *pHead = (SSyncHead *) msg;
- SPeersStatus *pPeersStatus = (SPeersStatus *) (msg + sizeof(SSyncHead));
+ SSyncHead * pHead = (SSyncHead *)msg;
+ SPeersStatus *pPeersStatus = (SPeersStatus *)(msg + sizeof(SSyncHead));
pHead->type = TAOS_SMSG_STATUS;
pHead->len = statusMsgLen - sizeof(SSyncHead);
@@ -979,28 +993,28 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
int connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (connFd < 0) {
sDebug("%s, failed to open tcp socket(%s)", pPeer->id, strerror(errno));
- taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer);
+ taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
return;
}
SFirstPkt firstPkt;
memset(&firstPkt, 0, sizeof(firstPkt));
- firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId:0;
+ firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId : 0;
firstPkt.syncHead.type = TAOS_SMSG_STATUS;
- tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
+ tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId
if (write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) {
sDebug("%s, connection to peer server is setup", pPeer->id);
- pPeer->peerFd = connFd;
+ pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer);
} else {
sDebug("try later");
close(connFd);
- taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer);
+ taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
}
}
@@ -1011,7 +1025,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) {
pthread_mutex_lock(&(pNode->mutex));
sDebug("%s, check peer connection", pPeer->id);
- syncSetupPeerConnection(pPeer);
+ syncSetupPeerConnection(pPeer);
pthread_mutex_unlock(&(pNode->mutex));
}
@@ -1020,7 +1034,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
taosTmrStopA(&pPeer->timer);
pthread_attr_t thattr;
- pthread_t thread;
+ pthread_t thread;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
@@ -1032,15 +1046,15 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
sError("%s, failed to create sync thread", pPeer->id);
taosClose(pPeer->syncFd);
syncDecPeerRef(pPeer);
- } else {
+ } else {
sInfo("%s, sync connection is up", pPeer->id);
}
}
static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
- char ipstr[24];
- int i;
-
+ char ipstr[24];
+ int i;
+
tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr);
@@ -1065,8 +1079,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
SSyncPeer *pPeer;
for (i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
- if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port))
- break;
+ if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) break;
}
pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL;
@@ -1091,8 +1104,6 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
}
pthread_mutex_unlock(&(pNode->mutex));
-
- return;
}
static void syncProcessBrokenLink(void *param) {
@@ -1121,10 +1132,12 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
if (pSyncFwds->fwds >= tsMaxFwdInfo) {
pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
pSyncFwds->fwds--;
- }
+ }
+
+ if (pSyncFwds->fwds > 0) {
+ pSyncFwds->last = (pSyncFwds->last + 1) % tsMaxFwdInfo;
+ }
- if (pSyncFwds->fwds > 0)
- pSyncFwds->last = (pSyncFwds->last+1) % tsMaxFwdInfo;
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last;
pFwdInfo->version = version;
pFwdInfo->mhandle = mhandle;
@@ -1140,14 +1153,14 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
int fwds = pSyncFwds->fwds;
- for (int i=0; ifwdInfo + pSyncFwds->first;
+ for (int i = 0; i < fwds; ++i) {
+ SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first;
if (pFwdInfo->confirmed == 0) break;
- pSyncFwds->first = (pSyncFwds->first+1) % tsMaxFwdInfo;
+ pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
pSyncFwds->fwds--;
if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last;
- //sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d",
+ // sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d",
// pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
memset(pFwdInfo, 0, sizeof(SFwdInfo));
}
@@ -1159,12 +1172,14 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
if (code == 0) {
pFwdInfo->acks++;
- if (pFwdInfo->acks >= pNode->quorum-1)
+ if (pFwdInfo->acks >= pNode->quorum - 1) {
confirm = 1;
+ }
} else {
pFwdInfo->nacks++;
- if (pFwdInfo->nacks > pNode->replica-pNode->quorum)
+ if (pFwdInfo->nacks > pNode->replica - pNode->quorum) {
confirm = 1;
+ }
}
if (confirm && pFwdInfo->confirmed == 0) {
@@ -1181,15 +1196,15 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&(pNode->mutex));
- for (int i=0; ifwds; ++i) {
- SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first+i) % tsMaxFwdInfo;
+ for (int i = 0; i < pSyncFwds->fwds; ++i) {
+ SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
if (time - pFwdInfo->time < 2000) break;
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
syncRemoveConfirmedFwdInfo(pNode);
pthread_mutex_unlock(&(pNode->mutex));
- }
-
+ }
+
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl);
}
diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c
index 4a2d9859b9..a4e88fb946 100644
--- a/src/vnode/src/vnodeMain.c
+++ b/src/vnode/src/vnodeMain.c
@@ -57,6 +57,9 @@ void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
#endif
int32_t vnodeInitResources() {
+ int code = syncInit();
+ if (code != 0) return code;
+
vnodeInitWriteFp();
vnodeInitReadFp();
@@ -70,11 +73,12 @@ int32_t vnodeInitResources() {
}
void vnodeCleanupResources() {
-
if (tsDnodeVnodesHash != NULL) {
taosHashCleanup(tsDnodeVnodesHash);
tsDnodeVnodesHash = NULL;
}
+
+ syncCleanUp();
}
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c
index 11315f5321..017eeaf426 100644
--- a/src/vnode/src/vnodeRead.c
+++ b/src/vnode/src/vnodeRead.c
@@ -93,11 +93,12 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle,
vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle);
}
} else {
- SRetrieveTableRsp* pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
+ SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
pRsp->completed = true;
pRet->rsp = pRsp;
+ pRet->len = sizeof(SRetrieveTableRsp);
*freeHandle = true;
}
@@ -270,6 +271,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (code != TSDB_CODE_SUCCESS) {
//TODO handle malloc failure
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
+ pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
freeHandle = true;
} else { // result is not ready, return immediately
diff --git a/tests/examples/JDBC/JDBCDemo/pom.xml b/tests/examples/JDBC/JDBCDemo/pom.xml
index f0234f2bd7..92d757edfd 100644
--- a/tests/examples/JDBC/JDBCDemo/pom.xml
+++ b/tests/examples/JDBC/JDBCDemo/pom.xml
@@ -9,21 +9,20 @@
1.0-SNAPSHOT
jar
-
-
-
- org.apache.maven.plugins
- maven-plugins
- 30
-
-
- org.apache.maven.plugins
- maven-assembly-plugin
- 3.0.0
-
-
-
+
+
+ org.apache.maven.plugins
+ maven-plugins
+ 30
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ 3.0.0
+
+
org.apache.maven.plugins
maven-assembly-plugin
@@ -48,6 +47,7 @@
+
org.apache.maven.plugins
maven-compiler-plugin
diff --git a/tests/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JDBCConnectorChecker.java b/tests/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JDBCConnectorChecker.java
new file mode 100644
index 0000000000..1e801bc658
--- /dev/null
+++ b/tests/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JDBCConnectorChecker.java
@@ -0,0 +1,174 @@
+package com.taosdata.example;
+
+import com.taosdata.jdbc.TSDBDriver;
+
+import java.sql.*;
+import java.util.Properties;
+
+public class JDBCConnectorChecker {
+ private static String host;
+ private static String dbName = "test";
+ private static String tbName = "weather";
+ private Connection connection;
+
+
+ /**
+ * get connection
+ **/
+ private void init() {
+ try {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ Properties properties = new Properties();
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
+ System.out.println("get connection starting...");
+ connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/", properties);
+ if (connection != null)
+ System.out.println("[ OK ] Connection established.");
+ } catch (ClassNotFoundException | SQLException e) {
+ throw new RuntimeException("connection failed: " + host);
+ }
+ }
+
+ /**
+ * create database
+ */
+ private void createDatabase() {
+ String sql = "create database if not exists " + dbName;
+ exuete(sql);
+ }
+
+ /**
+ * use database
+ */
+ private void useDatabase() {
+ String sql = "use " + dbName;
+ exuete(sql);
+ }
+
+ /**
+ * select
+ */
+ private void checkSelect() {
+ final String sql = "select * from test.weather";
+ executeQuery(sql);
+ }
+
+ private void executeQuery(String sql) {
+ try (Statement statement = connection.createStatement()) {
+ long start = System.currentTimeMillis();
+ ResultSet resultSet = statement.executeQuery(sql);
+ long end = System.currentTimeMillis();
+ printSql(sql, true, (end - start));
+ printResult(resultSet);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void printResult(ResultSet resultSet) throws SQLException {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ while (resultSet.next()) {
+ for (int i = 1; i <= metaData.getColumnCount(); i++) {
+ String columnLabel = metaData.getColumnLabel(i);
+ String value = resultSet.getString(i);
+ System.out.printf("%s: %s\t", columnLabel, value);
+ }
+ System.out.println();
+ }
+ }
+
+ private String formatString(String str) {
+ StringBuilder sb = new StringBuilder();
+ int blankCnt = (26 - str.length()) / 2;
+ for (int j = 0; j < blankCnt; j++)
+ sb.append(" ");
+ sb.append(str);
+ for (int j = 0; j < blankCnt; j++)
+ sb.append(" ");
+ sb.append("|");
+ return sb.toString();
+ }
+
+
+ /**
+ * insert
+ */
+ private void checkInsert() {
+ final String sql = "insert into test.weather (ts, temperature, humidity) values(now, 20.5, 34)";
+ exuete(sql);
+ }
+
+ /**
+ * create table
+ */
+ private void createTable() {
+ final String sql = "create table if not exists " + dbName + "." + tbName + " (ts timestamp, temperature float, humidity int)";
+ exuete(sql);
+ }
+
+ private final void printSql(String sql, boolean succeed, long cost) {
+ System.out.println("[ " + (succeed ? "OK" : "ERROR!") + " ] time cost: " + cost + " ms, execute statement ====> " + sql);
+ }
+
+ private final void exuete(String sql) {
+ try (Statement statement = connection.createStatement()) {
+ long start = System.currentTimeMillis();
+ boolean execute = statement.execute(sql);
+ long end = System.currentTimeMillis();
+ printSql(sql, execute, (end - start));
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void close() {
+ try {
+ if (connection != null) {
+ this.connection.close();
+ System.out.println("connection closed.");
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void checkDropTable() {
+ final String sql = "drop table if exists " + dbName + "." + tbName + "";
+ exuete(sql);
+ }
+
+ public static void main(String[] args) {
+ for (int i = 0; i < args.length; i++) {
+ if ("-host".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ host = args[++i];
+ }
+ if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ dbName = args[++i];
+ }
+ if ("-t".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ tbName = args[++i];
+ }
+ }
+
+ if (host == null) {
+ System.out.println("Usage: java -jar JDBCConnectorChecker.jar -host ");
+ return;
+ }
+
+ JDBCConnectorChecker checker = new JDBCConnectorChecker();
+ checker.init();
+ checker.createDatabase();
+ checker.useDatabase();
+ checker.checkDropTable();
+ checker.createTable();
+ checker.checkInsert();
+ checker.checkSelect();
+ checker.checkDropTable();
+ checker.close();
+ }
+
+
+}
diff --git a/tests/examples/JDBC/SpringJdbcTemplate/pom.xml b/tests/examples/JDBC/SpringJdbcTemplate/pom.xml
index b796d52d28..15aed1cf03 100644
--- a/tests/examples/JDBC/SpringJdbcTemplate/pom.xml
+++ b/tests/examples/JDBC/SpringJdbcTemplate/pom.xml
@@ -1,85 +1,91 @@
- 4.0.0
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ 4.0.0
- com.taosdata.jdbc
- SpringJdbcTemplate
- 1.0-SNAPSHOT
+ com.taosdata.jdbc
+ SpringJdbcTemplate
+ 1.0-SNAPSHOT
- SpringJdbcTemplate
- http://www.taosdata.com
+ SpringJdbcTemplate
+ http://www.taosdata.com
-
- UTF-8
- 1.8
- 1.8
-
+
+ UTF-8
+ 1.8
+ 1.8
+
-
+
-
- org.springframework
- spring-context
- 4.3.2.RELEASE
-
+
+ org.springframework
+ spring-context
+ 5.2.8.RELEASE
+
-
- org.springframework
- spring-jdbc
- 4.3.2.RELEASE
-
+
+ org.springframework
+ spring-jdbc
+ 5.1.9.RELEASE
+
-
- junit
- junit
- 4.11
- test
-
+
+ org.springframework
+ spring-test
+ 5.1.9.RELEASE
+
-
- com.taosdata.jdbc
- taos-jdbcdriver
- 2.0.2
-
+
+ junit
+ junit
+ 4.13
+ test
+
-
+
+ com.taosdata.jdbc
+ taos-jdbcdriver
+ 2.0.4
+
-
-
-
- maven-compiler-plugin
- 3.8.0
-
- 1.8
- 1.8
-
-
-
- org.apache.maven.plugins
- maven-assembly-plugin
- 3.1.0
-
-
-
- com.taosdata.jdbc.App
-
-
-
- jar-with-dependencies
-
-
-
-
- make-assembly
- package
-
- single
-
-
-
-
-
-
+
+
+
+
+
+ maven-compiler-plugin
+ 3.8.0
+
+ 1.8
+ 1.8
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ 3.1.0
+
+
+
+ com.taosdata.jdbc.example.jdbcTemplate.App
+
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
diff --git a/tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/jdbc/App.java b/tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/jdbc/App.java
deleted file mode 100644
index 3230af46a8..0000000000
--- a/tests/examples/JDBC/SpringJdbcTemplate/src/main/java/com/taosdata/jdbc/App.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package com.taosdata.jdbc;
-
-
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.ClassPathXmlApplicationContext;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.util.CollectionUtils;
-
-import java.util.List;
-import java.util.Map;
-
-public class App {
-
- public static void main( String[] args ) {
-
- ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
-
- JdbcTemplate jdbcTemplate = (JdbcTemplate) ctx.getBean("jdbcTemplate");
-
- // create database
- jdbcTemplate.execute("create database if not exists db ");
-
- // create table
- jdbcTemplate.execute("create table if not exists db.tb (ts timestamp, temperature int, humidity float)");
-
- String insertSql = "insert into db.tb values(now, 23, 10.3) (now + 1s, 20, 9.3)";
-
- // insert rows
- int affectedRows = jdbcTemplate.update(insertSql);
-
- System.out.println("insert success " + affectedRows + " rows.");
-
- // query for list
- List