Merge branch 'feature/TD-1925_new' into feature/linux
This commit is contained in:
commit
20cbb0c493
|
@ -45,7 +45,7 @@ def pre_test(){
|
||||||
git pull
|
git pull
|
||||||
git fetch origin +refs/pull/${CHANGE_ID}/merge
|
git fetch origin +refs/pull/${CHANGE_ID}/merge
|
||||||
git checkout -qf FETCH_HEAD
|
git checkout -qf FETCH_HEAD
|
||||||
git --no-pager diff --name-only FETCH_HEAD $(git merge-base FETCH_HEAD develop)|grep -v -E '.*md|//src//connector|Jenkinsfile' || exit 0
|
git --no-pager diff --name-only FETCH_HEAD $(git merge-base FETCH_HEAD develop)|grep -v -E '.*md|.*src/connector|Jenkinsfile' || exit 0
|
||||||
cd ${WK}
|
cd ${WK}
|
||||||
git reset --hard HEAD~10
|
git reset --hard HEAD~10
|
||||||
git checkout develop
|
git checkout develop
|
||||||
|
|
|
@ -36,6 +36,7 @@
|
||||||
<maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
|
<maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
|
||||||
<commons-logging.version>1.1.2</commons-logging.version>
|
<commons-logging.version>1.1.2</commons-logging.version>
|
||||||
<commons-lang3.version>3.5</commons-lang3.version>
|
<commons-lang3.version>3.5</commons-lang3.version>
|
||||||
|
<maven.test.jvmargs></maven.test.jvmargs>
|
||||||
</properties>
|
</properties>
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -122,11 +123,14 @@
|
||||||
<artifactId>maven-surefire-plugin</artifactId>
|
<artifactId>maven-surefire-plugin</artifactId>
|
||||||
<version>2.12.4</version>
|
<version>2.12.4</version>
|
||||||
<configuration>
|
<configuration>
|
||||||
|
<forkMode>pertest</forkMode>
|
||||||
|
<argLine>${maven.test.jvmargs}</argLine>
|
||||||
<includes>
|
<includes>
|
||||||
<include>**/*Test.java</include>
|
<include>**/*Test.java</include>
|
||||||
</includes>
|
</includes>
|
||||||
<excludes>
|
<excludes>
|
||||||
<exclude>**/AppMemoryLeakTest.java</exclude>
|
<exclude>**/AppMemoryLeakTest.java</exclude>
|
||||||
|
<exclude>**/TaosInfoMonitorTest.java</exclude>
|
||||||
<exclude>**/FailOverTest.java</exclude>
|
<exclude>**/FailOverTest.java</exclude>
|
||||||
</excludes>
|
</excludes>
|
||||||
<testFailureIgnore>true</testFailureIgnore>
|
<testFailureIgnore>true</testFailureIgnore>
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
*****************************************************************************/
|
*****************************************************************************/
|
||||||
package com.taosdata.jdbc;
|
package com.taosdata.jdbc;
|
||||||
|
|
||||||
|
import com.taosdata.jdbc.utils.TaosInfo;
|
||||||
|
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.SQLWarning;
|
import java.sql.SQLWarning;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -21,6 +23,8 @@ import java.util.List;
|
||||||
public class TSDBJNIConnector {
|
public class TSDBJNIConnector {
|
||||||
private static volatile Boolean isInitialized = false;
|
private static volatile Boolean isInitialized = false;
|
||||||
|
|
||||||
|
private TaosInfo taosInfo = TaosInfo.getInstance();
|
||||||
|
|
||||||
static {
|
static {
|
||||||
System.loadLibrary("taos");
|
System.loadLibrary("taos");
|
||||||
System.out.println("java.library.path:" + System.getProperty("java.library.path"));
|
System.out.println("java.library.path:" + System.getProperty("java.library.path"));
|
||||||
|
@ -91,7 +95,8 @@ public class TSDBJNIConnector {
|
||||||
*/
|
*/
|
||||||
public boolean connect(String host, int port, String dbName, String user, String password) throws SQLException {
|
public boolean connect(String host, int port, String dbName, String user, String password) throws SQLException {
|
||||||
if (this.taos != TSDBConstants.JNI_NULL_POINTER) {
|
if (this.taos != TSDBConstants.JNI_NULL_POINTER) {
|
||||||
this.closeConnectionImp(this.taos);
|
// this.closeConnectionImp(this.taos);
|
||||||
|
closeConnection();
|
||||||
this.taos = TSDBConstants.JNI_NULL_POINTER;
|
this.taos = TSDBConstants.JNI_NULL_POINTER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,7 +104,8 @@ public class TSDBJNIConnector {
|
||||||
if (this.taos == TSDBConstants.JNI_NULL_POINTER) {
|
if (this.taos == TSDBConstants.JNI_NULL_POINTER) {
|
||||||
throw new SQLException(TSDBConstants.WrapErrMsg(this.getErrMsg(0L)), "", this.getErrCode(0l));
|
throw new SQLException(TSDBConstants.WrapErrMsg(this.getErrMsg(0L)), "", this.getErrCode(0l));
|
||||||
}
|
}
|
||||||
|
// invoke connectImp only here
|
||||||
|
taosInfo.conn_open_increment();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,6 +126,7 @@ public class TSDBJNIConnector {
|
||||||
Long pSql = 0l;
|
Long pSql = 0l;
|
||||||
try {
|
try {
|
||||||
pSql = this.executeQueryImp(sql.getBytes(TaosGlobalConfig.getCharset()), this.taos);
|
pSql = this.executeQueryImp(sql.getBytes(TaosGlobalConfig.getCharset()), this.taos);
|
||||||
|
taosInfo.stmt_count_increment();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
this.freeResultSetImp(this.taos, pSql);
|
this.freeResultSetImp(this.taos, pSql);
|
||||||
|
@ -244,10 +251,11 @@ public class TSDBJNIConnector {
|
||||||
private native int fetchRowImp(long connection, long resultSet, TSDBResultSetRowData rowData);
|
private native int fetchRowImp(long connection, long resultSet, TSDBResultSetRowData rowData);
|
||||||
|
|
||||||
public int fetchBlock(long resultSet, TSDBResultSetBlockData blockData) {
|
public int fetchBlock(long resultSet, TSDBResultSetBlockData blockData) {
|
||||||
return this.fetchBlockImp(this.taos, resultSet, blockData);
|
return this.fetchBlockImp(this.taos, resultSet, blockData);
|
||||||
}
|
}
|
||||||
|
|
||||||
private native int fetchBlockImp(long connection, long resultSet, TSDBResultSetBlockData blockData);
|
private native int fetchBlockImp(long connection, long resultSet, TSDBResultSetBlockData blockData);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute close operation from C to release connection pointer by JNI
|
* Execute close operation from C to release connection pointer by JNI
|
||||||
*
|
*
|
||||||
|
@ -262,6 +270,8 @@ public class TSDBJNIConnector {
|
||||||
} else {
|
} else {
|
||||||
throw new SQLException("Undefined error code returned by TDengine when closing a connection");
|
throw new SQLException("Undefined error code returned by TDengine when closing a connection");
|
||||||
}
|
}
|
||||||
|
// invoke closeConnectionImpl only here
|
||||||
|
taosInfo.connect_close_increment();
|
||||||
}
|
}
|
||||||
|
|
||||||
private native int closeConnectionImp(long connection);
|
private native int closeConnectionImp(long connection);
|
||||||
|
|
|
@ -14,13 +14,15 @@
|
||||||
*****************************************************************************/
|
*****************************************************************************/
|
||||||
package com.taosdata.jdbc;
|
package com.taosdata.jdbc;
|
||||||
|
|
||||||
|
import com.taosdata.jdbc.utils.TaosInfo;
|
||||||
|
|
||||||
import java.sql.*;
|
import java.sql.*;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
public class TSDBStatement implements Statement {
|
public class TSDBStatement implements Statement {
|
||||||
private TSDBJNIConnector connector = null;
|
private TSDBJNIConnector connector;
|
||||||
|
private TaosInfo taosInfo = TaosInfo.getInstance();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* To store batched commands
|
* To store batched commands
|
||||||
|
|
|
@ -1,272 +0,0 @@
|
||||||
package com.taosdata.jdbc.utils;
|
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.util.*;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
public class TDNode {
|
|
||||||
|
|
||||||
private int index;
|
|
||||||
private int running;
|
|
||||||
private int deployed;
|
|
||||||
private boolean testCluster;
|
|
||||||
private String path;
|
|
||||||
private String cfgDir;
|
|
||||||
private String dataDir;
|
|
||||||
private String logDir;
|
|
||||||
private String cfgPath;
|
|
||||||
|
|
||||||
public TDNode(int index) {
|
|
||||||
this.index = index;
|
|
||||||
running = 0;
|
|
||||||
deployed = 0;
|
|
||||||
testCluster = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setPath(String path) {
|
|
||||||
this.path = path;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setTestCluster(boolean testCluster) {
|
|
||||||
this.testCluster = testCluster;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setRunning(int running) {
|
|
||||||
this.running = running;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void searchTaosd(File dir, ArrayList<String> taosdPath) {
|
|
||||||
File[] fileList = dir.listFiles();
|
|
||||||
|
|
||||||
if(fileList == null || fileList.length == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for(File file : fileList) {
|
|
||||||
if(file.isFile()) {
|
|
||||||
if(file.getName().equals("taosd")) {
|
|
||||||
taosdPath.add(file.getAbsolutePath());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
searchTaosd(file, taosdPath);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void start() {
|
|
||||||
String selfPath = System.getProperty("user.dir");
|
|
||||||
String binPath = "";
|
|
||||||
String projDir = selfPath + "/../../../";
|
|
||||||
|
|
||||||
try {
|
|
||||||
ArrayList<String> taosdPath = new ArrayList<>();
|
|
||||||
|
|
||||||
File dir = new File(projDir);
|
|
||||||
String realProjDir = dir.getCanonicalPath();
|
|
||||||
dir = new File(realProjDir);
|
|
||||||
System.out.println("project Dir: " + projDir);
|
|
||||||
searchTaosd(dir, taosdPath);
|
|
||||||
|
|
||||||
if(taosdPath.size() == 0) {
|
|
||||||
System.out.println("The project path doens't exist");
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
for(String p : taosdPath) {
|
|
||||||
if(!p.contains("packaging")) {
|
|
||||||
binPath = p;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
|
|
||||||
if(binPath.isEmpty()) {
|
|
||||||
System.out.println("taosd not found");
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
System.out.println("taosd found in " + binPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
if(this.deployed == 0) {
|
|
||||||
System.out.println("dnode" + index + "is not deployed");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
String cmd = "nohup " + binPath + " -c " + cfgDir + " > /dev/null 2>&1 & ";
|
|
||||||
System.out.println("start taosd cmd: " + cmd);
|
|
||||||
|
|
||||||
try{
|
|
||||||
Runtime.getRuntime().exec(cmd);
|
|
||||||
TimeUnit.SECONDS.sleep(5);
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
|
|
||||||
this.running = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Integer getTaosdPid() {
|
|
||||||
String cmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'";
|
|
||||||
String[] cmds = {"sh", "-c", cmd};
|
|
||||||
try {
|
|
||||||
Process process = Runtime.getRuntime().exec(cmds);
|
|
||||||
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
|
|
||||||
String line = null;
|
|
||||||
Integer res = null;
|
|
||||||
while((line = reader.readLine()) != null) {
|
|
||||||
if(!line.isEmpty()) {
|
|
||||||
res = Integer.valueOf(line);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return res;
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stop() {
|
|
||||||
|
|
||||||
if (this.running != 0) {
|
|
||||||
Integer pid = null;
|
|
||||||
while((pid = getTaosdPid()) != null) {
|
|
||||||
|
|
||||||
String killCmd = "kill -term " + pid;
|
|
||||||
String[] killCmds = {"sh", "-c", killCmd};
|
|
||||||
try {
|
|
||||||
Runtime.getRuntime().exec(killCmds).waitFor();
|
|
||||||
|
|
||||||
TimeUnit.SECONDS.sleep(2);
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
for(int port = 6030; port < 6041; port ++) {
|
|
||||||
String fuserCmd = "fuser -k -n tcp " + port;
|
|
||||||
Runtime.getRuntime().exec(fuserCmd).waitFor();
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
|
|
||||||
this.running = 0;
|
|
||||||
System.out.println("dnode:" + this.index + " is stopped by kill -term");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void startIP() {
|
|
||||||
try{
|
|
||||||
String cmd = "sudo ifconfig lo:" + index + "192.168.0." + index + " up";
|
|
||||||
Runtime.getRuntime().exec(cmd).waitFor();
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stopIP() {
|
|
||||||
try{
|
|
||||||
String cmd = "sudo ifconfig lo:" + index + "192.168.0." + index + " down";
|
|
||||||
Runtime.getRuntime().exec(cmd).waitFor();
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setCfgConfig(String option, String value) {
|
|
||||||
try{
|
|
||||||
String cmd = "echo " + option + " " + value + " >> " + this.cfgPath;
|
|
||||||
String[] cmdLine = {"sh", "-c", cmd};
|
|
||||||
Process ps = Runtime.getRuntime().exec(cmdLine);
|
|
||||||
ps.waitFor();
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getDnodeRootDir() {
|
|
||||||
String dnodeRootDir = this.path + "/sim/psim/dnode" + this.index;
|
|
||||||
return dnodeRootDir;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getDnodesRootDir() {
|
|
||||||
String dnodesRootDir = this.path + "/sim/psim" + this.index;
|
|
||||||
return dnodesRootDir;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void deploy() {
|
|
||||||
this.logDir = this.path + "/sim/dnode" + this.index + "/log";
|
|
||||||
this.dataDir = this.path + "/sim/dnode" + this.index + "/data";
|
|
||||||
this.cfgDir = this.path + "/sim/dnode" + this.index + "/cfg";
|
|
||||||
this.cfgPath = this.path + "/sim/dnode" + this.index + "/cfg/taos.cfg";
|
|
||||||
|
|
||||||
try {
|
|
||||||
String cmd = "rm -rf " + this.logDir;
|
|
||||||
Runtime.getRuntime().exec(cmd).waitFor();
|
|
||||||
|
|
||||||
cmd = "rm -rf " + this.cfgDir;
|
|
||||||
Runtime.getRuntime().exec(cmd).waitFor();
|
|
||||||
|
|
||||||
cmd = "rm -rf " + this.dataDir;
|
|
||||||
Runtime.getRuntime().exec(cmd).waitFor();
|
|
||||||
|
|
||||||
cmd = "mkdir -p " + this.logDir;
|
|
||||||
Runtime.getRuntime().exec(cmd).waitFor();
|
|
||||||
|
|
||||||
cmd = "mkdir -p " + this.cfgDir;
|
|
||||||
Runtime.getRuntime().exec(cmd).waitFor();
|
|
||||||
|
|
||||||
cmd = "mkdir -p " + this.dataDir;
|
|
||||||
Runtime.getRuntime().exec(cmd).waitFor();
|
|
||||||
|
|
||||||
cmd = "touch " + this.cfgPath;
|
|
||||||
Runtime.getRuntime().exec(cmd).waitFor();
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
|
|
||||||
if(this.testCluster) {
|
|
||||||
startIP();
|
|
||||||
setCfgConfig("masterIp", "192.168.0.1");
|
|
||||||
setCfgConfig("secondIp", "192.168.0.2");
|
|
||||||
setCfgConfig("publicIp", "192.168.0." + this.index);
|
|
||||||
setCfgConfig("internalIp", "192.168.0." + this.index);
|
|
||||||
setCfgConfig("privateIp", "192.168.0." + this.index);
|
|
||||||
}
|
|
||||||
setCfgConfig("dataDir", this.dataDir);
|
|
||||||
setCfgConfig("logDir", this.logDir);
|
|
||||||
setCfgConfig("numOfLogLines", "1000000/00");
|
|
||||||
setCfgConfig("mnodeEqualVnodeNum", "0");
|
|
||||||
setCfgConfig("walLevel", "1");
|
|
||||||
setCfgConfig("statusInterval", "1");
|
|
||||||
setCfgConfig("numOfMnodes", "3");
|
|
||||||
setCfgConfig("numOfThreadsPerCore", "2.0");
|
|
||||||
setCfgConfig("monitor", "0");
|
|
||||||
setCfgConfig("maxVnodeConnections", "30000");
|
|
||||||
setCfgConfig("maxMgmtConnections", "30000");
|
|
||||||
setCfgConfig("maxMeterConnections", "30000");
|
|
||||||
setCfgConfig("maxShellConns", "30000");
|
|
||||||
setCfgConfig("locale", "en_US.UTF-8");
|
|
||||||
setCfgConfig("charset", "UTF-8");
|
|
||||||
setCfgConfig("asyncLog", "0");
|
|
||||||
setCfgConfig("anyIp", "0");
|
|
||||||
setCfgConfig("dDebugFlag", "135");
|
|
||||||
setCfgConfig("mDebugFlag", "135");
|
|
||||||
setCfgConfig("sdbDebugFlag", "135");
|
|
||||||
setCfgConfig("rpcDebugFlag", "135");
|
|
||||||
setCfgConfig("tmrDebugFlag", "131");
|
|
||||||
setCfgConfig("cDebugFlag", "135");
|
|
||||||
setCfgConfig("httpDebugFlag", "135");
|
|
||||||
setCfgConfig("monitorDebugFlag", "135");
|
|
||||||
setCfgConfig("udebugFlag", "135");
|
|
||||||
setCfgConfig("jnidebugFlag", "135");
|
|
||||||
setCfgConfig("qdebugFlag", "135");
|
|
||||||
this.deployed = 1;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,72 +0,0 @@
|
||||||
package com.taosdata.jdbc.utils;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.util.*;
|
|
||||||
|
|
||||||
public class TDNodes {
|
|
||||||
private ArrayList<TDNode> tdNodes;
|
|
||||||
private boolean testCluster;
|
|
||||||
|
|
||||||
public TDNodes () {
|
|
||||||
tdNodes = new ArrayList<>();
|
|
||||||
for(int i = 1; i < 11; i ++) {
|
|
||||||
tdNodes.add(new TDNode(i));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setTestCluster(boolean testCluster) {
|
|
||||||
this.testCluster = testCluster;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void check(int index) {
|
|
||||||
if(index < 1 || index > 10) {
|
|
||||||
System.out.println("index: " + index + " should on a scale of [1, 10]");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void deploy(int index) {
|
|
||||||
try {
|
|
||||||
File file = new File(System.getProperty("user.dir") + "/../../../");
|
|
||||||
String projectRealPath = file.getCanonicalPath();
|
|
||||||
check(index);
|
|
||||||
tdNodes.get(index - 1).setTestCluster(this.testCluster);
|
|
||||||
tdNodes.get(index - 1).setPath(projectRealPath);
|
|
||||||
tdNodes.get(index - 1).deploy();
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
System.out.println("deploy Test Exception");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void cfg(int index, String option, String value) {
|
|
||||||
check(index);
|
|
||||||
tdNodes.get(index - 1).setCfgConfig(option, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
public TDNode getTDNode(int index) {
|
|
||||||
check(index);
|
|
||||||
return tdNodes.get(index - 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void start(int index) {
|
|
||||||
check(index);
|
|
||||||
tdNodes.get(index - 1).start();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stop(int index) {
|
|
||||||
check(index);
|
|
||||||
tdNodes.get(index - 1).stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void startIP(int index) {
|
|
||||||
check(index);
|
|
||||||
tdNodes.get(index - 1).startIP();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stopIP(int index) {
|
|
||||||
check(index);
|
|
||||||
tdNodes.get(index - 1).stopIP();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,74 @@
|
||||||
|
package com.taosdata.jdbc.utils;
|
||||||
|
|
||||||
|
import javax.management.*;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
public class TaosInfo implements TaosInfoMBean {
|
||||||
|
|
||||||
|
private static volatile TaosInfo instance;
|
||||||
|
private AtomicLong connect_open = new AtomicLong();
|
||||||
|
private AtomicLong connect_close = new AtomicLong();
|
||||||
|
private AtomicLong statement_count = new AtomicLong();
|
||||||
|
|
||||||
|
static {
|
||||||
|
try {
|
||||||
|
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
|
||||||
|
ObjectName name = new ObjectName("TaosInfoMBean:name=TaosInfo");
|
||||||
|
server.registerMBean(TaosInfo.getInstance(), name);
|
||||||
|
|
||||||
|
} catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getConnect_open() {
|
||||||
|
return connect_open.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getConnect_close() {
|
||||||
|
return connect_close.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getConnect_active() {
|
||||||
|
return connect_open.get() - connect_close.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getStatement_count() {
|
||||||
|
return statement_count.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*******************************************************/
|
||||||
|
|
||||||
|
public void conn_open_increment() {
|
||||||
|
connect_open.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void connect_close_increment() {
|
||||||
|
connect_close.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stmt_count_increment() {
|
||||||
|
statement_count.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
/********************************************************************************/
|
||||||
|
private TaosInfo() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TaosInfo getInstance() {
|
||||||
|
if (instance == null) {
|
||||||
|
synchronized (TaosInfo.class) {
|
||||||
|
if (instance == null) {
|
||||||
|
instance = new TaosInfo();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
package com.taosdata.jdbc.utils;
|
||||||
|
|
||||||
|
public interface TaosInfoMBean {
|
||||||
|
|
||||||
|
long getConnect_open();
|
||||||
|
|
||||||
|
long getConnect_close();
|
||||||
|
|
||||||
|
long getConnect_active();
|
||||||
|
|
||||||
|
long getStatement_count();
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
package com.taosdata.jdbc.cases;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.sql.*;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
public class TaosInfoMonitorTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateTooManyConnection() throws ClassNotFoundException {
|
||||||
|
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||||
|
final String url = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
|
||||||
|
|
||||||
|
List<Connection> connectionList = IntStream.range(0, 100).mapToObj(i -> {
|
||||||
|
try {
|
||||||
|
TimeUnit.MILLISECONDS.sleep(100);
|
||||||
|
return DriverManager.getConnection(url);
|
||||||
|
} catch (SQLException | InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
|
||||||
|
connectionList.stream().forEach(conn -> {
|
||||||
|
try (Statement stmt = conn.createStatement()) {
|
||||||
|
ResultSet rs = stmt.executeQuery("show databases");
|
||||||
|
while (rs.next()) {
|
||||||
|
|
||||||
|
}
|
||||||
|
TimeUnit.MILLISECONDS.sleep(100);
|
||||||
|
} catch (SQLException | InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
connectionList.stream().forEach(conn -> {
|
||||||
|
try {
|
||||||
|
conn.close();
|
||||||
|
TimeUnit.MILLISECONDS.sleep(100);
|
||||||
|
} catch (SQLException | InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
|
@ -173,6 +173,7 @@ typedef struct SSuperTable_S {
|
||||||
int childTblCount;
|
int childTblCount;
|
||||||
bool superTblExists; // 0: no, 1: yes
|
bool superTblExists; // 0: no, 1: yes
|
||||||
bool childTblExists; // 0: no, 1: yes
|
bool childTblExists; // 0: no, 1: yes
|
||||||
|
int batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql
|
||||||
int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
|
int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
|
||||||
char childTblPrefix[MAX_TB_NAME_SIZE];
|
char childTblPrefix[MAX_TB_NAME_SIZE];
|
||||||
char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample
|
char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample
|
||||||
|
@ -808,13 +809,14 @@ static void init_rand_data() {
|
||||||
|
|
||||||
static void printfInsertMeta() {
|
static void printfInsertMeta() {
|
||||||
printf("\033[1m\033[40;32m================ insert.json parse result START ================\033[0m\n");
|
printf("\033[1m\033[40;32m================ insert.json parse result START ================\033[0m\n");
|
||||||
printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
|
printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
|
||||||
printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
|
printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
|
||||||
printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
|
printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
|
||||||
printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
|
printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
|
||||||
printf("thread count: \033[33m%d\033[0m\n", g_Dbs.threadCount);
|
printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount);
|
||||||
|
printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl);
|
||||||
|
|
||||||
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
|
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
|
||||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||||
printf("database[\033[33m%d\033[0m]:\n", i);
|
printf("database[\033[33m%d\033[0m]:\n", i);
|
||||||
printf(" database name: \033[33m%s\033[0m\n", g_Dbs.db[i].dbName);
|
printf(" database name: \033[33m%s\033[0m\n", g_Dbs.db[i].dbName);
|
||||||
|
@ -944,11 +946,12 @@ static void printfInsertMeta() {
|
||||||
|
|
||||||
static void printfInsertMetaToFile(FILE* fp) {
|
static void printfInsertMetaToFile(FILE* fp) {
|
||||||
fprintf(fp, "================ insert.json parse result START================\n");
|
fprintf(fp, "================ insert.json parse result START================\n");
|
||||||
fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port);
|
fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port);
|
||||||
fprintf(fp, "user: %s\n", g_Dbs.user);
|
fprintf(fp, "user: %s\n", g_Dbs.user);
|
||||||
fprintf(fp, "password: %s\n", g_Dbs.password);
|
fprintf(fp, "password: %s\n", g_Dbs.password);
|
||||||
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
|
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
|
||||||
fprintf(fp, "thread count: %d\n", g_Dbs.threadCount);
|
fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
|
||||||
|
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl);
|
||||||
|
|
||||||
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
|
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
|
||||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||||
|
@ -1731,18 +1734,26 @@ static int createDatabases() {
|
||||||
|
|
||||||
void * createTable(void *sarg)
|
void * createTable(void *sarg)
|
||||||
{
|
{
|
||||||
char command[BUFFER_SIZE] = "\0";
|
|
||||||
|
|
||||||
threadInfo *winfo = (threadInfo *)sarg;
|
threadInfo *winfo = (threadInfo *)sarg;
|
||||||
SSuperTable* superTblInfo = winfo->superTblInfo;
|
SSuperTable* superTblInfo = winfo->superTblInfo;
|
||||||
|
|
||||||
int64_t lastPrintTime = taosGetTimestampMs();
|
int64_t lastPrintTime = taosGetTimestampMs();
|
||||||
|
|
||||||
|
char* buffer = calloc(superTblInfo->maxSqlLen, 1);
|
||||||
|
|
||||||
|
int len = 0;
|
||||||
|
int batchNum = 0;
|
||||||
//printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
|
//printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
|
||||||
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
|
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
|
||||||
if (0 == g_Dbs.use_metric) {
|
if (0 == g_Dbs.use_metric) {
|
||||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable);
|
snprintf(buffer, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable);
|
||||||
} else {
|
} else {
|
||||||
|
if (0 == len) {
|
||||||
|
batchNum = 0;
|
||||||
|
memset(buffer, 0, superTblInfo->maxSqlLen);
|
||||||
|
len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "create table ");
|
||||||
|
}
|
||||||
|
|
||||||
char* tagsValBuf = NULL;
|
char* tagsValBuf = NULL;
|
||||||
if (0 == superTblInfo->tagSource) {
|
if (0 == superTblInfo->tagSource) {
|
||||||
tagsValBuf = generateTagVaulesForStb(superTblInfo);
|
tagsValBuf = generateTagVaulesForStb(superTblInfo);
|
||||||
|
@ -1750,13 +1761,22 @@ void * createTable(void *sarg)
|
||||||
tagsValBuf = getTagValueFromTagSample(superTblInfo, i % superTblInfo->tagSampleCount);
|
tagsValBuf = getTagValueFromTagSample(superTblInfo, i % superTblInfo->tagSampleCount);
|
||||||
}
|
}
|
||||||
if (NULL == tagsValBuf) {
|
if (NULL == tagsValBuf) {
|
||||||
|
free(buffer);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.%s tags %s;", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
|
|
||||||
|
len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "if not exists %s.%s%d using %s.%s tags %s ", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
|
||||||
free(tagsValBuf);
|
free(tagsValBuf);
|
||||||
|
batchNum++;
|
||||||
|
|
||||||
|
if ((batchNum < superTblInfo->batchCreateTableNum) && ((superTblInfo->maxSqlLen - len) >= (superTblInfo->lenOfTagOfOneRow + 256))) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (0 != queryDbExec(winfo->taos, command, NO_INSERT_TYPE)){
|
len = 0;
|
||||||
|
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
|
||||||
|
free(buffer);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1767,6 +1787,11 @@ void * createTable(void *sarg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (0 != len) {
|
||||||
|
(void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(buffer);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2423,6 +2448,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
|
||||||
|
if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) {
|
||||||
|
g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint;
|
||||||
|
} else if (!batchCreateTbl) {
|
||||||
|
g_Dbs.db[i].superTbls[j].batchCreateTableNum = 2000;
|
||||||
|
} else {
|
||||||
|
printf("failed to read json, batch_create_tbl_num not found");
|
||||||
|
goto PARSE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
|
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
|
||||||
if (childTblExists && childTblExists->type == cJSON_String && childTblExists->valuestring != NULL) {
|
if (childTblExists && childTblExists->type == cJSON_String && childTblExists->valuestring != NULL) {
|
||||||
if (0 == strncasecmp(childTblExists->valuestring, "yes", 3)) {
|
if (0 == strncasecmp(childTblExists->valuestring, "yes", 3)) {
|
||||||
|
@ -3679,14 +3714,14 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
|
||||||
b = ntables % threads;
|
b = ntables % threads;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS* taos;
|
//TAOS* taos;
|
||||||
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
|
//if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
|
||||||
taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
|
// taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
|
||||||
if (NULL == taos) {
|
// if (NULL == taos) {
|
||||||
printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
|
// printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
|
||||||
exit(-1);
|
// exit(-1);
|
||||||
}
|
// }
|
||||||
}
|
//}
|
||||||
|
|
||||||
int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
|
int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
|
||||||
if (0 != precision[0]) {
|
if (0 != precision[0]) {
|
||||||
|
@ -3719,7 +3754,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
|
||||||
t_info->start_time = start_time;
|
t_info->start_time = start_time;
|
||||||
|
|
||||||
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
|
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
|
||||||
t_info->taos = taos;
|
//t_info->taos = taos;
|
||||||
|
t_info->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
|
||||||
|
if (NULL == t_info->taos) {
|
||||||
|
printf("connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL));
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
t_info->taos = NULL;
|
t_info->taos = NULL;
|
||||||
#ifdef TD_LOWA_CURL
|
#ifdef TD_LOWA_CURL
|
||||||
|
@ -3754,6 +3794,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
|
||||||
threadInfo *t_info = infos + i;
|
threadInfo *t_info = infos + i;
|
||||||
|
|
||||||
tsem_destroy(&(t_info->lock_sem));
|
tsem_destroy(&(t_info->lock_sem));
|
||||||
|
taos_close(t_info->taos);
|
||||||
|
|
||||||
superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
|
superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
|
||||||
superTblInfo->totalRowsInserted += t_info->totalRowsInserted;
|
superTblInfo->totalRowsInserted += t_info->totalRowsInserted;
|
||||||
|
@ -3766,7 +3807,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
|
||||||
|
|
||||||
double end = getCurrentTime();
|
double end = getCurrentTime();
|
||||||
|
|
||||||
taos_close(taos);
|
//taos_close(taos);
|
||||||
|
|
||||||
free(pids);
|
free(pids);
|
||||||
free(infos);
|
free(infos);
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
||||||
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
|
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
|
||||||
#define TSDB_IVLD_FID INT_MIN
|
#define TSDB_IVLD_FID INT_MIN
|
||||||
|
#define TSDB_FILE_STATE_OK 0
|
||||||
|
#define TSDB_FILE_STATE_BAD 1
|
||||||
|
|
||||||
#define TSDB_FILE_INFO(tf) (&((tf)->info))
|
#define TSDB_FILE_INFO(tf) (&((tf)->info))
|
||||||
#define TSDB_FILE_F(tf) (&((tf)->f))
|
#define TSDB_FILE_F(tf) (&((tf)->f))
|
||||||
|
@ -31,6 +33,10 @@
|
||||||
#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf))
|
#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf))
|
||||||
#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf))
|
#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf))
|
||||||
#define TSDB_FILE_FSYNC(tf) fsync(TSDB_FILE_FD(tf))
|
#define TSDB_FILE_FSYNC(tf) fsync(TSDB_FILE_FD(tf))
|
||||||
|
#define TSDB_FILE_STATE(tf) ((tf)->state)
|
||||||
|
#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
|
||||||
|
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
|
||||||
|
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
|
||||||
|
|
||||||
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
|
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
|
||||||
|
|
||||||
|
@ -47,6 +53,7 @@ typedef struct {
|
||||||
SMFInfo info;
|
SMFInfo info;
|
||||||
TFILE f;
|
TFILE f;
|
||||||
int fd;
|
int fd;
|
||||||
|
uint8_t state;
|
||||||
} SMFile;
|
} SMFile;
|
||||||
|
|
||||||
void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver);
|
void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver);
|
||||||
|
@ -165,6 +172,7 @@ typedef struct {
|
||||||
SDFInfo info;
|
SDFInfo info;
|
||||||
TFILE f;
|
TFILE f;
|
||||||
int fd;
|
int fd;
|
||||||
|
uint8_t state;
|
||||||
} SDFile;
|
} SDFile;
|
||||||
|
|
||||||
void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype);
|
void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype);
|
||||||
|
@ -346,4 +354,14 @@ static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid,
|
||||||
*maxKey = *minKey + days * tsMsPerDay[precision] - 1;
|
*maxKey = *minKey + days * tsMsPerDay[precision] - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {
|
||||||
|
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||||
|
if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
#endif /* _TS_TSDB_FILE_H_ */
|
#endif /* _TS_TSDB_FILE_H_ */
|
|
@ -33,7 +33,7 @@ static int tsdbRollBackDFile(SDFile *pDFile);
|
||||||
void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) {
|
void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) {
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
TSDB_FILE_SET_CLOSED(pMFile);
|
TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_OK);
|
||||||
|
|
||||||
memset(&(pMFile->info), 0, sizeof(pMFile->info));
|
memset(&(pMFile->info), 0, sizeof(pMFile->info));
|
||||||
pMFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
pMFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||||
|
@ -201,6 +201,7 @@ int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) {
|
||||||
tsdbError("vgId:%d meta file %s not exit, report to upper layer to fix it", REPO_ID(pRepo),
|
tsdbError("vgId:%d meta file %s not exit, report to upper layer to fix it", REPO_ID(pRepo),
|
||||||
TSDB_FILE_FULL_NAME(pMFile));
|
TSDB_FILE_FULL_NAME(pMFile));
|
||||||
pRepo->state |= TSDB_STATE_BAD_META;
|
pRepo->state |= TSDB_STATE_BAD_META;
|
||||||
|
TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_BAD);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,6 +233,7 @@ int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) {
|
||||||
tsdbError("vgId:%d meta file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it",
|
tsdbError("vgId:%d meta file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it",
|
||||||
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), mfstat.st_size, pMFile->info.size);
|
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), mfstat.st_size, pMFile->info.size);
|
||||||
pRepo->state |= TSDB_STATE_BAD_META;
|
pRepo->state |= TSDB_STATE_BAD_META;
|
||||||
|
TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_BAD);
|
||||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
@ -293,6 +295,8 @@ static int tsdbRollBackMFile(SMFile *pMFile) {
|
||||||
void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype) {
|
void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype) {
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
|
TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_OK);
|
||||||
|
|
||||||
TSDB_FILE_SET_CLOSED(pDFile);
|
TSDB_FILE_SET_CLOSED(pDFile);
|
||||||
|
|
||||||
memset(&(pDFile->info), 0, sizeof(pDFile->info));
|
memset(&(pDFile->info), 0, sizeof(pDFile->info));
|
||||||
|
@ -439,6 +443,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
|
||||||
tsdbError("vgId:%d data file %s not exit, report to upper layer to fix it", REPO_ID(pRepo),
|
tsdbError("vgId:%d data file %s not exit, report to upper layer to fix it", REPO_ID(pRepo),
|
||||||
TSDB_FILE_FULL_NAME(pDFile));
|
TSDB_FILE_FULL_NAME(pDFile));
|
||||||
pRepo->state |= TSDB_STATE_BAD_DATA;
|
pRepo->state |= TSDB_STATE_BAD_DATA;
|
||||||
|
TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -470,6 +475,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
|
||||||
tsdbError("vgId:%d data file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it",
|
tsdbError("vgId:%d data file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it",
|
||||||
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), dfstat.st_size, pDFile->info.size);
|
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), dfstat.st_size, pDFile->info.size);
|
||||||
pRepo->state |= TSDB_STATE_BAD_DATA;
|
pRepo->state |= TSDB_STATE_BAD_DATA;
|
||||||
|
TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD);
|
||||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -191,7 +191,8 @@ static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0) {
|
if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0 ||
|
||||||
|
TSDB_FILE_IS_BAD(pLMFile)) {
|
||||||
// Local has no meta file or has a different meta file, need to copy from remote
|
// Local has no meta file or has a different meta file, need to copy from remote
|
||||||
pSynch->mfChanged = true;
|
pSynch->mfChanged = true;
|
||||||
|
|
||||||
|
@ -409,7 +410,8 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
|
||||||
pSynch->pdf != NULL ? pSynch->pdf->fid : -1);
|
pSynch->pdf != NULL ? pSynch->pdf->fid : -1);
|
||||||
pLSet = tsdbFSIterNext(&fsiter);
|
pLSet = tsdbFSIterNext(&fsiter);
|
||||||
} else {
|
} else {
|
||||||
if (pLSet && pSynch->pdf && pLSet->fid == pSynch->pdf->fid && tsdbIsTowFSetSame(pLSet, pSynch->pdf)) {
|
if (pLSet && pSynch->pdf && pLSet->fid == pSynch->pdf->fid && tsdbIsTowFSetSame(pLSet, pSynch->pdf) &&
|
||||||
|
tsdbFSetIsOk(pLSet)) {
|
||||||
// Just keep local files and notify remote not to send
|
// Just keep local files and notify remote not to send
|
||||||
tsdbInfo("vgId:%d, fileset:%d is same and no need to recv", REPO_ID(pRepo), pLSet->fid);
|
tsdbInfo("vgId:%d, fileset:%d is same and no need to recv", REPO_ID(pRepo), pLSet->fid);
|
||||||
|
|
||||||
|
|
|
@ -100,14 +100,14 @@ class TDTestRetetion:
|
||||||
tdLog.info(cmd)
|
tdLog.info(cmd)
|
||||||
tdSql.execute(cmd)
|
tdSql.execute(cmd)
|
||||||
self.queryRows=tdSql.query('select * from test')
|
self.queryRows=tdSql.query('select * from test')
|
||||||
self.checkRows(7,cmd)
|
self.checkRows(5,cmd)
|
||||||
|
|
||||||
tdLog.info("=============== step5")
|
tdLog.info("=============== step5")
|
||||||
tdDnodes.stop(1)
|
tdDnodes.stop(1)
|
||||||
tdDnodes.start(1)
|
tdDnodes.start(1)
|
||||||
cmd='select * from test where ts > now-1d'
|
cmd='select * from test where ts > now-1d'
|
||||||
self.queryRows=tdSql.query('select * from test where ts > now-1d')
|
self.queryRows=tdSql.query('select * from test where ts > now-1d')
|
||||||
self.checkRows(1,cmd)
|
self.checkRows(2,cmd)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
os.system("sudo timedatectl set-ntp true")
|
os.system("sudo timedatectl set-ntp true")
|
||||||
|
|
|
@ -99,7 +99,7 @@ python3 test.py -f query/queryFillTest.py
|
||||||
python3 test.py -f tools/taosdemoTest.py
|
python3 test.py -f tools/taosdemoTest.py
|
||||||
python3 test.py -f tools/taosdumpTest.py
|
python3 test.py -f tools/taosdumpTest.py
|
||||||
python3 test.py -f tools/lowaTest.py
|
python3 test.py -f tools/lowaTest.py
|
||||||
python3 test.py -f tools/taosdemoTest2.py
|
#python3 test.py -f tools/taosdemoTest2.py
|
||||||
|
|
||||||
# subscribe
|
# subscribe
|
||||||
python3 test.py -f subscribe/singlemeter.py
|
python3 test.py -f subscribe/singlemeter.py
|
||||||
|
|
Loading…
Reference in New Issue