enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info
|
@ -426,6 +426,10 @@ pipeline {
|
|||
cd ${WKC}/tests/parallel_test
|
||||
./run_check_assert_container.sh -d ${WKDIR}
|
||||
'''
|
||||
sh '''
|
||||
cd ${WKC}/tests/parallel_test
|
||||
./run_check_void_container.sh -d ${WKDIR}
|
||||
'''
|
||||
sh '''
|
||||
date
|
||||
rm -rf ${WKC}/debug
|
||||
|
|
|
@ -7,7 +7,17 @@ ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
|
|||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
|
||||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/
|
||||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
|
||||
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo firstEp localhost:6030 > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo fqdn localhost >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo serverPort 6030 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo debugFlag 135 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo asyncLog 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo supportVnodes 1024 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo numOfLogLines 300000000 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo logKeepDays -1 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo checkpointInterval 60 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo snodeAddress 127.0.0.1:873 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo monitor 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
*
|
||||
!*.c
|
||||
!.gitignore
|
|
@ -0,0 +1,21 @@
|
|||
// compile with
|
||||
// gcc connect_example.c -o connect_example -ltaos
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include "taosws.h"
|
||||
|
||||
int main() {
|
||||
ws_enable_log("debug");
|
||||
char *dsn = "ws://localhost:6041";
|
||||
WS_TAOS *taos = ws_connect(dsn);
|
||||
if (taos == NULL) {
|
||||
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
|
||||
return -1;
|
||||
}
|
||||
fprintf(stdout, "Connected to %s successfully.\n", dsn);
|
||||
|
||||
/* put your code here for read and write */
|
||||
|
||||
// close & clean
|
||||
ws_close(taos);
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||
// to compile: gcc -o create_db_demo create_db_demo.c -ltaos
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include "taosws.h"
|
||||
|
||||
static int DemoCreateDB() {
|
||||
ws_enable_log("debug");
|
||||
// ANCHOR: create_db_and_table
|
||||
int code = 0;
|
||||
char *dsn = "ws://localhost:6041";
|
||||
// connect
|
||||
WS_TAOS *taos = ws_connect(dsn);
|
||||
|
||||
if (taos == NULL) {
|
||||
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
|
||||
return -1;
|
||||
}
|
||||
|
||||
// create database
|
||||
WS_RES *result = ws_query(taos, "CREATE DATABASE IF NOT EXISTS power");
|
||||
code = ws_errno(result);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result));
|
||||
ws_close(taos);
|
||||
return -1;
|
||||
}
|
||||
ws_free_result(result);
|
||||
fprintf(stdout, "Create database power successfully.\n");
|
||||
|
||||
// create table
|
||||
const char *sql =
|
||||
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId "
|
||||
"INT, location BINARY(24))";
|
||||
result = ws_query(taos, sql);
|
||||
code = ws_errno(result);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to create stable power.meters, ErrCode: 0x%x, ErrMessage: %s\n.", code, ws_errstr(result));
|
||||
ws_close(taos);
|
||||
return -1;
|
||||
}
|
||||
ws_free_result(result);
|
||||
fprintf(stdout, "Create stable power.meters successfully.\n");
|
||||
|
||||
// close & clean
|
||||
ws_close(taos);
|
||||
return 0;
|
||||
// ANCHOR_END: create_db_and_table
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) { return DemoCreateDB(); }
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||
// to compile: gcc -o insert_data_demo insert_data_demo.c -ltaos
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include "taosws.h"
|
||||
|
||||
static int DemoInsertData() {
|
||||
// ANCHOR: insert_data
|
||||
int code = 0;
|
||||
char *dsn = "ws://localhost:6041";
|
||||
// connect
|
||||
WS_TAOS *taos = ws_connect(dsn);
|
||||
if (taos == NULL) {
|
||||
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
|
||||
return -1;
|
||||
}
|
||||
|
||||
// insert data, please make sure the database and table are already created
|
||||
const char *sql =
|
||||
"INSERT INTO "
|
||||
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') "
|
||||
"VALUES "
|
||||
"(NOW + 1a, 10.30000, 219, 0.31000) "
|
||||
"(NOW + 2a, 12.60000, 218, 0.33000) "
|
||||
"(NOW + 3a, 12.30000, 221, 0.31000) "
|
||||
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') "
|
||||
"VALUES "
|
||||
"(NOW + 1a, 10.30000, 218, 0.25000) ";
|
||||
WS_RES *result = ws_query(taos, sql);
|
||||
code = ws_errno(result);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to insert data to power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code,
|
||||
ws_errstr(result));
|
||||
ws_close(taos);
|
||||
return -1;
|
||||
}
|
||||
ws_free_result(result);
|
||||
|
||||
// you can check affectedRows here
|
||||
int rows = ws_affected_rows(result);
|
||||
fprintf(stdout, "Successfully inserted %d rows into power.meters.\n", rows);
|
||||
|
||||
// close & clean
|
||||
ws_close(taos);
|
||||
return 0;
|
||||
// ANCHOR_END: insert_data
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) { return DemoInsertData(); }
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||
// to compile: gcc -o query_data_demo query_data_demo.c -ltaos
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include "taosws.h"
|
||||
|
||||
static int DemoQueryData() {
|
||||
// ANCHOR: query_data
|
||||
int code = 0;
|
||||
char *dsn = "ws://localhost:6041";
|
||||
|
||||
// connect
|
||||
WS_TAOS *taos = ws_connect(dsn);
|
||||
if (taos == NULL) {
|
||||
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
|
||||
return -1;
|
||||
}
|
||||
|
||||
// query data, please make sure the database and table are already created
|
||||
const char *sql = "SELECT ts, current, location FROM power.meters limit 100";
|
||||
WS_RES *result = ws_query(taos, sql);
|
||||
code = ws_errno(result);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to query data from power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code,
|
||||
ws_errstr(result));
|
||||
ws_close(taos);
|
||||
return -1;
|
||||
}
|
||||
|
||||
WS_ROW row = NULL;
|
||||
int rows = 0;
|
||||
int num_fields = ws_field_count(result);
|
||||
const WS_FIELD *fields = ws_fetch_fields(result);
|
||||
|
||||
fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);
|
||||
|
||||
// fetch the records row by row
|
||||
while ((row = ws_fetch_row(result))) {
|
||||
// Add your data processing logic here
|
||||
|
||||
rows++;
|
||||
}
|
||||
fprintf(stdout, "total rows: %d\n", rows);
|
||||
ws_free_result(result);
|
||||
|
||||
// close & clean
|
||||
ws_close(taos);
|
||||
return 0;
|
||||
// ANCHOR_END: query_data
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) { return DemoQueryData(); }
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||
// to compile: gcc -o sml_insert_demo sml_insert_demo.c -ltaos
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include "taosws.h"
|
||||
|
||||
static int DemoSmlInsert() {
|
||||
// ANCHOR: schemaless
|
||||
int code = 0;
|
||||
char *dsn = "ws://localhost:6041";
|
||||
|
||||
// connect
|
||||
WS_TAOS *taos = ws_connect(dsn);
|
||||
if (taos == NULL) {
|
||||
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
|
||||
return -1;
|
||||
}
|
||||
|
||||
// create database
|
||||
WS_RES *result = ws_query(taos, "CREATE DATABASE IF NOT EXISTS power");
|
||||
code = ws_errno(result);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result));
|
||||
ws_close(taos);
|
||||
return -1;
|
||||
}
|
||||
ws_free_result(result);
|
||||
|
||||
// use database
|
||||
result = ws_query(taos, "USE power");
|
||||
code = ws_errno(result);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s\n.", code, ws_errstr(result));
|
||||
ws_close(taos);
|
||||
return -1;
|
||||
}
|
||||
ws_free_result(result);
|
||||
|
||||
// schemaless demo data
|
||||
char *line_demo =
|
||||
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 "
|
||||
"1626006833639";
|
||||
char *telnet_demo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
|
||||
char *json_demo =
|
||||
"{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, "
|
||||
"\"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
|
||||
|
||||
// influxdb line protocol
|
||||
char *lines[] = {line_demo};
|
||||
int totalLines = 0;
|
||||
result = ws_schemaless_insert_raw(taos, line_demo, strlen(line_demo), &totalLines, WS_TSDB_SML_LINE_PROTOCOL,
|
||||
WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
|
||||
code = ws_errno(result);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", line_demo,
|
||||
code, ws_errstr(result));
|
||||
ws_close(taos);
|
||||
return -1;
|
||||
}
|
||||
|
||||
fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", totalLines);
|
||||
ws_free_result(result);
|
||||
|
||||
// opentsdb telnet protocol
|
||||
totalLines = 0;
|
||||
result = ws_schemaless_insert_raw(taos, telnet_demo, strlen(telnet_demo), &totalLines, WS_TSDB_SML_TELNET_PROTOCOL,
|
||||
WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
|
||||
code = ws_errno(result);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", telnet_demo,
|
||||
code, ws_errstr(result));
|
||||
ws_close(taos);
|
||||
return -1;
|
||||
}
|
||||
|
||||
fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", totalLines);
|
||||
ws_free_result(result);
|
||||
|
||||
// opentsdb json protocol
|
||||
char *jsons[1] = {0};
|
||||
// allocate memory for json data. can not use static memory.
|
||||
totalLines = 0;
|
||||
result = ws_schemaless_insert_raw(taos, json_demo, strlen(json_demo), &totalLines, WS_TSDB_SML_JSON_PROTOCOL,
|
||||
WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
|
||||
code = ws_errno(result);
|
||||
if (code != 0) {
|
||||
free(jsons[0]);
|
||||
fprintf(stderr, "Failed to insert schemaless json data, Server: %s, ErrCode: 0x%x, ErrMessage: %s\n.", json_demo,
|
||||
code, ws_errstr(result));
|
||||
ws_close(taos);
|
||||
return -1;
|
||||
}
|
||||
free(jsons[0]);
|
||||
|
||||
fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", totalLines);
|
||||
ws_free_result(result);
|
||||
|
||||
// close & clean
|
||||
ws_close(taos);
|
||||
return 0;
|
||||
// ANCHOR_END: schemaless
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) { return DemoSmlInsert(); }
|
|
@ -0,0 +1,183 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||
// to compile: gcc -o stmt_insert_demo stmt_insert_demo.c -ltaos
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/time.h>
|
||||
#include "taosws.h"
|
||||
|
||||
/**
|
||||
* @brief execute sql only.
|
||||
*
|
||||
* @param taos
|
||||
* @param sql
|
||||
*/
|
||||
void executeSQL(WS_TAOS *taos, const char *sql) {
|
||||
WS_RES *res = ws_query(taos, sql);
|
||||
int code = ws_errno(res);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "%s\n", ws_errstr(res));
|
||||
ws_free_result(res);
|
||||
ws_close(taos);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
ws_free_result(res);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief check return status and exit program when error occur.
|
||||
*
|
||||
* @param stmt
|
||||
* @param code
|
||||
* @param msg
|
||||
*/
|
||||
void checkErrorCode(WS_STMT *stmt, int code, const char *msg) {
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "%s. code: %d, error: %s\n", msg, code, ws_stmt_errstr(stmt));
|
||||
ws_stmt_close(stmt);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int64_t ts;
|
||||
float current;
|
||||
int voltage;
|
||||
float phase;
|
||||
} Row;
|
||||
|
||||
int num_of_sub_table = 10;
|
||||
int num_of_row = 10;
|
||||
int total_affected = 0;
|
||||
/**
|
||||
* @brief insert data using stmt API
|
||||
*
|
||||
* @param taos
|
||||
*/
|
||||
void insertData(WS_TAOS *taos) {
|
||||
// init
|
||||
WS_STMT *stmt = ws_stmt_init(taos);
|
||||
if (stmt == NULL) {
|
||||
fprintf(stderr, "Failed to init ws_stmt, error: %s\n", ws_stmt_errstr(NULL));
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
// prepare
|
||||
const char *sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
|
||||
int code = ws_stmt_prepare(stmt, sql, 0);
|
||||
checkErrorCode(stmt, code, "Failed to execute ws_stmt_prepare");
|
||||
for (int i = 1; i <= num_of_sub_table; i++) {
|
||||
char table_name[20];
|
||||
sprintf(table_name, "d_bind_%d", i);
|
||||
char location[20];
|
||||
sprintf(location, "location_%d", i);
|
||||
|
||||
// set table name and tags
|
||||
WS_MULTI_BIND tags[2];
|
||||
// groupId
|
||||
tags[0].buffer_type = TSDB_DATA_TYPE_INT;
|
||||
tags[0].buffer_length = sizeof(int);
|
||||
tags[0].length = (int32_t *)&tags[0].buffer_length;
|
||||
tags[0].buffer = &i;
|
||||
tags[0].is_null = NULL;
|
||||
tags[0].num = 1;
|
||||
// location
|
||||
tags[1].buffer_type = TSDB_DATA_TYPE_BINARY;
|
||||
tags[1].buffer_length = strlen(location);
|
||||
tags[1].length = (int32_t *)&tags[1].buffer_length;
|
||||
tags[1].buffer = location;
|
||||
tags[1].is_null = NULL;
|
||||
tags[1].num = 1;
|
||||
code = ws_stmt_set_tbname_tags(stmt, table_name, tags, 2);
|
||||
checkErrorCode(stmt, code, "Failed to set table name and tags\n");
|
||||
|
||||
// insert rows
|
||||
WS_MULTI_BIND params[4];
|
||||
// ts
|
||||
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
params[0].buffer_length = sizeof(int64_t);
|
||||
params[0].length = (int32_t *)¶ms[0].buffer_length;
|
||||
params[0].is_null = NULL;
|
||||
params[0].num = 1;
|
||||
// current
|
||||
params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
|
||||
params[1].buffer_length = sizeof(float);
|
||||
params[1].length = (int32_t *)¶ms[1].buffer_length;
|
||||
params[1].is_null = NULL;
|
||||
params[1].num = 1;
|
||||
// voltage
|
||||
params[2].buffer_type = TSDB_DATA_TYPE_INT;
|
||||
params[2].buffer_length = sizeof(int);
|
||||
params[2].length = (int32_t *)¶ms[2].buffer_length;
|
||||
params[2].is_null = NULL;
|
||||
params[2].num = 1;
|
||||
// phase
|
||||
params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
|
||||
params[3].buffer_length = sizeof(float);
|
||||
params[3].length = (int32_t *)¶ms[3].buffer_length;
|
||||
params[3].is_null = NULL;
|
||||
params[3].num = 1;
|
||||
|
||||
for (int j = 0; j < num_of_row; j++) {
|
||||
struct timeval tv;
|
||||
gettimeofday(&tv, NULL);
|
||||
long long milliseconds = tv.tv_sec * 1000LL + tv.tv_usec / 1000; // current timestamp in milliseconds
|
||||
int64_t ts = milliseconds + j;
|
||||
float current = (float)rand() / RAND_MAX * 30;
|
||||
int voltage = rand() % 300;
|
||||
float phase = (float)rand() / RAND_MAX;
|
||||
params[0].buffer = &ts;
|
||||
params[1].buffer = ¤t;
|
||||
params[2].buffer = &voltage;
|
||||
params[3].buffer = &phase;
|
||||
// bind param
|
||||
code = ws_stmt_bind_param_batch(stmt, params, 4);
|
||||
checkErrorCode(stmt, code, "Failed to bind param");
|
||||
}
|
||||
// add batch
|
||||
code = ws_stmt_add_batch(stmt);
|
||||
checkErrorCode(stmt, code, "Failed to add batch");
|
||||
// execute batch
|
||||
int affected_rows = 0;
|
||||
code = ws_stmt_execute(stmt, &affected_rows);
|
||||
checkErrorCode(stmt, code, "Failed to exec stmt");
|
||||
// get affected rows
|
||||
int affected = ws_stmt_affected_rows_once(stmt);
|
||||
total_affected += affected;
|
||||
}
|
||||
fprintf(stdout, "Successfully inserted %d rows to power.meters.\n", total_affected);
|
||||
ws_stmt_close(stmt);
|
||||
}
|
||||
|
||||
int main() {
|
||||
int code = 0;
|
||||
char *dsn = "ws://localhost:6041";
|
||||
WS_TAOS *taos = ws_connect(dsn);
|
||||
if (taos == NULL) {
|
||||
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
// create database and table
|
||||
executeSQL(taos, "CREATE DATABASE IF NOT EXISTS power");
|
||||
executeSQL(taos, "USE power");
|
||||
executeSQL(taos,
|
||||
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
|
||||
"(groupId INT, location BINARY(24))");
|
||||
insertData(taos);
|
||||
ws_close(taos);
|
||||
}
|
|
@ -0,0 +1,488 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// to compile: gcc -o tmq_demo tmq_demo.c -ltaos -lpthread
|
||||
|
||||
#include <assert.h>
|
||||
#include <pthread.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
#include "taosws.h"
|
||||
|
||||
volatile int thread_stop = 0;
|
||||
static int running = 1;
|
||||
static int count = 0;
|
||||
const char* topic_name = "topic_meters";
|
||||
|
||||
typedef struct {
|
||||
const char* enable_auto_commit;
|
||||
const char* auto_commit_interval_ms;
|
||||
const char* group_id;
|
||||
const char* client_id;
|
||||
const char* td_connect_host;
|
||||
const char* td_connect_port;
|
||||
const char* td_connect_user;
|
||||
const char* td_connect_pass;
|
||||
const char* auto_offset_reset;
|
||||
} ConsumerConfig;
|
||||
|
||||
ConsumerConfig config = {.enable_auto_commit = "true",
|
||||
.auto_commit_interval_ms = "1000",
|
||||
.group_id = "group1",
|
||||
.client_id = "client1",
|
||||
.td_connect_host = "localhost",
|
||||
.td_connect_port = "6030",
|
||||
.td_connect_user = "root",
|
||||
.td_connect_pass = "taosdata",
|
||||
.auto_offset_reset = "latest"};
|
||||
|
||||
void* prepare_data(void* arg) {
|
||||
int code = 0;
|
||||
char* dsn = "ws://localhost:6041";
|
||||
WS_TAOS* pConn = ws_connect(dsn);
|
||||
if (pConn == NULL) {
|
||||
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
WS_RES* pRes;
|
||||
int i = 1;
|
||||
|
||||
while (!thread_stop) {
|
||||
char buf[200] = {0};
|
||||
i++;
|
||||
snprintf(
|
||||
buf, sizeof(buf),
|
||||
"INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + %da, 10.30000, "
|
||||
"219, 0.31000)",
|
||||
i);
|
||||
|
||||
pRes = ws_query(pConn, buf);
|
||||
code = ws_errno(pRes);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
|
||||
}
|
||||
ws_free_result(pRes);
|
||||
sleep(1);
|
||||
}
|
||||
fprintf(stdout, "Prepare data thread exit\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// ANCHOR: msg_process
|
||||
int32_t msg_process(WS_RES* msg) {
|
||||
int32_t rows = 0;
|
||||
const char* topicName = ws_tmq_get_topic_name(msg);
|
||||
const char* dbName = ws_tmq_get_db_name(msg);
|
||||
int32_t vgroupId = ws_tmq_get_vgroup_id(msg);
|
||||
|
||||
while (true) {
|
||||
// get one row data from message
|
||||
WS_ROW row = ws_fetch_row(msg);
|
||||
if (row == NULL) break;
|
||||
|
||||
// Add your data processing logic here
|
||||
|
||||
rows++;
|
||||
}
|
||||
|
||||
return rows;
|
||||
}
|
||||
// ANCHOR_END: msg_process
|
||||
|
||||
WS_TAOS* init_env() {
|
||||
int code = 0;
|
||||
char* dsn = "ws://localhost:6041";
|
||||
WS_TAOS* pConn = ws_connect(dsn);
|
||||
if (pConn == NULL) {
|
||||
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
WS_RES* pRes;
|
||||
// drop database if exists
|
||||
pRes = ws_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
|
||||
code = ws_errno(pRes);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
ws_free_result(pRes);
|
||||
|
||||
pRes = ws_query(pConn, "DROP DATABASE IF EXISTS power");
|
||||
code = ws_errno(pRes);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
ws_free_result(pRes);
|
||||
|
||||
// create database
|
||||
pRes = ws_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600");
|
||||
code = ws_errno(pRes);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to create power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
ws_free_result(pRes);
|
||||
|
||||
// create super table
|
||||
pRes =
|
||||
ws_query(pConn,
|
||||
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
|
||||
"(groupId INT, location BINARY(24))");
|
||||
code = ws_errno(pRes);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
ws_free_result(pRes);
|
||||
|
||||
return pConn;
|
||||
|
||||
END:
|
||||
ws_free_result(pRes);
|
||||
ws_close(pConn);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void deinit_env(WS_TAOS* pConn) {
|
||||
if (pConn) ws_close(pConn);
|
||||
}
|
||||
|
||||
int32_t create_topic(WS_TAOS* pConn) {
|
||||
WS_RES* pRes;
|
||||
int code = 0;
|
||||
|
||||
if (!pConn) {
|
||||
fprintf(stderr, "Invalid input parameter.\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRes = ws_query(pConn, "USE power");
|
||||
code = ws_errno(pRes);
|
||||
if (ws_errno(pRes) != 0) {
|
||||
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
ws_free_result(pRes);
|
||||
|
||||
pRes = ws_query(
|
||||
pConn,
|
||||
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
|
||||
code = ws_errno(pRes);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
ws_free_result(pRes);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t drop_topic(WS_TAOS* pConn) {
|
||||
WS_RES* pRes;
|
||||
int code = 0;
|
||||
|
||||
if (!pConn) {
|
||||
fprintf(stderr, "Invalid input parameter.\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRes = ws_query(pConn, "USE power");
|
||||
code = ws_errno(pRes);
|
||||
if (ws_errno(pRes) != 0) {
|
||||
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
ws_free_result(pRes);
|
||||
|
||||
pRes = ws_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
|
||||
code = ws_errno(pRes);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to drop topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
ws_free_result(pRes);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tmq_commit_cb_print(ws_tmq_t* tmq, int32_t code, void* param) {
|
||||
count += 1;
|
||||
fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p, count: %d.\n", code, tmq, param, count);
|
||||
}
|
||||
|
||||
// ANCHOR: create_consumer_1
|
||||
ws_tmq_t* build_consumer(const ConsumerConfig* config) {
|
||||
ws_tmq_conf_res_t code;
|
||||
ws_tmq_t* tmq = NULL;
|
||||
|
||||
// create a configuration object
|
||||
ws_tmq_conf_t* conf = ws_tmq_conf_new();
|
||||
|
||||
// set the configuration parameters
|
||||
code = ws_tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
|
||||
if (WS_TMQ_CONF_OK != code) {
|
||||
ws_tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = ws_tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
|
||||
if (WS_TMQ_CONF_OK != code) {
|
||||
ws_tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = ws_tmq_conf_set(conf, "group.id", config->group_id);
|
||||
if (WS_TMQ_CONF_OK != code) {
|
||||
ws_tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = ws_tmq_conf_set(conf, "client.id", config->client_id);
|
||||
if (WS_TMQ_CONF_OK != code) {
|
||||
ws_tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
code = ws_tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
|
||||
if (WS_TMQ_CONF_OK != code) {
|
||||
ws_tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// create a consumer object
|
||||
tmq = ws_tmq_consumer_new(conf, "taos://localhost:6041", NULL, 0);
|
||||
|
||||
_end:
|
||||
// destroy the configuration object
|
||||
ws_tmq_conf_destroy(conf);
|
||||
return tmq;
|
||||
}
|
||||
// ANCHOR_END: create_consumer_1
|
||||
|
||||
// ANCHOR: build_topic_list
|
||||
// build a topic list used to subscribe
|
||||
ws_tmq_list_t* build_topic_list() {
|
||||
// create a empty topic list
|
||||
ws_tmq_list_t* topicList = ws_tmq_list_new();
|
||||
|
||||
// append topic name to the list
|
||||
int32_t code = ws_tmq_list_append(topicList, topic_name);
|
||||
if (code) {
|
||||
// if failed, destroy the list and return NULL
|
||||
ws_tmq_list_destroy(topicList);
|
||||
fprintf(stderr,
|
||||
"Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
|
||||
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(NULL));
|
||||
return NULL;
|
||||
}
|
||||
// if success, return the list
|
||||
return topicList;
|
||||
}
|
||||
// ANCHOR_END: build_topic_list
|
||||
|
||||
// ANCHOR: basic_consume_loop
|
||||
void basic_consume_loop(ws_tmq_t* tmq) {
|
||||
int32_t totalRows = 0; // total rows consumed
|
||||
int32_t msgCnt = 0; // total messages consumed
|
||||
int32_t timeout = 5000; // poll timeout
|
||||
|
||||
while (running) {
|
||||
// poll message from TDengine
|
||||
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
|
||||
if (tmqmsg) {
|
||||
msgCnt++;
|
||||
|
||||
// Add your data processing logic here
|
||||
totalRows += msg_process(tmqmsg);
|
||||
|
||||
// free the message
|
||||
ws_free_result(tmqmsg);
|
||||
}
|
||||
if (msgCnt > 50) {
|
||||
// consume 50 messages and break
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// print the result: total messages and total rows consumed
|
||||
fprintf(stdout, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
}
|
||||
// ANCHOR_END: basic_consume_loop
|
||||
|
||||
// ANCHOR: consume_repeatly
|
||||
void consume_repeatly(ws_tmq_t* tmq) {
|
||||
int32_t numOfAssignment = 0;
|
||||
ws_tmq_topic_assignment* pAssign = NULL;
|
||||
|
||||
// get the topic assignment
|
||||
int32_t code = ws_tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
|
||||
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
|
||||
fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
|
||||
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
|
||||
return;
|
||||
}
|
||||
|
||||
// seek to the earliest offset
|
||||
for (int32_t i = 0; i < numOfAssignment; ++i) {
|
||||
ws_tmq_topic_assignment* p = &pAssign[i];
|
||||
|
||||
code = ws_tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
|
||||
if (code != 0) {
|
||||
fprintf(stderr,
|
||||
"Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n",
|
||||
topic_name, config.group_id, config.client_id, p->vgId, code, ws_tmq_errstr(tmq));
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (code == 0) fprintf(stdout, "Assignment seek to beginning successfully.\n");
|
||||
|
||||
// free the assignment array
|
||||
ws_tmq_free_assignment(pAssign, numOfAssignment);
|
||||
|
||||
// let's consume the messages again
|
||||
basic_consume_loop(tmq);
|
||||
}
|
||||
// ANCHOR_END: consume_repeatly
|
||||
|
||||
// ANCHOR: manual_commit
|
||||
void manual_commit(ws_tmq_t* tmq) {
|
||||
int32_t totalRows = 0; // total rows consumed
|
||||
int32_t msgCnt = 0; // total messages consumed
|
||||
int32_t timeout = 5000; // poll timeout
|
||||
|
||||
while (running) {
|
||||
// poll message from TDengine
|
||||
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
|
||||
if (tmqmsg) {
|
||||
msgCnt++;
|
||||
// process the message
|
||||
totalRows += msg_process(tmqmsg);
|
||||
// commit the message
|
||||
int32_t code = ws_tmq_commit_sync(tmq, tmqmsg);
|
||||
if (code) {
|
||||
fprintf(stderr,
|
||||
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
|
||||
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
|
||||
// free the message
|
||||
ws_free_result(tmqmsg);
|
||||
break;
|
||||
} else {
|
||||
fprintf(stdout, "Commit offset manually successfully.\n");
|
||||
}
|
||||
// free the message
|
||||
ws_free_result(tmqmsg);
|
||||
}
|
||||
if (msgCnt > 50) {
|
||||
// consume 50 messages and break
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// print the result: total messages and total rows consumed
|
||||
fprintf(stdout, "%d msg consumed, include %d rows.\n", msgCnt, totalRows);
|
||||
}
|
||||
// ANCHOR_END: manual_commit
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
int32_t code;
|
||||
pthread_t thread_id;
|
||||
|
||||
WS_TAOS* pConn = init_env();
|
||||
if (pConn == NULL) {
|
||||
fprintf(stderr, "Failed to init env.\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (create_topic(pConn) < 0) {
|
||||
fprintf(stderr, "Failed to create topic.\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) {
|
||||
fprintf(stderr, "Failed to create thread.\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
// ANCHOR: create_consumer_2
|
||||
ws_tmq_t* tmq = build_consumer(&config);
|
||||
if (NULL == tmq) {
|
||||
fprintf(stderr, "Failed to create native consumer, host: %s, groupId: %s, , clientId: %s.\n",
|
||||
config.td_connect_host, config.group_id, config.client_id);
|
||||
return -1;
|
||||
} else {
|
||||
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n", config.td_connect_host,
|
||||
config.group_id, config.client_id);
|
||||
}
|
||||
|
||||
// ANCHOR_END: create_consumer_2
|
||||
|
||||
// ANCHOR: subscribe_3
|
||||
ws_tmq_list_t* topic_list = build_topic_list();
|
||||
if (NULL == topic_list) {
|
||||
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n", topic_name, config.group_id,
|
||||
config.client_id);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((code = ws_tmq_subscribe(tmq, topic_list))) {
|
||||
fprintf(stderr,
|
||||
"Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
|
||||
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
|
||||
} else {
|
||||
fprintf(stdout, "Subscribe topics successfully.\n");
|
||||
}
|
||||
|
||||
ws_tmq_list_destroy(topic_list);
|
||||
|
||||
basic_consume_loop(tmq);
|
||||
// ANCHOR_END: subscribe_3
|
||||
|
||||
consume_repeatly(tmq);
|
||||
|
||||
manual_commit(tmq);
|
||||
|
||||
// ANCHOR: unsubscribe_and_close
|
||||
// unsubscribe the topic
|
||||
code = ws_tmq_unsubscribe(tmq);
|
||||
if (code) {
|
||||
fprintf(stderr,
|
||||
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
|
||||
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
|
||||
} else {
|
||||
fprintf(stdout, "Consumer unsubscribed successfully.\n");
|
||||
}
|
||||
|
||||
// close the consumer
|
||||
code = ws_tmq_consumer_close(tmq);
|
||||
if (code) {
|
||||
fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
|
||||
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
|
||||
} else {
|
||||
fprintf(stdout, "Consumer closed successfully.\n");
|
||||
}
|
||||
// ANCHOR_END: unsubscribe_and_close
|
||||
|
||||
thread_stop = 1;
|
||||
pthread_join(thread_id, NULL);
|
||||
|
||||
if (drop_topic(pConn) < 0) {
|
||||
fprintf(stderr, "Failed to drop topic.\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
deinit_env(pConn);
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||
// to compile: gcc -o with_reqid_demo with_reqid_demo.c -ltaos
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include "taosws.h"
|
||||
|
||||
static int DemoWithReqId() {
|
||||
// ANCHOR: with_reqid
|
||||
int code = 0;
|
||||
char *dsn = "ws://localhost:6041";
|
||||
|
||||
// connect
|
||||
WS_TAOS *taos = ws_connect(dsn);
|
||||
if (taos == NULL) {
|
||||
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
|
||||
return -1;
|
||||
}
|
||||
|
||||
const char *sql = "SELECT ts, current, location FROM power.meters limit 1";
|
||||
// query data with reqid
|
||||
long reqid = 3L;
|
||||
WS_RES *result = ws_query_with_reqid(taos, sql, reqid);
|
||||
code = ws_errno(result);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to execute sql withQID: %ld, ErrCode: 0x%x, ErrMessage: %s\n.", reqid, code,
|
||||
ws_errstr(result));
|
||||
ws_close(taos);
|
||||
return -1;
|
||||
}
|
||||
|
||||
WS_ROW row = NULL;
|
||||
int rows = 0;
|
||||
int num_fields = ws_field_count(result);
|
||||
const WS_FIELD *fields = ws_fetch_fields(result);
|
||||
|
||||
fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);
|
||||
|
||||
// fetch the records row by row
|
||||
while ((row = ws_fetch_row(result))) {
|
||||
// Add your data processing logic here
|
||||
|
||||
rows++;
|
||||
}
|
||||
fprintf(stdout, "total rows: %d\n", rows);
|
||||
ws_free_result(result);
|
||||
|
||||
// close & clean
|
||||
ws_close(taos);
|
||||
return 0;
|
||||
// ANCHOR_END: with_reqid
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) { return DemoWithReqId(); }
|
|
@ -0,0 +1,101 @@
|
|||
PROJECT(TDengine)
|
||||
|
||||
IF (TD_LINUX)
|
||||
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
|
||||
AUX_SOURCE_DIRECTORY(. SRC)
|
||||
|
||||
add_executable(docs_connect_example "")
|
||||
add_executable(docs_create_db_demo "")
|
||||
add_executable(docs_insert_data_demo "")
|
||||
add_executable(docs_query_data_demo "")
|
||||
add_executable(docs_with_reqid_demo "")
|
||||
add_executable(docs_sml_insert_demo "")
|
||||
add_executable(docs_stmt_insert_demo "")
|
||||
add_executable(docs_tmq_demo "")
|
||||
|
||||
target_sources(docs_connect_example
|
||||
PRIVATE
|
||||
"connect_example.c"
|
||||
)
|
||||
|
||||
target_sources(docs_create_db_demo
|
||||
PRIVATE
|
||||
"create_db_demo.c"
|
||||
)
|
||||
|
||||
target_sources(docs_insert_data_demo
|
||||
PRIVATE
|
||||
"insert_data_demo.c"
|
||||
)
|
||||
|
||||
target_sources(docs_query_data_demo
|
||||
PRIVATE
|
||||
"query_data_demo.c"
|
||||
)
|
||||
|
||||
target_sources(docs_with_reqid_demo
|
||||
PRIVATE
|
||||
"with_reqid_demo.c"
|
||||
)
|
||||
|
||||
target_sources(docs_sml_insert_demo
|
||||
PRIVATE
|
||||
"sml_insert_demo.c"
|
||||
)
|
||||
|
||||
target_sources(docs_stmt_insert_demo
|
||||
PRIVATE
|
||||
"stmt_insert_demo.c"
|
||||
)
|
||||
|
||||
target_sources(docs_tmq_demo
|
||||
PRIVATE
|
||||
"tmq_demo.c"
|
||||
)
|
||||
|
||||
target_link_libraries(docs_connect_example
|
||||
taos
|
||||
)
|
||||
|
||||
target_link_libraries(docs_create_db_demo
|
||||
taos
|
||||
)
|
||||
|
||||
target_link_libraries(docs_insert_data_demo
|
||||
taos
|
||||
)
|
||||
|
||||
target_link_libraries(docs_query_data_demo
|
||||
taos
|
||||
)
|
||||
|
||||
target_link_libraries(docs_with_reqid_demo
|
||||
taos
|
||||
)
|
||||
|
||||
target_link_libraries(docs_sml_insert_demo
|
||||
taos
|
||||
)
|
||||
|
||||
target_link_libraries(docs_stmt_insert_demo
|
||||
taos
|
||||
)
|
||||
|
||||
target_link_libraries(docs_tmq_demo
|
||||
taos
|
||||
pthread
|
||||
)
|
||||
|
||||
SET_TARGET_PROPERTIES(docs_connect_example PROPERTIES OUTPUT_NAME docs_connect_example)
|
||||
SET_TARGET_PROPERTIES(docs_create_db_demo PROPERTIES OUTPUT_NAME docs_create_db_demo)
|
||||
SET_TARGET_PROPERTIES(docs_insert_data_demo PROPERTIES OUTPUT_NAME docs_insert_data_demo)
|
||||
SET_TARGET_PROPERTIES(docs_query_data_demo PROPERTIES OUTPUT_NAME docs_query_data_demo)
|
||||
SET_TARGET_PROPERTIES(docs_with_reqid_demo PROPERTIES OUTPUT_NAME docs_with_reqid_demo)
|
||||
SET_TARGET_PROPERTIES(docs_sml_insert_demo PROPERTIES OUTPUT_NAME docs_sml_insert_demo)
|
||||
SET_TARGET_PROPERTIES(docs_stmt_insert_demo PROPERTIES OUTPUT_NAME docs_stmt_insert_demo)
|
||||
SET_TARGET_PROPERTIES(docs_tmq_demo PROPERTIES OUTPUT_NAME docs_tmq_demo)
|
||||
ENDIF ()
|
||||
IF (TD_DARWIN)
|
||||
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
|
||||
AUX_SOURCE_DIRECTORY(. SRC)
|
||||
ENDIF ()
|
|
@ -0,0 +1,34 @@
|
|||
# Makefile for building TDengine examples on TD Linux platform
|
||||
|
||||
INCLUDE_DIRS =
|
||||
|
||||
TARGETS = connect_example \
|
||||
create_db_demo \
|
||||
insert_data_demo \
|
||||
query_data_demo \
|
||||
with_reqid_demo \
|
||||
sml_insert_demo \
|
||||
stmt_insert_demo \
|
||||
tmq_demo
|
||||
|
||||
SOURCES = connect_example.c \
|
||||
create_db_demo.c \
|
||||
insert_data_demo.c \
|
||||
query_data_demo.c \
|
||||
with_reqid_demo.c \
|
||||
sml_insert_demo.c \
|
||||
stmt_insert_demo.c \
|
||||
tmq_demo.c
|
||||
|
||||
LIBS = -ltaos -lpthread
|
||||
|
||||
|
||||
CFLAGS = -g
|
||||
|
||||
all: $(TARGETS)
|
||||
|
||||
$(TARGETS):
|
||||
$(CC) $(CFLAGS) -o $@ $(wildcard $(@F).c) $(LIBS)
|
||||
|
||||
clean:
|
||||
rm -f $(TARGETS)
|
|
@ -4,7 +4,6 @@
|
|||
"main": "index.js",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@tdengine/client": "^3.0.1",
|
||||
"@tdengine/rest": "^3.0.0"
|
||||
"@tdengine/websocket": "^3.1.0"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ async function json_tag_example() {
|
|||
|
||||
} catch (err) {
|
||||
console.error(`Failed to create database example_json_tag or stable stb, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
} finally {
|
||||
if (wsSql) {
|
||||
await wsSql.close();
|
||||
|
@ -78,9 +79,10 @@ async function all_type_example() {
|
|||
let row = wsRows.getData();
|
||||
console.log(row);
|
||||
}
|
||||
|
||||
|
||||
} catch (err) {
|
||||
console.error(`Failed to create database all_type_example or stable stb, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
} finally {
|
||||
if (wsSql) {
|
||||
await wsSql.close();
|
||||
|
@ -91,7 +93,7 @@ async function all_type_example() {
|
|||
|
||||
async function test() {
|
||||
await json_tag_example()
|
||||
await all_type_example()
|
||||
await all_type_example()
|
||||
taos.destroy();
|
||||
}
|
||||
|
||||
|
|
|
@ -46,6 +46,7 @@ async function json_tag_example() {
|
|||
|
||||
} catch (err) {
|
||||
console.error(`Failed to create database example_json_tag or stable stb, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err
|
||||
} finally {
|
||||
if (wsSql) {
|
||||
await wsSql.close();
|
||||
|
@ -125,6 +126,7 @@ async function all_type_example() {
|
|||
|
||||
} catch (err) {
|
||||
console.error(`Failed to create database all_type_example or stable stb, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
} finally {
|
||||
if (stmt) {
|
||||
await stmt.close();
|
||||
|
@ -136,10 +138,7 @@ async function all_type_example() {
|
|||
|
||||
}
|
||||
|
||||
|
||||
|
||||
async function test() {
|
||||
taos.setLevel("debug")
|
||||
async function test() {
|
||||
await json_tag_example()
|
||||
await all_type_example()
|
||||
taos.destroy();
|
||||
|
|
|
@ -1,53 +0,0 @@
|
|||
const taos = require("@tdengine/websocket");
|
||||
|
||||
var host = null;
|
||||
for(var i = 2; i < global.process.argv.length; i++){
|
||||
var key = global.process.argv[i].split("=")[0];
|
||||
var value = global.process.argv[i].split("=")[1];
|
||||
if("host" == key){
|
||||
host = value;
|
||||
}
|
||||
}
|
||||
|
||||
if(host == null){
|
||||
console.log("Usage: node nodejsChecker.js host=<hostname> port=<port>");
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
let dbData = ["{\"metric\": \"meter_current\",\"timestamp\": 1626846402,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}",
|
||||
"{\"metric\": \"meter_current\",\"timestamp\": 1626846403,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1002\"}}",
|
||||
"{\"metric\": \"meter_current\",\"timestamp\": 1626846404,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1003\"}}"]
|
||||
|
||||
async function createConnect() {
|
||||
let dsn = 'ws://' + host + ':6041'
|
||||
let conf = new taos.WSConfig(dsn);
|
||||
conf.setUser('root');
|
||||
conf.setPwd('taosdata');
|
||||
conf.setDb('power');
|
||||
return await taos.sqlConnect(conf);
|
||||
}
|
||||
|
||||
async function test() {
|
||||
let wsSql = null;
|
||||
let wsRows = null;
|
||||
let reqId = 0;
|
||||
try {
|
||||
wsSql = await createConnect()
|
||||
await wsSql.exec('CREATE DATABASE IF NOT EXISTS power KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;', reqId++);
|
||||
await wsSql.schemalessInsert([dbData], taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, 0);
|
||||
}
|
||||
catch (err) {
|
||||
console.error(err.code, err.message);
|
||||
}
|
||||
finally {
|
||||
if (wsRows) {
|
||||
await wsRows.close();
|
||||
}
|
||||
if (wsSql) {
|
||||
await wsSql.close();
|
||||
}
|
||||
taos.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
test()
|
|
@ -15,8 +15,8 @@ async function createConnect() {
|
|||
return wsSql;
|
||||
}
|
||||
|
||||
|
||||
async function test() {
|
||||
let dsn = 'ws://localhost:6041'
|
||||
let wsSql = null;
|
||||
let wsRows = null;
|
||||
let ttl = 0;
|
||||
|
@ -29,6 +29,7 @@ async function test() {
|
|||
}
|
||||
catch (err) {
|
||||
console.error(`Failed to insert data with schemaless, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
}
|
||||
finally {
|
||||
if (wsRows) {
|
||||
|
@ -40,4 +41,5 @@ async function test() {
|
|||
taos.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
test()
|
||||
|
|
|
@ -10,11 +10,9 @@ for(var i = 2; i < global.process.argv.length; i++){
|
|||
}
|
||||
|
||||
if(host == null){
|
||||
console.log("Usage: node nodejsChecker.js host=<hostname> port=<port>");
|
||||
process.exit(0);
|
||||
host = 'localhost';
|
||||
}
|
||||
|
||||
|
||||
async function createConnect() {
|
||||
let dsn = 'ws://' + host + ':6041'
|
||||
console.log(dsn)
|
||||
|
@ -41,7 +39,7 @@ async function test() {
|
|||
taosResult = await wsSql.exec('USE power', reqId++);
|
||||
console.log(taosResult);
|
||||
|
||||
taosResult = await wsSql.exec('CREATE STABLE IF NOT EXISTS meters (_ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);', reqId++);
|
||||
taosResult = await wsSql.exec('CREATE STABLE IF NOT EXISTS meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);', reqId++);
|
||||
console.log(taosResult);
|
||||
|
||||
taosResult = await wsSql.exec('DESCRIBE meters', reqId++);
|
||||
|
@ -62,6 +60,7 @@ async function test() {
|
|||
}
|
||||
catch (err) {
|
||||
console.error(err.code, err.message);
|
||||
throw err;
|
||||
}
|
||||
finally {
|
||||
if (wsRows) {
|
||||
|
|
|
@ -41,6 +41,7 @@ async function createDbAndTable() {
|
|||
console.log("Create stable power.meters successfully");
|
||||
} catch (err) {
|
||||
console.error(`Failed to create database power or stable meters, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
} finally {
|
||||
if (wsSql) {
|
||||
await wsSql.close();
|
||||
|
@ -53,21 +54,23 @@ async function createDbAndTable() {
|
|||
// ANCHOR: insertData
|
||||
async function insertData() {
|
||||
let wsSql = null
|
||||
let insertQuery = "INSERT INTO " +
|
||||
"power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
|
||||
"VALUES " +
|
||||
"(NOW + 1a, 10.30000, 219, 0.31000) " +
|
||||
"(NOW + 2a, 12.60000, 218, 0.33000) " +
|
||||
"(NOW + 3a, 12.30000, 221, 0.31000) " +
|
||||
"power.d1002 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 3) " +
|
||||
"VALUES " +
|
||||
"(NOW + 1a, 10.30000, 218, 0.25000) ";
|
||||
|
||||
try {
|
||||
wsSql = await createConnect();
|
||||
let insertQuery = "INSERT INTO " +
|
||||
"power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
|
||||
"VALUES " +
|
||||
"(NOW + 1a, 10.30000, 219, 0.31000) " +
|
||||
"(NOW + 2a, 12.60000, 218, 0.33000) " +
|
||||
"(NOW + 3a, 12.30000, 221, 0.31000) " +
|
||||
"power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) " +
|
||||
"VALUES " +
|
||||
"(NOW + 1a, 10.30000, 218, 0.25000) ";
|
||||
taosResult = await wsSql.exec(insertQuery);
|
||||
console.log("Successfully inserted " + taosResult.getAffectRows() + " rows to power.meters.");
|
||||
} catch (err) {
|
||||
console.error(`Failed to insert data to power.meters, sql: ${insertQuery}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
} finally {
|
||||
if (wsSql) {
|
||||
await wsSql.close();
|
||||
|
@ -91,6 +94,7 @@ async function queryData() {
|
|||
}
|
||||
catch (err) {
|
||||
console.error(`Failed to query data from power.meters, sql: ${sql}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
}
|
||||
finally {
|
||||
if (wsRows) {
|
||||
|
@ -118,6 +122,7 @@ async function sqlWithReqid() {
|
|||
}
|
||||
catch (err) {
|
||||
console.error(`Failed to query data from power.meters, reqId: ${reqId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
}
|
||||
finally {
|
||||
if (wsRows) {
|
||||
|
@ -135,7 +140,7 @@ async function test() {
|
|||
await insertData();
|
||||
await queryData();
|
||||
await sqlWithReqid();
|
||||
taos.destroy();
|
||||
taos.destroy();
|
||||
}
|
||||
|
||||
test()
|
||||
|
|
|
@ -23,7 +23,7 @@ async function prepare() {
|
|||
return wsSql
|
||||
}
|
||||
|
||||
(async () => {
|
||||
async function test() {
|
||||
let stmt = null;
|
||||
let connector = null;
|
||||
try {
|
||||
|
@ -60,6 +60,7 @@ async function prepare() {
|
|||
}
|
||||
catch (err) {
|
||||
console.error(`Failed to insert to table meters using stmt, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
}
|
||||
finally {
|
||||
if (stmt) {
|
||||
|
@ -70,4 +71,6 @@ async function prepare() {
|
|||
}
|
||||
taos.destroy();
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
test()
|
|
@ -1,58 +0,0 @@
|
|||
const taos = require("@tdengine/websocket");
|
||||
|
||||
var host = null;
|
||||
for(var i = 2; i < global.process.argv.length; i++){
|
||||
var key = global.process.argv[i].split("=")[0];
|
||||
var value = global.process.argv[i].split("=")[1];
|
||||
if("host" == key){
|
||||
host = value;
|
||||
}
|
||||
}
|
||||
|
||||
if(host == null){
|
||||
console.log("Usage: node nodejsChecker.js host=<hostname> port=<port>");
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
let dbData = ["meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
|
||||
"meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
|
||||
"meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3",
|
||||
"meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3",
|
||||
"meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2",
|
||||
"meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2",
|
||||
"meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3",
|
||||
"meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3",];
|
||||
|
||||
async function createConnect() {
|
||||
let dsn = 'ws://' + host + ':6041'
|
||||
let conf = new taos.WSConfig(dsn);
|
||||
conf.setUser('root');
|
||||
conf.setPwd('taosdata');
|
||||
|
||||
return await taos.sqlConnect(conf);
|
||||
}
|
||||
|
||||
async function test() {
|
||||
let wsSql = null;
|
||||
let wsRows = null;
|
||||
let reqId = 0;
|
||||
try {
|
||||
wsSql = await createConnect()
|
||||
await wsSql.exec('create database if not exists power KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;', reqId++);
|
||||
await wsSql.exec('use power', reqId++);
|
||||
await wsSql.schemalessInsert(dbData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, 0);
|
||||
}
|
||||
catch (err) {
|
||||
console.error(err.code, err.message);
|
||||
}
|
||||
finally {
|
||||
if (wsRows) {
|
||||
await wsRows.close();
|
||||
}
|
||||
if (wsSql) {
|
||||
await wsSql.close();
|
||||
}
|
||||
taos.destroy();
|
||||
}
|
||||
}
|
||||
test()
|
|
@ -1,3 +1,4 @@
|
|||
const { sleep } = require("@tdengine/websocket");
|
||||
const taos = require("@tdengine/websocket");
|
||||
|
||||
// ANCHOR: create_consumer
|
||||
|
@ -49,12 +50,20 @@ async function prepare() {
|
|||
|
||||
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
|
||||
await wsSql.exec(createTopic);
|
||||
await wsSql.close();
|
||||
}
|
||||
|
||||
|
||||
for (let i = 0; i < 10; i++) {
|
||||
async function insert() {
|
||||
let conf = new taos.WSConfig('ws://localhost:6041');
|
||||
conf.setUser('root');
|
||||
conf.setPwd('taosdata');
|
||||
conf.setDb('power');
|
||||
let wsSql = await taos.sqlConnect(conf);
|
||||
for (let i = 0; i < 50; i++) {
|
||||
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
|
||||
await sleep(100);
|
||||
}
|
||||
wsSql.close();
|
||||
await wsSql.close();
|
||||
}
|
||||
|
||||
async function subscribe(consumer) {
|
||||
|
@ -82,13 +91,17 @@ async function test() {
|
|||
let consumer = null;
|
||||
try {
|
||||
await prepare();
|
||||
consumer = await createConsumer()
|
||||
await subscribe(consumer)
|
||||
consumer = await createConsumer();
|
||||
const allPromises = [];
|
||||
allPromises.push(subscribe(consumer));
|
||||
allPromises.push(insert());
|
||||
await Promise.all(allPromises);
|
||||
await consumer.unsubscribe();
|
||||
console.log("Consumer unsubscribed successfully.");
|
||||
}
|
||||
catch (err) {
|
||||
console.error(`Failed to unsubscribe consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
}
|
||||
finally {
|
||||
if (consumer) {
|
||||
|
|
|
@ -1,41 +1,45 @@
|
|||
const { sleep } = require("@tdengine/websocket");
|
||||
const taos = require("@tdengine/websocket");
|
||||
|
||||
const db = 'power';
|
||||
const stable = 'meters';
|
||||
const url = 'ws://localhost:6041';
|
||||
const topic = 'topic_meters'
|
||||
const topics = [topic];
|
||||
const groupId = "group1";
|
||||
const clientId = "client1";
|
||||
|
||||
|
||||
// ANCHOR: create_consumer
|
||||
async function createConsumer() {
|
||||
|
||||
let groupId = "group1";
|
||||
let clientId = "client1";
|
||||
let configMap = new Map([
|
||||
[taos.TMQConstants.GROUP_ID, "group1"],
|
||||
[taos.TMQConstants.CLIENT_ID, 'client1'],
|
||||
[taos.TMQConstants.GROUP_ID, groupId],
|
||||
[taos.TMQConstants.CLIENT_ID, clientId],
|
||||
[taos.TMQConstants.CONNECT_USER, "root"],
|
||||
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
|
||||
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
|
||||
[taos.TMQConstants.WS_URL, 'ws://localhost:6041'],
|
||||
[taos.TMQConstants.WS_URL, url],
|
||||
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
|
||||
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
|
||||
]);
|
||||
try {
|
||||
return await taos.tmqConnect(configMap);
|
||||
conn = await taos.tmqConnect(configMap);
|
||||
console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`)
|
||||
return conn;
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
}
|
||||
|
||||
}
|
||||
// ANCHOR_END: create_consumer
|
||||
|
||||
async function prepare() {
|
||||
let conf = new taos.WSConfig('ws://localhost:6041');
|
||||
let conf = new taos.WSConfig('ws://192.168.1.98:6041');
|
||||
conf.setUser('root');
|
||||
conf.setPwd('taosdata');
|
||||
conf.setDb('power');
|
||||
const createDB = `CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`;
|
||||
const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`;
|
||||
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
|
||||
|
||||
let wsSql = await taos.sqlConnect(conf);
|
||||
|
@ -44,58 +48,63 @@ async function prepare() {
|
|||
|
||||
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
|
||||
await wsSql.exec(createTopic);
|
||||
await wsSql.close();
|
||||
}
|
||||
|
||||
|
||||
for (let i = 0; i < 10; i++) {
|
||||
async function insert() {
|
||||
let conf = new taos.WSConfig('ws://localhost:6041');
|
||||
conf.setUser('root');
|
||||
conf.setPwd('taosdata');
|
||||
conf.setDb('power');
|
||||
let wsSql = await taos.sqlConnect(conf);
|
||||
for (let i = 0; i < 1; i++) {
|
||||
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
|
||||
}
|
||||
await wsSql.close();
|
||||
}
|
||||
|
||||
// ANCHOR: subscribe
|
||||
// ANCHOR: offset
|
||||
async function subscribe(consumer) {
|
||||
try {
|
||||
await consumer.subscribe(['topic_meters']);
|
||||
for (let i = 0; i < 50; i++) {
|
||||
let res = await consumer.poll(100);
|
||||
for (let [key, value] of res) {
|
||||
// Add your data processing logic here
|
||||
console.log(`data: ${key} ${value}`);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`Failed to poll data, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
}
|
||||
|
||||
}
|
||||
// ANCHOR_END: subscribe
|
||||
|
||||
// ANCHOR: offset
|
||||
async function test() {
|
||||
let consumer = null;
|
||||
try {
|
||||
await prepare();
|
||||
let consumer = await createConsumer()
|
||||
await consumer.subscribe(['topic_meters']);
|
||||
let res = new Map();
|
||||
while (res.size == 0) {
|
||||
res = await consumer.poll(100);
|
||||
await consumer.commit();
|
||||
}
|
||||
|
||||
let assignment = await consumer.assignment();
|
||||
await consumer.seekToBeginning(assignment);
|
||||
console.log("Assignment seek to beginning successfully");
|
||||
} catch (err) {
|
||||
console.error(`Failed to seek offset, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: offset
|
||||
|
||||
async function test() {
|
||||
let consumer = null;
|
||||
try {
|
||||
await prepare();
|
||||
consumer = await createConsumer();
|
||||
const allPromises = [];
|
||||
allPromises.push(subscribe(consumer));
|
||||
allPromises.push(insert());
|
||||
await Promise.all(allPromises);
|
||||
await consumer.unsubscribe();
|
||||
console.log("Consumer unsubscribed successfully.");
|
||||
}
|
||||
catch (err) {
|
||||
console.error(`Failed to seek offset, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
console.error(`Failed to consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
|
||||
throw err;
|
||||
}
|
||||
finally {
|
||||
if (consumer) {
|
||||
await consumer.close();
|
||||
console.log("Consumer closed successfully.");
|
||||
}
|
||||
taos.destroy();
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: offset
|
||||
|
||||
test()
|
||||
|
|
|
@ -15,6 +15,7 @@ def create_connection():
|
|||
print(f"Connected to {host}:{port} successfully.");
|
||||
except Exception as err:
|
||||
print(f"Failed to connect to {host}:{port} , ErrMessage:{err}")
|
||||
raise err
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -15,7 +15,7 @@ def create_connection():
|
|||
print(f"Connected to {host}:{port} successfully.");
|
||||
except Exception as err:
|
||||
print(f"Failed to connect to {host}:{port} , ErrMessage:{err}")
|
||||
|
||||
raise err
|
||||
return conn
|
||||
# ANCHOR_END: connect
|
||||
|
||||
|
@ -28,6 +28,7 @@ def create_db_table(conn):
|
|||
conn.execute("CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupId, location) TAGS(0, 'Los Angles')")
|
||||
except Exception as err:
|
||||
print(f'Exception {err}')
|
||||
raise err
|
||||
# ANCHOR_END: create_db
|
||||
|
||||
def insert(conn):
|
||||
|
@ -42,9 +43,10 @@ def insert(conn):
|
|||
"""
|
||||
try:
|
||||
inserted = conn.execute(sql)
|
||||
assert inserted == 8
|
||||
assert inserted == 4
|
||||
except Exception as err:
|
||||
print(f'Exception111 {err}')
|
||||
raise err
|
||||
# ANCHOR_END: insert
|
||||
|
||||
def query(conn):
|
||||
|
@ -58,6 +60,7 @@ def query(conn):
|
|||
print(row)
|
||||
except Exception as err:
|
||||
print(f'Exception {err}')
|
||||
raise err
|
||||
# ANCHOR_END: query
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
@ -21,6 +21,7 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to create database power or stable meters, ErrMessage:{err}")
|
||||
raise err
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -20,6 +20,7 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to create database power or stable meters, ErrMessage:{err}")
|
||||
raise err
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -21,6 +21,7 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to create database power or stable meters, ErrMessage:{err}")
|
||||
raise err
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -22,6 +22,7 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to insert data to power.meters, sql: {sql}, ErrMessage: {err}.")
|
||||
raise err
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -21,6 +21,7 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to insert data to power.meters, sql:{sql}, ErrMessage:{err}.")
|
||||
raise err
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -22,6 +22,7 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to insert data to power.meters, sql: {sql}, ErrMessage: {err}.")
|
||||
raise err
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -16,6 +16,7 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to query data from power.meters, sql: {sql}, ErrMessage:{err}")
|
||||
raise err
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -15,3 +15,4 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to query data from power.meters, sql: {sql}, ErrMessage:{err}")
|
||||
raise err
|
||||
|
|
|
@ -15,6 +15,7 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to query data from power.meters, sql: {sql}, ErrMessage:{err}")
|
||||
raise err
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -18,7 +18,7 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to execute sql with reqId:{reqId}, ErrMessage:{err}")
|
||||
|
||||
raise err
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -16,3 +16,4 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to execute sql with reqId:{reqId}, ErrMessage:{err}")
|
||||
raise err
|
||||
|
|
|
@ -19,6 +19,7 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to execute sql with reqId:{reqId}, ErrMessage:{err}")
|
||||
raise err
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -35,6 +35,7 @@ try:
|
|||
print("Inserted data with schemaless successfully.");
|
||||
except Exception as err:
|
||||
print(f"Failed to insert data with schemaless, ErrMessage:{err}")
|
||||
raise err
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -75,8 +75,6 @@ def schemaless_insert():
|
|||
conn.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
prepare()
|
||||
schemaless_insert()
|
||||
except Exception as err:
|
||||
print(f"Failed to insert data with schemaless, err:{err}")
|
||||
prepare()
|
||||
schemaless_insert()
|
||||
|
||||
|
|
|
@ -57,6 +57,7 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to insert to table meters using stmt, ErrMessage:{err}")
|
||||
raise err
|
||||
finally:
|
||||
if stmt:
|
||||
stmt.close()
|
||||
|
|
|
@ -62,6 +62,7 @@ try:
|
|||
|
||||
except Exception as err:
|
||||
print(f"Failed to insert to table meters using stmt, ErrMessage:{err}")
|
||||
raise err
|
||||
finally:
|
||||
if stmt:
|
||||
stmt.close()
|
||||
|
|
|
@ -152,6 +152,7 @@ def unsubscribe(consumer):
|
|||
print("Consumer unsubscribed successfully.");
|
||||
except Exception as err:
|
||||
print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||
raise err
|
||||
finally:
|
||||
if consumer:
|
||||
consumer.close()
|
||||
|
@ -166,7 +167,6 @@ if __name__ == "__main__":
|
|||
subscribe(consumer)
|
||||
seek_offset(consumer)
|
||||
commit_offset(consumer)
|
||||
except Exception as err:
|
||||
print(f"Failed to execute consumer example, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||
finally:
|
||||
unsubscribe(consumer);
|
||||
if consumer:
|
||||
unsubscribe(consumer);
|
||||
|
|
|
@ -31,7 +31,7 @@ def prepareMeta():
|
|||
|
||||
# create super table
|
||||
rowsAffected = conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
||||
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(64))"
|
||||
)
|
||||
assert rowsAffected == 0
|
||||
|
||||
|
@ -155,6 +155,7 @@ def unsubscribe(consumer):
|
|||
print("Consumer unsubscribed successfully.");
|
||||
except Exception as err:
|
||||
print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||
raise err
|
||||
finally:
|
||||
if consumer:
|
||||
consumer.close()
|
||||
|
@ -170,7 +171,6 @@ if __name__ == "__main__":
|
|||
subscribe(consumer)
|
||||
seek_offset(consumer)
|
||||
commit_offset(consumer)
|
||||
except Exception as err:
|
||||
print(f"Failed to execute consumer example, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||
finally:
|
||||
unsubscribe(consumer)
|
||||
if consumer:
|
||||
unsubscribe(consumer)
|
||||
|
|
|
@ -83,7 +83,6 @@ async fn main() -> anyhow::Result<()> {
|
|||
eprintln!("Failed to execute insert: {:?}", e);
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
println!("Succed to execute insert 1 row");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
|
|
@ -181,7 +181,7 @@ INTERVAL(interval_val [, interval_offset])
|
|||
- FILL:用于指定窗口区间数据缺失的情况下,数据的填充模式。
|
||||
|
||||
对于时间窗口,interval_val 和 sliding_val 都表示时间段, 语法上支持三种方式。例如:
|
||||
1. INTERVAL(1s, 500a) SLIDING(1s),带时间单位的形式,其中的时间单位是单字符表示, 分别为: a (毫秒), b (纳秒), d (天), h (小时), m (分钟), n (月), s (秒), u (微妙), w (周), y (年);
|
||||
1. INTERVAL(1s, 500a) SLIDING(1s),带时间单位的形式,其中的时间单位是单字符表示, 分别为: a (毫秒), b (纳秒), d (天), h (小时), m (分钟), n (月), s (秒), u (微秒), w (周), y (年);
|
||||
2. INTERVAL(1000, 500) SLIDING(1000),不带时间单位的形式,将使用查询库的时间精度作为默认时间单位,当存在多个库时默认采用精度更高的库;
|
||||
3. INTERVAL('1s', '500a') SLIDING('1s'),带时间单位的字符串形式,字符串内部不能有任何空格等其它字符。
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ PI 系统是一套用于数据收集、查找、分析、传递和可视化的
|
|||
|
||||
在数据写入页面中,点击 **+新增数据源** 按钮,进入新增数据源页面。
|
||||
|
||||

|
||||

|
||||
|
||||
### 基本配置
|
||||
|
||||
|
|
|
@ -208,3 +208,15 @@ CSV 文件中的每个 Row 配置一个 OPC 数据点位。Row 的规则如下
|
|||
### 8. 创建完成
|
||||
|
||||
点击 **提交** 按钮,完成创建 OPC UA 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
|
||||
|
||||
## 增加数据点位
|
||||
|
||||
在任务运行中,点击 **编辑**,点击 **增加数据点位** 按钮,追加数据点位到 CSV 文件中。
|
||||
|
||||

|
||||
|
||||
在弹出的表单中,填写数据点位的信息。
|
||||
|
||||

|
||||
|
||||
点击 **确定** 按钮,完成数据点位的追加。
|
|
@ -182,3 +182,15 @@ CSV 文件中的每个 Row 配置一个 OPC 数据点位。Row 的规则如下
|
|||
### 7. 创建完成
|
||||
|
||||
点击 **提交** 按钮,完成创建 OPC DA 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
|
||||
|
||||
## 增加数据点位
|
||||
|
||||
在任务运行中,点击 **编辑**,点击 **增加数据点位** 按钮,追加数据点位到 CSV 文件中。
|
||||
|
||||

|
||||
|
||||
在弹出的表单中,填写数据点位的信息。
|
||||
|
||||

|
||||
|
||||
点击 **确定** 按钮,完成数据点位的追加。
|
|
@ -33,13 +33,14 @@ TDengine 可以通过 MQTT 连接器从 MQTT 代理订阅数据并将其写入 T
|
|||
|
||||
### 3. 配置连接和认证信息
|
||||
|
||||
在 **MQTT地址** 中填写 MQTT 代理的地址,例如:`192.168.1.42:1883`
|
||||
在 **MQTT 地址** 中填写 MQTT 代理的地址,例如:`192.168.1.42`
|
||||
|
||||
在 **MQTT 端口** 中填写 MQTT 代理的端口,例如:`1883`
|
||||
|
||||
在 **用户** 中填写 MQTT 代理的用户名。
|
||||
|
||||
在 **密码** 中填写 MQTT 代理的密码。
|
||||
|
||||
点击 **连通性检查** 按钮,检查数据源是否可用。
|
||||
|
||||

|
||||
|
||||
|
@ -64,6 +65,8 @@ TDengine 可以通过 MQTT 连接器从 MQTT 代理订阅数据并将其写入 T
|
|||
|
||||
在 **订阅主题及 QoS 配置** 中填写要消费的 Topic 名称。使用如下格式设置: `topic1::0,topic2::1`。
|
||||
|
||||
点击 **检查连通性** 按钮,检查数据源是否可用。
|
||||
|
||||

|
||||
|
||||
### 6. 配置 MQTT Payload 解析
|
||||
|
|
|
@ -44,8 +44,50 @@ TDengine 可以高效地从 Kafka 读取数据并将其写入 TDengine,以实
|
|||
|
||||
如果服务端开启了 SASL 认证机制,此处需要启用 SASL 并配置相关内容,目前支持 PLAIN/SCRAM-SHA-256/GSSAPI 三种认证机制,请按实际情况进行选择。
|
||||
|
||||
#### 4.1. PLAIN 认证
|
||||
|
||||
选择 `PLAIN` 认证机制,输入用户名和密码:
|
||||
|
||||

|
||||
|
||||
#### 4.1. SCRAM(SCRAM-SHA-256) 认证
|
||||
|
||||
选择 `SCRAM-SHA-256` 认证机制,输入用户名和密码:
|
||||
|
||||

|
||||
|
||||
#### 4.3. GSSAPI 认证
|
||||
|
||||
选择 `GSSAPI` ,将通过 [RDkafka 客户端](https://github.com/confluentinc/librdkafka) 调用 GSSAPI 应用 Kerberos 认证机制:
|
||||
|
||||

|
||||
|
||||
需要输入的信息有:
|
||||
|
||||
- Kerberos 服务名,一般是 `kafka`;
|
||||
- Kerberos 认证主体,即认证用户名,例如 `kafkaclient`;
|
||||
- Kerberos 初始化命令(可选,一般不用填写);
|
||||
- Kerberos 密钥表,需提供文件并上传;
|
||||
|
||||
以上信息均需由 Kafka 服务管理者提供。
|
||||
|
||||
除此之外,在服务器上需要配置 [Kerberos](https://web.mit.edu/kerberos/) 认证服务。在 Ubuntu 下使用 `apt install krb5-user` ;在 CentOS 下,使用 `yum install krb5-workstation`;即可。
|
||||
|
||||
配置完成后,可以使用 [kcat](https://github.com/edenhill/kcat) 工具进行 Kafka 主题消费验证:
|
||||
|
||||
```bash
|
||||
kcat <topic> \
|
||||
-b <kafka-server:port> \
|
||||
-G kcat \
|
||||
-X security.protocol=SASL_PLAINTEXT \
|
||||
-X sasl.mechanism=GSSAPI \
|
||||
-X sasl.kerberos.keytab=</path/to/kafkaclient.keytab> \
|
||||
-X sasl.kerberos.principal=<kafkaclient> \
|
||||
-X sasl.kerberos.service.name=kafka
|
||||
```
|
||||
|
||||
如果出现错误:“Server xxxx not found in kerberos database”,则需要配置 Kafka 节点对应的域名并在 Kerberos 客户端配置文件 `/etc/krb5.conf` 中配置反向域名解析 `rdns = true`。
|
||||
|
||||
### 5. 配置 SSL 证书
|
||||
|
||||
如果服务端开启了 SSL 加密认证,此处需要启用 SSL 并配置相关内容。
|
||||
|
@ -60,7 +102,7 @@ TDengine 可以高效地从 Kafka 读取数据并将其写入 TDengine,以实
|
|||
|
||||
在 **主题** 中填写要消费的 Topic 名称。可以配置多个 Topic , Topic 之间用逗号分隔。例如:`tp1,tp2`。
|
||||
|
||||
在 **Client ID** 中填写客户端标识,填写后会生成带有 `taosx` 前缀的客户端 ID (例如,如果填写的标识为 `foo`,则生成的客户端 ID 为 `taosxfoo`)。如果打开末尾处的开关,则会把当前任务的任务 ID 拼接到 `taosx` 之后,输入的标识之前(生成的客户端 ID 形如 `taosx100foo`)。连接到同一个 Kafka 集群的所有客户端 ID 必须保证唯一。
|
||||
在 **Client ID** 中填写客户端标识,填写后会生成带有 `taosx` 前缀的客户端 ID (例如,如果填写的标识为 `foo`,则生成的客户端 ID 为 `taosxfoo`)。如果打开末尾处的开关,则会把当前任务的任务 ID 拼接到 `taosx` 之后,输入的标识之前(生成的客户端 ID 形如 `taosx100foo`)。需要注意的是,当使用多个 taosX 订阅同一 Topic 需要进行负载均衡时,必须填写一致的客户端 ID 才能达到均衡效果。
|
||||
|
||||
在 **消费者组 ID** 中填写消费者组标识,填写后会生成带有 `taosx` 前缀的消费者组 ID (例如,如果填写的标识为 `foo`,则生成的消费者组 ID 为 `taosxfoo`)。如果打开末尾处的开关,则会把当前任务的任务 ID 拼接到 `taosx` 之后,输入的标识之前(生成的消费者组 ID 形如 `taosx100foo`)。
|
||||
|
||||
|
@ -160,4 +202,4 @@ json 数据支持 JSONObject 或者 JSONArray,使用 json 解析器可以解
|
|||
|
||||
### 9. 创建完成
|
||||
|
||||
点击 **提交** 按钮,完成创建 Kafka 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
|
||||
点击 **提交** 按钮,完成创建 Kafka 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
|
||||
|
|
Before Width: | Height: | Size: 68 KiB After Width: | Height: | Size: 79 KiB |
After Width: | Height: | Size: 43 KiB |
After Width: | Height: | Size: 20 KiB |
Before Width: | Height: | Size: 56 KiB After Width: | Height: | Size: 72 KiB |
Before Width: | Height: | Size: 70 KiB After Width: | Height: | Size: 75 KiB |
Before Width: | Height: | Size: 58 KiB After Width: | Height: | Size: 33 KiB |
Before Width: | Height: | Size: 42 KiB After Width: | Height: | Size: 30 KiB |
Before Width: | Height: | Size: 52 KiB After Width: | Height: | Size: 30 KiB |
Before Width: | Height: | Size: 29 KiB After Width: | Height: | Size: 24 KiB |
Before Width: | Height: | Size: 37 KiB After Width: | Height: | Size: 104 KiB |
Before Width: | Height: | Size: 74 KiB After Width: | Height: | Size: 40 KiB |
Before Width: | Height: | Size: 53 KiB After Width: | Height: | Size: 68 KiB |
Before Width: | Height: | Size: 26 KiB After Width: | Height: | Size: 65 KiB |
Before Width: | Height: | Size: 23 KiB After Width: | Height: | Size: 72 KiB |
Before Width: | Height: | Size: 43 KiB After Width: | Height: | Size: 99 KiB |
Before Width: | Height: | Size: 73 KiB After Width: | Height: | Size: 148 KiB |
Before Width: | Height: | Size: 35 KiB After Width: | Height: | Size: 98 KiB |
Before Width: | Height: | Size: 54 KiB After Width: | Height: | Size: 70 KiB |
Before Width: | Height: | Size: 27 KiB After Width: | Height: | Size: 49 KiB |
Before Width: | Height: | Size: 49 KiB After Width: | Height: | Size: 128 KiB |
Before Width: | Height: | Size: 35 KiB After Width: | Height: | Size: 97 KiB |
Before Width: | Height: | Size: 46 KiB After Width: | Height: | Size: 71 KiB |
Before Width: | Height: | Size: 52 KiB After Width: | Height: | Size: 92 KiB |
Before Width: | Height: | Size: 93 KiB After Width: | Height: | Size: 120 KiB |
Before Width: | Height: | Size: 115 KiB After Width: | Height: | Size: 142 KiB |
Before Width: | Height: | Size: 54 KiB After Width: | Height: | Size: 74 KiB |
Before Width: | Height: | Size: 73 KiB After Width: | Height: | Size: 124 KiB |
After Width: | Height: | Size: 175 KiB |
After Width: | Height: | Size: 67 KiB |
Before Width: | Height: | Size: 22 KiB After Width: | Height: | Size: 33 KiB |
Before Width: | Height: | Size: 18 KiB After Width: | Height: | Size: 16 KiB |
Before Width: | Height: | Size: 21 KiB |
Before Width: | Height: | Size: 35 KiB |
Before Width: | Height: | Size: 22 KiB After Width: | Height: | Size: 31 KiB |
Before Width: | Height: | Size: 57 KiB After Width: | Height: | Size: 42 KiB |
Before Width: | Height: | Size: 23 KiB |
Before Width: | Height: | Size: 11 KiB After Width: | Height: | Size: 31 KiB |
Before Width: | Height: | Size: 44 KiB |
Before Width: | Height: | Size: 92 KiB After Width: | Height: | Size: 51 KiB |
Before Width: | Height: | Size: 32 KiB After Width: | Height: | Size: 31 KiB |
After Width: | Height: | Size: 17 KiB |
Before Width: | Height: | Size: 27 KiB After Width: | Height: | Size: 19 KiB |
Before Width: | Height: | Size: 42 KiB After Width: | Height: | Size: 32 KiB |
Before Width: | Height: | Size: 37 KiB After Width: | Height: | Size: 47 KiB |
Before Width: | Height: | Size: 66 KiB After Width: | Height: | Size: 61 KiB |
Before Width: | Height: | Size: 22 KiB After Width: | Height: | Size: 41 KiB |
Before Width: | Height: | Size: 48 KiB After Width: | Height: | Size: 66 KiB |
|
@ -387,7 +387,19 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto
|
|||
- `reconnectIntervalMs`:重连间隔毫秒时间,默认为 2000。
|
||||
</TabItem>
|
||||
<TabItem label="C" value="c">
|
||||
C/C++ 语言连接器使用 `taos_connect()` 函数用于建立与 TDengine 数据库的连接。其参数详细说明如下:
|
||||
**Websocket 连接**
|
||||
C/C++ 语言连接器 Websocket 连接方式使用 `ws_connect()` 函数用于建立与 TDengine 数据库的连接。其参数为 DSN 描述字符串,其基本结构如下:
|
||||
|
||||
```text
|
||||
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|
||||
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|
||||
|driver| protocol | | username | password | host | port | database | params |
|
||||
```
|
||||
|
||||
DSN 的详细说明和如何使用详见 [连接功能](../../reference/connector/cpp/#dsn)
|
||||
|
||||
**原生连接**
|
||||
C/C++ 语言连接器原生连接方式使用 `taos_connect()` 函数用于建立与 TDengine 数据库的连接。其参数详细说明如下:
|
||||
|
||||
- `host`:要连接的数据库服务器的主机名或IP地址。如果是本地数据库,可以使用 `"localhost"`。
|
||||
- `user`:用于登录数据库的用户名。
|
||||
|
@ -440,7 +452,10 @@ C/C++ 语言连接器使用 `taos_connect()` 函数用于建立与 TDengine 数
|
|||
```
|
||||
</TabItem>
|
||||
<TabItem label="C" value="c">
|
||||
不支持
|
||||
```c
|
||||
{{#include docs/examples/c-ws/connect_example.c}}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem label="REST API" value="rest">
|
||||
不支持
|
||||
|
|
|
@ -68,9 +68,15 @@ REST API:直接调用 `taosadapter` 提供的 REST API 接口,进行数据
|
|||
```
|
||||
</TabItem>
|
||||
<TabItem label="C" value="c">
|
||||
```c
|
||||
|
||||
```c title="Websocket 连接"
|
||||
{{#include docs/examples/c-ws/create_db_demo.c:create_db_and_table}}
|
||||
```
|
||||
|
||||
```c title="原生连接"
|
||||
{{#include docs/examples/c/create_db_demo.c:create_db_and_table}}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem label="REST API" value="rest">
|
||||
|
||||
|
@ -144,7 +150,12 @@ NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW
|
|||
```
|
||||
</TabItem>
|
||||
<TabItem label="C" value="c">
|
||||
```c
|
||||
|
||||
```c title="Websocket 连接"
|
||||
{{#include docs/examples/c-ws/insert_data_demo.c:insert_data}}
|
||||
```
|
||||
|
||||
```c title="原生连接"
|
||||
{{#include docs/examples/c/insert_data_demo.c:insert_data}}
|
||||
```
|
||||
|
||||
|
@ -218,7 +229,12 @@ rust 连接器还支持使用 **serde** 进行反序列化行为结构体的结
|
|||
```
|
||||
</TabItem>
|
||||
<TabItem label="C" value="c">
|
||||
```c
|
||||
|
||||
```c title="Websocket 连接"
|
||||
{{#include docs/examples/c-ws/query_data_demo.c:query_data}}
|
||||
```
|
||||
|
||||
```c title="原生连接"
|
||||
{{#include docs/examples/c/query_data_demo.c:query_data}}
|
||||
```
|
||||
</TabItem>
|
||||
|
@ -293,9 +309,15 @@ reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId
|
|||
```
|
||||
</TabItem>
|
||||
<TabItem label="C" value="c">
|
||||
```c
|
||||
|
||||
```c "Websocket 连接"
|
||||
{{#include docs/examples/c-ws/with_reqid_demo.c:with_reqid}}
|
||||
```
|
||||
|
||||
```c "原生连接"
|
||||
{{#include docs/examples/c/with_reqid_demo.c:with_reqid}}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem label="REST API" value="rest">
|
||||
|
||||
|
|