Merge branch 'develop' into long_query

AlexDuan 2021-09-02 09:52:19 +08:00
commit 27fd99f85a
22 changed files with 979 additions and 415 deletions

Jenkinsfile vendored (15 changed lines)
View File

@ -234,6 +234,7 @@ pipeline {
cd ${WKC}/tests/examples/nodejs
npm install td2.0-connector > /dev/null 2>&1
node nodejsChecker.js host=localhost
node test1970.js
'''
sh '''
cd ${WKC}/tests/examples/C#/taosdemo
@ -256,13 +257,11 @@ pipeline {
steps {
pre_test()
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
timeout(time: 60, unit: 'MINUTES'){
sh '''
cd ${WKC}/tests/pytest
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
timeout(time: 60, unit: 'MINUTES'){
sh '''
cd ${WKC}/tests/pytest
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
timeout(time: 60, unit: 'MINUTES'){
// sh '''
@ -453,4 +452,4 @@ pipeline {
)
}
}
}
}

View File

@ -46,7 +46,7 @@ TDengine's JDBC driver implementation is kept as consistent as possible with relational database drivers
</tr>
</table>
Note: unlike the JNI mode, the RESTful interface is stateless, so when using JDBC-RESTful the database name of tables and super tables must be specified in the SQL statement. (Starting from TDengine 2.1.8.0, the default database for the current SQL statement can also be specified in the RESTful URL.) For example:
Note: unlike the JNI mode, the RESTful interface is stateless, so when using JDBC-RESTful the database name of tables and super tables must be specified in the SQL statement. (Starting from TDengine 2.2.0.0, the default database for the current SQL statement can also be specified in the RESTful URL.) For example:
```sql
INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('beijing') VALUES(now, 24.6);
```

View File

@ -654,7 +654,7 @@ conn.close()
To support development on many different platforms, TDengine provides an API that conforms to the REST design standard, i.e. the RESTful API. To minimize the learning cost, and unlike the RESTful API design of other databases, TDengine operates the database directly through the SQL statement contained in the BODY of an HTTP POST request; only a single URL is needed. See the [video tutorial](https://www.taosdata.com/blog/2020/11/11/1965.html) for how to use the RESTful connector.
Note: one difference from the standard connectors is that the RESTful interface is stateless, so the `USE db_name` command has no effect; every reference to a table or super table name must carry the database name prefix. (Starting from version 2.1.8.0, a db_name can be specified in the RESTful URL; in that case, if the SQL statement does not specify a database name prefix, the db_name given in the URL is used.)
Note: one difference from the standard connectors is that the RESTful interface is stateless, so the `USE db_name` command has no effect; every reference to a table or super table name must carry the database name prefix. (Starting from version 2.2.0.0, a db_name can be specified in the RESTful URL; in that case, if the SQL statement does not specify a database name prefix, the db_name given in the URL is used.)
### Installation
@ -695,7 +695,7 @@ http://<fqdn>:<port>/rest/sql/[db_name]
- fqdn: the FQDN or IP address of any host in the cluster
- port: the httpPort setting in the configuration file, 6041 by default
- db_name: optional parameter; the default database name for the SQL statement being executed. (Supported since version 2.1.8.0)
- db_name: optional parameter; the default database name for the SQL statement being executed. (Supported since version 2.2.0.0)
For example: http://h1.taos.com:6041/rest/sql/test is a URL that points to the host at h1.taos.com:6041 and sets the default database name to test.
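A minimal illustration of how the db_name in the URL affects the posted statement; the database `test` and table `weather` are assumed only for this sketch:
```sql
-- Posted to http://h1.taos.com:6041/rest/sql (no db_name): table names must be fully qualified.
SELECT COUNT(*) FROM test.weather;
-- Posted to http://h1.taos.com:6041/rest/sql/test: the database prefix may be omitted.
SELECT COUNT(*) FROM weather;
```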

View File

@ -800,7 +800,7 @@ taos -n sync -P 6042 -h <fqdn of server>
`taos -n speed -h <fqdn of server> -P 6030 -N 10 -l 10000000 -S TCP`
Starting from version 2.1.8.0, the taos tool provides a new network speed diagnostic mode, which measures network speed with uncompressed transfers against a running taosd instance or a server instance simulated with `taos -n server`. The parameters that can be adjusted in this mode are:
Starting from version 2.2.0.0, the taos tool provides a new network speed diagnostic mode, which measures network speed with uncompressed transfers against a running taosd instance or a server instance simulated with `taos -n server`. The parameters that can be adjusted in this mode are:
-n: set to "speed" to diagnose network speed.
-h: the FQDN or IP address of the server to connect to. If not set, the FQDN parameter in the local taos.cfg file is used as the default.
@ -813,7 +813,7 @@ taos -n sync -P 6042 -h <fqdn of server>
`taos -n fqdn -h <fqdn of server>`
Starting from version 2.1.8.0, the taos tool provides a new FQDN resolution speed diagnostic mode, which attempts to resolve a target FQDN address and records the time consumed during resolution. The parameters that can be adjusted in this mode are:
Starting from version 2.2.0.0, the taos tool provides a new FQDN resolution speed diagnostic mode, which attempts to resolve a target FQDN address and records the time consumed during resolution. The parameters that can be adjusted in this mode are:
-n: set to "fqdn" to diagnose FQDN resolution.
-h: the target FQDN address to resolve. If not set, the FQDN parameter in the local taos.cfg file is used as the default.

View File

@ -713,22 +713,49 @@ Query OK, 1 row(s) in set (0.001091s)
| <= | smaller than or equal to | **`timestamp`** and all numeric types |
| = | equal to | all types |
| <> | not equal to | all types |
| is [not] null | is null or is not null | all types |
| between and | within a certain range | **`timestamp`** and all numeric types |
| in | match any value in a set | all types except first column `timestamp` |
| like | match a wildcard string | **`binary`** **`nchar`** |
| % | match with any char sequences | **`binary`** **`nchar`** |
| _ | match with a single char | **`binary`** **`nchar`** |
1. The <> operator can also be written as != ; note that it cannot be applied to the timestamp field in the first column of a table.
2. The like operator uses a wildcard string for matching (see the examples below).
    * In a wildcard string: '%' (percent sign) matches zero or more characters; '\_' (underscore) matches any single character.
    * To match a literal \_ (underscore) that is part of the stored string, write it as `\_` in the wildcard string, i.e. escape it with a backslash. (Supported since version 2.1.8.0)
    * To match a literal \_ (underscore) that is part of the stored string, write it as `\_` in the wildcard string, i.e. escape it with a backslash. (Supported since version 2.2.0.0)
    * A wildcard string may not exceed 20 bytes. (Since version 2.1.6.1 the limit has been relaxed to 100 bytes and can be configured with the maxWildCardsLength parameter in taos.cfg. Very long wildcard strings are not recommended, as they may severely degrade the performance of LIKE operations.)
3. To filter ranges on several fields at once, use the keyword AND to combine the conditions; filter conditions on different columns combined with OR are not yet supported.
    * Since version 2.3.0.0, full AND/OR combinations within the same column and/or across different columns are supported.
4. For filters on a single field, only one time filter condition may be set per statement; for other (regular) columns or tag columns, the `OR` keyword can be used to combine filter conditions, e.g. `((value > 20 AND value < 30) OR (value < 12))`.
    * Since version 2.3.0.0, multiple time filter conditions are allowed, but the filter on the first timestamp column must still resolve to a single interval.
5. Since version 2.0.17.0, filter conditions support the BETWEEN AND syntax; for example, `WHERE col2 BETWEEN 1.5 AND 3.25` expresses the condition "1.5 ≤ col2 ≤ 3.25".
6. Since version 2.1.4.0, filter conditions support the IN operator, for example `WHERE city IN ('Beijing', 'Shanghai')`. Notes: BOOL values may be written as `{true, false}` or `{0, 1}`, but not as integers other than 0 and 1; FLOAT and DOUBLE values are subject to floating-point precision, so a value in the set matches only if it equals the row value within that precision; for TIMESTAMP, only non-primary-key columns are supported. <!-- REPLACE_OPEN_TO_ENTERPRISE__IN_OPERATOR_AND_UNSIGNED_INTEGER -->
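A few hedged illustrations of the filter operators described above; the table `meters` and its columns `value`, `city`, and `name` are assumed only for this sketch:
```sql
SELECT * FROM meters WHERE value BETWEEN 1.5 AND 3.25;       -- 1.5 <= value <= 3.25
SELECT * FROM meters WHERE city IN ('Beijing', 'Shanghai');  -- matches any value in the set
SELECT * FROM meters WHERE name LIKE 'sensor\_%';            -- a literal underscore, then any suffix
SELECT * FROM meters WHERE (value > 20 AND value < 30) OR (value < 12);
```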
<a class="anchor" id="nested"></a>
### Nested Queries
A "nested query", also called a "subquery", means that within one SQL statement the result of the "inner query" can be used as input to the "outer query".
Starting from version 2.2.0.0, TDengine's query engine supports non-correlated subqueries in the FROM clause ("non-correlated" meaning that the subquery does not reference parameters of the parent query). In other words, in the tb_name_list position of an ordinary SELECT statement, an independent SELECT statement can be used instead (enclosed in parentheses), so a complete nested-query SQL statement looks like:
```mysql
SELECT ... FROM (SELECT ... FROM ...) ...;
```
Notes:
1. Only one level of nesting is currently supported; a subquery cannot contain another subquery.
2. The result of the inner query is exposed to the outer query as a "virtual table", which can be renamed with the AS syntax for easy reference in the outer query.
3. Subqueries cannot currently be used with the "continuous query" feature.
4. Ordinary JOINs between tables or between super tables are supported in both the inner and the outer query. The result of the inner query can also take part in JOIN operations with data subtables.
5. Neither the inner query nor the outer query currently supports the UNION operation.
6. The inner query supports the same feature set as a non-nested query statement.
    * An ORDER BY clause in the inner query is generally meaningless; avoid it to prevent needless resource consumption.
7. Compared with a non-nested query, the outer query has the following limitations (a small example follows this list):
    * Computation functions:
        1. If the result of the inner query does not provide timestamps, functions whose computation depends on timestamps cannot work correctly in the outer query, e.g. TOP, BOTTOM, FIRST, LAST, DIFF.
        2. Functions that require two scans of the data cannot work correctly in the outer query, e.g. STDDEV, PERCENTILE.
    * The IN operator is not supported in the outer query, but may be used in the inner query.
    * GROUP BY is not supported in the outer query.
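A minimal sketch of a legal nested query, assuming a super table `meters` with a numeric column `value`; the inner result is renamed with AS, as described in note 2:
```sql
-- The inner query produces a per-subtable average; the outer query aggregates that virtual table.
SELECT MAX(avg_val) FROM (SELECT AVG(value) AS avg_val FROM meters GROUP BY TBNAME) AS per_table;
```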
<a class="anchor" id="union"></a>
### The UNION ALL Operator
@ -1433,17 +1460,19 @@ SELECT AVG(current), MAX(current), LEASTSQUARES(current, start_val, step_val), P
- The result of a SELECT statement may contain at most 1024 columns (function calls in the statement may also occupy some column slots); when the limit is exceeded, explicitly select fewer output columns to avoid an execution error.
- The number of databases, super tables, and tables is not limited by the system itself, only by system resources.
## Other Conventions in TAOS SQL
**Restrictions on GROUP BY**
TAOS SQL supports GROUP BY on tags and on TBNAME, and also supports GROUP BY on a regular column, provided that only a single column is used and that column has fewer than 100,000 distinct values.
**Restrictions on JOIN**
TAOS SQL supports joining two tables on the primary-key timestamp; arithmetic on the aggregated results of two tables is not yet supported.
The filter conditions on different tables in a JOIN query cannot be combined with OR.
**Applicability of IS NOT NULL and the not-empty expression**
IS NOT NULL applies to columns of all types. The not-empty expression is <>"", which applies only to columns of non-numeric types. (See the sketch below.)
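A hedged illustration of these conventions; the super table `meters`, its tag `location`, and its column `name` are assumed only for the example:
```sql
-- GROUP BY on a single tag column:
SELECT AVG(value), location FROM meters GROUP BY location;
-- IS NOT NULL works for columns of any type; <>"" only for non-numeric columns:
SELECT * FROM meters WHERE name IS NOT NULL;
SELECT * FROM meters WHERE name <> "";
```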

View File

@ -20,12 +20,42 @@
#include "com_taosdata_jdbc_TSDBJNIConnector.h"
#define jniFatal(...) { if (jniDebugFlag & DEBUG_FATAL) { taosPrintLog("JNI FATAL ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }}
#define jniError(...) { if (jniDebugFlag & DEBUG_ERROR) { taosPrintLog("JNI ERROR ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }}
#define jniWarn(...) { if (jniDebugFlag & DEBUG_WARN) { taosPrintLog("JNI WARN ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }}
#define jniInfo(...) { if (jniDebugFlag & DEBUG_INFO) { taosPrintLog("JNI ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }}
#define jniDebug(...) { if (jniDebugFlag & DEBUG_DEBUG) { taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); }}
#define jniTrace(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); }}
#define jniFatal(...) \
{ \
if (jniDebugFlag & DEBUG_FATAL) { \
taosPrintLog("JNI FATAL ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \
} \
}
#define jniError(...) \
{ \
if (jniDebugFlag & DEBUG_ERROR) { \
taosPrintLog("JNI ERROR ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \
} \
}
#define jniWarn(...) \
{ \
if (jniDebugFlag & DEBUG_WARN) { \
taosPrintLog("JNI WARN ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \
} \
}
#define jniInfo(...) \
{ \
if (jniDebugFlag & DEBUG_INFO) { \
taosPrintLog("JNI ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \
} \
}
#define jniDebug(...) \
{ \
if (jniDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); \
} \
}
#define jniTrace(...) \
{ \
if (jniDebugFlag & DEBUG_TRACE) { \
taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); \
} \
}
int __init = 0;
@ -60,14 +90,14 @@ jmethodID g_blockdataSetByteArrayFp;
jmethodID g_blockdataSetNumOfRowsFp;
jmethodID g_blockdataSetNumOfColsFp;
#define JNI_SUCCESS 0
#define JNI_TDENGINE_ERROR -1
#define JNI_SUCCESS 0
#define JNI_TDENGINE_ERROR -1
#define JNI_CONNECTION_NULL -2
#define JNI_RESULT_SET_NULL -3
#define JNI_NUM_OF_FIELDS_0 -4
#define JNI_SQL_NULL -5
#define JNI_FETCH_END -6
#define JNI_OUT_OF_MEMORY -7
#define JNI_SQL_NULL -5
#define JNI_FETCH_END -6
#define JNI_OUT_OF_MEMORY -7
static void jniGetGlobalMethod(JNIEnv *env) {
// make sure init function executed once
@ -129,13 +159,13 @@ static void jniGetGlobalMethod(JNIEnv *env) {
}
static int32_t check_for_params(jobject jobj, jlong conn, jlong res) {
if ((TAOS*) conn == NULL) {
if ((TAOS *)conn == NULL) {
jniError("jobj:%p, connection is closed", jobj);
return JNI_CONNECTION_NULL;
}
if ((TAOS_RES *) res == NULL) {
jniError("jobj:%p, conn:%p, res is null", jobj, (TAOS*) conn);
if ((TAOS_RES *)res == NULL) {
jniError("jobj:%p, conn:%p, res is null", jobj, (TAOS *)conn);
return JNI_RESULT_SET_NULL;
}
@ -216,7 +246,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setOptions(JNIEnv
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEnv *env, jobject jobj, jstring jhost,
jint jport, jstring jdbName, jstring juser,
jstring jpass) {
jlong ret = 0;
jlong ret = 0;
const char *host = NULL;
const char *user = NULL;
const char *pass = NULL;
@ -246,7 +276,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEn
jniDebug("jobj:%p, pass not specified, use default password", jobj);
}
ret = (jlong) taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, (uint16_t)jport);
ret = (jlong)taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, (uint16_t)jport);
if (ret == 0) {
jniError("jobj:%p, conn:%p, connect to database failed, host=%s, user=%s, dbname=%s, port=%d", jobj, (void *)ret,
(char *)host, (char *)user, (char *)dbname, (int32_t)jport);
@ -289,7 +319,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp(
jsize len = (*env)->GetArrayLength(env, jsql);
char *str = (char *) calloc(1, sizeof(char) * (len + 1));
char *str = (char *)calloc(1, sizeof(char) * (len + 1));
if (str == NULL) {
jniError("jobj:%p, conn:%p, alloc memory failed", jobj, tscon);
return JNI_OUT_OF_MEMORY;
@ -315,16 +345,17 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp(
}
free(str);
return (jlong) pSql;
return (jlong)pSql;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrCodeImp(JNIEnv *env, jobject jobj, jlong con, jlong tres) {
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrCodeImp(JNIEnv *env, jobject jobj, jlong con,
jlong tres) {
int32_t code = check_for_params(jobj, con, tres);
if (code != JNI_SUCCESS) {
return code;
}
return (jint)taos_errno((TAOS_RES*) tres);
return (jint)taos_errno((TAOS_RES *)tres);
}
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(JNIEnv *env, jobject jobj, jlong tres) {
@ -334,7 +365,7 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(J
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(JNIEnv *env, jobject jobj, jlong con,
jlong tres) {
TAOS *tscon = (TAOS *)con;
TAOS * tscon = (TAOS *)con;
int32_t code = check_for_params(jobj, con, tres);
if (code != JNI_SUCCESS) {
return code;
@ -359,7 +390,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp(
SSqlObj *pSql = (TAOS_RES *)tres;
return (tscIsUpdateQuery(pSql)? 1:0);
return (tscIsUpdateQuery(pSql) ? 1 : 0);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp(JNIEnv *env, jobject jobj, jlong con,
@ -370,21 +401,22 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp(
}
taos_free_result((void *)res);
jniDebug("jobj:%p, conn:%p, free resultset:%p", jobj, (TAOS*) con, (void *)res);
jniDebug("jobj:%p, conn:%p, free resultset:%p", jobj, (TAOS *)con, (void *)res);
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsImp(JNIEnv *env, jobject jobj, jlong con,
jlong res) {
TAOS *tscon = (TAOS *)con;
TAOS * tscon = (TAOS *)con;
int32_t code = check_for_params(jobj, con, res);
if (code != JNI_SUCCESS) {
return code;
}
jint ret = taos_affected_rows((SSqlObj *)res);
jniDebug("jobj:%p, conn:%p, sql:%p, res: %p, affect rows:%d", jobj, tscon, (TAOS *)con, (TAOS_RES *)res, (int32_t)ret);
jniDebug("jobj:%p, conn:%p, sql:%p, res: %p, affect rows:%d", jobj, tscon, (TAOS *)con, (TAOS_RES *)res,
(int32_t)ret);
return ret;
}
@ -392,13 +424,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsIm
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaDataImp(JNIEnv *env, jobject jobj,
jlong con, jlong res,
jobject arrayListObj) {
TAOS *tscon = (TAOS *)con;
TAOS * tscon = (TAOS *)con;
int32_t code = check_for_params(jobj, con, res);
if (code != JNI_SUCCESS) {
return code;
}
TAOS_RES* tres = (TAOS_RES*) res;
TAOS_RES * tres = (TAOS_RES *)res;
TAOS_FIELD *fields = taos_fetch_fields(tres);
int32_t num_fields = taos_num_fields(tres);
@ -452,7 +484,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
int32_t numOfFields = taos_num_fields(result);
if (numOfFields == 0) {
jniError("jobj:%p, conn:%p, resultset:%p, fields size %d", jobj, tscon, (void*)res, numOfFields);
jniError("jobj:%p, conn:%p, resultset:%p, fields size %d", jobj, tscon, (void *)res, numOfFields);
return JNI_NUM_OF_FIELDS_0;
}
@ -460,7 +492,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
if (row == NULL) {
int code = taos_errno(result);
if (code == TSDB_CODE_SUCCESS) {
jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d, fetch row to the end", jobj, tscon, (void*)res, numOfFields);
jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d, fetch row to the end", jobj, tscon, (void *)res,
numOfFields);
return JNI_FETCH_END;
} else {
jniDebug("jobj:%p, conn:%p, interrupted query", jobj, tscon);
@ -468,7 +501,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
}
}
int32_t* length = taos_fetch_lengths(result);
int32_t *length = taos_fetch_lengths(result);
char tmp[TSDB_MAX_BYTES_PER_ROW] = {0};
@ -533,7 +566,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp(JNIEnv *env, jobject jobj, jlong con,
jlong res, jobject rowobj) {
jlong res, jobject rowobj) {
TAOS * tscon = (TAOS *)con;
int32_t code = check_for_params(jobj, con, res);
if (code != JNI_SUCCESS) {
@ -564,8 +597,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp(JNI
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfColsFp, (jint)numOfFields);
for (int i = 0; i < numOfFields; i++) {
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, i, fields[i].bytes * numOfRows,
jniFromNCharToByteArray(env, (char *)row[i], fields[i].bytes * numOfRows));
int bytes = fields[i].bytes;
if (fields[i].type == TSDB_DATA_TYPE_BINARY || fields[i].type == TSDB_DATA_TYPE_NCHAR) {
bytes += 2;
}
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, i, bytes * numOfRows,
jniFromNCharToByteArray(env, (char *)row[i], bytes * numOfRows));
}
return JNI_SUCCESS;
@ -585,7 +623,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionIm
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNIEnv *env, jobject jobj, jlong con,
jboolean restart, jstring jtopic, jstring jsql, jint jinterval) {
jboolean restart, jstring jtopic,
jstring jsql, jint jinterval) {
jlong sub = 0;
TAOS *taos = (TAOS *)con;
char *topic = NULL;
@ -682,8 +721,8 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getTsCharset(J
* @param res the TAOS_RES object, i.e. the SSqlObject
* @return precision 0:ms 1:us 2:ns
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultTimePrecisionImp(JNIEnv *env, jobject jobj, jlong con,
jlong res) {
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultTimePrecisionImp(JNIEnv *env, jobject jobj,
jlong con, jlong res) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection is closed", jobj);
@ -699,7 +738,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultTimePrec
return taos_result_precision(result);
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp(JNIEnv *env, jobject jobj, jbyteArray jsql, jlong con) {
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp(JNIEnv *env, jobject jobj,
jbyteArray jsql, jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
@ -713,7 +753,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp(J
jsize len = (*env)->GetArrayLength(env, jsql);
char *str = (char *) calloc(1, sizeof(char) * (len + 1));
char *str = (char *)calloc(1, sizeof(char) * (len + 1));
if (str == NULL) {
jniError("jobj:%p, conn:%p, alloc memory failed", jobj, tscon);
return JNI_OUT_OF_MEMORY;
@ -724,25 +764,27 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp(J
// todo handle error
}
TAOS_STMT* pStmt = taos_stmt_init(tscon);
int32_t code = taos_stmt_prepare(pStmt, str, len);
TAOS_STMT *pStmt = taos_stmt_init(tscon);
int32_t code = taos_stmt_prepare(pStmt, str, len);
tfree(str);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return (jlong) pStmt;
return (jlong)pStmt;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameImp(JNIEnv *env, jobject jobj, jlong stmt, jstring jname, jlong conn) {
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameImp(JNIEnv *env, jobject jobj,
jlong stmt, jstring jname,
jlong conn) {
TAOS *tsconn = (TAOS *)conn;
if (tsconn == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn);
return JNI_SQL_NULL;
@ -750,7 +792,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
const char *name = (*env)->GetStringUTFChars(env, jname, NULL);
int32_t code = taos_stmt_set_tbname((void*)stmt, name);
int32_t code = taos_stmt_set_tbname((void *)stmt, name);
if (code != TSDB_CODE_SUCCESS) {
(*env)->ReleaseStringUTFChars(env, jname, name);
@ -763,8 +805,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
return JNI_SUCCESS;
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(JNIEnv *env, jobject jobj, jlong stmt,
jbyteArray colDataList, jbyteArray lengthList, jbyteArray nullList, jint dataType, jint dataBytes, jint numOfRows, jint colIndex, jlong con) {
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(
JNIEnv *env, jobject jobj, jlong stmt, jbyteArray colDataList, jbyteArray lengthList, jbyteArray nullList,
jint dataType, jint dataBytes, jint numOfRows, jint colIndex, jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
@ -798,14 +841,14 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
}
// bind multi-rows with only one invoke.
TAOS_MULTI_BIND* b = calloc(1, sizeof(TAOS_MULTI_BIND));
TAOS_MULTI_BIND *b = calloc(1, sizeof(TAOS_MULTI_BIND));
b->num = numOfRows;
b->buffer_type = dataType; // todo check data type
b->buffer_length = IS_VAR_DATA_TYPE(dataType)? dataBytes:tDataTypes[dataType].bytes;
b->is_null = nullArray;
b->buffer = colBuf;
b->length = (int32_t*)lengthArray;
b->num = numOfRows;
b->buffer_type = dataType; // todo check data type
b->buffer_length = IS_VAR_DATA_TYPE(dataType) ? dataBytes : tDataTypes[dataType].bytes;
b->is_null = nullArray;
b->buffer = colBuf;
b->length = (int32_t *)lengthArray;
// set the length and is_null array
if (!IS_VAR_DATA_TYPE(dataType)) {
@ -829,14 +872,15 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt, jlong con) {
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_STMT *pStmt = (TAOS_STMT*) stmt;
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
return JNI_SQL_NULL;
@ -853,14 +897,15 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con) {
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_STMT *pStmt = (TAOS_STMT*) stmt;
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
return JNI_SQL_NULL;
@ -876,15 +921,16 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp(JNIEnv *env, jobject jobj,
jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList, jbyteArray lengthList, jbyteArray nullList, jlong conn) {
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp(
JNIEnv *env, jobject jobj, jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList,
jbyteArray lengthList, jbyteArray nullList, jlong conn) {
TAOS *tsconn = (TAOS *)conn;
if (tsconn == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn);
return JNI_SQL_NULL;
@ -898,39 +944,39 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI
}
len = (*env)->GetArrayLength(env, lengthList);
int64_t *lengthArray = (int64_t*) calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray);
int64_t *lengthArray = (int64_t *)calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, typeList);
char *typeArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte*) typeArray);
char *typeArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte *)typeArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList);
int32_t *nullArray = (int32_t*) calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray);
int32_t *nullArray = (int32_t *)calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray);
if ((*env)->ExceptionCheck(env)) {
}
const char *name = (*env)->GetStringUTFChars(env, tableName, NULL);
char* curTags = tagsData;
char * curTags = tagsData;
TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND));
for(int32_t i = 0; i < numOfTags; ++i) {
for (int32_t i = 0; i < numOfTags; ++i) {
tagsBind[i].buffer_type = typeArray[i];
tagsBind[i].buffer = curTags;
tagsBind[i].buffer = curTags;
tagsBind[i].is_null = &nullArray[i];
tagsBind[i].length = (uintptr_t*) &lengthArray[i];
tagsBind[i].length = (uintptr_t *)&lengthArray[i];
curTags += lengthArray[i];
}
int32_t code = taos_stmt_set_tbname_tags((void*)stmt, name, tagsBind);
int32_t code = taos_stmt_set_tbname_tags((void *)stmt, name, tagsBind);
int32_t nTags = (int32_t) numOfTags;
int32_t nTags = (int32_t)numOfTags;
jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags);
tfree(tagsData);
@ -948,28 +994,28 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JNIEnv *env, jobject jobj,
jobjectArray lines, jlong conn) {
jobjectArray lines, jlong conn) {
TAOS *taos = (TAOS *)conn;
if (taos == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
int numLines = (*env)->GetArrayLength(env, lines);
char** c_lines = calloc(numLines, sizeof(char*));
int numLines = (*env)->GetArrayLength(env, lines);
char **c_lines = calloc(numLines, sizeof(char *));
if (c_lines == NULL) {
jniError("c_lines:%p, alloc memory failed", c_lines);
return JNI_OUT_OF_MEMORY;
}
for (int i = 0; i < numLines; ++i) {
jstring line = (jstring) ((*env)->GetObjectArrayElement(env, lines, i));
c_lines[i] = (char*)(*env)->GetStringUTFChars(env, line, 0);
jstring line = (jstring)((*env)->GetObjectArrayElement(env, lines, i));
c_lines[i] = (char *)(*env)->GetStringUTFChars(env, line, 0);
}
int code = taos_insert_lines(taos, c_lines, numLines);
for (int i = 0; i < numLines; ++i) {
jstring line = (jstring) ((*env)->GetObjectArrayElement(env, lines, i));
jstring line = (jstring)((*env)->GetObjectArrayElement(env, lines, i));
(*env)->ReleaseStringUTFChars(env, line, c_lines[i]);
}

View File

@ -448,6 +448,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
int dcol = 0;
while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
@ -458,13 +459,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
STColumn *pRowCol = schemaColAt(pSchema, rcol);
if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
dcol++;
rcol++;
} else if (pRowCol->colId < pDataCol->colId) {
rcol++;
} else {
if(forceSetNull) {
if(forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
dcol++;
@ -482,6 +484,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
int nRowCols = kvRowNCols(row);
while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
@ -493,13 +496,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
++dcol;
++rcol;
} else if (colIdx->colId < pDataCol->colId) {
++rcol;
} else {
if (forceSetNull) {
if(forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
++dcol;
@ -518,7 +522,6 @@ void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, b
}
}
//TODO: refactor this function to eliminate additional memory copy
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
ASSERT(target->numOfCols == source->numOfCols);
@ -534,7 +537,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) {
if (source->cols[j].len > 0) {
if (source->cols[j].len > 0 || target->cols[j].len > 0) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows,
target->maxPoints);
}
@ -578,7 +581,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
if (key1 < key2) {
for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type);
if (src1->cols[i].len > 0) {
if (src1->cols[i].len > 0 || target->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
}
@ -596,6 +599,8 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
} else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
} else if(target->cols[i].len > 0) {
dataColSetNullAt(&target->cols[i], target->numOfRows);
}
}
target->numOfRows++;

View File

@ -32,6 +32,7 @@ import java.util.List;
import com.taosdata.jdbc.utils.NullType;
public class TSDBResultSetBlockData {
private static final int BINARY_LENGTH_OFFSET = 2;
private int numOfRows = 0;
private int rowIndex = 0;
@ -404,10 +405,8 @@ public class TSDBResultSetBlockData {
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
ByteBuffer bb = (ByteBuffer) this.colData.get(col);
bb.position(fieldSize * this.rowIndex);
bb.position((fieldSize + BINARY_LENGTH_OFFSET) * this.rowIndex);
int length = bb.getShort();
byte[] dest = new byte[length];
bb.get(dest, 0, length);
if (NullType.isBinaryNull(dest, length)) {
@ -419,16 +418,13 @@ public class TSDBResultSetBlockData {
case TSDBConstants.TSDB_DATA_TYPE_NCHAR: {
ByteBuffer bb = (ByteBuffer) this.colData.get(col);
bb.position(fieldSize * this.rowIndex);
bb.position((fieldSize + BINARY_LENGTH_OFFSET) * this.rowIndex);
int length = bb.getShort();
byte[] dest = new byte[length];
bb.get(dest, 0, length);
if (NullType.isNcharNull(dest, length)) {
return null;
}
try {
String charset = TaosGlobalConfig.getCharset();
return new String(dest, charset);

View File

@ -586,6 +586,130 @@ public class TSDBPreparedStatementTest {
Assert.assertEquals(numOfRows, rows);
}
@Test
public void bindDataQueryTest() throws SQLException {
Statement stmt = conn.createStatement();
stmt.execute("drop table if exists weather_test");
stmt.execute("create table weather_test(ts timestamp, f1 nchar(10), f2 binary(10)) tags (t1 int, t2 binary(10))");
int numOfRows = 1;
TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? using weather_test tags(?,?) (ts, f2) values(?, ?)");
s.setTableName("w2");
s.setTagInt(0, 1);
s.setTagString(1, "test");
ArrayList<Long> ts = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
ArrayList<String> s2 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
s2.add("test" + i % 4);
}
s.setString(1, s2, 10);
s.columnDataAddBatch();
s.columnDataExecuteBatch();
s.columnDataCloseBatch();
String sql = "select * from weather_test where t1 >= ? and t1 <= ?";
TSDBPreparedStatement s1 = (TSDBPreparedStatement) conn.prepareStatement(sql);
s1.setInt(1, 0);
s1.setInt(2, 10);
ResultSet rs = s1.executeQuery();
int rows = 0;
while (rs.next()) {
rows++;
}
Assert.assertEquals(numOfRows, rows);
}
@Test
public void setTagNullTest()throws SQLException {
Statement stmt = conn.createStatement();
stmt.execute("drop table if exists weather_test");
stmt.execute("create table weather_test(ts timestamp, c1 int) tags (t1 tinyint, t2 smallint, t3 int, t4 bigint, t5 float, t6 double, t7 bool, t8 binary(10), t9 nchar(10))");
int numOfRows = 1;
TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? using weather_test tags(?,?,?,?,?,?,?,?,?) values(?, ?)");
s.setTableName("w3");
s.setTagNull(0, TSDBConstants.TSDB_DATA_TYPE_TINYINT);
s.setTagNull(1, TSDBConstants.TSDB_DATA_TYPE_SMALLINT);
s.setTagNull(2, TSDBConstants.TSDB_DATA_TYPE_INT);
s.setTagNull(3, TSDBConstants.TSDB_DATA_TYPE_BIGINT);
s.setTagNull(4, TSDBConstants.TSDB_DATA_TYPE_FLOAT);
s.setTagNull(5, TSDBConstants.TSDB_DATA_TYPE_DOUBLE);
s.setTagNull(6, TSDBConstants.TSDB_DATA_TYPE_BOOL);
s.setTagNull(7, TSDBConstants.TSDB_DATA_TYPE_BINARY);
s.setTagNull(8, TSDBConstants.TSDB_DATA_TYPE_NCHAR);
ArrayList<Long> ts = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
ArrayList<Integer> s2 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
s2.add(i);
}
s.setInt(1, s2);
s.columnDataAddBatch();
s.columnDataExecuteBatch();
s.columnDataCloseBatch();
}
private String stringGenerator(int length) {
String source = "abcdefghijklmnopqrstuvwxyz";
StringBuilder sb = new StringBuilder();
Random rand = new Random();
for(int i = 0; i < length; i++) {
sb.append(source.charAt(rand.nextInt(26)));
}
return sb.toString();
}
@Test(expected = SQLException.class)
public void setMaxTableNameTest()throws SQLException {
Statement stmt = conn.createStatement();
stmt.execute("drop table if exists weather_test");
stmt.execute("create table weather_test(ts timestamp, c1 int) tags (t1 int)");
TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? using weather_test tags(?) values(?, ?)");
String tbname = stringGenerator(193);
s.setTableName(tbname);
s.setTagInt(0, 1);
int numOfRows = 1;
ArrayList<Long> ts = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
ArrayList<Integer> s2 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
s2.add(i);
}
s.setInt(1, s2);
s.columnDataAddBatch();
s.columnDataExecuteBatch();
s.columnDataCloseBatch();
}
@Test(expected = SQLException.class)
public void createTwoSameDbTest() throws SQLException {
// when

View File

@ -102,9 +102,7 @@ _libtaos.taos_get_client_info.restype = c_char_p
def taos_get_client_info():
# type: () -> str
"""Get client version info.
Get client version info.
"""
"""Get client version info."""
return _libtaos.taos_get_client_info().decode()
@ -114,6 +112,7 @@ _libtaos.taos_get_server_info.argtypes = (c_void_p,)
def taos_get_server_info(connection):
# type: (c_void_p) -> str
"""Get server version as string."""
return _libtaos.taos_get_server_info(connection).decode()
@ -134,11 +133,10 @@ _libtaos.taos_connect.argtypes = c_char_p, c_char_p, c_char_p, c_char_p, c_uint1
def taos_connect(host=None, user="root", password="taosdata", db=None, port=0):
# type: (None|str, str, str, None|str, int) -> c_void_p
"""Create TDengine database connection.
Create a database connection and initialize the connection context. The parameters the user needs to provide include:
- host: server hostname/FQDN (the FQDN of the TDengine management node)
- user: user name
- password: user password
- host: server hostname/FQDN
- user: user name
- password: user password
- db: database name (optional)
- port: server port
@ -187,11 +185,10 @@ _libtaos.taos_connect_auth.argtypes = c_char_p, c_char_p, c_char_p, c_char_p, c_
def taos_connect_auth(host=None, user="root", auth="", db=None, port=0):
# type: (None|str, str, str, None|str, int) -> c_void_p
"""
Create a database connection and initialize the connection context. The parameters the user needs to provide include:
"""Connect server with auth token.
- host: server hostname/FQDN (the FQDN of the TDengine management node)
- user: user name
- host: server hostname/FQDN
- user: user name
- auth: base64 encoded auth token
- db: database name (optional)
- port: server port

View File

@ -105,7 +105,7 @@ extern char configDir[];
#define DEFAULT_TIMESTAMP_STEP 1
#define DEFAULT_INTERLACE_ROWS 0
#define DEFAULT_DATATYPE_NUM 3
#define DEFAULT_DATATYPE_NUM 1
#define DEFAULT_CHILDTABLES 10000
@ -291,7 +291,6 @@ typedef struct SSuperTable_S {
uint64_t lenOfTagOfOneRow;
char* sampleDataBuf;
char* sampleBindArray;
//int sampleRowCount;
//int sampleUsePos;
@ -438,7 +437,8 @@ typedef struct SQueryMetaInfo_S {
typedef struct SThreadInfo_S {
TAOS * taos;
TAOS_STMT *stmt;
int64_t *bind_ts;
char* sampleBindArray;
int64_t *bind_ts;
int threadID;
char db_name[TSDB_DB_NAME_LEN];
uint32_t time_precision;
@ -754,12 +754,11 @@ static void printHelp() {
"Set the replica parameters of the database, Default 1, min: 1, max: 3.");
printf("%s%s%s%s\n", indent, "-m, --table-prefix=TABLEPREFIX", "\t",
"Table prefix name. Default is 'd'.");
printf("%s%s%s%s\n", indent, "-s, --sql-file=FILE", "\t\t", "The select sql file.");
printf("%s%s%s%s\n", indent, "-s, --sql-file=FILE", "\t\t",
"The select sql file.");
printf("%s%s%s%s\n", indent, "-N, --normal-table", "\t\t", "Use normal table flag.");
printf("%s%s%s%s\n", indent, "-o, --output=FILE", "\t\t",
"Direct output to the named file. Default is './output.txt'.");
printf("%s%s%s%s\n", indent, "-s, --sql-file=FILE", "\t\t",
"The select sql file.");
printf("%s%s%s%s\n", indent, "-q, --query-mode=MODE", "\t\t",
"Query mode -- 0: SYNC, 1: ASYNC. Default is SYNC.");
printf("%s%s%s%s\n", indent, "-b, --data-type=DATATYPE", "\t",
@ -831,6 +830,12 @@ static bool isStringNumber(char *input)
return true;
}
static void errorWrongValue(char *program, char *wrong_arg, char *wrong_value)
{
fprintf(stderr, "%s %s: %s is an invalid value\n", program, wrong_arg, wrong_value);
fprintf(stderr, "Try `taosdemo --help' or `taosdemo --usage' for more information.\n");
}
static void errorUnreconized(char *program, char *wrong_arg)
{
fprintf(stderr, "%s: unrecognized options '%s'\n", program, wrong_arg);
@ -900,7 +905,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
tstrncpy(configDir, argv[++i], TSDB_FILENAME_LEN);
} else if (0 == strncmp(argv[i], "-c", strlen("-c"))) {
tstrncpy(configDir, (char *)(argv[i] + strlen("-")), TSDB_FILENAME_LEN);
tstrncpy(configDir, (char *)(argv[i] + strlen("-c")), TSDB_FILENAME_LEN);
} else if (strlen("--config-dir") == strlen(argv[i])) {
if (argc == i+1) {
errorPrintReqArg3(argv[0], "--config-dir");
@ -983,7 +988,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
} else if (0 == strcasecmp(argv[i+1], "stmt")) {
arguments->iface = STMT_IFACE;
} else {
errorPrintReqArg(argv[0], "I");
errorWrongValue(argv[0], "-I", argv[i+1]);
exit(EXIT_FAILURE);
}
i++;
@ -1006,7 +1011,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
} else if (0 == strcasecmp((char *)(argv[i] + strlen("-I")), "stmt")) {
arguments->iface = STMT_IFACE;
} else {
errorPrintReqArg3(argv[0], "-I");
errorWrongValue(argv[0], "-I",
(char *)(argv[i] + strlen("-I")));
exit(EXIT_FAILURE);
}
} else if (strlen("--interface") == strlen(argv[i])) {
@ -1021,7 +1027,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
} else if (0 == strcasecmp(argv[i+1], "stmt")) {
arguments->iface = STMT_IFACE;
} else {
errorPrintReqArg3(argv[0], "--interface");
errorWrongValue(argv[0], "--interface", argv[i+1]);
exit(EXIT_FAILURE);
}
i++;
@ -1094,9 +1100,9 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
arguments->sqlFile = argv[++i];
} else if (0 == strncmp(argv[i], "--sql-file=", strlen("--sql-file="))) {
arguments->host = (char *)(argv[i++] + strlen("--sql-file="));
arguments->sqlFile = (char *)(argv[i++] + strlen("--sql-file="));
} else if (0 == strncmp(argv[i], "-s", strlen("-s"))) {
arguments->host = (char *)(argv[i++] + strlen("-s"));
arguments->sqlFile = (char *)(argv[i++] + strlen("-s"));
} else if (strlen("--sql-file") == strlen(argv[i])) {
if (argc == i+1) {
errorPrintReqArg3(argv[0], "--sql-file");
@ -1644,6 +1650,54 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->debug_print = true;
} else if (strcmp(argv[i], "-gg") == 0) {
arguments->verbose_print = true;
} else if ((0 == strncmp(argv[i], "-R", strlen("-R")))
|| (0 == strncmp(argv[i], "--disorder-range",
strlen("--disorder-range")))) {
if (strlen("-R") == strlen(argv[i])) {
if (argc == i+1) {
errorPrintReqArg(argv[0], "R");
exit(EXIT_FAILURE);
} else if (!isStringNumber(argv[i+1])) {
errorPrintReqArg2(argv[0], "R");
exit(EXIT_FAILURE);
}
arguments->disorderRange = atoi(argv[++i]);
} else if (0 == strncmp(argv[i], "--disorder-range=",
strlen("--disorder-range="))) {
if (isStringNumber((char *)(argv[i] + strlen("--disorder-range=")))) {
arguments->disorderRange =
atoi((char *)(argv[i]+strlen("--disorder-range=")));
} else {
errorPrintReqArg2(argv[0], "--disorder-range");
exit(EXIT_FAILURE);
}
} else if (0 == strncmp(argv[i], "-R", strlen("-R"))) {
if (isStringNumber((char *)(argv[i] + strlen("-R")))) {
arguments->disorderRange =
atoi((char *)(argv[i]+strlen("-R")));
} else {
errorPrintReqArg2(argv[0], "-R");
exit(EXIT_FAILURE);
}
if (arguments->disorderRange < 0) {
errorPrint("Invalid disorder range %d, will be set to %d\n",
arguments->disorderRange, 1000);
arguments->disorderRange = 1000;
}
} else if (strlen("--disorder-range") == strlen(argv[i])) {
if (argc == i+1) {
errorPrintReqArg3(argv[0], "--disorder-range");
exit(EXIT_FAILURE);
} else if (!isStringNumber(argv[i+1])) {
errorPrintReqArg2(argv[0], "--disorder-range");
exit(EXIT_FAILURE);
}
arguments->disorderRange = atoi(argv[++i]);
} else {
errorUnreconized(argv[0], argv[i]);
exit(EXIT_FAILURE);
}
} else if ((0 == strncmp(argv[i], "-O", strlen("-O")))
|| (0 == strncmp(argv[i], "--disorder", strlen("--disorder")))) {
if (2 == strlen(argv[i])) {
@ -1694,54 +1748,6 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->disorderRatio, 0);
arguments->disorderRatio = 0;
}
} else if ((0 == strncmp(argv[i], "-R", strlen("-R")))
|| (0 == strncmp(argv[i], "--disorder-range",
strlen("--disorder-range")))) {
if (2 == strlen(argv[i])) {
if (argc == i+1) {
errorPrintReqArg(argv[0], "R");
exit(EXIT_FAILURE);
} else if (!isStringNumber(argv[i+1])) {
errorPrintReqArg2(argv[0], "R");
exit(EXIT_FAILURE);
}
arguments->disorderRange = atoi(argv[++i]);
} else if (0 == strncmp(argv[i], "--disorder-range=",
strlen("--disorder-range="))) {
if (isStringNumber((char *)(argv[i] + strlen("--disorder-range=")))) {
arguments->disorderRange =
atoi((char *)(argv[i]+strlen("--disorder-rnage=")));
} else {
errorPrintReqArg2(argv[0], "--disorder-range");
exit(EXIT_FAILURE);
}
} else if (0 == strncmp(argv[i], "-R", strlen("-R"))) {
if (isStringNumber((char *)(argv[i] + strlen("-R")))) {
arguments->disorderRange =
atoi((char *)(argv[i]+strlen("-R")));
} else {
errorPrintReqArg2(argv[0], "-R");
exit(EXIT_FAILURE);
}
if (arguments->disorderRange < 0) {
errorPrint("Invalid disorder range %d, will be set to %d\n",
arguments->disorderRange, 1000);
arguments->disorderRange = 1000;
}
} else if (strlen("--disorder-range") == strlen(argv[i])) {
if (argc == i+1) {
errorPrintReqArg3(argv[0], "--disorder-range");
exit(EXIT_FAILURE);
} else if (!isStringNumber(argv[i+1])) {
errorPrintReqArg2(argv[0], "--disorder-range");
exit(EXIT_FAILURE);
}
arguments->disorderRange = atoi(argv[++i]);
} else {
errorUnreconized(argv[0], argv[i]);
exit(EXIT_FAILURE);
}
} else if ((0 == strncmp(argv[i], "-a", strlen("-a")))
|| (0 == strncmp(argv[i], "--replica",
strlen("--replica")))) {
@ -5738,20 +5744,6 @@ static void postFreeResource() {
free(g_Dbs.db[i].superTbls[j].sampleDataBuf);
g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL;
}
if (g_Dbs.db[i].superTbls[j].sampleBindArray) {
for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) {
uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)(
g_Dbs.db[i].superTbls[j].sampleBindArray
+ sizeof(uintptr_t *) * k));
for (int c = 1; c < g_Dbs.db[i].superTbls[j].columnCount + 1; c++) {
TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c));
if (bind)
tmfree(bind->buffer);
}
tmfree((char *)tmp);
}
}
tmfree((char *)g_Dbs.db[i].superTbls[j].sampleBindArray);
if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) {
free(g_Dbs.db[i].superTbls[j].tagDataBuf);
@ -6085,9 +6077,6 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
int32_t affectedRows;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, pThreadInfo->buffer);
uint16_t iface;
if (stbInfo)
iface = stbInfo->iface;
@ -6105,12 +6094,18 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
switch(iface) {
case TAOSC_IFACE:
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, pThreadInfo->buffer);
affectedRows = queryDbExec(
pThreadInfo->taos,
pThreadInfo->buffer, INSERT_TYPE, false);
break;
case REST_IFACE:
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, pThreadInfo->buffer);
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
pThreadInfo->buffer, pThreadInfo)) {
affectedRows = -1;
@ -7088,12 +7083,12 @@ static int32_t prepareStbStmtBindRand(
return 0;
}
static int32_t prepareStbStmtBindWithSample(
static int32_t prepareStbStmtBindStartTime(
char *tableName,
int64_t *ts,
char *bindArray, SSuperTable *stbInfo,
int64_t startTime, int32_t recSeq,
int32_t timePrec,
int64_t samplePos)
int32_t timePrec)
{
TAOS_BIND *bind;
@ -7110,6 +7105,10 @@ static int32_t prepareStbStmtBindWithSample(
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
}
verbosePrint("%s() LN%d, tableName: %s, bind_ts=%"PRId64"\n",
__func__, __LINE__, tableName, *bind_ts);
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
@ -7118,7 +7117,7 @@ static int32_t prepareStbStmtBindWithSample(
return 0;
}
static int32_t prepareStbStmtRand(
UNUSED_FUNC static int32_t prepareStbStmtRand(
threadInfo *pThreadInfo,
char *tableName,
int64_t tableSeq,
@ -7299,14 +7298,14 @@ static int32_t prepareStbStmtWithSample(
uint32_t k;
for (k = 0; k < batch;) {
char *bindArray = (char *)(*((uintptr_t *)
(stbInfo->sampleBindArray + (sizeof(char *)) * (*pSamplePos))));
(pThreadInfo->sampleBindArray + (sizeof(char *)) * (*pSamplePos))));
/* columnCount + 1 (ts) */
if (-1 == prepareStbStmtBindWithSample(
if (-1 == prepareStbStmtBindStartTime(
tableName,
pThreadInfo->bind_ts,
bindArray, stbInfo,
startTime, k,
pThreadInfo->time_precision,
*pSamplePos
pThreadInfo->time_precision
/* is column */)) {
return -1;
}
@ -7427,8 +7426,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t nTimeStampStep;
uint64_t insert_interval;
bool sourceRand;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
if (stbInfo) {
@ -7443,18 +7440,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
maxSqlLen = stbInfo->maxSqlLen;
nTimeStampStep = stbInfo->timeStampStep;
insert_interval = stbInfo->insertInterval;
if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) {
sourceRand = true;
} else {
sourceRand = false; // from sample data file
}
} else {
insertRows = g_args.num_of_DPT;
interlaceRows = g_args.interlace_rows;
maxSqlLen = g_args.max_sql_len;
nTimeStampStep = g_args.timestamp_step;
insert_interval = g_args.insert_interval;
sourceRand = true;
}
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
@ -7539,25 +7530,14 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int32_t generated;
if (stbInfo) {
if (stbInfo->iface == STMT_IFACE) {
if (sourceRand) {
generated = prepareStbStmtRand(
pThreadInfo,
tableName,
tableSeq,
batchPerTbl,
insertRows, 0,
startTime
);
} else {
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
batchPerTbl,
insertRows, 0,
startTime,
&(pThreadInfo->samplePos));
}
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
batchPerTbl,
insertRows, 0,
startTime,
&(pThreadInfo->samplePos));
} else {
generated = generateStbInterlaceData(
pThreadInfo,
@ -7747,17 +7727,6 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
bool sourceRand;
if (stbInfo) {
if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) {
sourceRand = true;
} else {
sourceRand = false; // from sample data file
}
} else {
sourceRand = true;
}
pThreadInfo->samplePos = 0;
int percentComplete = 0;
@ -7796,32 +7765,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
int32_t generated;
if (stbInfo) {
if (stbInfo->iface == STMT_IFACE) {
if (sourceRand) {
/* generated = prepareStbStmtRand(
pThreadInfo,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows,
i, start_time
);
*/
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, i, start_time,
&(pThreadInfo->samplePos));
} else {
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, i, start_time,
&(pThreadInfo->samplePos));
}
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, i, start_time,
&(pThreadInfo->samplePos));
} else {
generated = generateStbProgressiveData(
stbInfo,
@ -7849,6 +7799,11 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
&remainderBufLen);
}
}
verbosePrint("[%d] %s() LN%d generated=%d\n",
pThreadInfo->threadID,
__func__, __LINE__, generated);
if (generated > 0)
i += generated;
else
@ -8059,17 +8014,22 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
return 0;
}
static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec)
static int parseSampleFileToStmt(
threadInfo *pThreadInfo,
SSuperTable *stbInfo, uint32_t timePrec)
{
stbInfo->sampleBindArray = calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
if (stbInfo->sampleBindArray == NULL) {
pThreadInfo->sampleBindArray =
calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
if (pThreadInfo->sampleBindArray == NULL) {
errorPrint2("%s() LN%d, Failed to allocate %"PRIu64" bind array buffer\n",
__func__, __LINE__, (uint64_t)sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
__func__, __LINE__,
(uint64_t)sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
return -1;
}
for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) {
char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
char *bindArray =
calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint2("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (stbInfo->columnCount + 1));
@ -8122,7 +8082,8 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec)
free(bindBuffer);
}
}
*((uintptr_t *)(stbInfo->sampleBindArray + (sizeof(char *)) * i)) = (uintptr_t)bindArray;
*((uintptr_t *)(pThreadInfo->sampleBindArray + (sizeof(char *)) * i)) =
(uintptr_t)bindArray;
}
return 0;
@ -8312,10 +8273,6 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pstr += sprintf(pstr, ")");
debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer);
if (stbInfo) {
parseSampleFileToStmt(stbInfo, timePrec);
}
}
for (int i = 0; i < threads; i++) {
@ -8348,7 +8305,6 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|| ((stbInfo)
&& (stbInfo->iface == STMT_IFACE))) {
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
if (NULL == pThreadInfo->stmt) {
free(pids);
@ -8370,6 +8326,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
exit(EXIT_FAILURE);
}
pThreadInfo->bind_ts = malloc(sizeof(int64_t));
if (stbInfo) {
parseSampleFileToStmt(pThreadInfo, stbInfo, timePrec);
}
}
} else {
pThreadInfo->taos = NULL;
@ -8420,6 +8380,21 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tsem_destroy(&(pThreadInfo->lock_sem));
taos_close(pThreadInfo->taos);
if (pThreadInfo->sampleBindArray) {
for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) {
uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)(
pThreadInfo->sampleBindArray
+ sizeof(uintptr_t *) * k));
for (int c = 1; c < pThreadInfo->stbInfo->columnCount + 1; c++) {
TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c));
if (bind)
tmfree(bind->buffer);
}
tmfree((char *)tmp);
}
tmfree(pThreadInfo->sampleBindArray);
}
debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n",
__func__, __LINE__,
pThreadInfo->threadID, pThreadInfo->totalInsertRows,

View File

@ -272,26 +272,35 @@ void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) {
switch (timePrecision) {
case TSDB_TIME_PRECISION_MILLI: {
mod = ((t) % 1000 + 1000) % 1000;
if (t < 0 && mod != 0) {
t -= 1000;
}
quot = t / 1000;
fractionLen = 5;
format = ".%03" PRId64;
mod = t % 1000;
break;
}
case TSDB_TIME_PRECISION_MICRO: {
mod = ((t) % 1000000 + 1000000) % 1000000;
if (t < 0 && mod != 0) {
t -= 1000000;
}
quot = t / 1000000;
fractionLen = 8;
format = ".%06" PRId64;
mod = t % 1000000;
break;
}
case TSDB_TIME_PRECISION_NANO: {
mod = ((t) % 1000000000 + 1000000000) % 1000000000;
if (t < 0 && mod != 0) {
t -= 1000000000;
}
quot = t / 1000000000;
fractionLen = 11;
format = ".%09" PRId64;
mod = t % 1000000000;
break;
}
@ -319,26 +328,35 @@ void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) {
switch (timePrecision) {
case TSDB_TIME_PRECISION_MILLI: {
mod = ((t) % 1000 + 1000) % 1000;
if (t < 0 && mod != 0) {
t -= 1000;
}
quot = t / 1000;
fractionLen = 5;
format = ".%03" PRId64;
mod = t % 1000;
break;
}
case TSDB_TIME_PRECISION_MICRO: {
mod = ((t) % 1000000 + 1000000) % 1000000;
if (t < 0 && mod != 0) {
t -= 1000000;
}
quot = t / 1000000;
fractionLen = 8;
format = ".%06" PRId64;
mod = t % 1000000;
break;
}
case TSDB_TIME_PRECISION_NANO: {
mod = ((t) % 1000000000 + 1000000000) % 1000000000;
if (t < 0 && mod != 0) {
t -= 1000000000;
}
quot = t / 1000000000;
fractionLen = 11;
format = ".%09" PRId64;
mod = t % 1000000000;
break;
}

View File

@ -6388,6 +6388,19 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
return pInfo->binfo.pRes;
}
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SQueryRuntimeEnv* pRuntimeEnv, bool* newgroup) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
pInfo->existNewGroupBlock = NULL;
*newgroup = true;
}
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
*newgroup = false;
@ -6399,16 +6412,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRunt
// handle the cached new group data block
if (pInfo->existNewGroupBlock) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
pInfo->existNewGroupBlock = NULL;
*newgroup = true;
doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup);
}
}
@ -6427,26 +6431,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
return pInfo->pRes;
}
// if (taosFillHasMoreResults(pInfo->pFillInfo)) {
// *newgroup = false;
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
// return pInfo->pRes;
// }
//
// // handle the cached new group data block
// if (pInfo->existNewGroupBlock) {
// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
//
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
//
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
// pInfo->existNewGroupBlock = NULL;
// *newgroup = true;
// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
// }
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
@ -6493,46 +6477,13 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) {
return pInfo->pRes;
}
// if (taosFillHasMoreResults(pInfo->pFillInfo)) {
// *newgroup = false;
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
// return pInfo->pRes;
// }
//
// // handle the cached new group data block
// if (pInfo->existNewGroupBlock) {
// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
//
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
//
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
// pInfo->existNewGroupBlock = NULL;
// *newgroup = true;
//
// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) {
// return pInfo->pRes;
// }
//
//// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
// }
} else if (pInfo->existNewGroupBlock) { // try next group
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
assert(pBlock != NULL);
doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup);
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
pInfo->existNewGroupBlock = NULL;
*newgroup = true;
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) {
return pInfo->pRes;
}
} else {
return NULL;
}

View File

@ -1418,13 +1418,11 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
while (true) {
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
bool isRowDel = false;
SMemRow row = tsdbNextIterRow(pCommitIter->pIter);
if (row == NULL || memRowKey(row) > maxKey) {
key2 = INT64_MAX;
} else {
key2 = memRowKey(row);
isRowDel = memRowDeleted(row);
}
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
@ -1439,36 +1437,33 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
pTarget->numOfRows++;
(*iter)++;
} else if (key1 > key2) {
if (!isRowDel) {
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
tSkipListIterNext(pCommitIter->pIter);
} else {
if (update) {
if (!isRowDel) {
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
}
} else {
ASSERT(!isRowDel);
if (update != TD_ROW_OVERWRITE_UPDATE) {
//copy disk data
for (int i = 0; i < pDataCols->numOfCols; i++) {
//TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints);
}
pTarget->numOfRows++;
if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
}
if (update != TD_ROW_DISCARD_UPDATE) {
//copy mem data
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
}
(*iter)++;
tSkipListIterNext(pCommitIter->pIter);

View File

@ -488,7 +488,7 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
return -1;
}
if (listen(sockFd, 10) < 0) {
if (listen(sockFd, 1024) < 0) {
uError("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
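The one-line change above raises the listen() backlog of the TCP server socket from 10 to 1024, so bursts of simultaneous connection attempts (such as the multi-threaded REST tests added later in this commit) are queued by the kernel instead of being refused. A brief sketch of the call (illustrative only; on Linux the effective backlog is additionally capped by net.core.somaxconn):

```c
#include <sys/socket.h>

/* Illustrative only: a larger backlog lets the kernel queue more pending TCP
   connections between handshake completion and accept(). */
static int startListening(int sockFd) {
  if (listen(sockFd, 1024) < 0) {    /* previously 10 */
    return -1;
  }
  return 0;
}
```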

View File

@ -0,0 +1,125 @@
const taos = require('td2.0-connector');
var conn = taos.connect({host:"127.0.0.1", user:"root", password:"taosdata", config:"/etc/taos",port:0})
var c1 = conn.cursor(); // Initializing a new cursor
let stime = new Date();
let interval = 1000;
function convertDateToTS(date) {
let tsArr = date.toISOString().split("T")
return "\"" + tsArr[0] + " " + tsArr[1].substring(0, tsArr[1].length - 1) + "\"";
}
function R(l, r) {
return Math.random() * (r - l) + l; // uniform random value in [l, r)
}
function randomBool() {
if (Math.random() < 0.5) {
return true;
}
return false;
}
// Initialize
const dbname = "nodejs_1970_db";
const tbname = "t1";
let dropDB = "drop database if exists " + dbname
console.log(dropDB);
c1.execute(dropDB);
let createDB = "create database " + dbname + " keep 36500"
console.log(createDB);
c1.execute(createDB);
let useTbl = "use " + dbname
console.log(useTbl)
c1.execute(useTbl);
let createTbl = "create table if not exists " + tbname + "(ts timestamp,id int)"
console.log(createTbl);
c1.execute(createTbl);
//1969-12-31 23:59:59.999
//1970-01-01 00:00:00.000
//1970-01-01 07:59:59.999
//1970-01-01 08:00:00.000
//1628928479484 2021-08-14 08:07:59.484
let sql1 = "insert into " + dbname + "." + tbname + " values('1969-12-31 23:59:59.999',1)"
console.log(sql1);
c1.execute(sql1);
let sql2 = "insert into " + dbname + "." + tbname + " values('1970-01-01 00:00:00.000',2)"
console.log(sql2);
c1.execute(sql2);
let sql3 = "insert into " + dbname + "." + tbname + " values('1970-01-01 07:59:59.999',3)"
console.log(sql3);
c1.execute(sql3);
let sql4 = "insert into " + dbname + "." + tbname + " values('1970-01-01 08:00:00.000',4)"
console.log(sql4);
c1.execute(sql4);
let sql5 = "insert into " + dbname + "." + tbname + " values('2021-08-14 08:07:59.484',5)"
console.log(sql5);
c1.execute(sql5);
// Select
let query1 = "select * from " + dbname + "." + tbname
console.log(query1);
c1.execute(query1);
var d = c1.fetchall();
console.log(c1.fields);
for (let i = 0; i < d.length; i++)
console.log(d[i][0].valueOf());
//initialize
let initSql1 = "drop table if exists " + tbname
console.log(initSql1);
c1.execute(initSql1);
console.log(createTbl);
c1.execute(createTbl);
c1.execute(useTbl)
//-28800001 1969-12-31 23:59:59.999
//-28800000 1970-01-01 00:00:00.000
//-1 1970-01-01 07:59:59.999
//0 1970-01-01 08:00:00.000
//1628928479484 2021-08-14 08:07:59.484
let sql11 = "insert into " + dbname + "." + tbname + " values(-28800001,11)";
console.log(sql11);
c1.execute(sql11);
let sql12 = "insert into " + dbname + "." + tbname + " values(-28800000,12)"
console.log(sql12);
c1.execute(sql12);
let sql13 = "insert into " + dbname + "." + tbname + " values(-1,13)"
console.log(sql13);
c1.execute(sql13);
let sql14 = "insert into " + dbname + "." + tbname + " values(0,14)"
console.log(sql14);
c1.execute(sql14);
let sql15 = "insert into " + dbname + "." + tbname + " values(1628928479484,15)"
console.log(sql15);
c1.execute(sql15);
// Select
console.log(query1);
c1.execute(query1);
var d = c1.fetchall();
console.log(c1.fields);
for (let i = 0; i < d.length; i++)
console.log(d[i][0].valueOf());
setTimeout(function () {
conn.close();
}, 2000);
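The second half of test1970.js above repeats the boundary inserts using raw epoch-millisecond values instead of timestamp strings; the inline comments pair them up assuming the client and server run in UTC+8. A small sketch of that mapping (hypothetical helper, not part of the test):

```c
#include <stdio.h>
#include <inttypes.h>

/* Assumes a UTC+8 time zone, as the inline comments in test1970.js suggest. */
int main(void) {
  const int64_t utc8OffsetMs = 8LL * 3600 * 1000;
  printf("%" PRId64 "\n", -utc8OffsetMs - 1);  /* 1969-12-31 23:59:59.999 local -> -28800001 */
  printf("%" PRId64 "\n", -utc8OffsetMs);      /* 1970-01-01 00:00:00.000 local -> -28800000 */
  printf("%" PRId64 "\n", (int64_t)-1);        /* 1970-01-01 07:59:59.999 local -> -1        */
  printf("%" PRId64 "\n", (int64_t)0);         /* 1970-01-01 08:00:00.000 local -> 0         */
  return 0;
}
```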

View File

@ -390,7 +390,7 @@ python3 ./test.py -f alter/alterColMultiTimes.py
python3 ./test.py -f query/queryWildcardLength.py
python3 ./test.py -f query/queryTbnameUpperLower.py
python3 ./test.py -f query/query.py
python3 ./test.py -f query/queryDiffColsOr.py
#======================p4-end===============

View File

@ -10,13 +10,10 @@
###################################################################
# -*- coding: utf-8 -*-
from copy import deepcopy
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
from util.common import tdCom
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
@ -409,6 +406,62 @@ class TDTestCase:
tdSql.checkRows(10)
tdSql.checkEqual(int(res[9][0]), 10)
def queryMultiTbWithTag(self, tb_name):
# tags (1, 1, 1, 3, 1.1, 1.1, "binary", "nchar", true, 1)')
tdSql.execute(
f'CREATE TABLE {tb_name}_sub2 using {tb_name} tags (2, 2, 2, 4, 2.2, 2.2, "binary2", "nchar2", true, 12)')
tdSql.execute(
f'CREATE TABLE {tb_name}_sub3 using {tb_name} tags (3, 3, 3, 3, 3.3, 3.3, "binary3", "nchar3", true, 13)')
tdSql.execute(
f'insert into {tb_name}_sub2 values ("2021-01-25 12:00:00", 2, 2, 2, 4, 2.2, 2.2, "binary2", "nchar2", true, 12)')
tdSql.execute(
f'insert into {tb_name}_sub3 values ("2021-01-27 12:00:00", 3, 3, 3, 3, 3.3, 3.3, "binary3", "nchar3", true, 13)')
## select count avg sum from (condition_A or condition_B and like and in) where condition_A or condition_B or condition_tag_C or condition_tag_D or like and in interval
query_sql = f'select count(*), avg(c6), sum(c3) from (select * from {tb_name} where c1 >1 or c2 = 2 and c7 like "binar_" and c4 in (3, 5)) where c1 != 2 or c3 = 1 or t1=2 or t1=3 or c8 like "ncha_" and c9 in (true) interval(8d)'
res = tdSql.query(query_sql, True)
tdSql.checkRows(3)
tdSql.checkEqual(int(res[0][1]), 3)
tdSql.checkEqual(int(res[0][2]), 1)
tdSql.checkEqual(int(res[0][3]), 10)
tdSql.checkEqual(int(res[1][1]), 3)
tdSql.checkEqual(int(res[1][2]), 3)
tdSql.checkEqual(int(res[1][3]), 3)
tdSql.checkEqual(int(res[2][1]), 3)
tdSql.checkEqual(int(res[2][2]), 2)
tdSql.checkEqual(int(res[2][3]), 6)
# ! to confirm
## select count avg sum from (condition_A or condition_B or condition_tag_C or condition_tag_D and like and in) where condition_A or condition_B or like and in interval
# query_sql = f'select count(*), avg(c6), sum(c3) from (select * from {tb_name} where t1 = 3 and t1 = 2 or c1 >1 or c2 = 2 and c7 like "binar_" and c4 in (3, 5)) where c1 != 2 or c3 = 1 or c8 like "ncha_" and c9 in (true) interval(8d)'
# res = tdSql.query(query_sql, True)
# tdSql.checkRows(3)
# tdSql.checkEqual(int(res[0][1]), 3)
# tdSql.checkEqual(int(res[0][2]), 1)
# tdSql.checkEqual(int(res[0][3]), 10)
# tdSql.checkEqual(int(res[1][1]), 3)
# tdSql.checkEqual(int(res[1][2]), 3)
# tdSql.checkEqual(int(res[1][3]), 3)
# tdSql.checkEqual(int(res[2][1]), 3)
# tdSql.checkEqual(int(res[2][2]), 2)
# tdSql.checkEqual(int(res[2][3]), 6)
## select count avg sum from (condition_A and condition_B and like and in and ts and condition_tag_A and condition_tag_B and between) where condition_C or condition_D or condition_tag_C or condition_tag_D or like and in interval
query_sql = f'select count(*), avg(c6), sum(c3) from (select * from {tb_name} where c1 >= 1 and c2 = 2 and c7 like "binar_" and c4 in (3, 5) and ts > "2021-01-11 12:00:00" and t1 < 2 and t1 > 0 and c6 between 0 and 7) where c1 != 2 or c3 = 1 or t1=2 or t1=3 or c8 like "ncha_" and c9 in (true) interval(8d)'
res = tdSql.query(query_sql, True)
tdSql.checkRows(2)
tdSql.checkEqual(int(res[0][1]), 2)
tdSql.checkEqual(int(res[0][2]), 1)
tdSql.checkEqual(int(res[0][3]), 2)
tdSql.checkEqual(int(res[1][1]), 1)
tdSql.checkEqual(int(res[1][2]), 1)
tdSql.checkEqual(int(res[1][3]), 1)
# ! to confirm
#select * from (select * from pyclqtwi where c1 >1 or c2 = 2 and c7 like "binar_" and c4 in (3, 5) and ts > "2021-01-11 12:00:00") where c1 != 2 or c3 = 1 or t1=2 or t1=3 or c8 like "ncha_" and c9 in (true) ;
#DB error: invalid operation: invalid expression (0.008747s)
def checkTbColTypeOperator(self):
'''
Ordinary table full column type and operator
@ -492,33 +545,13 @@ class TDTestCase:
'''
tb_name = self.initStb()
self.queryMultiTb(tb_name)
# tb_name1 = tdCom.getLongName(8, "letters")
# tb_name2 = tdCom.getLongName(8, "letters")
# tb_name3 = tdCom.getLongName(8, "letters")
# tdSql.execute(
# f"CREATE TABLE {tb_name1} (ts timestamp, c1 tinyint, c2 smallint, c3 int)")
# tdSql.execute(
# f"CREATE TABLE {tb_name2} (ts timestamp, c1 tinyint, c2 smallint, c3 int)")
# tdSql.execute(
# f"CREATE TABLE {tb_name3} (ts timestamp, c1 tinyint, c2 smallint, c3 int)")
# insert_sql_list = [f'insert into {tb_name1} values ("2021-01-01 12:00:00", 1, 5, 1)',
# f'insert into {tb_name1} values ("2021-01-03 12:00:00", 2, 4, 1)',
# f'insert into {tb_name1} values ("2021-01-05 12:00:00", 3, 2, 1)',
# f'insert into {tb_name2} values ("2021-01-01 12:00:00", 4, 2, 1)',
# f'insert into {tb_name2} values ("2021-01-02 12:00:00", 5, 1, 1)',
# f'insert into {tb_name2} values ("2021-01-04 12:00:00", 1, 2, 1)',
# f'insert into {tb_name3} values ("2021-01-02 12:00:00", 4, 2, 1)',
# f'insert into {tb_name3} values ("2021-01-06 12:00:00", 5, 1, 1)',
# f'insert into {tb_name3} values ("2021-01-07 12:00:00", 1, 2, 1)',
# ]
# for sql in insert_sql_list:
# tdSql.execute(sql)
# tdSql.query(
# f'select * from {tb_name1} t1, {tb_name2}, {tb_name3} t3 t2 where (t1.ts=t2.ts or t2.ts=t3.ts)')
# tdSql.checkRows(4)
def checkMultiTbWithTag(self):
'''
test Multi tb with tag
'''
tb_name = self.initStb()
self.queryMultiTbWithTag(tb_name)
def run(self):
tdSql.prepare()
@ -534,7 +567,7 @@ class TDTestCase:
self.checkStbPreCal()
self.checkMultiTb()
self.checkMultiStb()
self.checkMultiTbWithTag()
def stop(self):
tdSql.close()

View File

@ -21,7 +21,15 @@ import shutil
import pandas as pd
from util.log import *
def _parse_datetime(timestr):
try:
return datetime.datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S.%f')
except ValueError:
pass
try:
return datetime.datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S')
except ValueError:
pass
return None  # neither supported timestamp format matched
class TDSql:
def __init__(self):
@ -181,7 +189,7 @@ class TDSql:
tdLog.info("sql:%s, row:%d col:%d data:%d == expect:%s" %
(self.sql, row, col, self.queryResult[row][col], data))
else:
if self.queryResult[row][col] == datetime.datetime.fromisoformat(data):
if self.queryResult[row][col] == _parse_datetime(data):
tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%s" %
(self.sql, row, col, self.queryResult[row][col], data))
return

View File

@ -0,0 +1,128 @@
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <pthread.h>
#include <strings.h> // bzero
#define MAXLINE 1024
typedef struct {
pthread_t pid;
int threadId;
int rows;
int tables;
} ThreadObj;
void post(char *ip,int port,char *page,char *msg) {
int sockfd,n;
char recvline[MAXLINE];
struct sockaddr_in servaddr;
char content[4096];
char content_page[50];
sprintf(content_page,"POST /%s HTTP/1.1\r\n",page);
char content_host[50];
sprintf(content_host,"HOST: %s:%d\r\n",ip,port);
char content_type[] = "Content-Type: text/plain\r\n";
char Auth[] = "Authorization: Basic cm9vdDp0YW9zZGF0YQ==\r\n";
char content_len[50];
sprintf(content_len,"Content-Length: %ld\r\n\r\n",strlen(msg));
sprintf(content,"%s%s%s%s%s%s",content_page,content_host,content_type,Auth,content_len,msg);
if((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0) {
printf("socket error\n");
}
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(port);
if(inet_pton(AF_INET,ip,&servaddr.sin_addr) <= 0) {
printf("inet_pton error\n");
}
if(connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr)) < 0) {
printf("connect error\n");
}
write(sockfd,content,strlen(content));
printf("%s\n", content);
while((n = read(sockfd,recvline,MAXLINE)) > 0) {
recvline[n] = 0;
if(fputs(recvline,stdout) == EOF) {
printf("fputs error\n");
}
}
if(n < 0) {
printf("read error\n");
}
close(sockfd); // release the connection once the response has been read
}
void singleThread() {
char ip[] = "127.0.0.1";
int port = 6041;
char page[] = "rest/sqlutc";
char page1[] = "rest/sqlutc/db1";
char page2[] = "rest/sqlutc/db2";
char nonexit[] = "rest/sqlutc/xxdb";
post(ip,port,page,"drop database if exists db1");
post(ip,port,page,"create database if not exists db1");
post(ip,port,page,"drop database if exists db2");
post(ip,port,page,"create database if not exists db2");
post(ip,port,page1,"create table t11 (ts timestamp, c1 int)");
post(ip,port,page2,"create table t21 (ts timestamp, c1 int)");
post(ip,port,page1,"insert into t11 values (now, 1)");
post(ip,port,page2,"insert into t21 values (now, 2)");
post(ip,port,nonexit,"create database if not exists db3");
}
void *execute(void *params) {
char ip[] = "127.0.0.1";
int port = 6041;
char page[] = "rest/sqlutc";
char *unique = calloc(1, 1024);
char *sql = calloc(1, 1024);
ThreadObj *pThread = (ThreadObj *)params;
printf("Thread %d started\n", pThread->threadId);
sprintf(unique, "rest/sqlutc/db%d",pThread->threadId);
sprintf(sql, "drop database if exists db%d", pThread->threadId);
post(ip,port,page, sql);
sprintf(sql, "create database if not exists db%d", pThread->threadId);
post(ip,port,page, sql);
for (int i = 0; i < pThread->tables; i++) {
sprintf(sql, "create table t%d (ts timestamp, c1 int)", i);
post(ip,port,unique, sql);
}
for (int i = 0; i < pThread->rows; i++) {
sprintf(sql, "insert into t%d values (now + %ds, %d)", pThread->threadId, i, pThread->threadId);
post(ip,port,unique, sql);
}
free(unique);
free(sql);
return NULL;
}
void multiThread() {
int numOfThreads = 100;
int numOfTables = 100;
int numOfRows = 1;
ThreadObj *threads = calloc((size_t)numOfThreads, sizeof(ThreadObj));
for (int i = 0; i < numOfThreads; i++) {
ThreadObj *pthread = threads + i;
pthread_attr_t thattr;
pthread->threadId = i + 1;
pthread->rows = numOfRows;
pthread->tables = numOfTables;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_create(&pthread->pid, &thattr, execute, pthread);
}
for (int i = 0; i < numOfThreads; i++) {
pthread_join(threads[i].pid, NULL);
}
free(threads);
}
int main() {
singleThread();
multiThread();
exit(0);
}

View File

@ -0,0 +1,128 @@
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <pthread.h>
#include <strings.h> // bzero
#define MAXLINE 1024
typedef struct {
pthread_t pid;
int threadId;
int rows;
int tables;
} ThreadObj;
void post(char *ip,int port,char *page,char *msg) {
int sockfd,n;
char recvline[MAXLINE];
struct sockaddr_in servaddr;
char content[4096];
char content_page[50];
sprintf(content_page,"POST /%s HTTP/1.1\r\n",page);
char content_host[50];
sprintf(content_host,"HOST: %s:%d\r\n",ip,port);
char content_type[] = "Content-Type: text/plain\r\n";
char Auth[] = "Authorization: Basic cm9vdDp0YW9zZGF0YQ==\r\n";
char content_len[50];
sprintf(content_len,"Content-Length: %ld\r\n\r\n",strlen(msg));
sprintf(content,"%s%s%s%s%s%s",content_page,content_host,content_type,Auth,content_len,msg);
if((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0) {
printf("socket error\n");
}
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(port);
if(inet_pton(AF_INET,ip,&servaddr.sin_addr) <= 0) {
printf("inet_pton error\n");
}
if(connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr)) < 0) {
printf("connect error\n");
}
write(sockfd,content,strlen(content));
printf("%s\n", content);
while((n = read(sockfd,recvline,MAXLINE)) > 0) {
recvline[n] = 0;
if(fputs(recvline,stdout) == EOF) {
printf("fputs error\n");
}
}
if(n < 0) {
printf("read error\n");
}
close(sockfd); // release the connection once the response has been read
}
void singleThread() {
char ip[] = "127.0.0.1";
int port = 6041;
char page[] = "rest/sqlt";
char page1[] = "rest/sqlt/db1";
char page2[] = "rest/sqlt/db2";
char nonexit[] = "rest/sqlt/xxdb";
post(ip,port,page,"drop database if exists db1");
post(ip,port,page,"create database if not exists db1");
post(ip,port,page,"drop database if exists db2");
post(ip,port,page,"create database if not exists db2");
post(ip,port,page1,"create table t11 (ts timestamp, c1 int)");
post(ip,port,page2,"create table t21 (ts timestamp, c1 int)");
post(ip,port,page1,"insert into t11 values (now, 1)");
post(ip,port,page2,"insert into t21 values (now, 2)");
post(ip,port,nonexit,"create database if not exists db3");
}
void *execute(void *params) {
char ip[] = "127.0.0.1";
int port = 6041;
char page[] = "rest/sqlt";
char *unique = calloc(1, 1024);
char *sql = calloc(1, 1024);
ThreadObj *pThread = (ThreadObj *)params;
printf("Thread %d started\n", pThread->threadId);
sprintf(unique, "rest/sqlt/db%d",pThread->threadId);
sprintf(sql, "drop database if exists db%d", pThread->threadId);
post(ip,port,page, sql);
sprintf(sql, "create database if not exists db%d", pThread->threadId);
post(ip,port,page, sql);
for (int i = 0; i < pThread->tables; i++) {
sprintf(sql, "create table t%d (ts timestamp, c1 int)", i);
post(ip,port,unique, sql);
}
for (int i = 0; i < pThread->rows; i++) {
sprintf(sql, "insert into t%d values (now + %ds, %d)", pThread->threadId, i, pThread->threadId);
post(ip,port,unique, sql);
}
free(unique);
free(sql);
return NULL;
}
void multiThread() {
int numOfThreads = 100;
int numOfTables = 100;
int numOfRows = 1;
ThreadObj *threads = calloc((size_t)numOfThreads, sizeof(ThreadObj));
for (int i = 0; i < numOfThreads; i++) {
ThreadObj *pthread = threads + i;
pthread_attr_t thattr;
pthread->threadId = i + 1;
pthread->rows = numOfRows;
pthread->tables = numOfTables;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_create(&pthread->pid, &thattr, execute, pthread);
}
for (int i = 0; i < numOfThreads; i++) {
pthread_join(threads[i].pid, NULL);
}
free(threads);
}
int main() {
singleThread();
multiThread();
exit(0);
}

View File

@ -1,2 +1,9 @@
all:
gcc -g httpTest.c -o httpTest -lpthread
gcc -g httpTest.c -o httpTest -lpthread
gcc -g httpTestSqlt.c -o httpTestSqlt -lpthread
gcc -g httpTestSqlUtc.c -o httpTestSqlUtc -lpthread
clean:
rm httpTest
rm httpTestSqlt
rm httpTestSqlUtc