Merge remote-tracking branch 'origin/3.0' into feat/3.0/TD-32231

This commit is contained in:
qevolg 2024-10-18 16:52:50 +08:00
commit 7e8a76f352
689 changed files with 18299 additions and 9791 deletions

21
.gitignore vendored
View File

@ -138,3 +138,24 @@ tags
*CMakeCache*
*CMakeFiles*
.history/
*.txt
*.tcl
*.pc
contrib/geos
contrib/libuv
contrib/pcre2
contrib/zlib
deps_tmp_CMakeLists.txt.in
*.a
*.ctest
pcre2-config
pcre2_test.sh
pcre2_grep_test.sh
pcre2_chartables.c
geos-config
config.h
pcre2.h
zconf.h
version.h
geos_c.h

View File

@ -306,9 +306,9 @@ def pre_test_build_win() {
cd %WIN_CONNECTOR_ROOT%
python.exe -m pip install --upgrade pip
python -m pip uninstall taospy -y
python -m pip install taospy==2.7.13
python -m pip install taospy==2.7.16
python -m pip uninstall taos-ws-py -y
python -m pip install taos-ws-py==0.3.1
python -m pip install taos-ws-py==0.3.3
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
'''
return 1
@ -426,6 +426,10 @@ pipeline {
cd ${WKC}/tests/parallel_test
./run_check_assert_container.sh -d ${WKDIR}
'''
sh '''
cd ${WKC}/tests/parallel_test
./run_check_void_container.sh -d ${WKDIR}
'''
sh '''
date
rm -rf ${WKC}/debug

View File

@ -173,12 +173,58 @@ ELSE ()
CHECK_C_COMPILER_FLAG("-msse4.2" COMPILER_SUPPORT_SSE42)
ENDIF()
CHECK_C_COMPILER_FLAG("-mfma" COMPILER_SUPPORT_FMA)
CHECK_C_COMPILER_FLAG("-mavx" COMPILER_SUPPORT_AVX)
CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2)
CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F)
CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI)
CHECK_C_COMPILER_FLAG("-mavx512vl" COMPILER_SUPPORT_AVX512VL)
IF (TD_ARM_64 OR TD_ARM_32)
SET(COMPILER_SUPPORT_FMA false)
SET(COMPILER_SUPPORT_AVX false)
SET(COMPILER_SUPPORT_AVX2 false)
SET(COMPILER_SUPPORT_AVX512F false)
SET(COMPILER_SUPPORT_AVX512BMI false)
SET(COMPILER_SUPPORT_AVX512VL false)
ELSE()
CHECK_C_COMPILER_FLAG("-mfma" COMPILER_SUPPORT_FMA)
CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F)
CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI)
CHECK_C_COMPILER_FLAG("-mavx512vl" COMPILER_SUPPORT_AVX512VL)
INCLUDE(CheckCSourceRuns)
SET(CMAKE_REQUIRED_FLAGS "-mavx")
check_c_source_runs("
#include <immintrin.h>
int main() {
__m256d a, b, c;
double buf[4] = {0};
a = _mm256_loadu_pd(buf);
b = _mm256_loadu_pd(buf);
c = _mm256_add_pd(a, b);
_mm256_storeu_pd(buf, c);
for (int i = 0; i < sizeof(buf) / sizeof(buf[0]); ++i) {
if (buf[i] != 0) {
return 1;
}
}
return 0;
}
" COMPILER_SUPPORT_AVX)
SET(CMAKE_REQUIRED_FLAGS "-mavx2")
check_c_source_runs("
#include <immintrin.h>
int main() {
__m256i a, b, c;
int buf[8] = {0};
a = _mm256_loadu_si256((__m256i *)buf);
b = _mm256_loadu_si256((__m256i *)buf);
c = _mm256_and_si256(a, b);
_mm256_storeu_si256((__m256i *)buf, c);
for (int i = 0; i < sizeof(buf) / sizeof(buf[0]); ++i) {
if (buf[i] != 0) {
return 1;
}
}
return 0;
}
" COMPILER_SUPPORT_AVX2)
ENDIF()
IF (COMPILER_SUPPORT_SSE42)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2")
@ -190,15 +236,17 @@ ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfma")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mfma")
ENDIF()
IF (COMPILER_SUPPORT_AVX)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx")
ENDIF()
IF (COMPILER_SUPPORT_AVX2)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2")
ENDIF()
MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED")
MESSAGE(STATUS "FMA instructions is ACTIVATED")
ENDIF()
IF (COMPILER_SUPPORT_AVX)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx")
MESSAGE(STATUS "AVX instructions is ACTIVATED")
ENDIF()
IF (COMPILER_SUPPORT_AVX2)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2")
MESSAGE(STATUS "AVX2 instructions is ACTIVATED")
ENDIF()
IF ("${SIMD_AVX512_SUPPORT}" MATCHES "true")

View File

@ -7,7 +7,17 @@ ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo firstEp localhost:6030 > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo fqdn localhost >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo serverPort 6030 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo debugFlag 135 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo asyncLog 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo supportVnodes 1024 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo numOfLogLines 300000000 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo logKeepDays -1 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo checkpointInterval 60 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo snodeAddress 127.0.0.1:873 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo monitor 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg

View File

@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "3.3.3.0.alpha")
SET(TD_VER_NUMBER "3.3.4.0.alpha")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)

View File

@ -2,7 +2,7 @@
# stub
ExternalProject_Add(stub
GIT_REPOSITORY https://github.com/coolxv/cpp-stub.git
GIT_TAG 5e903b8e
GIT_TAG 3137465194014d66a8402941e80d2bccc6346f51
GIT_SUBMODULES "src"
SOURCE_DIR "${TD_CONTRIB_DIR}/cpp-stub"
BINARY_DIR "${TD_CONTRIB_DIR}/cpp-stub/src"

View File

@ -90,7 +90,7 @@ If `maven` is used to manage the projects, what needs to be done is only adding
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.3.2</version>
<version>3.3.3</version>
</dependency>
```

View File

@ -19,7 +19,7 @@ After TDengine server or client installation, `taos.h` is located at
The dynamic libraries for the TDengine client driver are located in.
- Linux: `/usr/local/taos/driver/libtaos.so`
- Windows: `C:\TDengine\taos.dll`
- Windows: `C:\TDengine\driver\taos.dll`
- macOS: `/usr/local/lib/libtaos.dylib`
## Supported platforms

View File

@ -20,6 +20,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t
import Release from "/components/ReleaseV3";
## 3.3.3.0
<Release type="tdengine" version="3.3.3.0" />
## 3.3.2.0
<Release type="tdengine" version="3.3.2.0" />

View File

@ -19,7 +19,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.3.2</version>
<version>3.3.3</version>
</dependency>
<dependency>
<groupId>org.locationtech.jts</groupId>

View File

@ -18,7 +18,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.3.2</version>
<version>3.3.3</version>
</dependency>
<!-- druid -->
<dependency>

View File

@ -17,7 +17,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.3.2</version>
<version>3.3.3</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>

View File

@ -67,7 +67,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.3.2</version>
<version>3.3.3</version>
<!-- <scope>system</scope>-->
<!-- <systemPath>${project.basedir}/src/main/resources/lib/taos-jdbcdriver-2.0.15-dist.jar</systemPath>-->
</dependency>

3
docs/examples/c-ws/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
*
!*.c
!.gitignore

View File

@ -0,0 +1,21 @@
// compile with
// gcc connect_example.c -o connect_example -ltaos
#include <stdio.h>
#include <stdlib.h>
#include "taosws.h"
int main() {
ws_enable_log("debug");
char *dsn = "ws://localhost:6041";
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return -1;
}
fprintf(stdout, "Connected to %s successfully.\n", dsn);
/* put your code here for read and write */
// close & clean
ws_close(taos);
}

View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o create_db_demo create_db_demo.c -ltaos
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosws.h"
static int DemoCreateDB() {
ws_enable_log("debug");
// ANCHOR: create_db_and_table
int code = 0;
char *dsn = "ws://localhost:6041";
// connect
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return -1;
}
// create database
WS_RES *result = ws_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result));
ws_close(taos);
return -1;
}
ws_free_result(result);
fprintf(stdout, "Create database power successfully.\n");
// create table
const char *sql =
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId "
"INT, location BINARY(24))";
result = ws_query(taos, sql);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to create stable power.meters, ErrCode: 0x%x, ErrMessage: %s\n.", code, ws_errstr(result));
ws_close(taos);
return -1;
}
ws_free_result(result);
fprintf(stdout, "Create stable power.meters successfully.\n");
// close & clean
ws_close(taos);
return 0;
// ANCHOR_END: create_db_and_table
}
int main(int argc, char *argv[]) { return DemoCreateDB(); }

View File

@ -0,0 +1,67 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o insert_data_demo insert_data_demo.c -ltaos
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosws.h"
static int DemoInsertData() {
// ANCHOR: insert_data
int code = 0;
char *dsn = "ws://localhost:6041";
// connect
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return -1;
}
// insert data, please make sure the database and table are already created
const char *sql =
"INSERT INTO "
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') "
"VALUES "
"(NOW + 1a, 10.30000, 219, 0.31000) "
"(NOW + 2a, 12.60000, 218, 0.33000) "
"(NOW + 3a, 12.30000, 221, 0.31000) "
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') "
"VALUES "
"(NOW + 1a, 10.30000, 218, 0.25000) ";
WS_RES *result = ws_query(taos, sql);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert data to power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code,
ws_errstr(result));
ws_close(taos);
return -1;
}
ws_free_result(result);
// you can check affectedRows here
int rows = ws_affected_rows(result);
fprintf(stdout, "Successfully inserted %d rows into power.meters.\n", rows);
// close & clean
ws_close(taos);
return 0;
// ANCHOR_END: insert_data
}
int main(int argc, char *argv[]) { return DemoInsertData(); }

View File

@ -0,0 +1,70 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o query_data_demo query_data_demo.c -ltaos
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosws.h"
static int DemoQueryData() {
// ANCHOR: query_data
int code = 0;
char *dsn = "ws://localhost:6041";
// connect
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return -1;
}
// query data, please make sure the database and table are already created
const char *sql = "SELECT ts, current, location FROM power.meters limit 100";
WS_RES *result = ws_query(taos, sql);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to query data from power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code,
ws_errstr(result));
ws_close(taos);
return -1;
}
WS_ROW row = NULL;
int rows = 0;
int num_fields = ws_field_count(result);
const WS_FIELD *fields = ws_fetch_fields(result);
fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);
// fetch the records row by row
while ((row = ws_fetch_row(result))) {
// Add your data processing logic here
rows++;
}
fprintf(stdout, "total rows: %d\n", rows);
ws_free_result(result);
// close & clean
ws_close(taos);
return 0;
// ANCHOR_END: query_data
}
int main(int argc, char *argv[]) { return DemoQueryData(); }

View File

@ -0,0 +1,121 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o sml_insert_demo sml_insert_demo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosws.h"
static int DemoSmlInsert() {
// ANCHOR: schemaless
int code = 0;
char *dsn = "ws://localhost:6041";
// connect
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return -1;
}
// create database
WS_RES *result = ws_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result));
ws_close(taos);
return -1;
}
ws_free_result(result);
// use database
result = ws_query(taos, "USE power");
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s\n.", code, ws_errstr(result));
ws_close(taos);
return -1;
}
ws_free_result(result);
// schemaless demo data
char *line_demo =
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 "
"1626006833639";
char *telnet_demo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
char *json_demo =
"{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, "
"\"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
// influxdb line protocol
char *lines[] = {line_demo};
int totalLines = 0;
result = ws_schemaless_insert_raw(taos, line_demo, strlen(line_demo), &totalLines, WS_TSDB_SML_LINE_PROTOCOL,
WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", line_demo,
code, ws_errstr(result));
ws_close(taos);
return -1;
}
fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", totalLines);
ws_free_result(result);
// opentsdb telnet protocol
totalLines = 0;
result = ws_schemaless_insert_raw(taos, telnet_demo, strlen(telnet_demo), &totalLines, WS_TSDB_SML_TELNET_PROTOCOL,
WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", telnet_demo,
code, ws_errstr(result));
ws_close(taos);
return -1;
}
fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", totalLines);
ws_free_result(result);
// opentsdb json protocol
char *jsons[1] = {0};
// allocate memory for json data. can not use static memory.
totalLines = 0;
result = ws_schemaless_insert_raw(taos, json_demo, strlen(json_demo), &totalLines, WS_TSDB_SML_JSON_PROTOCOL,
WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = ws_errno(result);
if (code != 0) {
free(jsons[0]);
fprintf(stderr, "Failed to insert schemaless json data, Server: %s, ErrCode: 0x%x, ErrMessage: %s\n.", json_demo,
code, ws_errstr(result));
ws_close(taos);
return -1;
}
free(jsons[0]);
fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", totalLines);
ws_free_result(result);
// close & clean
ws_close(taos);
return 0;
// ANCHOR_END: schemaless
}
int main(int argc, char *argv[]) { return DemoSmlInsert(); }

View File

@ -0,0 +1,183 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o stmt_insert_demo stmt_insert_demo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include "taosws.h"
/**
* @brief execute sql only.
*
* @param taos
* @param sql
*/
void executeSQL(WS_TAOS *taos, const char *sql) {
WS_RES *res = ws_query(taos, sql);
int code = ws_errno(res);
if (code != 0) {
fprintf(stderr, "%s\n", ws_errstr(res));
ws_free_result(res);
ws_close(taos);
exit(EXIT_FAILURE);
}
ws_free_result(res);
}
/**
* @brief check return status and exit program when error occur.
*
* @param stmt
* @param code
* @param msg
*/
void checkErrorCode(WS_STMT *stmt, int code, const char *msg) {
if (code != 0) {
fprintf(stderr, "%s. code: %d, error: %s\n", msg, code, ws_stmt_errstr(stmt));
ws_stmt_close(stmt);
exit(EXIT_FAILURE);
}
}
typedef struct {
int64_t ts;
float current;
int voltage;
float phase;
} Row;
int num_of_sub_table = 10;
int num_of_row = 10;
int total_affected = 0;
/**
* @brief insert data using stmt API
*
* @param taos
*/
void insertData(WS_TAOS *taos) {
// init
WS_STMT *stmt = ws_stmt_init(taos);
if (stmt == NULL) {
fprintf(stderr, "Failed to init ws_stmt, error: %s\n", ws_stmt_errstr(NULL));
exit(EXIT_FAILURE);
}
// prepare
const char *sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
int code = ws_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "Failed to execute ws_stmt_prepare");
for (int i = 1; i <= num_of_sub_table; i++) {
char table_name[20];
sprintf(table_name, "d_bind_%d", i);
char location[20];
sprintf(location, "location_%d", i);
// set table name and tags
WS_MULTI_BIND tags[2];
// groupId
tags[0].buffer_type = TSDB_DATA_TYPE_INT;
tags[0].buffer_length = sizeof(int);
tags[0].length = (int32_t *)&tags[0].buffer_length;
tags[0].buffer = &i;
tags[0].is_null = NULL;
tags[0].num = 1;
// location
tags[1].buffer_type = TSDB_DATA_TYPE_BINARY;
tags[1].buffer_length = strlen(location);
tags[1].length = (int32_t *)&tags[1].buffer_length;
tags[1].buffer = location;
tags[1].is_null = NULL;
tags[1].num = 1;
code = ws_stmt_set_tbname_tags(stmt, table_name, tags, 2);
checkErrorCode(stmt, code, "Failed to set table name and tags\n");
// insert rows
WS_MULTI_BIND params[4];
// ts
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(int64_t);
params[0].length = (int32_t *)&params[0].buffer_length;
params[0].is_null = NULL;
params[0].num = 1;
// current
params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[1].buffer_length = sizeof(float);
params[1].length = (int32_t *)&params[1].buffer_length;
params[1].is_null = NULL;
params[1].num = 1;
// voltage
params[2].buffer_type = TSDB_DATA_TYPE_INT;
params[2].buffer_length = sizeof(int);
params[2].length = (int32_t *)&params[2].buffer_length;
params[2].is_null = NULL;
params[2].num = 1;
// phase
params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[3].buffer_length = sizeof(float);
params[3].length = (int32_t *)&params[3].buffer_length;
params[3].is_null = NULL;
params[3].num = 1;
for (int j = 0; j < num_of_row; j++) {
struct timeval tv;
gettimeofday(&tv, NULL);
long long milliseconds = tv.tv_sec * 1000LL + tv.tv_usec / 1000; // current timestamp in milliseconds
int64_t ts = milliseconds + j;
float current = (float)rand() / RAND_MAX * 30;
int voltage = rand() % 300;
float phase = (float)rand() / RAND_MAX;
params[0].buffer = &ts;
params[1].buffer = &current;
params[2].buffer = &voltage;
params[3].buffer = &phase;
// bind param
code = ws_stmt_bind_param_batch(stmt, params, 4);
checkErrorCode(stmt, code, "Failed to bind param");
}
// add batch
code = ws_stmt_add_batch(stmt);
checkErrorCode(stmt, code, "Failed to add batch");
// execute batch
int affected_rows = 0;
code = ws_stmt_execute(stmt, &affected_rows);
checkErrorCode(stmt, code, "Failed to exec stmt");
// get affected rows
int affected = ws_stmt_affected_rows_once(stmt);
total_affected += affected;
}
fprintf(stdout, "Successfully inserted %d rows to power.meters.\n", total_affected);
ws_stmt_close(stmt);
}
int main() {
int code = 0;
char *dsn = "ws://localhost:6041";
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
exit(EXIT_FAILURE);
}
// create database and table
executeSQL(taos, "CREATE DATABASE IF NOT EXISTS power");
executeSQL(taos, "USE power");
executeSQL(taos,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
insertData(taos);
ws_close(taos);
}

View File

@ -0,0 +1,488 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// to compile: gcc -o tmq_demo tmq_demo.c -ltaos -lpthread
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "taosws.h"
volatile int thread_stop = 0;
static int running = 1;
static int count = 0;
const char* topic_name = "topic_meters";
typedef struct {
const char* enable_auto_commit;
const char* auto_commit_interval_ms;
const char* group_id;
const char* client_id;
const char* td_connect_host;
const char* td_connect_port;
const char* td_connect_user;
const char* td_connect_pass;
const char* auto_offset_reset;
} ConsumerConfig;
ConsumerConfig config = {.enable_auto_commit = "true",
.auto_commit_interval_ms = "1000",
.group_id = "group1",
.client_id = "client1",
.td_connect_host = "localhost",
.td_connect_port = "6030",
.td_connect_user = "root",
.td_connect_pass = "taosdata",
.auto_offset_reset = "latest"};
void* prepare_data(void* arg) {
int code = 0;
char* dsn = "ws://localhost:6041";
WS_TAOS* pConn = ws_connect(dsn);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return NULL;
}
WS_RES* pRes;
int i = 1;
while (!thread_stop) {
char buf[200] = {0};
i++;
snprintf(
buf, sizeof(buf),
"INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + %da, 10.30000, "
"219, 0.31000)",
i);
pRes = ws_query(pConn, buf);
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
}
ws_free_result(pRes);
sleep(1);
}
fprintf(stdout, "Prepare data thread exit\n");
return NULL;
}
// ANCHOR: msg_process
int32_t msg_process(WS_RES* msg) {
int32_t rows = 0;
const char* topicName = ws_tmq_get_topic_name(msg);
const char* dbName = ws_tmq_get_db_name(msg);
int32_t vgroupId = ws_tmq_get_vgroup_id(msg);
while (true) {
// get one row data from message
WS_ROW row = ws_fetch_row(msg);
if (row == NULL) break;
// Add your data processing logic here
rows++;
}
return rows;
}
// ANCHOR_END: msg_process
WS_TAOS* init_env() {
int code = 0;
char* dsn = "ws://localhost:6041";
WS_TAOS* pConn = ws_connect(dsn);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return NULL;
}
WS_RES* pRes;
// drop database if exists
pRes = ws_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
ws_free_result(pRes);
pRes = ws_query(pConn, "DROP DATABASE IF EXISTS power");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
ws_free_result(pRes);
// create database
pRes = ws_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
ws_free_result(pRes);
// create super table
pRes =
ws_query(pConn,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
ws_free_result(pRes);
return pConn;
END:
ws_free_result(pRes);
ws_close(pConn);
return NULL;
}
void deinit_env(WS_TAOS* pConn) {
if (pConn) ws_close(pConn);
}
int32_t create_topic(WS_TAOS* pConn) {
WS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = ws_query(pConn, "USE power");
code = ws_errno(pRes);
if (ws_errno(pRes) != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
return -1;
}
ws_free_result(pRes);
pRes = ws_query(
pConn,
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
return -1;
}
ws_free_result(pRes);
return 0;
}
int32_t drop_topic(WS_TAOS* pConn) {
WS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = ws_query(pConn, "USE power");
code = ws_errno(pRes);
if (ws_errno(pRes) != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
return -1;
}
ws_free_result(pRes);
pRes = ws_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
return -1;
}
ws_free_result(pRes);
return 0;
}
void tmq_commit_cb_print(ws_tmq_t* tmq, int32_t code, void* param) {
count += 1;
fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p, count: %d.\n", code, tmq, param, count);
}
// ANCHOR: create_consumer_1
ws_tmq_t* build_consumer(const ConsumerConfig* config) {
ws_tmq_conf_res_t code;
ws_tmq_t* tmq = NULL;
// create a configuration object
ws_tmq_conf_t* conf = ws_tmq_conf_new();
// set the configuration parameters
code = ws_tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "group.id", config->group_id);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "client.id", config->client_id);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
// create a consumer object
tmq = ws_tmq_consumer_new(conf, "taos://localhost:6041", NULL, 0);
_end:
// destroy the configuration object
ws_tmq_conf_destroy(conf);
return tmq;
}
// ANCHOR_END: create_consumer_1
// ANCHOR: build_topic_list
// build a topic list used to subscribe
ws_tmq_list_t* build_topic_list() {
// create a empty topic list
ws_tmq_list_t* topicList = ws_tmq_list_new();
// append topic name to the list
int32_t code = ws_tmq_list_append(topicList, topic_name);
if (code) {
// if failed, destroy the list and return NULL
ws_tmq_list_destroy(topicList);
fprintf(stderr,
"Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(NULL));
return NULL;
}
// if success, return the list
return topicList;
}
// ANCHOR_END: build_topic_list
// ANCHOR: basic_consume_loop
void basic_consume_loop(ws_tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// Add your data processing logic here
totalRows += msg_process(tmqmsg);
// free the message
ws_free_result(tmqmsg);
}
if (msgCnt > 50) {
// consume 50 messages and break
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
// ANCHOR_END: basic_consume_loop
// ANCHOR: consume_repeatly
void consume_repeatly(ws_tmq_t* tmq) {
int32_t numOfAssignment = 0;
ws_tmq_topic_assignment* pAssign = NULL;
// get the topic assignment
int32_t code = ws_tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
return;
}
// seek to the earliest offset
for (int32_t i = 0; i < numOfAssignment; ++i) {
ws_tmq_topic_assignment* p = &pAssign[i];
code = ws_tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr,
"Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, p->vgId, code, ws_tmq_errstr(tmq));
break;
}
}
if (code == 0) fprintf(stdout, "Assignment seek to beginning successfully.\n");
// free the assignment array
ws_tmq_free_assignment(pAssign, numOfAssignment);
// let's consume the messages again
basic_consume_loop(tmq);
}
// ANCHOR_END: consume_repeatly
// ANCHOR: manual_commit
void manual_commit(ws_tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// process the message
totalRows += msg_process(tmqmsg);
// commit the message
int32_t code = ws_tmq_commit_sync(tmq, tmqmsg);
if (code) {
fprintf(stderr,
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
// free the message
ws_free_result(tmqmsg);
break;
} else {
fprintf(stdout, "Commit offset manually successfully.\n");
}
// free the message
ws_free_result(tmqmsg);
}
if (msgCnt > 50) {
// consume 50 messages and break
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows.\n", msgCnt, totalRows);
}
// ANCHOR_END: manual_commit
int main(int argc, char* argv[]) {
int32_t code;
pthread_t thread_id;
WS_TAOS* pConn = init_env();
if (pConn == NULL) {
fprintf(stderr, "Failed to init env.\n");
return -1;
}
if (create_topic(pConn) < 0) {
fprintf(stderr, "Failed to create topic.\n");
return -1;
}
if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) {
fprintf(stderr, "Failed to create thread.\n");
return -1;
}
// ANCHOR: create_consumer_2
ws_tmq_t* tmq = build_consumer(&config);
if (NULL == tmq) {
fprintf(stderr, "Failed to create native consumer, host: %s, groupId: %s, , clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
return -1;
} else {
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n", config.td_connect_host,
config.group_id, config.client_id);
}
// ANCHOR_END: create_consumer_2
// ANCHOR: subscribe_3
ws_tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n", topic_name, config.group_id,
config.client_id);
return -1;
}
if ((code = ws_tmq_subscribe(tmq, topic_list))) {
fprintf(stderr,
"Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Subscribe topics successfully.\n");
}
ws_tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
// ANCHOR_END: subscribe_3
consume_repeatly(tmq);
manual_commit(tmq);
// ANCHOR: unsubscribe_and_close
// unsubscribe the topic
code = ws_tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr,
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Consumer unsubscribed successfully.\n");
}
// close the consumer
code = ws_tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Consumer closed successfully.\n");
}
// ANCHOR_END: unsubscribe_and_close
thread_stop = 1;
pthread_join(thread_id, NULL);
if (drop_topic(pConn) < 0) {
fprintf(stderr, "Failed to drop topic.\n");
return -1;
}
deinit_env(pConn);
return 0;
}

View File

@ -0,0 +1,71 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o with_reqid_demo with_reqid_demo.c -ltaos
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosws.h"
static int DemoWithReqId() {
// ANCHOR: with_reqid
int code = 0;
char *dsn = "ws://localhost:6041";
// connect
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return -1;
}
const char *sql = "SELECT ts, current, location FROM power.meters limit 1";
// query data with reqid
long reqid = 3L;
WS_RES *result = ws_query_with_reqid(taos, sql, reqid);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to execute sql withQID: %ld, ErrCode: 0x%x, ErrMessage: %s\n.", reqid, code,
ws_errstr(result));
ws_close(taos);
return -1;
}
WS_ROW row = NULL;
int rows = 0;
int num_fields = ws_field_count(result);
const WS_FIELD *fields = ws_fetch_fields(result);
fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);
// fetch the records row by row
while ((row = ws_fetch_row(result))) {
// Add your data processing logic here
rows++;
}
fprintf(stdout, "total rows: %d\n", rows);
ws_free_result(result);
// close & clean
ws_close(taos);
return 0;
// ANCHOR_END: with_reqid
}
int main(int argc, char *argv[]) { return DemoWithReqId(); }

View File

@ -0,0 +1,101 @@
PROJECT(TDengine)
IF (TD_LINUX)
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
AUX_SOURCE_DIRECTORY(. SRC)
add_executable(docs_connect_example "")
add_executable(docs_create_db_demo "")
add_executable(docs_insert_data_demo "")
add_executable(docs_query_data_demo "")
add_executable(docs_with_reqid_demo "")
add_executable(docs_sml_insert_demo "")
add_executable(docs_stmt_insert_demo "")
add_executable(docs_tmq_demo "")
target_sources(docs_connect_example
PRIVATE
"connect_example.c"
)
target_sources(docs_create_db_demo
PRIVATE
"create_db_demo.c"
)
target_sources(docs_insert_data_demo
PRIVATE
"insert_data_demo.c"
)
target_sources(docs_query_data_demo
PRIVATE
"query_data_demo.c"
)
target_sources(docs_with_reqid_demo
PRIVATE
"with_reqid_demo.c"
)
target_sources(docs_sml_insert_demo
PRIVATE
"sml_insert_demo.c"
)
target_sources(docs_stmt_insert_demo
PRIVATE
"stmt_insert_demo.c"
)
target_sources(docs_tmq_demo
PRIVATE
"tmq_demo.c"
)
target_link_libraries(docs_connect_example
taos
)
target_link_libraries(docs_create_db_demo
taos
)
target_link_libraries(docs_insert_data_demo
taos
)
target_link_libraries(docs_query_data_demo
taos
)
target_link_libraries(docs_with_reqid_demo
taos
)
target_link_libraries(docs_sml_insert_demo
taos
)
target_link_libraries(docs_stmt_insert_demo
taos
)
target_link_libraries(docs_tmq_demo
taos
pthread
)
SET_TARGET_PROPERTIES(docs_connect_example PROPERTIES OUTPUT_NAME docs_connect_example)
SET_TARGET_PROPERTIES(docs_create_db_demo PROPERTIES OUTPUT_NAME docs_create_db_demo)
SET_TARGET_PROPERTIES(docs_insert_data_demo PROPERTIES OUTPUT_NAME docs_insert_data_demo)
SET_TARGET_PROPERTIES(docs_query_data_demo PROPERTIES OUTPUT_NAME docs_query_data_demo)
SET_TARGET_PROPERTIES(docs_with_reqid_demo PROPERTIES OUTPUT_NAME docs_with_reqid_demo)
SET_TARGET_PROPERTIES(docs_sml_insert_demo PROPERTIES OUTPUT_NAME docs_sml_insert_demo)
SET_TARGET_PROPERTIES(docs_stmt_insert_demo PROPERTIES OUTPUT_NAME docs_stmt_insert_demo)
SET_TARGET_PROPERTIES(docs_tmq_demo PROPERTIES OUTPUT_NAME docs_tmq_demo)
ENDIF ()
IF (TD_DARWIN)
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
AUX_SOURCE_DIRECTORY(. SRC)
ENDIF ()

34
docs/examples/c/Makefile Normal file
View File

@ -0,0 +1,34 @@
# Makefile for building TDengine examples on TD Linux platform
INCLUDE_DIRS =
TARGETS = connect_example \
create_db_demo \
insert_data_demo \
query_data_demo \
with_reqid_demo \
sml_insert_demo \
stmt_insert_demo \
tmq_demo
SOURCES = connect_example.c \
create_db_demo.c \
insert_data_demo.c \
query_data_demo.c \
with_reqid_demo.c \
sml_insert_demo.c \
stmt_insert_demo.c \
tmq_demo.c
LIBS = -ltaos -lpthread
CFLAGS = -g
all: $(TARGETS)
$(TARGETS):
$(CC) $(CFLAGS) -o $@ $(wildcard $(@F).c) $(LIBS)
clean:
rm -f $(TARGETS)

View File

@ -22,7 +22,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.3.2</version>
<version>3.3.3</version>
</dependency>
<!-- ANCHOR_END: dep-->

View File

@ -3,10 +3,7 @@ package com.taos.example;
import com.taosdata.jdbc.TSDBPreparedStatement;
import com.taosdata.jdbc.utils.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@ -16,15 +13,32 @@ public class ParameterBindingFullDemo {
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 50;
private static final int BINARY_COLUMN_SIZE = 100;
private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))",
"create table stable6(ts timestamp, f1 varbinary(" + BINARY_COLUMN_SIZE + ")) tags(t1 varbinary(" + BINARY_COLUMN_SIZE + "))",
"create table stable7(ts timestamp, f1 geometry(" + BINARY_COLUMN_SIZE + ")) tags(t1 geometry(" + BINARY_COLUMN_SIZE + "))",
"drop database if exists example_all_type_stmt",
"CREATE DATABASE IF NOT EXISTS example_all_type_stmt",
"USE example_all_type_stmt",
"CREATE STABLE IF NOT EXISTS stb_json (" +
"ts TIMESTAMP, " +
"int_col INT) " +
"tags (json_tag json)",
"CREATE STABLE IF NOT EXISTS stb (" +
"ts TIMESTAMP, " +
"int_col INT, " +
"double_col DOUBLE, " +
"bool_col BOOL, " +
"binary_col BINARY(100), " +
"nchar_col NCHAR(100), " +
"varbinary_col VARBINARY(100), " +
"geometry_col GEOMETRY(100)) " +
"tags (" +
"int_tag INT, " +
"double_tag DOUBLE, " +
"bool_tag BOOL, " +
"binary_tag BINARY(100), " +
"nchar_tag NCHAR(100), " +
"varbinary_tag VARBINARY(100), " +
"geometry_tag GEOMETRY(100))"
};
private static final int numOfSubTable = 10, numOfRow = 10;
@ -34,55 +48,37 @@ public class ParameterBindingFullDemo {
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
stmtJsonTag(conn);
stmtAll(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
bindVarbinary(conn);
bindGeometry(conn);
clean(conn);
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
System.out.println("Failed to insert data using stmt, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
} catch (Exception ex) {
System.out.println("Failed to insert data using stmt, ErrMessage: " + ex.getMessage());
throw ex;
}
}
private static void init(Connection conn) throws SQLException {
clean(conn);
try (Statement stmt = conn.createStatement()) {
stmt.execute("create database if not exists test_parabind");
stmt.execute("use test_parabind");
for (int i = 0; i < schemaList.length; i++) {
stmt.execute(schemaList[i]);
}
}
}
private static void clean(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
}
}
private static void bindInteger(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
private static void stmtJsonTag(Connection conn) throws SQLException {
String sql = "INSERT INTO ? using stb_json tags(?) VALUES (?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t1_" + i);
pstmt.setTableName("ntb_json_" + i);
// set tags
pstmt.setTagByte(0, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setTagShort(1, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setTagInt(2, random.nextInt(Integer.MAX_VALUE));
pstmt.setTagLong(3, random.nextLong());
pstmt.setTagJson(0, "{\"device\":\"device_" + i + "\"}");
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
@ -90,45 +86,42 @@ public class ParameterBindingFullDemo {
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Byte> f1List = new ArrayList<>();
ArrayList<Integer> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setByte(1, f1List);
ArrayList<Short> f2List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f2List.add(Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setShort(2, f2List);
ArrayList<Integer> f3List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f3List.add(random.nextInt(Integer.MAX_VALUE));
pstmt.setInt(3, f3List);
ArrayList<Long> f4List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f4List.add(random.nextLong());
pstmt.setLong(4, f4List);
f1List.add(random.nextInt(Integer.MAX_VALUE));
pstmt.setInt(1, f1List);
// add column
pstmt.columnDataAddBatch();
}
// execute column
pstmt.columnDataExecuteBatch();
System.out.println("Successfully inserted rows to example_all_type_stmt.ntb_json");
}
}
private static void bindFloat(Connection conn) throws SQLException {
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
private static void stmtAll(Connection conn) throws SQLException {
String sql = "INSERT INTO ? using stb tags(?,?,?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)";
TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class);
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t2_" + i);
pstmt.setTableName("ntb" + i);
// set tags
pstmt.setTagFloat(0, random.nextFloat());
pstmt.setTagDouble(1, random.nextDouble());
pstmt.setTagInt(0, i);
pstmt.setTagDouble(1, 1.1);
pstmt.setTagBoolean(2, true);
pstmt.setTagString(3, "binary_value");
pstmt.setTagNString(4, "nchar_value");
pstmt.setTagVarbinary(5, new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e});
pstmt.setTagGeometry(6, new byte[]{
0x01, 0x01, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59,
0x40, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59, 0x40});
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
@ -136,190 +129,54 @@ public class ParameterBindingFullDemo {
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Float> f1List = new ArrayList<>();
ArrayList<Integer> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(random.nextFloat());
pstmt.setFloat(1, f1List);
f1List.add(random.nextInt(Integer.MAX_VALUE));
pstmt.setInt(1, f1List);
ArrayList<Double> f2List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f2List.add(random.nextDouble());
pstmt.setDouble(2, f2List);
ArrayList<Boolean> f3List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f3List.add(true);
pstmt.setBoolean(3, f3List);
ArrayList<String> f4List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f4List.add("binary_value");
pstmt.setString(4, f4List, BINARY_COLUMN_SIZE);
ArrayList<String> f5List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f5List.add("nchar_value");
pstmt.setNString(5, f5List, BINARY_COLUMN_SIZE);
ArrayList<byte[]> f6List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f6List.add(new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e});
pstmt.setVarbinary(6, f6List, BINARY_COLUMN_SIZE);
ArrayList<byte[]> f7List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f7List.add(new byte[]{
0x01, 0x01, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59,
0x40, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59, 0x40});
pstmt.setGeometry(7, f7List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
System.out.println("Successfully inserted rows to example_all_type_stmt.ntb");
// close if no try-with-catch statement is used
pstmt.close();
}
private static void bindBoolean(Connection conn) throws SQLException {
String sql = "insert into ? using stable3 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t3_" + i);
// set tags
pstmt.setTagBoolean(0, random.nextBoolean());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Boolean> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(random.nextBoolean());
pstmt.setBoolean(1, f1List);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindBytes(Connection conn) throws SQLException {
String sql = "insert into ? using stable4 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t4_" + i);
// set tags
pstmt.setTagString(0, new String("abc"));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<String> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(new String("abc"));
}
pstmt.setString(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindString(Connection conn) throws SQLException {
String sql = "insert into ? using stable5 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t5_" + i);
// set tags
pstmt.setTagNString(0, "California.SanFrancisco");
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<String> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add("California.LosAngeles");
}
pstmt.setNString(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindVarbinary(Connection conn) throws SQLException {
String sql = "insert into ? using stable6 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t6_" + i);
// set tags
byte[] bTag = new byte[]{0,2,3,4,5};
bTag[0] = (byte) i;
pstmt.setTagVarbinary(0, bTag);
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
byte[] v = new byte[]{0,2,3,4,5,6};
v[0] = (byte)j;
f1List.add(v);
}
pstmt.setVarbinary(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindGeometry(Connection conn) throws SQLException {
String sql = "insert into ? using stable7 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
byte[] g1 = StringUtils.hexToBytes("0101000000000000000000F03F0000000000000040");
byte[] g2 = StringUtils.hexToBytes("0102000020E610000002000000000000000000F03F000000000000004000000000000008400000000000001040");
List<byte[]> listGeo = new ArrayList<>();
listGeo.add(g1);
listGeo.add(g2);
for (int i = 1; i <= 2; i++) {
// set table name
pstmt.setTableName("t7_" + i);
// set tags
pstmt.setTagGeometry(0, listGeo.get(i - 1));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(listGeo.get(i - 1));
}
pstmt.setGeometry(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
}
// ANCHOR_END: para_bind

View File

@ -11,11 +11,30 @@ public class WSParameterBindingFullDemo {
private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 30;
private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))"
"drop database if exists example_all_type_stmt",
"CREATE DATABASE IF NOT EXISTS example_all_type_stmt",
"USE example_all_type_stmt",
"CREATE STABLE IF NOT EXISTS stb_json (" +
"ts TIMESTAMP, " +
"int_col INT) " +
"tags (json_tag json)",
"CREATE STABLE IF NOT EXISTS stb (" +
"ts TIMESTAMP, " +
"int_col INT, " +
"double_col DOUBLE, " +
"bool_col BOOL, " +
"binary_col BINARY(100), " +
"nchar_col NCHAR(100), " +
"varbinary_col VARBINARY(100), " +
"geometry_col GEOMETRY(100)) " +
"tags (" +
"int_tag INT, " +
"double_tag DOUBLE, " +
"bool_tag BOOL, " +
"binary_tag BINARY(100), " +
"nchar_tag NCHAR(100), " +
"varbinary_tag VARBINARY(100), " +
"geometry_tag GEOMETRY(100))"
};
private static final int numOfSubTable = 10, numOfRow = 10;
@ -27,153 +46,91 @@ public class WSParameterBindingFullDemo {
init(conn);
bindInteger(conn);
stmtJsonTag(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
stmtAll(conn);
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
System.out.println("Failed to insert data using stmt, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex;
} catch (Exception ex){
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
} catch (Exception ex) {
System.out.println("Failed to insert data using stmt, ErrMessage: " + ex.getMessage());
throw ex;
}
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_ws_parabind");
stmt.execute("create database if not exists test_ws_parabind");
stmt.execute("use test_ws_parabind");
for (int i = 0; i < schemaList.length; i++) {
stmt.execute(schemaList[i]);
}
}
}
private static void bindInteger(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
private static void stmtJsonTag(Connection conn) throws SQLException {
String sql = "INSERT INTO ? using stb_json tags(?) VALUES (?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t1_" + i);
pstmt.setTableName("ntb_json_" + i);
// set tags
pstmt.setTagByte(1, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setTagShort(2, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setTagInt(3, random.nextInt(Integer.MAX_VALUE));
pstmt.setTagLong(4, random.nextLong());
pstmt.setTagJson(1, "{\"device\":\"device_" + i + "\"}");
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setByte(2, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setShort(3, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setInt(4, random.nextInt(Integer.MAX_VALUE));
pstmt.setLong(5, random.nextLong());
pstmt.setInt(2, j);
pstmt.addBatch();
}
pstmt.executeBatch();
}
System.out.println("Successfully inserted rows to example_all_type_stmt.ntb_json");
}
}
private static void bindFloat(Connection conn) throws SQLException {
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
try(TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t2_" + i);
// set tags
pstmt.setTagFloat(1, random.nextFloat());
pstmt.setTagDouble(2, random.nextDouble());
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setFloat(2, random.nextFloat());
pstmt.setDouble(3, random.nextDouble());
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
private static void bindBoolean(Connection conn) throws SQLException {
String sql = "insert into ? using stable3 tags(?) values(?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t3_" + i);
// set tags
pstmt.setTagBoolean(1, random.nextBoolean());
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setBoolean(2, random.nextBoolean());
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
private static void bindBytes(Connection conn) throws SQLException {
String sql = "insert into ? using stable4 tags(?) values(?,?)";
private static void stmtAll(Connection conn) throws SQLException {
String sql = "INSERT INTO ? using stb tags(?,?,?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t4_" + i);
// set tags
pstmt.setTagString(1, new String("abc"));
// set table name
pstmt.setTableName("ntb");
// set tags
pstmt.setTagInt(1, 1);
pstmt.setTagDouble(2, 1.1);
pstmt.setTagBoolean(3, true);
pstmt.setTagString(4, "binary_value");
pstmt.setTagNString(5, "nchar_value");
pstmt.setTagVarbinary(6, new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e});
pstmt.setTagGeometry(7, new byte[]{
0x01, 0x01, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59,
0x40, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59, 0x40});
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setString(2, "abc");
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
long current = System.currentTimeMillis();
private static void bindString(Connection conn) throws SQLException {
String sql = "insert into ? using stable5 tags(?) values(?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t5_" + i);
// set tags
pstmt.setTagNString(1, "California.SanFrancisco");
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(0, new Timestamp(current + j));
pstmt.setNString(1, "California.SanFrancisco");
pstmt.addBatch();
}
pstmt.executeBatch();
}
pstmt.setTimestamp(1, new Timestamp(current));
pstmt.setInt(2, 1);
pstmt.setDouble(3, 1.1);
pstmt.setBoolean(4, true);
pstmt.setString(5, "binary_value");
pstmt.setNString(6, "nchar_value");
pstmt.setVarbinary(7, new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e});
pstmt.setGeometry(8, new byte[]{
0x01, 0x01, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59,
0x40, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59, 0x40});
pstmt.addBatch();
pstmt.executeBatch();
System.out.println("Successfully inserted rows to example_all_type_stmt.ntb");
}
}
}

View File

@ -50,36 +50,68 @@ public class TestAll {
}
@Test
public void testRestInsert() throws SQLException {
dropDB("power");
RestInsertExample.main(args);
RestQueryExample.main(args);
public void testWsConnect() throws Exception {
WSConnectExample.main(args);
}
@Test
public void testStmtInsert() throws SQLException {
public void testBase() throws Exception {
JdbcCreatDBDemo.main(args);
JdbcInsertDataDemo.main(args);
JdbcQueryDemo.main(args);
dropDB("power");
StmtInsertExample.main(args);
}
@Test
public void testSubscribe() {
public void testWsSchemaless() throws Exception {
dropDB("power");
SchemalessWsTest.main(args);
}
@Test
public void testJniSchemaless() throws Exception {
dropDB("power");
SchemalessJniTest.main(args);
}
@Test
public void testJniStmtBasic() throws Exception {
dropDB("power");
ParameterBindingBasicDemo.main(args);
}
@Test
public void testJniStmtFull() throws Exception {
dropDB("power");
ParameterBindingFullDemo.main(args);
}
@Test
public void testWsStmtBasic() throws Exception {
dropDB("power");
WSParameterBindingBasicDemo.main(args);
}
@Test
public void testWsStmtFull() throws Exception {
dropDB("power");
WSParameterBindingFullDemo.main(args);
}
@Test
public void testConsumer() throws Exception {
dropDB("power");
SubscribeDemo.main(args);
}
@Test
public void testSubscribeOverWebsocket() {
WebsocketSubscribeDemo.main(args);
}
@Test
public void testSchemaless() throws SQLException {
LineProtocolExample.main(args);
TelnetLineProtocolExample.main(args);
// for json protocol, tags may be double type. but for telnet protocol tag must be nchar type.
// To avoid type mismatch, we delete database test.
dropDB("test");
JSONProtocolExample.main(args);
}
// @Test
// public void testSubscribeJni() throws SQLException, InterruptedException {
// dropDB("power");
// ConsumerLoopFull.main(args);
// }
// @Test
// public void testSubscribeWs() throws SQLException, InterruptedException {
// dropDB("power");
// WsConsumerLoopFull.main(args);
// }
}

View File

@ -4,7 +4,6 @@
"main": "index.js",
"license": "MIT",
"dependencies": {
"@tdengine/client": "^3.0.1",
"@tdengine/rest": "^3.0.0"
"@tdengine/websocket": "^3.1.0"
}
}

View File

@ -0,0 +1,100 @@
const taos = require("@tdengine/websocket");
let dsn = 'ws://localhost:6041';
async function json_tag_example() {
let wsSql = null;
try {
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
wsSql = await taos.sqlConnect(conf);
console.log("Connected to " + dsn + " successfully.");
// create database
await wsSql.exec('CREATE DATABASE IF NOT EXISTS example_json_tag');
console.log("Create database example_json_tag successfully.");
// create table
await wsSql.exec('create table if not exists example_json_tag.stb (ts timestamp, v int) tags(jt json)');
console.log("Create stable example_json_tag.stb successfully");
let insertQuery = 'INSERT INTO ' +
'example_json_tag.tb1 USING example_json_tag.stb TAGS(\'{"name":"value"}\') ' +
"values(now, 1) ";
taosResult = await wsSql.exec(insertQuery);
console.log("Successfully inserted " + taosResult.getAffectRows() + " rows to example_json_tag.stb.");
let sql = 'SELECT ts, v, jt FROM example_json_tag.stb limit 100';
wsRows = await wsSql.query(sql);
while (await wsRows.next()) {
let row = wsRows.getData();
console.log('ts: ' + row[0] + ', v: ' + row[1] + ', jt: ' + row[2]);
}
} catch (err) {
console.error(`Failed to create database example_json_tag or stable stb, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
} finally {
if (wsSql) {
await wsSql.close();
}
}
}
async function all_type_example() {
let wsSql = null;
try {
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
wsSql = await taos.sqlConnect(conf);
console.log("Connected to " + dsn + " successfully.");
// create database
await wsSql.exec('CREATE DATABASE IF NOT EXISTS all_type_example');
console.log("Create database all_type_example successfully.");
// create table
await wsSql.exec('create table if not exists all_type_example.stb (ts timestamp, ' +
'int_col INT, double_col DOUBLE, bool_col BOOL, binary_col BINARY(100),' +
'nchar_col NCHAR(100), varbinary_col VARBINARY(100), geometry_col GEOMETRY(100)) ' +
'tags(int_tag INT, double_tag DOUBLE, bool_tag BOOL, binary_tag BINARY(100),' +
'nchar_tag NCHAR(100), varbinary_tag VARBINARY(100), geometry_tag GEOMETRY(100));');
console.log("Create stable all_type_example.stb successfully");
let insertQuery = "INSERT INTO all_type_example.tb1 using all_type_example.stb "
+ "tags(1, 1.1, true, 'binary_value', 'nchar_value', '\\x98f46e', 'POINT(100 100)') "
+ "values(now, 1, 1.1, true, 'binary_value', 'nchar_value', '\\x98f46e', 'POINT(100 100)')";
taosResult = await wsSql.exec(insertQuery);
console.log("Successfully inserted " + taosResult.getAffectRows() + " rows to all_type_example.stb.");
let sql = 'SELECT * FROM all_type_example.stb limit 100';
let wsRows = await wsSql.query(sql);
let meta = wsRows.getMeta();
console.log("wsRow:meta:=>", meta);
while (await wsRows.next()) {
let row = wsRows.getData();
console.log(row);
}
} catch (err) {
console.error(`Failed to create database all_type_example or stable stb, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
} finally {
if (wsSql) {
await wsSql.close();
}
}
}
async function test() {
await json_tag_example()
await all_type_example()
taos.destroy();
}
test()

View File

@ -0,0 +1,148 @@
const taos = require("@tdengine/websocket");
let dsn = 'ws://localhost:6041';
async function json_tag_example() {
let wsSql = null;
try {
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
wsSql = await taos.sqlConnect(conf);
console.log("Connected to " + dsn + " successfully.");
// create database
await wsSql.exec('CREATE DATABASE IF NOT EXISTS example_json_tag');
console.log("Create database example_json_tag successfully.");
await wsSql.exec('use example_json_tag');
// create table
await wsSql.exec('create table if not exists stb (ts timestamp, v int) tags(jt json)');
console.log("Create stable example_json_tag.stb successfully");
let stmt = await wsSql.stmtInit();
await stmt.prepare("INSERT INTO ? using stb tags(?) VALUES (?,?)");
await stmt.setTableName(`tb1`);
let tagParams = stmt.newStmtParam();
tagParams.setJson(['{"name":"value"}'])
await stmt.setTags(tagParams);
let bindParams = stmt.newStmtParam();
const currentMillis = new Date().getTime();
bindParams.setTimestamp([currentMillis]);
bindParams.setInt([1]);
await stmt.bind(bindParams);
await stmt.batch();
await stmt.exec();
await stmt.close();
let sql = 'SELECT ts, v, jt FROM example_json_tag.stb limit 100';
wsRows = await wsSql.query(sql);
while (await wsRows.next()) {
let row = wsRows.getData();
console.log('ts: ' + row[0] + ', v: ' + row[1] + ', jt: ' + row[2]);
}
} catch (err) {
console.error(`Failed to create database example_json_tag or stable stb, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err
} finally {
if (wsSql) {
await wsSql.close();
}
}
}
async function all_type_example() {
let wsSql = null;
let stmt = null;
try {
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
wsSql = await taos.sqlConnect(conf);
console.log("Connected to " + dsn + " successfully.");
// create database
await wsSql.exec('CREATE DATABASE IF NOT EXISTS all_type_example');
console.log("Create database all_type_example successfully.");
await wsSql.exec('use all_type_example');
// create table
await wsSql.exec('create table if not exists stb (ts timestamp, ' +
'int_col INT, double_col DOUBLE, bool_col BOOL, binary_col BINARY(100),' +
'nchar_col NCHAR(100), varbinary_col VARBINARY(100), geometry_col GEOMETRY(100)) ' +
'tags(int_tag INT, double_tag DOUBLE, bool_tag BOOL, binary_tag BINARY(100),' +
'nchar_tag NCHAR(100), varbinary_tag VARBINARY(100), geometry_tag GEOMETRY(100));');
console.log("Create stable all_type_example.stb successfully");
let geometryData = new Uint8Array([0x01,0x01,0x00,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x59,0x40,0x00,0x00,0x00,0x00,0x00,0x00,0x59,0x40,]).buffer;
const encoder = new TextEncoder();
let vbData = encoder.encode(`Hello, world!`).buffer;
stmt = await wsSql.stmtInit();
await stmt.prepare("INSERT INTO ? using stb tags(?,?,?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)");
await stmt.setTableName(`tb1`);
let tagParams = stmt.newStmtParam();
tagParams.setInt([1]);
tagParams.setDouble([1.1]);
tagParams.setBoolean([true]);
tagParams.setVarchar(["hello"]);
tagParams.setNchar(["stmt"]);
tagParams.setGeometry([geometryData]);
tagParams.setVarBinary([vbData]);
await stmt.setTags(tagParams);
let bindParams = stmt.newStmtParam();
const currentMillis = new Date().getTime();
bindParams.setTimestamp([currentMillis]);
bindParams.setInt([1]);
bindParams.setDouble([1.1]);
bindParams.setBoolean([true]);
bindParams.setVarchar(["hello"]);
bindParams.setNchar(["stmt"]);
bindParams.setGeometry([geometryData]);
bindParams.setVarBinary([vbData]);
await stmt.bind(bindParams);
await stmt.batch();
await stmt.exec();
let sql = 'SELECT * FROM all_type_example.stb limit 100';
let wsRows = await wsSql.query(sql);
let meta = wsRows.getMeta();
console.log("wsRow:meta:=>", meta);
while (await wsRows.next()) {
let row = wsRows.getData();
console.log(row);
}
} catch (err) {
console.error(`Failed to create database all_type_example or stable stb, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
} finally {
if (stmt) {
await stmt.close();
}
if (wsSql) {
await wsSql.close();
}
}
}
async function test() {
await json_tag_example()
await all_type_example()
taos.destroy();
}
test()

View File

@ -1,53 +0,0 @@
const taos = require("@tdengine/websocket");
var host = null;
for(var i = 2; i < global.process.argv.length; i++){
var key = global.process.argv[i].split("=")[0];
var value = global.process.argv[i].split("=")[1];
if("host" == key){
host = value;
}
}
if(host == null){
console.log("Usage: node nodejsChecker.js host=<hostname> port=<port>");
process.exit(0);
}
let dbData = ["{\"metric\": \"meter_current\",\"timestamp\": 1626846402,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}",
"{\"metric\": \"meter_current\",\"timestamp\": 1626846403,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1002\"}}",
"{\"metric\": \"meter_current\",\"timestamp\": 1626846404,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1003\"}}"]
async function createConnect() {
let dsn = 'ws://' + host + ':6041'
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
return await taos.sqlConnect(conf);
}
async function test() {
let wsSql = null;
let wsRows = null;
let reqId = 0;
try {
wsSql = await createConnect()
await wsSql.exec('CREATE DATABASE IF NOT EXISTS power KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;', reqId++);
await wsSql.schemalessInsert([dbData], taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, 0);
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (wsRows) {
await wsRows.close();
}
if (wsSql) {
await wsSql.close();
}
taos.destroy();
}
}
test()

View File

@ -15,8 +15,8 @@ async function createConnect() {
return wsSql;
}
async function test() {
let dsn = 'ws://localhost:6041'
let wsSql = null;
let wsRows = null;
let ttl = 0;
@ -29,6 +29,7 @@ async function test() {
}
catch (err) {
console.error(`Failed to insert data with schemaless, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (wsRows) {
@ -40,4 +41,5 @@ async function test() {
taos.destroy();
}
}
test()

View File

@ -10,11 +10,9 @@ for(var i = 2; i < global.process.argv.length; i++){
}
if(host == null){
console.log("Usage: node nodejsChecker.js host=<hostname> port=<port>");
process.exit(0);
host = 'localhost';
}
async function createConnect() {
let dsn = 'ws://' + host + ':6041'
console.log(dsn)
@ -41,7 +39,7 @@ async function test() {
taosResult = await wsSql.exec('USE power', reqId++);
console.log(taosResult);
taosResult = await wsSql.exec('CREATE STABLE IF NOT EXISTS meters (_ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);', reqId++);
taosResult = await wsSql.exec('CREATE STABLE IF NOT EXISTS meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);', reqId++);
console.log(taosResult);
taosResult = await wsSql.exec('DESCRIBE meters', reqId++);
@ -62,6 +60,7 @@ async function test() {
}
catch (err) {
console.error(err.code, err.message);
throw err;
}
finally {
if (wsRows) {

View File

@ -24,18 +24,24 @@ async function createConnect() {
async function createDbAndTable() {
let wsSql = null;
try {
wsSql = await createConnect();
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
wsSql = await taos.sqlConnect(conf);
console.log("Connected to " + dsn + " successfully.");
// create database
await wsSql.exec('CREATE DATABASE IF NOT EXISTS power');
console.log("Create database power successfully.");
// create table
await wsSql.exec('CREATE STABLE IF NOT EXISTS power.meters ' +
'(_ts timestamp, current float, voltage int, phase float) ' +
'(ts timestamp, current float, voltage int, phase float) ' +
'TAGS (location binary(64), groupId int);');
console.log("Create stable power.meters successfully");
} catch (err) {
console.error(`Failed to create database power or stable meters, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
} finally {
if (wsSql) {
await wsSql.close();
@ -48,21 +54,23 @@ async function createDbAndTable() {
// ANCHOR: insertData
async function insertData() {
let wsSql = null
let insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 3) " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
try {
wsSql = await createConnect();
let insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
taosResult = await wsSql.exec(insertQuery);
console.log("Successfully inserted " + taosResult.getAffectRows() + " rows to power.meters.");
} catch (err) {
console.error(`Failed to insert data to power.meters, sql: ${insertQuery}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
} finally {
if (wsSql) {
await wsSql.close();
@ -86,6 +94,7 @@ async function queryData() {
}
catch (err) {
console.error(`Failed to query data from power.meters, sql: ${sql}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (wsRows) {
@ -113,6 +122,7 @@ async function sqlWithReqid() {
}
catch (err) {
console.error(`Failed to query data from power.meters, reqId: ${reqId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (wsRows) {

View File

@ -23,7 +23,7 @@ async function prepare() {
return wsSql
}
(async () => {
async function test() {
let stmt = null;
let connector = null;
try {
@ -60,6 +60,7 @@ async function prepare() {
}
catch (err) {
console.error(`Failed to insert to table meters using stmt, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (stmt) {
@ -70,4 +71,6 @@ async function prepare() {
}
taos.destroy();
}
})();
}
test()

View File

@ -1,58 +0,0 @@
const taos = require("@tdengine/websocket");
var host = null;
for(var i = 2; i < global.process.argv.length; i++){
var key = global.process.argv[i].split("=")[0];
var value = global.process.argv[i].split("=")[1];
if("host" == key){
host = value;
}
}
if(host == null){
console.log("Usage: node nodejsChecker.js host=<hostname> port=<port>");
process.exit(0);
}
let dbData = ["meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
"meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
"meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3",
"meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3",
"meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2",
"meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2",
"meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3",
"meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3",];
async function createConnect() {
let dsn = 'ws://' + host + ':6041'
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
return await taos.sqlConnect(conf);
}
async function test() {
let wsSql = null;
let wsRows = null;
let reqId = 0;
try {
wsSql = await createConnect()
await wsSql.exec('create database if not exists power KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;', reqId++);
await wsSql.exec('use power', reqId++);
await wsSql.schemalessInsert(dbData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, 0);
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (wsRows) {
await wsRows.close();
}
if (wsSql) {
await wsSql.close();
}
taos.destroy();
}
}
test()

View File

@ -1,3 +1,4 @@
const { sleep } = require("@tdengine/websocket");
const taos = require("@tdengine/websocket");
// ANCHOR: create_consumer
@ -49,12 +50,20 @@ async function prepare() {
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
await wsSql.close();
}
for (let i = 0; i < 10; i++) {
async function insert() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
let wsSql = await taos.sqlConnect(conf);
for (let i = 0; i < 50; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
await sleep(100);
}
wsSql.close();
await wsSql.close();
}
async function subscribe(consumer) {
@ -82,13 +91,17 @@ async function test() {
let consumer = null;
try {
await prepare();
consumer = await createConsumer()
await subscribe(consumer)
consumer = await createConsumer();
const allPromises = [];
allPromises.push(subscribe(consumer));
allPromises.push(insert());
await Promise.all(allPromises);
await consumer.unsubscribe();
console.log("Consumer unsubscribed successfully.");
}
catch (err) {
console.error(`Failed to unsubscribe consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (consumer) {

View File

@ -1,41 +1,45 @@
const { sleep } = require("@tdengine/websocket");
const taos = require("@tdengine/websocket");
const db = 'power';
const stable = 'meters';
const url = 'ws://localhost:6041';
const topic = 'topic_meters'
const topics = [topic];
const groupId = "group1";
const clientId = "client1";
// ANCHOR: create_consumer
async function createConsumer() {
let groupId = "group1";
let clientId = "client1";
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, "group1"],
[taos.TMQConstants.CLIENT_ID, 'client1'],
[taos.TMQConstants.GROUP_ID, groupId],
[taos.TMQConstants.CLIENT_ID, clientId],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.WS_URL, 'ws://localhost:6041'],
[taos.TMQConstants.WS_URL, url],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
try {
return await taos.tmqConnect(configMap);
conn = await taos.tmqConnect(configMap);
console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`)
return conn;
} catch (err) {
console.error(err);
console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
// ANCHOR_END: create_consumer
async function prepare() {
let conf = new taos.WSConfig('ws://localhost:6041');
let conf = new taos.WSConfig('ws://192.168.1.98:6041');
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
const createDB = `CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`;
const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`;
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
let wsSql = await taos.sqlConnect(conf);
@ -44,58 +48,63 @@ async function prepare() {
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
await wsSql.close();
}
for (let i = 0; i < 10; i++) {
async function insert() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
let wsSql = await taos.sqlConnect(conf);
for (let i = 0; i < 1; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
}
await wsSql.close();
}
// ANCHOR: subscribe
// ANCHOR: offset
async function subscribe(consumer) {
try {
await consumer.subscribe(['topic_meters']);
for (let i = 0; i < 50; i++) {
let res = await consumer.poll(100);
for (let [key, value] of res) {
// Add your data processing logic here
console.log(`data: ${key} ${value}`);
}
}
} catch (err) {
console.error(`Failed to poll data, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
// ANCHOR_END: subscribe
// ANCHOR: offset
async function test() {
let consumer = null;
try {
await prepare();
let consumer = await createConsumer()
await consumer.subscribe(['topic_meters']);
let res = new Map();
while (res.size == 0) {
res = await consumer.poll(100);
await consumer.commit();
}
let assignment = await consumer.assignment();
await consumer.seekToBeginning(assignment);
console.log("Assignment seek to beginning successfully");
} catch (err) {
console.error(`Failed to seek offset, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
// ANCHOR_END: offset
async function test() {
let consumer = null;
try {
await prepare();
consumer = await createConsumer();
const allPromises = [];
allPromises.push(subscribe(consumer));
allPromises.push(insert());
await Promise.all(allPromises);
await consumer.unsubscribe();
console.log("Consumer unsubscribed successfully.");
}
catch (err) {
console.error(`Failed to seek offset, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
console.error(`Failed to consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (consumer) {
await consumer.close();
console.log("Consumer closed successfully.");
}
taos.destroy();
}
}
// ANCHOR_END: offset
test()

View File

@ -15,6 +15,7 @@ def create_connection():
print(f"Connected to {host}:{port} successfully.");
except Exception as err:
print(f"Failed to connect to {host}:{port} , ErrMessage:{err}")
raise err
finally:
if conn:
conn.close()

View File

@ -15,7 +15,7 @@ def create_connection():
print(f"Connected to {host}:{port} successfully.");
except Exception as err:
print(f"Failed to connect to {host}:{port} , ErrMessage:{err}")
raise err
return conn
# ANCHOR_END: connect
@ -28,6 +28,7 @@ def create_db_table(conn):
conn.execute("CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupId, location) TAGS(0, 'Los Angles')")
except Exception as err:
print(f'Exception {err}')
raise err
# ANCHOR_END: create_db
def insert(conn):
@ -42,9 +43,10 @@ def insert(conn):
"""
try:
inserted = conn.execute(sql)
assert inserted == 8
assert inserted == 4
except Exception as err:
print(f'Exception111 {err}')
raise err
# ANCHOR_END: insert
def query(conn):
@ -58,6 +60,7 @@ def query(conn):
print(row)
except Exception as err:
print(f'Exception {err}')
raise err
# ANCHOR_END: query
if __name__ == "__main__":

View File

@ -21,6 +21,7 @@ try:
except Exception as err:
print(f"Failed to create database power or stable meters, ErrMessage:{err}")
raise err
finally:
if conn:
conn.close()

View File

@ -20,6 +20,7 @@ try:
except Exception as err:
print(f"Failed to create database power or stable meters, ErrMessage:{err}")
raise err
finally:
if conn:
conn.close()

View File

@ -21,6 +21,7 @@ try:
except Exception as err:
print(f"Failed to create database power or stable meters, ErrMessage:{err}")
raise err
finally:
if conn:
conn.close()

View File

@ -22,6 +22,7 @@ try:
except Exception as err:
print(f"Failed to insert data to power.meters, sql: {sql}, ErrMessage: {err}.")
raise err
finally:
if conn:
conn.close()

View File

@ -21,6 +21,7 @@ try:
except Exception as err:
print(f"Failed to insert data to power.meters, sql:{sql}, ErrMessage:{err}.")
raise err
finally:
if conn:
conn.close()

View File

@ -22,6 +22,7 @@ try:
except Exception as err:
print(f"Failed to insert data to power.meters, sql: {sql}, ErrMessage: {err}.")
raise err
finally:
if conn:
conn.close()

View File

@ -16,6 +16,7 @@ try:
except Exception as err:
print(f"Failed to query data from power.meters, sql: {sql}, ErrMessage:{err}")
raise err
finally:
if conn:
conn.close()

View File

@ -15,3 +15,4 @@ try:
except Exception as err:
print(f"Failed to query data from power.meters, sql: {sql}, ErrMessage:{err}")
raise err

View File

@ -15,6 +15,7 @@ try:
except Exception as err:
print(f"Failed to query data from power.meters, sql: {sql}, ErrMessage:{err}")
raise err
finally:
if conn:
conn.close()

View File

@ -18,7 +18,7 @@ try:
except Exception as err:
print(f"Failed to execute sql with reqId:{reqId}, ErrMessage:{err}")
raise err
finally:
if conn:
conn.close()

View File

@ -16,3 +16,4 @@ try:
except Exception as err:
print(f"Failed to execute sql with reqId:{reqId}, ErrMessage:{err}")
raise err

View File

@ -19,6 +19,7 @@ try:
except Exception as err:
print(f"Failed to execute sql with reqId:{reqId}, ErrMessage:{err}")
raise err
finally:
if conn:
conn.close()

View File

@ -35,6 +35,7 @@ try:
print("Inserted data with schemaless successfully.");
except Exception as err:
print(f"Failed to insert data with schemaless, ErrMessage:{err}")
raise err
finally:
if conn:
conn.close()

View File

@ -75,8 +75,6 @@ def schemaless_insert():
conn.close()
if __name__ == "__main__":
try:
prepare()
schemaless_insert()
except Exception as err:
print(f"Failed to insert data with schemaless, err:{err}")
prepare()
schemaless_insert()

View File

@ -57,6 +57,7 @@ try:
except Exception as err:
print(f"Failed to insert to table meters using stmt, ErrMessage:{err}")
raise err
finally:
if stmt:
stmt.close()

View File

@ -62,6 +62,7 @@ try:
except Exception as err:
print(f"Failed to insert to table meters using stmt, ErrMessage:{err}")
raise err
finally:
if stmt:
stmt.close()

View File

@ -152,6 +152,7 @@ def unsubscribe(consumer):
print("Consumer unsubscribed successfully.");
except Exception as err:
print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
raise err
finally:
if consumer:
consumer.close()
@ -166,7 +167,6 @@ if __name__ == "__main__":
subscribe(consumer)
seek_offset(consumer)
commit_offset(consumer)
except Exception as err:
print(f"Failed to execute consumer example, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
finally:
unsubscribe(consumer);
if consumer:
unsubscribe(consumer);

View File

@ -31,7 +31,7 @@ def prepareMeta():
# create super table
rowsAffected = conn.execute(
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(64))"
)
assert rowsAffected == 0
@ -155,6 +155,7 @@ def unsubscribe(consumer):
print("Consumer unsubscribed successfully.");
except Exception as err:
print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
raise err
finally:
if consumer:
consumer.close()
@ -170,7 +171,6 @@ if __name__ == "__main__":
subscribe(consumer)
seek_offset(consumer)
commit_offset(consumer)
except Exception as err:
print(f"Failed to execute consumer example, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
finally:
unsubscribe(consumer)
if consumer:
unsubscribe(consumer)

View File

@ -8,6 +8,7 @@ edition = "2021"
anyhow = "1"
chrono = "0.4"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
log = "0.4"
pretty_env_logger = "0.5.0"

View File

@ -0,0 +1,121 @@
use taos::*;
use taos_query::util::hex::hex_string_to_bytes;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://";
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
taos.exec("DROP DATABASE IF EXISTS example_all_type_stmt")
.await?;
taos.create_database("example_all_type_stmt").await?;
taos.use_database("example_all_type_stmt").await?;
taos.exec(
r#"
CREATE STABLE IF NOT EXISTS stb (
ts TIMESTAMP,
int_col INT,
double_col DOUBLE,
bool_col BOOL,
binary_col BINARY(100),
nchar_col NCHAR(100),
varbinary_col VARBINARY(100),
geometry_col GEOMETRY(100))
TAGS (
int_tag INT,
double_tag DOUBLE,
bool_tag BOOL,
binary_tag BINARY(100),
nchar_tag NCHAR(100))
"#,
)
.await?;
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? using stb tags(?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)")
.await?;
const NUM_TABLES: usize = 10;
const NUM_ROWS: usize = 10;
for i in 0..NUM_TABLES {
let table_name = format!("d_bind_{}", i);
let tags = vec![
Value::Int(i as i32),
Value::Double(1.1),
Value::Bool(true),
Value::VarChar("binary_value".into()),
Value::NChar("nchar_value".into()),
// Value::VarBinary(vec![0x98, 0xf4, 0x6e].into()),
// Value::Geometry(
// vec![
// 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40,
// 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40,
// ]
// .into(),
// ),
];
// set table name and tags for the prepared statement.
match stmt.set_tbname_tags(&table_name, &tags).await {
Ok(_) => {}
Err(err) => {
eprintln!(
"Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}",
table_name, tags, err
);
return Err(err.into());
}
}
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_ints(vec![j as i32]),
ColumnView::from_doubles(vec![1.1]),
ColumnView::from_bools(vec![true]),
ColumnView::from_varchar(vec!["ABC"]),
ColumnView::from_nchar(vec!["涛思数据"]),
ColumnView::from_bytes(vec![hex_string_to_bytes("123456").to_vec()]),
ColumnView::from_geobytes(vec![hex_string_to_bytes(
"0101000000000000000000F03F0000000000000040",
)
.to_vec()]),
];
// bind values to the prepared statement.
match stmt.bind(&values).await {
Ok(_) => {}
Err(err) => {
eprintln!(
"Failed to bind values, values:{:?}, ErrMessage: {}",
values, err
);
return Err(err.into());
}
}
}
match stmt.add_batch().await {
Ok(_) => {}
Err(err) => {
eprintln!("Failed to add batch, ErrMessage: {}", err);
return Err(err.into());
}
}
}
// execute.
match stmt.execute().await {
Ok(affected_rows) => println!(
"Successfully inserted {} rows to example_all_type_stmt.stb.",
affected_rows
),
Err(err) => {
eprintln!(
"Failed to insert to table stb using stmt, ErrMessage: {}",
err
);
return Err(err.into());
}
}
Ok(())
}

View File

@ -0,0 +1,94 @@
use serde_json::json;
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://";
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
taos.exec("DROP DATABASE IF EXISTS example_all_type_stmt")
.await?;
taos.create_database("example_all_type_stmt").await?;
taos.use_database("example_all_type_stmt").await?;
taos.exec(
r#"
CREATE STABLE IF NOT EXISTS stb_json (
ts TIMESTAMP,
int_col INT)
TAGS (
json_tag JSON)
"#,
)
.await?;
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? using stb_json tags(?) VALUES (?,?)")
.await?;
const NUM_TABLES: usize = 1;
const NUM_ROWS: usize = 1;
for i in 0..NUM_TABLES {
let table_name = format!("d_bind_{}", i);
let json_value: serde_json::Value = json!({
"name": "value"
});
dbg!(json_value.to_string());
let tags = vec![Value::Json(json_value)];
// set table name and tags for the prepared statement.
match stmt.set_tbname_tags(&table_name, &tags).await {
Ok(_) => {}
Err(err) => {
eprintln!(
"Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}",
table_name, tags, err
);
return Err(err.into());
}
}
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_ints(vec![j as i32]),
];
// bind values to the prepared statement.
match stmt.bind(&values).await {
Ok(_) => {}
Err(err) => {
eprintln!(
"Failed to bind values, values:{:?}, ErrMessage: {}",
values, err
);
return Err(err.into());
}
}
}
match stmt.add_batch().await {
Ok(_) => {}
Err(err) => {
eprintln!("Failed to add batch, ErrMessage: {}", err);
return Err(err.into());
}
}
}
// execute.
match stmt.execute().await {
Ok(affected_rows) => println!(
"Successfully inserted {} rows to example_all_type_stmt.stb_json.",
affected_rows
),
Err(err) => {
eprintln!(
"Failed to insert to table stb_json using stmt, ErrMessage: {}",
err
);
return Err(err.into());
}
}
Ok(())
}

View File

@ -0,0 +1,121 @@
use taos::*;
use taos_query::util::hex::hex_string_to_bytes;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "ws://";
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
taos.exec("DROP DATABASE IF EXISTS example_all_type_stmt")
.await?;
taos.create_database("example_all_type_stmt").await?;
taos.use_database("example_all_type_stmt").await?;
taos.exec(
r#"
CREATE STABLE IF NOT EXISTS stb (
ts TIMESTAMP,
int_col INT,
double_col DOUBLE,
bool_col BOOL,
binary_col BINARY(100),
nchar_col NCHAR(100),
varbinary_col VARBINARY(100),
geometry_col GEOMETRY(100))
TAGS (
int_tag INT,
double_tag DOUBLE,
bool_tag BOOL,
binary_tag BINARY(100),
nchar_tag NCHAR(100))
"#,
)
.await?;
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? using stb tags(?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)")
.await?;
const NUM_TABLES: usize = 10;
const NUM_ROWS: usize = 10;
for i in 0..NUM_TABLES {
let table_name = format!("d_bind_{}", i);
let tags = vec![
Value::Int(i as i32),
Value::Double(1.1),
Value::Bool(true),
Value::VarChar("binary_value".into()),
Value::NChar("nchar_value".into()),
// Value::VarBinary(vec![0x98, 0xf4, 0x6e].into()),
// Value::Geometry(
// vec![
// 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40,
// 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40,
// ]
// .into(),
// ),
];
// set table name and tags for the prepared statement.
match stmt.set_tbname_tags(&table_name, &tags).await {
Ok(_) => {}
Err(err) => {
eprintln!(
"Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}",
table_name, tags, err
);
return Err(err.into());
}
}
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_ints(vec![j as i32]),
ColumnView::from_doubles(vec![1.1]),
ColumnView::from_bools(vec![true]),
ColumnView::from_varchar(vec!["ABC"]),
ColumnView::from_nchar(vec!["涛思数据"]),
ColumnView::from_bytes(vec![hex_string_to_bytes("123456").to_vec()]),
ColumnView::from_geobytes(vec![hex_string_to_bytes(
"0101000000000000000000F03F0000000000000040",
)
.to_vec()]),
];
// bind values to the prepared statement.
match stmt.bind(&values).await {
Ok(_) => {}
Err(err) => {
eprintln!(
"Failed to bind values, values:{:?}, ErrMessage: {}",
values, err
);
return Err(err.into());
}
}
}
match stmt.add_batch().await {
Ok(_) => {}
Err(err) => {
eprintln!("Failed to add batch, ErrMessage: {}", err);
return Err(err.into());
}
}
}
// execute.
match stmt.execute().await {
Ok(affected_rows) => println!(
"Successfully inserted {} rows to example_all_type_stmt.stb.",
affected_rows
),
Err(err) => {
eprintln!(
"Failed to insert to table stb using stmt, ErrMessage: {}",
err
);
return Err(err.into());
}
}
Ok(())
}

View File

@ -83,7 +83,6 @@ async fn main() -> anyhow::Result<()> {
eprintln!("Failed to execute insert: {:?}", e);
}
tokio::time::sleep(Duration::from_millis(10)).await;
println!("Succed to execute insert 1 row");
}
});
});

View File

@ -14,9 +14,9 @@ TDengine 是一个高性能、分布式的时序数据库。通过集成的缓
TDengine OSS 是一个开源的高性能时序数据库与其他时序数据库相比它的核心优势在于其集群开源、高性能和云原生架构。而且除了基础的写入、查询和存储功能外TDengine OSS 还集成了缓存、流式计算和数据订阅等高级功能,这些功能显著简化了系统设计,降低了企业的研发和运营成本。
在 TDengine OSS 的基础上,企业版 TDengine Enterprise 提供了增强的辅助功能包括数据的备份恢复、异地容灾、多级存储、视图、权限控制、安全加密、IP 白名单、支持 MQTT、OPC-UA、OPC-DA、PI、Wonderware、Kafka 等各种数据源。这些功能为企业提供了更为全面、安全、可靠和高效的时序数据管理解决方案。
在 TDengine OSS 的基础上,企业版 TDengine Enterprise 提供了增强的辅助功能包括数据的备份恢复、异地容灾、多级存储、视图、权限控制、安全加密、IP 白名单、支持 MQTT、OPC-UA、OPC-DA、PI、Wonderware、Kafka 等各种数据源。这些功能为企业提供了更为全面、安全、可靠和高效的时序数据管理解决方案。更多的细节请看 [TDengine Enterprise](https://www.taosdata.com/tdengine-pro)
此外TDengine Cloud 作为一种全托管的云服务,存储与计算分离,分开计费,为企业提供了企业级的工具和服务,彻底解决了运维难题,尤其适合中小规模的用户使用。
此外TDengine Cloud 作为一种全托管的云服务,存储与计算分离,分开计费,为企业提供了企业级的工具和服务,彻底解决了运维难题,尤其适合中小规模的用户使用。更多的细节请看[TDengine 云服务](https://cloud.taosdata.com/?utm_source=menu&utm_medium=webcn)
## TDengine 主要功能与特性
@ -135,9 +135,3 @@ TDengine 经过特别优化,以适应时间序列数据的独特需求,引
- [TDengine 与 InfluxDB、OpenTSDB、Cassandra、MySQL、ClickHouse 等数据库的对比测试报告](https://www.taosdata.com/downloads/TDengine_Testing_Report_cn.pdf)
## 主要产品
TDengine 有两个主要产品TDengine Enterprise (即 TDengine 企业版)和 TDengine Cloud关于它们的具体定义请参考
- [TDengine 企业版](https://www.taosdata.com/tdengine-pro)
- [TDengine 云服务](https://cloud.taosdata.com/?utm_source=menu&utm_medium=webcn)

View File

@ -90,7 +90,7 @@ taosBenchmark 提供了丰富的选项,允许用户自定义测试参数,如
taosBenchmark --help
```
有关taosBenchmark 的详细使用方法,请参考[taosBenchmark 参考手册](../../reference/components/taosbenchmark)
有关taosBenchmark 的详细使用方法,请参考[taosBenchmark 参考手册](../../reference/tools/taosbenchmark)
### 体验查询

View File

@ -263,7 +263,7 @@ SELECT * FROM t;
Query OK, 2 row(s) in set (0.003128s)
```
除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在机器上运行,更多细节请参考 [TDengine 命令行](../../reference/components/taos-cli/)。
除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在机器上运行,更多细节请参考 [TDengine 命令行](../../reference/tools/taos-cli/)。
## 快速体验
@ -286,7 +286,7 @@ taosBenchmark 提供了丰富的选项,允许用户自定义测试参数,如
taosBenchmark --help
```
有关taosBenchmark 的详细使用方法,请参考[taosBenchmark 参考手册](../../reference/components/taosbenchmark)
有关taosBenchmark 的详细使用方法,请参考[taosBenchmark 参考手册](../../reference/tools/taosbenchmark)
### 体验查询

View File

@ -15,7 +15,7 @@ TDengine Cloud 大幅减轻了用户在部署、运维等方面的人力负担
要在 TDengine Cloud 注册新用户,请遵循以下简易步骤完成注册流程:
1. 打开浏览器,访问 TDengine Cloud 的首页https://cloud.taosdata.com,在右边的“注册”部分,填入自己的姓名以及企业邮箱地址,点击“获取验证码”按钮。
1. 打开浏览器,访问 [TDengine Cloud](https://cloud.taosdata.com),在右边的“注册”部分,填入自己的姓名以及企业邮箱地址,点击“获取验证码”按钮。
2. 检查企业邮箱,找到主题为“你的 TDengine Cloud 注册账户验证码”的邮件。从邮件内容中复制 6 位验证码,并将其粘贴到注册页面上的“验证码”输入框中。接着,点击“注册 TDengine Cloud”按钮进入客户信息补全页面。

View File

@ -181,7 +181,7 @@ INTERVAL(interval_val [, interval_offset])
- FILL用于指定窗口区间数据缺失的情况下数据的填充模式。
对于时间窗口interval_val 和 sliding_val 都表示时间段, 语法上支持三种方式。例如:
1. INTERVAL(1s, 500a) SLIDING(1s),带时间单位的形式,其中的时间单位是单字符表示, 分别为: a (毫秒), b (纳秒), d (天), h (小时), m (分钟), n (月), s (秒), u (微 w (周), y (年);
1. INTERVAL(1s, 500a) SLIDING(1s),带时间单位的形式,其中的时间单位是单字符表示, 分别为: a (毫秒), b (纳秒), d (天), h (小时), m (分钟), n (月), s (秒), u (微 w (周), y (年);
2. INTERVAL(1000, 500) SLIDING(1000),不带时间单位的形式,将使用查询库的时间精度作为默认时间单位,当存在多个库时默认采用精度更高的库;
3. INTERVAL('1s', '500a') SLIDING('1s'),带时间单位的字符串形式,字符串内部不能有任何空格等其它字符。

View File

@ -27,7 +27,7 @@ PI 系统是一套用于数据收集、查找、分析、传递和可视化的
在数据写入页面中,点击 **+新增数据源** 按钮,进入新增数据源页面。
![kafka-01.png](./kafka-01.png)
![new.png](./pic/pi-01-new.png)
### 基本配置

View File

@ -208,3 +208,15 @@ CSV 文件中的每个 Row 配置一个 OPC 数据点位。Row 的规则如下
### 8. 创建完成
点击 **提交** 按钮,完成创建 OPC UA 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
## 增加数据点位
在任务运行中,点击 **编辑**,点击 **增加数据点位** 按钮,追加数据点位到 CSV 文件中。
![增加数据点位](./pic/opc-08-add-point.png)
在弹出的表单中,填写数据点位的信息。
![数据点位表单](./pic/opc-09-add-point.png)
点击 **确定** 按钮,完成数据点位的追加。

View File

@ -182,3 +182,15 @@ CSV 文件中的每个 Row 配置一个 OPC 数据点位。Row 的规则如下
### 7. 创建完成
点击 **提交** 按钮,完成创建 OPC DA 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
## 增加数据点位
在任务运行中,点击 **编辑**,点击 **增加数据点位** 按钮,追加数据点位到 CSV 文件中。
![增加数据点位](./pic/opc-08-add-point.png)
在弹出的表单中,填写数据点位的信息。
![数据点位表单](./pic/opc-09-add-point.png)
点击 **确定** 按钮,完成数据点位的追加。

View File

@ -33,13 +33,14 @@ TDengine 可以通过 MQTT 连接器从 MQTT 代理订阅数据并将其写入 T
### 3. 配置连接和认证信息
**MQTT地址** 中填写 MQTT 代理的地址,例如:`192.168.1.42:1883`
**MQTT 地址** 中填写 MQTT 代理的地址,例如:`192.168.1.42`
**MQTT 端口** 中填写 MQTT 代理的端口,例如:`1883`
**用户** 中填写 MQTT 代理的用户名。
**密码** 中填写 MQTT 代理的密码。
点击 **连通性检查** 按钮,检查数据源是否可用。
![mqtt-03.png](./mqtt-03.png)
@ -64,6 +65,8 @@ TDengine 可以通过 MQTT 连接器从 MQTT 代理订阅数据并将其写入 T
**订阅主题及 QoS 配置** 中填写要消费的 Topic 名称。使用如下格式设置: `topic1::0,topic2::1`
点击 **检查连通性** 按钮,检查数据源是否可用。
![mqtt-05.png](./mqtt-05.png)
### 6. 配置 MQTT Payload 解析

View File

@ -44,8 +44,50 @@ TDengine 可以高效地从 Kafka 读取数据并将其写入 TDengine以实
如果服务端开启了 SASL 认证机制,此处需要启用 SASL 并配置相关内容,目前支持 PLAIN/SCRAM-SHA-256/GSSAPI 三种认证机制,请按实际情况进行选择。
#### 4.1. PLAIN 认证
选择 `PLAIN` 认证机制,输入用户名和密码:
![kafka-04-sasl-plain.png](./kafka-04-sasl-plain.png)
#### 4.1. SCRAM(SCRAM-SHA-256) 认证
选择 `SCRAM-SHA-256` 认证机制,输入用户名和密码:
![kafka-04.png](./kafka-04.png)
#### 4.3. GSSAPI 认证
选择 `GSSAPI` ,将通过 [RDkafka 客户端](https://github.com/confluentinc/librdkafka) 调用 GSSAPI 应用 Kerberos 认证机制:
![kafka-04-sasl-gssapi.png](./kafka-04-sasl-gssapi.png)
需要输入的信息有:
- Kerberos 服务名,一般是 `kafka`
- Kerberos 认证主体,即认证用户名,例如 `kafkaclient`
- Kerberos 初始化命令(可选,一般不用填写);
- Kerberos 密钥表,需提供文件并上传;
以上信息均需由 Kafka 服务管理者提供。
除此之外,在服务器上需要配置 [Kerberos](https://web.mit.edu/kerberos/) 认证服务。在 Ubuntu 下使用 `apt install krb5-user` ;在 CentOS 下,使用 `yum install krb5-workstation`;即可。
配置完成后,可以使用 [kcat](https://github.com/edenhill/kcat) 工具进行 Kafka 主题消费验证:
```bash
kcat <topic> \
-b <kafka-server:port> \
-G kcat \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanism=GSSAPI \
-X sasl.kerberos.keytab=</path/to/kafkaclient.keytab> \
-X sasl.kerberos.principal=<kafkaclient> \
-X sasl.kerberos.service.name=kafka
```
如果出现错误“Server xxxx not found in kerberos database”则需要配置 Kafka 节点对应的域名并在 Kerberos 客户端配置文件 `/etc/krb5.conf` 中配置反向域名解析 `rdns = true`
### 5. 配置 SSL 证书
如果服务端开启了 SSL 加密认证,此处需要启用 SSL 并配置相关内容。
@ -60,7 +102,7 @@ TDengine 可以高效地从 Kafka 读取数据并将其写入 TDengine以实
**主题** 中填写要消费的 Topic 名称。可以配置多个 Topic Topic 之间用逗号分隔。例如:`tp1,tp2`。
**Client ID** 中填写客户端标识,填写后会生成带有 `taosx` 前缀的客户端 ID (例如,如果填写的标识为 `foo`,则生成的客户端 ID 为 `taosxfoo`)。如果打开末尾处的开关,则会把当前任务的任务 ID 拼接到 `taosx` 之后,输入的标识之前(生成的客户端 ID 形如 `taosx100foo`)。连接到同一个 Kafka 集群的所有客户端 ID 必须保证唯一
**Client ID** 中填写客户端标识,填写后会生成带有 `taosx` 前缀的客户端 ID (例如,如果填写的标识为 `foo`,则生成的客户端 ID 为 `taosxfoo`)。如果打开末尾处的开关,则会把当前任务的任务 ID 拼接到 `taosx` 之后,输入的标识之前(生成的客户端 ID 形如 `taosx100foo`)。需要注意的是,当使用多个 taosX 订阅同一 Topic 需要进行负载均衡时,必须填写一致的客户端 ID 才能达到均衡效果
**消费者组 ID** 中填写消费者组标识,填写后会生成带有 `taosx` 前缀的消费者组 ID (例如,如果填写的标识为 `foo`,则生成的消费者组 ID 为 `taosxfoo`)。如果打开末尾处的开关,则会把当前任务的任务 ID 拼接到 `taosx` 之后,输入的标识之前(生成的消费者组 ID 形如 `taosx100foo`)。

View File

@ -13,7 +13,7 @@ MongoDB 是一个介于关系型数据库与非关系型数据库之间的产品
### 1. 新增数据源
在数据写入页面中点击上角的 **+新增数据源** 按钮进入新增数据源页面,如下图所示:
在数据写入页面中点击上角的 **+新增数据源** 按钮进入新增数据源页面,如下图所示:
![Common-zh00-EnterDataSourcePage.png](./pic/Common-zh00-EnterDataSourcePage.png "进入新增数据源页面")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 68 KiB

After

Width:  |  Height:  |  Size: 79 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 43 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 56 KiB

After

Width:  |  Height:  |  Size: 72 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 70 KiB

After

Width:  |  Height:  |  Size: 75 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 58 KiB

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 42 KiB

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 52 KiB

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 29 KiB

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 37 KiB

After

Width:  |  Height:  |  Size: 104 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 74 KiB

After

Width:  |  Height:  |  Size: 40 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 53 KiB

After

Width:  |  Height:  |  Size: 68 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

After

Width:  |  Height:  |  Size: 65 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 23 KiB

After

Width:  |  Height:  |  Size: 72 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 43 KiB

After

Width:  |  Height:  |  Size: 99 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 73 KiB

After

Width:  |  Height:  |  Size: 148 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 35 KiB

After

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 54 KiB

After

Width:  |  Height:  |  Size: 70 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 27 KiB

After

Width:  |  Height:  |  Size: 49 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 49 KiB

After

Width:  |  Height:  |  Size: 128 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 35 KiB

After

Width:  |  Height:  |  Size: 97 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 46 KiB

After

Width:  |  Height:  |  Size: 71 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 52 KiB

After

Width:  |  Height:  |  Size: 92 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 93 KiB

After

Width:  |  Height:  |  Size: 120 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 115 KiB

After

Width:  |  Height:  |  Size: 142 KiB

Some files were not shown because too many files have changed in this diff Show More