Merge pull request #17887 from taosdata/enh/add_reqId_interface
enh: add req_id interface
This commit is contained in:
commit
b8dd38430f
|
@ -15,6 +15,7 @@ IF (TD_LINUX)
|
||||||
add_executable(tmq "")
|
add_executable(tmq "")
|
||||||
add_executable(stream_demo "")
|
add_executable(stream_demo "")
|
||||||
add_executable(demoapi "")
|
add_executable(demoapi "")
|
||||||
|
add_executable(api_reqid "")
|
||||||
|
|
||||||
target_sources(tmq
|
target_sources(tmq
|
||||||
PRIVATE
|
PRIVATE
|
||||||
|
@ -31,6 +32,12 @@ IF (TD_LINUX)
|
||||||
"demoapi.c"
|
"demoapi.c"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
target_sources(api_reqid
|
||||||
|
PRIVATE
|
||||||
|
"api_with_reqid_test.c"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
target_link_libraries(tmq
|
target_link_libraries(tmq
|
||||||
taos_static
|
taos_static
|
||||||
)
|
)
|
||||||
|
@ -43,6 +50,11 @@ IF (TD_LINUX)
|
||||||
taos_static
|
taos_static
|
||||||
)
|
)
|
||||||
|
|
||||||
|
target_link_libraries(api_reqid
|
||||||
|
taos_static
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
target_include_directories(tmq
|
target_include_directories(tmq
|
||||||
PUBLIC "${TD_SOURCE_DIR}/include/os"
|
PUBLIC "${TD_SOURCE_DIR}/include/os"
|
||||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
|
@ -58,9 +70,16 @@ IF (TD_LINUX)
|
||||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
target_include_directories(api_reqid
|
||||||
|
PUBLIC "${TD_SOURCE_DIR}/include/client"
|
||||||
|
PUBLIC "${TD_SOURCE_DIR}/include/os"
|
||||||
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
|
)
|
||||||
|
|
||||||
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
|
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
|
||||||
SET_TARGET_PROPERTIES(stream_demo PROPERTIES OUTPUT_NAME stream_demo)
|
SET_TARGET_PROPERTIES(stream_demo PROPERTIES OUTPUT_NAME stream_demo)
|
||||||
SET_TARGET_PROPERTIES(demoapi PROPERTIES OUTPUT_NAME demoapi)
|
SET_TARGET_PROPERTIES(demoapi PROPERTIES OUTPUT_NAME demoapi)
|
||||||
|
SET_TARGET_PROPERTIES(api_reqid PROPERTIES OUTPUT_NAME api_reqid)
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
IF (TD_DARWIN)
|
IF (TD_DARWIN)
|
||||||
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
|
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
|
||||||
|
|
|
@ -0,0 +1,449 @@
|
||||||
|
// sample code to verify all TDengine API
|
||||||
|
// to compile: gcc -o apitest apitest.c -ltaos
|
||||||
|
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include "taos.h"
|
||||||
|
static int64_t count = 10000;
|
||||||
|
|
||||||
|
int64_t genReqid() {
|
||||||
|
count += 100;
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
static void prepare_data(TAOS* taos) {
|
||||||
|
TAOS_RES* result;
|
||||||
|
result = taos_query_with_reqid(taos, "drop database if exists test;", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
usleep(100000);
|
||||||
|
result = taos_query_with_reqid(taos, "create database test precision 'us';", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
usleep(100000);
|
||||||
|
taos_select_db(taos, "test");
|
||||||
|
|
||||||
|
result = taos_query_with_reqid(taos, "create table meters(ts timestamp, a int) tags(area int);", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
|
||||||
|
result = taos_query_with_reqid(taos, "create table t0 using meters tags(0);", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
result = taos_query_with_reqid(taos, "create table t1 using meters tags(1);", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
result = taos_query_with_reqid(taos, "create table t2 using meters tags(2);", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
result = taos_query_with_reqid(taos, "create table t3 using meters tags(3);", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
result = taos_query_with_reqid(taos, "create table t4 using meters tags(4);", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
result = taos_query_with_reqid(taos, "create table t5 using meters tags(5);", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
result = taos_query_with_reqid(taos, "create table t6 using meters tags(6);", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
result = taos_query_with_reqid(taos, "create table t7 using meters tags(7);", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
result = taos_query_with_reqid(taos, "create table t8 using meters tags(8);", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
result = taos_query(taos, "create table t9 using meters tags(9);");
|
||||||
|
taos_free_result(result);
|
||||||
|
|
||||||
|
result = taos_query_with_reqid(taos,
|
||||||
|
"insert into t0 values('2020-01-01 00:00:00.000', 0)"
|
||||||
|
" ('2020-01-01 00:01:00.000', 0)"
|
||||||
|
" ('2020-01-01 00:02:00.000', 0)"
|
||||||
|
" t1 values('2020-01-01 00:00:00.000', 0)"
|
||||||
|
" ('2020-01-01 00:01:00.000', 0)"
|
||||||
|
" ('2020-01-01 00:02:00.000', 0)"
|
||||||
|
" ('2020-01-01 00:03:00.000', 0)"
|
||||||
|
" t2 values('2020-01-01 00:00:00.000', 0)"
|
||||||
|
" ('2020-01-01 00:01:00.000', 0)"
|
||||||
|
" ('2020-01-01 00:01:01.000', 0)"
|
||||||
|
" ('2020-01-01 00:01:02.000', 0)"
|
||||||
|
" t3 values('2020-01-01 00:01:02.000', 0)"
|
||||||
|
" t4 values('2020-01-01 00:01:02.000', 0)"
|
||||||
|
" t5 values('2020-01-01 00:01:02.000', 0)"
|
||||||
|
" t6 values('2020-01-01 00:01:02.000', 0)"
|
||||||
|
" t7 values('2020-01-01 00:01:02.000', 0)"
|
||||||
|
" t8 values('2020-01-01 00:01:02.000', 0)"
|
||||||
|
" t9 values('2020-01-01 00:01:02.000', 0)",
|
||||||
|
genReqid());
|
||||||
|
int affected = taos_affected_rows(result);
|
||||||
|
if (affected != 18) {
|
||||||
|
printf("\033[31m%d rows affected by last insert statement, but it should be 18\033[0m\n", affected);
|
||||||
|
}
|
||||||
|
taos_free_result(result);
|
||||||
|
// super tables subscription
|
||||||
|
usleep(1000000);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int print_result(TAOS_RES* res, int blockFetch) {
|
||||||
|
TAOS_ROW row = NULL;
|
||||||
|
int num_fields = taos_num_fields(res);
|
||||||
|
TAOS_FIELD* fields = taos_fetch_fields(res);
|
||||||
|
int nRows = 0;
|
||||||
|
|
||||||
|
if (blockFetch) {
|
||||||
|
int rows = 0;
|
||||||
|
while ((rows = taos_fetch_block(res, &row))) {
|
||||||
|
// for (int i = 0; i < rows; i++) {
|
||||||
|
// char temp[256];
|
||||||
|
// taos_print_row(temp, row + i, fields, num_fields);
|
||||||
|
// puts(temp);
|
||||||
|
// }
|
||||||
|
nRows += rows;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
while ((row = taos_fetch_row(res))) {
|
||||||
|
char temp[256] = {0};
|
||||||
|
taos_print_row(temp, row, fields, num_fields);
|
||||||
|
puts(temp);
|
||||||
|
nRows++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("%d rows consumed.\n", nRows);
|
||||||
|
return nRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void check_row_count(int line, TAOS_RES* res, int expected) {
|
||||||
|
int actual = print_result(res, expected % 2);
|
||||||
|
if (actual != expected) {
|
||||||
|
printf("\033[31mline %d: row count mismatch, expected: %d, actual: %d\033[0m\n", line, expected, actual);
|
||||||
|
} else {
|
||||||
|
printf("line %d: %d rows consumed as expected\n", line, actual);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void verify_query(TAOS* taos) {
|
||||||
|
prepare_data(taos);
|
||||||
|
|
||||||
|
int code = taos_load_table_info(taos, "t0,t1,t2,t3,t4,t5,t6,t7,t8,t9");
|
||||||
|
if (code != 0) {
|
||||||
|
printf("\033[31mfailed to load table info: 0x%08x\033[0m\n", code);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = taos_validate_sql(taos, "select * from nonexisttable");
|
||||||
|
if (code == 0) {
|
||||||
|
printf("\033[31mimpossible, the table does not exists\033[0m\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
code = taos_validate_sql(taos, "select * from meters");
|
||||||
|
if (code != 0) {
|
||||||
|
printf("\033[31mimpossible, the table does exists: 0x%08x\033[0m\n", code);
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_RES* res = taos_query_with_reqid(taos, "select * from meters", genReqid());
|
||||||
|
check_row_count(__LINE__, res, 18);
|
||||||
|
printf("result precision is: %d\n", taos_result_precision(res));
|
||||||
|
int c = taos_field_count(res);
|
||||||
|
printf("field count is: %d\n", c);
|
||||||
|
int* lengths = taos_fetch_lengths(res);
|
||||||
|
for (int i = 0; i < c; i++) {
|
||||||
|
printf("length of column %d is %d\n", i, lengths[i]);
|
||||||
|
}
|
||||||
|
taos_free_result(res);
|
||||||
|
|
||||||
|
res = taos_query_with_reqid(taos, "select * from t0", genReqid());
|
||||||
|
check_row_count(__LINE__, res, 3);
|
||||||
|
taos_free_result(res);
|
||||||
|
|
||||||
|
res = taos_query_with_reqid(taos, "select * from nonexisttable", genReqid());
|
||||||
|
code = taos_errno(res);
|
||||||
|
printf("code=%d, error msg=%s\n", code, taos_errstr(res));
|
||||||
|
taos_free_result(res);
|
||||||
|
|
||||||
|
res = taos_query_with_reqid(taos, "select * from meters", genReqid());
|
||||||
|
taos_stop_query(res);
|
||||||
|
taos_free_result(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
void retrieve_callback(void* param, TAOS_RES* tres, int numOfRows) {
|
||||||
|
if (numOfRows > 0) {
|
||||||
|
printf("%d rows async retrieved\n", numOfRows);
|
||||||
|
taos_fetch_rows_a(tres, retrieve_callback, param);
|
||||||
|
} else {
|
||||||
|
if (numOfRows < 0) {
|
||||||
|
printf("\033[31masync retrieve failed, code: %d\033[0m\n", numOfRows);
|
||||||
|
} else {
|
||||||
|
printf("async retrieve completed\n");
|
||||||
|
}
|
||||||
|
taos_free_result(tres);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void select_callback(void* param, TAOS_RES* tres, int code) {
|
||||||
|
if (code == 0 && tres) {
|
||||||
|
taos_fetch_rows_a(tres, retrieve_callback, param);
|
||||||
|
} else {
|
||||||
|
printf("\033[31masync select failed, code: %d\033[0m\n", code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void verify_async(TAOS* taos) {
|
||||||
|
prepare_data(taos);
|
||||||
|
taos_query_a_with_reqid(taos, "select * from meters", select_callback, NULL, genReqid());
|
||||||
|
usleep(1000000);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t verify_schema_less(TAOS* taos) {
|
||||||
|
TAOS_RES* result;
|
||||||
|
result = taos_query_with_reqid(taos, "drop database if exists test;", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
usleep(100000);
|
||||||
|
result = taos_query_with_reqid(taos, "create database test precision 'us' update 1;", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
usleep(100000);
|
||||||
|
|
||||||
|
taos_select_db(taos, "test");
|
||||||
|
result = taos_query_with_reqid(taos, "create stable ste(ts timestamp, f int) tags(t1 bigint)", genReqid());
|
||||||
|
taos_free_result(result);
|
||||||
|
usleep(100000);
|
||||||
|
|
||||||
|
int code = 0;
|
||||||
|
|
||||||
|
char* lines[] = {
|
||||||
|
"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
|
||||||
|
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns",
|
||||||
|
"ste,t2=5f64,t3=L\"ste\" c1=true,c2=4i64,c3=\"iam\" 1626056811823316532ns",
|
||||||
|
"st,t1=4i64,t2=5f64,t3=\"t4\" c1=3i64,c3=L\"passitagain\",c2=true,c4=5f64 1626006833642000000ns",
|
||||||
|
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532ns",
|
||||||
|
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32i8,c6=64i16,c7=32i32,c8=88.88f32 1626056812843316532ns",
|
||||||
|
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 "
|
||||||
|
"1626006933640000000ns",
|
||||||
|
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 "
|
||||||
|
"1626006933640000000ns",
|
||||||
|
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 "
|
||||||
|
"1626006933641000000ns"};
|
||||||
|
|
||||||
|
taos_select_db(taos, "test");
|
||||||
|
|
||||||
|
TAOS_RES* res = taos_schemaless_insert_with_reqid(taos, lines, sizeof(lines) / sizeof(char*), TSDB_SML_LINE_PROTOCOL,
|
||||||
|
TSDB_SML_TIMESTAMP_NOT_CONFIGURED, genReqid());
|
||||||
|
if (taos_errno(res) != 0) {
|
||||||
|
printf("failed to insert schema-less data, reason: %s\n", taos_errstr(res));
|
||||||
|
} else {
|
||||||
|
int affectedRow = taos_affected_rows(res);
|
||||||
|
printf("successfully inserted %d rows\n", affectedRow);
|
||||||
|
}
|
||||||
|
taos_free_result(res);
|
||||||
|
|
||||||
|
return (code);
|
||||||
|
}
|
||||||
|
|
||||||
|
void veriry_stmt(TAOS* taos) {
|
||||||
|
TAOS_RES* result = taos_query(taos, "drop database if exists test;");
|
||||||
|
taos_free_result(result);
|
||||||
|
usleep(100000);
|
||||||
|
result = taos_query(taos, "create database test;");
|
||||||
|
|
||||||
|
int code = taos_errno(result);
|
||||||
|
if (code != 0) {
|
||||||
|
printf("\033[31mfailed to create database, reason:%s\033[0m\n", taos_errstr(result));
|
||||||
|
taos_free_result(result);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
taos_free_result(result);
|
||||||
|
|
||||||
|
usleep(100000);
|
||||||
|
taos_select_db(taos, "test");
|
||||||
|
|
||||||
|
// create table
|
||||||
|
const char* sql =
|
||||||
|
"create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin "
|
||||||
|
"binary(40), blob nchar(10))";
|
||||||
|
result = taos_query(taos, sql);
|
||||||
|
code = taos_errno(result);
|
||||||
|
if (code != 0) {
|
||||||
|
printf("\033[31mfailed to create table, reason:%s\033[0m\n", taos_errstr(result));
|
||||||
|
taos_free_result(result);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
taos_free_result(result);
|
||||||
|
|
||||||
|
// insert 10 records
|
||||||
|
struct {
|
||||||
|
int64_t ts[10];
|
||||||
|
int8_t b[10];
|
||||||
|
int8_t v1[10];
|
||||||
|
int16_t v2[10];
|
||||||
|
int32_t v4[10];
|
||||||
|
int64_t v8[10];
|
||||||
|
float f4[10];
|
||||||
|
double f8[10];
|
||||||
|
char bin[10][40];
|
||||||
|
char blob[10][80];
|
||||||
|
} v;
|
||||||
|
|
||||||
|
int32_t* t8_len = malloc(sizeof(int32_t) * 10);
|
||||||
|
int32_t* t16_len = malloc(sizeof(int32_t) * 10);
|
||||||
|
int32_t* t32_len = malloc(sizeof(int32_t) * 10);
|
||||||
|
int32_t* t64_len = malloc(sizeof(int32_t) * 10);
|
||||||
|
int32_t* float_len = malloc(sizeof(int32_t) * 10);
|
||||||
|
int32_t* double_len = malloc(sizeof(int32_t) * 10);
|
||||||
|
int32_t* bin_len = malloc(sizeof(int32_t) * 10);
|
||||||
|
int32_t* blob_len = malloc(sizeof(int32_t) * 10);
|
||||||
|
|
||||||
|
TAOS_STMT* stmt = taos_stmt_init_with_reqid(taos, genReqid());
|
||||||
|
TAOS_MULTI_BIND params[10];
|
||||||
|
char is_null[10] = {0};
|
||||||
|
|
||||||
|
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
params[0].buffer_length = sizeof(v.ts[0]);
|
||||||
|
params[0].buffer = v.ts;
|
||||||
|
params[0].length = t64_len;
|
||||||
|
params[0].is_null = is_null;
|
||||||
|
params[0].num = 10;
|
||||||
|
|
||||||
|
params[1].buffer_type = TSDB_DATA_TYPE_BOOL;
|
||||||
|
params[1].buffer_length = sizeof(v.b[0]);
|
||||||
|
params[1].buffer = v.b;
|
||||||
|
params[1].length = t8_len;
|
||||||
|
params[1].is_null = is_null;
|
||||||
|
params[1].num = 10;
|
||||||
|
|
||||||
|
params[2].buffer_type = TSDB_DATA_TYPE_TINYINT;
|
||||||
|
params[2].buffer_length = sizeof(v.v1[0]);
|
||||||
|
params[2].buffer = v.v1;
|
||||||
|
params[2].length = t8_len;
|
||||||
|
params[2].is_null = is_null;
|
||||||
|
params[2].num = 10;
|
||||||
|
|
||||||
|
params[3].buffer_type = TSDB_DATA_TYPE_SMALLINT;
|
||||||
|
params[3].buffer_length = sizeof(v.v2[0]);
|
||||||
|
params[3].buffer = v.v2;
|
||||||
|
params[3].length = t16_len;
|
||||||
|
params[3].is_null = is_null;
|
||||||
|
params[3].num = 10;
|
||||||
|
|
||||||
|
params[4].buffer_type = TSDB_DATA_TYPE_INT;
|
||||||
|
params[4].buffer_length = sizeof(v.v4[0]);
|
||||||
|
params[4].buffer = v.v4;
|
||||||
|
params[4].length = t32_len;
|
||||||
|
params[4].is_null = is_null;
|
||||||
|
params[4].num = 10;
|
||||||
|
|
||||||
|
params[5].buffer_type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
params[5].buffer_length = sizeof(v.v8[0]);
|
||||||
|
params[5].buffer = v.v8;
|
||||||
|
params[5].length = t64_len;
|
||||||
|
params[5].is_null = is_null;
|
||||||
|
params[5].num = 10;
|
||||||
|
|
||||||
|
params[6].buffer_type = TSDB_DATA_TYPE_FLOAT;
|
||||||
|
params[6].buffer_length = sizeof(v.f4[0]);
|
||||||
|
params[6].buffer = v.f4;
|
||||||
|
params[6].length = float_len;
|
||||||
|
params[6].is_null = is_null;
|
||||||
|
params[6].num = 10;
|
||||||
|
|
||||||
|
params[7].buffer_type = TSDB_DATA_TYPE_DOUBLE;
|
||||||
|
params[7].buffer_length = sizeof(v.f8[0]);
|
||||||
|
params[7].buffer = v.f8;
|
||||||
|
params[7].length = double_len;
|
||||||
|
params[7].is_null = is_null;
|
||||||
|
params[7].num = 10;
|
||||||
|
|
||||||
|
params[8].buffer_type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
params[8].buffer_length = sizeof(v.bin[0]);
|
||||||
|
params[8].buffer = v.bin;
|
||||||
|
params[8].length = bin_len;
|
||||||
|
params[8].is_null = is_null;
|
||||||
|
params[8].num = 10;
|
||||||
|
|
||||||
|
params[9].buffer_type = TSDB_DATA_TYPE_NCHAR;
|
||||||
|
params[9].buffer_length = sizeof(v.blob[0]);
|
||||||
|
params[9].buffer = v.blob;
|
||||||
|
params[9].length = blob_len;
|
||||||
|
params[9].is_null = is_null;
|
||||||
|
params[9].num = 10;
|
||||||
|
|
||||||
|
sql = "insert into ? (ts, b, v1, v2, v4, v8, f4, f8, bin, blob) values(?,?,?,?,?,?,?,?,?,?)";
|
||||||
|
code = taos_stmt_prepare(stmt, sql, 0);
|
||||||
|
if (code != 0) {
|
||||||
|
printf("\033[31mfailed to execute taos_stmt_prepare. error:%s\033[0m\n", taos_stmt_errstr(stmt));
|
||||||
|
taos_stmt_close(stmt);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = taos_stmt_set_tbname(stmt, "m1");
|
||||||
|
if (code != 0) {
|
||||||
|
printf("\033[31mfailed to execute taos_stmt_prepare. error:%s\033[0m\n", taos_stmt_errstr(stmt));
|
||||||
|
taos_stmt_close(stmt);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t ts = 1591060628000;
|
||||||
|
for (int i = 0; i < 10; ++i) {
|
||||||
|
v.ts[i] = ts++;
|
||||||
|
is_null[i] = 0;
|
||||||
|
|
||||||
|
v.b[i] = (int8_t)i % 2;
|
||||||
|
v.v1[i] = (int8_t)i;
|
||||||
|
v.v2[i] = (int16_t)(i * 2);
|
||||||
|
v.v4[i] = (int32_t)(i * 4);
|
||||||
|
v.v8[i] = (int64_t)(i * 8);
|
||||||
|
v.f4[i] = (float)(i * 40);
|
||||||
|
v.f8[i] = (double)(i * 80);
|
||||||
|
for (int j = 0; j < sizeof(v.bin[0]); ++j) {
|
||||||
|
v.bin[i][j] = (char)(i + '0');
|
||||||
|
}
|
||||||
|
strcpy(v.blob[i], "一二三四五六七八九十");
|
||||||
|
|
||||||
|
t8_len[i] = sizeof(int8_t);
|
||||||
|
t16_len[i] = sizeof(int16_t);
|
||||||
|
t32_len[i] = sizeof(int32_t);
|
||||||
|
t64_len[i] = sizeof(int64_t);
|
||||||
|
float_len[i] = sizeof(float);
|
||||||
|
double_len[i] = sizeof(double);
|
||||||
|
bin_len[i] = sizeof(v.bin[0]);
|
||||||
|
blob_len[i] = (int32_t)strlen(v.blob[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_stmt_bind_param_batch(stmt, params);
|
||||||
|
taos_stmt_add_batch(stmt);
|
||||||
|
|
||||||
|
if (taos_stmt_execute(stmt) != 0) {
|
||||||
|
printf("\033[31mfailed to execute insert statement.error:%s\033[0m\n", taos_stmt_errstr(stmt));
|
||||||
|
taos_stmt_close(stmt);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_stmt_close(stmt);
|
||||||
|
|
||||||
|
free(t8_len);
|
||||||
|
free(t16_len);
|
||||||
|
free(t32_len);
|
||||||
|
free(t64_len);
|
||||||
|
free(float_len);
|
||||||
|
free(double_len);
|
||||||
|
free(bin_len);
|
||||||
|
free(blob_len);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char* argv[]) {
|
||||||
|
const char* host = "127.0.0.1";
|
||||||
|
const char* user = "root";
|
||||||
|
const char* passwd = "taosdata";
|
||||||
|
|
||||||
|
taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
|
||||||
|
TAOS* taos = taos_connect(host, user, passwd, "", 0);
|
||||||
|
if (taos == NULL) {
|
||||||
|
printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos));
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("************ verify schema-less *************\n");
|
||||||
|
verify_schema_less(taos);
|
||||||
|
|
||||||
|
printf("************ verify query *************\n");
|
||||||
|
verify_query(taos);
|
||||||
|
|
||||||
|
printf("********* verify async query **********\n");
|
||||||
|
verify_async(taos);
|
||||||
|
|
||||||
|
printf("********* verify stmt query **********\n");
|
||||||
|
veriry_stmt(taos);
|
||||||
|
|
||||||
|
printf("done\n");
|
||||||
|
taos_close(taos);
|
||||||
|
taos_cleanup();
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -130,17 +130,16 @@ typedef struct TAOS_VGROUP_HASH_INFO {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
uint32_t hashBegin;
|
uint32_t hashBegin;
|
||||||
uint32_t hashEnd;
|
uint32_t hashEnd;
|
||||||
} TAOS_VGROUP_HASH_INFO ;
|
} TAOS_VGROUP_HASH_INFO;
|
||||||
|
|
||||||
typedef struct TAOS_DB_ROUTE_INFO {
|
typedef struct TAOS_DB_ROUTE_INFO {
|
||||||
int32_t routeVersion;
|
int32_t routeVersion;
|
||||||
int16_t hashPrefix;
|
int16_t hashPrefix;
|
||||||
int16_t hashSuffix;
|
int16_t hashSuffix;
|
||||||
int8_t hashMethod;
|
int8_t hashMethod;
|
||||||
int32_t vgNum;
|
int32_t vgNum;
|
||||||
TAOS_VGROUP_HASH_INFO *vgHash;
|
TAOS_VGROUP_HASH_INFO *vgHash;
|
||||||
} TAOS_DB_ROUTE_INFO ;
|
} TAOS_DB_ROUTE_INFO;
|
||||||
|
|
||||||
|
|
||||||
DLL_EXPORT void taos_cleanup(void);
|
DLL_EXPORT void taos_cleanup(void);
|
||||||
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
||||||
|
@ -153,6 +152,7 @@ DLL_EXPORT void taos_close(TAOS *taos);
|
||||||
const char *taos_data_type(int type);
|
const char *taos_data_type(int type);
|
||||||
|
|
||||||
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
|
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
|
||||||
|
DLL_EXPORT TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid);
|
||||||
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
|
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
|
||||||
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags);
|
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags);
|
||||||
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
|
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
|
||||||
|
@ -176,6 +176,7 @@ DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
|
||||||
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
|
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
|
||||||
|
|
||||||
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
|
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
|
||||||
|
DLL_EXPORT TAOS_RES *taos_query_with_reqid(TAOS *taos, const char *sql, int64_t reqId);
|
||||||
|
|
||||||
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
|
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
|
||||||
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
|
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
|
||||||
|
@ -207,17 +208,23 @@ DLL_EXPORT const char *taos_get_client_info();
|
||||||
DLL_EXPORT const char *taos_errstr(TAOS_RES *res);
|
DLL_EXPORT const char *taos_errstr(TAOS_RES *res);
|
||||||
DLL_EXPORT int taos_errno(TAOS_RES *res);
|
DLL_EXPORT int taos_errno(TAOS_RES *res);
|
||||||
|
|
||||||
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
|
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
|
||||||
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
|
DLL_EXPORT void taos_query_a_with_reqid(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param, int64_t reqid);
|
||||||
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
|
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
|
||||||
|
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
|
||||||
DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
|
DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
|
||||||
|
|
||||||
DLL_EXPORT int taos_get_db_route_info(TAOS* taos, const char* db, TAOS_DB_ROUTE_INFO* dbInfo);
|
DLL_EXPORT int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInfo);
|
||||||
DLL_EXPORT int taos_get_table_vgId(TAOS* taos, const char* db, const char* table, int* vgId);
|
DLL_EXPORT int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId);
|
||||||
|
|
||||||
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
|
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
|
||||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
|
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
|
||||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision);
|
DLL_EXPORT TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol,
|
||||||
|
int precision, int64_t reqid);
|
||||||
|
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
|
||||||
|
int precision);
|
||||||
|
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows,
|
||||||
|
int protocol, int precision, int64_t reqid);
|
||||||
|
|
||||||
/* --------------------------TMQ INTERFACE------------------------------- */
|
/* --------------------------TMQ INTERFACE------------------------------- */
|
||||||
|
|
||||||
|
|
|
@ -271,7 +271,11 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
|
||||||
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
|
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
|
||||||
|
|
||||||
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly);
|
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly);
|
||||||
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
|
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid);
|
||||||
|
|
||||||
|
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
|
||||||
|
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
|
||||||
|
int64_t reqid);
|
||||||
|
|
||||||
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols);
|
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols);
|
||||||
|
|
||||||
|
@ -318,11 +322,11 @@ void* createTscObj(const char* user, const char* auth, const char* db, int32_
|
||||||
void destroyTscObj(void* pObj);
|
void destroyTscObj(void* pObj);
|
||||||
STscObj* acquireTscObj(int64_t rid);
|
STscObj* acquireTscObj(int64_t rid);
|
||||||
int32_t releaseTscObj(int64_t rid);
|
int32_t releaseTscObj(int64_t rid);
|
||||||
void destroyAppInst(SAppInstInfo *pAppInfo);
|
void destroyAppInst(SAppInstInfo* pAppInfo);
|
||||||
|
|
||||||
uint64_t generateRequestId();
|
uint64_t generateRequestId();
|
||||||
|
|
||||||
void* createRequest(uint64_t connId, int32_t type);
|
void* createRequest(uint64_t connId, int32_t type, int64_t reqid);
|
||||||
void destroyRequest(SRequestObj* pRequest);
|
void destroyRequest(SRequestObj* pRequest);
|
||||||
SRequestObj* acquireRequest(int64_t rid);
|
SRequestObj* acquireRequest(int64_t rid);
|
||||||
int32_t releaseRequest(int64_t rid);
|
int32_t releaseRequest(int64_t rid);
|
||||||
|
@ -353,7 +357,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
|
||||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
|
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
|
||||||
|
|
||||||
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
|
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
|
||||||
SRequestObj** pRequest);
|
SRequestObj** pRequest, int64_t reqid);
|
||||||
|
|
||||||
void taos_close_internal(void* taos);
|
void taos_close_internal(void* taos);
|
||||||
|
|
||||||
|
|
|
@ -101,11 +101,18 @@ typedef struct STscStmt {
|
||||||
SStmtSQLInfo sql;
|
SStmtSQLInfo sql;
|
||||||
SStmtExecInfo exec;
|
SStmtExecInfo exec;
|
||||||
SStmtBindInfo bInfo;
|
SStmtBindInfo bInfo;
|
||||||
|
|
||||||
|
int64_t reqid;
|
||||||
} STscStmt;
|
} STscStmt;
|
||||||
|
|
||||||
extern char *gStmtStatusStr[];
|
extern char *gStmtStatusStr[];
|
||||||
|
|
||||||
#define STMT_LOG_SEQ(n) do { (pStmt)->seqId++; (pStmt)->seqIds[n]++; STMT_DLOG("the %dth:%d %s", (pStmt)->seqIds[n], (pStmt)->seqId, gStmtStatusStr[n]); } while (0)
|
#define STMT_LOG_SEQ(n) \
|
||||||
|
do { \
|
||||||
|
(pStmt)->seqId++; \
|
||||||
|
(pStmt)->seqIds[n]++; \
|
||||||
|
STMT_DLOG("the %dth:%d %s", (pStmt)->seqIds[n], (pStmt)->seqId, gStmtStatusStr[n]); \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
#define STMT_STATUS_NE(S) (pStmt->sql.status != STMT_##S)
|
#define STMT_STATUS_NE(S) (pStmt->sql.status != STMT_##S)
|
||||||
#define STMT_STATUS_EQ(S) (pStmt->sql.status == STMT_##S)
|
#define STMT_STATUS_EQ(S) (pStmt->sql.status == STMT_##S)
|
||||||
|
@ -141,7 +148,7 @@ extern char *gStmtStatusStr[];
|
||||||
#define STMT_ELOG_E(param) qError("stmt:%p " param, pStmt)
|
#define STMT_ELOG_E(param) qError("stmt:%p " param, pStmt)
|
||||||
#define STMT_DLOG_E(param) qDebug("stmt:%p " param, pStmt)
|
#define STMT_DLOG_E(param) qDebug("stmt:%p " param, pStmt)
|
||||||
|
|
||||||
TAOS_STMT *stmtInit(STscObj *taos);
|
TAOS_STMT *stmtInit(STscObj *taos, int64_t reqid);
|
||||||
int stmtClose(TAOS_STMT *stmt);
|
int stmtClose(TAOS_STMT *stmt);
|
||||||
int stmtExec(TAOS_STMT *stmt);
|
int stmtExec(TAOS_STMT *stmt);
|
||||||
const char *stmtErrstr(TAOS_STMT *stmt);
|
const char *stmtErrstr(TAOS_STMT *stmt);
|
||||||
|
|
|
@ -77,19 +77,19 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
||||||
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
|
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
|
||||||
|
|
||||||
if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) {
|
if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) {
|
||||||
// tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
|
// tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
|
||||||
// "us, exec:%" PRId64 "us",
|
// "us, exec:%" PRId64 "us",
|
||||||
// duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
|
// duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
|
||||||
// pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
|
// pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd -
|
||||||
// pRequest->metric.execEnd - pRequest->metric.semanticEnd);
|
// pRequest->metric.ctgEnd, pRequest->metric.execEnd - pRequest->metric.semanticEnd);
|
||||||
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
|
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
|
||||||
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
|
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
|
||||||
// tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
|
// tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
|
||||||
// "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%" PRIx64,
|
// "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%" PRIx64,
|
||||||
// duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
|
// duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
|
||||||
// pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
|
// pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd -
|
||||||
// pRequest->metric.planEnd - pRequest->metric.semanticEnd,
|
// pRequest->metric.ctgEnd, pRequest->metric.planEnd - pRequest->metric.semanticEnd,
|
||||||
// pRequest->metric.resultReady - pRequest->metric.planEnd, pRequest->requestId);
|
// pRequest->metric.resultReady - pRequest->metric.planEnd, pRequest->requestId);
|
||||||
|
|
||||||
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
|
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
|
||||||
}
|
}
|
||||||
|
@ -271,7 +271,7 @@ STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientCon
|
||||||
|
|
||||||
int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }
|
int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }
|
||||||
|
|
||||||
void *createRequest(uint64_t connId, int32_t type) {
|
void *createRequest(uint64_t connId, int32_t type, int64_t reqid) {
|
||||||
SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
|
SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
|
||||||
if (NULL == pRequest) {
|
if (NULL == pRequest) {
|
||||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
@ -286,7 +286,7 @@ void *createRequest(uint64_t connId, int32_t type) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->resType = RES_TYPE__QUERY;
|
pRequest->resType = RES_TYPE__QUERY;
|
||||||
pRequest->requestId = generateRequestId();
|
pRequest->requestId = reqid == 0 ? generateRequestId() : reqid;
|
||||||
pRequest->metric.start = taosGetTimestampUs();
|
pRequest->metric.start = taosGetTimestampUs();
|
||||||
|
|
||||||
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
|
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
|
||||||
|
|
|
@ -154,8 +154,8 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
|
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
|
||||||
SRequestObj** pRequest) {
|
SRequestObj** pRequest, int64_t reqid) {
|
||||||
*pRequest = createRequest(connId, TSDB_SQL_SELECT);
|
*pRequest = createRequest(connId, TSDB_SQL_SELECT, reqid);
|
||||||
if (*pRequest == NULL) {
|
if (*pRequest == NULL) {
|
||||||
tscError("failed to malloc sqlObj, %s", sql);
|
tscError("failed to malloc sqlObj, %s", sql);
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -1230,7 +1230,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
|
||||||
return pTscObj;
|
return pTscObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT);
|
SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT, 0);
|
||||||
if (pRequest == NULL) {
|
if (pRequest == NULL) {
|
||||||
destroyTscObj(pTscObj);
|
destroyTscObj(pTscObj);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -2234,7 +2234,37 @@ void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp,
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
int32_t code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest);
|
int32_t code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
fp(param, NULL, terrno);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRequest->body.queryFp = fp;
|
||||||
|
doAsyncQuery(pRequest, false);
|
||||||
|
}
|
||||||
|
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
|
||||||
|
int64_t reqid) {
|
||||||
|
if (sql == NULL || NULL == fp) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
if (fp) {
|
||||||
|
fp(param, NULL, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t sqlLen = strlen(sql);
|
||||||
|
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
|
||||||
|
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
|
||||||
|
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
|
||||||
|
fp(param, NULL, terrno);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRequestObj* pRequest = NULL;
|
||||||
|
int32_t code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
fp(param, NULL, terrno);
|
fp(param, NULL, terrno);
|
||||||
|
@ -2261,3 +2291,20 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
|
||||||
}
|
}
|
||||||
return param->pRequest;
|
return param->pRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
|
||||||
|
if (NULL == taos) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
|
||||||
|
tsem_init(¶m->sem, 0, 0);
|
||||||
|
|
||||||
|
taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
|
||||||
|
tsem_wait(¶m->sem);
|
||||||
|
if (param->pRequest != NULL) {
|
||||||
|
param->pRequest->syncQuery = true;
|
||||||
|
}
|
||||||
|
return param->pRequest;
|
||||||
|
}
|
||||||
|
|
|
@ -248,6 +248,9 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES *taos_query(TAOS *taos, const char *sql) { return taosQueryImpl(taos, sql, false); }
|
TAOS_RES *taos_query(TAOS *taos, const char *sql) { return taosQueryImpl(taos, sql, false); }
|
||||||
|
TAOS_RES *taos_query_with_reqid(TAOS *taos, const char *sql, int64_t reqid) {
|
||||||
|
return taosQueryImplWithReqid(taos, sql, false, reqid);
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
if (res == NULL) {
|
if (res == NULL) {
|
||||||
|
@ -763,6 +766,11 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
|
||||||
taosAsyncQueryImpl(connId, sql, fp, param, false);
|
taosAsyncQueryImpl(connId, sql, fp, param, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void taos_query_a_with_reqid(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param, int64_t reqid) {
|
||||||
|
int64_t connId = *(int64_t *)taos;
|
||||||
|
taosAsyncQueryImplWithReqid(connId, sql, fp, param, false, reqid);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
|
int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
|
||||||
const STscObj *pTscObj = pRequest->pTscObj;
|
const STscObj *pTscObj = pRequest->pTscObj;
|
||||||
|
|
||||||
|
@ -992,7 +1000,7 @@ int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInf
|
||||||
int64_t connId = *(int64_t *)taos;
|
int64_t connId = *(int64_t *)taos;
|
||||||
SRequestObj *pRequest = NULL;
|
SRequestObj *pRequest = NULL;
|
||||||
char *sql = "taos_get_db_route_info";
|
char *sql = "taos_get_db_route_info";
|
||||||
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
|
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -1041,7 +1049,7 @@ int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId
|
||||||
int64_t connId = *(int64_t *)taos;
|
int64_t connId = *(int64_t *)taos;
|
||||||
SRequestObj *pRequest = NULL;
|
SRequestObj *pRequest = NULL;
|
||||||
char *sql = "taos_get_table_vgId";
|
char *sql = "taos_get_table_vgId";
|
||||||
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
|
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -1102,7 +1110,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char *sql = "taos_load_table_info";
|
char *sql = "taos_load_table_info";
|
||||||
code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
|
code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
goto _return;
|
goto _return;
|
||||||
|
@ -1147,7 +1155,22 @@ TAOS_STMT *taos_stmt_init(TAOS *taos) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_STMT *pStmt = stmtInit(pObj);
|
TAOS_STMT *pStmt = stmtInit(pObj, 0);
|
||||||
|
|
||||||
|
releaseTscObj(*(int64_t *)taos);
|
||||||
|
|
||||||
|
return pStmt;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid) {
|
||||||
|
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
|
||||||
|
if (NULL == pObj) {
|
||||||
|
tscError("invalid parameter for %s", __FUNCTION__);
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_STMT *pStmt = stmtInit(pObj, reqid);
|
||||||
|
|
||||||
releaseTscObj(*(int64_t *)taos);
|
releaseTscObj(*(int64_t *)taos);
|
||||||
|
|
||||||
|
|
|
@ -410,7 +410,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
SVAlterTbReq vAlterTbReq = {0};
|
SVAlterTbReq vAlterTbReq = {0};
|
||||||
char* string = NULL;
|
char* string = NULL;
|
||||||
cJSON* json = NULL;
|
cJSON* json = NULL;
|
||||||
|
|
||||||
// decode
|
// decode
|
||||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||||
|
@ -525,7 +525,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
SVDropStbReq req = {0};
|
SVDropStbReq req = {0};
|
||||||
char* string = NULL;
|
char* string = NULL;
|
||||||
cJSON* json = NULL;
|
cJSON* json = NULL;
|
||||||
|
|
||||||
// decode
|
// decode
|
||||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||||
|
@ -558,7 +558,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
SVDropTbBatchReq req = {0};
|
SVDropTbBatchReq req = {0};
|
||||||
char* string = NULL;
|
char* string = NULL;
|
||||||
cJSON* json = NULL;
|
cJSON* json = NULL;
|
||||||
|
|
||||||
// decode
|
// decode
|
||||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||||
|
@ -603,7 +603,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
|
|
||||||
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
|
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -692,7 +692,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
|
|
||||||
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
|
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -773,7 +773,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
SQuery* pQuery = NULL;
|
SQuery* pQuery = NULL;
|
||||||
SHashObj* pVgroupHashmap = NULL;
|
SHashObj* pVgroupHashmap = NULL;
|
||||||
|
|
||||||
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
|
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -926,7 +926,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
SQuery* pQuery = NULL;
|
SQuery* pQuery = NULL;
|
||||||
SHashObj* pVgroupHashmap = NULL;
|
SHashObj* pVgroupHashmap = NULL;
|
||||||
|
|
||||||
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
|
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -1097,7 +1097,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
SArray* pArray = NULL;
|
SArray* pArray = NULL;
|
||||||
SVgDataBlocks* pVgData = NULL;
|
SVgDataBlocks* pVgData = NULL;
|
||||||
|
|
||||||
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
|
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -1217,7 +1217,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
|
||||||
SQuery* pQuery = NULL;
|
SQuery* pQuery = NULL;
|
||||||
SSubmitReq* subReq = NULL;
|
SSubmitReq* subReq = NULL;
|
||||||
|
|
||||||
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
|
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
|
||||||
if (!pRequest) {
|
if (!pRequest) {
|
||||||
uError("WriteRaw:createRequest error request is null");
|
uError("WriteRaw:createRequest error request is null");
|
||||||
code = terrno;
|
code = terrno;
|
||||||
|
@ -1281,7 +1281,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
|
||||||
int32_t schemaLen = 0;
|
int32_t schemaLen = 0;
|
||||||
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
||||||
|
|
||||||
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
|
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
|
||||||
subReq = taosMemoryCalloc(1, totalLen);
|
subReq = taosMemoryCalloc(1, totalLen);
|
||||||
SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
|
SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
|
||||||
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
|
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
|
||||||
|
@ -1407,7 +1407,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
|
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
|
||||||
if (!pRequest) {
|
if (!pRequest) {
|
||||||
uError("WriteRaw:createRequest error request is null");
|
uError("WriteRaw:createRequest error request is null");
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -1674,7 +1674,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
|
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
|
||||||
if (!pRequest) {
|
if (!pRequest) {
|
||||||
uError("WriteRaw:createRequest error request is null");
|
uError("WriteRaw:createRequest error request is null");
|
||||||
return terrno;
|
return terrno;
|
||||||
|
|
|
@ -28,12 +28,12 @@
|
||||||
#define QUOTE '"'
|
#define QUOTE '"'
|
||||||
#define SLASH '\\'
|
#define SLASH '\\'
|
||||||
|
|
||||||
#define JUMP_SPACE(sql, sqlEnd) \
|
#define JUMP_SPACE(sql, sqlEnd) \
|
||||||
while (sql < sqlEnd) { \
|
while (sql < sqlEnd) { \
|
||||||
if (*sql == SPACE) \
|
if (*sql == SPACE) \
|
||||||
sql++; \
|
sql++; \
|
||||||
else \
|
else \
|
||||||
break; \
|
break; \
|
||||||
}
|
}
|
||||||
// comma ,
|
// comma ,
|
||||||
#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
|
#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
|
||||||
|
@ -353,7 +353,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
|
||||||
pReq.numOfTags = taosArrayGetSize(pTags);
|
pReq.numOfTags = taosArrayGetSize(pTags);
|
||||||
pReq.pTags = pTags;
|
pReq.pTags = pTags;
|
||||||
|
|
||||||
code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest);
|
code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -1010,9 +1010,9 @@ static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const ch
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName, SHashObj *dumplicateKey,
|
static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName,
|
||||||
SSmlMsgBuf *msg) {
|
SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
|
||||||
if(!cols) return TSDB_CODE_OUT_OF_MEMORY;
|
if (!cols) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
const char *sql = data;
|
const char *sql = data;
|
||||||
size_t childTableNameLen = strlen(tsSmlChildTableName);
|
size_t childTableNameLen = strlen(tsSmlChildTableName);
|
||||||
while (sql < sqlEnd) {
|
while (sql < sqlEnd) {
|
||||||
|
@ -1093,7 +1093,8 @@ static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *
|
||||||
}
|
}
|
||||||
|
|
||||||
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
|
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
|
||||||
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo, SArray *cols) {
|
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo,
|
||||||
|
SArray *cols) {
|
||||||
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
|
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
|
||||||
|
|
||||||
// parse metric
|
// parse metric
|
||||||
|
@ -1372,19 +1373,19 @@ static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) {
|
||||||
static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
|
static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
|
||||||
SHashObj *s1 = *(SHashObj **)key1;
|
SHashObj *s1 = *(SHashObj **)key1;
|
||||||
SHashObj *s2 = *(SHashObj **)key2;
|
SHashObj *s2 = *(SHashObj **)key2;
|
||||||
SSmlKv **kv1pp = (SSmlKv **)taosHashGet(s1, TS, TS_LEN);
|
SSmlKv **kv1pp = (SSmlKv **)taosHashGet(s1, TS, TS_LEN);
|
||||||
SSmlKv **kv2pp = (SSmlKv **)taosHashGet(s2, TS, TS_LEN);
|
SSmlKv **kv2pp = (SSmlKv **)taosHashGet(s2, TS, TS_LEN);
|
||||||
if(!kv1pp || !kv2pp){
|
if (!kv1pp || !kv2pp) {
|
||||||
uError("smlKvTimeHashCompare kv is null");
|
uError("smlKvTimeHashCompare kv is null");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
SSmlKv *kv1 = *kv1pp;
|
SSmlKv *kv1 = *kv1pp;
|
||||||
SSmlKv *kv2 = *kv2pp;
|
SSmlKv *kv2 = *kv2pp;
|
||||||
if(!kv1 || kv1->type != TSDB_DATA_TYPE_TIMESTAMP){
|
if (!kv1 || kv1->type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
uError("smlKvTimeHashCompare kv1");
|
uError("smlKvTimeHashCompare kv1");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if(!kv2 || kv2->type != TSDB_DATA_TYPE_TIMESTAMP){
|
if (!kv2 || kv2->type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
uError("smlKvTimeHashCompare kv2");
|
uError("smlKvTimeHashCompare kv2");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1947,7 +1948,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
|
static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
|
||||||
if(!cols) return TSDB_CODE_OUT_OF_MEMORY;
|
if (!cols) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
cJSON *metricVal = cJSON_GetObjectItem(root, "value");
|
cJSON *metricVal = cJSON_GetObjectItem(root, "value");
|
||||||
if (metricVal == NULL) {
|
if (metricVal == NULL) {
|
||||||
return TSDB_CODE_TSC_INVALID_JSON;
|
return TSDB_CODE_TSC_INVALID_JSON;
|
||||||
|
@ -1971,7 +1972,7 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
|
||||||
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey,
|
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey,
|
||||||
SSmlMsgBuf *msg) {
|
SSmlMsgBuf *msg) {
|
||||||
int32_t ret = TSDB_CODE_SUCCESS;
|
int32_t ret = TSDB_CODE_SUCCESS;
|
||||||
if (!pKVs){
|
if (!pKVs) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
cJSON *tags = cJSON_GetObjectItem(root, "tags");
|
cJSON *tags = cJSON_GetObjectItem(root, "tags");
|
||||||
|
@ -2204,7 +2205,7 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
|
if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
|
||||||
ret = smlParseTelnetString(info, (const char *)data, (char*)data + len, tinfo, cols);
|
ret = smlParseTelnetString(info, (const char *)data, (char *)data + len, tinfo, cols);
|
||||||
} else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
} else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
||||||
ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
|
ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2384,16 +2385,16 @@ static void smlPrintStatisticInfo(SSmlHandle *info) {
|
||||||
info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime);
|
info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) {
|
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
||||||
if(lines){
|
if (lines) {
|
||||||
code = smlParseJSON(info, *lines);
|
code = smlParseJSON(info, *lines);
|
||||||
}else if(rawLine){
|
} else if (rawLine) {
|
||||||
code = smlParseJSON(info, rawLine);
|
code = smlParseJSON(info, rawLine);
|
||||||
}
|
}
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines?*lines:rawLine);
|
uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
@ -2401,19 +2402,19 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char
|
||||||
|
|
||||||
for (int32_t i = 0; i < numLines; ++i) {
|
for (int32_t i = 0; i < numLines; ++i) {
|
||||||
char *tmp = NULL;
|
char *tmp = NULL;
|
||||||
int len = 0;
|
int len = 0;
|
||||||
if(lines){
|
if (lines) {
|
||||||
tmp = lines[i];
|
tmp = lines[i];
|
||||||
len = strlen(tmp);
|
len = strlen(tmp);
|
||||||
}else if(rawLine){
|
} else if (rawLine) {
|
||||||
tmp = rawLine;
|
tmp = rawLine;
|
||||||
while(rawLine < rawLineEnd){
|
while (rawLine < rawLineEnd) {
|
||||||
if(*(rawLine++) == '\n'){
|
if (*(rawLine++) == '\n') {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
len++;
|
len++;
|
||||||
}
|
}
|
||||||
if(info->protocol == TSDB_SML_LINE_PROTOCOL && tmp[0] == '#'){ // this line is comment
|
if (info->protocol == TSDB_SML_LINE_PROTOCOL && tmp[0] == '#') { // this line is comment
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2433,7 +2434,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int smlProcess(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) {
|
static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t retryNum = 0;
|
int32_t retryNum = 0;
|
||||||
|
|
||||||
|
@ -2532,8 +2533,8 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
|
||||||
smlDestroyInfo(info);
|
smlDestroyInfo(info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd,
|
||||||
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd, int numLines, int protocol, int precision) {
|
int numLines, int protocol, int precision) {
|
||||||
int batchs = 0;
|
int batchs = 0;
|
||||||
STscObj *pTscObj = request->pTscObj;
|
STscObj *pTscObj = request->pTscObj;
|
||||||
|
|
||||||
|
@ -2581,7 +2582,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
|
||||||
batchs = ceil(((double)numLines) / LINE_BATCH);
|
batchs = ceil(((double)numLines) / LINE_BATCH);
|
||||||
params.total = batchs;
|
params.total = batchs;
|
||||||
for (int i = 0; i < batchs; ++i) {
|
for (int i = 0; i < batchs; ++i) {
|
||||||
SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT);
|
SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0);
|
||||||
if (!req) {
|
if (!req) {
|
||||||
request->code = TSDB_CODE_OUT_OF_MEMORY;
|
request->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
uError("SML:taos_schemaless_insert error request is null");
|
uError("SML:taos_schemaless_insert error request is null");
|
||||||
|
@ -2608,16 +2609,16 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
|
||||||
info->pRequest->body.queryFp = smlInsertCallback;
|
info->pRequest->body.queryFp = smlInsertCallback;
|
||||||
info->pRequest->body.param = info;
|
info->pRequest->body.param = info;
|
||||||
int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch);
|
int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch);
|
||||||
if(lines){
|
if (lines) {
|
||||||
lines += perBatch;
|
lines += perBatch;
|
||||||
}
|
}
|
||||||
if(rawLine){
|
if (rawLine) {
|
||||||
int num = 0;
|
int num = 0;
|
||||||
while(rawLine < rawLineEnd){
|
while (rawLine < rawLineEnd) {
|
||||||
if(*(rawLine++) == '\n'){
|
if (*(rawLine++) == '\n') {
|
||||||
num++;
|
num++;
|
||||||
}
|
}
|
||||||
if(num == perBatch){
|
if (num == perBatch) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2628,7 +2629,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
|
||||||
}
|
}
|
||||||
tsem_wait(¶ms.sem);
|
tsem_wait(¶ms.sem);
|
||||||
|
|
||||||
end:
|
end:
|
||||||
taosThreadSpinDestroy(¶ms.lock);
|
taosThreadSpinDestroy(¶ms.lock);
|
||||||
tsem_destroy(¶ms.sem);
|
tsem_destroy(¶ms.sem);
|
||||||
// ((STscObj *)taos)->schemalessType = 0;
|
// ((STscObj *)taos)->schemalessType = 0;
|
||||||
|
@ -2653,9 +2654,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
|
||||||
* 0 - influxDB line protocol
|
* 0 - influxDB line protocol
|
||||||
* 1 - OpenTSDB telnet line protocol
|
* 1 - OpenTSDB telnet line protocol
|
||||||
* 2 - OpenTSDB JSON format protocol
|
* 2 - OpenTSDB JSON format protocol
|
||||||
* @return return zero for successful insertion. Otherwise return none-zero error code of
|
* @return TAOS_RES
|
||||||
* failure reason.
|
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
|
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
|
||||||
|
@ -2664,7 +2663,7 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT);
|
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, 0);
|
||||||
if (!request) {
|
if (!request) {
|
||||||
uError("SML:taos_schemaless_insert error request is null");
|
uError("SML:taos_schemaless_insert error request is null");
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -2680,13 +2679,37 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
|
||||||
return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision);
|
return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision);
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision){
|
TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
|
||||||
|
int64_t reqid) {
|
||||||
if (NULL == taos) {
|
if (NULL == taos) {
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT);
|
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
|
||||||
|
if (!request) {
|
||||||
|
uError("SML:taos_schemaless_insert error request is null");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!lines) {
|
||||||
|
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
||||||
|
request->code = TSDB_CODE_SML_INVALID_DATA;
|
||||||
|
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
|
||||||
|
return (TAOS_RES *)request;
|
||||||
|
}
|
||||||
|
|
||||||
|
return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision);
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
|
||||||
|
int precision) {
|
||||||
|
if (NULL == taos) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, 0);
|
||||||
if (!request) {
|
if (!request) {
|
||||||
uError("SML:taos_schemaless_insert error request is null");
|
uError("SML:taos_schemaless_insert error request is null");
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -2702,10 +2725,45 @@ TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *
|
||||||
int numLines = 0;
|
int numLines = 0;
|
||||||
*totalRows = 0;
|
*totalRows = 0;
|
||||||
char *tmp = lines;
|
char *tmp = lines;
|
||||||
for(int i = 0; i < len; i++){
|
for (int i = 0; i < len; i++) {
|
||||||
if(lines[i] == '\n' || i == len - 1){
|
if (lines[i] == '\n' || i == len - 1) {
|
||||||
numLines++;
|
numLines++;
|
||||||
if(tmp[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL){ //ignore comment
|
if (tmp[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) { // ignore comment
|
||||||
|
(*totalRows)++;
|
||||||
|
}
|
||||||
|
tmp = lines + i + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision);
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
|
||||||
|
int precision, int64_t reqid) {
|
||||||
|
if (NULL == taos) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
|
||||||
|
if (!request) {
|
||||||
|
uError("SML:taos_schemaless_insert error request is null");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!lines || len <= 0) {
|
||||||
|
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
||||||
|
request->code = TSDB_CODE_SML_INVALID_DATA;
|
||||||
|
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
|
||||||
|
return (TAOS_RES *)request;
|
||||||
|
}
|
||||||
|
|
||||||
|
int numLines = 0;
|
||||||
|
*totalRows = 0;
|
||||||
|
char *tmp = lines;
|
||||||
|
for (int i = 0; i < len; i++) {
|
||||||
|
if (lines[i] == '\n' || i == len - 1) {
|
||||||
|
numLines++;
|
||||||
|
if (tmp[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) { // ignore comment
|
||||||
(*totalRows)++;
|
(*totalRows)++;
|
||||||
}
|
}
|
||||||
tmp = lines + i + 1;
|
tmp = lines + i + 1;
|
||||||
|
|
|
@ -5,13 +5,18 @@
|
||||||
|
|
||||||
#include "clientStmt.h"
|
#include "clientStmt.h"
|
||||||
|
|
||||||
char *gStmtStatusStr[] = {"unknown", "init", "prepare", "settbname", "settags", "fetchFields", "bind", "bindCol", "addBatch", "exec"};
|
char* gStmtStatusStr[] = {"unknown", "init", "prepare", "settbname", "settags",
|
||||||
|
"fetchFields", "bind", "bindCol", "addBatch", "exec"};
|
||||||
|
|
||||||
static int32_t stmtCreateRequest(STscStmt* pStmt) {
|
static int32_t stmtCreateRequest(STscStmt* pStmt) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (pStmt->exec.pRequest == NULL) {
|
if (pStmt->exec.pRequest == NULL) {
|
||||||
code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest);
|
code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest,
|
||||||
|
pStmt->reqid);
|
||||||
|
if (pStmt->reqid != 0) {
|
||||||
|
pStmt->reqid++;
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pStmt->exec.pRequest->syncQuery = true;
|
pStmt->exec.pRequest->syncQuery = true;
|
||||||
}
|
}
|
||||||
|
@ -207,10 +212,10 @@ int32_t stmtCacheBlock(STscStmt* pStmt) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||||
if(!pSrc){
|
if (!pSrc) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
STableDataBlocks* pDst = NULL;
|
STableDataBlocks* pDst = NULL;
|
||||||
|
|
||||||
STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc));
|
STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc));
|
||||||
|
|
||||||
|
@ -513,7 +518,7 @@ int32_t stmtResetStmt(STscStmt* pStmt) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_STMT* stmtInit(STscObj* taos) {
|
TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid) {
|
||||||
STscObj* pObj = (STscObj*)taos;
|
STscObj* pObj = (STscObj*)taos;
|
||||||
STscStmt* pStmt = NULL;
|
STscStmt* pStmt = NULL;
|
||||||
|
|
||||||
|
@ -533,9 +538,10 @@ TAOS_STMT* stmtInit(STscObj* taos) {
|
||||||
pStmt->taos = pObj;
|
pStmt->taos = pObj;
|
||||||
pStmt->bInfo.needParse = true;
|
pStmt->bInfo.needParse = true;
|
||||||
pStmt->sql.status = STMT_INIT;
|
pStmt->sql.status = STMT_INIT;
|
||||||
|
pStmt->reqid = reqid;
|
||||||
|
|
||||||
STMT_LOG_SEQ(STMT_INIT);
|
STMT_LOG_SEQ(STMT_INIT);
|
||||||
|
|
||||||
tscDebug("stmt:%p initialized", pStmt);
|
tscDebug("stmt:%p initialized", pStmt);
|
||||||
|
|
||||||
return pStmt;
|
return pStmt;
|
||||||
|
|
Loading…
Reference in New Issue