diff --git a/Jenkinsfile b/Jenkinsfile
index 536cbe73a2..3119b50319 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -45,7 +45,7 @@ def pre_test(){
git pull
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
- git --no-pager diff --name-only FETCH_HEAD $(git merge-base FETCH_HEAD develop)|grep -v -E '.*md|//src//connector|Jenkinsfile' || exit 0
+ git --no-pager diff --name-only FETCH_HEAD $(git merge-base FETCH_HEAD develop)|grep -v -E '.*md|.*src/connector|Jenkinsfile' || exit 0
cd ${WK}
git reset --hard HEAD~10
git checkout develop
diff --git a/src/connector/jdbc/pom.xml b/src/connector/jdbc/pom.xml
index 4756ac555f..0626bcf1fb 100755
--- a/src/connector/jdbc/pom.xml
+++ b/src/connector/jdbc/pom.xml
@@ -36,6 +36,7 @@
3.6.0
1.1.2
3.5
+
@@ -122,11 +123,14 @@
maven-surefire-plugin
2.12.4
+ pertest
+ ${maven.test.jvmargs}
**/*Test.java
**/AppMemoryLeakTest.java
+ **/TaosInfoMonitorTest.java
**/FailOverTest.java
true
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
index f918463439..349a02fb37 100755
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
@@ -14,6 +14,8 @@
*****************************************************************************/
package com.taosdata.jdbc;
+import com.taosdata.jdbc.utils.TaosInfo;
+
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.List;
@@ -21,6 +23,8 @@ import java.util.List;
public class TSDBJNIConnector {
private static volatile Boolean isInitialized = false;
+ private TaosInfo taosInfo = TaosInfo.getInstance();
+
static {
System.loadLibrary("taos");
System.out.println("java.library.path:" + System.getProperty("java.library.path"));
@@ -91,7 +95,8 @@ public class TSDBJNIConnector {
*/
public boolean connect(String host, int port, String dbName, String user, String password) throws SQLException {
if (this.taos != TSDBConstants.JNI_NULL_POINTER) {
- this.closeConnectionImp(this.taos);
+// this.closeConnectionImp(this.taos);
+ closeConnection();
this.taos = TSDBConstants.JNI_NULL_POINTER;
}
@@ -99,7 +104,8 @@ public class TSDBJNIConnector {
if (this.taos == TSDBConstants.JNI_NULL_POINTER) {
throw new SQLException(TSDBConstants.WrapErrMsg(this.getErrMsg(0L)), "", this.getErrCode(0l));
}
-
+ // invoke connectImp only here
+ taosInfo.conn_open_increment();
return true;
}
@@ -120,6 +126,7 @@ public class TSDBJNIConnector {
Long pSql = 0l;
try {
pSql = this.executeQueryImp(sql.getBytes(TaosGlobalConfig.getCharset()), this.taos);
+ taosInfo.stmt_count_increment();
} catch (Exception e) {
e.printStackTrace();
this.freeResultSetImp(this.taos, pSql);
@@ -244,10 +251,11 @@ public class TSDBJNIConnector {
private native int fetchRowImp(long connection, long resultSet, TSDBResultSetRowData rowData);
public int fetchBlock(long resultSet, TSDBResultSetBlockData blockData) {
- return this.fetchBlockImp(this.taos, resultSet, blockData);
+ return this.fetchBlockImp(this.taos, resultSet, blockData);
}
-
+
private native int fetchBlockImp(long connection, long resultSet, TSDBResultSetBlockData blockData);
+
/**
* Execute close operation from C to release connection pointer by JNI
*
@@ -262,6 +270,8 @@ public class TSDBJNIConnector {
} else {
throw new SQLException("Undefined error code returned by TDengine when closing a connection");
}
+ // invoke closeConnectionImpl only here
+ taosInfo.connect_close_increment();
}
private native int closeConnectionImp(long connection);
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java
index e7317b8e1d..402d114215 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java
@@ -14,13 +14,15 @@
*****************************************************************************/
package com.taosdata.jdbc;
+import com.taosdata.jdbc.utils.TaosInfo;
+
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
public class TSDBStatement implements Statement {
- private TSDBJNIConnector connector = null;
+ private TSDBJNIConnector connector;
+ private TaosInfo taosInfo = TaosInfo.getInstance();
/**
* To store batched commands
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNode.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNode.java
deleted file mode 100644
index 800265868d..0000000000
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNode.java
+++ /dev/null
@@ -1,272 +0,0 @@
-package com.taosdata.jdbc.utils;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.InputStreamReader;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-public class TDNode {
-
- private int index;
- private int running;
- private int deployed;
- private boolean testCluster;
- private String path;
- private String cfgDir;
- private String dataDir;
- private String logDir;
- private String cfgPath;
-
- public TDNode(int index) {
- this.index = index;
- running = 0;
- deployed = 0;
- testCluster = false;
- }
-
- public void setPath(String path) {
- this.path = path;
- }
-
- public void setTestCluster(boolean testCluster) {
- this.testCluster = testCluster;
- }
-
- public void setRunning(int running) {
- this.running = running;
- }
-
- public void searchTaosd(File dir, ArrayList taosdPath) {
- File[] fileList = dir.listFiles();
-
- if(fileList == null || fileList.length == 0) {
- return;
- }
-
- for(File file : fileList) {
- if(file.isFile()) {
- if(file.getName().equals("taosd")) {
- taosdPath.add(file.getAbsolutePath());
- }
- } else {
- searchTaosd(file, taosdPath);
- }
- }
- }
-
- public void start() {
- String selfPath = System.getProperty("user.dir");
- String binPath = "";
- String projDir = selfPath + "/../../../";
-
- try {
- ArrayList taosdPath = new ArrayList<>();
-
- File dir = new File(projDir);
- String realProjDir = dir.getCanonicalPath();
- dir = new File(realProjDir);
- System.out.println("project Dir: " + projDir);
- searchTaosd(dir, taosdPath);
-
- if(taosdPath.size() == 0) {
- System.out.println("The project path doens't exist");
- return;
- } else {
- for(String p : taosdPath) {
- if(!p.contains("packaging")) {
- binPath = p;
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- if(binPath.isEmpty()) {
- System.out.println("taosd not found");
- return;
- } else {
- System.out.println("taosd found in " + binPath);
- }
-
- if(this.deployed == 0) {
- System.out.println("dnode" + index + "is not deployed");
- return;
- }
-
- String cmd = "nohup " + binPath + " -c " + cfgDir + " > /dev/null 2>&1 & ";
- System.out.println("start taosd cmd: " + cmd);
-
- try{
- Runtime.getRuntime().exec(cmd);
- TimeUnit.SECONDS.sleep(5);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- this.running = 1;
- }
-
- public Integer getTaosdPid() {
- String cmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'";
- String[] cmds = {"sh", "-c", cmd};
- try {
- Process process = Runtime.getRuntime().exec(cmds);
- BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
- String line = null;
- Integer res = null;
- while((line = reader.readLine()) != null) {
- if(!line.isEmpty()) {
- res = Integer.valueOf(line);
- break;
- }
- }
-
- return res;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
-
- public void stop() {
-
- if (this.running != 0) {
- Integer pid = null;
- while((pid = getTaosdPid()) != null) {
-
- String killCmd = "kill -term " + pid;
- String[] killCmds = {"sh", "-c", killCmd};
- try {
- Runtime.getRuntime().exec(killCmds).waitFor();
-
- TimeUnit.SECONDS.sleep(2);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- try {
- for(int port = 6030; port < 6041; port ++) {
- String fuserCmd = "fuser -k -n tcp " + port;
- Runtime.getRuntime().exec(fuserCmd).waitFor();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- this.running = 0;
- System.out.println("dnode:" + this.index + " is stopped by kill -term");
- }
- }
-
- public void startIP() {
- try{
- String cmd = "sudo ifconfig lo:" + index + "192.168.0." + index + " up";
- Runtime.getRuntime().exec(cmd).waitFor();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public void stopIP() {
- try{
- String cmd = "sudo ifconfig lo:" + index + "192.168.0." + index + " down";
- Runtime.getRuntime().exec(cmd).waitFor();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public void setCfgConfig(String option, String value) {
- try{
- String cmd = "echo " + option + " " + value + " >> " + this.cfgPath;
- String[] cmdLine = {"sh", "-c", cmd};
- Process ps = Runtime.getRuntime().exec(cmdLine);
- ps.waitFor();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public String getDnodeRootDir() {
- String dnodeRootDir = this.path + "/sim/psim/dnode" + this.index;
- return dnodeRootDir;
- }
-
- public String getDnodesRootDir() {
- String dnodesRootDir = this.path + "/sim/psim" + this.index;
- return dnodesRootDir;
- }
-
- public void deploy() {
- this.logDir = this.path + "/sim/dnode" + this.index + "/log";
- this.dataDir = this.path + "/sim/dnode" + this.index + "/data";
- this.cfgDir = this.path + "/sim/dnode" + this.index + "/cfg";
- this.cfgPath = this.path + "/sim/dnode" + this.index + "/cfg/taos.cfg";
-
- try {
- String cmd = "rm -rf " + this.logDir;
- Runtime.getRuntime().exec(cmd).waitFor();
-
- cmd = "rm -rf " + this.cfgDir;
- Runtime.getRuntime().exec(cmd).waitFor();
-
- cmd = "rm -rf " + this.dataDir;
- Runtime.getRuntime().exec(cmd).waitFor();
-
- cmd = "mkdir -p " + this.logDir;
- Runtime.getRuntime().exec(cmd).waitFor();
-
- cmd = "mkdir -p " + this.cfgDir;
- Runtime.getRuntime().exec(cmd).waitFor();
-
- cmd = "mkdir -p " + this.dataDir;
- Runtime.getRuntime().exec(cmd).waitFor();
-
- cmd = "touch " + this.cfgPath;
- Runtime.getRuntime().exec(cmd).waitFor();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- if(this.testCluster) {
- startIP();
- setCfgConfig("masterIp", "192.168.0.1");
- setCfgConfig("secondIp", "192.168.0.2");
- setCfgConfig("publicIp", "192.168.0." + this.index);
- setCfgConfig("internalIp", "192.168.0." + this.index);
- setCfgConfig("privateIp", "192.168.0." + this.index);
- }
- setCfgConfig("dataDir", this.dataDir);
- setCfgConfig("logDir", this.logDir);
- setCfgConfig("numOfLogLines", "1000000/00");
- setCfgConfig("mnodeEqualVnodeNum", "0");
- setCfgConfig("walLevel", "1");
- setCfgConfig("statusInterval", "1");
- setCfgConfig("numOfMnodes", "3");
- setCfgConfig("numOfThreadsPerCore", "2.0");
- setCfgConfig("monitor", "0");
- setCfgConfig("maxVnodeConnections", "30000");
- setCfgConfig("maxMgmtConnections", "30000");
- setCfgConfig("maxMeterConnections", "30000");
- setCfgConfig("maxShellConns", "30000");
- setCfgConfig("locale", "en_US.UTF-8");
- setCfgConfig("charset", "UTF-8");
- setCfgConfig("asyncLog", "0");
- setCfgConfig("anyIp", "0");
- setCfgConfig("dDebugFlag", "135");
- setCfgConfig("mDebugFlag", "135");
- setCfgConfig("sdbDebugFlag", "135");
- setCfgConfig("rpcDebugFlag", "135");
- setCfgConfig("tmrDebugFlag", "131");
- setCfgConfig("cDebugFlag", "135");
- setCfgConfig("httpDebugFlag", "135");
- setCfgConfig("monitorDebugFlag", "135");
- setCfgConfig("udebugFlag", "135");
- setCfgConfig("jnidebugFlag", "135");
- setCfgConfig("qdebugFlag", "135");
- this.deployed = 1;
- }
-}
\ No newline at end of file
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNodes.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNodes.java
deleted file mode 100644
index efc4c53e28..0000000000
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TDNodes.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.taosdata.jdbc.utils;
-
-import java.io.File;
-import java.util.*;
-
-public class TDNodes {
- private ArrayList tdNodes;
- private boolean testCluster;
-
- public TDNodes () {
- tdNodes = new ArrayList<>();
- for(int i = 1; i < 11; i ++) {
- tdNodes.add(new TDNode(i));
- }
- }
-
- public void setTestCluster(boolean testCluster) {
- this.testCluster = testCluster;
- }
-
- public void check(int index) {
- if(index < 1 || index > 10) {
- System.out.println("index: " + index + " should on a scale of [1, 10]");
- return;
- }
- }
-
- public void deploy(int index) {
- try {
- File file = new File(System.getProperty("user.dir") + "/../../../");
- String projectRealPath = file.getCanonicalPath();
- check(index);
- tdNodes.get(index - 1).setTestCluster(this.testCluster);
- tdNodes.get(index - 1).setPath(projectRealPath);
- tdNodes.get(index - 1).deploy();
- } catch (Exception e) {
- e.printStackTrace();
- System.out.println("deploy Test Exception");
- }
- }
-
- public void cfg(int index, String option, String value) {
- check(index);
- tdNodes.get(index - 1).setCfgConfig(option, value);
- }
-
- public TDNode getTDNode(int index) {
- check(index);
- return tdNodes.get(index - 1);
- }
-
- public void start(int index) {
- check(index);
- tdNodes.get(index - 1).start();
- }
-
- public void stop(int index) {
- check(index);
- tdNodes.get(index - 1).stop();
- }
-
- public void startIP(int index) {
- check(index);
- tdNodes.get(index - 1).startIP();
- }
-
- public void stopIP(int index) {
- check(index);
- tdNodes.get(index - 1).stopIP();
- }
-
-}
\ No newline at end of file
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java
new file mode 100644
index 0000000000..ee1364ce21
--- /dev/null
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfo.java
@@ -0,0 +1,74 @@
+package com.taosdata.jdbc.utils;
+
+import javax.management.*;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TaosInfo implements TaosInfoMBean {
+
+ private static volatile TaosInfo instance;
+ private AtomicLong connect_open = new AtomicLong();
+ private AtomicLong connect_close = new AtomicLong();
+ private AtomicLong statement_count = new AtomicLong();
+
+ static {
+ try {
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+ ObjectName name = new ObjectName("TaosInfoMBean:name=TaosInfo");
+ server.registerMBean(TaosInfo.getInstance(), name);
+
+ } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public long getConnect_open() {
+ return connect_open.get();
+ }
+
+ @Override
+ public long getConnect_close() {
+ return connect_close.get();
+ }
+
+ @Override
+ public long getConnect_active() {
+ return connect_open.get() - connect_close.get();
+ }
+
+ @Override
+ public long getStatement_count() {
+ return statement_count.get();
+ }
+
+ /*******************************************************/
+
+ public void conn_open_increment() {
+ connect_open.incrementAndGet();
+ }
+
+ public void connect_close_increment() {
+ connect_close.incrementAndGet();
+ }
+
+ public void stmt_count_increment() {
+ statement_count.incrementAndGet();
+ }
+
+ /********************************************************************************/
+ private TaosInfo() {
+ }
+
+ public static TaosInfo getInstance() {
+ if (instance == null) {
+ synchronized (TaosInfo.class) {
+ if (instance == null) {
+ instance = new TaosInfo();
+ }
+ }
+ }
+ return instance;
+ }
+
+}
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfoMBean.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfoMBean.java
new file mode 100644
index 0000000000..e16f41b2f5
--- /dev/null
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/TaosInfoMBean.java
@@ -0,0 +1,13 @@
+package com.taosdata.jdbc.utils;
+
+public interface TaosInfoMBean {
+
+ long getConnect_open();
+
+ long getConnect_close();
+
+ long getConnect_active();
+
+ long getStatement_count();
+
+}
diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java
new file mode 100644
index 0000000000..e9e36e20c4
--- /dev/null
+++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TaosInfoMonitorTest.java
@@ -0,0 +1,49 @@
+package com.taosdata.jdbc.cases;
+
+import org.junit.Test;
+
+import java.sql.*;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class TaosInfoMonitorTest {
+
+ @Test
+ public void testCreateTooManyConnection() throws ClassNotFoundException {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ final String url = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
+
+ List connectionList = IntStream.range(0, 100).mapToObj(i -> {
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ return DriverManager.getConnection(url);
+ } catch (SQLException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }).collect(Collectors.toList());
+
+ connectionList.stream().forEach(conn -> {
+ try (Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery("show databases");
+ while (rs.next()) {
+
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (SQLException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ });
+
+ connectionList.stream().forEach(conn -> {
+ try {
+ conn.close();
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (SQLException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+}
diff --git a/src/kit/taosdemox/taosdemox.c b/src/kit/taosdemox/taosdemox.c
index a6d962df55..f2f7aaaf5d 100644
--- a/src/kit/taosdemox/taosdemox.c
+++ b/src/kit/taosdemox/taosdemox.c
@@ -173,6 +173,7 @@ typedef struct SSuperTable_S {
int childTblCount;
bool superTblExists; // 0: no, 1: yes
bool childTblExists; // 0: no, 1: yes
+ int batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql
int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample
@@ -808,13 +809,14 @@ static void init_rand_data() {
static void printfInsertMeta() {
printf("\033[1m\033[40;32m================ insert.json parse result START ================\033[0m\n");
- printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
- printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
- printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
- printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
- printf("thread count: \033[33m%d\033[0m\n", g_Dbs.threadCount);
+ printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
+ printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
+ printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
+ printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
+ printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount);
+ printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl);
- printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
+ printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
for (int i = 0; i < g_Dbs.dbCount; i++) {
printf("database[\033[33m%d\033[0m]:\n", i);
printf(" database name: \033[33m%s\033[0m\n", g_Dbs.db[i].dbName);
@@ -944,11 +946,12 @@ static void printfInsertMeta() {
static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, "================ insert.json parse result START================\n");
- fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port);
- fprintf(fp, "user: %s\n", g_Dbs.user);
- fprintf(fp, "password: %s\n", g_Dbs.password);
- fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
- fprintf(fp, "thread count: %d\n", g_Dbs.threadCount);
+ fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port);
+ fprintf(fp, "user: %s\n", g_Dbs.user);
+ fprintf(fp, "password: %s\n", g_Dbs.password);
+ fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
+ fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
+ fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl);
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
for (int i = 0; i < g_Dbs.dbCount; i++) {
@@ -1730,19 +1733,27 @@ static int createDatabases() {
void * createTable(void *sarg)
-{
- char command[BUFFER_SIZE] = "\0";
-
+{
threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo;
int64_t lastPrintTime = taosGetTimestampMs();
+ char* buffer = calloc(superTblInfo->maxSqlLen, 1);
+
+ int len = 0;
+ int batchNum = 0;
//printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
if (0 == g_Dbs.use_metric) {
- snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable);
+ snprintf(buffer, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable);
} else {
+ if (0 == len) {
+ batchNum = 0;
+ memset(buffer, 0, superTblInfo->maxSqlLen);
+ len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "create table ");
+ }
+
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo);
@@ -1750,13 +1761,22 @@ void * createTable(void *sarg)
tagsValBuf = getTagValueFromTagSample(superTblInfo, i % superTblInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
+ free(buffer);
return NULL;
}
- snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.%s tags %s;", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
+
+ len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "if not exists %s.%s%d using %s.%s tags %s ", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
free(tagsValBuf);
+ batchNum++;
+
+ if ((batchNum < superTblInfo->batchCreateTableNum) && ((superTblInfo->maxSqlLen - len) >= (superTblInfo->lenOfTagOfOneRow + 256))) {
+ continue;
+ }
}
-
- if (0 != queryDbExec(winfo->taos, command, NO_INSERT_TYPE)){
+
+ len = 0;
+ if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
+ free(buffer);
return NULL;
}
@@ -1766,7 +1786,12 @@ void * createTable(void *sarg)
lastPrintTime = currentPrintTime;
}
}
-
+
+ if (0 != len) {
+ (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE);
+ }
+
+ free(buffer);
return NULL;
}
@@ -2422,6 +2447,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("failed to read json, auto_create_table not found");
goto PARSE_OVER;
}
+
+ cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
+ if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) {
+ g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint;
+ } else if (!batchCreateTbl) {
+ g_Dbs.db[i].superTbls[j].batchCreateTableNum = 2000;
+ } else {
+ printf("failed to read json, batch_create_tbl_num not found");
+ goto PARSE_OVER;
+ }
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
if (childTblExists && childTblExists->type == cJSON_String && childTblExists->valuestring != NULL) {
@@ -3679,14 +3714,14 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
b = ntables % threads;
}
- TAOS* taos;
- if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
- taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
- if (NULL == taos) {
- printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
- exit(-1);
- }
- }
+ //TAOS* taos;
+ //if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
+ // taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
+ // if (NULL == taos) {
+ // printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
+ // exit(-1);
+ // }
+ //}
int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
if (0 != precision[0]) {
@@ -3719,7 +3754,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
t_info->start_time = start_time;
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
- t_info->taos = taos;
+ //t_info->taos = taos;
+ t_info->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
+ if (NULL == t_info->taos) {
+ printf("connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL));
+ exit(-1);
+ }
} else {
t_info->taos = NULL;
#ifdef TD_LOWA_CURL
@@ -3754,6 +3794,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
threadInfo *t_info = infos + i;
tsem_destroy(&(t_info->lock_sem));
+ taos_close(t_info->taos);
superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalRowsInserted += t_info->totalRowsInserted;
@@ -3766,7 +3807,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
double end = getCurrentTime();
- taos_close(taos);
+ //taos_close(taos);
free(pids);
free(infos);
diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h
index 40d974d486..f1e2422e45 100644
--- a/src/tsdb/inc/tsdbFile.h
+++ b/src/tsdb/inc/tsdbFile.h
@@ -20,6 +20,8 @@
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TSDB_IVLD_FID INT_MIN
+#define TSDB_FILE_STATE_OK 0
+#define TSDB_FILE_STATE_BAD 1
#define TSDB_FILE_INFO(tf) (&((tf)->info))
#define TSDB_FILE_F(tf) (&((tf)->f))
@@ -31,6 +33,10 @@
#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf))
#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf))
#define TSDB_FILE_FSYNC(tf) fsync(TSDB_FILE_FD(tf))
+#define TSDB_FILE_STATE(tf) ((tf)->state)
+#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
+#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
+#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
@@ -47,6 +53,7 @@ typedef struct {
SMFInfo info;
TFILE f;
int fd;
+ uint8_t state;
} SMFile;
void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver);
@@ -165,6 +172,7 @@ typedef struct {
SDFInfo info;
TFILE f;
int fd;
+ uint8_t state;
} SDFile;
void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype);
@@ -346,4 +354,14 @@ static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid,
*maxKey = *minKey + days * tsMsPerDay[precision] - 1;
}
+static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {
+ for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
+ if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
#endif /* _TS_TSDB_FILE_H_ */
\ No newline at end of file
diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c
index e0e09b5deb..8124a0e3b5 100644
--- a/src/tsdb/src/tsdbFile.c
+++ b/src/tsdb/src/tsdbFile.c
@@ -33,7 +33,7 @@ static int tsdbRollBackDFile(SDFile *pDFile);
void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) {
char fname[TSDB_FILENAME_LEN];
- TSDB_FILE_SET_CLOSED(pMFile);
+ TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_OK);
memset(&(pMFile->info), 0, sizeof(pMFile->info));
pMFile->info.magic = TSDB_FILE_INIT_MAGIC;
@@ -201,6 +201,7 @@ int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) {
tsdbError("vgId:%d meta file %s not exit, report to upper layer to fix it", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pMFile));
pRepo->state |= TSDB_STATE_BAD_META;
+ TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_BAD);
return 0;
}
@@ -232,6 +233,7 @@ int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) {
tsdbError("vgId:%d meta file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it",
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), mfstat.st_size, pMFile->info.size);
pRepo->state |= TSDB_STATE_BAD_META;
+ TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_BAD);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return 0;
} else {
@@ -293,6 +295,8 @@ static int tsdbRollBackMFile(SMFile *pMFile) {
void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype) {
char fname[TSDB_FILENAME_LEN];
+ TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_OK);
+
TSDB_FILE_SET_CLOSED(pDFile);
memset(&(pDFile->info), 0, sizeof(pDFile->info));
@@ -439,6 +443,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
tsdbError("vgId:%d data file %s not exit, report to upper layer to fix it", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pDFile));
pRepo->state |= TSDB_STATE_BAD_DATA;
+ TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD);
return 0;
}
@@ -470,6 +475,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
tsdbError("vgId:%d data file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it",
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), dfstat.st_size, pDFile->info.size);
pRepo->state |= TSDB_STATE_BAD_DATA;
+ TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return 0;
} else {
diff --git a/src/tsdb/src/tsdbSync.c b/src/tsdb/src/tsdbSync.c
index bae4637d77..da337c7280 100644
--- a/src/tsdb/src/tsdbSync.c
+++ b/src/tsdb/src/tsdbSync.c
@@ -191,7 +191,8 @@ static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) {
return 0;
}
- if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0) {
+ if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0 ||
+ TSDB_FILE_IS_BAD(pLMFile)) {
// Local has no meta file or has a different meta file, need to copy from remote
pSynch->mfChanged = true;
@@ -409,7 +410,8 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
pSynch->pdf != NULL ? pSynch->pdf->fid : -1);
pLSet = tsdbFSIterNext(&fsiter);
} else {
- if (pLSet && pSynch->pdf && pLSet->fid == pSynch->pdf->fid && tsdbIsTowFSetSame(pLSet, pSynch->pdf)) {
+ if (pLSet && pSynch->pdf && pLSet->fid == pSynch->pdf->fid && tsdbIsTowFSetSame(pLSet, pSynch->pdf) &&
+ tsdbFSetIsOk(pLSet)) {
// Just keep local files and notify remote not to send
tsdbInfo("vgId:%d, fileset:%d is same and no need to recv", REPO_ID(pRepo), pLSet->fid);
diff --git a/tests/pytest/insert/retentionpolicy.py b/tests/pytest/insert/retentionpolicy.py
index 0188ee22c5..e0446113d6 100644
--- a/tests/pytest/insert/retentionpolicy.py
+++ b/tests/pytest/insert/retentionpolicy.py
@@ -100,14 +100,14 @@ class TDTestRetetion:
tdLog.info(cmd)
tdSql.execute(cmd)
self.queryRows=tdSql.query('select * from test')
- self.checkRows(7,cmd)
+ self.checkRows(5,cmd)
tdLog.info("=============== step5")
tdDnodes.stop(1)
tdDnodes.start(1)
cmd='select * from test where ts > now-1d'
self.queryRows=tdSql.query('select * from test where ts > now-1d')
- self.checkRows(1,cmd)
+ self.checkRows(2,cmd)
def stop(self):
os.system("sudo timedatectl set-ntp true")
diff --git a/tests/pytest/pytest_3.sh b/tests/pytest/pytest_3.sh
index a64c144ffd..5e59bb879b 100755
--- a/tests/pytest/pytest_3.sh
+++ b/tests/pytest/pytest_3.sh
@@ -99,7 +99,7 @@ python3 test.py -f query/queryFillTest.py
python3 test.py -f tools/taosdemoTest.py
python3 test.py -f tools/taosdumpTest.py
python3 test.py -f tools/lowaTest.py
-python3 test.py -f tools/taosdemoTest2.py
+#python3 test.py -f tools/taosdemoTest2.py
# subscribe
python3 test.py -f subscribe/singlemeter.py