From e0805bdda1003ac3d9cd9b4027bb480aafcc0682 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Sun, 29 Aug 2021 09:14:50 +0000 Subject: [PATCH 01/20] [TD-5929] create testcases for TD-5929 ---> diff cols support or --- tests/pytest/fulltest.sh | 2 +- tests/pytest/query/queryDiffColsOr.py | 95 ++++++++++++++++++--------- 2 files changed, 65 insertions(+), 32 deletions(-) diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index 886e2a365e..06ec3c6bfa 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -390,7 +390,7 @@ python3 ./test.py -f alter/alterColMultiTimes.py python3 ./test.py -f query/queryWildcardLength.py python3 ./test.py -f query/queryTbnameUpperLower.py python3 ./test.py -f query/query.py - +python3 ./test.py -f query/queryDiffColsOr.py #======================p4-end=============== diff --git a/tests/pytest/query/queryDiffColsOr.py b/tests/pytest/query/queryDiffColsOr.py index feeab84a7e..e9e791da9f 100644 --- a/tests/pytest/query/queryDiffColsOr.py +++ b/tests/pytest/query/queryDiffColsOr.py @@ -10,13 +10,10 @@ ################################################################### # -*- coding: utf-8 -*- -from copy import deepcopy from util.log import tdLog from util.cases import tdCases from util.sql import tdSql from util.common import tdCom - - class TDTestCase: def init(self, conn, logSql): tdLog.debug("start to execute %s" % __file__) @@ -409,6 +406,62 @@ class TDTestCase: tdSql.checkRows(10) tdSql.checkEqual(int(res[9][0]), 10) + def queryMultiTbWithTag(self, tb_name): + # tags (1, 1, 1, 3, 1.1, 1.1, "binary", "nchar", true, 1)') + + tdSql.execute( + f'CREATE TABLE {tb_name}_sub2 using {tb_name} tags (2, 2, 2, 4, 2.2, 2.2, "binary2", "nchar2", true, 12)') + tdSql.execute( + f'CREATE TABLE {tb_name}_sub3 using {tb_name} tags (3, 3, 3, 3, 3.3, 3.3, "binary3", "nchar3", true, 13)') + tdSql.execute( + f'insert into {tb_name}_sub2 values ("2021-01-25 12:00:00", 2, 2, 2, 4, 2.2, 2.2, "binary2", "nchar2", true, 12)') + tdSql.execute( + f'insert into {tb_name}_sub3 values ("2021-01-27 12:00:00", 3, 3, 3, 3, 3.3, 3.3, "binary3", "nchar3", true, 13)') + ## select count avg sum from (condition_A or condition_B and like and in) where condition_A or condition_B or condition_tag_C or condition_tag_D or like and in interval + query_sql = f'select count(*), avg(c6), sum(c3) from (select * from {tb_name} where c1 >1 or c2 = 2 and c7 like "binar_" and c4 in (3, 5)) where c1 != 2 or c3 = 1 or t1=2 or t1=3 or c8 like "ncha_" and c9 in (true) interval(8d)' + res = tdSql.query(query_sql, True) + tdSql.checkRows(3) + tdSql.checkEqual(int(res[0][1]), 3) + tdSql.checkEqual(int(res[0][2]), 1) + tdSql.checkEqual(int(res[0][3]), 10) + tdSql.checkEqual(int(res[1][1]), 3) + tdSql.checkEqual(int(res[1][2]), 3) + tdSql.checkEqual(int(res[1][3]), 3) + tdSql.checkEqual(int(res[2][1]), 3) + tdSql.checkEqual(int(res[2][2]), 2) + tdSql.checkEqual(int(res[2][3]), 6) + + + # ! to confirm + ## select count avg sum from (condition_A or condition_B or condition_tag_C or condition_tag_D and like and in) where condition_A or condition_B or like and in interval + # query_sql = f'select count(*), avg(c6), sum(c3) from (select * from {tb_name} where t1 = 3 and t1 = 2 or c1 >1 or c2 = 2 and c7 like "binar_" and c4 in (3, 5)) where c1 != 2 or c3 = 1 or c8 like "ncha_" and c9 in (true) interval(8d)' + # res = tdSql.query(query_sql, True) + # tdSql.checkRows(3) + # tdSql.checkEqual(int(res[0][1]), 3) + # tdSql.checkEqual(int(res[0][2]), 1) + # tdSql.checkEqual(int(res[0][3]), 10) + # tdSql.checkEqual(int(res[1][1]), 3) + # tdSql.checkEqual(int(res[1][2]), 3) + # tdSql.checkEqual(int(res[1][3]), 3) + # tdSql.checkEqual(int(res[2][1]), 3) + # tdSql.checkEqual(int(res[2][2]), 2) + # tdSql.checkEqual(int(res[2][3]), 6) + + ## select count avg sum from (condition_A and condition_B and and line and in and ts and condition_tag_A and condition_tag_B and between) where condition_C orr condition_D or condition_tag_C or condition_tag_D or like and in interval + query_sql = f'select count(*), avg(c6), sum(c3) from (select * from {tb_name} where c1 >= 1 and c2 = 2 and c7 like "binar_" and c4 in (3, 5) and ts > "2021-01-11 12:00:00" and t1 < 2 and t1 > 0 and c6 between 0 and 7) where c1 != 2 or c3 = 1 or t1=2 or t1=3 or c8 like "ncha_" and c9 in (true) interval(8d)' + res = tdSql.query(query_sql, True) + tdSql.checkRows(2) + tdSql.checkEqual(int(res[0][1]), 2) + tdSql.checkEqual(int(res[0][2]), 1) + tdSql.checkEqual(int(res[0][3]), 2) + tdSql.checkEqual(int(res[1][1]), 1) + tdSql.checkEqual(int(res[1][2]), 1) + tdSql.checkEqual(int(res[1][3]), 1) + + # ! to confirm + #select * from (select * from pyclqtwi where c1 >1 or c2 = 2 and c7 like "binar_" and c4 in (3, 5) and ts > "2021-01-11 12:00:00") where c1 != 2 or c3 = 1 or t1=2 or t1=3 or c8 like "ncha_" and c9 in (true) ; + #DB error: invalid operation: invalid expression (0.008747s) + def checkTbColTypeOperator(self): ''' Ordinary table full column type and operator @@ -492,33 +545,13 @@ class TDTestCase: ''' tb_name = self.initStb() self.queryMultiTb(tb_name) - - - # tb_name1 = tdCom.getLongName(8, "letters") - # tb_name2 = tdCom.getLongName(8, "letters") - # tb_name3 = tdCom.getLongName(8, "letters") - # tdSql.execute( - # f"CREATE TABLE {tb_name1} (ts timestamp, c1 tinyint, c2 smallint, c3 int)") - # tdSql.execute( - # f"CREATE TABLE {tb_name2} (ts timestamp, c1 tinyint, c2 smallint, c3 int)") - # tdSql.execute( - # f"CREATE TABLE {tb_name3} (ts timestamp, c1 tinyint, c2 smallint, c3 int)") - # insert_sql_list = [f'insert into {tb_name1} values ("2021-01-01 12:00:00", 1, 5, 1)', - # f'insert into {tb_name1} values ("2021-01-03 12:00:00", 2, 4, 1)', - # f'insert into {tb_name1} values ("2021-01-05 12:00:00", 3, 2, 1)', - # f'insert into {tb_name2} values ("2021-01-01 12:00:00", 4, 2, 1)', - # f'insert into {tb_name2} values ("2021-01-02 12:00:00", 5, 1, 1)', - # f'insert into {tb_name2} values ("2021-01-04 12:00:00", 1, 2, 1)', - # f'insert into {tb_name3} values ("2021-01-02 12:00:00", 4, 2, 1)', - # f'insert into {tb_name3} values ("2021-01-06 12:00:00", 5, 1, 1)', - # f'insert into {tb_name3} values ("2021-01-07 12:00:00", 1, 2, 1)', - # ] - # for sql in insert_sql_list: - # tdSql.execute(sql) - # tdSql.query( - # f'select * from {tb_name1} t1, {tb_name2}, {tb_name3} t3 t2 where (t1.ts=t2.ts or t2.ts=t3.ts)') - # tdSql.checkRows(4) - + + def checkMultiTbWithTag(self): + ''' + test Multi tb with tag + ''' + tb_name = self.initStb() + self.queryMultiTbWithTag(tb_name) def run(self): tdSql.prepare() @@ -534,7 +567,7 @@ class TDTestCase: self.checkStbPreCal() self.checkMultiTb() self.checkMultiStb() - + self.checkMultiTbWithTag() def stop(self): tdSql.close() From 51849220bdc1e0164df4d31d1a673e0f28801e42 Mon Sep 17 00:00:00 2001 From: xywang Date: Mon, 30 Aug 2021 02:54:33 +0800 Subject: [PATCH 02/20] [TD-6408]: fixed json error when timestamp was before Epoch --- src/plugins/http/src/httpJson.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/plugins/http/src/httpJson.c b/src/plugins/http/src/httpJson.c index 3c72b795ee..b9067f0639 100644 --- a/src/plugins/http/src/httpJson.c +++ b/src/plugins/http/src/httpJson.c @@ -275,7 +275,7 @@ void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) { quot = t / 1000; fractionLen = 5; format = ".%03" PRId64; - mod = t % 1000; + mod = ((t) % 1000 + 1000) % 1000; break; } @@ -283,7 +283,7 @@ void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) { quot = t / 1000000; fractionLen = 8; format = ".%06" PRId64; - mod = t % 1000000; + mod = ((t) % 1000000 + 1000000) % 1000000; break; } @@ -291,7 +291,7 @@ void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) { quot = t / 1000000000; fractionLen = 11; format = ".%09" PRId64; - mod = t % 1000000000; + mod = ((t) % 1000000000 + 1000000000) % 1000000000; break; } @@ -322,7 +322,7 @@ void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) { quot = t / 1000; fractionLen = 5; format = ".%03" PRId64; - mod = t % 1000; + mod = ((t) % 1000 + 1000) % 1000; break; } @@ -330,7 +330,7 @@ void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) { quot = t / 1000000; fractionLen = 8; format = ".%06" PRId64; - mod = t % 1000000; + mod = ((t) % 1000000 + 1000000) % 1000000; break; } @@ -338,7 +338,7 @@ void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) { quot = t / 1000000000; fractionLen = 11; format = ".%09" PRId64; - mod = t % 1000000000; + mod = ((t) % 1000000000 + 1000000000) % 1000000000; break; } From 6912568b8af57942a6f67157e70facc95fae3289 Mon Sep 17 00:00:00 2001 From: xywang Date: Mon, 30 Aug 2021 03:37:58 +0800 Subject: [PATCH 03/20] [TD-6408]: one second more than actual timestamp --- src/plugins/http/src/httpJson.c | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/src/plugins/http/src/httpJson.c b/src/plugins/http/src/httpJson.c index b9067f0639..86e0f2f40b 100644 --- a/src/plugins/http/src/httpJson.c +++ b/src/plugins/http/src/httpJson.c @@ -272,26 +272,35 @@ void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) { switch (timePrecision) { case TSDB_TIME_PRECISION_MILLI: { + mod = ((t) % 1000 + 1000) % 1000; + if (t < 0 && mod != 0) { + t -= 1000; + } quot = t / 1000; fractionLen = 5; format = ".%03" PRId64; - mod = ((t) % 1000 + 1000) % 1000; break; } case TSDB_TIME_PRECISION_MICRO: { + mod = ((t) % 1000000 + 1000000) % 1000000; + if (t < 0 && mod != 0) { + t -= 1000000; + } quot = t / 1000000; fractionLen = 8; format = ".%06" PRId64; - mod = ((t) % 1000000 + 1000000) % 1000000; break; } case TSDB_TIME_PRECISION_NANO: { + mod = ((t) % 1000000000 + 1000000000) % 1000000000; + if (t < 0 && mod != 0) { + t -= 1000000000; + } quot = t / 1000000000; fractionLen = 11; format = ".%09" PRId64; - mod = ((t) % 1000000000 + 1000000000) % 1000000000; break; } @@ -319,26 +328,35 @@ void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) { switch (timePrecision) { case TSDB_TIME_PRECISION_MILLI: { + mod = ((t) % 1000 + 1000) % 1000; + if (t < 0 && mod != 0) { + t -= 1000; + } quot = t / 1000; fractionLen = 5; format = ".%03" PRId64; - mod = ((t) % 1000 + 1000) % 1000; break; } case TSDB_TIME_PRECISION_MICRO: { + mod = ((t) % 1000000 + 1000000) % 1000000; + if (t < 0 && mod != 0) { + t -= 1000000; + } quot = t / 1000000; fractionLen = 8; format = ".%06" PRId64; - mod = ((t) % 1000000 + 1000000) % 1000000; break; } case TSDB_TIME_PRECISION_NANO: { + mod = ((t) % 1000000000 + 1000000000) % 1000000000; + if (t < 0 && mod != 0) { + t -= 1000000000; + } quot = t / 1000000000; fractionLen = 11; format = ".%09" PRId64; - mod = ((t) % 1000000000 + 1000000000) % 1000000000; break; } From 2a28f3d744c90eaacfa4095bd4e5c9cc7eb27645 Mon Sep 17 00:00:00 2001 From: zhaoyanggh Date: Mon, 30 Aug 2021 10:51:25 +0800 Subject: [PATCH 04/20] add sqlt & sqlutc test script[ci skip] --- tests/script/http/httpTestSqlUtc.c | 128 +++++++++++++++++++++++++++++ tests/script/http/httpTestSqlt.c | 128 +++++++++++++++++++++++++++++ tests/script/http/makefile | 9 +- 3 files changed, 264 insertions(+), 1 deletion(-) create mode 100644 tests/script/http/httpTestSqlUtc.c create mode 100644 tests/script/http/httpTestSqlt.c diff --git a/tests/script/http/httpTestSqlUtc.c b/tests/script/http/httpTestSqlUtc.c new file mode 100644 index 0000000000..8a88d1f285 --- /dev/null +++ b/tests/script/http/httpTestSqlUtc.c @@ -0,0 +1,128 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAXLINE 1024 + +typedef struct { + pthread_t pid; + int threadId; + int rows; + int tables; +} ThreadObj; + +void post(char *ip,int port,char *page,char *msg) { + int sockfd,n; + char recvline[MAXLINE]; + struct sockaddr_in servaddr; + char content[4096]; + char content_page[50]; + sprintf(content_page,"POST /%s HTTP/1.1\r\n",page); + char content_host[50]; + sprintf(content_host,"HOST: %s:%d\r\n",ip,port); + char content_type[] = "Content-Type: text/plain\r\n"; + char Auth[] = "Authorization: Basic cm9vdDp0YW9zZGF0YQ==\r\n"; + char content_len[50]; + sprintf(content_len,"Content-Length: %ld\r\n\r\n",strlen(msg)); + sprintf(content,"%s%s%s%s%s%s",content_page,content_host,content_type,Auth,content_len,msg); + if((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0) { + printf("socket error\n"); + } + bzero(&servaddr,sizeof(servaddr)); + servaddr.sin_family = AF_INET; + servaddr.sin_port = htons(port); + if(inet_pton(AF_INET,ip,&servaddr.sin_addr) <= 0) { + printf("inet_pton error\n"); + } + if(connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr)) < 0) { + printf("connect error\n"); + } + write(sockfd,content,strlen(content)); + printf("%s\n", content); + while((n = read(sockfd,recvline,MAXLINE)) > 0) { + recvline[n] = 0; + if(fputs(recvline,stdout) == EOF) { + printf("fputs error\n"); + } + } + if(n < 0) { + printf("read error\n"); + } +} + +void singleThread() { + char ip[] = "127.0.0.1"; + int port = 6041; + char page[] = "rest/sqlutc"; + char page1[] = "rest/sqlutc/db1"; + char page2[] = "rest/sqlutc/db2"; + char nonexit[] = "rest/sqlutc/xxdb"; + + post(ip,port,page,"drop database if exists db1"); + post(ip,port,page,"create database if not exists db1"); + post(ip,port,page,"drop database if exists db2"); + post(ip,port,page,"create database if not exists db2"); + post(ip,port,page1,"create table t11 (ts timestamp, c1 int)"); + post(ip,port,page2,"create table t21 (ts timestamp, c1 int)"); + post(ip,port,page1,"insert into t11 values (now, 1)"); + post(ip,port,page2,"insert into t21 values (now, 2)"); + post(ip,port,nonexit,"create database if not exists db3"); +} + +void execute(void *params) { + char ip[] = "127.0.0.1"; + int port = 6041; + char page[] = "rest/sqlutc"; + char *unique = calloc(1, 1024); + char *sql = calloc(1, 1024); + ThreadObj *pThread = (ThreadObj *)params; + printf("Thread %d started\n", pThread->threadId); + sprintf(unique, "rest/sql/db%d",pThread->threadId); + sprintf(sql, "drop database if exists db%d", pThread->threadId); + post(ip,port,page, sql); + sprintf(sql, "create database if not exists db%d", pThread->threadId); + post(ip,port,page, sql); + for (int i = 0; i < pThread->tables; i++) { + sprintf(sql, "create table t%d (ts timestamp, c1 int)", i); + post(ip,port,unique, sql); + } + for (int i = 0; i < pThread->rows; i++) { + sprintf(sql, "insert into t%d values (now + %ds, %d)", pThread->threadId, i, pThread->threadId); + post(ip,port,unique, sql); + } + free(unique); + free(sql); + return; +} + +void multiThread() { + int numOfThreads = 100; + int numOfTables = 100; + int numOfRows = 1; + ThreadObj *threads = calloc((size_t)numOfThreads, sizeof(ThreadObj)); + for (int i = 0; i < numOfThreads; i++) { + ThreadObj *pthread = threads + i; + pthread_attr_t thattr; + pthread->threadId = i + 1; + pthread->rows = numOfRows; + pthread->tables = numOfTables; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + pthread_create(&pthread->pid, &thattr, (void *(*)(void *))execute, pthread); + } + for (int i = 0; i < numOfThreads; i++) { + pthread_join(threads[i].pid, NULL); + } + free(threads); +} + +int main() { + singleThread(); + multiThread(); + exit(0); +} \ No newline at end of file diff --git a/tests/script/http/httpTestSqlt.c b/tests/script/http/httpTestSqlt.c new file mode 100644 index 0000000000..82885a4a2d --- /dev/null +++ b/tests/script/http/httpTestSqlt.c @@ -0,0 +1,128 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAXLINE 1024 + +typedef struct { + pthread_t pid; + int threadId; + int rows; + int tables; +} ThreadObj; + +void post(char *ip,int port,char *page,char *msg) { + int sockfd,n; + char recvline[MAXLINE]; + struct sockaddr_in servaddr; + char content[4096]; + char content_page[50]; + sprintf(content_page,"POST /%s HTTP/1.1\r\n",page); + char content_host[50]; + sprintf(content_host,"HOST: %s:%d\r\n",ip,port); + char content_type[] = "Content-Type: text/plain\r\n"; + char Auth[] = "Authorization: Basic cm9vdDp0YW9zZGF0YQ==\r\n"; + char content_len[50]; + sprintf(content_len,"Content-Length: %ld\r\n\r\n",strlen(msg)); + sprintf(content,"%s%s%s%s%s%s",content_page,content_host,content_type,Auth,content_len,msg); + if((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0) { + printf("socket error\n"); + } + bzero(&servaddr,sizeof(servaddr)); + servaddr.sin_family = AF_INET; + servaddr.sin_port = htons(port); + if(inet_pton(AF_INET,ip,&servaddr.sin_addr) <= 0) { + printf("inet_pton error\n"); + } + if(connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr)) < 0) { + printf("connect error\n"); + } + write(sockfd,content,strlen(content)); + printf("%s\n", content); + while((n = read(sockfd,recvline,MAXLINE)) > 0) { + recvline[n] = 0; + if(fputs(recvline,stdout) == EOF) { + printf("fputs error\n"); + } + } + if(n < 0) { + printf("read error\n"); + } +} + +void singleThread() { + char ip[] = "127.0.0.1"; + int port = 6041; + char page[] = "rest/sqlt"; + char page1[] = "rest/sqlt/db1"; + char page2[] = "rest/sqlt/db2"; + char nonexit[] = "rest/sqlt/xxdb"; + + post(ip,port,page,"drop database if exists db1"); + post(ip,port,page,"create database if not exists db1"); + post(ip,port,page,"drop database if exists db2"); + post(ip,port,page,"create database if not exists db2"); + post(ip,port,page1,"create table t11 (ts timestamp, c1 int)"); + post(ip,port,page2,"create table t21 (ts timestamp, c1 int)"); + post(ip,port,page1,"insert into t11 values (now, 1)"); + post(ip,port,page2,"insert into t21 values (now, 2)"); + post(ip,port,nonexit,"create database if not exists db3"); +} + +void execute(void *params) { + char ip[] = "127.0.0.1"; + int port = 6041; + char page[] = "rest/sqlt"; + char *unique = calloc(1, 1024); + char *sql = calloc(1, 1024); + ThreadObj *pThread = (ThreadObj *)params; + printf("Thread %d started\n", pThread->threadId); + sprintf(unique, "rest/sql/db%d",pThread->threadId); + sprintf(sql, "drop database if exists db%d", pThread->threadId); + post(ip,port,page, sql); + sprintf(sql, "create database if not exists db%d", pThread->threadId); + post(ip,port,page, sql); + for (int i = 0; i < pThread->tables; i++) { + sprintf(sql, "create table t%d (ts timestamp, c1 int)", i); + post(ip,port,unique, sql); + } + for (int i = 0; i < pThread->rows; i++) { + sprintf(sql, "insert into t%d values (now + %ds, %d)", pThread->threadId, i, pThread->threadId); + post(ip,port,unique, sql); + } + free(unique); + free(sql); + return; +} + +void multiThread() { + int numOfThreads = 100; + int numOfTables = 100; + int numOfRows = 1; + ThreadObj *threads = calloc((size_t)numOfThreads, sizeof(ThreadObj)); + for (int i = 0; i < numOfThreads; i++) { + ThreadObj *pthread = threads + i; + pthread_attr_t thattr; + pthread->threadId = i + 1; + pthread->rows = numOfRows; + pthread->tables = numOfTables; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + pthread_create(&pthread->pid, &thattr, (void *(*)(void *))execute, pthread); + } + for (int i = 0; i < numOfThreads; i++) { + pthread_join(threads[i].pid, NULL); + } + free(threads); +} + +int main() { + singleThread(); + multiThread(); + exit(0); +} \ No newline at end of file diff --git a/tests/script/http/makefile b/tests/script/http/makefile index d1be683eda..50886cf6b1 100644 --- a/tests/script/http/makefile +++ b/tests/script/http/makefile @@ -1,2 +1,9 @@ all: - gcc -g httpTest.c -o httpTest -lpthread \ No newline at end of file + gcc -g httpTest.c -o httpTest -lpthread + gcc -g httpTestSqlt.c -o httpTestSqlt -lpthread + gcc -g httpTestSqlUtc.c -o httpTestSqlUtc -lpthread + +clean: + rm httpTest + rm httpTestSqlt + rm httpTestSqlUtc \ No newline at end of file From 43fab7f6f772ac4368d8776611004398d1a93e52 Mon Sep 17 00:00:00 2001 From: zhaoyanggh Date: Mon, 30 Aug 2021 11:09:11 +0800 Subject: [PATCH 05/20] fix test script --- tests/script/http/httpTestSqlUtc.c | 2 +- tests/script/http/httpTestSqlt.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/script/http/httpTestSqlUtc.c b/tests/script/http/httpTestSqlUtc.c index 8a88d1f285..643c884a1a 100644 --- a/tests/script/http/httpTestSqlUtc.c +++ b/tests/script/http/httpTestSqlUtc.c @@ -82,7 +82,7 @@ void execute(void *params) { char *sql = calloc(1, 1024); ThreadObj *pThread = (ThreadObj *)params; printf("Thread %d started\n", pThread->threadId); - sprintf(unique, "rest/sql/db%d",pThread->threadId); + sprintf(unique, "rest/sqlutc/db%d",pThread->threadId); sprintf(sql, "drop database if exists db%d", pThread->threadId); post(ip,port,page, sql); sprintf(sql, "create database if not exists db%d", pThread->threadId); diff --git a/tests/script/http/httpTestSqlt.c b/tests/script/http/httpTestSqlt.c index 82885a4a2d..2eaaee0f99 100644 --- a/tests/script/http/httpTestSqlt.c +++ b/tests/script/http/httpTestSqlt.c @@ -82,7 +82,7 @@ void execute(void *params) { char *sql = calloc(1, 1024); ThreadObj *pThread = (ThreadObj *)params; printf("Thread %d started\n", pThread->threadId); - sprintf(unique, "rest/sql/db%d",pThread->threadId); + sprintf(unique, "rest/sqlt/db%d",pThread->threadId); sprintf(sql, "drop database if exists db%d", pThread->threadId); post(ip,port,page, sql); sprintf(sql, "create database if not exists db%d", pThread->threadId); From e119a533071470d74a6c0735df9a5fff8c7b1422 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 Aug 2021 16:12:30 +0800 Subject: [PATCH 06/20] [td-255] code refactor. --- src/query/src/qExecutor.c | 87 +++++++++------------------------------ 1 file changed, 19 insertions(+), 68 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 700cf17fd3..8fefed51c8 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6388,6 +6388,19 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { return pInfo->binfo.pRes; } +static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SQueryRuntimeEnv* pRuntimeEnv, bool* newgroup) { + pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; + int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; + taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); + + taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); + taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); + + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); + pInfo->existNewGroupBlock = NULL; + *newgroup = true; +} + static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { *newgroup = false; @@ -6399,16 +6412,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRunt // handle the cached new group data block if (pInfo->existNewGroupBlock) { - pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; - int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; - taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); - - taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); - taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); - - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); - pInfo->existNewGroupBlock = NULL; - *newgroup = true; + doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup); } } @@ -6427,26 +6431,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) { return pInfo->pRes; } -// if (taosFillHasMoreResults(pInfo->pFillInfo)) { -// *newgroup = false; -// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); -// return pInfo->pRes; -// } -// -// // handle the cached new group data block -// if (pInfo->existNewGroupBlock) { -// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; -// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; -// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); -// -// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); -// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); -// -// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); -// pInfo->existNewGroupBlock = NULL; -// *newgroup = true; -// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; -// } while(1) { publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -6493,46 +6477,13 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) { return pInfo->pRes; } - -// if (taosFillHasMoreResults(pInfo->pFillInfo)) { -// *newgroup = false; -// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); -// return pInfo->pRes; -// } -// -// // handle the cached new group data block -// if (pInfo->existNewGroupBlock) { -// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; -// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; -// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); -// -// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); -// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); -// -// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); -// pInfo->existNewGroupBlock = NULL; -// *newgroup = true; -// -// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { -// return pInfo->pRes; -// } -// -//// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; -// } - } else if (pInfo->existNewGroupBlock) { // try next group - pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; - int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey; - taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); + assert(pBlock != NULL); + doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup); - taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); - taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); - - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); - pInfo->existNewGroupBlock = NULL; - *newgroup = true; - - return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { + return pInfo->pRes; + } } else { return NULL; } From 4a5d0f205fde8f9ee0146b652e34ab13e234f813 Mon Sep 17 00:00:00 2001 From: liuyq-617 Date: Mon, 30 Aug 2021 18:30:03 +0800 Subject: [PATCH 07/20] [ci skip]update jenkinsfile --- Jenkinsfile | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 6109e4811a..e9ea8bafd3 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -256,13 +256,11 @@ pipeline { steps { pre_test() - catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') { - timeout(time: 60, unit: 'MINUTES'){ - sh ''' - cd ${WKC}/tests/pytest - ./crash_gen.sh -a -p -t 4 -s 2000 - ''' - } + timeout(time: 60, unit: 'MINUTES'){ + sh ''' + cd ${WKC}/tests/pytest + ./crash_gen.sh -a -p -t 4 -s 2000 + ''' } timeout(time: 60, unit: 'MINUTES'){ // sh ''' From d31c5f1dc86d0c3ac827165d83a3886bd170bcdd Mon Sep 17 00:00:00 2001 From: Linhe Huo Date: Mon, 30 Aug 2021 23:02:03 +0800 Subject: [PATCH 08/20] [TD-6449]: prefer english only in method documentations in python connector [ci skip] (#7711) --- src/connector/python/taos/cinterface.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/connector/python/taos/cinterface.py b/src/connector/python/taos/cinterface.py index 42dac3c2e8..a1b6fe312b 100644 --- a/src/connector/python/taos/cinterface.py +++ b/src/connector/python/taos/cinterface.py @@ -102,9 +102,7 @@ _libtaos.taos_get_client_info.restype = c_char_p def taos_get_client_info(): # type: () -> str - """Get client version info. - 获取客户端版本信息。 - """ + """Get client version info.""" return _libtaos.taos_get_client_info().decode() @@ -114,6 +112,7 @@ _libtaos.taos_get_server_info.argtypes = (c_void_p,) def taos_get_server_info(connection): # type: (c_void_p) -> str + """Get server version as string.""" return _libtaos.taos_get_server_info(connection).decode() @@ -134,11 +133,10 @@ _libtaos.taos_connect.argtypes = c_char_p, c_char_p, c_char_p, c_char_p, c_uint1 def taos_connect(host=None, user="root", password="taosdata", db=None, port=0): # type: (None|str, str, str, None|str, int) -> c_void_p """Create TDengine database connection. - 创建数据库连接,初始化连接上下文。其中需要用户提供的参数包含: - - host: server hostname/FQDN, TDengine管理主节点的FQDN - - user: user name/用户名 - - password: user password / 用户密码 + - host: server hostname/FQDN + - user: user name + - password: user password - db: database name (optional) - port: server port @@ -187,11 +185,10 @@ _libtaos.taos_connect_auth.argtypes = c_char_p, c_char_p, c_char_p, c_char_p, c_ def taos_connect_auth(host=None, user="root", auth="", db=None, port=0): # type: (None|str, str, str, None|str, int) -> c_void_p - """ - 创建数据库连接,初始化连接上下文。其中需要用户提供的参数包含: + """Connect server with auth token. - - host: server hostname/FQDN, TDengine管理主节点的FQDN - - user: user name/用户名 + - host: server hostname/FQDN + - user: user name - auth: base64 encoded auth token - db: database name (optional) - port: server port From 2483985d4b81e888bff0a1aa7b2f4987e5e3c4f1 Mon Sep 17 00:00:00 2001 From: Linhe Huo Date: Mon, 30 Aug 2021 23:04:52 +0800 Subject: [PATCH 09/20] [TD-6449]: fix python3.6 datetime parsing in pytest util package (#7704) --- tests/pytest/util/sql.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index dfe1e4a582..2b654a3793 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -21,7 +21,15 @@ import shutil import pandas as pd from util.log import * - +def _parse_datetime(timestr): + try: + return datetime.datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S.%f') + except ValueError: + pass + try: + return datetime.datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S') + except ValueError: + pass class TDSql: def __init__(self): @@ -181,7 +189,7 @@ class TDSql: tdLog.info("sql:%s, row:%d col:%d data:%d == expect:%s" % (self.sql, row, col, self.queryResult[row][col], data)) else: - if self.queryResult[row][col] == datetime.datetime.fromisoformat(data): + if self.queryResult[row][col] == _parse_datetime(data): tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%s" % (self.sql, row, col, self.queryResult[row][col], data)) return From 56776b62fe09a0cf7a46a20354b9fc056ab7ab3e Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 31 Aug 2021 00:24:40 +0800 Subject: [PATCH 10/20] [TD-6448]: taosdemo stmt rand race. (#7719) Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 173 +++++++++++++++--------------------- 1 file changed, 71 insertions(+), 102 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 70ccbe0924..29443f4fd4 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -291,7 +291,6 @@ typedef struct SSuperTable_S { uint64_t lenOfTagOfOneRow; char* sampleDataBuf; - char* sampleBindArray; //int sampleRowCount; //int sampleUsePos; @@ -438,7 +437,8 @@ typedef struct SQueryMetaInfo_S { typedef struct SThreadInfo_S { TAOS * taos; TAOS_STMT *stmt; - int64_t *bind_ts; + char* sampleBindArray; + int64_t *bind_ts; int threadID; char db_name[TSDB_DB_NAME_LEN]; uint32_t time_precision; @@ -5738,20 +5738,6 @@ static void postFreeResource() { free(g_Dbs.db[i].superTbls[j].sampleDataBuf); g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL; } - if (g_Dbs.db[i].superTbls[j].sampleBindArray) { - for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) { - uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)( - g_Dbs.db[i].superTbls[j].sampleBindArray - + sizeof(uintptr_t *) * k)); - for (int c = 1; c < g_Dbs.db[i].superTbls[j].columnCount + 1; c++) { - TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c)); - if (bind) - tmfree(bind->buffer); - } - tmfree((char *)tmp); - } - } - tmfree((char *)g_Dbs.db[i].superTbls[j].sampleBindArray); if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) { free(g_Dbs.db[i].superTbls[j].tagDataBuf); @@ -6085,9 +6071,6 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) int32_t affectedRows; SSuperTable* stbInfo = pThreadInfo->stbInfo; - verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, - __func__, __LINE__, pThreadInfo->buffer); - uint16_t iface; if (stbInfo) iface = stbInfo->iface; @@ -6105,12 +6088,18 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) switch(iface) { case TAOSC_IFACE: + verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, + __func__, __LINE__, pThreadInfo->buffer); + affectedRows = queryDbExec( pThreadInfo->taos, pThreadInfo->buffer, INSERT_TYPE, false); break; case REST_IFACE: + verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, + __func__, __LINE__, pThreadInfo->buffer); + if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, pThreadInfo->buffer, pThreadInfo)) { affectedRows = -1; @@ -7088,12 +7077,12 @@ static int32_t prepareStbStmtBindRand( return 0; } -static int32_t prepareStbStmtBindWithSample( +static int32_t prepareStbStmtBindStartTime( + char *tableName, int64_t *ts, char *bindArray, SSuperTable *stbInfo, int64_t startTime, int32_t recSeq, - int32_t timePrec, - int64_t samplePos) + int32_t timePrec) { TAOS_BIND *bind; @@ -7110,6 +7099,10 @@ static int32_t prepareStbStmtBindWithSample( } else { *bind_ts = startTime + stbInfo->timeStampStep * recSeq; } + + verbosePrint("%s() LN%d, tableName: %s, bind_ts=%"PRId64"\n", + __func__, __LINE__, tableName, *bind_ts); + bind->buffer_length = sizeof(int64_t); bind->buffer = bind_ts; bind->length = &bind->buffer_length; @@ -7118,7 +7111,7 @@ static int32_t prepareStbStmtBindWithSample( return 0; } -static int32_t prepareStbStmtRand( +UNUSED_FUNC static int32_t prepareStbStmtRand( threadInfo *pThreadInfo, char *tableName, int64_t tableSeq, @@ -7299,14 +7292,14 @@ static int32_t prepareStbStmtWithSample( uint32_t k; for (k = 0; k < batch;) { char *bindArray = (char *)(*((uintptr_t *) - (stbInfo->sampleBindArray + (sizeof(char *)) * (*pSamplePos)))); + (pThreadInfo->sampleBindArray + (sizeof(char *)) * (*pSamplePos)))); /* columnCount + 1 (ts) */ - if (-1 == prepareStbStmtBindWithSample( + if (-1 == prepareStbStmtBindStartTime( + tableName, pThreadInfo->bind_ts, bindArray, stbInfo, startTime, k, - pThreadInfo->time_precision, - *pSamplePos + pThreadInfo->time_precision /* is column */)) { return -1; } @@ -7427,8 +7420,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { int64_t nTimeStampStep; uint64_t insert_interval; - bool sourceRand; - SSuperTable* stbInfo = pThreadInfo->stbInfo; if (stbInfo) { @@ -7443,18 +7434,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { maxSqlLen = stbInfo->maxSqlLen; nTimeStampStep = stbInfo->timeStampStep; insert_interval = stbInfo->insertInterval; - if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) { - sourceRand = true; - } else { - sourceRand = false; // from sample data file - } } else { insertRows = g_args.num_of_DPT; interlaceRows = g_args.interlace_rows; maxSqlLen = g_args.max_sql_len; nTimeStampStep = g_args.timestamp_step; insert_interval = g_args.insert_interval; - sourceRand = true; } debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", @@ -7539,25 +7524,14 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { int32_t generated; if (stbInfo) { if (stbInfo->iface == STMT_IFACE) { - if (sourceRand) { - generated = prepareStbStmtRand( - pThreadInfo, - tableName, - tableSeq, - batchPerTbl, - insertRows, 0, - startTime - ); - } else { - generated = prepareStbStmtWithSample( - pThreadInfo, - tableName, - tableSeq, - batchPerTbl, - insertRows, 0, - startTime, - &(pThreadInfo->samplePos)); - } + generated = prepareStbStmtWithSample( + pThreadInfo, + tableName, + tableSeq, + batchPerTbl, + insertRows, 0, + startTime, + &(pThreadInfo->samplePos)); } else { generated = generateStbInterlaceData( pThreadInfo, @@ -7747,17 +7721,6 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { pThreadInfo->totalInsertRows = 0; pThreadInfo->totalAffectedRows = 0; - bool sourceRand; - if (stbInfo) { - if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) { - sourceRand = true; - } else { - sourceRand = false; // from sample data file - } - } else { - sourceRand = true; - } - pThreadInfo->samplePos = 0; int percentComplete = 0; @@ -7796,32 +7759,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { int32_t generated; if (stbInfo) { if (stbInfo->iface == STMT_IFACE) { - if (sourceRand) { -/* generated = prepareStbStmtRand( - pThreadInfo, - tableName, - tableSeq, - g_args.num_of_RPR, - insertRows, - i, start_time - ); - */ - generated = prepareStbStmtWithSample( - pThreadInfo, - tableName, - tableSeq, - g_args.num_of_RPR, - insertRows, i, start_time, - &(pThreadInfo->samplePos)); - } else { - generated = prepareStbStmtWithSample( - pThreadInfo, - tableName, - tableSeq, - g_args.num_of_RPR, - insertRows, i, start_time, - &(pThreadInfo->samplePos)); - } + generated = prepareStbStmtWithSample( + pThreadInfo, + tableName, + tableSeq, + g_args.num_of_RPR, + insertRows, i, start_time, + &(pThreadInfo->samplePos)); } else { generated = generateStbProgressiveData( stbInfo, @@ -7849,6 +7793,11 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { &remainderBufLen); } } + + verbosePrint("[%d] %s() LN%d generated=%d\n", + pThreadInfo->threadID, + __func__, __LINE__, generated); + if (generated > 0) i += generated; else @@ -8059,17 +8008,22 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * return 0; } -static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec) +static int parseSampleFileToStmt( + threadInfo *pThreadInfo, + SSuperTable *stbInfo, uint32_t timePrec) { - stbInfo->sampleBindArray = calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); - if (stbInfo->sampleBindArray == NULL) { + pThreadInfo->sampleBindArray = + calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); + if (pThreadInfo->sampleBindArray == NULL) { errorPrint2("%s() LN%d, Failed to allocate %"PRIu64" bind array buffer\n", - __func__, __LINE__, (uint64_t)sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); + __func__, __LINE__, + (uint64_t)sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); return -1; } for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) { - char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); + char *bindArray = + calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); if (bindArray == NULL) { errorPrint2("%s() LN%d, Failed to allocate %d bind params\n", __func__, __LINE__, (stbInfo->columnCount + 1)); @@ -8122,7 +8076,8 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec) free(bindBuffer); } } - *((uintptr_t *)(stbInfo->sampleBindArray + (sizeof(char *)) * i)) = (uintptr_t)bindArray; + *((uintptr_t *)(pThreadInfo->sampleBindArray + (sizeof(char *)) * i)) = + (uintptr_t)bindArray; } return 0; @@ -8312,10 +8267,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, pstr += sprintf(pstr, ")"); debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer); - - if (stbInfo) { - parseSampleFileToStmt(stbInfo, timePrec); - } } for (int i = 0; i < threads; i++) { @@ -8348,7 +8299,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, || ((stbInfo) && (stbInfo->iface == STMT_IFACE))) { - pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos); if (NULL == pThreadInfo->stmt) { free(pids); @@ -8370,6 +8320,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, exit(EXIT_FAILURE); } pThreadInfo->bind_ts = malloc(sizeof(int64_t)); + + if (stbInfo) { + parseSampleFileToStmt(pThreadInfo, stbInfo, timePrec); + } } } else { pThreadInfo->taos = NULL; @@ -8420,6 +8374,21 @@ static void startMultiThreadInsertData(int threads, char* db_name, tsem_destroy(&(pThreadInfo->lock_sem)); taos_close(pThreadInfo->taos); + if (pThreadInfo->sampleBindArray) { + for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) { + uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)( + pThreadInfo->sampleBindArray + + sizeof(uintptr_t *) * k)); + for (int c = 1; c < pThreadInfo->stbInfo->columnCount + 1; c++) { + TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c)); + if (bind) + tmfree(bind->buffer); + } + tmfree((char *)tmp); + } + tmfree(pThreadInfo->sampleBindArray); + } + debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n", __func__, __LINE__, pThreadInfo->threadID, pThreadInfo->totalInsertRows, From 8f32f23de9c73b9106eda6780503b164aabd4064 Mon Sep 17 00:00:00 2001 From: Zhiyu Yang <69311263+zyyang-taosdata@users.noreply.github.com> Date: Tue, 31 Aug 2021 09:29:52 +0800 Subject: [PATCH 11/20] [TD-6432]: fix batchFetch BufferUnderflowException (#7675) --- src/client/src/TSDBJNIConnector.c | 202 +++++++++++------- .../taosdata/jdbc/TSDBResultSetBlockData.java | 10 +- 2 files changed, 127 insertions(+), 85 deletions(-) diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c index 7ba613de88..506c8d64b9 100644 --- a/src/client/src/TSDBJNIConnector.c +++ b/src/client/src/TSDBJNIConnector.c @@ -20,12 +20,42 @@ #include "com_taosdata_jdbc_TSDBJNIConnector.h" -#define jniFatal(...) { if (jniDebugFlag & DEBUG_FATAL) { taosPrintLog("JNI FATAL ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }} -#define jniError(...) { if (jniDebugFlag & DEBUG_ERROR) { taosPrintLog("JNI ERROR ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }} -#define jniWarn(...) { if (jniDebugFlag & DEBUG_WARN) { taosPrintLog("JNI WARN ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }} -#define jniInfo(...) { if (jniDebugFlag & DEBUG_INFO) { taosPrintLog("JNI ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); }} -#define jniDebug(...) { if (jniDebugFlag & DEBUG_DEBUG) { taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); }} -#define jniTrace(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); }} +#define jniFatal(...) \ + { \ + if (jniDebugFlag & DEBUG_FATAL) { \ + taosPrintLog("JNI FATAL ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \ + } \ + } +#define jniError(...) \ + { \ + if (jniDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("JNI ERROR ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \ + } \ + } +#define jniWarn(...) \ + { \ + if (jniDebugFlag & DEBUG_WARN) { \ + taosPrintLog("JNI WARN ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \ + } \ + } +#define jniInfo(...) \ + { \ + if (jniDebugFlag & DEBUG_INFO) { \ + taosPrintLog("JNI ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \ + } \ + } +#define jniDebug(...) \ + { \ + if (jniDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); \ + } \ + } +#define jniTrace(...) \ + { \ + if (jniDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); \ + } \ + } int __init = 0; @@ -60,14 +90,14 @@ jmethodID g_blockdataSetByteArrayFp; jmethodID g_blockdataSetNumOfRowsFp; jmethodID g_blockdataSetNumOfColsFp; -#define JNI_SUCCESS 0 -#define JNI_TDENGINE_ERROR -1 +#define JNI_SUCCESS 0 +#define JNI_TDENGINE_ERROR -1 #define JNI_CONNECTION_NULL -2 #define JNI_RESULT_SET_NULL -3 #define JNI_NUM_OF_FIELDS_0 -4 -#define JNI_SQL_NULL -5 -#define JNI_FETCH_END -6 -#define JNI_OUT_OF_MEMORY -7 +#define JNI_SQL_NULL -5 +#define JNI_FETCH_END -6 +#define JNI_OUT_OF_MEMORY -7 static void jniGetGlobalMethod(JNIEnv *env) { // make sure init function executed once @@ -129,13 +159,13 @@ static void jniGetGlobalMethod(JNIEnv *env) { } static int32_t check_for_params(jobject jobj, jlong conn, jlong res) { - if ((TAOS*) conn == NULL) { + if ((TAOS *)conn == NULL) { jniError("jobj:%p, connection is closed", jobj); return JNI_CONNECTION_NULL; } - if ((TAOS_RES *) res == NULL) { - jniError("jobj:%p, conn:%p, res is null", jobj, (TAOS*) conn); + if ((TAOS_RES *)res == NULL) { + jniError("jobj:%p, conn:%p, res is null", jobj, (TAOS *)conn); return JNI_RESULT_SET_NULL; } @@ -216,7 +246,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setOptions(JNIEnv JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEnv *env, jobject jobj, jstring jhost, jint jport, jstring jdbName, jstring juser, jstring jpass) { - jlong ret = 0; + jlong ret = 0; const char *host = NULL; const char *user = NULL; const char *pass = NULL; @@ -246,7 +276,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEn jniDebug("jobj:%p, pass not specified, use default password", jobj); } - ret = (jlong) taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, (uint16_t)jport); + ret = (jlong)taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, (uint16_t)jport); if (ret == 0) { jniError("jobj:%p, conn:%p, connect to database failed, host=%s, user=%s, dbname=%s, port=%d", jobj, (void *)ret, (char *)host, (char *)user, (char *)dbname, (int32_t)jport); @@ -289,7 +319,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp( jsize len = (*env)->GetArrayLength(env, jsql); - char *str = (char *) calloc(1, sizeof(char) * (len + 1)); + char *str = (char *)calloc(1, sizeof(char) * (len + 1)); if (str == NULL) { jniError("jobj:%p, conn:%p, alloc memory failed", jobj, tscon); return JNI_OUT_OF_MEMORY; @@ -315,16 +345,17 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp( } free(str); - return (jlong) pSql; + return (jlong)pSql; } -JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrCodeImp(JNIEnv *env, jobject jobj, jlong con, jlong tres) { +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrCodeImp(JNIEnv *env, jobject jobj, jlong con, + jlong tres) { int32_t code = check_for_params(jobj, con, tres); if (code != JNI_SUCCESS) { return code; } - return (jint)taos_errno((TAOS_RES*) tres); + return (jint)taos_errno((TAOS_RES *)tres); } JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(JNIEnv *env, jobject jobj, jlong tres) { @@ -334,7 +365,7 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(J JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(JNIEnv *env, jobject jobj, jlong con, jlong tres) { - TAOS *tscon = (TAOS *)con; + TAOS * tscon = (TAOS *)con; int32_t code = check_for_params(jobj, con, tres); if (code != JNI_SUCCESS) { return code; @@ -359,7 +390,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp( SSqlObj *pSql = (TAOS_RES *)tres; - return (tscIsUpdateQuery(pSql)? 1:0); + return (tscIsUpdateQuery(pSql) ? 1 : 0); } JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp(JNIEnv *env, jobject jobj, jlong con, @@ -370,21 +401,22 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp( } taos_free_result((void *)res); - jniDebug("jobj:%p, conn:%p, free resultset:%p", jobj, (TAOS*) con, (void *)res); + jniDebug("jobj:%p, conn:%p, free resultset:%p", jobj, (TAOS *)con, (void *)res); return JNI_SUCCESS; } JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsImp(JNIEnv *env, jobject jobj, jlong con, jlong res) { - TAOS *tscon = (TAOS *)con; + TAOS * tscon = (TAOS *)con; int32_t code = check_for_params(jobj, con, res); if (code != JNI_SUCCESS) { return code; } jint ret = taos_affected_rows((SSqlObj *)res); - jniDebug("jobj:%p, conn:%p, sql:%p, res: %p, affect rows:%d", jobj, tscon, (TAOS *)con, (TAOS_RES *)res, (int32_t)ret); + jniDebug("jobj:%p, conn:%p, sql:%p, res: %p, affect rows:%d", jobj, tscon, (TAOS *)con, (TAOS_RES *)res, + (int32_t)ret); return ret; } @@ -392,13 +424,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsIm JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaDataImp(JNIEnv *env, jobject jobj, jlong con, jlong res, jobject arrayListObj) { - TAOS *tscon = (TAOS *)con; + TAOS * tscon = (TAOS *)con; int32_t code = check_for_params(jobj, con, res); if (code != JNI_SUCCESS) { return code; } - TAOS_RES* tres = (TAOS_RES*) res; + TAOS_RES * tres = (TAOS_RES *)res; TAOS_FIELD *fields = taos_fetch_fields(tres); int32_t num_fields = taos_num_fields(tres); @@ -452,7 +484,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn int32_t numOfFields = taos_num_fields(result); if (numOfFields == 0) { - jniError("jobj:%p, conn:%p, resultset:%p, fields size %d", jobj, tscon, (void*)res, numOfFields); + jniError("jobj:%p, conn:%p, resultset:%p, fields size %d", jobj, tscon, (void *)res, numOfFields); return JNI_NUM_OF_FIELDS_0; } @@ -460,7 +492,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn if (row == NULL) { int code = taos_errno(result); if (code == TSDB_CODE_SUCCESS) { - jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d, fetch row to the end", jobj, tscon, (void*)res, numOfFields); + jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d, fetch row to the end", jobj, tscon, (void *)res, + numOfFields); return JNI_FETCH_END; } else { jniDebug("jobj:%p, conn:%p, interrupted query", jobj, tscon); @@ -468,7 +501,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn } } - int32_t* length = taos_fetch_lengths(result); + int32_t *length = taos_fetch_lengths(result); char tmp[TSDB_MAX_BYTES_PER_ROW] = {0}; @@ -533,7 +566,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn } JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp(JNIEnv *env, jobject jobj, jlong con, - jlong res, jobject rowobj) { + jlong res, jobject rowobj) { TAOS * tscon = (TAOS *)con; int32_t code = check_for_params(jobj, con, res); if (code != JNI_SUCCESS) { @@ -564,8 +597,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp(JNI (*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfColsFp, (jint)numOfFields); for (int i = 0; i < numOfFields; i++) { - (*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, i, fields[i].bytes * numOfRows, - jniFromNCharToByteArray(env, (char *)row[i], fields[i].bytes * numOfRows)); + int bytes = fields[i].bytes; + + if (fields[i].type == TSDB_DATA_TYPE_BINARY || fields[i].type == TSDB_DATA_TYPE_NCHAR) { + bytes += 2; + } + (*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, i, bytes * numOfRows, + jniFromNCharToByteArray(env, (char *)row[i], bytes * numOfRows)); } return JNI_SUCCESS; @@ -585,7 +623,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionIm } JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNIEnv *env, jobject jobj, jlong con, - jboolean restart, jstring jtopic, jstring jsql, jint jinterval) { + jboolean restart, jstring jtopic, + jstring jsql, jint jinterval) { jlong sub = 0; TAOS *taos = (TAOS *)con; char *topic = NULL; @@ -682,8 +721,8 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getTsCharset(J * @param res the TAOS_RES object, i.e. the SSqlObject * @return precision 0:ms 1:us 2:ns */ -JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultTimePrecisionImp(JNIEnv *env, jobject jobj, jlong con, - jlong res) { +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultTimePrecisionImp(JNIEnv *env, jobject jobj, + jlong con, jlong res) { TAOS *tscon = (TAOS *)con; if (tscon == NULL) { jniError("jobj:%p, connection is closed", jobj); @@ -699,7 +738,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultTimePrec return taos_result_precision(result); } -JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp(JNIEnv *env, jobject jobj, jbyteArray jsql, jlong con) { +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp(JNIEnv *env, jobject jobj, + jbyteArray jsql, jlong con) { TAOS *tscon = (TAOS *)con; if (tscon == NULL) { jniError("jobj:%p, connection already closed", jobj); @@ -713,7 +753,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp(J jsize len = (*env)->GetArrayLength(env, jsql); - char *str = (char *) calloc(1, sizeof(char) * (len + 1)); + char *str = (char *)calloc(1, sizeof(char) * (len + 1)); if (str == NULL) { jniError("jobj:%p, conn:%p, alloc memory failed", jobj, tscon); return JNI_OUT_OF_MEMORY; @@ -724,25 +764,27 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp(J // todo handle error } - TAOS_STMT* pStmt = taos_stmt_init(tscon); - int32_t code = taos_stmt_prepare(pStmt, str, len); + TAOS_STMT *pStmt = taos_stmt_init(tscon); + int32_t code = taos_stmt_prepare(pStmt, str, len); tfree(str); if (code != TSDB_CODE_SUCCESS) { jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code)); return JNI_TDENGINE_ERROR; } - return (jlong) pStmt; + return (jlong)pStmt; } -JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameImp(JNIEnv *env, jobject jobj, jlong stmt, jstring jname, jlong conn) { +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameImp(JNIEnv *env, jobject jobj, + jlong stmt, jstring jname, + jlong conn) { TAOS *tsconn = (TAOS *)conn; if (tsconn == NULL) { jniError("jobj:%p, connection already closed", jobj); return JNI_CONNECTION_NULL; } - TAOS_STMT* pStmt = (TAOS_STMT*) stmt; + TAOS_STMT *pStmt = (TAOS_STMT *)stmt; if (pStmt == NULL) { jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn); return JNI_SQL_NULL; @@ -750,7 +792,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI const char *name = (*env)->GetStringUTFChars(env, jname, NULL); - int32_t code = taos_stmt_set_tbname((void*)stmt, name); + int32_t code = taos_stmt_set_tbname((void *)stmt, name); if (code != TSDB_CODE_SUCCESS) { (*env)->ReleaseStringUTFChars(env, jname, name); @@ -763,8 +805,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI return JNI_SUCCESS; } -JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(JNIEnv *env, jobject jobj, jlong stmt, - jbyteArray colDataList, jbyteArray lengthList, jbyteArray nullList, jint dataType, jint dataBytes, jint numOfRows, jint colIndex, jlong con) { +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp( + JNIEnv *env, jobject jobj, jlong stmt, jbyteArray colDataList, jbyteArray lengthList, jbyteArray nullList, + jint dataType, jint dataBytes, jint numOfRows, jint colIndex, jlong con) { TAOS *tscon = (TAOS *)con; if (tscon == NULL) { jniError("jobj:%p, connection already closed", jobj); @@ -798,14 +841,14 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J } // bind multi-rows with only one invoke. - TAOS_MULTI_BIND* b = calloc(1, sizeof(TAOS_MULTI_BIND)); + TAOS_MULTI_BIND *b = calloc(1, sizeof(TAOS_MULTI_BIND)); - b->num = numOfRows; - b->buffer_type = dataType; // todo check data type - b->buffer_length = IS_VAR_DATA_TYPE(dataType)? dataBytes:tDataTypes[dataType].bytes; - b->is_null = nullArray; - b->buffer = colBuf; - b->length = (int32_t*)lengthArray; + b->num = numOfRows; + b->buffer_type = dataType; // todo check data type + b->buffer_length = IS_VAR_DATA_TYPE(dataType) ? dataBytes : tDataTypes[dataType].bytes; + b->is_null = nullArray; + b->buffer = colBuf; + b->length = (int32_t *)lengthArray; // set the length and is_null array if (!IS_VAR_DATA_TYPE(dataType)) { @@ -829,14 +872,15 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J return JNI_SUCCESS; } -JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt, jlong con) { +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt, + jlong con) { TAOS *tscon = (TAOS *)con; if (tscon == NULL) { jniError("jobj:%p, connection already closed", jobj); return JNI_CONNECTION_NULL; } - TAOS_STMT *pStmt = (TAOS_STMT*) stmt; + TAOS_STMT *pStmt = (TAOS_STMT *)stmt; if (pStmt == NULL) { jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon); return JNI_SQL_NULL; @@ -853,14 +897,15 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J return JNI_SUCCESS; } -JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con) { +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, + jlong con) { TAOS *tscon = (TAOS *)con; if (tscon == NULL) { jniError("jobj:%p, connection already closed", jobj); return JNI_CONNECTION_NULL; } - TAOS_STMT *pStmt = (TAOS_STMT*) stmt; + TAOS_STMT *pStmt = (TAOS_STMT *)stmt; if (pStmt == NULL) { jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon); return JNI_SQL_NULL; @@ -876,15 +921,16 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv return JNI_SUCCESS; } -JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp(JNIEnv *env, jobject jobj, - jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList, jbyteArray lengthList, jbyteArray nullList, jlong conn) { +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp( + JNIEnv *env, jobject jobj, jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList, + jbyteArray lengthList, jbyteArray nullList, jlong conn) { TAOS *tsconn = (TAOS *)conn; if (tsconn == NULL) { jniError("jobj:%p, connection already closed", jobj); return JNI_CONNECTION_NULL; } - TAOS_STMT* pStmt = (TAOS_STMT*) stmt; + TAOS_STMT *pStmt = (TAOS_STMT *)stmt; if (pStmt == NULL) { jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn); return JNI_SQL_NULL; @@ -898,39 +944,39 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI } len = (*env)->GetArrayLength(env, lengthList); - int64_t *lengthArray = (int64_t*) calloc(1, len); - (*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray); + int64_t *lengthArray = (int64_t *)calloc(1, len); + (*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray); if ((*env)->ExceptionCheck(env)) { } len = (*env)->GetArrayLength(env, typeList); - char *typeArray = (char*) calloc(1, len); - (*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte*) typeArray); + char *typeArray = (char *)calloc(1, len); + (*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte *)typeArray); if ((*env)->ExceptionCheck(env)) { } len = (*env)->GetArrayLength(env, nullList); - int32_t *nullArray = (int32_t*) calloc(1, len); - (*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray); + int32_t *nullArray = (int32_t *)calloc(1, len); + (*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray); if ((*env)->ExceptionCheck(env)) { } const char *name = (*env)->GetStringUTFChars(env, tableName, NULL); - char* curTags = tagsData; + char * curTags = tagsData; TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND)); - for(int32_t i = 0; i < numOfTags; ++i) { + for (int32_t i = 0; i < numOfTags; ++i) { tagsBind[i].buffer_type = typeArray[i]; - tagsBind[i].buffer = curTags; + tagsBind[i].buffer = curTags; tagsBind[i].is_null = &nullArray[i]; - tagsBind[i].length = (uintptr_t*) &lengthArray[i]; + tagsBind[i].length = (uintptr_t *)&lengthArray[i]; curTags += lengthArray[i]; } - int32_t code = taos_stmt_set_tbname_tags((void*)stmt, name, tagsBind); + int32_t code = taos_stmt_set_tbname_tags((void *)stmt, name, tagsBind); - int32_t nTags = (int32_t) numOfTags; + int32_t nTags = (int32_t)numOfTags; jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags); tfree(tagsData); @@ -948,28 +994,28 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI } JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JNIEnv *env, jobject jobj, - jobjectArray lines, jlong conn) { + jobjectArray lines, jlong conn) { TAOS *taos = (TAOS *)conn; if (taos == NULL) { jniError("jobj:%p, connection already closed", jobj); return JNI_CONNECTION_NULL; } - int numLines = (*env)->GetArrayLength(env, lines); - char** c_lines = calloc(numLines, sizeof(char*)); + int numLines = (*env)->GetArrayLength(env, lines); + char **c_lines = calloc(numLines, sizeof(char *)); if (c_lines == NULL) { jniError("c_lines:%p, alloc memory failed", c_lines); return JNI_OUT_OF_MEMORY; } for (int i = 0; i < numLines; ++i) { - jstring line = (jstring) ((*env)->GetObjectArrayElement(env, lines, i)); - c_lines[i] = (char*)(*env)->GetStringUTFChars(env, line, 0); + jstring line = (jstring)((*env)->GetObjectArrayElement(env, lines, i)); + c_lines[i] = (char *)(*env)->GetStringUTFChars(env, line, 0); } int code = taos_insert_lines(taos, c_lines, numLines); for (int i = 0; i < numLines; ++i) { - jstring line = (jstring) ((*env)->GetObjectArrayElement(env, lines, i)); + jstring line = (jstring)((*env)->GetObjectArrayElement(env, lines, i)); (*env)->ReleaseStringUTFChars(env, line, c_lines[i]); } diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSetBlockData.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSetBlockData.java index 6211f61dc5..ff49677b01 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSetBlockData.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSetBlockData.java @@ -32,6 +32,7 @@ import java.util.List; import com.taosdata.jdbc.utils.NullType; public class TSDBResultSetBlockData { + private static final int BINARY_LENGTH_OFFSET = 2; private int numOfRows = 0; private int rowIndex = 0; @@ -404,10 +405,8 @@ public class TSDBResultSetBlockData { case TSDBConstants.TSDB_DATA_TYPE_BINARY: { ByteBuffer bb = (ByteBuffer) this.colData.get(col); - bb.position(fieldSize * this.rowIndex); - + bb.position((fieldSize + BINARY_LENGTH_OFFSET) * this.rowIndex); int length = bb.getShort(); - byte[] dest = new byte[length]; bb.get(dest, 0, length); if (NullType.isBinaryNull(dest, length)) { @@ -419,16 +418,13 @@ public class TSDBResultSetBlockData { case TSDBConstants.TSDB_DATA_TYPE_NCHAR: { ByteBuffer bb = (ByteBuffer) this.colData.get(col); - bb.position(fieldSize * this.rowIndex); - + bb.position((fieldSize + BINARY_LENGTH_OFFSET) * this.rowIndex); int length = bb.getShort(); - byte[] dest = new byte[length]; bb.get(dest, 0, length); if (NullType.isNcharNull(dest, length)) { return null; } - try { String charset = TaosGlobalConfig.getCharset(); return new String(dest, charset); From f61eb53f1bed7892423ad00f803f74a819bd9c2c Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 31 Aug 2021 10:58:20 +0800 Subject: [PATCH 12/20] Hotfix/sangshuduo/td 6294 taosdemo long arg (#7722) * [TD-6294]: taosdemo support long arg fix conflict with develop branch. * fix few words. * declare default child tables number. * add support email prompt. * support the way no space between param and value * fix uncatched status. * fix -PP arg. * fix total child tables bug. * fix taosdemo long args bug. Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 120 +++++++++++++++++++----------------- 1 file changed, 63 insertions(+), 57 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 29443f4fd4..9ae70b384b 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -754,12 +754,11 @@ static void printHelp() { "Set the replica parameters of the database, Default 1, min: 1, max: 3."); printf("%s%s%s%s\n", indent, "-m, --table-prefix=TABLEPREFIX", "\t", "Table prefix name. Default is 'd'."); - printf("%s%s%s%s\n", indent, "-s, --sql-file=FILE", "\t\t", "The select sql file."); + printf("%s%s%s%s\n", indent, "-s, --sql-file=FILE", "\t\t", + "The select sql file."); printf("%s%s%s%s\n", indent, "-N, --normal-table", "\t\t", "Use normal table flag."); printf("%s%s%s%s\n", indent, "-o, --output=FILE", "\t\t", "Direct output to the named file. Default is './output.txt'."); - printf("%s%s%s%s\n", indent, "-s, --sql-file=FILE", "\t\t", - "The select sql file."); printf("%s%s%s%s\n", indent, "-q, --query-mode=MODE", "\t\t", "Query mode -- 0: SYNC, 1: ASYNC. Default is SYNC."); printf("%s%s%s%s\n", indent, "-b, --data-type=DATATYPE", "\t", @@ -831,6 +830,12 @@ static bool isStringNumber(char *input) return true; } +static void errorWrongValue(char *program, char *wrong_arg, char *wrong_value) +{ + fprintf(stderr, "%s %s: %s is an invalid value\n", program, wrong_arg, wrong_value); + fprintf(stderr, "Try `taosdemo --help' or `taosdemo --usage' for more information.\n"); +} + static void errorUnreconized(char *program, char *wrong_arg) { fprintf(stderr, "%s: unrecognized options '%s'\n", program, wrong_arg); @@ -900,7 +905,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } tstrncpy(configDir, argv[++i], TSDB_FILENAME_LEN); } else if (0 == strncmp(argv[i], "-c", strlen("-c"))) { - tstrncpy(configDir, (char *)(argv[i] + strlen("-")), TSDB_FILENAME_LEN); + tstrncpy(configDir, (char *)(argv[i] + strlen("-c")), TSDB_FILENAME_LEN); } else if (strlen("--config-dir") == strlen(argv[i])) { if (argc == i+1) { errorPrintReqArg3(argv[0], "--config-dir"); @@ -983,7 +988,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } else if (0 == strcasecmp(argv[i+1], "stmt")) { arguments->iface = STMT_IFACE; } else { - errorPrintReqArg(argv[0], "I"); + errorWrongValue(argv[0], "-I", argv[i+1]); exit(EXIT_FAILURE); } i++; @@ -1006,7 +1011,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } else if (0 == strcasecmp((char *)(argv[i] + strlen("-I")), "stmt")) { arguments->iface = STMT_IFACE; } else { - errorPrintReqArg3(argv[0], "-I"); + errorWrongValue(argv[0], "-I", + (char *)(argv[i] + strlen("-I"))); exit(EXIT_FAILURE); } } else if (strlen("--interface") == strlen(argv[i])) { @@ -1021,7 +1027,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } else if (0 == strcasecmp(argv[i+1], "stmt")) { arguments->iface = STMT_IFACE; } else { - errorPrintReqArg3(argv[0], "--interface"); + errorWrongValue(argv[0], "--interface", argv[i+1]); exit(EXIT_FAILURE); } i++; @@ -1094,9 +1100,9 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } arguments->sqlFile = argv[++i]; } else if (0 == strncmp(argv[i], "--sql-file=", strlen("--sql-file="))) { - arguments->host = (char *)(argv[i++] + strlen("--sql-file=")); + arguments->sqlFile = (char *)(argv[i++] + strlen("--sql-file=")); } else if (0 == strncmp(argv[i], "-s", strlen("-s"))) { - arguments->host = (char *)(argv[i++] + strlen("-s")); + arguments->sqlFile = (char *)(argv[i++] + strlen("-s")); } else if (strlen("--sql-file") == strlen(argv[i])) { if (argc == i+1) { errorPrintReqArg3(argv[0], "--sql-file"); @@ -1644,6 +1650,54 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->debug_print = true; } else if (strcmp(argv[i], "-gg") == 0) { arguments->verbose_print = true; + } else if ((0 == strncmp(argv[i], "-R", strlen("-R"))) + || (0 == strncmp(argv[i], "--disorder-range", + strlen("--disorder-range")))) { + if (strlen("-R") == strlen(argv[i])) { + if (argc == i+1) { + errorPrintReqArg(argv[0], "R"); + exit(EXIT_FAILURE); + } else if (!isStringNumber(argv[i+1])) { + errorPrintReqArg2(argv[0], "R"); + exit(EXIT_FAILURE); + } + arguments->disorderRange = atoi(argv[++i]); + } else if (0 == strncmp(argv[i], "--disorder-range=", + strlen("--disorder-range="))) { + if (isStringNumber((char *)(argv[i] + strlen("--disorder-range=")))) { + arguments->disorderRange = + atoi((char *)(argv[i]+strlen("--disorder-range="))); + } else { + errorPrintReqArg2(argv[0], "--disorder-range"); + exit(EXIT_FAILURE); + } + } else if (0 == strncmp(argv[i], "-R", strlen("-R"))) { + if (isStringNumber((char *)(argv[i] + strlen("-R")))) { + arguments->disorderRange = + atoi((char *)(argv[i]+strlen("-R"))); + } else { + errorPrintReqArg2(argv[0], "-R"); + exit(EXIT_FAILURE); + } + + if (arguments->disorderRange < 0) { + errorPrint("Invalid disorder range %d, will be set to %d\n", + arguments->disorderRange, 1000); + arguments->disorderRange = 1000; + } + } else if (strlen("--disorder-range") == strlen(argv[i])) { + if (argc == i+1) { + errorPrintReqArg3(argv[0], "--disorder-range"); + exit(EXIT_FAILURE); + } else if (!isStringNumber(argv[i+1])) { + errorPrintReqArg2(argv[0], "--disorder-range"); + exit(EXIT_FAILURE); + } + arguments->disorderRange = atoi(argv[++i]); + } else { + errorUnreconized(argv[0], argv[i]); + exit(EXIT_FAILURE); + } } else if ((0 == strncmp(argv[i], "-O", strlen("-O"))) || (0 == strncmp(argv[i], "--disorder", strlen("--disorder")))) { if (2 == strlen(argv[i])) { @@ -1694,54 +1748,6 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->disorderRatio, 0); arguments->disorderRatio = 0; } - } else if ((0 == strncmp(argv[i], "-R", strlen("-R"))) - || (0 == strncmp(argv[i], "--disorder-range", - strlen("--disorder-range")))) { - if (2 == strlen(argv[i])) { - if (argc == i+1) { - errorPrintReqArg(argv[0], "R"); - exit(EXIT_FAILURE); - } else if (!isStringNumber(argv[i+1])) { - errorPrintReqArg2(argv[0], "R"); - exit(EXIT_FAILURE); - } - arguments->disorderRange = atoi(argv[++i]); - } else if (0 == strncmp(argv[i], "--disorder-range=", - strlen("--disorder-range="))) { - if (isStringNumber((char *)(argv[i] + strlen("--disorder-range=")))) { - arguments->disorderRange = - atoi((char *)(argv[i]+strlen("--disorder-rnage="))); - } else { - errorPrintReqArg2(argv[0], "--disorder-range"); - exit(EXIT_FAILURE); - } - } else if (0 == strncmp(argv[i], "-R", strlen("-R"))) { - if (isStringNumber((char *)(argv[i] + strlen("-R")))) { - arguments->disorderRange = - atoi((char *)(argv[i]+strlen("-R"))); - } else { - errorPrintReqArg2(argv[0], "-R"); - exit(EXIT_FAILURE); - } - - if (arguments->disorderRange < 0) { - errorPrint("Invalid disorder range %d, will be set to %d\n", - arguments->disorderRange, 1000); - arguments->disorderRange = 1000; - } - } else if (strlen("--disorder-range") == strlen(argv[i])) { - if (argc == i+1) { - errorPrintReqArg3(argv[0], "--disorder-range"); - exit(EXIT_FAILURE); - } else if (!isStringNumber(argv[i+1])) { - errorPrintReqArg2(argv[0], "--disorder-range"); - exit(EXIT_FAILURE); - } - arguments->disorderRange = atoi(argv[++i]); - } else { - errorUnreconized(argv[0], argv[i]); - exit(EXIT_FAILURE); - } } else if ((0 == strncmp(argv[i], "-a", strlen("-a"))) || (0 == strncmp(argv[i], "--replica", strlen("--replica")))) { From 8f383b6f4324de1a10b23a3254ff963092060cbc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 31 Aug 2021 13:38:38 +0800 Subject: [PATCH 13/20] [TD-6481] The establishment of TCP connection failed due to too high concurrent query --- src/util/src/tsocket.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index 77941cba82..8d69a87e77 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -488,7 +488,7 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { return -1; } - if (listen(sockFd, 10) < 0) { + if (listen(sockFd, 1024) < 0) { uError("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); taosCloseSocket(sockFd); return -1; From bfccafc73aea5b5b05808c654e9708c9f02285c1 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 31 Aug 2021 13:46:09 +0800 Subject: [PATCH 14/20] Hotfix/sangshuduo/td 6294 taosdemo long arg (#7729) * [TD-6294]: taosdemo support long arg fix conflict with develop branch. * fix few words. * declare default child tables number. * add support email prompt. * support the way no space between param and value * fix uncatched status. * fix -PP arg. * fix total child tables bug. * fix taosdemo long args bug. * fix default column number Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 9ae70b384b..87102cc1c7 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -105,7 +105,7 @@ extern char configDir[]; #define DEFAULT_TIMESTAMP_STEP 1 #define DEFAULT_INTERLACE_ROWS 0 -#define DEFAULT_DATATYPE_NUM 3 +#define DEFAULT_DATATYPE_NUM 1 #define DEFAULT_CHILDTABLES 10000 From c547b894d9cb89b32c42be4fa4459f697571923f Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Tue, 31 Aug 2021 15:38:18 +0800 Subject: [PATCH 15/20] [TD-6468]: add test case for jdbc stmt --- .../jdbc/TSDBPreparedStatementTest.java | 124 ++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBPreparedStatementTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBPreparedStatementTest.java index 6bddd3f428..3d76e1f98d 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBPreparedStatementTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBPreparedStatementTest.java @@ -586,6 +586,130 @@ public class TSDBPreparedStatementTest { Assert.assertEquals(numOfRows, rows); } + @Test + public void bindDataQueryTest() throws SQLException { + Statement stmt = conn.createStatement(); + + stmt.execute("drop table if exists weather_test"); + stmt.execute("create table weather_test(ts timestamp, f1 nchar(10), f2 binary(10)) tags (t1 int, t2 binary(10))"); + + int numOfRows = 1; + + TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? using weather_test tags(?,?) (ts, f2) values(?, ?)"); + s.setTableName("w2"); + s.setTagInt(0, 1); + s.setTagString(1, "test"); + + + ArrayList ts = new ArrayList<>(); + for (int i = 0; i < numOfRows; i++) { + ts.add(System.currentTimeMillis() + i); + } + s.setTimestamp(0, ts); + + ArrayList s2 = new ArrayList<>(); + for (int i = 0; i < numOfRows; i++) { + s2.add("test" + i % 4); + } + s.setString(1, s2, 10); + + s.columnDataAddBatch(); + s.columnDataExecuteBatch(); + s.columnDataCloseBatch(); + + String sql = "select * from weather_test where t1 >= ? and t1 <= ?"; + TSDBPreparedStatement s1 = (TSDBPreparedStatement) conn.prepareStatement(sql); + s1.setInt(1, 0); + s1.setInt(2, 10); + + ResultSet rs = s1.executeQuery(); + int rows = 0; + while (rs.next()) { + rows++; + } + Assert.assertEquals(numOfRows, rows); + } + + @Test + public void setTagNullTest()throws SQLException { + Statement stmt = conn.createStatement(); + + stmt.execute("drop table if exists weather_test"); + stmt.execute("create table weather_test(ts timestamp, c1 int) tags (t1 tinyint, t2 smallint, t3 int, t4 bigint, t5 float, t6 double, t7 bool, t8 binary(10), t9 nchar(10))"); + + int numOfRows = 1; + + TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? using weather_test tags(?,?,?,?,?,?,?,?,?) values(?, ?)"); + s.setTableName("w3"); + s.setTagNull(0, TSDBConstants.TSDB_DATA_TYPE_TINYINT); + s.setTagNull(1, TSDBConstants.TSDB_DATA_TYPE_SMALLINT); + s.setTagNull(2, TSDBConstants.TSDB_DATA_TYPE_INT); + s.setTagNull(3, TSDBConstants.TSDB_DATA_TYPE_BIGINT); + s.setTagNull(4, TSDBConstants.TSDB_DATA_TYPE_FLOAT); + s.setTagNull(5, TSDBConstants.TSDB_DATA_TYPE_DOUBLE); + s.setTagNull(6, TSDBConstants.TSDB_DATA_TYPE_BOOL); + s.setTagNull(7, TSDBConstants.TSDB_DATA_TYPE_BINARY); + s.setTagNull(8, TSDBConstants.TSDB_DATA_TYPE_NCHAR); + + ArrayList ts = new ArrayList<>(); + for (int i = 0; i < numOfRows; i++) { + ts.add(System.currentTimeMillis() + i); + } + s.setTimestamp(0, ts); + + ArrayList s2 = new ArrayList<>(); + for (int i = 0; i < numOfRows; i++) { + s2.add(i); + } + s.setInt(1, s2); + + s.columnDataAddBatch(); + s.columnDataExecuteBatch(); + s.columnDataCloseBatch(); + } + + private String stringGenerator(int length) { + String source = "abcdefghijklmnopqrstuvwxyz"; + StringBuilder sb = new StringBuilder(); + Random rand = new Random(); + for(int i = 0; i < length; i++) { + sb.append(source.charAt(rand.nextInt(26))); + } + return sb.toString(); + } + + @Test(expected = SQLException.class) + public void setMaxTableNameTest()throws SQLException { + Statement stmt = conn.createStatement(); + + stmt.execute("drop table if exists weather_test"); + stmt.execute("create table weather_test(ts timestamp, c1 int) tags (t1 int)"); + + TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? using weather_test tags(?) values(?, ?)"); + String tbname = stringGenerator(193); + s.setTableName(tbname); + s.setTagInt(0, 1); + + int numOfRows = 1; + + ArrayList ts = new ArrayList<>(); + for (int i = 0; i < numOfRows; i++) { + ts.add(System.currentTimeMillis() + i); + } + s.setTimestamp(0, ts); + + ArrayList s2 = new ArrayList<>(); + for (int i = 0; i < numOfRows; i++) { + s2.add(i); + } + s.setInt(1, s2); + + s.columnDataAddBatch(); + s.columnDataExecuteBatch(); + s.columnDataCloseBatch(); + } + + @Test(expected = SQLException.class) public void createTwoSameDbTest() throws SQLException { // when From 84a42afacdd4d6d99ee29a270c269f2752ede4bb Mon Sep 17 00:00:00 2001 From: Elias Soong Date: Tue, 31 Aug 2021 16:30:24 +0800 Subject: [PATCH 16/20] [TD-2639] : describe operator "is null / is not null". --- documentation20/cn/08.connector/01.java/docs.md | 2 +- documentation20/cn/08.connector/docs.md | 4 ++-- documentation20/cn/11.administrator/docs.md | 4 ++-- documentation20/cn/12.taos-sql/docs.md | 3 ++- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/documentation20/cn/08.connector/01.java/docs.md b/documentation20/cn/08.connector/01.java/docs.md index edd81a49cd..def8d4a905 100644 --- a/documentation20/cn/08.connector/01.java/docs.md +++ b/documentation20/cn/08.connector/01.java/docs.md @@ -46,7 +46,7 @@ TDengine 的 JDBC 驱动实现尽可能与关系型数据库驱动保持一致 -注意:与 JNI 方式不同,RESTful 接口是无状态的。在使用JDBC-RESTful时,需要在sql中指定表、超级表的数据库名称。(从 TDengine 2.1.8.0 版本开始,也可以在 RESTful url 中指定当前 SQL 语句所使用的默认数据库名。)例如: +注意:与 JNI 方式不同,RESTful 接口是无状态的。在使用JDBC-RESTful时,需要在sql中指定表、超级表的数据库名称。(从 TDengine 2.2.0.0 版本开始,也可以在 RESTful url 中指定当前 SQL 语句所使用的默认数据库名。)例如: ```sql INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('beijing') VALUES(now, 24.6); ``` diff --git a/documentation20/cn/08.connector/docs.md b/documentation20/cn/08.connector/docs.md index ea3f9a4d55..f5411bf80a 100644 --- a/documentation20/cn/08.connector/docs.md +++ b/documentation20/cn/08.connector/docs.md @@ -654,7 +654,7 @@ conn.close() 为支持各种不同类型平台的开发,TDengine 提供符合 REST 设计标准的 API,即 RESTful API。为最大程度降低学习成本,不同于其他数据库 RESTful API 的设计方法,TDengine 直接通过 HTTP POST 请求 BODY 中包含的 SQL 语句来操作数据库,仅需要一个 URL。RESTful 连接器的使用参见[视频教程](https://www.taosdata.com/blog/2020/11/11/1965.html)。 -注意:与标准连接器的一个区别是,RESTful 接口是无状态的,因此 `USE db_name` 指令没有效果,所有对表名、超级表名的引用都需要指定数据库名前缀。(从 2.1.8.0 版本开始,支持在 RESTful url 中指定 db_name,这时如果 SQL 语句中没有指定数据库名前缀的话,会使用 url 中指定的这个 db_name。) +注意:与标准连接器的一个区别是,RESTful 接口是无状态的,因此 `USE db_name` 指令没有效果,所有对表名、超级表名的引用都需要指定数据库名前缀。(从 2.2.0.0 版本开始,支持在 RESTful url 中指定 db_name,这时如果 SQL 语句中没有指定数据库名前缀的话,会使用 url 中指定的这个 db_name。) ### 安装 @@ -695,7 +695,7 @@ http://:/rest/sql/[db_name] - fqnd: 集群中的任一台主机 FQDN 或 IP 地址 - port: 配置文件中 httpPort 配置项,缺省为 6041 -- db_name: 可选参数,指定本次所执行的 SQL 语句的默认数据库库名。(从 2.1.8.0 版本开始支持) +- db_name: 可选参数,指定本次所执行的 SQL 语句的默认数据库库名。(从 2.2.0.0 版本开始支持) 例如:http://h1.taos.com:6041/rest/sql/test 是指向地址为 h1.taos.com:6041 的 url,并将默认使用的数据库库名设置为 test。 diff --git a/documentation20/cn/11.administrator/docs.md b/documentation20/cn/11.administrator/docs.md index 29e49aa902..99953233f0 100644 --- a/documentation20/cn/11.administrator/docs.md +++ b/documentation20/cn/11.administrator/docs.md @@ -800,7 +800,7 @@ taos -n sync -P 6042 -h `taos -n speed -h -P 6030 -N 10 -l 10000000 -S TCP` -从 2.1.8.0 版本开始,taos 工具新提供了一个网络速度诊断的模式,可以对一个正在运行中的 taosd 实例或者 `taos -n server` 方式模拟的一个服务端实例,以非压缩传输的方式进行网络测速。这个模式下可供调整的参数如下: +从 2.2.0.0 版本开始,taos 工具新提供了一个网络速度诊断的模式,可以对一个正在运行中的 taosd 实例或者 `taos -n server` 方式模拟的一个服务端实例,以非压缩传输的方式进行网络测速。这个模式下可供调整的参数如下: -n:设为“speed”时,表示对网络速度进行诊断。 -h:所要连接的服务端的 FQDN 或 ip 地址。如果不设置这一项,会使用本机 taos.cfg 文件中 FQDN 参数的设置作为默认值。 @@ -813,7 +813,7 @@ taos -n sync -P 6042 -h `taos -n fqdn -h ` -从 2.1.8.0 版本开始,taos 工具新提供了一个 FQDN 解析速度的诊断模式,可以对一个目标 FQDN 地址尝试解析,并记录解析过程中所消耗的时间。这个模式下可供调整的参数如下: +从 2.2.0.0 版本开始,taos 工具新提供了一个 FQDN 解析速度的诊断模式,可以对一个目标 FQDN 地址尝试解析,并记录解析过程中所消耗的时间。这个模式下可供调整的参数如下: -n:设为“fqdn”时,表示对 FQDN 解析进行诊断。 -h:所要解析的目标 FQDN 地址。如果不设置这一项,会使用本机 taos.cfg 文件中 FQDN 参数的设置作为默认值。 diff --git a/documentation20/cn/12.taos-sql/docs.md b/documentation20/cn/12.taos-sql/docs.md index 9552a8fb2c..52ad98dcda 100644 --- a/documentation20/cn/12.taos-sql/docs.md +++ b/documentation20/cn/12.taos-sql/docs.md @@ -713,6 +713,7 @@ Query OK, 1 row(s) in set (0.001091s) | <= | smaller than or equal to | **`timestamp`** and all numeric types | | = | equal to | all types | | <> | not equal to | all types | +| is [not] null | is null or is not null | all types | | between and | within a certain range | **`timestamp`** and all numeric types | | in | match any value in a set | all types except first column `timestamp` | | like | match a wildcard string | **`binary`** **`nchar`** | @@ -722,7 +723,7 @@ Query OK, 1 row(s) in set (0.001091s) 1. <> 算子也可以写为 != ,请注意,这个算子不能用于数据表第一列的 timestamp 字段。 2. like 算子使用通配符字符串进行匹配检查。 * 在通配符字符串中:'%'(百分号)匹配 0 到任意个字符;'\_'(下划线)匹配单个任意字符。 - * 如果希望匹配字符串中原本就带有的 \_(下划线)字符,那么可以在通配符字符串中写作 `\_`,也即加一个反斜线来进行转义。(从 2.1.8.0 版本开始支持) + * 如果希望匹配字符串中原本就带有的 \_(下划线)字符,那么可以在通配符字符串中写作 `\_`,也即加一个反斜线来进行转义。(从 2.2.0.0 版本开始支持) * 通配符字符串最长不能超过 20 字节。(从 2.1.6.1 版本开始,通配符字符串的长度放宽到了 100 字节,并可以通过 taos.cfg 中的 maxWildCardsLength 参数来配置这一长度限制。但不建议使用太长的通配符字符串,将有可能严重影响 LIKE 操作的执行性能。) 3. 同时进行多个字段的范围过滤,需要使用关键词 AND 来连接不同的查询条件,暂不支持 OR 连接的不同列之间的查询过滤条件。 4. 针对单一字段的过滤,如果是时间过滤条件,则一条语句中只支持设定一个;但针对其他的(普通)列或标签列,则可以使用 `OR` 关键字进行组合条件的查询过滤。例如: `((value > 20 AND value < 30) OR (value < 12))`。 From c014347dc2a74c6811a4efa35c8ecf2175dc1a39 Mon Sep 17 00:00:00 2001 From: Elias Soong Date: Tue, 31 Aug 2021 17:06:49 +0800 Subject: [PATCH 17/20] [TS-2] : describe nested query. --- documentation20/cn/12.taos-sql/docs.md | 28 ++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/documentation20/cn/12.taos-sql/docs.md b/documentation20/cn/12.taos-sql/docs.md index 52ad98dcda..93f5e8f56c 100644 --- a/documentation20/cn/12.taos-sql/docs.md +++ b/documentation20/cn/12.taos-sql/docs.md @@ -717,8 +717,6 @@ Query OK, 1 row(s) in set (0.001091s) | between and | within a certain range | **`timestamp`** and all numeric types | | in | match any value in a set | all types except first column `timestamp` | | like | match a wildcard string | **`binary`** **`nchar`** | -| % | match with any char sequences | **`binary`** **`nchar`** | -| _ | match with a single char | **`binary`** **`nchar`** | 1. <> 算子也可以写为 != ,请注意,这个算子不能用于数据表第一列的 timestamp 字段。 2. like 算子使用通配符字符串进行匹配检查。 @@ -730,6 +728,32 @@ Query OK, 1 row(s) in set (0.001091s) 5. 从 2.0.17.0 版本开始,条件过滤开始支持 BETWEEN AND 语法,例如 `WHERE col2 BETWEEN 1.5 AND 3.25` 表示查询条件为“1.5 ≤ col2 ≤ 3.25”。 6. 从 2.1.4.0 版本开始,条件过滤开始支持 IN 算子,例如 `WHERE city IN ('Beijing', 'Shanghai')`。说明:BOOL 类型写作 `{true, false}` 或 `{0, 1}` 均可,但不能写作 0、1 之外的整数;FLOAT 和 DOUBLE 类型会受到浮点数精度影响,集合内的值在精度范围内认为和数据行的值完全相等才能匹配成功;TIMESTAMP 类型支持非主键的列。 + +### 嵌套查询 + +“嵌套查询”又称为“子查询”,也即在一条 SQL 语句中,“内层查询”的计算结果可以作为“外层查询”的计算对象来使用。 + +从 2.2.0.0 版本开始,TDengine 的查询引擎开始支持在 FROM 子句中使用非关联子查询(“非关联”的意思是,子查询不会用到父查询中的参数)。也即在普通 SELECT 语句的 tb_name_list 位置,用一个独立的 SELECT 语句来代替(这一 SELECT 语句被包含在英文圆括号内),于是完整的嵌套查询 SQL 语句形如: + +```mysql +SELECT ... FROM (SELECT ... FROM ...) ...; +``` + +说明: +1. 目前仅支持一层嵌套,也即不能在子查询中再嵌入子查询。 +2. 内层查询的返回结果将作为“虚拟表”供外层查询使用,此虚拟表可以使用 AS 语法做重命名,以便于外层查询中方便引用。 +3. 目前不能在“连续查询”功能中使用子查询。 +4. 在内层和外层查询中,都支持普通的表间/超级表间 JOIN。内层查询的计算结果也可以再参与数据子表的 JOIN 操作。 +5. 目前内层查询、外层查询均不支持 UNION 操作。 +6. 内层查询支持的功能特性与非嵌套的查询语句能力是一致的。 + * 内层查询的 ORDER BY 子句一般没有意义,建议避免这样的写法以免无谓的资源消耗。 +7. 与非嵌套的查询语句相比,外层查询所能支持的功能特性存在如下限制: + * 计算函数部分: + 1. 如果内层查询的结果数据未提供时间戳,那么计算过程依赖时间戳的函数在外层会无法正常工作。例如:TOP, BOTTOM, FIRST, LAST, DIFF。 + 2. 计算过程需要两遍扫描的函数,在外层查询中无法正常工作。例如:此类函数包括:STDDEV, PERCENTILE。 + * 外层查询中不支持 IN 算子,但在内层中可以使用。 + * 外层查询不支持 GROUP BY。 + ### UNION ALL 操作符 From 4d9f80d1558dc42dd41cd73cb727f92d3c2c90eb Mon Sep 17 00:00:00 2001 From: Elias Soong Date: Tue, 31 Aug 2021 17:46:27 +0800 Subject: [PATCH 18/20] [TD-2581] : support 'OR' join query conditions between columns. --- documentation20/cn/12.taos-sql/docs.md | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/documentation20/cn/12.taos-sql/docs.md b/documentation20/cn/12.taos-sql/docs.md index 93f5e8f56c..dc20054e3c 100644 --- a/documentation20/cn/12.taos-sql/docs.md +++ b/documentation20/cn/12.taos-sql/docs.md @@ -724,7 +724,9 @@ Query OK, 1 row(s) in set (0.001091s) * 如果希望匹配字符串中原本就带有的 \_(下划线)字符,那么可以在通配符字符串中写作 `\_`,也即加一个反斜线来进行转义。(从 2.2.0.0 版本开始支持) * 通配符字符串最长不能超过 20 字节。(从 2.1.6.1 版本开始,通配符字符串的长度放宽到了 100 字节,并可以通过 taos.cfg 中的 maxWildCardsLength 参数来配置这一长度限制。但不建议使用太长的通配符字符串,将有可能严重影响 LIKE 操作的执行性能。) 3. 同时进行多个字段的范围过滤,需要使用关键词 AND 来连接不同的查询条件,暂不支持 OR 连接的不同列之间的查询过滤条件。 + * 从 2.3.0.0 版本开始,已支持完整的同一列和/或不同列间的 AND/OR 运算。 4. 针对单一字段的过滤,如果是时间过滤条件,则一条语句中只支持设定一个;但针对其他的(普通)列或标签列,则可以使用 `OR` 关键字进行组合条件的查询过滤。例如: `((value > 20 AND value < 30) OR (value < 12))`。 + * 从 2.3.0.0 版本开始,允许使用多个时间过滤条件,但首列时间戳的过滤运算结果只能包含一个区间。 5. 从 2.0.17.0 版本开始,条件过滤开始支持 BETWEEN AND 语法,例如 `WHERE col2 BETWEEN 1.5 AND 3.25` 表示查询条件为“1.5 ≤ col2 ≤ 3.25”。 6. 从 2.1.4.0 版本开始,条件过滤开始支持 IN 算子,例如 `WHERE city IN ('Beijing', 'Shanghai')`。说明:BOOL 类型写作 `{true, false}` 或 `{0, 1}` 均可,但不能写作 0、1 之外的整数;FLOAT 和 DOUBLE 类型会受到浮点数精度影响,集合内的值在精度范围内认为和数据行的值完全相等才能匹配成功;TIMESTAMP 类型支持非主键的列。 @@ -1458,17 +1460,19 @@ SELECT AVG(current), MAX(current), LEASTSQUARES(current, start_val, step_val), P - SELECT 语句的查询结果,最多允许返回 1024 列(语句中的函数调用可能也会占用一些列空间),超限时需要显式指定较少的返回数据列,以避免语句执行报错。 - 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制。 -## TAOS SQL其他约定 +## TAOS SQL 其他约定 **GROUP BY的限制** -TAOS SQL支持对标签、TBNAME进行GROUP BY操作,也支持普通列进行GROUP BY,前提是:仅限一列且该列的唯一值小于10万个。 +TAOS SQL 支持对标签、TBNAME 进行 GROUP BY 操作,也支持普通列进行 GROUP BY,前提是:仅限一列且该列的唯一值小于 10 万个。 -**JOIN操作的限制** +**JOIN 操作的限制** -TAOS SQL支持表之间按主键时间戳来join两张表的列,暂不支持两个表之间聚合后的四则运算。 +TAOS SQL 支持表之间按主键时间戳来 join 两张表的列,暂不支持两个表之间聚合后的四则运算。 -**IS NOT NULL与不为空的表达式适用范围** +JOIN 查询的不同表的过滤条件之间不能为 OR。 -IS NOT NULL支持所有类型的列。不为空的表达式为 <>"",仅对非数值类型的列适用。 +**IS NOT NULL 与不为空的表达式适用范围** + +IS NOT NULL 支持所有类型的列。不为空的表达式为 <>"",仅对非数值类型的列适用。 From f506afb2697a5516edb9ae956252f9370e14b557 Mon Sep 17 00:00:00 2001 From: xiaolei li <85657333+xleili@users.noreply.github.com> Date: Wed, 1 Sep 2021 00:16:14 +0800 Subject: [PATCH 19/20] Nodejs/xiaolei/td 2979 (#7369) * nodejs/xiaolei/TD-2979-test-1970 * nodejs/xiaolei/TD-2979-test-1970 * nodejs/xiaolei/TD-2979-test-1970 * nodejs 1970 test&dry * Jenkins&test1970 * ci fix test1970 * ci fix twice * fix test1970 ci 3rd --- Jenkinsfile | 3 +- tests/examples/nodejs/test1970.js | 125 ++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 1 deletion(-) create mode 100644 tests/examples/nodejs/test1970.js diff --git a/Jenkinsfile b/Jenkinsfile index e9ea8bafd3..deee5f473b 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -234,6 +234,7 @@ pipeline { cd ${WKC}/tests/examples/nodejs npm install td2.0-connector > /dev/null 2>&1 node nodejsChecker.js host=localhost + node test1970.js ''' sh ''' cd ${WKC}/tests/examples/C#/taosdemo @@ -451,4 +452,4 @@ pipeline { ) } } -} \ No newline at end of file +} diff --git a/tests/examples/nodejs/test1970.js b/tests/examples/nodejs/test1970.js new file mode 100644 index 0000000000..5177a7371e --- /dev/null +++ b/tests/examples/nodejs/test1970.js @@ -0,0 +1,125 @@ +const taos = require('td2.0-connector'); +var conn = taos.connect({host:"127.0.0.1", user:"root", password:"taosdata", config:"/etc/taos",port:0}) +var c1 = conn.cursor(); // Initializing a new cursor + +let stime = new Date(); +let interval = 1000; + +function convertDateToTS(date) { + let tsArr = date.toISOString().split("T") + return "\"" + tsArr[0] + " " + tsArr[1].substring(0, tsArr[1].length - 1) + "\""; +} + +function R(l, r) { + return Math.random() * (r - l) - r; +} + +function randomBool() { + if (Math.random() < 0.5) { + return true; + } + return false; +} + +// Initialize +const dbname = "nodejs_1970_db"; +const tbname = "t1"; + +let dropDB = "drop database if exists " + dbname +console.log(dropDB);//asdasdasd +c1.execute(dropDB);///asdasd + +let createDB = "create database " + dbname + " keep 36500" +console.log(createDB); +c1.execute(createDB); + +let useTbl = "use " + dbname +console.log(useTbl) +c1.execute(useTbl); + +let createTbl = "create table if not exists " + tbname + "(ts timestamp,id int)" +console.log(createTbl); +c1.execute(createTbl); + +//1969-12-31 23:59:59.999 +//1970-01-01 00:00:00.000 +//1970-01-01 07:59:59.999 +//1970-01-01 08:00:00.000a +//1628928479484 2021-08-14 08:07:59.484 +let sql1 = "insert into " + dbname + "." + tbname + " values('1969-12-31 23:59:59.999',1)" +console.log(sql1); +c1.execute(sql1); + +let sql2 = "insert into " + dbname + "." + tbname + " values('1970-01-01 00:00:00.000',2)" +console.log(sql2); +c1.execute(sql2); + +let sql3 = "insert into " + dbname + "." + tbname + " values('1970-01-01 07:59:59.999',3)" +console.log(sql3); +c1.execute(sql3); + +let sql4 = "insert into " + dbname + "." + tbname + " values('1970-01-01 08:00:00.000',4)" +console.log(sql4); +c1.execute(sql4); + +let sql5 = "insert into " + dbname + "." + tbname + " values('2021-08-14 08:07:59.484',5)" +console.log(sql5); +c1.execute(sql5); + +// Select +let query1 = "select * from " + dbname + "." + tbname +console.log(query1); +c1.execute(query1); + +var d = c1.fetchall(); +console.log(c1.fields); +for (let i = 0; i < d.length; i++) + console.log(d[i][0].valueOf()); + +//initialize +let initSql1 = "drop table if exists " + tbname +console.log(initSql1); +c1.execute(initSql1); + +console.log(createTbl); +c1.execute(createTbl); +c1.execute(useTbl) + +//-28800001 1969-12-31 23:59:59.999 +//-28800000 1970-01-01 00:00:00.000 +//-1 1970-01-01 07:59:59.999 +//0 1970-01-01 08:00:00.00 +//1628928479484 2021-08-14 08:07:59.484 +let sql11 = "insert into " + dbname + "." + tbname + " values(-28800001,11)"; +console.log(sql11); +c1.execute(sql11); + +let sql12 = "insert into " + dbname + "." + tbname + " values(-28800000,12)" +console.log(sql12); +c1.execute(sql12); + +let sql13 = "insert into " + dbname + "." + tbname + " values(-1,13)" +console.log(sql13); +c1.execute(sql13); + +let sql14 = "insert into " + dbname + "." + tbname + " values(0,14)" +console.log(sql14); +c1.execute(sql14); + +let sql15 = "insert into " + dbname + "." + tbname + " values(1628928479484,15)" +console.log(sql15); +c1.execute(sql15); + +// Select +console.log(query1); +c1.execute(query1); + +var d = c1.fetchall(); +console.log(c1.fields); +for (let i = 0; i < d.length; i++) + console.log(d[i][0].valueOf()); + +setTimeout(function () { + conn.close(); +}, 2000); + From ea87f4f92bfecfd4ac2927580c0dc20314eb6977 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 31 Aug 2021 13:23:06 +0800 Subject: [PATCH 20/20] [TD-6474]: fix coredump when merging empty block --- src/common/src/tdataformat.c | 15 +++++++++----- src/tsdb/src/tsdbCommit.c | 39 ++++++++++++++++-------------------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index aa60803dac..61378c79c4 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -448,6 +448,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols int dcol = 0; while (dcol < pCols->numOfCols) { + bool setCol = 0; SDataCol *pDataCol = &(pCols->cols[dcol]); if (rcol >= schemaNCols(pSchema)) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); @@ -458,13 +459,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols STColumn *pRowCol = schemaColAt(pSchema, rcol); if (pRowCol->colId == pDataCol->colId) { void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); + if(!isNull(value, pDataCol->type)) setCol = 1; dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); dcol++; rcol++; } else if (pRowCol->colId < pDataCol->colId) { rcol++; } else { - if(forceSetNull) { + if(forceSetNull || setCol) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); } dcol++; @@ -482,6 +484,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo int nRowCols = kvRowNCols(row); while (dcol < pCols->numOfCols) { + bool setCol = 0; SDataCol *pDataCol = &(pCols->cols[dcol]); if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); @@ -493,13 +496,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo if (colIdx->colId == pDataCol->colId) { void *value = tdGetKvRowDataOfCol(row, colIdx->offset); + if(!isNull(value, pDataCol->type)) setCol = 1; dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); ++dcol; ++rcol; } else if (colIdx->colId < pDataCol->colId) { ++rcol; } else { - if (forceSetNull) { + if(forceSetNull || setCol) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); } ++dcol; @@ -518,7 +522,6 @@ void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, b } } -//TODO: refactor this function to eliminate additional memory copy int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) { ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(target->numOfCols == source->numOfCols); @@ -534,7 +537,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int * ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints); for (int i = 0; i < rowsToMerge; i++) { for (int j = 0; j < source->numOfCols; j++) { - if (source->cols[j].len > 0) { + if (source->cols[j].len > 0 || target->cols[j].len > 0) { dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows, target->maxPoints); } @@ -578,7 +581,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i if (key1 < key2) { for (int i = 0; i < src1->numOfCols; i++) { ASSERT(target->cols[i].type == src1->cols[i].type); - if (src1->cols[i].len > 0) { + if (src1->cols[i].len > 0 || target->cols[i].len > 0) { dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, target->maxPoints); } @@ -596,6 +599,8 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i } else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) { dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, target->maxPoints); + } else if(target->cols[i].len > 0) { + dataColSetNullAt(&target->cols[i], target->numOfRows); } } target->numOfRows++; diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 15fc3cc47d..0311048780 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -1418,13 +1418,11 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt while (true) { key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); - bool isRowDel = false; SMemRow row = tsdbNextIterRow(pCommitIter->pIter); if (row == NULL || memRowKey(row) > maxKey) { key2 = INT64_MAX; } else { key2 = memRowKey(row); - isRowDel = memRowDeleted(row); } if (key1 == INT64_MAX && key2 == INT64_MAX) break; @@ -1439,36 +1437,33 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt pTarget->numOfRows++; (*iter)++; } else if (key1 > key2) { - if (!isRowDel) { - if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); - ASSERT(pSchema != NULL); - } - - tdAppendMemRowToDataCol(row, pSchema, pTarget, true); + if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); + ASSERT(pSchema != NULL); } + tdAppendMemRowToDataCol(row, pSchema, pTarget, true); + tSkipListIterNext(pCommitIter->pIter); } else { - if (update) { - if (!isRowDel) { - if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); - ASSERT(pSchema != NULL); - } - - tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); - } - } else { - ASSERT(!isRowDel); - + if (update != TD_ROW_OVERWRITE_UPDATE) { + //copy disk data for (int i = 0; i < pDataCols->numOfCols; i++) { //TODO: dataColAppendVal may fail dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, pTarget->maxPoints); } - pTarget->numOfRows++; + if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; + } + if (update != TD_ROW_DISCARD_UPDATE) { + //copy mem data + if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); + ASSERT(pSchema != NULL); + } + + tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); } (*iter)++; tSkipListIterNext(pCommitIter->pIter);