commit 4cbaacb279
Author: yihaoDeng
Date: 2021-04-01 13:20:00 +08:00

Merge branch 'develop' of https://github.com/taosdata/TDengine into develop

35 changed files with 771 additions and 338 deletions

Jenkinsfile

@@ -42,12 +42,12 @@ def pre_test(){
                 killall -9 taosd ||echo "no taosd running"
                 killall -9 gdb || echo "no gdb running"
                 cd ${WKC}
-                git checkout develop
                 git reset --hard HEAD~10 >/dev/null
+                git checkout develop
                 git pull >/dev/null
                 git fetch origin +refs/pull/${CHANGE_ID}/merge
                 git checkout -qf FETCH_HEAD
-                find ${WKC}/tests/pytest -name \'*\'.sql -exec rm -rf {} \\;
+                git clean -dfx
                 cd ${WK}
                 git reset --hard HEAD~10
                 git checkout develop
@@ -55,7 +55,7 @@ def pre_test(){
                 cd ${WK}
                 export TZ=Asia/Harbin
                 date
-                rm -rf ${WK}/debug
+                git clean -dfx
                 mkdir debug
                 cd debug
                 cmake .. > /dev/null


@@ -32,7 +32,7 @@ ELSEIF (TD_WINDOWS)
   #INSTALL(TARGETS taos RUNTIME DESTINATION driver)
   #INSTALL(TARGETS shell RUNTIME DESTINATION .)
   IF (TD_MVN_INSTALLED)
-    INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.24-dist.jar DESTINATION connector/jdbc)
+    INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.25-dist.jar DESTINATION connector/jdbc)
   ENDIF ()
 ELSEIF (TD_DARWIN)
   SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")


@@ -29,21 +29,21 @@ taos> DESCRIBE meters;
 ## <a class="anchor" id="data-type"></a>Supported Data Types
 The most important concept in TDengine is the timestamp: one must be specified whenever records are created or inserted, and whenever historical records are queried. Timestamps follow these rules:
 - The time format is ```YYYY-MM-DD HH:mm:ss.MS```; the default time resolution is milliseconds. For example: ```2017-08-12 18:25:58.128```
-- The built-in function now returns the current time of the server
-- When a record is inserted with timestamp now, the server's current time is used
+- The built-in function now returns the current time of the client
+- When a record is inserted with timestamp now, the current time of the client submitting the record is used
 - Epoch Time: a timestamp may also be a long integer counting milliseconds since 1970-01-01 08:00:00.000
-- Times support arithmetic; for example, now-2h moves the query time back 2 hours (the last 2 hours). The unit after the number may be a (milliseconds), s (seconds), m (minutes), h (hours), d (days), or w (weeks). For example, select * from t1 where ts > now-2w and ts <= now-1w queries exactly one week of data from two weeks ago. When specifying the time window (interval) of a down-sampling operation, the units n (calendar month) and y (calendar year) may also be used.
+- Times support arithmetic; for example, now-2h moves the query time back 2 hours (the last 2 hours). The unit after the number may be u (microseconds), a (milliseconds), s (seconds), m (minutes), h (hours), d (days), or w (weeks). For example, `select * from t1 where ts > now-2w and ts <= now-1w` queries exactly one week of data from two weeks ago. When specifying the time window (interval) of a down-sampling operation, the units n (calendar month) and y (calendar year) may also be used.
 TDengine's default timestamp precision is milliseconds, but microseconds can be enabled via the configuration parameter enableMicrosecond.
 In TDengine, the following 10 data types may be used in the data model of an ordinary table.
 |      | Type      | Bytes | Description |
 | ---- | :-------: | ----- | ----------- |
-| 1    | TIMESTAMP | 8     | Timestamp. Default precision is milliseconds; microseconds are supported. Counted from 1970-01-01 00:00:00.000 (UTC/GMT); times cannot be earlier than that epoch. |
+| 1    | TIMESTAMP | 8     | Timestamp. Default precision is milliseconds; microseconds are supported. Counted from 1970-01-01 00:00:00.000 (UTC/GMT); times cannot be earlier than that epoch. (Since version 2.0.18 this range restriction has been removed.) |
 | 2    | INT       | 4     | Integer, range [-2^31+1, 2^31-1]; -2^31 is used for NULL |
 | 3    | BIGINT    | 8     | Long integer, range [-2^63+1, 2^63-1]; -2^63 is used for NULL |
 | 4    | FLOAT     | 4     | Float, 6-7 significant digits, range [-3.4E38, 3.4E38] |
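To make the client-side `now` semantics above concrete, here is a minimal JDBC sketch (an editor's illustration, not part of this diff); it assumes a TDengine server at 127.0.0.1:6030 with the taos-jdbcdriver on the classpath, and the database `demo` and table `t1` are hypothetical names:

```java
// Illustration only: exercises the "now" rules documented above.
// Assumes a local taosd and taos-jdbcdriver; all names are hypothetical.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class TimestampRulesDemo {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement()) {
            stmt.execute("create database if not exists demo");
            stmt.execute("use demo");
            stmt.execute("create table if not exists t1(ts timestamp, v int)");
            // "now" resolves to the current time of the client submitting the record
            stmt.executeUpdate("insert into t1 values(now, 1)");
            // time arithmetic: everything from the last 2 hours
            try (ResultSet rs = stmt.executeQuery("select * from t1 where ts > now-2h")) {
                while (rs.next()) {
                    System.out.println(rs.getTimestamp("ts") + " -> " + rs.getInt("v"));
                }
            }
        }
    }
}
```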


@@ -120,7 +120,7 @@ function clean_service_on_systemd() {
     if [ "$verMode" == "cluster" ]; then
         nginx_service_config="${service_config_dir}/${nginx_service_name}.service"
-        if [ -d ${bin_dir}/web ]; then
+        if [ -d ${install_nginxd_dir} ]; then
             if systemctl is-active --quiet ${nginx_service_name}; then
                 echo "Nginx for TDengine is running, stopping it..."
                 ${csudo} systemctl stop ${nginx_service_name} &> /dev/null || echo &> /dev/null


@@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED)
   ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME}
       POST_BUILD
       COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
-      COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.24-dist.jar ${LIBRARY_OUTPUT_PATH}
+      COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.25-dist.jar ${LIBRARY_OUTPUT_PATH}
       COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
       COMMENT "build jdbc driver")
   ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME})


@@ -5,7 +5,7 @@
     <groupId>com.taosdata.jdbc</groupId>
     <artifactId>taos-jdbcdriver</artifactId>
-    <version>2.0.24</version>
+    <version>2.0.25</version>
     <packaging>jar</packaging>
     <name>JDBCDriver</name>


@@ -3,7 +3,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.taosdata.jdbc</groupId>
     <artifactId>taos-jdbcdriver</artifactId>
-    <version>2.0.24</version>
+    <version>2.0.25</version>
     <packaging>jar</packaging>
     <name>JDBCDriver</name>
     <url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url>
@@ -62,6 +62,14 @@
     </dependencies>
     <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <excludes>
+                    <exclude>**/*.md</exclude>
+                </excludes>
+            </resource>
+        </resources>
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>


@@ -15,7 +15,7 @@ public abstract class AbstractParameterMetaData extends WrapperImpl implements P
     @Override
     public int getParameterCount() throws SQLException {
-        return parameters.length;
+        return parameters == null ? 0 : parameters.length;
     }

     @Override
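A hedged illustration of the guard above (not part of this diff): with the null check in place, a statement that binds no parameter values can report a count of zero instead of failing with a NullPointerException. The query text and helper name below are assumptions, not code from this commit.

```java
import java.sql.Connection;
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;

class ParameterCountDemo {
    // Sketch: with the guard above, an unparameterized statement is expected
    // to report 0 rather than throw a NullPointerException.
    static void printParameterCount(Connection conn) throws Exception {
        try (PreparedStatement ps = conn.prepareStatement("select server_status()")) {
            ParameterMetaData meta = ps.getParameterMetaData();
            System.out.println("parameter count: " + meta.getParameterCount());
        }
    }
}
```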


@@ -29,45 +29,25 @@ public class TSDBJNIConnector {
     private static volatile Boolean isInitialized = false;

     private TaosInfo taosInfo = TaosInfo.getInstance();
+    // Connection pointer used in C
+    private long taos = TSDBConstants.JNI_NULL_POINTER;
+    // result set status in current connection
+    private boolean isResultsetClosed = true;
+    private int affectedRows = -1;

     static {
         System.loadLibrary("taos");
         System.out.println("java.library.path:" + System.getProperty("java.library.path"));
     }

-    /**
-     * Connection pointer used in C
-     */
-    private long taos = TSDBConstants.JNI_NULL_POINTER;
-
-    /**
-     * Result set pointer for the current connection
-     */
-    // private long taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;
-
-    /**
-     * result set status in current connection
-     */
-    private boolean isResultsetClosed = true;
-
-    private int affectedRows = -1;
-
-    /**
-     * Whether the connection is closed
-     */
     public boolean isClosed() {
         return this.taos == TSDBConstants.JNI_NULL_POINTER;
     }

-    /**
-     * Returns the status of last result set in current connection
-     */
     public boolean isResultsetClosed() {
         return this.isResultsetClosed;
     }

-    /**
-     * Initialize static variables in JNI to optimize performance
-     */
     public static void init(String configDir, String locale, String charset, String timezone) throws SQLWarning {
         synchronized (isInitialized) {
             if (!isInitialized) {
@@ -93,11 +73,6 @@ public class TSDBJNIConnector {
     public static native String getTsCharset();

-    /**
-     * Get connection pointer
-     *
-     * @throws SQLException
-     */
     public boolean connect(String host, int port, String dbName, String user, String password) throws SQLException {
         if (this.taos != TSDBConstants.JNI_NULL_POINTER) {
             // this.closeConnectionImp(this.taos);
@@ -185,13 +160,6 @@ public class TSDBJNIConnector {
     private native String getErrMsgImp(long pSql);

-    /**
-     * Get resultset pointer
-     * Each connection should have a single open result set at a time
-     */
-    // public long getResultSet() {
-    //     return taosResultSetPointer;
-    // }
-
     private native long getResultSetImp(long connection, long pSql);

     public boolean isUpdateQuery(long pSql) {
@@ -231,6 +199,7 @@ public class TSDBJNIConnector {
     //     }
     //     return resCode;
     // }
+
     private native int freeResultSetImp(long connection, long result);

     /**
@@ -323,8 +292,7 @@ public class TSDBJNIConnector {
      * Validate if a <I>create table</I> sql statement is correct without actually creating that table
      */
     public boolean validateCreateTableSql(String sql) {
-        long connection = taos;
-        int res = validateCreateTableSqlImp(connection, sql.getBytes());
+        int res = validateCreateTableSqlImp(taos, sql.getBytes());
         return res != 0 ? false : true;
     }


@@ -84,9 +84,9 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
             case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP:
                 return new Timestamp(row.getDate(colIndex).getTime());
             case TSDBConstants.TSDB_DATA_TYPE_BINARY:
-                return row.getString(colIndex).getBytes();
+                return row.getString(colIndex) == null ? null : row.getString(colIndex).getBytes();
             case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
-                return row.getString(colIndex);
+                return row.getString(colIndex) == null ? null : row.getString(colIndex);
             default:
                 return row.get(colIndex);
         }


@@ -0,0 +1,17 @@
+package com.taosdata.jdbc.utils;
+
+public class OSUtils {
+    private static final String OS = System.getProperty("os.name").toLowerCase();
+
+    public static boolean isWindows() {
+        return OS.indexOf("win") >= 0;
+    }
+
+    public static boolean isMac() {
+        return OS.indexOf("mac") >= 0;
+    }
+
+    public static boolean isLinux() {
+        return OS.indexOf("nux") >= 0;
+    }
+}
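A small usage sketch for the new helper (illustration only; the native library file names printed below are the editor's assumptions about a plausible caller, not code from this commit):

```java
import com.taosdata.jdbc.utils.OSUtils;

class OSUtilsDemo {
    public static void main(String[] args) {
        // Branch on the detected platform, e.g. to report the expected native library.
        if (OSUtils.isWindows()) {
            System.out.println("expecting taos.dll on java.library.path");
        } else if (OSUtils.isMac()) {
            System.out.println("expecting libtaos.dylib");
        } else if (OSUtils.isLinux()) {
            System.out.println("expecting libtaos.so");
        }
    }
}
```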


@@ -17,7 +17,6 @@ public class DriverAutoloadTest {
     @Test
     public void testRestful() throws SQLException {
-        // Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
         final String url = "jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata";
         Connection conn = DriverManager.getConnection(url, properties);
         Assert.assertNotNull(conn);


@@ -0,0 +1,64 @@
+package com.taosdata.jdbc.cases;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.*;
+
+public class NullValueInResultSetForJdbcRestfulTest {
+
+    private static final String host = "127.0.0.1";
+    Connection conn;
+
+    @Test
+    public void test() {
+        try (Statement stmt = conn.createStatement()) {
+            ResultSet rs = stmt.executeQuery("select * from weather");
+            ResultSetMetaData meta = rs.getMetaData();
+            while (rs.next()) {
+                for (int i = 1; i <= meta.getColumnCount(); i++) {
+                    Object value = rs.getObject(i);
+                    System.out.print(meta.getColumnLabel(i) + ": " + value + "\t");
+                }
+                System.out.println();
+            }
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Before
+    public void before() throws SQLException {
+        final String url = "jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata";
+        conn = DriverManager.getConnection(url);
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute("drop database if exists test_null");
+            stmt.execute("create database if not exists test_null");
+            stmt.execute("use test_null");
+            stmt.execute("create table weather(ts timestamp, f1 int, f2 bigint, f3 float, f4 double, f5 smallint, f6 tinyint, f7 bool, f8 binary(64), f9 nchar(64))");
+            stmt.executeUpdate("insert into weather(ts, f1) values(now+1s, 1)");
+            stmt.executeUpdate("insert into weather(ts, f2) values(now+2s, 2)");
+            stmt.executeUpdate("insert into weather(ts, f3) values(now+3s, 3.0)");
+            stmt.executeUpdate("insert into weather(ts, f4) values(now+4s, 4.0)");
+            stmt.executeUpdate("insert into weather(ts, f5) values(now+5s, 5)");
+            stmt.executeUpdate("insert into weather(ts, f6) values(now+6s, 6)");
+            stmt.executeUpdate("insert into weather(ts, f7) values(now+7s, true)");
+            stmt.executeUpdate("insert into weather(ts, f8) values(now+8s, 'hello')");
+            stmt.executeUpdate("insert into weather(ts, f9) values(now+9s, '涛思数据')");
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @After
+    public void after() {
+        try {
+            if (conn != null)
+                conn.close();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+}


@@ -0,0 +1,30 @@
+package com.taosdata.jdbc.utils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class OSUtilsTest {
+
+    private String OS;
+
+    @Test
+    public void inWindows() {
+        Assert.assertEquals(OS.indexOf("win") >= 0, OSUtils.isWindows());
+    }
+
+    @Test
+    public void isMac() {
+        Assert.assertEquals(OS.indexOf("mac") >= 0, OSUtils.isMac());
+    }
+
+    @Test
+    public void isLinux() {
+        Assert.assertEquals(OS.indexOf("nux") >= 0, OSUtils.isLinux());
+    }
+
+    @Before
+    public void before() {
+        OS = System.getProperty("os.name").toLowerCase();
+    }
+}


@@ -237,6 +237,18 @@ static int32_t dnodeInitStorage() {
     return -1;
   }

+  TDIR *tdir = tfsOpendir("vnode_bak/.staging");
+  if (tfsReaddir(tdir) != NULL) {
+    dError("vnode_bak/.staging dir not empty, fix it first.");
+    tfsClosedir(tdir);
+    return -1;
+  }
+
+  if (tfsMkdir("vnode_bak/.staging") < 0) {
+    dError("failed to create vnode_bak/.staging dir since %s", tstrerror(terrno));
+    return -1;
+  }
+
   dnodeCheckDataDirOpenned(tsDnodeDir);

   dInfo("dnode storage is initialized at %s", tsDnodeDir);


@@ -205,7 +205,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
             pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead.msgType], qtypeStr[qtype], pWrite->pHead.version);

       pWrite->code = vnodeProcessWrite(pVnode, &pWrite->pHead, qtype, pWrite);
-      if (pWrite->code <= 0) pWrite->processedCount = 1;
+      if (pWrite->code <= 0) atomic_add_fetch_32(&pWrite->processedCount, 1);
       if (pWrite->code > 0) pWrite->code = 0;
       if (pWrite->code == 0 && pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;


@@ -259,7 +259,7 @@ do { \
 #define TSDB_MIN_TABLES        4
 #define TSDB_MAX_TABLES        10000000
 #define TSDB_DEFAULT_TABLES    1000000
-#define TSDB_TABLES_STEP       100
+#define TSDB_TABLES_STEP       1000
 #define TSDB_MIN_DAYS_PER_FILE 1
 #define TSDB_MAX_DAYS_PER_FILE 3650


@@ -79,9 +79,6 @@ typedef void (*FStopSyncFile)(int32_t vgId, uint64_t fversion);
 // get file version
 typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver);

-// reset version
-typedef int32_t (*FResetVersion)(int32_t vgId, uint64_t fver);
-
 typedef int32_t (*FSendFile)(void *tsdb, SOCKET socketFd);
 typedef int32_t (*FRecvFile)(void *tsdb, SOCKET socketFd);
@@ -99,7 +96,6 @@ typedef struct {
   FStartSyncFile  startSyncFileFp;
   FStopSyncFile   stopSyncFileFp;
   FGetVersion     getVersionFp;
-  FResetVersion   resetVersionFp;
   FSendFile       sendFileFp;
   FRecvFile       recvFileFp;
 } SSyncInfo;


@@ -85,7 +85,7 @@ enum TEST_MODE {
 #define MAX_NUM_DATATYPE      10
 #define MAX_DB_COUNT          8
-#define MAX_SUPER_TABLE_COUNT 8
+#define MAX_SUPER_TABLE_COUNT 200
 #define MAX_COLUMN_COUNT      1024
 #define MAX_TAG_COUNT         128
@@ -124,7 +124,7 @@ typedef enum enum_INSERT_MODE {
 typedef enum enumQUERY_TYPE {
   NO_INSERT_TYPE,
   INSERT_TYPE,
   QUERY_TYPE_BUT
 } QUERY_TYPE;
@@ -229,7 +229,7 @@ typedef struct SColumn_S {
 typedef struct SSuperTable_S {
   char   sTblName[MAX_TB_NAME_SIZE+1];
   int    childTblCount;
   bool   childTblExists;          // 0: no, 1: yes
   int    batchCreateTableNum;     // 0: no batch, > 0: batch table number in one sql
   int8_t autoCreateTable;         // 0: create sub table, 1: auto create sub table
   char   childTblPrefix[MAX_TB_NAME_SIZE];
@@ -239,15 +239,15 @@ typedef struct SSuperTable_S {
   int     childTblOffset;
   int     multiThreadWriteOneTbl; // 0: no, 1: yes
   int     interlaceRows;          //
   int     disorderRatio;          // 0: no disorder, >0: x%
   int     disorderRange;          // ms or us by database precision
   int     maxSqlLen;              //
   int     insertInterval;         // insert interval, will override global insert interval
   int64_t insertRows;             // 0: no limit
   int     timeStampStep;
   char    startTimestamp[MAX_TB_NAME_SIZE]; //
   char    sampleFormat[MAX_TB_NAME_SIZE];   // csv, json
   char    sampleFile[MAX_FILE_NAME_LEN+1];
   char    tagsFile[MAX_FILE_NAME_LEN+1];
@@ -539,7 +539,7 @@ SArguments g_args = {
   true,            // insert_only
   false,           // debug_print
   false,           // verbose_print
   false,           // performance statistic print
   false,           // answer_yes;
   "./output.txt",  // output_file
   0,               // mode : sync or async
@@ -641,7 +641,7 @@ static void printHelp() {
           "The password to use when connecting to the server. Default is 'taosdata'.");
   printf("%s%s%s%s\n", indent, "-c", indent,
           "Configuration directory. Default is '/etc/taos/'.");
 #endif
   printf("%s%s%s%s\n", indent, "-h", indent,
           "The host to connect to TDengine. Default is localhost.");
   printf("%s%s%s%s\n", indent, "-p", indent,
@@ -684,7 +684,7 @@ static void printHelp() {
           "Print debug info.");
   printf("%s%s%s%s\n", indent, "-V, --version", indent,
           "Print version info.");
   /*    printf("%s%s%s%s\n", indent, "-D", indent,
           "if elete database if exists. 0: no, 1: yes, default is 1");
   */
 }
@@ -749,7 +749,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
           && strcasecmp(argv[i], "SMALLINT")
           && strcasecmp(argv[i], "BIGINT")
           && strcasecmp(argv[i], "DOUBLE")
           && strcasecmp(argv[i], "BINARY")
           && strcasecmp(argv[i], "NCHAR")) {
         printHelp();
         ERROR_EXIT( "Invalid data_type!\n");
@@ -1762,7 +1762,7 @@ static void printfQuerySystemInfo(TAOS * taos) {
   }

   for (int i = 0; i < dbCount; i++) {
     // printf database info
     printfDbInfoForQueryToFile(filename, dbInfos[i], i);

     // show db.vgroups
@@ -2098,7 +2098,7 @@ static int calcRowLen(SSuperTable* superTbls) {
       lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 6;
     } else if (strcasecmp(dataType, "FLOAT") == 0) {
       lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 22;
     } else if (strcasecmp(dataType, "DOUBLE") == 0) {
       lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 42;
     } else {
       printf("get error tag type : %s\n", dataType);
@@ -2262,7 +2262,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
   /*
   if (TBL_ALREADY_EXISTS == superTbls->childTblExists) {
     //get all child table name use cmd: select tbname from superTblName;
     int childTblCount = 10000;
     superTbls->childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN);
     if (superTbls->childTblName == NULL) {
@@ -2289,7 +2289,7 @@ static int createSuperTable(TAOS * taos, char* dbName,
   int lenOfOneRow = 0;
   for (colIndex = 0; colIndex < superTbls->columnCount; colIndex++) {
     char* dataType = superTbls->columns[colIndex].dataType;
     if (strcasecmp(dataType, "BINARY") == 0) {
       len += snprintf(cols + len, STRING_LEN - len,
           ", col%d %s(%d)", colIndex, "BINARY",
@@ -2386,7 +2386,7 @@ static int createSuperTable(TAOS * taos, char* dbName,
       len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex,
           "FLOAT");
       lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 22;
     } else if (strcasecmp(dataType, "DOUBLE") == 0) {
       len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex,
           "DOUBLE");
       lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 42;
@@ -2638,7 +2638,7 @@ static void* createTable(void *sarg)
       lastPrintTime = currentPrintTime;
     }
   }

   if (0 != len) {
     verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer);
     if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE, false)) {
@@ -2703,7 +2703,7 @@ static int startMultiThreadCreateChildTable(
     t_info->minDelay = INT16_MAX;
     pthread_create(pids + i, NULL, createTable, t_info);
   }

   for (int i = 0; i < threads; i++) {
     pthread_join(pids[i], NULL);
   }
@@ -2920,7 +2920,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
       cJSON* stbInfo, SSuperTable* superTbls) {
   bool ret = false;

   // columns
   cJSON *columns = cJSON_GetObjectItem(stbInfo, "columns");
   if (columns && columns->type != cJSON_Array) {
     printf("ERROR: failed to read json, columns not found\n");
@@ -2958,7 +2958,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
       count = 1;
     }

     // column info
     memset(&columnCase, 0, sizeof(StrColumn));
     cJSON *dataType = cJSON_GetObjectItem(column, "type");
     if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) {
@@ -2989,7 +2989,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
   count = 1;
   index = 0;

   // tags
   cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags");
   if (!tags || tags->type != cJSON_Array) {
     debugPrint("%s() LN%d, failed to read json, tags not found\n", __func__, __LINE__);
@@ -3018,7 +3018,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
       count = 1;
     }

     // column info
     memset(&columnCase, 0, sizeof(StrColumn));
     cJSON *dataType = cJSON_GetObjectItem(tag, "type");
     if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) {
@@ -3166,7 +3166,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
   if (numRecPerReq && numRecPerReq->type == cJSON_Number) {
     g_args.num_of_RPR = numRecPerReq->valueint;
   } else if (!numRecPerReq) {
-    g_args.num_of_RPR = 100;
+    g_args.num_of_RPR = 0xffff;
   } else {
     errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n", __func__, __LINE__);
     goto PARSE_OVER;
@@ -3209,7 +3209,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
     cJSON* dbinfos = cJSON_GetArrayItem(dbs, i);
     if (dbinfos == NULL) continue;

     // dbinfo
     cJSON *dbinfo = cJSON_GetObjectItem(dbinfos, "dbinfo");
     if (!dbinfo || dbinfo->type != cJSON_Object) {
       printf("ERROR: failed to read json, dbinfo not found\n");
@@ -3615,7 +3615,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
       }
       g_Dbs.db[i].superTbls[j].maxSqlLen = len;
     } else if (!maxSqlLen) {
-      g_Dbs.db[i].superTbls[j].maxSqlLen = TSDB_MAX_SQL_LEN;
+      g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len;
     } else {
       printf("ERROR: failed to read json, maxSqlLen not found\n");
       goto PARSE_OVER;
@@ -3748,7 +3748,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
   cJSON* user = cJSON_GetObjectItem(root, "user");
   if (user && user->type == cJSON_String && user->valuestring != NULL) {
     tstrncpy(g_queryInfo.user, user->valuestring, MAX_USERNAME_SIZE);
   } else if (!user) {
     tstrncpy(g_queryInfo.user, "root", MAX_USERNAME_SIZE); ;
   }
@@ -3805,7 +3805,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
     goto PARSE_OVER;
   }

   // super_table_query
   cJSON *specifiedQuery = cJSON_GetObjectItem(root, "specified_table_query");
   if (!specifiedQuery) {
     g_queryInfo.specifiedQueryInfo.concurrent = 0;
@@ -3930,7 +3930,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
     }
   }

   // sub_table_query
   cJSON *superQuery = cJSON_GetObjectItem(root, "super_table_query");
   if (!superQuery) {
     g_queryInfo.superQueryInfo.threadCnt = 0;
@@ -3996,7 +3996,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
   cJSON* subinterval = cJSON_GetObjectItem(superQuery, "interval");
   if (subinterval && subinterval->type == cJSON_Number) {
     g_queryInfo.superQueryInfo.subscribeInterval = subinterval->valueint;
   } else if (!subinterval) {
     //printf("failed to read json, subscribe interval no found\n");
     //goto PARSE_OVER;
     g_queryInfo.superQueryInfo.subscribeInterval = 10000;
@@ -4090,7 +4090,7 @@ static bool getInfoFromJsonFile(char* file) {
   }

   bool ret = false;
-  int maxLen = 64000;
+  int maxLen = 6400000;
   char *content = calloc(1, maxLen + 1);
   int len = fread(content, 1, maxLen, fp);
   if (len <= 0) {
@@ -4200,71 +4200,77 @@ static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp,
   dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
   (*sampleUsePos)++;

   return dataLen;
 }

-static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* stbInfo) {
+static int generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stbInfo) {
   int dataLen = 0;
-  dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp);
+  char *pstr = recBuf;
+  int maxLen = MAX_DATA_SIZE;
+  dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp);
   for (int i = 0; i < stbInfo->columnCount; i++) {
     if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6))
             || (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) {
       if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
         errorPrint( "binary or nchar length overflow, max size:%u\n",
                 (uint32_t)TSDB_MAX_BINARY_LEN);
-        return (-1);
+        return -1;
       }

       char* buf = (char*)calloc(stbInfo->columns[i].dataLen+1, 1);
       if (NULL == buf) {
         errorPrint( "calloc failed! size:%d\n", stbInfo->columns[i].dataLen);
-        return (-1);
+        return -1;
       }
       rand_string(buf, stbInfo->columns[i].dataLen);
-      dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "\'%s\', ", buf);
+      dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "\'%s\', ", buf);
       tmfree(buf);
     } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
                 "int", 3)) {
-      dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
+      dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
               "%d, ", rand_int());
     } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
                 "bigint", 6)) {
-      dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
+      dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
               "%"PRId64", ", rand_bigint());
     } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
                 "float", 5)) {
-      dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
+      dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
               "%f, ", rand_float());
     } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
                 "double", 6)) {
-      dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
+      dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
               "%f, ", rand_double());
     } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
                 "smallint", 8)) {
-      dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_smallint());
+      dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%d, ", rand_smallint());
     } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "tinyint", 7)) {
-      dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_tinyint());
+      dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%d, ", rand_tinyint());
     } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "bool", 4)) {
-      dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_bool());
+      dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%d, ", rand_bool());
     } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "timestamp", 9)) {
-      dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%"PRId64", ", rand_bigint());
+      dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%"PRId64", ", rand_bigint());
     } else {
       errorPrint( "No support data type: %s\n", stbInfo->columns[i].dataType);
-      return (-1);
+      return -1;
     }
   }

   dataLen -= 2;
-  dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
+  dataLen += snprintf(pstr + dataLen, maxLen - dataLen, ")");

-  return dataLen;
+  verbosePrint("%s() LN%d, recBuf:\n\t%s\n", __func__, __LINE__, recBuf);
+
+  return strlen(recBuf);
 }

-static int32_t generateData(char *res, char **data_type,
+static int32_t generateData(char *recBuf, char **data_type,
         int num_of_cols, int64_t timestamp, int lenOfBinary) {
-  memset(res, 0, MAX_DATA_SIZE);
-  char *pstr = res;
+  memset(recBuf, 0, MAX_DATA_SIZE);
+  char *pstr = recBuf;
   pstr += sprintf(pstr, "(%" PRId64, timestamp);
   int c = 0;
@@ -4285,7 +4291,7 @@ static int32_t generateData(char *res, char **data_type,
     } else if (strcasecmp(data_type[i % c], "smallint") == 0) {
       pstr += sprintf(pstr, ", %d", rand_smallint());
     } else if (strcasecmp(data_type[i % c], "int") == 0) {
       pstr += sprintf(pstr, ", %d", rand_int());
     } else if (strcasecmp(data_type[i % c], "bigint") == 0) {
       pstr += sprintf(pstr, ", %" PRId64, rand_bigint());
     } else if (strcasecmp(data_type[i % c], "float") == 0) {
@@ -4308,7 +4314,7 @@ static int32_t generateData(char *res, char **data_type,
     free(s);
   }

-  if (pstr - res > MAX_DATA_SIZE) {
+  if (strlen(recBuf) > MAX_DATA_SIZE) {
     perror("column length too long, abort");
     exit(-1);
   }
@@ -4316,7 +4322,7 @@ static int32_t generateData(char *res, char **data_type,
   pstr += sprintf(pstr, ")");

-  return (int32_t)(pstr - res);
+  return (int32_t)strlen(recBuf);
 }
static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
@ -4325,9 +4331,9 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
sampleDataBuf = calloc( sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) { if (sampleDataBuf == NULL) {
errorPrint("%s() LN%d, Failed to calloc %d Bytes, reason:%s\n", errorPrint("%s() LN%d, Failed to calloc %d Bytes, reason:%s\n",
__func__, __LINE__, __func__, __LINE__,
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno)); strerror(errno));
return -1; return -1;
} }
@ -4396,7 +4402,7 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq)
static int generateDataTail(char *tableName, int32_t tableSeq, static int generateDataTail(char *tableName, int32_t tableSeq,
threadInfo* pThreadInfo, SSuperTable* superTblInfo, threadInfo* pThreadInfo, SSuperTable* superTblInfo,
int batch, char* buffer, int64_t insertRows, int batch, char* buffer, int remainderBufLen, int64_t insertRows,
int64_t startFrom, uint64_t startTime, int *pSamplePos, int *dataLen) { int64_t startFrom, uint64_t startTime, int *pSamplePos, int *dataLen) {
int len = 0; int len = 0;
int ncols_per_record = 1; // count first col ts int ncols_per_record = 1; // count first col ts
@@ -4413,18 +4419,19 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
   int k = 0;
   for (k = 0; k < batch;) {
-    if (superTblInfo) {
-      int retLen = 0;
-      if (0 == strncasecmp(superTblInfo->dataSource,
+    char data[MAX_DATA_SIZE];
+    int retLen = 0;
+
+    if (superTblInfo) {
+      if (0 == strncasecmp(superTblInfo->dataSource,
                   "sample", strlen("sample"))) {
         retLen = getRowDataFromSample(
-                  buffer + len,
-                  superTblInfo->maxSqlLen - len,
+                  data,
+                  remainderBufLen,
                   startTime + superTblInfo->timeStampStep * k,
                   superTblInfo,
                   pSamplePos);
       } else if (0 == strncasecmp(superTblInfo->dataSource,
                   "rand", strlen("rand"))) {
         int rand_num = rand_tinyint() % 100;
         if (0 != superTblInfo->disorderRatio
@@ -4433,60 +4440,56 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
                   + superTblInfo->timeStampStep * k
                   - taosRandom() % superTblInfo->disorderRange;
           retLen = generateRowData(
-                    buffer + len,
-                    superTblInfo->maxSqlLen - len,
+                    data,
                     d,
                     superTblInfo);
         } else {
           retLen = generateRowData(
-                    buffer + len,
-                    superTblInfo->maxSqlLen - len,
+                    data,
                     startTime + superTblInfo->timeStampStep * k,
                     superTblInfo);
         }
       }

-      if (retLen < 0) {
-        return -1;
+      if (retLen > remainderBufLen) {
+        break;
       }

-      len += retLen;
-      if (len >= (superTblInfo->maxSqlLen - 256)) { // reserve for overwrite
-        k++;
-        break;
-      }
+      buffer += snprintf(buffer, retLen + 1, "%s", data);
+      k++;
+      len += retLen;
+      remainderBufLen -= retLen;
     } else {
       int rand_num = taosRandom() % 100;
-      char data[MAX_DATA_SIZE];
       char **data_type = g_args.datatype;
       int lenOfBinary = g_args.len_of_binary;

       if ((g_args.disorderRatio != 0)
               && (rand_num < g_args.disorderRange)) {
         int64_t d = startTime + DEFAULT_TIMESTAMP_STEP * k
           - taosRandom() % 1000000 + rand_num;
-        len = generateData(data, data_type,
+        retLen = generateData(data, data_type,
                 ncols_per_record, d, lenOfBinary);
       } else {
-        len = generateData(data, data_type,
+        retLen = generateData(data, data_type,
                 ncols_per_record,
                 startTime + DEFAULT_TIMESTAMP_STEP * k,
                 lenOfBinary);
       }

+      if (len > remainderBufLen)
+        break;
+
       buffer += sprintf(buffer, " %s", data);
-      if (strlen(buffer) >= (g_args.max_sql_len - 256)) { // too long
-        k++;
-        break;
-      }
+      k++;
+      len += retLen;
+      remainderBufLen -= retLen;
     }

     verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n",
             __func__, __LINE__, len, k, buffer);

-    k++;
     startFrom ++;

     if (startFrom >= insertRows) {
@@ -4570,20 +4573,25 @@ static int generateProgressiveDataBuffer(char *pTblName,
   assert(buffer != NULL);

-  memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
+  int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
+  int remainderBufLen = maxSqlLen;
+
+  memset(buffer, 0, maxSqlLen);

   char *pstr = buffer;

   int headLen = generateSQLHead(pTblName, tableSeq, pThreadInfo, superTblInfo,
           buffer);
   pstr += headLen;
+  remainderBufLen -= headLen;

   int k;
   int dataLen;
   k = generateDataTail(pTblName, tableSeq, pThreadInfo, superTblInfo,
-          g_args.num_of_RPR, pstr, insertRows, startFrom,
+          g_args.num_of_RPR, pstr, remainderBufLen, insertRows, startFrom,
           startTime,
           pSamplePos, &dataLen);
   return k;
 }
@@ -4656,13 +4664,18 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
   int generatedRecPerTbl = 0;
   bool flagSleep = true;
   int sleepTimeTotal = 0;

+  int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
+  int remainderBufLen;
+
   while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
     if ((flagSleep) && (insert_interval)) {
       st = taosGetTimestampUs();
       flagSleep = false;
     }
     // generate data
-    memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
+    memset(buffer, 0, maxSqlLen);
+    remainderBufLen = maxSqlLen;

     char *pstr = buffer;
     int recOfBatch = 0;
@@ -4685,6 +4698,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
           pThreadInfo->threadID, __func__, __LINE__, i, buffer);

       pstr += headLen;
+      remainderBufLen -= headLen;
+
       int dataLen = 0;

       verbosePrint("[%d] %s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n",
@@ -4698,13 +4713,20 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
       } else {
         startTime = 1500000000000;
       }
-      generateDataTail(
+      int generated = generateDataTail(
           tableName, tableSeq, pThreadInfo, superTblInfo,
-          batchPerTbl, pstr, insertRows, 0,
+          batchPerTbl, pstr, remainderBufLen, insertRows, 0,
           startTime,
           &(pThreadInfo->samplePos), &dataLen);

+      if (generated < 0) {
+        debugPrint("[%d] %s() LN%d, generated data is %d\n",
+                  pThreadInfo->threadID, __func__, __LINE__, generated);
+        goto free_and_statistics_interlace;
+      }
+
       pstr += dataLen;
+      remainderBufLen -= dataLen;
+
       recOfBatch += batchPerTbl;
       startTime += batchPerTbl * superTblInfo->timeStampStep;
       pThreadInfo->totalInsertRows += batchPerTbl;
@@ -4796,9 +4818,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
 free_and_statistics_interlace:
   tmfree(buffer);

   printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
           pThreadInfo->threadID,
           pThreadInfo->totalInsertRows,
           pThreadInfo->totalAffectedRows);
   return NULL;
 }
@@ -4929,16 +4951,16 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
 free_and_statistics_2:
   tmfree(buffer);

   printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
           pThreadInfo->threadID,
           pThreadInfo->totalInsertRows,
           pThreadInfo->totalAffectedRows);
   return NULL;
 }

 static void* syncWrite(void *sarg) {
   threadInfo *winfo = (threadInfo *)sarg;
   SSuperTable* superTblInfo = winfo->superTblInfo;
   int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows;
@@ -4953,7 +4975,7 @@
 }

 static void callBack(void *param, TAOS_RES *res, int code) {
   threadInfo* winfo = (threadInfo*)param;
   SSuperTable* superTblInfo = winfo->superTblInfo;
   int insert_interval =
@@ -4966,7 +4988,7 @@
   }

   char *buffer = calloc(1, winfo->superTblInfo->maxSqlLen);
-  char *data = calloc(1, MAX_DATA_SIZE);
+  char data[MAX_DATA_SIZE];
   char *pstr = buffer;
   pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix,
           winfo->start_table_from);
@@ -4978,7 +5000,6 @@
   if (winfo->start_table_from > winfo->end_table_to) {
     tsem_post(&winfo->lock_sem);
     free(buffer);
-    free(data);
     taos_free_result(res);
     return;
   }
@@ -4988,11 +5009,9 @@
     if (0 != winfo->superTblInfo->disorderRatio
             && rand_num < winfo->superTblInfo->disorderRatio) {
       int64_t d = winfo->lastTs - taosRandom() % 1000000 + rand_num;
-      //generateData(data, datatype, ncols_per_record, d, len_of_binary);
-      generateRowData(data, MAX_DATA_SIZE, d, winfo->superTblInfo);
+      generateRowData(data, d, winfo->superTblInfo);
     } else {
-      //generateData(data, datatype, ncols_per_record, start_time += 1000, len_of_binary);
-      generateRowData(data, MAX_DATA_SIZE, winfo->lastTs += 1000, winfo->superTblInfo);
+      generateRowData(data, winfo->lastTs += 1000, winfo->superTblInfo);
     }
     pstr += sprintf(pstr, "%s", data);
     winfo->counter++;
@@ -5007,7 +5026,6 @@
   }

   taos_query_a(winfo->taos, buffer, callBack, winfo);
   free(buffer);
-  free(data);
   taos_free_result(res);
 }
@@ -5373,7 +5391,7 @@ static void *readTable(void *sarg) {
 }

 static void *readMetric(void *sarg) {
 #if 1
   threadInfo *rinfo = (threadInfo *)sarg;
   TAOS *taos = rinfo->taos;
   char command[BUFFER_SIZE] = "\0";
@@ -5524,7 +5542,7 @@ static int insertTestProcess() {
   //int64_t totalInsertRows = 0;
   //int64_t totalAffectedRows = 0;
   //for (int i = 0; i < g_Dbs.dbCount; i++) {
   //  for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
   //    totalInsertRows+= g_Dbs.db[i].superTbls[j].totalInsertRows;
   //    totalAffectedRows += g_Dbs.db[i].superTbls[j].totalAffectedRows;
@@ -5921,7 +5939,7 @@ static void *subSubscribeProcess(void *sarg) {
       sprintf(tmpFile, "%s-%d",
               g_queryInfo.superQueryInfo.result[i], winfo->threadID);
     }
     tsub[i] = subscribeImpl(winfo->taos, subSqlstr, topic, tmpFile);
     if (NULL == tsub[i]) {
       taos_close(winfo->taos);
       return NULL;
@@ -6109,7 +6127,7 @@ static int subscribeTestProcess() {
       && (g_queryInfo.superQueryInfo.threadCnt > 0)) {
     pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt *
             sizeof(pthread_t));
     infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt *
             sizeof(threadInfo));
     if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
       errorPrint("%s() LN%d, malloc failed for create threads\n",
@@ -6256,7 +6274,7 @@ static void setParaFromArg(){
   g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;
   g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT;
-  g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE;
+  g_Dbs.db[0].superTbls[0].maxSqlLen = g_args.max_sql_len;
   g_Dbs.db[0].superTbls[0].columnCount = 0;
   for (int i = 0; i < MAX_NUM_DATATYPE; i++) {


@@ -117,7 +117,6 @@ typedef struct SSyncNode {
   FStartSyncFile    startSyncFileFp;
   FStopSyncFile     stopSyncFileFp;
   FGetVersion       getVersionFp;
-  FResetVersion     resetVersionFp;
   FSendFile         sendFileFp;
   FRecvFile         recvFileFp;
   pthread_mutex_t   mutex;


@@ -182,7 +182,6 @@ int64_t syncStart(const SSyncInfo *pInfo) {
   pNode->startSyncFileFp = pInfo->startSyncFileFp;
   pNode->stopSyncFileFp = pInfo->stopSyncFileFp;
   pNode->getVersionFp = pInfo->getVersionFp;
-  pNode->resetVersionFp = pInfo->resetVersionFp;
   pNode->sendFileFp = pInfo->sendFileFp;
   pNode->recvFileFp = pInfo->recvFileFp;


@@ -238,7 +238,6 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
   (*pNode->stopSyncFileFp)(pNode->vgId, fversion);
   nodeVersion = fversion;
-  if (pNode->resetVersionFp) (*pNode->resetVersionFp)(pNode->vgId, fversion);

   sInfo("%s, start to restore wal, fver:%" PRIu64, pPeer->id, nodeVersion);

   uint64_t wver = 0;


@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef TDENGINE_VNODE_BACKUP_H
+#define TDENGINE_VNODE_BACKUP_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+#include "vnodeInt.h"
+
+int32_t vnodeInitBackup();
+void    vnodeCleanupBackup();
+int32_t vnodeBackup(int32_t vgId);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif


@@ -30,7 +30,6 @@ void vnodeStopSyncFile(int32_t vgId, uint64_t fversion);
 void    vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
 int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
 int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
-int32_t vnodeResetVersion(int32_t vgId, uint64_t fver);

 void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code, bool force);

src/vnode/src/vnodeBackup.c (new file, 172 lines)

@@ -0,0 +1,172 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h"
#include "tglobal.h"
#include "tfs.h"
#include "vnodeBackup.h"
#include "vnodeMain.h"
typedef struct {
  int32_t vgId;
} SVBackupMsg;

typedef struct {
  pthread_t thread;
  int32_t   workerId;
} SVBackupWorker;

typedef struct {
  int32_t         num;
  SVBackupWorker *worker;
} SVBackupWorkerPool;

static SVBackupWorkerPool tsVBackupPool;
static taos_qset          tsVBackupQset;
static taos_queue         tsVBackupQueue;

static void vnodeProcessBackupMsg(SVBackupMsg *pMsg) {
  int32_t vgId = pMsg->vgId;
  char newDir[TSDB_FILENAME_LEN] = {0};
  char stagingDir[TSDB_FILENAME_LEN] = {0};

  sprintf(newDir, "%s/vnode%d", "vnode_bak", vgId);
  sprintf(stagingDir, "%s/.staging/vnode%d", "vnode_bak", vgId);

  if (tsEnableVnodeBak) {
    tfsRmdir(newDir);
    tfsRename(stagingDir, newDir);
  } else {
    vInfo("vgId:%d, vnode backup not enabled", vgId);
    tfsRmdir(stagingDir);
  }
}

static void *vnodeBackupFunc(void *param) {
  while (1) {
    SVBackupMsg *pMsg = NULL;
    if (taosReadQitemFromQset(tsVBackupQset, NULL, (void **)&pMsg, NULL) == 0) {
      vDebug("qset:%p, vbackup got no message from qset, exiting", tsVBackupQset);
      break;
    }

    vTrace("vgId:%d, will be processed in vbackup queue", pMsg->vgId);
    vnodeProcessBackupMsg(pMsg);

    vTrace("vgId:%d, disposed in vbackup worker", pMsg->vgId);
    taosFreeQitem(pMsg);
  }

  return NULL;
}

static int32_t vnodeStartBackup() {
  tsVBackupQueue = taosOpenQueue();
  if (tsVBackupQueue == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY;

  taosAddIntoQset(tsVBackupQset, tsVBackupQueue, NULL);

  for (int32_t i = 0; i < tsVBackupPool.num; ++i) {
    SVBackupWorker *pWorker = tsVBackupPool.worker + i;
    pWorker->workerId = i;

    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (pthread_create(&pWorker->thread, &thAttr, vnodeBackupFunc, pWorker) != 0) {
      vError("failed to create thread to process vbackup queue, reason:%s", strerror(errno));
    }

    pthread_attr_destroy(&thAttr);
    vDebug("vbackup:%d is launched, total:%d", pWorker->workerId, tsVBackupPool.num);
  }

  vDebug("vbackup queue:%p is allocated", tsVBackupQueue);

  return TSDB_CODE_SUCCESS;
}

static int32_t vnodeWriteIntoBackupWorker(int32_t vgId) {
  SVBackupMsg *pMsg = taosAllocateQitem(sizeof(SVBackupMsg));
  if (pMsg == NULL) return TSDB_CODE_VND_OUT_OF_MEMORY;

  pMsg->vgId = vgId;

  int32_t code = taosWriteQitem(tsVBackupQueue, TAOS_QTYPE_RPC, pMsg);
  if (code == 0) code = TSDB_CODE_DND_ACTION_IN_PROGRESS;

  return code;
}

int32_t vnodeBackup(int32_t vgId) {
  vTrace("vgId:%d, will backup", vgId);
  return vnodeWriteIntoBackupWorker(vgId);
}

int32_t vnodeInitBackup() {
  tsVBackupQset = taosOpenQset();

  tsVBackupPool.num = 1;
  tsVBackupPool.worker = calloc(sizeof(SVBackupWorker), tsVBackupPool.num);

  if (tsVBackupPool.worker == NULL) return -1;

  for (int32_t i = 0; i < tsVBackupPool.num; ++i) {
    SVBackupWorker *pWorker = tsVBackupPool.worker + i;
    pWorker->workerId = i;
    vDebug("vbackup:%d is created", i);
  }

  vDebug("vbackup is initialized, num:%d qset:%p", tsVBackupPool.num, tsVBackupQset);

  return vnodeStartBackup();
}

void vnodeCleanupBackup() {
  for (int32_t i = 0; i < tsVBackupPool.num; ++i) {
    SVBackupWorker *pWorker = tsVBackupPool.worker + i;
    if (taosCheckPthreadValid(pWorker->thread)) {
      taosQsetThreadResume(tsVBackupQset);
    }
    vDebug("vbackup:%d is closed", i);
  }

  for (int32_t i = 0; i < tsVBackupPool.num; ++i) {
    SVBackupWorker *pWorker = tsVBackupPool.worker + i;
    vDebug("vbackup:%d start to join", i);
    if (taosCheckPthreadValid(pWorker->thread)) {
      pthread_join(pWorker->thread, NULL);
    }
    vDebug("vbackup:%d join success", i);
  }

  vDebug("vbackup is closed, qset:%p", tsVBackupQset);

  taosCloseQset(tsVBackupQset);
  tsVBackupQset = NULL;

  tfree(tsVBackupPool.worker);

  vDebug("vbackup queue:%p is freed", tsVBackupQueue);
  taosCloseQueue(tsVBackupQueue);
  tsVBackupQueue = NULL;
}
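
As a reading aid, a minimal lifecycle sketch for this module, using only the functions it exports; the call sites are hypothetical and error handling is elided:

/* Hypothetical call sites, assuming the declarations in vnodeBackup.h above. */
void backupLifecycleSketch(int32_t vgId) {
  vnodeInitBackup();     /* dnode startup: opens the qset and spawns one worker */
  vnodeBackup(vgId);     /* per dropped vnode: enqueues a SVBackupMsg and returns
                            TSDB_CODE_DND_ACTION_IN_PROGRESS while the worker runs */
  vnodeCleanupBackup();  /* dnode shutdown: resumes the qset so the worker exits,
                            joins it, then frees the queue and qset */
}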


@@ -27,6 +27,7 @@
 #include "vnodeVersion.h"
 #include "vnodeMgmt.h"
 #include "vnodeWorker.h"
+#include "vnodeBackup.h"
 #include "vnodeMain.h"

 static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
@@ -364,7 +365,6 @@ int32_t vnodeOpen(int32_t vgId) {
   syncInfo.startSyncFileFp = vnodeStartSyncFile;
   syncInfo.stopSyncFileFp = vnodeStopSyncFile;
   syncInfo.getVersionFp = vnodeGetVersion;
-  syncInfo.resetVersionFp = vnodeResetVersion;
   syncInfo.sendFileFp = tsdbSyncSend;
   syncInfo.recvFileFp = tsdbSyncRecv;
   syncInfo.pTsdb = pVnode->tsdb;
@@ -448,18 +448,14 @@ void vnodeDestroy(SVnodeObj *pVnode) {
   if (pVnode->dropped) {
     char rootDir[TSDB_FILENAME_LEN] = {0};
-    char newDir[TSDB_FILENAME_LEN] = {0};
+    char stagingDir[TSDB_FILENAME_LEN] = {0};
     sprintf(rootDir, "%s/vnode%d", "vnode", vgId);
-    sprintf(newDir, "%s/vnode%d", "vnode_bak", vgId);
-    if (0 == tsEnableVnodeBak) {
-      vInfo("vgId:%d, vnode backup not enabled", pVnode->vgId);
-    } else {
-      tfsRmdir(newDir);
-      tfsRename(rootDir, newDir);
-    }
-    tfsRmdir(rootDir);
+    sprintf(stagingDir, "%s/.staging/vnode%d", "vnode_bak", vgId);
+    tfsRename(rootDir, stagingDir);
+    vnodeBackup(vgId);
     dnodeSendStatusMsgToMnode();
   }
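
With this change the drop path is two-phase: the synchronous part only renames the vnode directory into a staging area, and the slow cleanup is handed to the vbackup worker shown earlier. A condensed sketch of the flow (paths abbreviated, <id> is a placeholder, error handling omitted):

/* Sketch, not a verbatim excerpt. */
tfsRename("vnode/vnode<id>", "vnode_bak/.staging/vnode<id>");  /* fast, in the drop path */
vnodeBackup(vgId);  /* async: the worker promotes .staging/vnode<id> to vnode_bak/vnode<id>,
                       or simply removes it when tsEnableVnodeBak is off */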


@@ -17,6 +17,7 @@
 #include "os.h"
 #include "dnode.h"
 #include "vnodeStatus.h"
+#include "vnodeBackup.h"
 #include "vnodeWorker.h"
 #include "vnodeRead.h"
 #include "vnodeWrite.h"
@@ -29,6 +30,7 @@ static void vnodeCleanupHash(void);
 static void vnodeIncRef(void *ptNode);

 static SStep tsVnodeSteps[] = {
+  {"vnode-backup", vnodeInitBackup, vnodeCleanupBackup},
   {"vnode-worker", vnodeInitMWorker, vnodeCleanupMWorker},
   {"vnode-write",  vnodeInitWrite,   vnodeCleanupWrite},
   {"vnode-read",   vnodeInitRead,    vnodeCleanupRead},


@@ -107,8 +107,9 @@ void vnodeStopSyncFile(int32_t vgId, uint64_t fversion) {
   pVnode->fversion = fversion;
   pVnode->version = fversion;
   vnodeSaveVersion(pVnode);
+  walResetVersion(pVnode->wal, fversion);

-  vDebug("vgId:%d, datafile is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
+  vInfo("vgId:%d, datafile is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
   vnodeSetReadyStatus(pVnode);
   vnodeRelease(pVnode);
@@ -158,22 +159,6 @@ int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) {
   return code;
 }

-int32_t vnodeResetVersion(int32_t vgId, uint64_t fver) {
-  SVnodeObj *pVnode = vnodeAcquire(vgId);
-  if (pVnode == NULL) {
-    vError("vgId:%d, vnode not found while reset version", vgId);
-    return -1;
-  }
-
-  pVnode->fversion = fver;
-  pVnode->version = fver;
-  walResetVersion(pVnode->wal, fver);
-  vInfo("vgId:%d, version reset to %" PRIu64, vgId, fver);
-
-  vnodeRelease(pVnode);
-  return 0;
-}
-
 void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code, bool force) {
   SVnodeObj *pVnode = vparam;
   syncConfirmForward(pVnode->sync, version, code, force);


@@ -96,6 +96,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) {
   // write into WAL
   code = walWrite(pVnode->wal, pHead);
   if (code < 0) {
+    if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1);
     vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code);
     return code;
   }
@@ -104,7 +105,10 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) {
   // write data locally
   code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet);
-  if (code < 0) return code;
+  if (code < 0) {
+    if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1);
+    return code;
+  }

   return syncCode;
 }
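
Both new branches serve the same purpose: a positive syncCode means the forward to peers has already incremented processedCount, so a failure in the local WAL write or apply must undo that increment before returning. In outline (a sketch; the forwarding step happens earlier in the function and is not shown in this hunk):

/* Sketch, assuming syncCode > 0 implies processedCount was incremented. */
if (code < 0) {
  if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1);  /* roll back */
  return code;  /* the write fails cleanly instead of waiting for a confirm */
}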


@@ -68,12 +68,12 @@ public class JDBCDemo {
   }

   private void insert() {
-    final String sql = "insert into test.weather (ts, temperature, humidity) values(now, 20.5, 34)";
+    final String sql = "insert into " + dbName + "." + tbName + " (ts, temperature, humidity) values(now, 20.5, 34)";
     exuete(sql);
   }

   private void select() {
-    final String sql = "select * from test.weather";
+    final String sql = "select * from "+ dbName + "." + tbName;
     executeQuery(sql);
   }


@@ -139,6 +139,18 @@ python3 ./test.py -f import_merge/importInsertThenImport.py
 python3 ./test.py -f import_merge/importCSV.py
 #======================p1-end===============
 #======================p2-start===============
+# tools
+python3 test.py -f tools/taosdumpTest.py
+python3 test.py -f tools/taosdemoTest.py
+python3 test.py -f tools/taosdemoTestWithoutMetric.py
+python3 test.py -f tools/taosdemoTestWithJson.py
+python3 test.py -f tools/taosdemoTestLimitOffset.py
+python3 test.py -f tools/taosdemoTestTblAlt.py
+python3 test.py -f tools/taosdemoTestSampleData.py
+python3 test.py -f tools/taosdemoTestInterlace.py
+python3 test.py -f tools/taosdemoTestQuery.py
 # update
 python3 ./test.py -f update/allow_update.py
 python3 ./test.py -f update/allow_update-0.py
@@ -201,6 +213,8 @@ python3 ./test.py -f query/bug2119.py
 python3 ./test.py -f query/isNullTest.py
 python3 ./test.py -f query/queryWithTaosdKilled.py
 python3 ./test.py -f query/floatCompare.py
+python3 ./test.py -f query/query1970YearsAf.py
+python3 ./test.py -f query/bug3351.py
 python3 ./test.py -f query/bug3375.py
@@ -247,18 +261,6 @@ python3 test.py -f subscribe/supertable.py
 #======================p3-end===============
 #======================p4-start===============
-# tools
-python3 test.py -f tools/taosdumpTest.py
-python3 test.py -f tools/taosdemoTest.py
-python3 test.py -f tools/taosdemoTestWithoutMetric.py
-python3 test.py -f tools/taosdemoTestWithJson.py
-python3 test.py -f tools/taosdemoTestLimitOffset.py
-python3 test.py -f tools/taosdemoTest2.py
-python3 test.py -f tools/taosdemoTestSampleData.py
-python3 test.py -f tools/taosdemoTestInterlace.py
-python3 test.py -f tools/taosdemoTestQuery.py
 python3 ./test.py -f update/merge_commit_data-0.py
 # wal
 python3 ./test.py -f wal/addOldWalTest.py


@@ -0,0 +1,74 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *


class TDTestCase:
    def init(self, conn, logSql):
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor(), logSql)

    def run(self):
        tdSql.prepare()
        tdSql.execute("drop database if exists db")
        tdSql.execute("create database if not exists db keep 36500")
        tdSql.execute("use db")

        tdLog.printNoPrefix("==========step1:create table && insert data")
        tdSql.execute(
            "create table stb1 (ts timestamp, c1 int) TAGS(t1 int)"
        )
        tdSql.execute("create table t0 using stb1 tags(1)")
        tdSql.execute("insert into t0 values (-865000000, 1)")
        tdSql.execute("insert into t0 values (-864000000, 2)")
        tdSql.execute("insert into t0 values (-863000000, 3)")
        tdSql.execute("insert into t0 values (-15230000, 4)")
        tdSql.execute("insert into t0 values (-15220000, 5)")
        tdSql.execute("insert into t0 values (-15210000, 6)")

        tdLog.printNoPrefix("==========step2:query")
        # bug1: when ts > -864000000, return 0 rows;
        # bug2: when ts = -15220000, return 0 rows.
        tdSql.query('select * from t0 where ts < -864000000')
        tdSql.checkRows(1)
        tdSql.query('select * from t0 where ts <= -864000000')
        tdSql.checkRows(2)
        tdSql.query('select * from t0 where ts = -864000000')
        tdSql.checkRows(1)
        tdSql.query('select * from t0 where ts > -864000000')
        tdSql.checkRows(4)
        tdSql.query('select * from t0 where ts >= -864000000')
        tdSql.checkRows(5)
        tdSql.query('select * from t0 where ts < -15220000')
        tdSql.checkRows(4)
        tdSql.query('select * from t0 where ts <= -15220000')
        tdSql.checkRows(5)
        tdSql.query('select * from t0 where ts = -15220000')
        tdSql.checkRows(1)
        tdSql.query('select * from t0 where ts > -15220000')
        tdSql.checkRows(1)
        tdSql.query('select * from t0 where ts >= -15220000')
        tdSql.checkRows(2)

    def stop(self):
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)


tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
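
The boundary constants above are easiest to read as offsets before the Unix epoch. A small standalone C helper (hypothetical, not part of the test suite) renders them; both constants are exact multiples of 1000, so the integer division loses nothing:

#include <stdio.h>
#include <time.h>

/* Decode the negative epoch milliseconds used in the test above. */
static void show(long long ms) {
  time_t t = (time_t)(ms / 1000);
  char buf[32];
  strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", gmtime(&t));
  printf("%lld ms -> %s UTC\n", ms, buf);
}

int main(void) {
  show(-864000000LL);  /* exactly 10 days before the epoch: 1969-12-22 00:00:00 */
  show(-15220000LL);   /* 1969-12-31 19:46:20, a bit over 4 hours before the epoch */
  return 0;
}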


@@ -22,25 +22,34 @@ import datetime
 from util.log import *
 from util.sql import *
 from util.cases import *
+from util.dnodes import *
+from util.dnodes import TDDnode


 class TDTestCase:
+    def __init__(self):
+        self.path = ""
+
     def init(self, conn, logSql):
         tdLog.debug(f"start to excute {__file__}")
         tdSql.init(conn.cursor())

-    def getCfgDir(self, path):
+    def getcfgPath(self, path):
         binPath = os.path.dirname(os.path.realpath(__file__))
         binPath = binPath + "/../../../debug/"
-        tdLog.debug("binPath %s" % (binPath))
+        tdLog.debug(f"binPath {binPath}")
         binPath = os.path.realpath(binPath)
-        tdLog.debug("binPath real path %s" % (binPath))
+        tdLog.debug(f"binPath real path {binPath}")
         if path == "":
             self.path = os.path.abspath(binPath + "../../")
         else:
             self.path = os.path.realpath(path)
+        return self.path

-        self.cfgDir = "%s/sim/psim/cfg" % (self.path)
+    def getCfgDir(self):
+        self.getcfgPath(self.path)
+        self.cfgDir = f"{self.path}/sim/psim/cfg"
         return self.cfgDir

     def creatcfg(self):
@@ -63,7 +72,7 @@ class TDTestCase:
             "update": 0
         }

-        # 设置创建的超级表格式
+        # set stable schema
         stable1 = {
             "name": "stb2",
             "child_table_exists": "no",
@@ -75,35 +84,49 @@ class TDTestCase:
             "insert_rows": 5000,
             "multi_thread_write_one_tbl": "no",
             "number_of_tbl_in_one_sql": 0,
-            "rows_per_tbl": 1000,
+            "rows_per_tbl": 1,
             "max_sql_len": 65480,
             "disorder_ratio": 0,
             "disorder_range": 1000,
-            "timestamp_step": 19000000,
-            "start_timestamp": "1969-01-01 00:00:00.000",
+            "timestamp_step": 20000,
+            "start_timestamp": "1969-12-31 00:00:00.000",
             "sample_format": "csv",
             "sample_file": "./sample.csv",
             "tags_file": "",
             "columns": [
-                {"type": "INT"},
-                {"type": "DOUBLE", "count": 10},
-                {"type": "BINARY", "len": 16, "count": 3},
-                {"type": "BINARY", "len": 32, "count": 6}
+                {"type": "INT", "count": 2},
+                {"type": "DOUBLE", "count": 2},
+                {"type": "BIGINT", "count": 2},
+                {"type": "FLOAT", "count": 2},
+                {"type": "SMALLINT", "count": 2},
+                {"type": "TINYINT", "count": 2},
+                {"type": "BOOL", "count": 2},
+                {"type": "NCHAR", "len": 3, "count": 1},
+                {"type": "BINARY", "len": 8, "count": 1}
             ],
             "tags": [
+                {"type": "INT", "count": 2},
+                {"type": "DOUBLE", "count": 2},
+                {"type": "BIGINT", "count": 2},
+                {"type": "FLOAT", "count": 2},
+                {"type": "SMALLINT", "count": 2},
                 {"type": "TINYINT", "count": 2},
-                {"type": "BINARY", "len": 16, "count": 5}
+                {"type": "BOOL", "count": 2},
+                {"type": "NCHAR", "len": 3, "count": 1},
+                {"type": "BINARY", "len": 8, "count": 1}
             ]
         }

-        # 需要创建多个超级表时,只需创建不同的超级表格式并添加至super_tables
-        super_tables = [stable1]
+        # create different stables like stable1 and add to list super_tables
+        super_tables = []
+        super_tables.append(stable1)
         database = {
             "dbinfo": dbinfo,
             "super_tables": super_tables
         }

-        cfgdir = self.getCfgDir("")
+        cfgdir = self.getCfgDir()
         create_table = {
             "filetype": "insert",
             "cfgdir": cfgdir,
@@ -121,17 +144,82 @@ class TDTestCase:
         }
         return create_table

-    def inserttable(self):
+    def createinsertfile(self):
         create_table = self.creatcfg()
         date = datetime.datetime.now().strftime("%Y%m%d%H%M")
         file_create_table = f"/tmp/insert_{date}.json"

         with open(file_create_table, 'w') as f:
             json.dump(create_table, f)
+        return file_create_table

-        create_table_cmd = f"taosdemo -f {file_create_table}"
+    def inserttable(self, filepath):
+        create_table_cmd = f"taosdemo -f {filepath} > /dev/null 2>&1"
         _ = subprocess.check_output(create_table_cmd, shell=True).decode("utf-8")

+    def sqlsquery(self):
+        # stable query
+        tdSql.query(
+            "select * from stb2 where stb2.ts < '1970-01-01 00:00:00.000' "
+        )
+        tdSql.checkRows(43200)
+
+        tdSql.query(
+            "select * from stb2 where stb2.ts >= '1970-01-01 00:00:00.000' "
+        )
+        tdSql.checkRows(6800)
+
+        tdSql.query(
+            "select * from stb2 where stb2.ts > '1969-12-31 23:00:00.000' and stb2.ts <'1970-01-01 01:00:00.000' "
+        )
+        tdSql.checkRows(3590)
+
+        # child-tables query
+        tdSql.query(
+            "select * from t0 where t0.ts < '1970-01-01 00:00:00.000' "
+        )
+        tdSql.checkRows(4320)
+
+        tdSql.query(
+            "select * from t1 where t1.ts >= '1970-01-01 00:00:00.000' "
+        )
+        tdSql.checkRows(680)
+
+        tdSql.query(
+            "select * from t9 where t9.ts > '1969-12-31 22:00:00.000' and t9.ts <'1970-01-01 02:00:00.000' "
+        )
+        tdSql.checkRows(719)
+
+        tdSql.query(
+            "select * from t0,t1 where t0.ts=t1.ts and t1.ts >= '1970-01-01 00:00:00.000' "
+        )
+        tdSql.checkRows(680)
+
+        tdSql.query(
+            "select diff(col1) from t0 where t0.ts >= '1970-01-01 00:00:00.000' "
+        )
+        tdSql.checkRows(679)
+
+        tdSql.query(
+            "select t0,col1 from stb2 where stb2.ts < '1970-01-01 00:00:00.000' order by ts"
+        )
+        tdSql.checkRows(43200)
+
+        # query with timestamp in 'where ...'
+        tdSql.query(
+            "select * from stb2 where stb2.ts > -28800000 "
+        )
+        tdSql.checkRows(6790)
+
+        tdSql.query(
+            "select * from stb2 where stb2.ts > -28800000 and stb2.ts < '1970-01-01 08:00:00.000' "
+        )
+        tdSql.checkRows(6790)
+
+        tdSql.query(
+            "select * from stb2 where stb2.ts < -28800000 and stb2.ts > '1969-12-31 22:00:00.000' "
+        )
+        tdSql.checkRows(3590)
     def run(self):
         s = 'reset query cache'
@@ -142,84 +230,24 @@ class TDTestCase:
         tdSql.execute(s)

         tdLog.info("==========step1:create table stable and child table,then insert data automatically")
-        self.inserttable()
-        # tdSql.execute(
-        #     '''create table if not exists supt
-        #     (ts timestamp, c1 int, c2 float, c3 bigint, c4 double, c5 smallint, c6 tinyint)
-        #     tags(location binary(64), type int, isused bool , family nchar(64))'''
-        # )
-        # tdSql.execute("create table t1 using supt tags('beijing', 1, 1, '自行车')")
-        # tdSql.execute("create table t2 using supt tags('shanghai', 2, 0, '拖拉机')")
-        # tdSql.execute(
-        #     f"insert into t1 values (-31564800000, 6, 5, 4, 3, 2, 1)"
-        # )
+        insertfile = self.createinsertfile()
+        self.inserttable(insertfile)

         tdLog.info("==========step2:query join")
-
-        # stable query
-        tdSql.query(
-            "select * from stb2 where stb2.ts < '1970-01-01 00:00:00.000' "
-        )
-        tdSql.checkRows(16600)
-        tdSql.query(
-            "select * from stb2 where stb2.ts >= '1970-01-01 00:00:00.000' "
-        )
-        tdSql.checkRows(33400)
-        tdSql.query(
-            "select * from stb2 where stb2.ts > '1969-12-01 00:00:00.000' and stb2.ts <'1970-01-31 00:00:00.000' "
-        )
-        tdSql.checkRows(2780)
-
-        # child-table query
-        tdSql.query(
-            "select * from t0 where t0.ts < '1970-01-01 00:00:00.000' "
-        )
-        tdSql.checkRows(1660)
-        tdSql.query(
-            "select * from t1 where t1.ts >= '1970-01-01 00:00:00.000' "
-        )
-        tdSql.checkRows(3340)
-        tdSql.query(
-            "select * from t9 where t9.ts > '1969-12-01 00:00:00.000' and t9.ts <'1970-01-31 00:00:00.000' "
-        )
-        tdSql.checkRows(278)
-        tdSql.query(
-            "select * from t0,t1 where t0.ts=t1.ts and t1.ts >= '1970-01-01 00:00:00.000' "
-        )
-        tdSql.checkRows(3340)
-        tdSql.query(
-            "select diff(col1) from t0 where t0.ts >= '1970-01-01 00:00:00.000' "
-        )
-        tdSql.checkRows(3339)
-        tdSql.query(
-            "select t0,col1 from stb2 where stb2.ts < '1970-01-01 00:00:00.000' order by ts"
-        )
-        tdSql.checkRows(16600)
-
-        # query with timestamp in 'where ...'
-        tdSql.query(
-            "select * from stb2 where stb2.ts > -28800000 "
-        )
-        tdSql.checkRows(33400)
-        tdSql.query(
-            "select * from stb2 where stb2.ts > -28800000 and stb2.ts < '1970-01-01 08:00:00.000' "
-        )
-        tdSql.checkRows(20)
-        tdSql.query(
-            "select * from stb2 where stb2.ts < -28800000 and stb2.ts > '1969-12-31 16:00:00.000' "
-        )
+        self.sqlsquery()
+
+        # after wal and sync, check again
+        tdSql.query("show dnodes")
+        index = tdSql.getData(0, 0)
+        tdDnodes.stop(index)
+        tdDnodes.start(index)
+
+        tdLog.info("==========step3: query join again")
+        self.sqlsquery()
+
+        # delete temporary file
+        rm_cmd = f"rm -f /tmp/insert* > /dev/null 2>&1"
+        _ = subprocess.check_output(rm_cmd, shell=True).decode("utf-8")

     def stop(self):
         tdSql.close()


@@ -29,10 +29,33 @@ class TDTestCase:
         self.numberOfTables = 10
         self.numberOfRecords = 1000000

+    def getBuildPath(self):
+        selfPath = os.path.dirname(os.path.realpath(__file__))
+
+        if ("community" in selfPath):
+            projPath = selfPath[:selfPath.find("community")]
+        else:
+            projPath = selfPath[:selfPath.find("tests")]
+
+        for root, dirs, files in os.walk(projPath):
+            if ("taosd" in files):
+                rootRealPath = os.path.dirname(os.path.realpath(root))
+                if ("packaging" not in rootRealPath):
+                    buildPath = root[:len(root) - len("/build/bin")]
+                    break
+        return buildPath
+
     def insertDataAndAlterTable(self, threadID):
+        buildPath = self.getBuildPath()
+        if (buildPath == ""):
+            tdLog.exit("taosd not found!")
+        else:
+            tdLog.info("taosd found in %s" % buildPath)
+        binPath = buildPath + "/build/bin/"
+
         if(threadID == 0):
-            os.system("taosdemo -y -t %d -n %d" %
-                      (self.numberOfTables, self.numberOfRecords))
+            os.system("%staosdemo -y -t %d -n %d" %
+                      (binPath, self.numberOfTables, self.numberOfRecords))
         if(threadID == 1):
             time.sleep(2)
             print("use test")
@@ -47,7 +70,13 @@ class TDTestCase:
         # check if all the tables have heen created
         while True:
-            tdSql.query("show tables")
+            try:
+                tdSql.query("show tables")
+            except Exception as e:
+                tdLog.info("show tables test failed")
+                time.sleep(1)
+                continue
+
             rows = tdSql.queryRows
             print("number of tables: %d" % rows)
             if(rows == self.numberOfTables):
@@ -56,16 +85,23 @@ class TDTestCase:
         # check if there are any records in the last created table
         while True:
             print("query started")
-            tdSql.query("select * from test.t9")
+            try:
+                tdSql.query("select * from test.t9")
+            except Exception as e:
+                tdLog.info("select * test failed")
+                time.sleep(2)
+                continue
+
             rows = tdSql.queryRows
             print("number of records: %d" % rows)
             if(rows > 0):
                 break
             time.sleep(1)

         print("alter table test.meters add column col10 int")
         tdSql.execute("alter table test.meters add column col10 int")
-        print("insert into test.t0 values (now, 1, 2, 3, 4, 0.1, 0.01,'test', '测试', TRUE, 1610000000000, 0)")
-        tdSql.execute("insert into test.t0 values (now, 1, 2, 3, 4, 0.1, 0.01,'test', '测试', TRUE, 1610000000000, 0)")
+        print("insert into test.t9 values (now, 1, 2, 3, 4, 0.1, 0.01,'test', '测试', TRUE, 1610000000000, 0)")
+        tdSql.execute("insert into test.t9 values (now, 1, 2, 3, 4, 0.1, 0.01,'test', '测试', TRUE, 1610000000000, 0)")

     def run(self):
         tdSql.prepare()
@@ -78,6 +114,8 @@ class TDTestCase:
         t1.join()
         t2.join()

+        time.sleep(3)
+
         tdSql.query("select count(*) from test.meters")
         tdSql.checkData(0, 0, self.numberOfRecords * self.numberOfTables + 1)


@@ -25,29 +25,24 @@ function stopTaosd {
 function dohavecore(){
   corefile=`find $corepath -mmin 1`
   core_file=`echo $corefile|cut -d " " -f2`
-  echo "corefile:$core_file"
-  echo "corepath:$corepath"
-  ls -l $corepath
   proc=`echo $corefile|cut -d "_" -f3`
   if [ -n "$corefile" ];then
     echo 'taosd or taos has generated core'
+    rm case.log
     if [[ "$tests_dir" == *"$IN_TDINTERNAL"* ]] && [[ $1 == 1 ]]; then
       cd ../../../
       tar -zcPf $corepath'taos_'`date "+%Y_%m_%d_%H_%M_%S"`.tar.gz debug/build/bin/taosd debug/build/bin/tsim debug/build/lib/libtaos*so*
       if [[ $2 == 1 ]];then
         cp -r sim ~/sim_`date "+%Y_%m_%d_%H:%M:%S"`
-        rm -rf sim/case.log
       else
         cd community
         cp -r sim ~/sim_`date "+%Y_%m_%d_%H:%M:%S" `
-        rm -rf sim/case.log
       fi
     else
       cd ../../
       if [[ $1 == 1 ]];then
         tar -zcPf $corepath'taos_'`date "+%Y_%m_%d_%H_%M_%S"`.tar.gz debug/build/bin/taosd debug/build/bin/tsim debug/build/lib/libtaos*so*
         cp -r sim ~/sim_`date "+%Y_%m_%d_%H:%M:%S" `
-        rm -rf sim/case.log
       fi
     fi
     if [[ $1 == 1 ]];then
@@ -82,7 +77,6 @@ function runSimCaseOneByOne {
       # fi
       end_time=`date +%s`
       echo execution time of $case was `expr $end_time - $start_time`s. | tee -a out.log
-      dohavecore 0
     fi
   done < $1
 }
@@ -99,26 +93,25 @@ function runSimCaseOneByOnefq {
       date +%F\ %T | tee -a out.log
       if [[ "$tests_dir" == *"$IN_TDINTERNAL"* ]]; then
         echo -n $case
-        ./test.sh -f $case > ../../../sim/case.log 2>&1 && \
+        ./test.sh -f $case > case.log 2>&1 && \
         ( grep -q 'script.*'$case'.*failed.*, err.*lineNum' ../../../sim/tsim/log/taoslog0.0 && echo -e "${RED} failed${NC}" | tee -a out.log || echo -e "${GREEN} success${NC}" | tee -a out.log )|| \
         ( grep -q 'script.*success.*m$' ../../../sim/tsim/log/taoslog0.0 && echo -e "${GREEN} success${NC}" | tee -a out.log ) || \
-        ( echo -e "${RED} failed${NC}" | tee -a out.log && echo '=====================log=====================' && cat ../../../sim/case.log )
+        ( echo -e "${RED} failed${NC}" | tee -a out.log && echo '=====================log=====================' && cat case.log )
       else
         echo -n $case
         ./test.sh -f $case > ../../sim/case.log 2>&1 && \
         ( grep -q 'script.*'$case'.*failed.*, err.*lineNum' ../../sim/tsim/log/taoslog0.0 && echo -e "${RED} failed${NC}" | tee -a out.log || echo -e "${GREEN} success${NC}" | tee -a out.log )|| \
         ( grep -q 'script.*success.*m$' ../../sim/tsim/log/taoslog0.0 && echo -e "${GREEN} success${NC}" | tee -a out.log ) || \
-        ( echo -e "${RED} failed${NC}" | tee -a out.log && echo '=====================log=====================' && cat ../../sim/case.log )
+        ( echo -e "${RED} failed${NC}" | tee -a out.log && echo '=====================log=====================' && cat case.log )
       fi

       out_log=`tail -1 out.log `
       if [[ $out_log =~ 'failed' ]];then
+        rm case.log
         if [[ "$tests_dir" == *"$IN_TDINTERNAL"* ]]; then
           cp -r ../../../sim ~/sim_`date "+%Y_%m_%d_%H:%M:%S"`
-          rm -rf ../../../sim/case.log
         else
           cp -r ../../sim ~/sim_`date "+%Y_%m_%d_%H:%M:%S" `
-          rm -rf ../../sim/case.log
         fi
         dohavecore $2 1
         if [[ $2 == 1 ]];then
@@ -159,7 +152,6 @@ function runPyCaseOneByOne {
     else
       $line > /dev/null 2>&1
     fi
-    dohavecore 0
   done < $1
 }
@@ -185,7 +177,7 @@ function runPyCaseOneByOnefq() {
     start_time=`date +%s`
     date +%F\ %T | tee -a pytest-out.log
     echo -n $case
-    $line > ../../sim/case.log 2>&1 && \
+    $line > case.log 2>&1 && \
     echo -e "${GREEN} success${NC}" | tee -a pytest-out.log || \
     echo -e "${RED} failed${NC}" | tee -a pytest-out.log
     end_time=`date +%s`
@@ -193,8 +185,8 @@ function runPyCaseOneByOnefq() {
     if [[ $out_log =~ 'failed' ]];then
       cp -r ../../sim ~/sim_`date "+%Y_%m_%d_%H:%M:%S" `
       echo '=====================log===================== '
-      cat ../../sim/case.log
-      rm -rf ../../sim/case.log
+      cat case.log
+      rm -rf case.log
       dohavecore $2 2
       if [[ $2 == 1 ]];then
         exit 8