diff --git a/cmake/install.inc b/cmake/install.inc
index 9e325531d5..f8b3b7c3c6 100755
--- a/cmake/install.inc
+++ b/cmake/install.inc
@@ -32,7 +32,7 @@ ELSEIF (TD_WINDOWS)
#INSTALL(TARGETS taos RUNTIME DESTINATION driver)
#INSTALL(TARGETS shell RUNTIME DESTINATION .)
IF (TD_MVN_INSTALLED)
- INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.28-dist.jar DESTINATION connector/jdbc)
+ INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.29.jar DESTINATION connector/jdbc)
ENDIF ()
ELSEIF (TD_DARWIN)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh
index dca3dd2ff6..178a248cfe 100755
--- a/packaging/tools/install.sh
+++ b/packaging/tools/install.sh
@@ -607,6 +607,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo 'Type=simple' >> ${taosd_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/taosd' >> ${taosd_service_config}"
${csudo} bash -c "echo 'ExecStartPre=/usr/local/taos/bin/startPre.sh' >> ${taosd_service_config}"
+ ${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${taosd_service_config}"
@@ -630,6 +631,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo '[Service]' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'Type=simple' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/tarbitrator' >> ${tarbitratord_service_config}"
+ ${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${tarbitratord_service_config}"
@@ -655,6 +657,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo 'PIDFile=/usr/local/nginxd/logs/nginx.pid' >> ${nginx_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/local/nginxd/sbin/nginx' >> ${nginx_service_config}"
${csudo} bash -c "echo 'ExecStop=/usr/local/nginxd/sbin/nginx -s stop' >> ${nginx_service_config}"
+ ${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${nginx_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${nginx_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${nginx_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${nginx_service_config}"
diff --git a/packaging/tools/install_arbi.sh b/packaging/tools/install_arbi.sh
index a89d2257dc..f47c3672cb 100755
--- a/packaging/tools/install_arbi.sh
+++ b/packaging/tools/install_arbi.sh
@@ -205,6 +205,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo '[Service]' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'Type=simple' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/tarbitrator' >> ${tarbitratord_service_config}"
+ ${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${tarbitratord_service_config}"
diff --git a/packaging/tools/install_arbi_power.sh b/packaging/tools/install_arbi_power.sh
index 4b12913760..3f27175151 100755
--- a/packaging/tools/install_arbi_power.sh
+++ b/packaging/tools/install_arbi_power.sh
@@ -205,6 +205,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo '[Service]' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'Type=simple' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/tarbitrator' >> ${tarbitratord_service_config}"
+ ${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${tarbitratord_service_config}"
diff --git a/packaging/tools/install_power.sh b/packaging/tools/install_power.sh
index ba6ace4009..9f28435cb5 100755
--- a/packaging/tools/install_power.sh
+++ b/packaging/tools/install_power.sh
@@ -577,6 +577,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo 'Type=simple' >> ${powerd_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/powerd' >> ${powerd_service_config}"
${csudo} bash -c "echo 'ExecStartPre=/usr/local/power/bin/startPre.sh' >> ${powerd_service_config}"
+ ${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${powerd_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${powerd_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${powerd_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${powerd_service_config}"
@@ -599,6 +600,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo '[Service]' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'Type=simple' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/tarbitrator' >> ${tarbitratord_service_config}"
+ ${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${tarbitratord_service_config}"
@@ -624,6 +626,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo 'PIDFile=/usr/local/nginxd/logs/nginx.pid' >> ${nginx_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/local/nginxd/sbin/nginx' >> ${nginx_service_config}"
${csudo} bash -c "echo 'ExecStop=/usr/local/nginxd/sbin/nginx -s stop' >> ${nginx_service_config}"
+ ${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${nginx_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${nginx_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${nginx_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${nginx_service_config}"
diff --git a/packaging/tools/make_install.sh b/packaging/tools/make_install.sh
index 1fd0e943b1..f03065d70c 100755
--- a/packaging/tools/make_install.sh
+++ b/packaging/tools/make_install.sh
@@ -333,6 +333,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo 'Type=simple' >> ${taosd_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/taosd' >> ${taosd_service_config}"
${csudo} bash -c "echo 'ExecStartPre=/usr/local/taos/bin/startPre.sh' >> ${taosd_service_config}"
+ ${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${taosd_service_config}"
diff --git a/packaging/tools/post.sh b/packaging/tools/post.sh
index 8665b3fec3..3aa8083175 100755
--- a/packaging/tools/post.sh
+++ b/packaging/tools/post.sh
@@ -405,6 +405,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo 'Type=simple' >> ${taosd_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/taosd' >> ${taosd_service_config}"
${csudo} bash -c "echo 'ExecStartPre=/usr/local/taos/bin/startPre.sh' >> ${taosd_service_config}"
+ ${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${taosd_service_config}"
diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c
index cc4eb751ff..3cb2b60ce2 100644
--- a/src/client/src/tscSubquery.c
+++ b/src/client/src/tscSubquery.c
@@ -2865,7 +2865,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
tscDebug("0x%"PRIx64" sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d",
pParentSql->self, pSql, pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx);
- if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
+ if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0) && !(tscGetQueryInfoDetail(&pParentSql->cmd, 0)->distinctTag)) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
pParentSql->self, pSql->self, tsMaxNumOfOrderedResults, num);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
diff --git a/src/common/inc/texpr.h b/src/common/inc/texpr.h
index acfbffc01e..9addea412b 100644
--- a/src/common/inc/texpr.h
+++ b/src/common/inc/texpr.h
@@ -89,9 +89,6 @@ void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree);
bool exprTreeApplayFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param);
-typedef void (*_arithmetic_operator_fn_t)(void *left, int32_t numLeft, int32_t leftType, void *right, int32_t numRight,
- int32_t rightType, void *output, int32_t order);
-
void arithmeticTreeTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, const char*, int32_t));
diff --git a/src/connector/grafanaplugin b/src/connector/grafanaplugin
index 32e2c97a4c..3530c6df09 160000
--- a/src/connector/grafanaplugin
+++ b/src/connector/grafanaplugin
@@ -1 +1 @@
-Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df
+Subproject commit 3530c6df097134a410bacec6b3cd013ef38a61aa
diff --git a/src/connector/jdbc/CMakeLists.txt b/src/connector/jdbc/CMakeLists.txt
index de4b8f6bfb..61e976cb18 100644
--- a/src/connector/jdbc/CMakeLists.txt
+++ b/src/connector/jdbc/CMakeLists.txt
@@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED)
ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME}
POST_BUILD
COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
- COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.28-dist.jar ${LIBRARY_OUTPUT_PATH}
+ COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.29.jar ${LIBRARY_OUTPUT_PATH}
COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMENT "build jdbc driver")
ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME})
diff --git a/src/connector/jdbc/deploy-pom.xml b/src/connector/jdbc/deploy-pom.xml
index a31796ffde..968a9bf470 100755
--- a/src/connector/jdbc/deploy-pom.xml
+++ b/src/connector/jdbc/deploy-pom.xml
@@ -5,7 +5,7 @@
com.taosdata.jdbc
taos-jdbcdriver
- 2.0.28
+ 2.0.29
jar
JDBCDriver
diff --git a/src/connector/jdbc/pom.xml b/src/connector/jdbc/pom.xml
index 3400a82e73..ef353d1d19 100755
--- a/src/connector/jdbc/pom.xml
+++ b/src/connector/jdbc/pom.xml
@@ -3,7 +3,7 @@
4.0.0
com.taosdata.jdbc
taos-jdbcdriver
- 2.0.28
+ 2.0.29
jar
JDBCDriver
https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc
@@ -122,6 +122,7 @@
**/FailOverTest.java
**/InvalidResultSetPointerTest.java
**/RestfulConnectionTest.java
+ **/TD4144Test.java
true
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
index 5e3ffffa4f..7f0cf7de8d 100755
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
@@ -32,7 +32,7 @@ public class TSDBJNIConnector {
// Connection pointer used in C
private long taos = TSDBConstants.JNI_NULL_POINTER;
// result set status in current connection
- private boolean isResultsetClosed = true;
+ private boolean isResultsetClosed;
private int affectedRows = -1;
static {
@@ -135,6 +135,7 @@ public class TSDBJNIConnector {
// Try retrieving result set for the executed SQL using the current connection pointer.
pSql = this.getResultSetImp(this.taos, pSql);
+ // if pSql == 0L that means resultset is closed
isResultsetClosed = (pSql == TSDBConstants.JNI_NULL_POINTER);
return pSql;
@@ -172,16 +173,7 @@ public class TSDBJNIConnector {
* Free resultset operation from C to release resultset pointer by JNI
*/
public int freeResultSet(long pSql) {
- int res = TSDBConstants.JNI_SUCCESS;
-// if (result != taosResultSetPointer && taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
-// throw new RuntimeException("Invalid result set pointer");
-// }
-
-// if (taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
- res = this.freeResultSetImp(this.taos, pSql);
-// taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;
-// }
-
+ int res = this.freeResultSetImp(this.taos, pSql);
isResultsetClosed = true;
return res;
}
@@ -199,7 +191,6 @@ public class TSDBJNIConnector {
// }
// return resCode;
// }
-
private native int freeResultSetImp(long connection, long result);
/**
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java
index 2576a25f0d..aba29d602b 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java
@@ -109,6 +109,8 @@ public class TSDBResultSet extends AbstractResultSet implements ResultSet {
public void close() throws SQLException {
if (isClosed)
return;
+ if (this.statement == null)
+ return;
if (this.jniConnector != null) {
int code = this.jniConnector.freeResultSet(this.resultSetPointer);
if (code == TSDBConstants.JNI_CONNECTION_NULL) {
@@ -461,12 +463,13 @@ public class TSDBResultSet extends AbstractResultSet implements ResultSet {
}
public boolean isClosed() throws SQLException {
- if (isClosed)
- return true;
- if (jniConnector != null) {
- isClosed = jniConnector.isResultsetClosed();
- }
return isClosed;
+// if (isClosed)
+// return true;
+// if (jniConnector != null) {
+// isClosed = jniConnector.isResultsetClosed();
+// }
+// return isClosed;
}
public String getNString(int columnIndex) throws SQLException {
diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java
index 3a223ed981..24c73fdd5c 100644
--- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java
+++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java
@@ -1,6 +1,7 @@
package com.taosdata.jdbc;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -19,6 +20,7 @@ public class SubscribeTest {
String tName = "t0";
String host = "127.0.0.1";
String topic = "test";
+ private long ts;
@Test
public void subscribe() {
@@ -27,26 +29,40 @@ public class SubscribeTest {
TSDBConnection conn = connection.unwrap(TSDBConnection.class);
TSDBSubscribe subscribe = conn.subscribe(topic, rawSql, false);
- int a = 0;
- while (true) {
- TimeUnit.MILLISECONDS.sleep(1000);
+ for (int j = 0; j < 10; j++) {
+ TimeUnit.SECONDS.sleep(1);
TSDBResultSet resSet = subscribe.consume();
+
+ int rowCnt = 0;
while (resSet.next()) {
- for (int i = 1; i <= resSet.getMetaData().getColumnCount(); i++) {
- System.out.printf(i + ": " + resSet.getString(i) + "\t");
+ if (rowCnt == 0) {
+ long cur_ts = resSet.getTimestamp(1).getTime();
+ int k = resSet.getInt(2);
+ int v = resSet.getInt(3);
+ Assert.assertEquals(ts, cur_ts);
+ Assert.assertEquals(100, k);
+ Assert.assertEquals(1, v);
}
- System.out.println("\n======" + a + "==========");
- }
- a++;
- if (a >= 2) {
- break;
+ if (rowCnt == 1) {
+ long cur_ts = resSet.getTimestamp(1).getTime();
+ int k = resSet.getInt(2);
+ int v = resSet.getInt(3);
+ Assert.assertEquals(ts + 1, cur_ts);
+ Assert.assertEquals(101, k);
+ Assert.assertEquals(2, v);
+
+ }
+ rowCnt++;
}
+ if (j == 0)
+ Assert.assertEquals(2, rowCnt);
resSet.close();
}
-
subscribe.close(true);
- } catch (Exception e) {
- e.printStackTrace();
+
+
+ } catch (SQLException | InterruptedException throwables) {
+ throwables.printStackTrace();
}
}
@@ -62,7 +78,7 @@ public class SubscribeTest {
statement.execute("drop database if exists " + dbName);
statement.execute("create database if not exists " + dbName);
statement.execute("create table if not exists " + dbName + "." + tName + " (ts timestamp, k int, v int)");
- long ts = System.currentTimeMillis();
+ ts = System.currentTimeMillis();
statement.executeUpdate("insert into " + dbName + "." + tName + " values (" + ts + ", 100, 1)");
statement.executeUpdate("insert into " + dbName + "." + tName + " values (" + (ts + 1) + ", 101, 2)");
}
diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TD4144Test.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TD4144Test.java
new file mode 100644
index 0000000000..6f29f64111
--- /dev/null
+++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/cases/TD4144Test.java
@@ -0,0 +1,105 @@
+package com.taosdata.jdbc.cases;
+
+import com.taosdata.jdbc.TSDBConnection;
+import com.taosdata.jdbc.TSDBDriver;
+import com.taosdata.jdbc.TSDBResultSet;
+import com.taosdata.jdbc.TSDBSubscribe;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.DriverManager;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class TD4144Test {
+
+ private static TSDBConnection connection;
+ private static final String host = "127.0.0.1";
+
+ private static final String topic = "topic-meter-current-bg-10";
+ private static final String sql = "select * from meters where current > 10";
+ private static final String sql2 = "select * from meters where ts >= '2020-08-15 12:20:00.000'";
+
+
+ @Test
+ public void test() throws SQLException {
+ TSDBSubscribe subscribe = null;
+ TSDBResultSet res = null;
+ boolean hasNext = false;
+
+ try {
+ subscribe = connection.subscribe(topic, sql, false);
+ int count = 0;
+ while (true) {
+ // 等待1秒,避免频繁调用 consume,给服务端造成压力
+ TimeUnit.SECONDS.sleep(1);
+ if (res == null) {
+ // 消费数据
+ res = subscribe.consume();
+ hasNext = res.next();
+ }
+
+ if (res == null) {
+ continue;
+ }
+ ResultSetMetaData metaData = res.getMetaData();
+ int number = 0;
+ while (hasNext) {
+ int columnCount = metaData.getColumnCount();
+ for (int i = 1; i <= columnCount; i++) {
+ System.out.print(metaData.getColumnLabel(i) + ": " + res.getString(i) + "\t");
+ }
+ System.out.println();
+ count++;
+ number++;
+ hasNext = res.next();
+ if (!hasNext) {
+ res.close();
+ res = null;
+ System.out.println("rows: " + count);
+ }
+ if (hasNext == true && number >= 10) {
+ System.out.println("batch" + number);
+ break;
+ }
+ }
+
+ }
+
+ } catch (SQLException | InterruptedException throwables) {
+ throwables.printStackTrace();
+ } finally {
+ if (subscribe != null)
+ subscribe.close(true);
+ }
+ }
+
+ @BeforeClass
+ public static void beforeClass() throws SQLException {
+ Properties properties = new Properties();
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
+ String url = "jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata";
+ connection = (DriverManager.getConnection(url, properties)).unwrap(TSDBConnection.class);
+ try (Statement stmt = connection.createStatement()) {
+ stmt.execute("drop database if exists power");
+ stmt.execute("create database if not exists power");
+ stmt.execute("use power");
+ stmt.execute("create table meters(ts timestamp, current float, voltage int, phase int) tags(location binary(64), groupId int)");
+ stmt.execute("create table d1001 using meters tags(\"Beijing.Chaoyang\", 2)");
+ stmt.execute("create table d1002 using meters tags(\"Beijing.Haidian\", 2)");
+ stmt.execute("insert into d1001 values(\"2020-08-15 12:00:00.000\", 12, 220, 1),(\"2020-08-15 12:10:00.000\", 12.3, 220, 2),(\"2020-08-15 12:20:00.000\", 12.2, 220, 1)");
+ stmt.execute("insert into d1002 values(\"2020-08-15 12:00:00.000\", 9.9, 220, 1),(\"2020-08-15 12:10:00.000\", 10.3, 220, 1),(\"2020-08-15 12:20:00.000\", 11.2, 220, 1)");
+ }
+ }
+
+ @AfterClass
+ public static void afterClass() throws SQLException {
+ if (connection != null)
+ connection.close();
+ }
+}
diff --git a/src/connector/python/linux/python3/taos/__init__.py b/src/connector/python/linux/python3/taos/__init__.py
index 9732635738..1b086f36ec 100644
--- a/src/connector/python/linux/python3/taos/__init__.py
+++ b/src/connector/python/linux/python3/taos/__init__.py
@@ -1,6 +1,7 @@
from .connection import TDengineConnection
from .cursor import TDengineCursor
+from .error import Error
# Globals
threadsafety = 0
diff --git a/src/kit/taosdemo/CMakeLists.txt b/src/kit/taosdemo/CMakeLists.txt
index 4e38a8842e..5f75be0e19 100644
--- a/src/kit/taosdemo/CMakeLists.txt
+++ b/src/kit/taosdemo/CMakeLists.txt
@@ -10,7 +10,11 @@ IF (GIT_FOUND)
COMMAND ${GIT_EXECUTABLE} log --pretty=oneline -n 1 ${CMAKE_CURRENT_LIST_DIR}/taosdemo.c
RESULT_VARIABLE RESULT
OUTPUT_VARIABLE TAOSDEMO_COMMIT_SHA1)
- STRING(SUBSTRING "${TAOSDEMO_COMMIT_SHA1}" 0 7 TAOSDEMO_COMMIT_SHA1)
+ IF ("${TAOSDEMO_COMMIT_SHA1}" STREQUAL "")
+ MESSAGE("taosdemo's latest commit in short is:" ${TAOSDEMO_COMMIT_SHA1})
+ ELSE ()
+ STRING(SUBSTRING "${TAOSDEMO_COMMIT_SHA1}" 0 7 TAOSDEMO_COMMIT_SHA1)
+ ENDIF ()
EXECUTE_PROCESS(
COMMAND ${GIT_EXECUTABLE} status -z -s ${CMAKE_CURRENT_LIST_DIR}/taosdemo.c
RESULT_VARIABLE RESULT
diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c
index a3a8968017..49d89dc597 100644
--- a/src/kit/taosdemo/taosdemo.c
+++ b/src/kit/taosdemo/taosdemo.c
@@ -68,12 +68,6 @@ enum TEST_MODE {
INVAID_TEST
};
-enum QUERY_MODE {
- SYNC_QUERY_MODE, // 0
- ASYNC_QUERY_MODE, // 1
- INVALID_MODE
-};
-
#define MAX_RECORDS_PER_REQ 32766
#define MAX_SQL_SIZE 65536
@@ -731,7 +725,6 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
exit(EXIT_FAILURE);
}
tstrncpy(configDir, argv[++i], TSDB_FILENAME_LEN);
-
} else if (strcmp(argv[i], "-h") == 0) {
if (argc == i+1) {
printHelp();
@@ -776,16 +769,16 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
arguments->sqlFile = argv[++i];
} else if (strcmp(argv[i], "-q") == 0) {
- if ((argc == i+1) ||
- (!isStringNumber(argv[i+1]))) {
+ if ((argc == i+1)
+ || (!isStringNumber(argv[i+1]))) {
printHelp();
errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, 1: ASYNC. Default is SYNC.\n");
exit(EXIT_FAILURE);
}
arguments->async_mode = atoi(argv[++i]);
} else if (strcmp(argv[i], "-T") == 0) {
- if ((argc == i+1) ||
- (!isStringNumber(argv[i+1]))) {
+ if ((argc == i+1)
+ || (!isStringNumber(argv[i+1]))) {
printHelp();
errorPrint("%s", "\n\t-T need a number following!\n");
exit(EXIT_FAILURE);
@@ -800,24 +793,24 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
arguments->insert_interval = atoi(argv[++i]);
} else if (strcmp(argv[i], "-qt") == 0) {
- if ((argc == i+1) ||
- (!isStringNumber(argv[i+1]))) {
+ if ((argc == i+1)
+ || (!isStringNumber(argv[i+1]))) {
printHelp();
errorPrint("%s", "\n\t-qt need a number following!\n");
exit(EXIT_FAILURE);
}
arguments->query_times = atoi(argv[++i]);
} else if (strcmp(argv[i], "-B") == 0) {
- if ((argc == i+1) ||
- (!isStringNumber(argv[i+1]))) {
+ if ((argc == i+1)
+ || (!isStringNumber(argv[i+1]))) {
printHelp();
errorPrint("%s", "\n\t-B need a number following!\n");
exit(EXIT_FAILURE);
}
arguments->interlace_rows = atoi(argv[++i]);
} else if (strcmp(argv[i], "-r") == 0) {
- if ((argc == i+1) ||
- (!isStringNumber(argv[i+1]))) {
+ if ((argc == i+1)
+ || (!isStringNumber(argv[i+1]))) {
printHelp();
errorPrint("%s", "\n\t-r need a number following!\n");
exit(EXIT_FAILURE);
@@ -855,6 +848,11 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
arguments->num_of_CPR = atoi(argv[++i]);
} else if (strcmp(argv[i], "-b") == 0) {
+ if (argc == i+1) {
+ printHelp();
+ errorPrint("%s", "\n\t-b need valid string following!\n");
+ exit(EXIT_FAILURE);
+ }
sptr = arguments->datatype;
++i;
if (strstr(argv[i], ",") == NULL) {
@@ -1077,7 +1075,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
if (code != 0) {
if (!quiet) {
debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
- errorPrint("Failed to run %s, reason: %s\n", command, taos_errstr(res));
+ errorPrint("Failed to execute %s, reason: %s\n", command, taos_errstr(res));
}
taos_free_result(res);
//taos_close(taos);
@@ -1107,6 +1105,7 @@ static void appendResultBufToFile(char *resultBuf, char *resultFile)
}
}
+
fprintf(fp, "%s", resultBuf);
tmfclose(fp);
}
@@ -1142,6 +1141,7 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) {
totalLen += len;
}
+ verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n", __func__, __LINE__, databuf, resultFile);
appendResultBufToFile(databuf, resultFile);
free(databuf);
}
@@ -3410,8 +3410,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
g_args.interlace_rows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
g_args.num_of_RPR);
- printf(" press Enter key to continue or Ctrl-C to stop.");
- (void)getchar();
+ if (!g_args.answer_yes) {
+ printf(" press Enter key to continue or Ctrl-C to stop.");
+ (void)getchar();
+ }
g_args.interlace_rows = g_args.num_of_RPR;
}
} else if (!interlaceRows) {
@@ -3930,8 +3932,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
g_args.num_of_RPR);
- printf(" press Enter key to continue or Ctrl-C to stop.");
- (void)getchar();
+ if (!g_args.answer_yes) {
+ printf(" press Enter key to continue or Ctrl-C to stop.");
+ (void)getchar();
+ }
g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR;
}
} else if (!interlaceRows) {
@@ -4075,7 +4079,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* gQueryTimes = cJSON_GetObjectItem(root, "query_times");
if (gQueryTimes && gQueryTimes->type == cJSON_Number) {
- if (gQueryTimes->valueint < 0) {
+ if (gQueryTimes->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
@@ -4126,9 +4130,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery,
"query_times");
if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) {
- if (specifiedQueryTimes->valueint < 0) {
- errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
- __func__, __LINE__);
+ if (specifiedQueryTimes->valueint <= 0) {
+ errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n",
+ __func__, __LINE__, specifiedQueryTimes->valueint);
goto PARSE_OVER;
}
@@ -4270,9 +4274,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* superQueryTimes = cJSON_GetObjectItem(superQuery, "query_times");
if (superQueryTimes && superQueryTimes->type == cJSON_Number) {
- if (superQueryTimes->valueint < 0) {
- errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
- __func__, __LINE__);
+ if (superQueryTimes->valueint <= 0) {
+ errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n",
+ __func__, __LINE__, superQueryTimes->valueint);
goto PARSE_OVER;
}
g_queryInfo.superQueryInfo.queryTimes = superQueryTimes->valueint;
@@ -5236,6 +5240,13 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
startTs = taosGetTimestampMs();
+ if (recOfBatch == 0) {
+ errorPrint("[%d] %s() LN%d try inserting records of batch is %"PRIu64"\n",
+ pThreadInfo->threadID, __func__, __LINE__,
+ recOfBatch);
+ errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n");
+ goto free_of_interlace;
+ }
int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch);
endTs = taosGetTimestampMs();
@@ -6313,7 +6324,9 @@ static int queryTestProcess() {
(void)getchar();
}
- printfQuerySystemInfo(taos);
+ if (g_args.debug_print || g_args.verbose_print) {
+ printfQuerySystemInfo(taos);
+ }
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
if (convertHostToServAddr(
@@ -6517,59 +6530,63 @@ static void *superSubscribe(void *sarg) {
return NULL;
}
- //int64_t st = 0;
- //int64_t et = 0;
- do {
- //if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) < g_queryInfo.specifiedQueryInfo.queryInterval) {
- // taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval- (et - st)); // ms
- // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to);
- //}
-
- //st = taosGetTimestampMs();
- char topic[32] = {0};
- for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
- sprintf(topic, "taosdemo-subscribe-%d", i);
+ char topic[32] = {0};
+ for (uint64_t i = pThreadInfo->start_table_from;
+ i <= pThreadInfo->end_table_to; i++) {
+ for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
+ sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%d", i, j);
memset(subSqlstr,0,sizeof(subSqlstr));
- replaceChildTblName(g_queryInfo.superQueryInfo.sql[i], subSqlstr, i);
+ replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], subSqlstr, i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
- if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
+ if (g_queryInfo.superQueryInfo.result[j][0] != 0) {
sprintf(tmpFile, "%s-%d",
- g_queryInfo.superQueryInfo.result[i], pThreadInfo->threadID);
+ g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID);
}
- tsub[i] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile);
- if (NULL == tsub[i]) {
+
+ uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
+ debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n",
+ __func__, __LINE__, subSeq, subSqlstr);
+ tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile);
+ if (NULL == tsub[subSeq]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
- //et = taosGetTimestampMs();
- //printf("========thread[%"PRIu64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0);
- } while(0);
+ }
// start loop to consume result
TAOS_RES* res = NULL;
while(1) {
- for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
- if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
- continue;
- }
+ for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
+ for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
+ if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
+ continue;
+ }
- res = taos_consume(tsub[i]);
- if (res) {
- char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
- if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
- sprintf(tmpFile, "%s-%d",
- g_queryInfo.superQueryInfo.result[i],
+ uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
+ taosMsleep(100); // ms
+ res = taos_consume(tsub[subSeq]);
+ if (res) {
+ char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
+ if (g_queryInfo.superQueryInfo.result[j][0] != 0) {
+ sprintf(tmpFile, "%s-%d",
+ g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID);
- appendResultToFile(res, tmpFile);
+ appendResultToFile(res, tmpFile);
+ }
}
}
}
}
taos_free_result(res);
- for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
- taos_unsubscribe(tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress);
+ for (uint64_t i = pThreadInfo->start_table_from;
+ i <= pThreadInfo->end_table_to; i++) {
+ for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
+ uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
+ taos_unsubscribe(tsub[subSeq],
+ g_queryInfo.superQueryInfo.subscribeKeepProgress);
+ }
}
taos_close(pThreadInfo->taos);
@@ -6607,17 +6624,8 @@ static void *specifiedSubscribe(void *sarg) {
return NULL;
}
- //int64_t st = 0;
- //int64_t et = 0;
- do {
- //if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) < g_queryInfo.specifiedQueryInfo.queryInterval) {
- // taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval- (et - st)); // ms
- // //printf("========sleep duration:%"PRIu64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to);
- //}
-
- //st = taosGetTimestampMs();
- char topic[32] = {0};
- for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
+ char topic[32] = {0};
+ for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
sprintf(topic, "taosdemo-subscribe-%d", i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
@@ -6630,11 +6638,7 @@ static void *specifiedSubscribe(void *sarg) {
taos_close(pThreadInfo->taos);
return NULL;
}
- }
- //et = taosGetTimestampMs();
- //printf("========thread[%"PRIu64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0);
- } while(0);
-
+ }
// start loop to consume result
TAOS_RES* res = NULL;
while(1) {
@@ -6643,6 +6647,7 @@ static void *specifiedSubscribe(void *sarg) {
continue;
}
+ taosMsleep(1000); // ms
res = taos_consume(tsub[i]);
if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
@@ -6699,31 +6704,35 @@ static int subscribeTestProcess() {
pthread_t *pids = NULL;
threadInfo *infos = NULL;
- //==== create sub threads for query from super table
- if ((g_queryInfo.specifiedQueryInfo.sqlCount <= 0) ||
- (g_queryInfo.specifiedQueryInfo.concurrent <= 0)) {
- errorPrint("%s() LN%d, query sqlCount %"PRIu64" or concurrent %"PRIu64" is not correct.\n",
+ //==== create sub threads for query for specified table
+ if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
+ printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
__func__, __LINE__,
- g_queryInfo.specifiedQueryInfo.sqlCount,
- g_queryInfo.specifiedQueryInfo.concurrent);
- exit(-1);
- }
+ g_queryInfo.specifiedQueryInfo.sqlCount);
+ } else {
+ if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) {
+ errorPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
+ __func__, __LINE__,
+ g_queryInfo.specifiedQueryInfo.sqlCount);
+ exit(-1);
+ }
- pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t));
- infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo));
- if ((NULL == pids) || (NULL == infos)) {
- errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
- exit(-1);
- }
+ pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t));
+ infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo));
+ if ((NULL == pids) || (NULL == infos)) {
+ errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
+ exit(-1);
+ }
- for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) {
+ for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + i, NULL, specifiedSubscribe, t_info);
+ }
}
- //==== create sub threads for query from sub table
+ //==== create sub threads for super table query
pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL;
if ((g_queryInfo.superQueryInfo.sqlCount > 0)
diff --git a/src/query/src/qSqlParser.c b/src/query/src/qSqlParser.c
index 56d395e498..78c531ce49 100644
--- a/src/query/src/qSqlParser.c
+++ b/src/query/src/qSqlParser.c
@@ -588,14 +588,14 @@ void tSetDbName(SStrToken *pCpxName, SStrToken *pDb) {
void tSetColumnInfo(TAOS_FIELD *pField, SStrToken *pName, TAOS_FIELD *pType) {
int32_t maxLen = sizeof(pField->name) / sizeof(pField->name[0]);
-
- // truncate the column name
- if ((int32_t)pName->n >= maxLen) {
- pName->n = maxLen - 1;
- }
- strncpy(pField->name, pName->z, pName->n);
- pField->name[pName->n] = 0;
+ // column name is too long, set the it to be invalid.
+ if ((int32_t) pName->n >= maxLen) {
+ pName->n = -1;
+ } else {
+ strncpy(pField->name, pName->z, pName->n);
+ pField->name[pName->n] = 0;
+ }
pField->type = pType->type;
if(!isValidDataType(pField->type)){
diff --git a/tests/pytest/crash_gen/__init__.py b/tests/pytest/crash_gen/__init__.py
new file mode 100644
index 0000000000..fe03bde354
--- /dev/null
+++ b/tests/pytest/crash_gen/__init__.py
@@ -0,0 +1,2 @@
+# Helpful Ref: https://stackoverflow.com/questions/24100558/how-can-i-split-a-module-into-multiple-files-without-breaking-a-backwards-compa/24100645
+from crash_gen.service_manager import ServiceManager, TdeInstance, TdeSubProcess
diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py
index 44295e8bee..644aa79916 100755
--- a/tests/pytest/crash_gen/crash_gen_main.py
+++ b/tests/pytest/crash_gen/crash_gen_main.py
@@ -1,6 +1,6 @@
# -----!/usr/bin/python3.7
###################################################################
-# Copyright (c) 2016 by TAOS Technologies, Inc.
+# Copyright (c) 2016-2021 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
@@ -15,7 +15,7 @@
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
-from typing import Set
+from typing import Any, Set, Tuple
from typing import Dict
from typing import List
from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
@@ -24,29 +24,34 @@ import textwrap
import time
import datetime
import random
-import logging
import threading
-import copy
import argparse
-import getopt
import sys
import os
+import io
import signal
import traceback
-import resource
+import requests
# from guppy import hpy
import gc
-
-from crash_gen.service_manager import ServiceManager, TdeInstance
-from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
-from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
-import crash_gen.settings
-
import taos
-import requests
-crash_gen.settings.init()
+from .shared.types import TdColumns, TdTags
+
+# from crash_gen import ServiceManager, TdeInstance, TdeSubProcess
+# from crash_gen import ServiceManager, Config, DbConn, DbConnNative, Dice, DbManager, Status, Logging, Helper, \
+# CrashGenError, Progress, MyTDSql, \
+# TdeInstance
+
+from .service_manager import ServiceManager, TdeInstance
+
+from .shared.config import Config
+from .shared.db import DbConn, DbManager, DbConnNative, MyTDSql
+from .shared.misc import Dice, Logging, Helper, Status, CrashGenError, Progress
+from .shared.types import TdDataType
+
+# Config.init()
# Require Python 3
if sys.version_info[0] < 3:
@@ -56,8 +61,8 @@ if sys.version_info[0] < 3:
# Command-line/Environment Configurations, will set a bit later
# ConfigNameSpace = argparse.Namespace
-gConfig: argparse.Namespace
-gSvcMgr: ServiceManager # TODO: refactor this hack, use dep injection
+# gConfig: argparse.Namespace
+gSvcMgr: Optional[ServiceManager] # TODO: refactor this hack, use dep injection
# logger: logging.Logger
gContainer: Container
@@ -80,20 +85,20 @@ class WorkerThread:
self._stepGate = threading.Event()
# Let us have a DB connection of our own
- if (gConfig.per_thread_db_connection): # type: ignore
+ if (Config.getConfig().per_thread_db_connection): # type: ignore
# print("connector_type = {}".format(gConfig.connector_type))
tInst = gContainer.defTdeInstance
- if gConfig.connector_type == 'native':
+ if Config.getConfig().connector_type == 'native':
self._dbConn = DbConn.createNative(tInst.getDbTarget())
- elif gConfig.connector_type == 'rest':
+ elif Config.getConfig().connector_type == 'rest':
self._dbConn = DbConn.createRest(tInst.getDbTarget())
- elif gConfig.connector_type == 'mixed':
+ elif Config.getConfig().connector_type == 'mixed':
if Dice.throw(2) == 0: # 1/2 chance
- self._dbConn = DbConn.createNative()
+ self._dbConn = DbConn.createNative(tInst.getDbTarget())
else:
- self._dbConn = DbConn.createRest()
+ self._dbConn = DbConn.createRest(tInst.getDbTarget())
else:
- raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type))
+ raise RuntimeError("Unexpected connector type: {}".format(Config.getConfig().connector_type))
# self._dbInUse = False # if "use db" was executed already
@@ -122,14 +127,14 @@ class WorkerThread:
# self.isSleeping = False
Logging.info("Starting to run thread: {}".format(self._tid))
- if (gConfig.per_thread_db_connection): # type: ignore
+ if (Config.getConfig().per_thread_db_connection): # type: ignore
Logging.debug("Worker thread openning database connection")
self._dbConn.open()
self._doTaskLoop()
# clean up
- if (gConfig.per_thread_db_connection): # type: ignore
+ if (Config.getConfig().per_thread_db_connection): # type: ignore
if self._dbConn.isOpen: #sometimes it is not open
self._dbConn.close()
else:
@@ -157,7 +162,7 @@ class WorkerThread:
# Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more)
try:
- if (gConfig.per_thread_db_connection): # most likely TRUE
+ if (Config.getConfig().per_thread_db_connection): # most likely TRUE
if not self._dbConn.isOpen: # might have been closed during server auto-restart
self._dbConn.open()
# self.useDb() # might encounter exceptions. TODO: catch
@@ -231,7 +236,7 @@ class WorkerThread:
return self.getDbConn().getQueryResult()
def getDbConn(self) -> DbConn :
- if (gConfig.per_thread_db_connection):
+ if (Config.getConfig().per_thread_db_connection):
return self._dbConn
else:
return self._tc.getDbManager().getDbConn()
@@ -253,7 +258,7 @@ class ThreadCoordinator:
self._pool = pool
# self._wd = wd
self._te = None # prepare for every new step
- self._dbManager = dbManager
+ self._dbManager = dbManager # type: Optional[DbManager] # may be freed
self._executedTasks: List[Task] = [] # in a given step
self._lock = threading.RLock() # sync access for a few things
@@ -265,9 +270,13 @@ class ThreadCoordinator:
self._stepStartTime = None # Track how long it takes to execute each step
def getTaskExecutor(self):
+ if self._te is None:
+ raise CrashGenError("Unexpected empty TE")
return self._te
def getDbManager(self) -> DbManager:
+ if self._dbManager is None:
+ raise ChildProcessError("Unexpected empty _dbManager")
return self._dbManager
def crossStepBarrier(self, timeout=None):
@@ -278,7 +287,7 @@ class ThreadCoordinator:
self._execStats.registerFailure("User Interruption")
def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
- maxSteps = gConfig.max_steps # type: ignore
+ maxSteps = Config.getConfig().max_steps # type: ignore
if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9
return True
if self._runStatus != Status.STATUS_RUNNING:
@@ -383,7 +392,7 @@ class ThreadCoordinator:
hasAbortedTask = False
workerTimeout = False
while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
- if not gConfig.debug: # print this only if we are not in debug mode
+ if not Config.getConfig().debug: # print this only if we are not in debug mode
Progress.emit(Progress.STEP_BOUNDARY)
# print(".", end="", flush=True)
# if (self._curStep % 2) == 0: # print memory usage once every 10 steps
@@ -468,7 +477,7 @@ class ThreadCoordinator:
self._pool = None
self._te = None
self._dbManager = None
- self._executedTasks = None
+ self._executedTasks = []
self._lock = None
self._stepBarrier = None
self._execStats = None
@@ -507,18 +516,18 @@ class ThreadCoordinator:
''' Initialize multiple databases, invoked at __ini__() time '''
self._dbs = [] # type: List[Database]
dbc = self.getDbManager().getDbConn()
- if gConfig.max_dbs == 0:
+ if Config.getConfig().max_dbs == 0:
self._dbs.append(Database(0, dbc))
else:
baseDbNumber = int(datetime.datetime.now().timestamp( # Don't use Dice/random, as they are deterministic
- )*333) % 888 if gConfig.dynamic_db_table_names else 0
- for i in range(gConfig.max_dbs):
+ )*333) % 888 if Config.getConfig().dynamic_db_table_names else 0
+ for i in range(Config.getConfig().max_dbs):
self._dbs.append(Database(baseDbNumber + i, dbc))
def pickDatabase(self):
idxDb = 0
- if gConfig.max_dbs != 0 :
- idxDb = Dice.throw(gConfig.max_dbs) # 0 to N-1
+ if Config.getConfig().max_dbs != 0 :
+ idxDb = Dice.throw(Config.getConfig().max_dbs) # 0 to N-1
db = self._dbs[idxDb] # type: Database
return db
@@ -562,7 +571,7 @@ class ThreadPool:
workerThread._thread.join()
def cleanup(self):
- self.threadList = None # maybe clean up each?
+ self.threadList = [] # maybe clean up each?
# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names
@@ -672,7 +681,7 @@ class AnyState:
# Each sub state tells us the "info", about itself, so we can determine
# on things like canDropDB()
- def getInfo(self):
+ def getInfo(self) -> List[Any]:
raise RuntimeError("Must be overriden by child classes")
def equals(self, other):
@@ -700,7 +709,7 @@ class AnyState:
def canDropDb(self):
# If user requests to run up to a number of DBs,
# we'd then not do drop_db operations any more
- if gConfig.max_dbs > 0 or gConfig.use_shadow_db :
+ if Config.getConfig().max_dbs > 0 or Config.getConfig().use_shadow_db :
return False
return self._info[self.CAN_DROP_DB]
@@ -708,7 +717,7 @@ class AnyState:
return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]
def canDropFixedSuperTable(self):
- if gConfig.use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table
+ if Config.getConfig().use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table
return False
return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
@@ -910,7 +919,7 @@ class StateMechine:
# May be slow, use cautionsly...
def getTaskTypes(self): # those that can run (directly/indirectly) from the current state
- def typesToStrings(types):
+ def typesToStrings(types) -> List:
ss = []
for t in types:
ss.append(t.__name__)
@@ -1029,13 +1038,14 @@ class StateMechine:
# ref:
# https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
- def _weighted_choice_sub(self, weights):
+ def _weighted_choice_sub(self, weights) -> int:
# TODO: use our dice to ensure it being determinstic?
rnd = random.random() * sum(weights)
for i, w in enumerate(weights):
rnd -= w
if rnd < 0:
return i
+ raise CrashGenError("Unexpected no choice")
class Database:
''' We use this to represent an actual TDengine database inside a service instance,
@@ -1047,8 +1057,8 @@ class Database:
'''
_clsLock = threading.Lock() # class wide lock
_lastInt = 101 # next one is initial integer
- _lastTick = 0
- _lastLaggingTick = 0 # lagging tick, for out-of-sequence (oos) data insertions
+ _lastTick = None # Optional[datetime]
+ _lastLaggingTick = None # Optional[datetime] # lagging tick, for out-of-sequence (oos) data insertions
def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc
self._dbNum = dbNum # we assign a number to databases, for our testing purpose
@@ -1104,7 +1114,7 @@ class Database:
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
t4 = datetime.datetime.fromtimestamp(
t3.timestamp() + elSec2) # see explanation above
- Logging.info("Setting up TICKS to start from: {}".format(t4))
+ Logging.debug("Setting up TICKS to start from: {}".format(t4))
return t4
@classmethod
@@ -1113,14 +1123,14 @@ class Database:
Fetch a timestamp tick, with some random factor, may not be unique.
'''
with cls._clsLock: # prevent duplicate tick
- if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized
+ if cls._lastLaggingTick is None or cls._lastTick is None : # not initialized
# 10k at 1/20 chance, should be enough to avoid overlaps
tick = cls.setupLastTick()
cls._lastTick = tick
cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast
# if : # should be quite a bit into the future
- if gConfig.mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
+ if Config.isSet('mix_oos_data') and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence
return cls._lastLaggingTick
else: # regular
@@ -1302,10 +1312,10 @@ class Task():
]:
return True # These are the ALWAYS-ACCEPTABLE ones
# This case handled below already.
- # elif (errno in [ 0x0B ]) and gConfig.auto_start_service:
+ # elif (errno in [ 0x0B ]) and Settings.getConfig().auto_start_service:
# return True # We may get "network unavilable" when restarting service
- elif gConfig.ignore_errors: # something is specified on command line
- moreErrnos = [int(v, 0) for v in gConfig.ignore_errors.split(',')]
+ elif Config.getConfig().ignore_errors: # something is specified on command line
+ moreErrnos = [int(v, 0) for v in Config.getConfig().ignore_errors.split(',')]
if errno in moreErrnos:
return True
elif errno == 0x200 : # invalid SQL, we need to div in a bit more
@@ -1341,7 +1351,7 @@ class Task():
self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno)
- if (gConfig.continue_on_exception): # user choose to continue
+ if (Config.getConfig().continue_on_exception): # user choose to continue
self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, wt.getDbConn().getLastSql()))
self._err = err
@@ -1356,7 +1366,7 @@ class Task():
self.__class__.__name__,
errno2, err, wt.getDbConn().getLastSql())
self.logDebug(errMsg)
- if gConfig.debug:
+ if Config.getConfig().debug:
# raise # so that we see full stack
traceback.print_exc()
print(
@@ -1370,13 +1380,13 @@ class Task():
self._err = e
self._aborted = True
traceback.print_exc()
- except BaseException as e:
+ except BaseException as e2:
self.logInfo("Python base exception encountered")
- self._err = e
+ # self._err = e2 # Exception/BaseException incompatible!
self._aborted = True
traceback.print_exc()
- except BaseException: # TODO: what is this again??!!
- raise RuntimeError("Punt")
+ # except BaseException: # TODO: what is this again??!!
+ # raise RuntimeError("Punt")
# self.logDebug(
# "[=] Unexpected exception, SQL: {}".format(
# wt.getDbConn().getLastSql()))
@@ -1421,11 +1431,11 @@ class Task():
class ExecutionStats:
def __init__(self):
# total/success times for a task
- self._execTimes: Dict[str, [int, int]] = {}
+ self._execTimes: Dict[str, List[int]] = {}
self._tasksInProgress = 0
self._lock = threading.Lock()
- self._firstTaskStartTime = None
- self._execStartTime = None
+ self._firstTaskStartTime = 0.0
+ self._execStartTime = 0.0
self._errors = {}
self._elapsedTime = 0.0 # total elapsed time
self._accRunTime = 0.0 # accumulated run time
@@ -1470,7 +1480,7 @@ class ExecutionStats:
self._tasksInProgress -= 1
if self._tasksInProgress == 0: # all tasks have stopped
self._accRunTime += (time.time() - self._firstTaskStartTime)
- self._firstTaskStartTime = None
+ self._firstTaskStartTime = 0.0
def registerFailure(self, reason):
self._failed = True
@@ -1554,7 +1564,7 @@ class StateTransitionTask(Task):
def getRegTableName(cls, i):
if ( StateTransitionTask._baseTableNumber is None): # Set it one time
StateTransitionTask._baseTableNumber = Dice.throw(
- 999) if gConfig.dynamic_db_table_names else 0
+ 999) if Config.getConfig().dynamic_db_table_names else 0
return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i)
def execute(self, wt: WorkerThread):
@@ -1574,14 +1584,14 @@ class TaskCreateDb(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# was: self.execWtSql(wt, "create database db")
repStr = ""
- if gConfig.num_replicas != 1:
- # numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N
- numReplica = gConfig.num_replicas # fixed, always
+ if Config.getConfig().num_replicas != 1:
+ # numReplica = Dice.throw(Settings.getConfig().max_replicas) + 1 # 1,2 ... N
+ numReplica = Config.getConfig().num_replicas # fixed, always
repStr = "replica {}".format(numReplica)
- updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active
+ updatePostfix = "update 1" if Config.getConfig().verify_data else "" # allow update only when "verify data" is active
dbName = self._db.getName()
self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) )
- if dbName == "db_0" and gConfig.use_shadow_db:
+ if dbName == "db_0" and Config.getConfig().use_shadow_db:
self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) )
class TaskDropDb(StateTransitionTask):
@@ -1614,10 +1624,11 @@ class TaskCreateSuperTable(StateTransitionTask):
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place
- sTable.create(wt.getDbConn(),
- {'ts':'TIMESTAMP', 'speed':'INT', 'color':'BINARY(16)'}, {'b':'BINARY(200)', 'f':'FLOAT'},
- dropIfExists = True
- )
+ sTable.create(wt.getDbConn(),
+ {'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT, 'color': TdDataType.BINARY16}, {
+ 'b': TdDataType.BINARY200, 'f': TdDataType.FLOAT},
+ dropIfExists=True
+ )
# self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that
# automatically
@@ -1645,9 +1656,7 @@ class TdSuperTable:
return dbc.existsSuperTable(self._stName)
# TODO: odd semantic, create() method is usually static?
- def create(self, dbc, cols: dict, tags: dict,
- dropIfExists = False
- ):
+ def create(self, dbc, cols: TdColumns, tags: TdTags, dropIfExists = False):
'''Creating a super table'''
dbName = self._dbName
@@ -1658,17 +1667,17 @@ class TdSuperTable:
dbc.execute("DROP TABLE {}".format(fullTableName))
else: # error
raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))
-
+
# Now let's create
sql = "CREATE TABLE {} ({})".format(
fullTableName,
- ",".join(['%s %s'%(k,v) for (k,v) in cols.items()]))
- if tags is None :
- sql += " TAGS (dummy int) "
- else:
+ ",".join(['%s %s'%(k,v.value) for (k,v) in cols.items()]))
+ if tags :
sql += " TAGS ({})".format(
- ",".join(['%s %s'%(k,v) for (k,v) in tags.items()])
- )
+ ",".join(['%s %s'%(k,v.value) for (k,v) in tags.items()])
+ )
+ else:
+ sql += " TAGS (dummy int) "
dbc.execute(sql)
def getRegTables(self, dbc: DbConn):
@@ -1686,7 +1695,7 @@ class TdSuperTable:
def hasRegTables(self, dbc: DbConn):
return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0
- def ensureTable(self, task: Task, dbc: DbConn, regTableName: str):
+ def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str):
dbName = self._dbName
sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
if dbc.query(sql) >= 1 : # reg table exists already
@@ -1694,7 +1703,7 @@ class TdSuperTable:
# acquire a lock first, so as to be able to *verify*. More details in TD-1471
fullTableName = dbName + '.' + regTableName
- if task is not None: # optional lock
+ if task is not None: # TODO: what happens if we don't lock the table
task.lockTable(fullTableName)
Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table
# print("(" + fullTableName[-3:] + ")", end="", flush=True)
@@ -1886,7 +1895,7 @@ class TaskDropSuperTable(StateTransitionTask):
if Dice.throw(2) == 0:
# print("_7_", end="", flush=True)
tblSeq = list(range(
- 2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)))
+ 2 + (self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES)))
random.shuffle(tblSeq)
tickOutput = False # if we have spitted out a "d" character for "drop regular table"
isSuccess = True
@@ -1952,13 +1961,13 @@ class TaskRestartService(StateTransitionTask):
@classmethod
def canBeginFrom(cls, state: AnyState):
- if gConfig.auto_start_service:
+ if Config.getConfig().auto_start_service:
return state.canDropFixedSuperTable() # Basicallly when we have the super table
return False # don't run this otherwise
CHANCE_TO_RESTART_SERVICE = 200
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
- if not gConfig.auto_start_service: # only execute when we are in -a mode
+ if not Config.getConfig().auto_start_service: # only execute when we are in -a mode
print("_a", end="", flush=True)
return
@@ -1980,12 +1989,12 @@ class TaskAddData(StateTransitionTask):
activeTable: Set[int] = set()
# We use these two files to record operations to DB, useful for power-off tests
- fAddLogReady = None # type: TextIOWrapper
- fAddLogDone = None # type: TextIOWrapper
+ fAddLogReady = None # type: Optional[io.TextIOWrapper]
+ fAddLogDone = None # type: Optional[io.TextIOWrapper]
@classmethod
def prepToRecordOps(cls):
- if gConfig.record_ops:
+ if Config.getConfig().record_ops:
if (cls.fAddLogReady is None):
Logging.info(
"Recording in a file operations to be performed...")
@@ -2003,7 +2012,7 @@ class TaskAddData(StateTransitionTask):
return state.canAddData()
def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
- numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
+ numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
fullTableName = db.getName() + '.' + regTableName
sql = "INSERT INTO {} VALUES ".format(fullTableName)
@@ -2015,21 +2024,23 @@ class TaskAddData(StateTransitionTask):
dbc.execute(sql)
def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
- numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
+ numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
for j in range(numRecords): # number of records per table
nextInt = db.getNextInt()
nextTick = db.getNextTick()
nextColor = db.getNextColor()
- if gConfig.record_ops:
+ if Config.getConfig().record_ops:
self.prepToRecordOps()
+ if self.fAddLogReady is None:
+ raise CrashGenError("Unexpected empty fAddLogReady")
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
self.fAddLogReady.flush()
- os.fsync(self.fAddLogReady)
+ os.fsync(self.fAddLogReady.fileno())
# TODO: too ugly trying to lock the table reliably, refactor...
fullTableName = db.getName() + '.' + regTableName
- if gConfig.verify_data:
+ if Config.getConfig().verify_data:
self.lockTable(fullTableName)
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
@@ -2042,7 +2053,7 @@ class TaskAddData(StateTransitionTask):
dbc.execute(sql)
# Quick hack, attach an update statement here. TODO: create an "update" task
- if (not gConfig.use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
+ if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
nextInt = db.getNextInt()
nextColor = db.getNextColor()
sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
@@ -2053,12 +2064,12 @@ class TaskAddData(StateTransitionTask):
dbc.execute(sql)
except: # Any exception at all
- if gConfig.verify_data:
+ if Config.getConfig().verify_data:
self.unlockTable(fullTableName)
raise
# Now read it back and verify, we might encounter an error if table is dropped
- if gConfig.verify_data: # only if command line asks for it
+ if Config.getConfig().verify_data: # only if command line asks for it
try:
readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'".
format(db.getName(), regTableName, nextTick))
@@ -2085,17 +2096,19 @@ class TaskAddData(StateTransitionTask):
# Successfully wrote the data into the DB, let's record it somehow
te.recordDataMark(nextInt)
- if gConfig.record_ops:
+ if Config.getConfig().record_ops:
+ if self.fAddLogDone is None:
+ raise CrashGenError("Unexpected empty fAddLogDone")
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
self.fAddLogDone.flush()
- os.fsync(self.fAddLogDone)
+ os.fsync(self.fAddLogDone.fileno())
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
db = self._db
dbc = wt.getDbConn()
- numTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES
- numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
+ numTables = self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES
+ numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
tblSeq = list(range(numTables ))
random.shuffle(tblSeq) # now we have random sequence
for i in tblSeq:
@@ -2110,7 +2123,7 @@ class TaskAddData(StateTransitionTask):
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
fullTableName = dbName + '.' + regTableName
# self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked"
- sTable.ensureTable(self, wt.getDbConn(), regTableName) # Ensure the table exists
+ sTable.ensureRegTable(self, wt.getDbConn(), regTableName) # Ensure the table exists
# self._unlockTable(fullTableName)
if Dice.throw(1) == 0: # 1 in 2 chance
@@ -2125,7 +2138,9 @@ class ThreadStacks: # stack info for all threads
def __init__(self):
self._allStacks = {}
allFrames = sys._current_frames()
- for th in threading.enumerate():
+ for th in threading.enumerate():
+ if th.ident is None:
+ continue
stack = traceback.extract_stack(allFrames[th.ident])
self._allStacks[th.native_id] = stack
@@ -2246,14 +2261,15 @@ class ClientManager:
def run(self, svcMgr):
# self._printLastNumbers()
- global gConfig
+ # global gConfig
# Prepare Tde Instance
global gContainer
tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance"
- dbManager = DbManager(gConfig.connector_type, tInst.getDbTarget()) # Regular function
- thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
+ cfg = Config.getConfig()
+ dbManager = DbManager(cfg.connector_type, tInst.getDbTarget()) # Regular function
+ thPool = ThreadPool(cfg.num_threads, cfg.max_steps)
self.tc = ThreadCoordinator(thPool, dbManager)
Logging.info("Starting client instance: {}".format(tInst))
@@ -2266,7 +2282,8 @@ class ClientManager:
# Release global variables
- gConfig = None
+ # gConfig = None
+ Config.clearConfig()
gSvcMgr = None
logger = None
@@ -2297,7 +2314,7 @@ class ClientManager:
class MainExec:
def __init__(self):
self._clientMgr = None
- self._svcMgr = None # type: ServiceManager
+ self._svcMgr = None # type: Optional[ServiceManager]
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
@@ -2317,7 +2334,7 @@ class MainExec:
def runClient(self):
global gSvcMgr
- if gConfig.auto_start_service:
+ if Config.getConfig().auto_start_service:
gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert
gSvcMgr.startTaosServices() # we start, don't run
@@ -2326,26 +2343,18 @@ class MainExec:
try:
ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
except requests.exceptions.ConnectionError as err:
- Logging.warning("Failed to open REST connection to DB: {}".format(err.getMessage()))
+ Logging.warning("Failed to open REST connection to DB: {}".format(err))
# don't raise
return ret
def runService(self):
global gSvcMgr
- gSvcMgr = self._svcMgr = ServiceManager(gConfig.num_dnodes) # save it in a global variable TODO: hack alert
+ gSvcMgr = self._svcMgr = ServiceManager(Config.getConfig().num_dnodes) # save it in a global variable TODO: hack alert
gSvcMgr.run() # run to some end state
gSvcMgr = self._svcMgr = None
- def init(self): # TODO: refactor
- global gContainer
- gContainer = Container() # micky-mouse DI
-
- global gSvcMgr # TODO: refactor away
- gSvcMgr = None
-
- # Super cool Python argument library:
- # https://docs.python.org/3/library/argparse.html
+ def _buildCmdLineParser(self):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent('''\
@@ -2466,20 +2475,29 @@ class MainExec:
action='store_true',
help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
- global gConfig
- gConfig = parser.parse_args()
- crash_gen.settings.gConfig = gConfig # TODO: fix this hack, consolidate this global var
+ return parser
+
+
+ def init(self): # TODO: refactor
+ global gContainer
+ gContainer = Container() # micky-mouse DI
+
+ global gSvcMgr # TODO: refactor away
+ gSvcMgr = None
+
+ parser = self._buildCmdLineParser()
+ Config.init(parser)
# Sanity check for arguments
- if gConfig.use_shadow_db and gConfig.max_dbs>1 :
+ if Config.getConfig().use_shadow_db and Config.getConfig().max_dbs>1 :
raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1")
- Logging.clsInit(gConfig)
+ Logging.clsInit(Config.getConfig().debug)
Dice.seed(0) # initial seeding of dice
def run(self):
- if gConfig.run_tdengine: # run server
+ if Config.getConfig().run_tdengine: # run server
try:
self.runService()
return 0 # success
diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py
index cdbf2db4da..95507f0142 100644
--- a/tests/pytest/crash_gen/service_manager.py
+++ b/tests/pytest/crash_gen/service_manager.py
@@ -1,25 +1,33 @@
+from __future__ import annotations
+
import os
import io
import sys
+from enum import Enum
import threading
import signal
import logging
import time
-import subprocess
-
-from typing import IO, List
+from subprocess import PIPE, Popen, TimeoutExpired
+from typing import BinaryIO, Generator, IO, List, NewType, Optional
+import typing
try:
import psutil
except:
print("Psutil module needed, please install: sudo pip3 install psutil")
sys.exit(-1)
-
from queue import Queue, Empty
-from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
-from .db import DbConn, DbTarget
-import crash_gen.settings
+from .shared.config import Config
+from .shared.db import DbTarget, DbConn
+from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice
+from .shared.types import DirPath
+
+# from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status
+# from crash_gen.db import DbConn, DbTarget
+# from crash_gen.settings import Config
+# from crash_gen.types import DirPath
class TdeInstance():
"""
@@ -68,7 +76,10 @@ class TdeInstance():
self._fepPort = fepPort
self._tInstNum = tInstNum
- self._smThread = ServiceManagerThread()
+
+ # An "Tde Instance" will *contain* a "sub process" object, with will/may use a thread internally
+ # self._smThread = ServiceManagerThread()
+ self._subProcess = None # type: Optional[TdeSubProcess]
def getDbTarget(self):
return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port)
@@ -153,23 +164,24 @@ quorum 2
def getExecFile(self): # .../taosd
return self._buildDir + "/build/bin/taosd"
- def getRunDir(self): # TODO: rename to "root dir" ?!
- return self._buildDir + self._subdir
+ def getRunDir(self) -> DirPath : # TODO: rename to "root dir" ?!
+ return DirPath(self._buildDir + self._subdir)
- def getCfgDir(self): # path, not file
- return self.getRunDir() + "/cfg"
+ def getCfgDir(self) -> DirPath : # path, not file
+ return DirPath(self.getRunDir() + "/cfg")
- def getLogDir(self):
- return self.getRunDir() + "/log"
+ def getLogDir(self) -> DirPath :
+ return DirPath(self.getRunDir() + "/log")
def getHostAddr(self):
return "127.0.0.1"
def getServiceCmdLine(self): # to start the instance
cmdLine = []
- if crash_gen.settings.gConfig.track_memory_leaks:
+ if Config.getConfig().track_memory_leaks:
Logging.info("Invoking VALGRIND on service...")
cmdLine = ['valgrind', '--leak-check=yes']
+ # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
return cmdLine
@@ -196,27 +208,46 @@ quorum 2
dbc.close()
def getStatus(self):
- return self._smThread.getStatus()
+ # return self._smThread.getStatus()
+ if self._subProcess is None:
+ return Status(Status.STATUS_EMPTY)
+ return self._subProcess.getStatus()
- def getSmThread(self):
- return self._smThread
+ # def getSmThread(self):
+ # return self._smThread
def start(self):
- if not self.getStatus().isStopped():
+ if self.getStatus().isActive():
raise CrashGenError("Cannot start instance from status: {}".format(self.getStatus()))
Logging.info("Starting TDengine instance: {}".format(self))
self.generateCfgFile() # service side generates config file, client does not
self.rotateLogs()
- self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions
+ # self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions
+ self._subProcess = TdeSubProcess(self.getServiceCmdLine(), self.getLogDir())
def stop(self):
- self._smThread.stop()
+ self._subProcess.stop()
+ self._subProcess = None
def isFirst(self):
return self._tInstNum == 0
+ def printFirst10Lines(self):
+ if self._subProcess is None:
+ Logging.warning("Incorrect TI status for procIpcBatch-10 operation")
+ return
+ self._subProcess.procIpcBatch(trimToTarget=10, forceOutput=True)
+
+ def procIpcBatch(self):
+ if self._subProcess is None:
+ Logging.warning("Incorrect TI status for procIpcBatch operation")
+ return
+ self._subProcess.procIpcBatch() # may enounter EOF and change status to STOPPED
+ if self._subProcess.getStatus().isStopped():
+ self._subProcess.stop()
+ self._subProcess = None
class TdeSubProcess:
"""
@@ -225,42 +256,57 @@ class TdeSubProcess:
It takes a TdeInstance object as its parameter, with the rationale being
"a sub process runs an instance".
+
+ We aim to ensure that this object has exactly the same life-cycle as the
+ underlying sub process.
"""
# RET_ALREADY_STOPPED = -1
# RET_TIME_OUT = -3
# RET_SUCCESS = -4
- def __init__(self):
- self.subProcess = None # type: subprocess.Popen
- # if tInst is None:
- # raise CrashGenError("Empty instance not allowed in TdeSubProcess")
- # self._tInst = tInst # Default create at ServiceManagerThread
+ def __init__(self, cmdLine: List[str], logDir: DirPath):
+ # Create the process + managing thread immediately
+
+ Logging.info("Attempting to start TAOS sub process...")
+ self._popen = self._start(cmdLine) # the actual sub process
+ self._smThread = ServiceManagerThread(self, logDir) # A thread to manage the sub process, mostly to process the IO
+ Logging.info("Successfully started TAOS process: {}".format(self))
+
+
def __repr__(self):
- if self.subProcess is None:
- return '[TdeSubProc: Empty]'
- return '[TdeSubProc: pid = {}]'.format(self.getPid())
+ # if self.subProcess is None:
+ # return '[TdeSubProc: Empty]'
+ return '[TdeSubProc: pid = {}, status = {}]'.format(
+ self.getPid(), self.getStatus() )
- def getStdOut(self):
- return self.subProcess.stdout
+ def getStdOut(self) -> BinaryIO :
+ if self._popen.universal_newlines : # alias of text_mode
+ raise CrashGenError("We need binary mode for STDOUT IPC")
+ # Logging.info("Type of stdout is: {}".format(type(self._popen.stdout)))
+ return typing.cast(BinaryIO, self._popen.stdout)
- def getStdErr(self):
- return self.subProcess.stderr
+ def getStdErr(self) -> BinaryIO :
+ if self._popen.universal_newlines : # alias of text_mode
+ raise CrashGenError("We need binary mode for STDERR IPC")
+ return typing.cast(BinaryIO, self._popen.stderr)
- def isRunning(self):
- return self.subProcess is not None
+ # Now it's always running, since we matched the life cycle
+ # def isRunning(self):
+ # return self.subProcess is not None
def getPid(self):
- return self.subProcess.pid
+ return self._popen.pid
- def start(self, cmdLine):
+ def _start(self, cmdLine) -> Popen :
ON_POSIX = 'posix' in sys.builtin_module_names
# Sanity check
- if self.subProcess: # already there
- raise RuntimeError("Corrupt process state")
+ # if self.subProcess: # already there
+ # raise RuntimeError("Corrupt process state")
+
# Prepare environment variables for coverage information
# Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment
myEnv = os.environ.copy()
@@ -270,15 +316,12 @@ class TdeSubProcess:
# print("Starting TDengine with env: ", myEnv.items())
# print("Starting TDengine via Shell: {}".format(cmdLineStr))
- useShell = True # Needed to pass environments into it
- self.subProcess = subprocess.Popen(
- # ' '.join(cmdLine) if useShell else cmdLine,
- # shell=useShell,
- ' '.join(cmdLine),
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- # bufsize=1, # not supported in binary mode
+ # useShell = True # Needed to pass environments into it
+ return Popen(
+ ' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine,
+ shell=True, # Always use shell, since we need to pass ENV vars
+ stdout=PIPE,
+ stderr=PIPE,
close_fds=ON_POSIX,
env=myEnv
) # had text=True, which interferred with reading EOF
@@ -288,7 +331,9 @@ class TdeSubProcess:
def stop(self):
"""
- Stop a sub process, DO NOT return anything, process all conditions INSIDE
+ Stop a sub process, DO NOT return anything, process all conditions INSIDE.
+
+ Calling function should immediately delete/unreference the object
Common POSIX signal values (from man -7 signal):
SIGHUP 1
@@ -306,29 +351,39 @@ class TdeSubProcess:
SIGSEGV 11
SIGUSR2 12
"""
- if not self.subProcess:
- Logging.error("Sub process already stopped")
+ # self._popen should always be valid.
+
+ Logging.info("Terminating TDengine service running as the sub process...")
+ if self.getStatus().isStopped():
+ Logging.info("Service already stopped")
+ return
+ if self.getStatus().isStopping():
+ Logging.info("Service is already being stopped, pid: {}".format(self.getPid()))
return
- retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
+ self.setStatus(Status.STATUS_STOPPING)
+
+ retCode = self._popen.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
if retCode: # valid return code, process ended
# retCode = -retCode # only if valid
Logging.warning("TSP.stop(): process ended itself")
- self.subProcess = None
+ # self.subProcess = None
return
# process still alive, let's interrupt it
- self._stopForSure(self.subProcess, self.STOP_SIGNAL) # success if no exception
- self.subProcess = None
+ self._stopForSure(self._popen, self.STOP_SIGNAL) # success if no exception
- # sub process should end, then IPC queue should end, causing IO thread to end
+ # sub process should end, then IPC queue should end, causing IO thread to end
+ self._smThread.stop() # stop for sure too
+
+ self.setStatus(Status.STATUS_STOPPED)
@classmethod
- def _stopForSure(cls, proc: subprocess.Popen, sig: int):
+ def _stopForSure(cls, proc: Popen, sig: int):
'''
Stop a process and all sub processes with a singal, and SIGKILL if necessary
'''
- def doKillTdService(proc: subprocess.Popen, sig: int):
+ def doKillTdService(proc: Popen, sig: int):
Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig))
proc.send_signal(sig)
try:
@@ -340,7 +395,7 @@ class TdeSubProcess:
else:
Logging.warning("TD service terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))
return True # terminated successfully
- except subprocess.TimeoutExpired as err:
+ except TimeoutExpired as err:
Logging.warning("Failed to kill sub-sub process {} with signal {}".format(proc.pid, sig))
return False # failed to terminate
@@ -349,22 +404,22 @@ class TdeSubProcess:
Logging.info("Killing sub-sub process {} with signal {}".format(child.pid, sig))
child.send_signal(sig)
try:
- retCode = child.wait(20)
- if (- retCode) == signal.SIGSEGV: # Crashed
+ retCode = child.wait(20) # type: ignore
+ if (- retCode) == signal.SIGSEGV: # type: ignore # Crashed
Logging.warning("Process {} CRASHED, please check CORE file!".format(child.pid))
- elif (- retCode) == sig :
+ elif (- retCode) == sig : # type: ignore
Logging.info("Sub-sub process terminated with expected return code {}".format(sig))
else:
- Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))
+ Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode)) # type: ignore
return True # terminated successfully
except psutil.TimeoutExpired as err:
Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig))
return False # did not terminate
- def doKill(proc: subprocess.Popen, sig: int):
+ def doKill(proc: Popen, sig: int):
pid = proc.pid
try:
- topSubProc = psutil.Process(pid)
+ topSubProc = psutil.Process(pid) # Now that we are doing "exec -c", should not have children any more
for child in topSubProc.children(recursive=True): # or parent.children() for recursive=False
Logging.warning("Unexpected child to be killed")
doKillChild(child, sig)
@@ -389,19 +444,26 @@ class TdeSubProcess:
return doKill(proc, sig)
def hardKill(proc):
- return doKill(proc, signal.SIGKILL)
-
-
+ return doKill(proc, signal.SIGKILL)
pid = proc.pid
Logging.info("Terminate running processes under {}, with SIG #{} and wait...".format(pid, sig))
if softKill(proc, sig):
- return# success
+ return # success
if sig != signal.SIGKILL: # really was soft above
if hardKill(proc):
- return
+ return
raise CrashGenError("Failed to stop process, pid={}".format(pid))
+ def getStatus(self):
+ return self._smThread.getStatus()
+
+ def setStatus(self, status):
+ self._smThread.setStatus(status)
+
+ def procIpcBatch(self, trimToTarget=0, forceOutput=False):
+ self._smThread.procIpcBatch(trimToTarget, forceOutput)
+
class ServiceManager:
PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process
@@ -498,10 +560,10 @@ class ServiceManager:
def isActive(self):
"""
Determine if the service/cluster is active at all, i.e. at least
- one thread is not "stopped".
+ one instance is active
"""
for ti in self._tInsts:
- if not ti.getStatus().isStopped():
+ if ti.getStatus().isActive():
return True
return False
@@ -539,10 +601,10 @@ class ServiceManager:
# while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here
status = ti.getStatus()
if status.isRunning():
- th = ti.getSmThread()
- th.procIpcBatch() # regular processing,
+ # th = ti.getSmThread()
+ ti.procIpcBatch() # regular processing,
if status.isStopped():
- th.procIpcBatch() # one last time?
+ ti.procIpcBatch() # one last time?
# self._updateThreadStatus()
time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round
@@ -572,7 +634,8 @@ class ServiceManager:
if not ti.isFirst():
tFirst = self._getFirstInstance()
tFirst.createDnode(ti.getDbTarget())
- ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines
+ ti.printFirst10Lines()
+ # ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines
def stopTaosServices(self):
with self._lock:
@@ -618,21 +681,24 @@ class ServiceManagerThread:
"""
MAX_QUEUE_SIZE = 10000
- def __init__(self):
+ def __init__(self, subProc: TdeSubProcess, logDir: str):
# Set the sub process
- self._tdeSubProcess = None # type: TdeSubProcess
+ # self._tdeSubProcess = None # type: TdeSubProcess
# Arrange the TDengine instance
# self._tInstNum = tInstNum # instance serial number in cluster, ZERO based
# self._tInst = tInst or TdeInstance() # Need an instance
- self._thread = None # The actual thread, # type: threading.Thread
- self._thread2 = None # watching stderr
+ # self._thread = None # type: Optional[threading.Thread] # The actual thread, # type: threading.Thread
+ # self._thread2 = None # type: Optional[threading.Thread] Thread # watching stderr
self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually.
+ self._start(subProc, logDir)
+
def __repr__(self):
- return "[SvcMgrThread: status={}, subProc={}]".format(
- self.getStatus(), self._tdeSubProcess)
+ raise CrashGenError("SMT status moved to TdeSubProcess")
+ # return "[SvcMgrThread: status={}, subProc={}]".format(
+ # self.getStatus(), self._tdeSubProcess)
def getStatus(self):
'''
@@ -640,30 +706,33 @@ class ServiceManagerThread:
'''
return self._status
+ def setStatus(self, statusVal: int):
+ self._status.set(statusVal)
+
# Start the thread (with sub process), and wait for the sub service
# to become fully operational
- def start(self, cmdLine : str, logDir: str):
+ def _start(self, subProc :TdeSubProcess, logDir: str):
'''
Request the manager thread to start a new sub process, and manage it.
:param cmdLine: the command line to invoke
:param logDir: the logging directory, to hold stdout/stderr files
'''
- if self._thread:
- raise RuntimeError("Unexpected _thread")
- if self._tdeSubProcess:
- raise RuntimeError("TDengine sub process already created/running")
+ # if self._thread:
+ # raise RuntimeError("Unexpected _thread")
+ # if self._tdeSubProcess:
+ # raise RuntimeError("TDengine sub process already created/running")
- Logging.info("Attempting to start TAOS service: {}".format(self))
+ # Moved to TdeSubProcess
+ # Logging.info("Attempting to start TAOS service: {}".format(self))
self._status.set(Status.STATUS_STARTING)
- self._tdeSubProcess = TdeSubProcess()
- self._tdeSubProcess.start(cmdLine) # TODO: verify process is running
+ # self._tdeSubProcess = TdeSubProcess.start(cmdLine) # TODO: verify process is running
- self._ipcQueue = Queue()
+ self._ipcQueue = Queue() # type: Queue
self._thread = threading.Thread( # First thread captures server OUTPUT
target=self.svcOutputReader,
- args=(self._tdeSubProcess.getStdOut(), self._ipcQueue, logDir))
+ args=(subProc.getStdOut(), self._ipcQueue, logDir))
self._thread.daemon = True # thread dies with the program
self._thread.start()
time.sleep(0.01)
@@ -675,7 +744,7 @@ class ServiceManagerThread:
self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
target=self.svcErrorReader,
- args=(self._tdeSubProcess.getStdErr(), self._ipcQueue, logDir))
+ args=(subProc.getStdErr(), self._ipcQueue, logDir))
self._thread2.daemon = True # thread dies with the program
self._thread2.start()
time.sleep(0.01)
@@ -690,14 +759,14 @@ class ServiceManagerThread:
Progress.emit(Progress.SERVICE_START_NAP)
# print("_zz_", end="", flush=True)
if self._status.isRunning():
- Logging.info("[] TDengine service READY to process requests")
- Logging.info("[] TAOS service started: {}".format(self))
+ Logging.info("[] TDengine service READY to process requests: pid={}".format(subProc.getPid()))
+ # Logging.info("[] TAOS service started: {}".format(self))
# self._verifyDnode(self._tInst) # query and ensure dnode is ready
# Logging.debug("[] TAOS Dnode verified: {}".format(self))
return # now we've started
# TODO: handle failure-to-start better?
self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output
- raise RuntimeError("TDengine service did not start successfully: {}".format(self))
+ raise RuntimeError("TDengine service DID NOT achieve READY status: pid={}".format(subProc.getPid()))
def _verifyDnode(self, tInst: TdeInstance):
dbc = DbConn.createNative(tInst.getDbTarget())
@@ -717,70 +786,45 @@ class ServiceManagerThread:
break
if not isValid:
print("Failed to start dnode, sleep for a while")
- time.sleep(600)
+ time.sleep(10.0)
raise RuntimeError("Failed to start Dnode, expected port not found: {}".
format(tInst.getPort()))
dbc.close()
def stop(self):
# can be called from both main thread or signal handler
- Logging.info("Terminating TDengine service running as the sub process...")
- if self.getStatus().isStopped():
- Logging.info("Service already stopped")
- return
- if self.getStatus().isStopping():
- Logging.info("Service is already being stopped, pid: {}".format(self._tdeSubProcess.getPid()))
- return
- # Linux will send Control-C generated SIGINT to the TDengine process
- # already, ref:
- # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
- if not self._tdeSubProcess:
- raise RuntimeError("sub process object missing")
- self._status.set(Status.STATUS_STOPPING)
- # retCode = self._tdeSubProcess.stop()
- # try:
- # retCode = self._tdeSubProcess.stop()
- # # print("Attempted to stop sub process, got return code: {}".format(retCode))
- # if retCode == signal.SIGSEGV : # SGV
- # Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
- # except subprocess.TimeoutExpired as err:
- # Logging.info("Time out waiting for TDengine service process to exit")
- if not self._tdeSubProcess.stop(): # everything withing
- if self._tdeSubProcess.isRunning(): # still running, should now never happen
- Logging.error("FAILED to stop sub process, it is still running... pid = {}".format(
- self._tdeSubProcess.getPid()))
- else:
- self._tdeSubProcess = None # not running any more
- self.join() # stop the thread, change the status, etc.
+ # Linux will send Control-C generated SIGINT to the TDengine process already, ref:
+ # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
+
+ self.join() # stop the thread, status change moved to TdeSubProcess
# Check if it's really stopped
outputLines = 10 # for last output
if self.getStatus().isStopped():
self.procIpcBatch(outputLines) # one last time
- Logging.debug("End of TDengine Service Output: {}".format(self))
+ Logging.debug("End of TDengine Service Output")
Logging.info("----- TDengine Service (managed by SMT) is now terminated -----\n")
else:
- print("WARNING: SMT did not terminate as expected: {}".format(self))
+ print("WARNING: SMT did not terminate as expected")
def join(self):
# TODO: sanity check
- if not self.getStatus().isStopping():
+ s = self.getStatus()
+ if s.isStopping() or s.isStopped(): # we may be stopping ourselves, or have been stopped/killed by others
+ if self._thread or self._thread2 :
+ if self._thread:
+ self._thread.join()
+ self._thread = None
+ if self._thread2: # STD ERR thread
+ self._thread2.join()
+ self._thread2 = None
+ else:
+ Logging.warning("Joining empty thread, doing nothing")
+ else:
raise RuntimeError(
"SMT.Join(): Unexpected status: {}".format(self._status))
- if self._thread or self._thread2 :
- if self._thread:
- self._thread.join()
- self._thread = None
- if self._thread2: # STD ERR thread
- self._thread2.join()
- self._thread2 = None
- else:
- print("Joining empty thread, doing nothing")
-
- self._status.set(Status.STATUS_STOPPED)
-
def _trimQueue(self, targetSize):
if targetSize <= 0:
return # do nothing
@@ -799,6 +843,10 @@ class ServiceManagerThread:
TD_READY_MSG = "TDengine is initialized successfully"
def procIpcBatch(self, trimToTarget=0, forceOutput=False):
+ '''
+ Process a batch of STDOUT/STDERR data, until we read EMPTY from
+ the queue.
+ '''
self._trimQueue(trimToTarget) # trim if necessary
# Process all the output generated by the underlying sub process,
# managed by IO thread
@@ -827,35 +875,54 @@ class ServiceManagerThread:
print(pBar, end="", flush=True)
print('\b\b\b\b', end="", flush=True)
- def svcOutputReader(self, out: IO, queue, logDir: str):
+ BinaryChunk = NewType('BinaryChunk', bytes) # line with binary data, directly from STDOUT, etc.
+ TextChunk = NewType('TextChunk', str) # properly decoded, suitable for printing, etc.
+
+ @classmethod
+ def _decodeBinaryChunk(cls, bChunk: bytes) -> Optional[TextChunk] :
+ try:
+ tChunk = bChunk.decode("utf-8").rstrip()
+ return cls.TextChunk(tChunk)
+ except UnicodeError:
+ print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437')))
+ return None
+
+ def _textChunkGenerator(self, streamIn: BinaryIO, logDir: str, logFile: str
+ ) -> Generator[TextChunk, None, None]:
+ '''
+ Take an input stream with binary data, produced a generator of decoded
+ "text chunks", and also save the original binary data in a log file.
+ '''
+ os.makedirs(logDir, exist_ok=True)
+ logF = open(os.path.join(logDir, logFile), 'wb')
+ for bChunk in iter(streamIn.readline, b''):
+ logF.write(bChunk) # Write to log file immediately
+ tChunk = self._decodeBinaryChunk(bChunk) # decode
+ if tChunk is not None:
+ yield tChunk # TODO: split into actual text lines
+
+ # At the end...
+ streamIn.close() # Close the stream
+ logF.close() # Close the output file
+
+ def svcOutputReader(self, stdOut: BinaryIO, queue, logDir: str):
'''
The infinite routine that processes the STDOUT stream for the sub process being managed.
- :param out: the IO stream object used to fetch the data from
- :param queue: the queue where we dump the roughly parsed line-by-line data
+ :param stdOut: the IO stream object used to fetch the data from
+ :param queue: the queue where we dump the roughly parsed chunk-by-chunk text data
:param logDir: where we should dump a verbatim output file
'''
- os.makedirs(logDir, exist_ok=True)
- logFile = os.path.join(logDir,'stdout.log')
- fOut = open(logFile, 'wb')
+
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
# print("This is the svcOutput Reader...")
- # for line in out :
- for line in iter(out.readline, b''):
- fOut.write(line)
- # print("Finished reading a line: {}".format(line))
- # print("Adding item to queue...")
- try:
- line = line.decode("utf-8").rstrip()
- except UnicodeError:
- print("\nNon-UTF8 server output: {}\n".format(line))
-
- # This might block, and then causing "out" buffer to block
- queue.put(line)
+ # stdOut.readline() # Skip the first output? TODO: remove?
+ for tChunk in self._textChunkGenerator(stdOut, logDir, 'stdout.log') :
+ queue.put(tChunk) # tChunk garanteed not to be None
self._printProgress("_i")
if self._status.isStarting(): # we are starting, let's see if we have started
- if line.find(self.TD_READY_MSG) != -1: # found
+ if tChunk.find(self.TD_READY_MSG) != -1: # found
Logging.info("Waiting for the service to become FULLY READY")
time.sleep(1.0) # wait for the server to truly start. TODO: remove this
Logging.info("Service is now FULLY READY") # TODO: more ID info here?
@@ -869,18 +936,17 @@ class ServiceManagerThread:
print("_w", end="", flush=True)
# queue.put(line)
- # meaning sub process must have died
- Logging.info("EOF for TDengine STDOUT: {}".format(self))
- out.close() # Close the stream
- fOut.close() # Close the output file
+ # stdOut has no more data, meaning sub process must have died
+ Logging.info("EOF found TDengine STDOUT, marking the process as terminated")
+ self.setStatus(Status.STATUS_STOPPED)
- def svcErrorReader(self, err: IO, queue, logDir: str):
- os.makedirs(logDir, exist_ok=True)
- logFile = os.path.join(logDir,'stderr.log')
- fErr = open(logFile, 'wb')
- for line in iter(err.readline, b''):
- fErr.write(line)
- Logging.info("TDengine STDERR: {}".format(line))
- Logging.info("EOF for TDengine STDERR: {}".format(self))
- err.close()
- fErr.close()
\ No newline at end of file
+ def svcErrorReader(self, stdErr: BinaryIO, queue, logDir: str):
+ # os.makedirs(logDir, exist_ok=True)
+ # logFile = os.path.join(logDir,'stderr.log')
+ # fErr = open(logFile, 'wb')
+ # for line in iter(err.readline, b''):
+ for tChunk in self._textChunkGenerator(stdErr, logDir, 'stderr.log') :
+ queue.put(tChunk) # tChunk garanteed not to be None
+ # fErr.write(line)
+ Logging.info("TDengine STDERR: {}".format(tChunk))
+ Logging.info("EOF for TDengine STDERR")
diff --git a/tests/pytest/crash_gen/settings.py b/tests/pytest/crash_gen/settings.py
deleted file mode 100644
index 3c4c91e6e0..0000000000
--- a/tests/pytest/crash_gen/settings.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from __future__ import annotations
-import argparse
-
-gConfig: argparse.Namespace
-
-def init():
- global gConfig
- gConfig = []
\ No newline at end of file
diff --git a/tests/pytest/crash_gen/shared/config.py b/tests/pytest/crash_gen/shared/config.py
new file mode 100644
index 0000000000..7b9f7c3873
--- /dev/null
+++ b/tests/pytest/crash_gen/shared/config.py
@@ -0,0 +1,42 @@
+from __future__ import annotations
+import argparse
+
+from typing import Optional
+
+from .misc import CrashGenError
+
+# from crash_gen.misc import CrashGenError
+
+# gConfig: Optional[argparse.Namespace]
+
+class Config:
+ _config = None # type Optional[argparse.Namespace]
+
+ @classmethod
+ def init(cls, parser: argparse.ArgumentParser):
+ if cls._config is not None:
+ raise CrashGenError("Config can only be initialized once")
+ cls._config = parser.parse_args()
+ # print(cls._config)
+
+ @classmethod
+ def setConfig(cls, config: argparse.Namespace):
+ cls._config = config
+
+ @classmethod
+ # TODO: check items instead of exposing everything
+ def getConfig(cls) -> argparse.Namespace:
+ if cls._config is None:
+ raise CrashGenError("invalid state")
+ return cls._config
+
+ @classmethod
+ def clearConfig(cls):
+ cls._config = None
+
+ @classmethod
+ def isSet(cls, cfgKey):
+ cfg = cls.getConfig()
+ if cfgKey not in cfg:
+ return False
+ return cfg.__getattribute__(cfgKey)
\ No newline at end of file
diff --git a/tests/pytest/crash_gen/db.py b/tests/pytest/crash_gen/shared/db.py
similarity index 93%
rename from tests/pytest/crash_gen/db.py
rename to tests/pytest/crash_gen/shared/db.py
index 62a369c41a..75931ace48 100644
--- a/tests/pytest/crash_gen/db.py
+++ b/tests/pytest/crash_gen/shared/db.py
@@ -1,24 +1,26 @@
from __future__ import annotations
import sys
+import os
+import datetime
import time
import threading
import requests
from requests.auth import HTTPBasicAuth
+
import taos
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.log import *
-from .misc import Logging, CrashGenError, Helper, Dice
-import os
-import datetime
import traceback
# from .service_manager import TdeInstance
-import crash_gen.settings
+from .config import Config
+from .misc import Logging, CrashGenError, Helper
+from .types import QueryResult
class DbConn:
TYPE_NATIVE = "native-c"
@@ -79,7 +81,7 @@ class DbConn:
raise RuntimeError("Cannot query database until connection is open")
nRows = self.query(sql)
if nRows != 1:
- raise taos.error.ProgrammingError(
+ raise CrashGenError(
"Unexpected result for query: {}, rows = {}".format(sql, nRows),
(CrashGenError.INVALID_EMPTY_RESULT if nRows==0 else CrashGenError.INVALID_MULTIPLE_RESULT)
)
@@ -115,7 +117,7 @@ class DbConn:
try:
self.execute(sql)
return True # ignore num of results, return success
- except taos.error.ProgrammingError as err:
+ except taos.error.Error as err:
return False # failed, for whatever TAOS reason
# Not possile to reach here, non-TAOS exception would have been thrown
@@ -126,7 +128,7 @@ class DbConn:
def openByType(self):
raise RuntimeError("Unexpected execution, should be overriden")
- def getQueryResult(self):
+ def getQueryResult(self) -> QueryResult :
raise RuntimeError("Unexpected execution, should be overriden")
def getResultRows(self):
@@ -221,7 +223,7 @@ class DbConnRest(DbConn):
class MyTDSql:
# Class variables
_clsLock = threading.Lock() # class wide locking
- longestQuery = None # type: str
+ longestQuery = '' # type: str
longestQueryTime = 0.0 # seconds
lqStartTime = 0.0
# lqEndTime = 0.0 # Not needed, as we have the two above already
@@ -249,7 +251,13 @@ class MyTDSql:
def _execInternal(self, sql):
startTime = time.time()
# Logging.debug("Executing SQL: " + sql)
+ # ret = None # TODO: use strong type here
+ # try: # Let's not capture the error, and let taos.error.ProgrammingError pass through
ret = self._cursor.execute(sql)
+ # except taos.error.ProgrammingError as err:
+ # Logging.warning("Taos SQL execution error: {}, SQL: {}".format(err.msg, sql))
+ # raise CrashGenError(err.msg)
+
# print("\nSQL success: {}".format(sql))
queryTime = time.time() - startTime
# Record the query time
@@ -261,7 +269,7 @@ class MyTDSql:
cls.lqStartTime = startTime
# Now write to the shadow database
- if crash_gen.settings.gConfig.use_shadow_db:
+ if Config.isSet('use_shadow_db'):
if sql[:11] == "INSERT INTO":
if sql[:16] == "INSERT INTO db_0":
sql2 = "INSERT INTO db_s" + sql[16:]
@@ -453,31 +461,11 @@ class DbManager():
''' Release the underlying DB connection upon deletion of DbManager '''
self.cleanUp()
- def getDbConn(self):
+ def getDbConn(self) -> DbConn :
+ if self._dbConn is None:
+ raise CrashGenError("Unexpected empty DbConn")
return self._dbConn
- # TODO: not used any more, to delete
- def pickAndAllocateTable(self): # pick any table, and "use" it
- return self.tableNumQueue.pickAndAllocate()
-
- # TODO: Not used any more, to delete
- def addTable(self):
- with self._lock:
- tIndex = self.tableNumQueue.push()
- return tIndex
-
- # Not used any more, to delete
- def releaseTable(self, i): # return the table back, so others can use it
- self.tableNumQueue.release(i)
-
- # TODO: not used any more, delete
- def getTableNameToDelete(self):
- tblNum = self.tableNumQueue.pop() # TODO: race condition!
- if (not tblNum): # maybe false
- return False
-
- return "table_{}".format(tblNum)
-
def cleanUp(self):
if self._dbConn:
self._dbConn.close()
diff --git a/tests/pytest/crash_gen/misc.py b/tests/pytest/crash_gen/shared/misc.py
similarity index 90%
rename from tests/pytest/crash_gen/misc.py
rename to tests/pytest/crash_gen/shared/misc.py
index 9774ec5455..90ad802ff1 100644
--- a/tests/pytest/crash_gen/misc.py
+++ b/tests/pytest/crash_gen/shared/misc.py
@@ -3,6 +3,7 @@ import random
import logging
import os
import sys
+from typing import Optional
import taos
@@ -39,14 +40,14 @@ class MyLoggingAdapter(logging.LoggerAdapter):
class Logging:
- logger = None
+ logger = None # type: Optional[MyLoggingAdapter]
@classmethod
def getLogger(cls):
- return logger
+ return cls.logger
@classmethod
- def clsInit(cls, gConfig): # TODO: refactor away gConfig
+ def clsInit(cls, debugMode: bool):
if cls.logger:
return
@@ -60,13 +61,9 @@ class Logging:
# Logging adapter, to be used as a logger
# print("setting logger variable")
# global logger
- cls.logger = MyLoggingAdapter(_logger, [])
-
- if (gConfig.debug):
- cls.logger.setLevel(logging.DEBUG) # default seems to be INFO
- else:
- cls.logger.setLevel(logging.INFO)
-
+ cls.logger = MyLoggingAdapter(_logger, {})
+ cls.logger.setLevel(logging.DEBUG if debugMode else logging.INFO) # default seems to be INFO
+
@classmethod
def info(cls, msg):
cls.logger.info(msg)
@@ -84,6 +81,7 @@ class Logging:
cls.logger.error(msg)
class Status:
+ STATUS_EMPTY = 99
STATUS_STARTING = 1
STATUS_RUNNING = 2
STATUS_STOPPING = 3
@@ -95,12 +93,16 @@ class Status:
def __repr__(self):
return "[Status: v={}]".format(self._status)
- def set(self, status):
+ def set(self, status: int):
self._status = status
def get(self):
return self._status
+ def isEmpty(self):
+ ''' Empty/Undefined '''
+ return self._status == Status.STATUS_EMPTY
+
def isStarting(self):
return self._status == Status.STATUS_STARTING
@@ -117,6 +119,9 @@ class Status:
def isStable(self):
return self.isRunning() or self.isStopped()
+ def isActive(self):
+ return self.isStarting() or self.isRunning() or self.isStopping()
+
# Deterministic random number generator
class Dice():
seeded = False # static, uninitialized
diff --git a/tests/pytest/crash_gen/shared/types.py b/tests/pytest/crash_gen/shared/types.py
new file mode 100644
index 0000000000..814a821917
--- /dev/null
+++ b/tests/pytest/crash_gen/shared/types.py
@@ -0,0 +1,28 @@
+from typing import Any, List, Dict, NewType
+from enum import Enum
+
+DirPath = NewType('DirPath', str)
+
+QueryResult = NewType('QueryResult', List[List[Any]])
+
+class TdDataType(Enum):
+ '''
+ Use a Python Enum types of represent all the data types in TDengine.
+
+ Ref: https://www.taosdata.com/cn/documentation/taos-sql#data-type
+ '''
+ TIMESTAMP = 'TIMESTAMP'
+ INT = 'INT'
+ BIGINT = 'BIGINT'
+ FLOAT = 'FLOAT'
+ DOUBLE = 'DOUBLE'
+ BINARY = 'BINARY'
+ BINARY16 = 'BINARY(16)' # TODO: get rid of this hack
+ BINARY200 = 'BINARY(200)'
+ SMALLINT = 'SMALLINT'
+ TINYINT = 'TINYINT'
+ BOOL = 'BOOL'
+ NCHAR = 'NCHAR'
+
+TdColumns = Dict[str, TdDataType]
+TdTags = Dict[str, TdDataType]
diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh
index b5aae6fcef..a748c9dd2d 100755
--- a/tests/pytest/fulltest.sh
+++ b/tests/pytest/fulltest.sh
@@ -305,6 +305,7 @@ python3 ./test.py -f functions/function_top.py -r 1
python3 ./test.py -f functions/function_twa.py -r 1
python3 ./test.py -f functions/function_twa_test2.py
python3 ./test.py -f functions/function_stddev_td2555.py
+python3 ./test.py -f functions/showOfflineThresholdIs864000.py
python3 ./test.py -f insert/metadataUpdate.py
python3 ./test.py -f query/last_cache.py
python3 ./test.py -f query/last_row_cache.py
diff --git a/tests/pytest/functions/showOfflineThresholdIs864000.py b/tests/pytest/functions/showOfflineThresholdIs864000.py
new file mode 100644
index 0000000000..6cce869bf2
--- /dev/null
+++ b/tests/pytest/functions/showOfflineThresholdIs864000.py
@@ -0,0 +1,36 @@
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+
+import sys
+from util.log import *
+from util.cases import *
+from util.sql import *
+from util.dnodes import *
+
+
+class TDTestCase:
+ def init(self, conn, logSql):
+ tdLog.debug(f"start to execute {__file__}")
+ tdSql.init(conn.cursor(), logSql)
+
+ def run(self):
+ tdSql.query("show variables")
+ tdSql.checkData(51, 1, 864000)
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success(f"{__file__} successfully executed")
+
+
+tdCases.addWindows(__file__, TDTestCase())
+tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
diff --git a/tests/pytest/perf_gen.py b/tests/pytest/perf_gen.py
new file mode 100755
index 0000000000..f0402fbb6b
--- /dev/null
+++ b/tests/pytest/perf_gen.py
@@ -0,0 +1,485 @@
+#!/usr/bin/python3.8
+
+from abc import abstractmethod
+
+import time
+from datetime import datetime
+
+from influxdb_client import InfluxDBClient, Point, WritePrecision, BucketsApi
+from influxdb_client.client.write_api import SYNCHRONOUS
+
+import argparse
+import textwrap
+import subprocess
+import sys
+
+import taos
+
+from crash_gen.crash_gen_main import Database, TdSuperTable
+from crash_gen.service_manager import TdeInstance
+
+from crash_gen.shared.config import Config
+from crash_gen.shared.db import DbConn
+from crash_gen.shared.misc import Dice, Logging, Helper
+from crash_gen.shared.types import TdDataType
+
+
+# NUM_PROCESSES = 10
+# NUM_REPS = 1000
+
+tick = int(time.time() - 5000000.0) # for now we will create max 5M record
+value = 101
+
+DB_NAME = 'mydb'
+TIME_SERIES_NAME = 'widget'
+
+MAX_SHELF = 500 # shelf number runs up to this, non-inclusive
+ITEMS_PER_SHELF = 5
+BATCH_SIZE = 2000 # Number of data points per request
+
+# None_RW:
+# INFLUX_TOKEN='RRzVQZs8ERCpV9cS2RXqgtM_Y6FEZuJ7Tuk0aHtZItFTfcM9ajixtGDhW8HzqNIBmG3hmztw-P4sHOstfJvjFA=='
+# DevOrg_RW:
+# INFLUX_TOKEN='o1P8sEhBmXKhxBmNuiCyOUKv8d7qm5wUjMff9AbskBu2LcmNPQzU77NrAn5hDil8hZ0-y1AGWpzpL-4wqjFdkA=='
+# DevOrg_All_Access
+INFLUX_TOKEN='T2QTr4sloJhINH_oSrwSS-WIIZYjDfD123NK4ou3b7ajRs0c0IphCh3bNc0OsDZQRW1HyCby7opdEndVYFGTWQ=='
+INFLUX_ORG="DevOrg"
+INFLUX_BUCKET="Bucket01"
+
+def writeTaosBatch(dbc, tblName):
+ # Database.setupLastTick()
+ global value, tick
+
+ data = []
+ for i in range(0, 100):
+ data.append("('{}', {})".format(Database.getNextTick(), value) )
+ value += 1
+
+ sql = "INSERT INTO {} VALUES {}".format(tblName, ''.join(data))
+ dbc.execute(sql)
+
+class PerfGenError(taos.error.ProgrammingError):
+ pass
+
+class Benchmark():
+
+ # @classmethod
+ # def create(cls, dbType):
+ # if dbType == 'taos':
+ # return TaosBenchmark()
+ # elif dbType == 'influx':
+ # return InfluxBenchmark()
+ # else:
+ # raise RuntimeError("Unknown DB type: {}".format(dbType))
+
+ def __init__(self, dbType, loopCount = 0):
+ self._dbType = dbType
+ self._setLoopCount(loopCount)
+
+ def _setLoopCount(self, loopCount):
+ cfgLoopCount = Config.getConfig().loop_count
+ if loopCount == 0: # use config
+ self._loopCount = cfgLoopCount
+ else:
+ if cfgLoopCount :
+ Logging.warning("Ignoring loop count for fixed-loop-count benchmarks: {}".format(cfgLoopCount))
+ self._loopCount = loopCount
+
+ @abstractmethod
+ def doIterate(self):
+ '''
+ Execute the benchmark directly, without invoking sub processes,
+ effectively using one execution thread.
+ '''
+ pass
+
+ @abstractmethod
+ def prepare(self):
+ '''
+ Preparation needed to run a certain benchmark
+ '''
+ pass
+
+ @abstractmethod
+ def execute(self):
+ '''
+ Actually execute the benchmark
+ '''
+ Logging.warning("Unexpected execution")
+
+ @property
+ def name(self):
+ return self.__class__.__name__
+
+ def run(self):
+ print("Running benchmark: {}, class={} ...".format(self.name, self.__class__))
+ startTime = time.time()
+
+ # Prepare to execute the benchmark
+ self.prepare()
+
+ # Actually execute the benchmark
+ self.execute()
+
+ # if Config.getConfig().iterate_directly: # execute directly
+ # Logging.debug("Iterating...")
+ # self.doIterate()
+ # else:
+ # Logging.debug("Executing via sub process...")
+ # startTime = time.time()
+ # self.prepare()
+ # self.spawnProcesses()
+ # self.waitForProcecess()
+ # duration = time.time() - startTime
+ # Logging.info("Benchmark execution completed in {:.3f} seconds".format(duration))
+ Logging.info("Benchmark {} finished in {:.3f} seconds".format(
+ self.name, time.time()-startTime))
+
+ def spawnProcesses(self):
+ self._subProcs = []
+ for j in range(0, Config.getConfig().subprocess_count):
+ ON_POSIX = 'posix' in sys.builtin_module_names
+ tblName = 'cars_reg_{}'.format(j)
+ cmdLineStr = './perf_gen.sh -t {} -i -n {} -l {}'.format(
+ self._dbType,
+ tblName,
+ Config.getConfig().loop_count
+ )
+ if Config.getConfig().debug:
+ cmdLineStr += ' -d'
+ subProc = subprocess.Popen(cmdLineStr,
+ shell = True,
+ close_fds = ON_POSIX)
+ self._subProcs.append(subProc)
+
+ def waitForProcecess(self):
+ for sp in self._subProcs:
+ sp.wait(300)
+
+
+class TaosBenchmark(Benchmark):
+
+ def __init__(self, loopCount):
+ super().__init__('taos', loopCount)
+ # self._dbType = 'taos'
+ tInst = TdeInstance()
+ self._dbc = DbConn.createNative(tInst.getDbTarget())
+ self._dbc.open()
+ self._sTable = TdSuperTable(TIME_SERIES_NAME + '_s', DB_NAME)
+
+ def doIterate(self):
+ tblName = Config.getConfig().target_table_name
+ print("Benchmarking TAOS database (1 pass) for: {}".format(tblName))
+ self._dbc.execute("USE {}".format(DB_NAME))
+
+ self._sTable.ensureRegTable(None, self._dbc, tblName)
+ try:
+ lCount = Config.getConfig().loop_count
+ print("({})".format(lCount))
+ for i in range(0, lCount):
+ writeTaosBatch(self._dbc, tblName)
+ except taos.error.ProgrammingError as err:
+ Logging.error("Failed to write batch")
+
+ def prepare(self):
+ self._dbc.execute("CREATE DATABASE IF NOT EXISTS {}".format(DB_NAME))
+ self._dbc.execute("USE {}".format(DB_NAME))
+ # Create the super table
+ self._sTable.drop(self._dbc, True)
+ self._sTable.create(self._dbc,
+ {'ts': TdDataType.TIMESTAMP,
+ 'temperature': TdDataType.INT,
+ 'pressure': TdDataType.INT,
+ 'notes': TdDataType.BINARY200
+ },
+ {'rack': TdDataType.INT,
+ 'shelf': TdDataType.INT,
+ 'barcode': TdDataType.BINARY16
+ })
+
+ def execSql(self, sql):
+ try:
+ self._dbc.execute(sql)
+ except taos.error.ProgrammingError as err:
+ Logging.warning("SQL Error: 0x{:X}, {}, SQL: {}".format(
+ Helper.convertErrno(err.errno), err.msg, sql))
+ raise
+
+ def executeWrite(self):
+ # Sample: INSERT INTO t1 USING st TAGS(1) VALUES(now, 1) t2 USING st TAGS(2) VALUES(now, 2)
+ sqlPrefix = "INSERT INTO "
+ dataTemplate = "{} USING {} TAGS({},{},'barcode_{}') VALUES('{}',{},{},'{}') "
+
+ stName = self._sTable.getName()
+ BATCH_SIZE = 2000 # number of items per request batch
+ ITEMS_PER_SHELF = 5
+
+ # rackSize = 10 # shelves per rack
+ # shelfSize = 100 # items per shelf
+ batchCount = self._loopCount // BATCH_SIZE
+ lastRack = 0
+ for i in range(batchCount):
+ sql = sqlPrefix
+ for j in range(BATCH_SIZE):
+ n = i*BATCH_SIZE + j # serial number
+ # values first
+ # rtName = 'rt_' + str(n) # table name contains serial number, has info
+ temperature = 20 + (n % 10)
+ pressure = 70 + (n % 10)
+ # tags
+ shelf = (n // ITEMS_PER_SHELF) % MAX_SHELF # shelf number
+ rack = n // (ITEMS_PER_SHELF * MAX_SHELF) # rack number
+ barcode = rack + shelf
+ # table name
+ tableName = "reg_" + str(rack) + '_' + str(shelf)
+ # now the SQL
+ sql += dataTemplate.format(tableName, stName,# table name
+ rack, shelf, barcode, # tags
+ Database.getNextTick(), temperature, pressure, 'xxx') # values
+ lastRack = rack
+ self.execSql(sql)
+ Logging.info("Last Rack: {}".format(lastRack))
+
+class TaosWriteBenchmark(TaosBenchmark):
+ def execute(self):
+ self.executeWrite()
+
+class Taos100kWriteBenchmark(TaosWriteBenchmark):
+ def __init__(self):
+ super().__init__(100*1000)
+
+class Taos10kWriteBenchmark(TaosWriteBenchmark):
+ def __init__(self):
+ super().__init__(10*1000)
+
+class Taos1mWriteBenchmark(TaosWriteBenchmark):
+ def __init__(self):
+ super().__init__(1000*1000)
+
+class Taos5mWriteBenchmark(TaosWriteBenchmark):
+ def __init__(self):
+ super().__init__(5*1000*1000)
+
+class Taos1kQueryBenchmark(TaosBenchmark):
+ def __init__(self):
+ super().__init__(1000)
+
+class Taos1MCreationBenchmark(TaosBenchmark):
+ def __init__(self):
+ super().__init__(1000000)
+
+
+class InfluxBenchmark(Benchmark):
+ def __init__(self, loopCount):
+ super().__init__('influx', loopCount)
+ # self._dbType = 'influx'
+
+
+ # self._client = InfluxDBClient(host='localhost', port=8086)
+
+ # def _writeBatch(self, tblName):
+ # global value, tick
+ # data = []
+ # for i in range(0, 100):
+ # line = "{},device={} value={} {}".format(
+ # TIME_SERIES_NAME,
+ # tblName,
+ # value,
+ # tick*1000000000)
+ # # print(line)
+ # data.append(line)
+ # value += 1
+ # tick +=1
+
+ # self._client.write(data, {'db':DB_NAME}, protocol='line')
+
+ def executeWrite(self):
+ global tick # influx tick #TODO refactor
+
+ lineTemplate = TIME_SERIES_NAME + ",rack={},shelf={},barcode='barcode_{}' temperature={},pressure={} {}"
+
+ batchCount = self._loopCount // BATCH_SIZE
+ for i in range(batchCount):
+ lineBatch = []
+ for j in range(BATCH_SIZE):
+ n = i*BATCH_SIZE + j # serial number
+ # values first
+ # rtName = 'rt_' + str(n) # table name contains serial number, has info
+ temperature = 20 + (n % 10)
+ pressure = 70 + (n % 10)
+ # tags
+ shelf = (n // ITEMS_PER_SHELF) % MAX_SHELF # shelf number
+ rack = n // (ITEMS_PER_SHELF * MAX_SHELF) # rack number
+ barcode = rack + shelf
+ # now the SQL
+ line = lineTemplate.format(
+ rack, shelf, barcode, # tags
+ temperature, pressure, # values
+ tick * 1000000000 )
+ tick += 1
+ lineBatch.append(line)
+ write_api = self._client.write_api(write_options=SYNCHRONOUS)
+ write_api.write(INFLUX_BUCKET, INFLUX_ORG, lineBatch)
+ # self._client.write(lineBatch, {'db':DB_NAME}, protocol='line')
+
+ # def doIterate(self):
+ # tblName = Config.getConfig().target_table_name
+ # print("Benchmarking INFLUX database (1 pass) for: {}".format(tblName))
+
+ # for i in range(0, Config.getConfig().loop_count):
+ # self._writeBatch(tblName)
+
+ def _getOrgIdByName(self, orgName):
+ """Find org by name.
+
+ """
+ orgApi = self._client.organizations_api()
+ orgs = orgApi.find_organizations()
+ for org in orgs:
+ if org.name == orgName:
+ return org.id
+ raise PerfGenError("Org not found with name: {}".format(orgName))
+
+ def _fetchAuth(self):
+ authApi = self._client.authorizations_api()
+ auths = authApi.find_authorizations()
+ for auth in auths:
+ if auth.token == INFLUX_TOKEN :
+ return auth
+ raise PerfGenError("No proper auth found")
+
+ def _verifyPermissions(self, perms: list):
+ if list:
+ return #OK
+ raise PerfGenError("No permission found")
+
+ def prepare(self):
+ self._client = InfluxDBClient(
+ url="http://127.0.0.1:8086",
+ token=INFLUX_TOKEN,
+ org=INFLUX_ORG)
+
+ auth = self._fetchAuth()
+
+ self._verifyPermissions(auth.permissions)
+
+ bktApi = self._client.buckets_api()
+ # Delete
+ bkt = bktApi.find_bucket_by_name(INFLUX_BUCKET)
+ if bkt:
+ bktApi.delete_bucket(bkt)
+ # Recreate
+
+ orgId = self._getOrgIdByName(INFLUX_ORG)
+ bktApi.create_bucket(bucket=None, bucket_name=INFLUX_BUCKET, org_id=orgId)
+
+ # self._client.drop_database(DB_NAME)
+ # self._client.create_database(DB_NAME)
+ # self._client.switch_database(DB_NAME)
+
+class InfluxWriteBenchmark(InfluxBenchmark):
+ def execute(self):
+ return self.executeWrite()
+
+class Influx10kWriteBenchmark(InfluxWriteBenchmark):
+ def __init__(self):
+ super().__init__(10*1000)
+
+class Influx100kWriteBenchmark(InfluxWriteBenchmark):
+ def __init__(self):
+ super().__init__(100*1000)
+
+class Influx1mWriteBenchmark(InfluxWriteBenchmark):
+ def __init__(self):
+ super().__init__(1000*1000)
+
+class Influx5mWriteBenchmark(InfluxWriteBenchmark):
+ def __init__(self):
+ super().__init__(5*1000*1000)
+
+def _buildCmdLineParser():
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ description=textwrap.dedent('''\
+ TDengine Performance Benchmarking Tool
+ ---------------------------------------------------------------------
+
+ '''))
+
+ parser.add_argument(
+ '-b',
+ '--benchmark-name',
+ action='store',
+ default='Taos1kQuery',
+ type=str,
+ help='Benchmark to use (default: Taos1kQuery)')
+
+ parser.add_argument(
+ '-d',
+ '--debug',
+ action='store_true',
+ help='Turn on DEBUG mode for more logging (default: false)')
+
+ parser.add_argument(
+ '-i',
+ '--iterate-directly',
+ action='store_true',
+ help='Execution operations directly without sub-process (default: false)')
+
+ parser.add_argument(
+ '-l',
+ '--loop-count',
+ action='store',
+ default=1000,
+ type=int,
+ help='Number of loops to perform, 100 operations per loop. (default: 1000)')
+
+ parser.add_argument(
+ '-n',
+ '--target-table-name',
+ action='store',
+ default=None,
+ type=str,
+ help='Regular table name in target DB (default: None)')
+
+ parser.add_argument(
+ '-s',
+ '--subprocess-count',
+ action='store',
+ default=4,
+ type=int,
+ help='Number of sub processes to spawn. (default: 10)')
+
+ parser.add_argument(
+ '-t',
+ '--target-database',
+ action='store',
+ default='taos',
+ type=str,
+ help='Benchmark target: taos, influx (default: taos)')
+
+ return parser
+
+def main():
+ parser = _buildCmdLineParser()
+ Config.init(parser)
+ Logging.clsInit(Config.getConfig().debug)
+ Dice.seed(0) # initial seeding of dice
+
+ bName = Config.getConfig().benchmark_name
+ bClassName = bName + 'Benchmark'
+ x = globals()
+ if bClassName in globals():
+ bClass = globals()[bClassName]
+ bm = bClass() # Benchmark object
+ bm.run()
+ else:
+ raise PerfGenError("No such benchmark: {}".format(bName))
+
+ # bm = Benchmark.create(Config.getConfig().target_database)
+ # bm.run()
+
+if __name__ == "__main__":
+ main()
+
+
diff --git a/tests/pytest/perf_gen.sh b/tests/pytest/perf_gen.sh
new file mode 100755
index 0000000000..fcedd2d407
--- /dev/null
+++ b/tests/pytest/perf_gen.sh
@@ -0,0 +1,60 @@
+#!/bin/bash
+
+# This is the script for us to try to cause the TDengine server or client to crash
+#
+# PREPARATION
+#
+# 1. Build an compile the TDengine source code that comes with this script, in the same directory tree
+# 2. Please follow the direction in our README.md, and build TDengine in the build/ directory
+# 3. Adjust the configuration file if needed under build/test/cfg/taos.cfg
+# 4. Run the TDengine server instance: cd build; ./build/bin/taosd -c test/cfg
+# 5. Make sure you have a working Python3 environment: run /usr/bin/python3 --version, and you should get 3.6 or above
+# 6. Make sure you have the proper Python packages: # sudo apt install python3-setuptools python3-pip python3-distutils
+#
+# RUNNING THIS SCRIPT
+#
+# This script assumes the source code directory is intact, and that the binaries has been built in the
+# build/ directory, as such, will will load the Python libraries in the directory tree, and also load
+# the TDengine client shared library (so) file, in the build/directory, as evidenced in the env
+# variables below.
+#
+# Running the script is simple, no parameter is needed (for now, but will change in the future).
+#
+# Happy Crashing...
+
+
+# Due to the heavy path name assumptions/usage, let us require that the user be in the current directory
+EXEC_DIR=`dirname "$0"`
+if [[ $EXEC_DIR != "." ]]
+then
+ echo "ERROR: Please execute `basename "$0"` in its own directory (for now anyway, pardon the dust)"
+ exit -1
+fi
+
+CURR_DIR=`pwd`
+IN_TDINTERNAL="community"
+if [[ "$CURR_DIR" == *"$IN_TDINTERNAL"* ]]; then
+ TAOS_DIR=$CURR_DIR/../../..
+ TAOSD_DIR=`find $TAOS_DIR -name "taosd"|grep bin|head -n1`
+ LIB_DIR=`echo $TAOSD_DIR|rev|cut -d '/' -f 3,4,5,6,7|rev`/lib
+else
+ TAOS_DIR=$CURR_DIR/../..
+ TAOSD_DIR=`find $TAOS_DIR -name "taosd"|grep bin|head -n1`
+ LIB_DIR=`echo $TAOSD_DIR|rev|cut -d '/' -f 3,4,5,6|rev`/lib
+fi
+
+# Now getting ready to execute Python
+# The following is the default of our standard dev env (Ubuntu 20.04), modify/adjust at your own risk
+PYTHON_EXEC=python3.8
+
+# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
+export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3:$(pwd)
+
+# Then let us set up the library path so that our compiled SO file can be loaded by Python
+export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR
+
+# Now we are all let, and let's see if we can find a crash. Note we pass all params
+PERF_GEN_EXEC=perf_gen.py
+$PYTHON_EXEC $PERF_GEN_EXEC $@
+
+
diff --git a/tests/pytest/table/column_name.py b/tests/pytest/table/column_name.py
index a180d3f752..0f24b98f3a 100644
--- a/tests/pytest/table/column_name.py
+++ b/tests/pytest/table/column_name.py
@@ -88,10 +88,9 @@ class TDTestCase:
# TSIM:
# TSIM: print =============== step4
tdLog.info('=============== step4')
- # TSIM: sql create table $tb (ts timestamp,
- # a0123456789012345678901234567890123456789 int)
+ # TSIM: sql create table $tb (ts timestamp, a0123456789012345678901234567890123456789 int)
getMaxColNum = "grep -w '#define TSDB_COL_NAME_LEN' ../../src/inc/taosdef.h|awk '{print $3}'"
- boundary = int(subprocess.check_output(getMaxColNum, shell=True))
+ boundary = int(subprocess.check_output(getMaxColNum, shell=True)) - 1
tdLog.info("get max column name length is %d" % boundary)
chars = string.ascii_uppercase + string.ascii_lowercase
diff --git a/tests/script/general/parser/alter_stable.sim b/tests/script/general/parser/alter_stable.sim
index 8a7f4fa924..afdd7d3edf 100644
--- a/tests/script/general/parser/alter_stable.sim
+++ b/tests/script/general/parser/alter_stable.sim
@@ -22,7 +22,7 @@ sql_error alter table mt1 change tag a 1
sql_error create table mtx1 (ts timestamp, c1 int) tags (123 int)
-sql create table mt2 (ts timestamp, c1 int) tags (abc012345678901234567890123456789012345678901234567890123456789def int)
+sql_error create table mt2 (ts timestamp, c1 int) tags (abc012345678901234567890123456789012345678901234567890123456789def int)
sql create table mt3 (ts timestamp, c1 int) tags (abc012345678901234567890123456789012345678901234567890123456789 int)
sql_error alter table mt3 change tag abc012345678901234567890123456789012345678901234567890123456789 abcdefg012345678901234567890123456789012345678901234567890123456789
sql alter table mt3 change tag abc012345678901234567890123456789012345678901234567890123456789 abcdefg0123456789012345678901234567890123456789
diff --git a/tests/script/general/parser/create_tb.sim b/tests/script/general/parser/create_tb.sim
index eb6e4f71c3..ca57f401b9 100644
--- a/tests/script/general/parser/create_tb.sim
+++ b/tests/script/general/parser/create_tb.sim
@@ -114,7 +114,11 @@ sql_error create table $tb (ts timestamp, $tag int)
sql_error create table $tb (ts timestamp, $tags int)
sql_error create table $tb (ts timestamp, $sint int)
sql_error create table $tb (ts timestamp, $tint int)
-sql_error create table $tb (ts timestamp, $nchar int)
+sql_error create table $tb (ts timestamp, $nchar int)
+
+# too long column name
+sql_error create table $tb (ts timestamp, abcde_123456789_123456789_123456789_123456789_123456789_123456789 int)
+sql_error create table tx(ts timestamp, k int) tags(abcd5_123456789_123456789_123456789_123456789_123456789_123456789 int)
print illegal_column_names test passed
# case5: chinese_char_in_table_support
diff --git a/tests/script/general/parser/dbtbnameValidate.sim b/tests/script/general/parser/dbtbnameValidate.sim
index f2e6de81f1..bc3bfefafb 100644
--- a/tests/script/general/parser/dbtbnameValidate.sim
+++ b/tests/script/general/parser/dbtbnameValidate.sim
@@ -119,4 +119,8 @@ if $rows != 4 then
return -1
endi
+print ================>td-4147
+sql_error create table tx(ts timestamp, a1234_0123456789_0123456789_0123456789_0123456789_0123456789_0123456789 int)
+
+
system sh/exec.sh -n dnode1 -s stop -x SIGINT