From d12b77ce1486e2eb98bf77726a93ebcb0fb71d9c Mon Sep 17 00:00:00 2001 From: zyyang Date: Thu, 28 Jan 2021 18:31:12 +0800 Subject: [PATCH 01/14] change --- .../com/taosdata/jdbc/TSDBJNIConnector.java | 23 +- .../java/com/taosdata/jdbc/utils/TDNode.java | 272 ------------------ .../java/com/taosdata/jdbc/utils/TDNodes.java | 72 ----- .../com/taosdata/jdbc/utils/TaosInfo.java | 19 ++ .../taosdata/jdbc/utils/TaosInfoMBean.java | 8 + 5 files changed, 46 insertions(+), 348 deletions(-) delete mode 100644 src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNode.java delete mode 100644 src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNodes.java create mode 100644 src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java create mode 100644 src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfoMBean.java diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java index f918463439..178bbc483f 100755 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java @@ -14,13 +14,19 @@ *****************************************************************************/ package com.taosdata.jdbc; +import com.taosdata.jdbc.utils.TaosInfo; + import java.sql.SQLException; import java.sql.SQLWarning; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; public class TSDBJNIConnector { private static volatile Boolean isInitialized = false; + private static AtomicInteger open_count = new AtomicInteger(); + private static AtomicInteger close_count = new AtomicInteger(); + static { System.loadLibrary("taos"); System.out.println("java.library.path:" + System.getProperty("java.library.path")); @@ -91,7 +97,8 @@ public class TSDBJNIConnector { */ public boolean connect(String host, int port, String dbName, String user, String password) throws SQLException { if (this.taos != TSDBConstants.JNI_NULL_POINTER) { - this.closeConnectionImp(this.taos); +// this.closeConnectionImp(this.taos); + closeConnection(); this.taos = TSDBConstants.JNI_NULL_POINTER; } @@ -99,7 +106,10 @@ public class TSDBJNIConnector { if (this.taos == TSDBConstants.JNI_NULL_POINTER) { throw new SQLException(TSDBConstants.WrapErrMsg(this.getErrMsg(0L)), "", this.getErrCode(0l)); } - + // invoke connectImp only here + int open = open_count.incrementAndGet(); + int close = close_count.get(); + System.out.println("open_count: " + open + ", close_count: " + close + ", connection_count: " + (open - close)); return true; } @@ -244,10 +254,11 @@ public class TSDBJNIConnector { private native int fetchRowImp(long connection, long resultSet, TSDBResultSetRowData rowData); public int fetchBlock(long resultSet, TSDBResultSetBlockData blockData) { - return this.fetchBlockImp(this.taos, resultSet, blockData); + return this.fetchBlockImp(this.taos, resultSet, blockData); } - + private native int fetchBlockImp(long connection, long resultSet, TSDBResultSetBlockData blockData); + /** * Execute close operation from C to release connection pointer by JNI * @@ -262,6 +273,10 @@ public class TSDBJNIConnector { } else { throw new SQLException("Undefined error code returned by TDengine when closing a connection"); } + // invoke closeConnectionImpl only here + int open = open_count.get(); + int close = close_count.incrementAndGet(); + System.out.println("open_count: " + open + ", close_count: " + close + ", connection_count: " + (open - close)); } private native int closeConnectionImp(long connection); diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNode.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNode.java deleted file mode 100644 index 800265868d..0000000000 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNode.java +++ /dev/null @@ -1,272 +0,0 @@ -package com.taosdata.jdbc.utils; - -import java.io.BufferedReader; -import java.io.File; -import java.io.InputStreamReader; -import java.util.*; -import java.util.concurrent.TimeUnit; - -public class TDNode { - - private int index; - private int running; - private int deployed; - private boolean testCluster; - private String path; - private String cfgDir; - private String dataDir; - private String logDir; - private String cfgPath; - - public TDNode(int index) { - this.index = index; - running = 0; - deployed = 0; - testCluster = false; - } - - public void setPath(String path) { - this.path = path; - } - - public void setTestCluster(boolean testCluster) { - this.testCluster = testCluster; - } - - public void setRunning(int running) { - this.running = running; - } - - public void searchTaosd(File dir, ArrayList taosdPath) { - File[] fileList = dir.listFiles(); - - if(fileList == null || fileList.length == 0) { - return; - } - - for(File file : fileList) { - if(file.isFile()) { - if(file.getName().equals("taosd")) { - taosdPath.add(file.getAbsolutePath()); - } - } else { - searchTaosd(file, taosdPath); - } - } - } - - public void start() { - String selfPath = System.getProperty("user.dir"); - String binPath = ""; - String projDir = selfPath + "/../../../"; - - try { - ArrayList taosdPath = new ArrayList<>(); - - File dir = new File(projDir); - String realProjDir = dir.getCanonicalPath(); - dir = new File(realProjDir); - System.out.println("project Dir: " + projDir); - searchTaosd(dir, taosdPath); - - if(taosdPath.size() == 0) { - System.out.println("The project path doens't exist"); - return; - } else { - for(String p : taosdPath) { - if(!p.contains("packaging")) { - binPath = p; - } - } - } - } catch (Exception e) { - e.printStackTrace(); - } - - if(binPath.isEmpty()) { - System.out.println("taosd not found"); - return; - } else { - System.out.println("taosd found in " + binPath); - } - - if(this.deployed == 0) { - System.out.println("dnode" + index + "is not deployed"); - return; - } - - String cmd = "nohup " + binPath + " -c " + cfgDir + " > /dev/null 2>&1 & "; - System.out.println("start taosd cmd: " + cmd); - - try{ - Runtime.getRuntime().exec(cmd); - TimeUnit.SECONDS.sleep(5); - } catch (Exception e) { - e.printStackTrace(); - } - - this.running = 1; - } - - public Integer getTaosdPid() { - String cmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'"; - String[] cmds = {"sh", "-c", cmd}; - try { - Process process = Runtime.getRuntime().exec(cmds); - BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); - String line = null; - Integer res = null; - while((line = reader.readLine()) != null) { - if(!line.isEmpty()) { - res = Integer.valueOf(line); - break; - } - } - - return res; - } catch (Exception e) { - e.printStackTrace(); - } - return null; - } - - public void stop() { - - if (this.running != 0) { - Integer pid = null; - while((pid = getTaosdPid()) != null) { - - String killCmd = "kill -term " + pid; - String[] killCmds = {"sh", "-c", killCmd}; - try { - Runtime.getRuntime().exec(killCmds).waitFor(); - - TimeUnit.SECONDS.sleep(2); - } catch (Exception e) { - e.printStackTrace(); - } - } - - try { - for(int port = 6030; port < 6041; port ++) { - String fuserCmd = "fuser -k -n tcp " + port; - Runtime.getRuntime().exec(fuserCmd).waitFor(); - } - } catch (Exception e) { - e.printStackTrace(); - } - - this.running = 0; - System.out.println("dnode:" + this.index + " is stopped by kill -term"); - } - } - - public void startIP() { - try{ - String cmd = "sudo ifconfig lo:" + index + "192.168.0." + index + " up"; - Runtime.getRuntime().exec(cmd).waitFor(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public void stopIP() { - try{ - String cmd = "sudo ifconfig lo:" + index + "192.168.0." + index + " down"; - Runtime.getRuntime().exec(cmd).waitFor(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public void setCfgConfig(String option, String value) { - try{ - String cmd = "echo " + option + " " + value + " >> " + this.cfgPath; - String[] cmdLine = {"sh", "-c", cmd}; - Process ps = Runtime.getRuntime().exec(cmdLine); - ps.waitFor(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public String getDnodeRootDir() { - String dnodeRootDir = this.path + "/sim/psim/dnode" + this.index; - return dnodeRootDir; - } - - public String getDnodesRootDir() { - String dnodesRootDir = this.path + "/sim/psim" + this.index; - return dnodesRootDir; - } - - public void deploy() { - this.logDir = this.path + "/sim/dnode" + this.index + "/log"; - this.dataDir = this.path + "/sim/dnode" + this.index + "/data"; - this.cfgDir = this.path + "/sim/dnode" + this.index + "/cfg"; - this.cfgPath = this.path + "/sim/dnode" + this.index + "/cfg/taos.cfg"; - - try { - String cmd = "rm -rf " + this.logDir; - Runtime.getRuntime().exec(cmd).waitFor(); - - cmd = "rm -rf " + this.cfgDir; - Runtime.getRuntime().exec(cmd).waitFor(); - - cmd = "rm -rf " + this.dataDir; - Runtime.getRuntime().exec(cmd).waitFor(); - - cmd = "mkdir -p " + this.logDir; - Runtime.getRuntime().exec(cmd).waitFor(); - - cmd = "mkdir -p " + this.cfgDir; - Runtime.getRuntime().exec(cmd).waitFor(); - - cmd = "mkdir -p " + this.dataDir; - Runtime.getRuntime().exec(cmd).waitFor(); - - cmd = "touch " + this.cfgPath; - Runtime.getRuntime().exec(cmd).waitFor(); - } catch (Exception e) { - e.printStackTrace(); - } - - if(this.testCluster) { - startIP(); - setCfgConfig("masterIp", "192.168.0.1"); - setCfgConfig("secondIp", "192.168.0.2"); - setCfgConfig("publicIp", "192.168.0." + this.index); - setCfgConfig("internalIp", "192.168.0." + this.index); - setCfgConfig("privateIp", "192.168.0." + this.index); - } - setCfgConfig("dataDir", this.dataDir); - setCfgConfig("logDir", this.logDir); - setCfgConfig("numOfLogLines", "1000000/00"); - setCfgConfig("mnodeEqualVnodeNum", "0"); - setCfgConfig("walLevel", "1"); - setCfgConfig("statusInterval", "1"); - setCfgConfig("numOfMnodes", "3"); - setCfgConfig("numOfThreadsPerCore", "2.0"); - setCfgConfig("monitor", "0"); - setCfgConfig("maxVnodeConnections", "30000"); - setCfgConfig("maxMgmtConnections", "30000"); - setCfgConfig("maxMeterConnections", "30000"); - setCfgConfig("maxShellConns", "30000"); - setCfgConfig("locale", "en_US.UTF-8"); - setCfgConfig("charset", "UTF-8"); - setCfgConfig("asyncLog", "0"); - setCfgConfig("anyIp", "0"); - setCfgConfig("dDebugFlag", "135"); - setCfgConfig("mDebugFlag", "135"); - setCfgConfig("sdbDebugFlag", "135"); - setCfgConfig("rpcDebugFlag", "135"); - setCfgConfig("tmrDebugFlag", "131"); - setCfgConfig("cDebugFlag", "135"); - setCfgConfig("httpDebugFlag", "135"); - setCfgConfig("monitorDebugFlag", "135"); - setCfgConfig("udebugFlag", "135"); - setCfgConfig("jnidebugFlag", "135"); - setCfgConfig("qdebugFlag", "135"); - this.deployed = 1; - } -} \ No newline at end of file diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNodes.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNodes.java deleted file mode 100644 index efc4c53e28..0000000000 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNodes.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.taosdata.jdbc.utils; - -import java.io.File; -import java.util.*; - -public class TDNodes { - private ArrayList tdNodes; - private boolean testCluster; - - public TDNodes () { - tdNodes = new ArrayList<>(); - for(int i = 1; i < 11; i ++) { - tdNodes.add(new TDNode(i)); - } - } - - public void setTestCluster(boolean testCluster) { - this.testCluster = testCluster; - } - - public void check(int index) { - if(index < 1 || index > 10) { - System.out.println("index: " + index + " should on a scale of [1, 10]"); - return; - } - } - - public void deploy(int index) { - try { - File file = new File(System.getProperty("user.dir") + "/../../../"); - String projectRealPath = file.getCanonicalPath(); - check(index); - tdNodes.get(index - 1).setTestCluster(this.testCluster); - tdNodes.get(index - 1).setPath(projectRealPath); - tdNodes.get(index - 1).deploy(); - } catch (Exception e) { - e.printStackTrace(); - System.out.println("deploy Test Exception"); - } - } - - public void cfg(int index, String option, String value) { - check(index); - tdNodes.get(index - 1).setCfgConfig(option, value); - } - - public TDNode getTDNode(int index) { - check(index); - return tdNodes.get(index - 1); - } - - public void start(int index) { - check(index); - tdNodes.get(index - 1).start(); - } - - public void stop(int index) { - check(index); - tdNodes.get(index - 1).stop(); - } - - public void startIP(int index) { - check(index); - tdNodes.get(index - 1).startIP(); - } - - public void stopIP(int index) { - check(index); - tdNodes.get(index - 1).stopIP(); - } - -} \ No newline at end of file diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java new file mode 100644 index 0000000000..7d36568465 --- /dev/null +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java @@ -0,0 +1,19 @@ +package com.taosdata.jdbc.utils; + +import java.util.concurrent.atomic.AtomicInteger; + +public class TaosInfo implements TaosInfoMBean { + + public AtomicInteger conn = new AtomicInteger(); + public AtomicInteger stmt = new AtomicInteger(); + + @Override + public int getConnectionCount() { + return conn.get(); + } + + @Override + public int getStatementCount() { + return stmt.get(); + } +} diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfoMBean.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfoMBean.java new file mode 100644 index 0000000000..791292e902 --- /dev/null +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfoMBean.java @@ -0,0 +1,8 @@ +package com.taosdata.jdbc.utils; + +public interface TaosInfoMBean { + + int getConnectionCount(); + + int getStatementCount(); +} From 72700141768fbb66130b59ba1aa9a54283e89606 Mon Sep 17 00:00:00 2001 From: zyyang Date: Fri, 29 Jan 2021 14:02:01 +0800 Subject: [PATCH 02/14] change --- .../com/taosdata/jdbc/TSDBJNIConnector.java | 20 +++++---- .../com/taosdata/jdbc/utils/TaosInfo.java | 44 +++++++++++++++++-- 2 files changed, 51 insertions(+), 13 deletions(-) diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java index 178bbc483f..9b387b1959 100755 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java @@ -16,20 +16,22 @@ package com.taosdata.jdbc; import com.taosdata.jdbc.utils.TaosInfo; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; import java.sql.SQLException; import java.sql.SQLWarning; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; public class TSDBJNIConnector { private static volatile Boolean isInitialized = false; - private static AtomicInteger open_count = new AtomicInteger(); - private static AtomicInteger close_count = new AtomicInteger(); + private TaosInfo taosInfo = TaosInfo.getInstance(); static { System.loadLibrary("taos"); System.out.println("java.library.path:" + System.getProperty("java.library.path")); + } /** @@ -107,9 +109,9 @@ public class TSDBJNIConnector { throw new SQLException(TSDBConstants.WrapErrMsg(this.getErrMsg(0L)), "", this.getErrCode(0l)); } // invoke connectImp only here - int open = open_count.incrementAndGet(); - int close = close_count.get(); - System.out.println("open_count: " + open + ", close_count: " + close + ", connection_count: " + (open - close)); + int open = taosInfo.getOpen_count().incrementAndGet(); + int close = taosInfo.getClose_count().get(); + System.out.println("open_count: " + open + ", close_count: " + close + ", connection_count: " + (taosInfo.getConnectionCount())); return true; } @@ -274,9 +276,9 @@ public class TSDBJNIConnector { throw new SQLException("Undefined error code returned by TDengine when closing a connection"); } // invoke closeConnectionImpl only here - int open = open_count.get(); - int close = close_count.incrementAndGet(); - System.out.println("open_count: " + open + ", close_count: " + close + ", connection_count: " + (open - close)); + int open = taosInfo.getOpen_count().get(); + int close = taosInfo.getClose_count().incrementAndGet(); + System.out.println("open_count: " + open + ", close_count: " + close + ", connection_count: " + (taosInfo.getConnectionCount())); } private native int closeConnectionImp(long connection); diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java index 7d36568465..83ac7ed809 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java @@ -1,19 +1,55 @@ package com.taosdata.jdbc.utils; +import javax.management.*; +import java.lang.management.ManagementFactory; import java.util.concurrent.atomic.AtomicInteger; public class TaosInfo implements TaosInfoMBean { - public AtomicInteger conn = new AtomicInteger(); - public AtomicInteger stmt = new AtomicInteger(); + private static volatile TaosInfo instance; + private AtomicInteger open_count = new AtomicInteger(); + private AtomicInteger close_count = new AtomicInteger(); + + static { + try { + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + ObjectName name = new ObjectName("TaosInfoMBean:name=TaosInfo"); + server.registerMBean(TaosInfo.getInstance(), name); + } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { + e.printStackTrace(); + } + } @Override public int getConnectionCount() { - return conn.get(); + return open_count.get() - close_count.get(); } @Override public int getStatementCount() { - return stmt.get(); + return 0; } + + public AtomicInteger getOpen_count() { + return open_count; + } + + public AtomicInteger getClose_count() { + return close_count; + } + + private TaosInfo() { + } + + public static TaosInfo getInstance() { + if (instance == null) { + synchronized (TaosInfo.class) { + if (instance == null) { + instance = new TaosInfo(); + } + } + } + return instance; + } + } From 89a0add293077db2338393f2553a411c70f16bb0 Mon Sep 17 00:00:00 2001 From: zyyang Date: Fri, 29 Jan 2021 14:17:28 +0800 Subject: [PATCH 03/14] change --- src/connector/jdbc/pom.xml | 1 + .../jdbc/cases/TaosInfoMonitorTest.java | 37 +++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java diff --git a/src/connector/jdbc/pom.xml b/src/connector/jdbc/pom.xml index 4756ac555f..4c1ac0f06f 100755 --- a/src/connector/jdbc/pom.xml +++ b/src/connector/jdbc/pom.xml @@ -127,6 +127,7 @@ **/AppMemoryLeakTest.java + **/TaosInfoMonitorTest.java **/FailOverTest.java true diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java new file mode 100644 index 0000000000..3c77803c55 --- /dev/null +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java @@ -0,0 +1,37 @@ +package com.taosdata.jdbc.cases; + +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class TaosInfoMonitorTest { + + @Test + public void testCreateTooManyConnection() throws ClassNotFoundException, SQLException, InterruptedException { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + int conCnt = 0; + final String url = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata"; + + List connectionList = IntStream.range(0, 100).mapToObj(i -> { + try { + return DriverManager.getConnection(url); + } catch (SQLException e) { + e.printStackTrace(); + } + return null; + }).collect(Collectors.toList()); + connectionList.stream().forEach(conn -> { + try { + conn.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + }); + } +} From 89ab10f9ab40421b12def83b544c88a6ded458a1 Mon Sep 17 00:00:00 2001 From: zyyang Date: Fri, 29 Jan 2021 14:19:13 +0800 Subject: [PATCH 04/14] change --- .../java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java index 3c77803c55..e8b0b9d70d 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java @@ -20,8 +20,9 @@ public class TaosInfoMonitorTest { List connectionList = IntStream.range(0, 100).mapToObj(i -> { try { + TimeUnit.SECONDS.sleep(1); return DriverManager.getConnection(url); - } catch (SQLException e) { + } catch (SQLException | InterruptedException e) { e.printStackTrace(); } return null; @@ -29,7 +30,8 @@ public class TaosInfoMonitorTest { connectionList.stream().forEach(conn -> { try { conn.close(); - } catch (SQLException e) { + TimeUnit.SECONDS.sleep(1); + } catch (SQLException | InterruptedException e) { e.printStackTrace(); } }); From b5eebd099caa4bd6d69baceb06d791c7675b8aea Mon Sep 17 00:00:00 2001 From: zyyang Date: Fri, 29 Jan 2021 15:24:16 +0800 Subject: [PATCH 05/14] change --- src/connector/jdbc/pom.xml | 3 +++ .../java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/connector/jdbc/pom.xml b/src/connector/jdbc/pom.xml index 4c1ac0f06f..5ca5e96a84 100755 --- a/src/connector/jdbc/pom.xml +++ b/src/connector/jdbc/pom.xml @@ -36,6 +36,7 @@ 3.6.0 1.1.2 3.5 + @@ -122,6 +123,8 @@ maven-surefire-plugin 2.12.4 + pretest + ${maven.test.jvmargs} **/*Test.java diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java index e8b0b9d70d..3dce0d69a6 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java @@ -13,9 +13,8 @@ import java.util.stream.IntStream; public class TaosInfoMonitorTest { @Test - public void testCreateTooManyConnection() throws ClassNotFoundException, SQLException, InterruptedException { + public void testCreateTooManyConnection() throws ClassNotFoundException { Class.forName("com.taosdata.jdbc.TSDBDriver"); - int conCnt = 0; final String url = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata"; List connectionList = IntStream.range(0, 100).mapToObj(i -> { @@ -27,6 +26,7 @@ public class TaosInfoMonitorTest { } return null; }).collect(Collectors.toList()); + connectionList.stream().forEach(conn -> { try { conn.close(); From b0c7a36e7370ac92a1b514d378ab502168ac310a Mon Sep 17 00:00:00 2001 From: zyyang Date: Mon, 1 Feb 2021 13:02:12 +0800 Subject: [PATCH 06/14] change --- src/connector/jdbc/pom.xml | 2 +- .../com/taosdata/jdbc/TSDBJNIConnector.java | 13 ++----- .../java/com/taosdata/jdbc/TSDBStatement.java | 6 ++- .../com/taosdata/jdbc/utils/TaosInfo.java | 39 ++++++++++++++----- .../taosdata/jdbc/utils/TaosInfoMBean.java | 9 ++++- 5 files changed, 44 insertions(+), 25 deletions(-) diff --git a/src/connector/jdbc/pom.xml b/src/connector/jdbc/pom.xml index 5ca5e96a84..0626bcf1fb 100755 --- a/src/connector/jdbc/pom.xml +++ b/src/connector/jdbc/pom.xml @@ -123,7 +123,7 @@ maven-surefire-plugin 2.12.4 - pretest + pertest ${maven.test.jvmargs} **/*Test.java diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java index 9b387b1959..349a02fb37 100755 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java @@ -16,9 +16,6 @@ package com.taosdata.jdbc; import com.taosdata.jdbc.utils.TaosInfo; -import javax.management.MBeanServer; -import javax.management.ObjectName; -import java.lang.management.ManagementFactory; import java.sql.SQLException; import java.sql.SQLWarning; import java.util.List; @@ -31,7 +28,6 @@ public class TSDBJNIConnector { static { System.loadLibrary("taos"); System.out.println("java.library.path:" + System.getProperty("java.library.path")); - } /** @@ -109,9 +105,7 @@ public class TSDBJNIConnector { throw new SQLException(TSDBConstants.WrapErrMsg(this.getErrMsg(0L)), "", this.getErrCode(0l)); } // invoke connectImp only here - int open = taosInfo.getOpen_count().incrementAndGet(); - int close = taosInfo.getClose_count().get(); - System.out.println("open_count: " + open + ", close_count: " + close + ", connection_count: " + (taosInfo.getConnectionCount())); + taosInfo.conn_open_increment(); return true; } @@ -132,6 +126,7 @@ public class TSDBJNIConnector { Long pSql = 0l; try { pSql = this.executeQueryImp(sql.getBytes(TaosGlobalConfig.getCharset()), this.taos); + taosInfo.stmt_count_increment(); } catch (Exception e) { e.printStackTrace(); this.freeResultSetImp(this.taos, pSql); @@ -276,9 +271,7 @@ public class TSDBJNIConnector { throw new SQLException("Undefined error code returned by TDengine when closing a connection"); } // invoke closeConnectionImpl only here - int open = taosInfo.getOpen_count().get(); - int close = taosInfo.getClose_count().incrementAndGet(); - System.out.println("open_count: " + open + ", close_count: " + close + ", connection_count: " + (taosInfo.getConnectionCount())); + taosInfo.connect_close_increment(); } private native int closeConnectionImp(long connection); diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java index e7317b8e1d..402d114215 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java @@ -14,13 +14,15 @@ *****************************************************************************/ package com.taosdata.jdbc; +import com.taosdata.jdbc.utils.TaosInfo; + import java.sql.*; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; public class TSDBStatement implements Statement { - private TSDBJNIConnector connector = null; + private TSDBJNIConnector connector; + private TaosInfo taosInfo = TaosInfo.getInstance(); /** * To store batched commands diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java index 83ac7ed809..69a90fe3b3 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java @@ -3,12 +3,14 @@ package com.taosdata.jdbc.utils; import javax.management.*; import java.lang.management.ManagementFactory; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class TaosInfo implements TaosInfoMBean { private static volatile TaosInfo instance; - private AtomicInteger open_count = new AtomicInteger(); - private AtomicInteger close_count = new AtomicInteger(); + private AtomicLong connect_open = new AtomicLong(); + private AtomicLong connect_close = new AtomicLong(); + private AtomicLong statement_count = new AtomicLong(); static { try { @@ -21,23 +23,40 @@ public class TaosInfo implements TaosInfoMBean { } @Override - public int getConnectionCount() { - return open_count.get() - close_count.get(); + public long getConnect_open() { + return connect_open.get(); } @Override - public int getStatementCount() { - return 0; + public long getConnect_close() { + return connect_close.get(); } - public AtomicInteger getOpen_count() { - return open_count; + @Override + public long getConnect_active() { + return connect_open.get() - connect_close.get(); } - public AtomicInteger getClose_count() { - return close_count; + @Override + public long getStatement_count() { + return statement_count.get(); } + /*******************************************************/ + + public void conn_open_increment() { + connect_open.incrementAndGet(); + } + + public void connect_close_increment() { + connect_close.incrementAndGet(); + } + + public void stmt_count_increment() { + statement_count.incrementAndGet(); + } + + /********************************************************************************/ private TaosInfo() { } diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfoMBean.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfoMBean.java index 791292e902..e16f41b2f5 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfoMBean.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfoMBean.java @@ -2,7 +2,12 @@ package com.taosdata.jdbc.utils; public interface TaosInfoMBean { - int getConnectionCount(); + long getConnect_open(); + + long getConnect_close(); + + long getConnect_active(); + + long getStatement_count(); - int getStatementCount(); } From f2ba1f79b847eb55518f0f1c763ee2ce0978ac61 Mon Sep 17 00:00:00 2001 From: zyyang Date: Mon, 1 Feb 2021 13:08:06 +0800 Subject: [PATCH 07/14] change --- .../taosdata/jdbc/cases/TaosInfoMonitorTest.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java index 3dce0d69a6..2ab4045e27 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java @@ -2,9 +2,7 @@ package com.taosdata.jdbc.cases; import org.junit.Test; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; +import java.sql.*; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -27,6 +25,18 @@ public class TaosInfoMonitorTest { return null; }).collect(Collectors.toList()); + connectionList.stream().forEach(conn -> { + try (Statement stmt = conn.createStatement()) { + ResultSet rs = stmt.executeQuery("show databases"); + while (rs.next()) { + + } + TimeUnit.MILLISECONDS.sleep(50); + } catch (SQLException | InterruptedException e) { + e.printStackTrace(); + } + }); + connectionList.stream().forEach(conn -> { try { conn.close(); From e396b608ae84479d9f329517c79cbd0e4ca974fe Mon Sep 17 00:00:00 2001 From: zyyang Date: Mon, 1 Feb 2021 13:21:30 +0800 Subject: [PATCH 08/14] change --- .../src/main/java/com/taosdata/jdbc/utils/TaosInfo.java | 2 +- .../java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java index 69a90fe3b3..ee1364ce21 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java @@ -2,7 +2,6 @@ package com.taosdata.jdbc.utils; import javax.management.*; import java.lang.management.ManagementFactory; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public class TaosInfo implements TaosInfoMBean { @@ -17,6 +16,7 @@ public class TaosInfo implements TaosInfoMBean { MBeanServer server = ManagementFactory.getPlatformMBeanServer(); ObjectName name = new ObjectName("TaosInfoMBean:name=TaosInfo"); server.registerMBean(TaosInfo.getInstance(), name); + } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { e.printStackTrace(); } diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java index 2ab4045e27..e9e36e20c4 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java @@ -17,7 +17,7 @@ public class TaosInfoMonitorTest { List connectionList = IntStream.range(0, 100).mapToObj(i -> { try { - TimeUnit.SECONDS.sleep(1); + TimeUnit.MILLISECONDS.sleep(100); return DriverManager.getConnection(url); } catch (SQLException | InterruptedException e) { e.printStackTrace(); @@ -31,7 +31,7 @@ public class TaosInfoMonitorTest { while (rs.next()) { } - TimeUnit.MILLISECONDS.sleep(50); + TimeUnit.MILLISECONDS.sleep(100); } catch (SQLException | InterruptedException e) { e.printStackTrace(); } @@ -40,7 +40,7 @@ public class TaosInfoMonitorTest { connectionList.stream().forEach(conn -> { try { conn.close(); - TimeUnit.SECONDS.sleep(1); + TimeUnit.MILLISECONDS.sleep(100); } catch (SQLException | InterruptedException e) { e.printStackTrace(); } From 288dd3e47b319b9315ddf4f5ae1d1605c2ef056b Mon Sep 17 00:00:00 2001 From: zyyang Date: Tue, 2 Feb 2021 13:47:49 +0800 Subject: [PATCH 09/14] change --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 536cbe73a2..3119b50319 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -45,7 +45,7 @@ def pre_test(){ git pull git fetch origin +refs/pull/${CHANGE_ID}/merge git checkout -qf FETCH_HEAD - git --no-pager diff --name-only FETCH_HEAD $(git merge-base FETCH_HEAD develop)|grep -v -E '.*md|//src//connector|Jenkinsfile' || exit 0 + git --no-pager diff --name-only FETCH_HEAD $(git merge-base FETCH_HEAD develop)|grep -v -E '.*md|.*src/connector|Jenkinsfile' || exit 0 cd ${WK} git reset --hard HEAD~10 git checkout develop From 751ac814aaf9f182fd649a98f86b711732a86bf1 Mon Sep 17 00:00:00 2001 From: liuyq-617 Date: Tue, 2 Feb 2021 14:31:31 +0800 Subject: [PATCH 10/14] remove taosdemoTest2.py --- tests/pytest/pytest_3.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pytest/pytest_3.sh b/tests/pytest/pytest_3.sh index a64c144ffd..5e59bb879b 100755 --- a/tests/pytest/pytest_3.sh +++ b/tests/pytest/pytest_3.sh @@ -99,7 +99,7 @@ python3 test.py -f query/queryFillTest.py python3 test.py -f tools/taosdemoTest.py python3 test.py -f tools/taosdumpTest.py python3 test.py -f tools/lowaTest.py -python3 test.py -f tools/taosdemoTest2.py +#python3 test.py -f tools/taosdemoTest2.py # subscribe python3 test.py -f subscribe/singlemeter.py From dee48f94a904699f5fb35cf2582f50b17fd7b801 Mon Sep 17 00:00:00 2001 From: liuyq-617 Date: Tue, 2 Feb 2021 14:49:30 +0800 Subject: [PATCH 11/14] fix case error --- tests/pytest/insert/retentionpolicy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/pytest/insert/retentionpolicy.py b/tests/pytest/insert/retentionpolicy.py index 0188ee22c5..e0446113d6 100644 --- a/tests/pytest/insert/retentionpolicy.py +++ b/tests/pytest/insert/retentionpolicy.py @@ -100,14 +100,14 @@ class TDTestRetetion: tdLog.info(cmd) tdSql.execute(cmd) self.queryRows=tdSql.query('select * from test') - self.checkRows(7,cmd) + self.checkRows(5,cmd) tdLog.info("=============== step5") tdDnodes.stop(1) tdDnodes.start(1) cmd='select * from test where ts > now-1d' self.queryRows=tdSql.query('select * from test where ts > now-1d') - self.checkRows(1,cmd) + self.checkRows(2,cmd) def stop(self): os.system("sudo timedatectl set-ntp true") From 95c801e613ee7524594d7b88966768ad6517257f Mon Sep 17 00:00:00 2001 From: Hui Li Date: Tue, 2 Feb 2021 15:31:17 +0800 Subject: [PATCH 12/14] [TD-2735]support batch create table --- src/kit/taosdemox/taosdemox.c | 99 +++++++++++++++++++++++++---------- 1 file changed, 70 insertions(+), 29 deletions(-) diff --git a/src/kit/taosdemox/taosdemox.c b/src/kit/taosdemox/taosdemox.c index a6d962df55..96b65d402f 100644 --- a/src/kit/taosdemox/taosdemox.c +++ b/src/kit/taosdemox/taosdemox.c @@ -173,6 +173,7 @@ typedef struct SSuperTable_S { int childTblCount; bool superTblExists; // 0: no, 1: yes bool childTblExists; // 0: no, 1: yes + int batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table char childTblPrefix[MAX_TB_NAME_SIZE]; char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample @@ -808,13 +809,14 @@ static void init_rand_data() { static void printfInsertMeta() { printf("\033[1m\033[40;32m================ insert.json parse result START ================\033[0m\n"); - printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); - printf("user: \033[33m%s\033[0m\n", g_Dbs.user); - printf("password: \033[33m%s\033[0m\n", g_Dbs.password); - printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile); - printf("thread count: \033[33m%d\033[0m\n", g_Dbs.threadCount); + printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); + printf("user: \033[33m%s\033[0m\n", g_Dbs.user); + printf("password: \033[33m%s\033[0m\n", g_Dbs.password); + printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile); + printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount); + printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl); - printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount); + printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount); for (int i = 0; i < g_Dbs.dbCount; i++) { printf("database[\033[33m%d\033[0m]:\n", i); printf(" database name: \033[33m%s\033[0m\n", g_Dbs.db[i].dbName); @@ -944,11 +946,12 @@ static void printfInsertMeta() { static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, "================ insert.json parse result START================\n"); - fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port); - fprintf(fp, "user: %s\n", g_Dbs.user); - fprintf(fp, "password: %s\n", g_Dbs.password); - fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile); - fprintf(fp, "thread count: %d\n", g_Dbs.threadCount); + fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port); + fprintf(fp, "user: %s\n", g_Dbs.user); + fprintf(fp, "password: %s\n", g_Dbs.password); + fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile); + fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount); + fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl) fprintf(fp, "database count: %d\n", g_Dbs.dbCount); for (int i = 0; i < g_Dbs.dbCount; i++) { @@ -1730,19 +1733,27 @@ static int createDatabases() { void * createTable(void *sarg) -{ - char command[BUFFER_SIZE] = "\0"; - +{ threadInfo *winfo = (threadInfo *)sarg; SSuperTable* superTblInfo = winfo->superTblInfo; int64_t lastPrintTime = taosGetTimestampMs(); + char* buffer = calloc(superTblInfo->maxSqlLen, 1); + + int len = 0; + int batchNum = 0; //printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { if (0 == g_Dbs.use_metric) { - snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable); + snprintf(buffer, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable); } else { + if (0 == len) { + batchNum = 0; + memset(buffer, 0, superTblInfo->maxSqlLen); + len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "create table "); + } + char* tagsValBuf = NULL; if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo); @@ -1750,13 +1761,22 @@ void * createTable(void *sarg) tagsValBuf = getTagValueFromTagSample(superTblInfo, i % superTblInfo->tagSampleCount); } if (NULL == tagsValBuf) { + free(buffer); return NULL; } - snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.%s tags %s;", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf); + + len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "if not exists %s.%s%d using %s.%s tags %s ", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf); free(tagsValBuf); + batchNum++; + + if ((batchNum < superTblInfo->batchCreateTableNum) && ((superTblInfo->maxSqlLen - len) >= (superTblInfo->lenOfTagOfOneRow + 256))) { + continue; + } } - - if (0 != queryDbExec(winfo->taos, command, NO_INSERT_TYPE)){ + + len = 0; + if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ + free(buffer); return NULL; } @@ -1766,7 +1786,12 @@ void * createTable(void *sarg) lastPrintTime = currentPrintTime; } } - + + if (0 != len) { + (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE); + } + + free(buffer); return NULL; } @@ -2422,6 +2447,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("failed to read json, auto_create_table not found"); goto PARSE_OVER; } + + cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num"); + if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) { + g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint; + } else if (!batchCreateTbl) { + g_Dbs.db[i].superTbls[j].batchCreateTableNum = 2000; + } else { + printf("failed to read json, batch_create_tbl_num not found"); + goto PARSE_OVER; + } cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no if (childTblExists && childTblExists->type == cJSON_String && childTblExists->valuestring != NULL) { @@ -3679,14 +3714,14 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu b = ntables % threads; } - TAOS* taos; - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { - taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); - if (NULL == taos) { - printf("connect to server fail, reason: %s\n", taos_errstr(NULL)); - exit(-1); - } - } + //TAOS* taos; + //if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { + // taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); + // if (NULL == taos) { + // printf("connect to server fail, reason: %s\n", taos_errstr(NULL)); + // exit(-1); + // } + //} int32_t timePrec = TSDB_TIME_PRECISION_MILLI; if (0 != precision[0]) { @@ -3719,7 +3754,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu t_info->start_time = start_time; if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { - t_info->taos = taos; + //t_info->taos = taos; + t_info->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); + if (NULL == t_info->taos) { + printf("connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL)); + exit(-1); + } } else { t_info->taos = NULL; #ifdef TD_LOWA_CURL @@ -3754,6 +3794,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu threadInfo *t_info = infos + i; tsem_destroy(&(t_info->lock_sem)); + taos_close(t_info->taos); superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalRowsInserted += t_info->totalRowsInserted; @@ -3766,7 +3807,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu double end = getCurrentTime(); - taos_close(taos); + //taos_close(taos); free(pids); free(infos); From 84d02b37158032641074be3d7d33bcfb86d1fcaa Mon Sep 17 00:00:00 2001 From: Hui Li Date: Tue, 2 Feb 2021 15:34:52 +0800 Subject: [PATCH 13/14] [TD-2735]support batch create table --- src/kit/taosdemox/taosdemox.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kit/taosdemox/taosdemox.c b/src/kit/taosdemox/taosdemox.c index 96b65d402f..f2f7aaaf5d 100644 --- a/src/kit/taosdemox/taosdemox.c +++ b/src/kit/taosdemox/taosdemox.c @@ -951,7 +951,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, "password: %s\n", g_Dbs.password); fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile); fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount); - fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl) + fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl); fprintf(fp, "database count: %d\n", g_Dbs.dbCount); for (int i = 0; i < g_Dbs.dbCount; i++) { From 10d5d9dd7b192dbd54ba9ef6909dcedc8745f35b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 2 Feb 2021 08:12:26 +0000 Subject: [PATCH 14/14] add file state check --- src/tsdb/inc/tsdbFile.h | 18 ++++++++++++++++++ src/tsdb/src/tsdbFile.c | 8 +++++++- src/tsdb/src/tsdbSync.c | 6 ++++-- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index 40d974d486..f1e2422e45 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -20,6 +20,8 @@ #define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF #define TSDB_IVLD_FID INT_MIN +#define TSDB_FILE_STATE_OK 0 +#define TSDB_FILE_STATE_BAD 1 #define TSDB_FILE_INFO(tf) (&((tf)->info)) #define TSDB_FILE_F(tf) (&((tf)->f)) @@ -31,6 +33,10 @@ #define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf)) #define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf)) #define TSDB_FILE_FSYNC(tf) fsync(TSDB_FILE_FD(tf)) +#define TSDB_FILE_STATE(tf) ((tf)->state) +#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s)) +#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) +#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD) typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T; @@ -47,6 +53,7 @@ typedef struct { SMFInfo info; TFILE f; int fd; + uint8_t state; } SMFile; void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver); @@ -165,6 +172,7 @@ typedef struct { SDFInfo info; TFILE f; int fd; + uint8_t state; } SDFile; void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype); @@ -346,4 +354,14 @@ static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, *maxKey = *minKey + days * tsMsPerDay[precision] - 1; } +static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) { + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) { + return false; + } + } + + return true; +} + #endif /* _TS_TSDB_FILE_H_ */ \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index e0e09b5deb..8124a0e3b5 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -33,7 +33,7 @@ static int tsdbRollBackDFile(SDFile *pDFile); void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) { char fname[TSDB_FILENAME_LEN]; - TSDB_FILE_SET_CLOSED(pMFile); + TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_OK); memset(&(pMFile->info), 0, sizeof(pMFile->info)); pMFile->info.magic = TSDB_FILE_INIT_MAGIC; @@ -201,6 +201,7 @@ int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) { tsdbError("vgId:%d meta file %s not exit, report to upper layer to fix it", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile)); pRepo->state |= TSDB_STATE_BAD_META; + TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_BAD); return 0; } @@ -232,6 +233,7 @@ int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) { tsdbError("vgId:%d meta file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), mfstat.st_size, pMFile->info.size); pRepo->state |= TSDB_STATE_BAD_META; + TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_BAD); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return 0; } else { @@ -293,6 +295,8 @@ static int tsdbRollBackMFile(SMFile *pMFile) { void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype) { char fname[TSDB_FILENAME_LEN]; + TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_OK); + TSDB_FILE_SET_CLOSED(pDFile); memset(&(pDFile->info), 0, sizeof(pDFile->info)); @@ -439,6 +443,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { tsdbError("vgId:%d data file %s not exit, report to upper layer to fix it", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile)); pRepo->state |= TSDB_STATE_BAD_DATA; + TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD); return 0; } @@ -470,6 +475,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { tsdbError("vgId:%d data file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), dfstat.st_size, pDFile->info.size); pRepo->state |= TSDB_STATE_BAD_DATA; + TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return 0; } else { diff --git a/src/tsdb/src/tsdbSync.c b/src/tsdb/src/tsdbSync.c index bae4637d77..da337c7280 100644 --- a/src/tsdb/src/tsdbSync.c +++ b/src/tsdb/src/tsdbSync.c @@ -191,7 +191,8 @@ static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) { return 0; } - if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0) { + if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0 || + TSDB_FILE_IS_BAD(pLMFile)) { // Local has no meta file or has a different meta file, need to copy from remote pSynch->mfChanged = true; @@ -409,7 +410,8 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { pSynch->pdf != NULL ? pSynch->pdf->fid : -1); pLSet = tsdbFSIterNext(&fsiter); } else { - if (pLSet && pSynch->pdf && pLSet->fid == pSynch->pdf->fid && tsdbIsTowFSetSame(pLSet, pSynch->pdf)) { + if (pLSet && pSynch->pdf && pLSet->fid == pSynch->pdf->fid && tsdbIsTowFSetSame(pLSet, pSynch->pdf) && + tsdbFSetIsOk(pLSet)) { // Just keep local files and notify remote not to send tsdbInfo("vgId:%d, fileset:%d is same and no need to recv", REPO_ID(pRepo), pLSet->fid);