add cpp websocket connector doc

This commit is contained in:
sheyanjie-qq 2024-09-27 08:49:58 +08:00
parent 73557046fd
commit 7e5af93031
16 changed files with 1048 additions and 561 deletions

View File

@ -24,18 +24,12 @@
static int DemoInsertData() {
// ANCHOR: insert_data
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
int code = 0;
char *dsn = "ws://localhost:6041";
// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL));
taos_cleanup();
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return -1;
}
@ -50,24 +44,22 @@ static int DemoInsertData() {
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') "
"VALUES "
"(NOW + 1a, 10.30000, 218, 0.25000) ";
TAOS_RES *result = taos_query(taos, sql);
code = taos_errno(result);
WS_RES *result = ws_query(taos, sql);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert data to power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
ws_errstr(result));
ws_close(taos);
return -1;
}
taos_free_result(result);
ws_free_result(result);
// you can check affectedRows here
int rows = taos_affected_rows(result);
int rows = ws_affected_rows(result);
fprintf(stdout, "Successfully inserted %d rows into power.meters.\n", rows);
// close & clean
taos_close(taos);
taos_cleanup();
ws_close(taos);
return 0;
// ANCHOR_END: insert_data
}

View File

@ -1,147 +0,0 @@
// compile with
// gcc -o multi_bind_example multi_bind_example.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taos.h"
/**
* @brief execute sql only and ignore result set
*
* @param taos
* @param sql
*/
void executeSQL(TAOS *taos, const char *sql) {
TAOS_RES *res = taos_query(taos, sql);
int code = taos_errno(res);
if (code != 0) {
printf("%s\n", taos_errstr(res));
taos_free_result(res);
taos_close(taos);
exit(EXIT_FAILURE);
}
taos_free_result(res);
}
/**
* @brief exit program when error occur.
*
* @param stmt
* @param code
* @param msg
*/
void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
if (code != 0) {
printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt));
(void)taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
}
/**
* @brief insert data using stmt API
*
* @param taos
*/
void insertData(TAOS *taos) {
// init
TAOS_STMT *stmt = taos_stmt_init(taos);
// prepare
const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) values(?, ?, ?, ?)";
int code = taos_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");
// bind table name and tags
TAOS_MULTI_BIND tags[2];
char *location = "California.SanFrancisco";
int groupId = 2;
tags[0].buffer_type = TSDB_DATA_TYPE_BINARY;
tags[0].buffer_length = strlen(location);
tags[0].length = &tags[0].buffer_length;
tags[0].buffer = location;
tags[0].is_null = NULL;
tags[1].buffer_type = TSDB_DATA_TYPE_INT;
tags[1].buffer_length = sizeof(int);
tags[1].length = &tags[1].buffer_length;
tags[1].buffer = &groupId;
tags[1].is_null = NULL;
code = taos_stmt_set_tbname_tags(stmt, "d1001", tags);
checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags");
// highlight-start
// insert two rows with multi binds
TAOS_MULTI_BIND params[4];
// values to bind
int64_t ts[] = {1648432611249, 1648432611749};
float current[] = {10.3, 12.6};
int voltage[] = {219, 218};
float phase[] = {0.31, 0.33};
// is_null array
char is_null[2] = {0};
// length array
int32_t int64Len[2] = {sizeof(int64_t)};
int32_t floatLen[2] = {sizeof(float)};
int32_t intLen[2] = {sizeof(int)};
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(int64_t);
params[0].buffer = ts;
params[0].length = int64Len;
params[0].is_null = is_null;
params[0].num = 2;
params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[1].buffer_length = sizeof(float);
params[1].buffer = current;
params[1].length = floatLen;
params[1].is_null = is_null;
params[1].num = 2;
params[2].buffer_type = TSDB_DATA_TYPE_INT;
params[2].buffer_length = sizeof(int);
params[2].buffer = voltage;
params[2].length = intLen;
params[2].is_null = is_null;
params[2].num = 2;
params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[3].buffer_length = sizeof(float);
params[3].buffer = phase;
params[3].length = floatLen;
params[3].is_null = is_null;
params[3].num = 2;
code = taos_stmt_bind_param_batch(stmt, params); // bind batch
checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch");
code = taos_stmt_add_batch(stmt); // add batch
checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch");
// highlight-end
// execute
code = taos_stmt_execute(stmt);
checkErrorCode(stmt, code, "failed to execute taos_stmt_execute");
int affectedRows = taos_stmt_affected_rows(stmt);
printf("successfully inserted %d rows\n", affectedRows);
// close
(void)taos_stmt_close(stmt);
}
int main() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030);
if (taos == NULL) {
printf("failed to connect to server\n");
exit(EXIT_FAILURE);
}
executeSQL(taos, "DROP DATABASE IF EXISTS power");
executeSQL(taos, "CREATE DATABASE power");
executeSQL(taos, "USE power");
executeSQL(taos,
"CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), "
"groupId INT)");
insertData(taos);
taos_close(taos);
taos_cleanup();
}
// output:
// successfully inserted 2 rows

View File

@ -24,52 +24,45 @@
static int DemoQueryData() {
// ANCHOR: query_data
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
int code = 0;
char *dsn = "ws://localhost:6041";
// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL));
taos_cleanup();
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return -1;
}
// query data, please make sure the database and table are already created
const char *sql = "SELECT ts, current, location FROM power.meters limit 100";
TAOS_RES *result = taos_query(taos, sql);
code = taos_errno(result);
WS_RES *result = ws_query(taos, sql);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to query data from power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
ws_errstr(result));
ws_close(taos);
return -1;
}
TAOS_ROW row = NULL;
int rows = 0;
int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
WS_ROW row = NULL;
int rows = 0;
int num_fields = ws_field_count(result);
const WS_FIELD *fields = ws_fetch_fields(result);
fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
while ((row = ws_fetch_row(result))) {
// Add your data processing logic here
rows++;
}
fprintf(stdout, "total rows: %d\n", rows);
taos_free_result(result);
ws_free_result(result);
// close & clean
taos_close(taos);
taos_cleanup();
ws_close(taos);
return 0;
// ANCHOR_END: query_data
}

View File

@ -23,42 +23,35 @@
static int DemoSmlInsert() {
// ANCHOR: schemaless
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
int code = 0;
char *dsn = "ws://localhost:6041";
// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL));
taos_cleanup();
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return -1;
}
// create database
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = taos_errno(result);
WS_RES *result = ws_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result));
taos_close(taos);
taos_cleanup();
fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result));
ws_close(taos);
return -1;
}
taos_free_result(result);
ws_free_result(result);
// use database
result = taos_query(taos, "USE power");
code = taos_errno(result);
result = ws_query(taos, "USE power");
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result));
taos_close(taos);
taos_cleanup();
fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s\n.", code, ws_errstr(result));
ws_close(taos);
return -1;
}
taos_free_result(result);
ws_free_result(result);
// schemaless demo data
char *line_demo =
@ -71,71 +64,58 @@ static int DemoSmlInsert() {
// influxdb line protocol
char *lines[] = {line_demo};
result = taos_schemaless_insert(taos, lines, 1, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(result);
int totalLines = 0;
result = ws_schemaless_insert_raw(taos, line_demo, strlen(line_demo), &totalLines, WS_TSDB_SML_LINE_PROTOCOL,
WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", line_demo, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", line_demo,
code, ws_errstr(result));
ws_close(taos);
return -1;
}
int rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", rows);
taos_free_result(result);
fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", totalLines);
ws_free_result(result);
// opentsdb telnet protocol
char *telnets[] = {telnet_demo};
result = taos_schemaless_insert(taos, telnets, 1, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(result);
totalLines = 0;
result = ws_schemaless_insert_raw(taos, telnet_demo, strlen(telnet_demo), &totalLines, WS_TSDB_SML_TELNET_PROTOCOL,
WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", telnet_demo, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", telnet_demo,
code, ws_errstr(result));
ws_close(taos);
return -1;
}
rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", rows);
taos_free_result(result);
fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", totalLines);
ws_free_result(result);
// opentsdb json protocol
char *jsons[1] = {0};
// allocate memory for json data. can not use static memory.
size_t size = 1024;
jsons[0] = malloc(size);
if (jsons[0] == NULL) {
fprintf(stderr, "Failed to allocate memory: %zu bytes.\n", size);
taos_close(taos);
taos_cleanup();
return -1;
}
(void)strncpy(jsons[0], json_demo, 1023);
result = taos_schemaless_insert(taos, jsons, 1, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
code = taos_errno(result);
totalLines = 0;
result = ws_schemaless_insert_raw(taos, json_demo, strlen(json_demo), &totalLines, WS_TSDB_SML_JSON_PROTOCOL,
WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = ws_errno(result);
if (code != 0) {
free(jsons[0]);
fprintf(stderr, "Failed to insert schemaless json data, Server: %s, ErrCode: 0x%x, ErrMessage: %s\n.", json_demo, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
fprintf(stderr, "Failed to insert schemaless json data, Server: %s, ErrCode: 0x%x, ErrMessage: %s\n.", json_demo,
code, ws_errstr(result));
ws_close(taos);
return -1;
}
free(jsons[0]);
rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", rows);
taos_free_result(result);
fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", totalLines);
ws_free_result(result);
// close & clean
taos_close(taos);
taos_cleanup();
ws_close(taos);
return 0;
// ANCHOR_END: schemaless
}
int main(int argc, char *argv[]) {
return DemoSmlInsert();
}
int main(int argc, char *argv[]) { return DemoSmlInsert(); }

View File

@ -28,16 +28,16 @@
* @param taos
* @param sql
*/
void executeSQL(TAOS *taos, const char *sql) {
TAOS_RES *res = taos_query(taos, sql);
int code = taos_errno(res);
void executeSQL(WS_TAOS *taos, const char *sql) {
WS_RES *res = ws_query(taos, sql);
int code = ws_errno(res);
if (code != 0) {
fprintf(stderr, "%s\n", taos_errstr(res));
taos_free_result(res);
taos_close(taos);
fprintf(stderr, "%s\n", ws_errstr(res));
ws_free_result(res);
ws_close(taos);
exit(EXIT_FAILURE);
}
taos_free_result(res);
ws_free_result(res);
}
/**
@ -47,10 +47,10 @@ void executeSQL(TAOS *taos, const char *sql) {
* @param code
* @param msg
*/
void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
void checkErrorCode(WS_STMT *stmt, int code, const char *msg) {
if (code != 0) {
fprintf(stderr, "%s. code: %d, error: %s\n", msg,code,taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
fprintf(stderr, "%s. code: %d, error: %s\n", msg, code, ws_stmt_errstr(stmt));
ws_stmt_close(stmt);
exit(EXIT_FAILURE);
}
}
@ -70,17 +70,17 @@ int total_affected = 0;
*
* @param taos
*/
void insertData(TAOS *taos) {
void insertData(WS_TAOS *taos) {
// init
TAOS_STMT *stmt = taos_stmt_init(taos);
WS_STMT *stmt = ws_stmt_init(taos);
if (stmt == NULL) {
fprintf(stderr, "Failed to init taos_stmt, error: %s\n", taos_stmt_errstr(NULL));
exit(EXIT_FAILURE);
fprintf(stderr, "Failed to init ws_stmt, error: %s\n", ws_stmt_errstr(NULL));
exit(EXIT_FAILURE);
}
// prepare
const char *sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
int code = taos_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "Failed to execute taos_stmt_prepare");
int code = ws_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "Failed to execute ws_stmt_prepare");
for (int i = 1; i <= num_of_sub_table; i++) {
char table_name[20];
sprintf(table_name, "d_bind_%d", i);
@ -88,7 +88,7 @@ void insertData(TAOS *taos) {
sprintf(location, "location_%d", i);
// set table name and tags
TAOS_MULTI_BIND tags[2];
WS_MULTI_BIND tags[2];
// groupId
tags[0].buffer_type = TSDB_DATA_TYPE_INT;
tags[0].buffer_length = sizeof(int);
@ -99,15 +99,15 @@ void insertData(TAOS *taos) {
// location
tags[1].buffer_type = TSDB_DATA_TYPE_BINARY;
tags[1].buffer_length = strlen(location);
tags[1].length =(int32_t *) &tags[1].buffer_length;
tags[1].length = (int32_t *)&tags[1].buffer_length;
tags[1].buffer = location;
tags[1].is_null = NULL;
tags[1].num = 1;
code = taos_stmt_set_tbname_tags(stmt, table_name, tags);
code = ws_stmt_set_tbname_tags(stmt, table_name, tags, 2);
checkErrorCode(stmt, code, "Failed to set table name and tags\n");
// insert rows
TAOS_MULTI_BIND params[4];
WS_MULTI_BIND params[4];
// ts
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(int64_t);
@ -146,32 +146,30 @@ void insertData(TAOS *taos) {
params[2].buffer = &voltage;
params[3].buffer = &phase;
// bind param
code = taos_stmt_bind_param(stmt, params);
code = ws_stmt_bind_param_batch(stmt, params, 4);
checkErrorCode(stmt, code, "Failed to bind param");
}
// add batch
code = taos_stmt_add_batch(stmt);
code = ws_stmt_add_batch(stmt);
checkErrorCode(stmt, code, "Failed to add batch");
// execute batch
code = taos_stmt_execute(stmt);
int affected_rows = 0;
code = ws_stmt_execute(stmt, &affected_rows);
checkErrorCode(stmt, code, "Failed to exec stmt");
// get affected rows
int affected = taos_stmt_affected_rows_once(stmt);
int affected = ws_stmt_affected_rows_once(stmt);
total_affected += affected;
}
fprintf(stdout, "Successfully inserted %d rows to power.meters.\n", total_affected);
taos_stmt_close(stmt);
ws_stmt_close(stmt);
}
int main() {
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
TAOS *taos = taos_connect(host, user, password, NULL, port);
int code = 0;
char *dsn = "ws://localhost:6041";
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
exit(EXIT_FAILURE);
}
// create database and table
@ -181,6 +179,5 @@ int main() {
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
insertData(taos);
taos_close(taos);
taos_cleanup();
ws_close(taos);
}

View File

@ -41,33 +41,27 @@ typedef struct {
const char* auto_offset_reset;
} ConsumerConfig;
ConsumerConfig config = {
.enable_auto_commit = "true",
.auto_commit_interval_ms = "1000",
.group_id = "group1",
.client_id = "client1",
.td_connect_host = "localhost",
.td_connect_port = "6030",
.td_connect_user = "root",
.td_connect_pass = "taosdata",
.auto_offset_reset = "latest"
};
ConsumerConfig config = {.enable_auto_commit = "true",
.auto_commit_interval_ms = "1000",
.group_id = "group1",
.client_id = "client1",
.td_connect_host = "localhost",
.td_connect_port = "6030",
.td_connect_user = "root",
.td_connect_pass = "taosdata",
.auto_offset_reset = "latest"};
void* prepare_data(void* arg) {
const char* host = "localhost";
const char* user = "root";
const char* password = "taosdata";
uint16_t port = 6030;
int code = 0;
TAOS* pConn = taos_connect(host, user, password, NULL, port);
int code = 0;
char* dsn = "ws://localhost:6041";
WS_TAOS* pConn = ws_connect(dsn);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return NULL;
}
TAOS_RES* pRes;
int i = 1;
WS_RES* pRes;
int i = 1;
while (!thread_stop) {
char buf[200] = {0};
@ -78,12 +72,12 @@ void* prepare_data(void* arg) {
"219, 0.31000)",
i);
pRes = taos_query(pConn, buf);
code = taos_errno(pRes);
pRes = ws_query(pConn, buf);
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
}
taos_free_result(pRes);
ws_free_result(pRes);
sleep(1);
}
fprintf(stdout, "Prepare data thread exit\n");
@ -91,15 +85,15 @@ void* prepare_data(void* arg) {
}
// ANCHOR: msg_process
int32_t msg_process(TAOS_RES* msg) {
int32_t msg_process(WS_RES* msg) {
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
const char* topicName = ws_tmq_get_topic_name(msg);
const char* dbName = ws_tmq_get_db_name(msg);
int32_t vgroupId = ws_tmq_get_vgroup_id(msg);
while (true) {
// get one row data from message
TAOS_ROW row = taos_fetch_row(msg);
WS_ROW row = ws_fetch_row(msg);
if (row == NULL) break;
// Add your data processing logic here
@ -111,210 +105,187 @@ int32_t msg_process(TAOS_RES* msg) {
}
// ANCHOR_END: msg_process
TAOS* init_env() {
const char* host = "localhost";
const char* user = "root";
const char* password = "taosdata";
uint16_t port = 6030;
int code = 0;
TAOS* pConn = taos_connect(host, user, password, NULL, port);
WS_TAOS* init_env() {
int code = 0;
char* dsn = "ws://localhost:6041";
WS_TAOS* pConn = ws_connect(dsn);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return NULL;
}
TAOS_RES* pRes;
WS_RES* pRes;
// drop database if exists
pRes = taos_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = taos_errno(pRes);
pRes = ws_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
taos_free_result(pRes);
ws_free_result(pRes);
pRes = taos_query(pConn, "DROP DATABASE IF EXISTS power");
code = taos_errno(pRes);
pRes = ws_query(pConn, "DROP DATABASE IF EXISTS power");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
taos_free_result(pRes);
ws_free_result(pRes);
// create database
pRes = taos_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600");
code = taos_errno(pRes);
pRes = ws_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
fprintf(stderr, "Failed to create power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
taos_free_result(pRes);
ws_free_result(pRes);
// create super table
pRes = taos_query(
pConn,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
code = taos_errno(pRes);
pRes =
ws_query(pConn,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
taos_free_result(pRes);
ws_free_result(pRes);
return pConn;
END:
taos_free_result(pRes);
taos_close(pConn);
ws_free_result(pRes);
ws_close(pConn);
return NULL;
}
void deinit_env(TAOS* pConn) {
if (pConn)
taos_close(pConn);
void deinit_env(WS_TAOS* pConn) {
if (pConn) ws_close(pConn);
}
int32_t create_topic(TAOS* pConn) {
TAOS_RES* pRes;
int code = 0;
int32_t create_topic(WS_TAOS* pConn) {
WS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = taos_query(pConn, "USE power");
code = taos_errno(pRes);
if (taos_errno(pRes) != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
pRes = ws_query(pConn, "USE power");
code = ws_errno(pRes);
if (ws_errno(pRes) != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
return -1;
}
taos_free_result(pRes);
ws_free_result(pRes);
pRes = taos_query(pConn, "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
code = taos_errno(pRes);
pRes = ws_query(
pConn,
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
return -1;
}
taos_free_result(pRes);
ws_free_result(pRes);
return 0;
}
int32_t drop_topic(TAOS* pConn) {
TAOS_RES* pRes;
int code = 0;
int32_t drop_topic(WS_TAOS* pConn) {
WS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = taos_query(pConn, "USE power");
code = taos_errno(pRes);
if (taos_errno(pRes) != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
pRes = ws_query(pConn, "USE power");
code = ws_errno(pRes);
if (ws_errno(pRes) != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
return -1;
}
taos_free_result(pRes);
ws_free_result(pRes);
pRes = taos_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = taos_errno(pRes);
pRes = ws_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
fprintf(stderr, "Failed to drop topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
return -1;
}
taos_free_result(pRes);
ws_free_result(pRes);
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
count +=1;
void tmq_commit_cb_print(ws_tmq_t* tmq, int32_t code, void* param) {
count += 1;
fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p, count: %d.\n", code, tmq, param, count);
}
// ANCHOR: create_consumer_1
tmq_t* build_consumer(const ConsumerConfig* config) {
tmq_conf_res_t code;
tmq_t* tmq = NULL;
ws_tmq_t* build_consumer(const ConsumerConfig* config) {
ws_tmq_conf_res_t code;
ws_tmq_t* tmq = NULL;
// create a configuration object
tmq_conf_t* conf = tmq_conf_new();
ws_tmq_conf_t* conf = ws_tmq_conf_new();
// set the configuration parameters
code = tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
code = ws_tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
code = ws_tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "group.id", config->group_id);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
code = ws_tmq_conf_set(conf, "group.id", config->group_id);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "client.id", config->client_id);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.ip", config->td_connect_host);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.port", config->td_connect_port);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.user", config->td_connect_user);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.pass", config->td_connect_pass);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
code = ws_tmq_conf_set(conf, "client.id", config->client_id);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
// set the callback function for auto commit
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
// create a consumer object
tmq = tmq_consumer_new(conf, NULL, 0);
tmq = ws_tmq_consumer_new(conf, "taos://localhost:6041", NULL, 0);
_end:
// destroy the configuration object
tmq_conf_destroy(conf);
ws_tmq_conf_destroy(conf);
return tmq;
}
// ANCHOR_END: create_consumer_1
// ANCHOR: build_topic_list
// build a topic list used to subscribe
tmq_list_t* build_topic_list() {
ws_tmq_list_t* build_topic_list() {
// create a empty topic list
tmq_list_t* topicList = tmq_list_new();
ws_tmq_list_t* topicList = ws_tmq_list_new();
// append topic name to the list
int32_t code = tmq_list_append(topicList, topic_name);
int32_t code = ws_tmq_list_append(topicList, topic_name);
if (code) {
// if failed, destroy the list and return NULL
tmq_list_destroy(topicList);
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
ws_tmq_list_destroy(topicList);
fprintf(stderr,
"Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(NULL));
return NULL;
}
// if success, return the list
@ -323,14 +294,14 @@ tmq_list_t* build_topic_list() {
// ANCHOR_END: build_topic_list
// ANCHOR: basic_consume_loop
void basic_consume_loop(tmq_t* tmq) {
void basic_consume_loop(ws_tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
@ -338,7 +309,7 @@ void basic_consume_loop(tmq_t* tmq) {
totalRows += msg_process(tmqmsg);
// free the message
taos_free_result(tmqmsg);
ws_free_result(tmqmsg);
}
if (msgCnt > 50) {
// consume 50 messages and break
@ -352,34 +323,34 @@ void basic_consume_loop(tmq_t* tmq) {
// ANCHOR_END: basic_consume_loop
// ANCHOR: consume_repeatly
void consume_repeatly(tmq_t* tmq) {
int32_t numOfAssignment = 0;
tmq_topic_assignment* pAssign = NULL;
void consume_repeatly(ws_tmq_t* tmq) {
int32_t numOfAssignment = 0;
ws_tmq_topic_assignment* pAssign = NULL;
// get the topic assignment
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
int32_t code = ws_tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
return;
}
// seek to the earliest offset
for (int32_t i = 0; i < numOfAssignment; ++i) {
tmq_topic_assignment* p = &pAssign[i];
ws_tmq_topic_assignment* p = &pAssign[i];
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
code = ws_tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr, "Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, p->vgId, code, tmq_err2str(code));
fprintf(stderr,
"Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, p->vgId, code, ws_tmq_errstr(tmq));
break;
}
}
if (code == 0)
fprintf(stdout, "Assignment seek to beginning successfully.\n");
if (code == 0) fprintf(stdout, "Assignment seek to beginning successfully.\n");
// free the assignment array
tmq_free_assignment(pAssign);
ws_tmq_free_assignment(pAssign, numOfAssignment);
// let's consume the messages again
basic_consume_loop(tmq);
@ -387,31 +358,32 @@ void consume_repeatly(tmq_t* tmq) {
// ANCHOR_END: consume_repeatly
// ANCHOR: manual_commit
void manual_commit(tmq_t* tmq) {
void manual_commit(ws_tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// process the message
totalRows += msg_process(tmqmsg);
// commit the message
int32_t code = tmq_commit_sync(tmq, tmqmsg);
int32_t code = ws_tmq_commit_sync(tmq, tmqmsg);
if (code) {
fprintf(stderr, "Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
fprintf(stderr,
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
// free the message
taos_free_result(tmqmsg);
ws_free_result(tmqmsg);
break;
} else {
fprintf(stdout, "Commit offset manually successfully.\n");
}
// free the message
taos_free_result(tmqmsg);
ws_free_result(tmqmsg);
}
if (msgCnt > 50) {
// consume 50 messages and break
@ -428,7 +400,7 @@ int main(int argc, char* argv[]) {
int32_t code;
pthread_t thread_id;
TAOS* pConn = init_env();
WS_TAOS* pConn = init_env();
if (pConn == NULL) {
fprintf(stderr, "Failed to init env.\n");
return -1;
@ -445,34 +417,35 @@ int main(int argc, char* argv[]) {
}
// ANCHOR: create_consumer_2
tmq_t* tmq = build_consumer(&config);
ws_tmq_t* tmq = build_consumer(&config);
if (NULL == tmq) {
fprintf(stderr, "Failed to create native consumer, host: %s, groupId: %s, , clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
return -1;
} else {
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n", config.td_connect_host,
config.group_id, config.client_id);
}
// ANCHOR_END: create_consumer_2
// ANCHOR: subscribe_3
tmq_list_t* topic_list = build_topic_list();
ws_tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n",
topic_name, config.group_id, config.client_id);
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n", topic_name, config.group_id,
config.client_id);
return -1;
}
if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
if ((code = ws_tmq_subscribe(tmq, topic_list))) {
fprintf(stderr,
"Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Subscribe topics successfully.\n");
}
tmq_list_destroy(topic_list);
ws_tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
// ANCHOR_END: subscribe_3
@ -483,19 +456,20 @@ int main(int argc, char* argv[]) {
// ANCHOR: unsubscribe_and_close
// unsubscribe the topic
code = tmq_unsubscribe(tmq);
code = ws_tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
fprintf(stderr,
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Consumer unsubscribed successfully.\n");
}
// close the consumer
code = tmq_consumer_close(tmq);
code = ws_tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Consumer closed successfully.\n");
}

View File

@ -24,53 +24,46 @@
static int DemoWithReqId() {
// ANCHOR: with_reqid
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
int code = 0;
char *dsn = "ws://localhost:6041";
// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL));
taos_cleanup();
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return -1;
}
const char *sql = "SELECT ts, current, location FROM power.meters limit 1";
// query data with reqid
long reqid = 3L;
TAOS_RES *result = taos_query_with_reqid(taos, sql, reqid);
code = taos_errno(result);
long reqid = 3L;
WS_RES *result = ws_query_with_reqid(taos, sql, reqid);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to execute sql withQID: %ld, ErrCode: 0x%x, ErrMessage: %s\n.", reqid, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
ws_errstr(result));
ws_close(taos);
return -1;
}
TAOS_ROW row = NULL;
int rows = 0;
int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
WS_ROW row = NULL;
int rows = 0;
int num_fields = ws_field_count(result);
const WS_FIELD *fields = ws_fetch_fields(result);
fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
while ((row = ws_fetch_row(result))) {
// Add your data processing logic here
rows++;
}
fprintf(stdout, "total rows: %d\n", rows);
taos_free_result(result);
ws_free_result(result);
// close & clean
taos_close(taos);
taos_cleanup();
ws_close(taos);
return 0;
// ANCHOR_END: with_reqid
}

View File

@ -387,7 +387,19 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto
- `reconnectIntervalMs`:重连间隔毫秒时间,默认为 2000。
</TabItem>
<TabItem label="C" value="c">
C/C++ 语言连接器使用 `taos_connect()` 函数用于建立与 TDengine 数据库的连接。其参数详细说明如下:
**Websocket 连接**
C/C++ 语言连接器 Websocket 连接方式使用 `ws_connect()` 函数用于建立与 TDengine 数据库的连接。其参数为 DSN 描述字符串,其基本结构如下:
```text
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver| protocol | | username | password | host | port | database | params |
```
DSN 的详细说明和如何使用详见 [连接功能](../../reference/connector/cpp/#dsn)
**原生连接**
C/C++ 语言连接器原生连接方式使用 `taos_connect()` 函数用于建立与 TDengine 数据库的连接。其参数详细说明如下:
- `host`要连接的数据库服务器的主机名或IP地址。如果是本地数据库可以使用 `"localhost"`
- `user`:用于登录数据库的用户名。
@ -440,7 +452,10 @@ C/C++ 语言连接器使用 `taos_connect()` 函数用于建立与 TDengine 数
```
</TabItem>
<TabItem label="C" value="c">
不支持
```c
{{#include docs/examples/c-ws/connect_example.c}}
```
</TabItem>
<TabItem label="REST API" value="rest">
不支持

View File

@ -68,9 +68,15 @@ REST API直接调用 `taosadapter` 提供的 REST API 接口,进行数据
```
</TabItem>
<TabItem label="C" value="c">
```c
```c title="Websocket 连接"
{{#include docs/examples/c-ws/create_db_demo.c:create_db_and_table}}
```
```c title="原生连接"
{{#include docs/examples/c/create_db_demo.c:create_db_and_table}}
```
</TabItem>
<TabItem label="REST API" value="rest">
@ -144,7 +150,12 @@ NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW
```
</TabItem>
<TabItem label="C" value="c">
```c
```c title="Websocket 连接"
{{#include docs/examples/c-ws/insert_data_demo.c:insert_data}}
```
```c title="原生连接"
{{#include docs/examples/c/insert_data_demo.c:insert_data}}
```
@ -218,7 +229,12 @@ rust 连接器还支持使用 **serde** 进行反序列化行为结构体的结
```
</TabItem>
<TabItem label="C" value="c">
```c
```c title="Websocket 连接"
{{#include docs/examples/c-ws/query_data_demo.c:query_data}}
```
```c title="原生连接"
{{#include docs/examples/c/query_data_demo.c:query_data}}
```
</TabItem>
@ -293,9 +309,15 @@ reqId 可用于请求链路追踪reqId 就像分布式系统中的 traceId
```
</TabItem>
<TabItem label="C" value="c">
```c
```c "Websocket 连接"
{{#include docs/examples/c-ws/with_reqid_demo.c:with_reqid}}
```
```c "原生连接"
{{#include docs/examples/c/with_reqid_demo.c:with_reqid}}
```
</TabItem>
<TabItem label="REST API" value="rest">

View File

@ -237,7 +237,10 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO
```
</TabItem>
<TabItem label="C" value="c">
不支持
```c
{{#include docs/examples/c-ws/sml_insert_demo.c:schemaless}}
```
</TabItem>
<TabItem label="REST API" value="rest">
不支持

View File

@ -64,7 +64,9 @@ import TabItem from "@theme/TabItem";
```
</TabItem>
<TabItem label="C" value="c">
不支持
```c
{{#include docs/examples/c-ws/stmt_insert_demo.c}}
```
</TabItem>
<TabItem label="REST API" value="rest">
不支持

View File

@ -28,21 +28,21 @@ TDengine 消费者的概念跟 Kafka 类似,消费者通过订阅主题来接
### 创建参数
创建消费者的参数较多,非常灵活的支持了各种连接类型、 Offset 提交方式、压缩、重连、反序列化等特性。各语言连接器都适用的通用基础配置项如下表所示:
| 参数名称 | 类型 | 参数说明 | 备注 |
| :-----------------------: | :-----: | ----------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `td.connect.ip` | string | 服务端的 IP 地址 | |
| `td.connect.user` | string | 用户名 | |
| `td.connect.pass` | string | 密码 | |
| `td.connect.port` | integer | 服务端的端口号 | |
| `group.id` | string | 消费组 ID同一消费组共享消费进度 | <br />**必填项**。最大长度192。<br />每个topic最多可建立 100 个 consumer group |
| `client.id` | string | 客户端 ID | 最大长度192 |
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | <br />`earliest`: default(version < 3.2.0.0);从头开始订阅; <br/>`latest`: default(version >= 3.2.0.0);仅从最新数据开始订阅; <br/>`none`: 没有提交的 offset 无法订阅 |
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交true: 自动提交客户端应用无需commitfalse客户端应用需要自行commit | 默认值为 true |
| `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 |
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句从3.2.0.0版本该参数废弃恒为true | 默认关闭 |
| `enable.replay` | boolean | 是否开启数据回放功能 | 默认关闭 |
| `session.timeout.ms` | integer | consumer 心跳丢失后超时时间,超时后会触发 rebalance 逻辑,成功后该 consumer 会被删除从3.3.3.0版本开始支持) | 默认值为 12000取值范围 [6000 1800000] |
| `max.poll.interval.ms` | integer | consumer poll 拉取数据间隔的最长时间,超过该时间,会认为该 consumer 离线触发rebalance 逻辑,成功后该 consumer 会被删除从3.3.3.0版本开始支持) | 默认值为 300000[1000INT32_MAX] |
| 参数名称 | 类型 | 参数说明 | 备注 |
| :-----------------------: | :-----: | ------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `td.connect.ip` | string | 服务端的 IP 地址 | |
| `td.connect.user` | string | 用户名 | |
| `td.connect.pass` | string | 密码 | |
| `td.connect.port` | integer | 服务端的端口号 | |
| `group.id` | string | 消费组 ID同一消费组共享消费进度 | <br />**必填项**。最大长度192。<br />每个topic最多可建立 100 个 consumer group |
| `client.id` | string | 客户端 ID | 最大长度192 |
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | <br />`earliest`: default(version < 3.2.0.0);从头开始订阅; <br/>`latest`: default(version >= 3.2.0.0);仅从最新数据开始订阅; <br/>`none`: 没有提交的 offset 无法订阅 |
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交true: 自动提交客户端应用无需commitfalse客户端应用需要自行commit | 默认值为 true |
| `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 |
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句从3.2.0.0版本该参数废弃恒为true | 默认关闭 |
| `enable.replay` | boolean | 是否开启数据回放功能 | 默认关闭 |
| `session.timeout.ms` | integer | consumer 心跳丢失后超时时间,超时后会触发 rebalance 逻辑,成功后该 consumer 会被删除从3.3.3.0版本开始支持) | 默认值为 12000取值范围 [6000 1800000] |
| `max.poll.interval.ms` | integer | consumer poll 拉取数据间隔的最长时间,超过该时间,会认为该 consumer 离线触发rebalance 逻辑,成功后该 consumer 会被删除从3.3.3.0版本开始支持) | 默认值为 300000[1000INT32_MAX] |
下面是各语言连接器创建参数:
@ -94,8 +94,8 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
</TabItem>
<TabItem label="C" value="c">
同通用基础配置项。
- Websocket 连接: 因为使用 dsn不需要 `td.connect.ip``td.connect.port``td.connect.user` 和 `td.connect.pass` 四个配置项,其余同通用配置项。
- 原生连接: 同通用基础配置项。
</TabItem>
<TabItem label="REST API" value="rest">
@ -154,7 +154,15 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
```
</TabItem>
<TabItem label="C" value="c">
不支持
```c
{{#include docs/examples/c-ws/tmq_demo.c:create_consumer_1}}
```
```c
{{#include docs/examples/c-ws/tmq_demo.c:create_consumer_2}}
```
调用 `build_consumer` 函数尝试获取消费者实例 `tmq`。成功则打印成功日志,失败则打印失败日志。
</TabItem>
<TabItem label="REST API" value="rest">
不支持
@ -283,7 +291,29 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
```
</TabItem>
<TabItem label="C" value="c">
不支持
```c
{{#include docs/examples/c-ws/tmq_demo.c:build_topic_list}}
```
```c
{{#include docs/examples/c-ws/tmq_demo.c:basic_consume_loop}}
```
```c
{{#include docs/examples/c-ws/tmq_demo.c:msg_process}}
```
```c
{{#include docs/examples/c-ws/tmq_demo.c:subscribe_3}}
```
订阅消费数据步骤:
1. 调用 `ws_build_topic_list` 函数创建一个主题列表 `topic_list`
2. 如果 `topic_list``NULL`,表示创建失败,函数返回 `-1`
3. 使用 `ws_tmq_subscribe` 函数订阅 `tmq` 指定的主题列表。如果订阅失败,打印错误信息。
4. 销毁主题列表 `topic_list` 以释放资源。
5. 调用 `basic_consume_loop` 函数开始基本的消费循环,处理订阅的消息。
</TabItem>
<TabItem label="REST API" value="rest">
不支持
@ -427,7 +457,17 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
```
</TabItem>
<TabItem label="C" value="c">
不支持
```c
{{#include docs/examples/c-ws/tmq_demo.c:consume_repeatly}}
```
1. 通过 `ws_tmq_get_topic_assignment` 函数获取特定主题的分配信息,包括分配的数量和具体分配详情。
2. 如果获取分配信息失败,则打印错误信息并返回。
3. 对于每个分配,使用 `ws_tmq_offset_seek` 函数将消费者的偏移量设置到最早的偏移量。
4. 如果设置偏移量失败,则打印错误信息。
5. 释放分配信息数组以释放资源。
6. 调用 `basic_consume_loop` 函数开始新的消费循环,处理消息。
</TabItem>
<TabItem label="REST API" value="rest">
不支持
@ -554,7 +594,12 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
```
</TabItem>
<TabItem label="C" value="c">
不支持
```c
{{#include docs/examples/c-ws/tmq_demo.c:manual_commit}}
```
可以通过 `ws_tmq_commit_sync` 函数来手工提交消费进度。
</TabItem>
<TabItem label="REST API" value="rest">
不支持
@ -662,7 +707,9 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
```
</TabItem>
<TabItem label="C" value="c">
不支持
```c
{{#include docs/examples/c-ws/tmq_demo.c:unsubscribe_and_close}}
```
</TabItem>
<TabItem label="REST API" value="rest">
不支持
@ -777,7 +824,13 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
</details>
</TabItem>
<TabItem label="C" value="c">
不支持
<details>
<summary>完整代码示例</summary>
```c
{{#include docs/examples/c-ws/tmq_demo.c}}
```
</details>
</TabItem>
<TabItem label="REST API" value="rest">
不支持

View File

@ -4,8 +4,615 @@ title: C/C++ Connector
toc_max_heading_level: 4
---
C/C++ 开发人员可以使用 TDengine 的客户端驱动,即 C/C++连接器 (以下都用 TDengine 客户端驱动表示),开发自己的应用来连接 TDengine 集群完成数据存储、查询以及其他功能。TDengine 客户端驱动的 API 类似于 MySQL 的 C API。应用程序使用时需要包含 TDengine 头文件 _taos.h_里面列出了提供的 API 的函数原型;应用程序还要链接到所在平台上对应的动态库。
C/C++ 开发人员可以使用 TDengine 的客户端驱动,即 C/C++连接器 (以下都用 TDengine 客户端驱动表示),开发自己的应用来连接 TDengine 集群完成数据存储、查询以及其他功能。TDengine 客户端驱动的 API 类似于 MySQL 的 C API。应用程序使用时需要包含 TDengine 头文件,里面列出了提供的 API 的函数原型;应用程序还要链接到所在平台上对应的动态库。
TDengine 的客户端驱动提供了 taosws 和 taos 两个动态库,分别支持 Websocket 连接和原生连接。 Websocket 连接和原生连接的区别是 Websocket 连接方式不要求客户端和服务端版本完全匹配,而原生连接要求,在性能上 Websocket 连接方式也接近于原生连接,一般我们推荐使用 Websocket 连接方式。
下面我们分开介绍两种连接方式的使用方法。
## Websocket 连接方式
Websocket 连接方式需要使用 taosws.h 头文件和 taosws 动态库。
```c
#include <taosws.h>
```
TDengine 服务端或客户端安装后,`taosws.h` 位于:
- Linux`/usr/local/taos/include`
- Windows`C:\TDengine\include`
- macOS`/usr/local/include`
TDengine 客户端驱动的动态库位于:
- Linux: `/usr/local/taos/driver/libtaosws.so`
- Windows: `C:\TDengine\taosws.dll`
- macOS: `/usr/local/lib/libtaosws.dylib`
### 支持的平台
请参考[支持的平台列表](../#支持的平台)
### 版本历史
| TDengine 客户端版本 | 主要变化 | TDengine 版本 |
| :------------------: | :---------------------------: | :----------------: |
| 3.3.3.0 | 首次发布,提供了 SQL执行参数绑定无模式写入和数据订阅等全面功能支持。 | 3.3.2.0及更高版本 |
### 错误码
在 C 接口的设计中,错误码采用整数类型表示,每个错误码都对应一个特定的错误状态。如未特别说明,当 API 的返回值是整数时_0_ 代表成功,其它是代表失败原因的错误码,当返回值是指针时, _NULL_ 表示失败。
Websocket 连接方式单独的错误码在 `taosws.h` 中,
| 错误码 | 错误描述 | 可能的出错场景或者可能的原因 | 建议用户采取的措施 |
| ------- | -------- | ---------------------------- | ------------------ |
| 0xE000 | DSN 错误 | DSN 不符合规范 | 检查 dsn 字符串是否符合规范 |
| 0xE001 | 内部错误 | 不确定 | 保留现场和日志github上报issue |
| 0xE002 | 连接关闭 | 网络断开 | 请检查网络状况,查看 `taosadapter` 日志。 |
| 0xE003 | 发送超时 | 网络断开 | 请检查网络状况 |
| 0xE004 | 接收超时 | 慢查询,或者网络断开 | 排查 `taosadapter` 日志 |
其余错误码请参考同目录下 `taoserror.h` 文件,详细的原生连接错误码说明参考:[错误码](../../../reference/error-code)。
:::info
WebSocket 连接方式错误码只保留了原生连接错误码的后两个字节。
:::
### 示例程序
本节展示了使用客户端驱动访问 TDengine 集群的常见访问方式的示例代码。
- 同步查询示例:[同步查询](https://github.com/taosdata/TDengine/tree/main/docs/examples/c-ws/query_data_demo.c)
- 参数绑定示例:[参数绑定](https://github.com/taosdata/TDengine/tree/main/docs/examples/c-ws/stmt_insert_demo.c)
- 无模式写入示例:[无模式写入](https://github.com/taosdata/TDengine/tree/main/docs/examples/c-ws/sml_insert_demo.c)
- 订阅和消费示例:[订阅和消费](https://github.com/taosdata/TDengine/tree/main/docs/examples/c-ws/tmq_demo.c)
:::info
更多示例代码及下载请见 [GitHub](https://github.com/taosdata/TDengine/tree/main/docs/examples/c-ws)。
:::
### API 参考
以下分别介绍 TDengine 客户端驱动的 DSN、基础 API、同步查询 API、参数绑定 API、无模式写入 API 和 数据订阅订阅 API。
#### DSN
C/C++ Websocket 连接器通过 DSN 连接描述字符串来表示连接信息。
DSN 描述字符串基本结构如下:
```text
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver| protocol | | username | password | host | port | database | params |
```
各部分意义见下表:
- **driver**: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
- **taos**: 默认驱动,支持 SQL 执行,参数绑定,无模式写入。
- **tmq**: 使用 TMQ 订阅数据。
- **protocol**: 显示指定以何种方式建立连接,例如:`taos+ws://localhost:6041` 指定以 Websocket 方式建立连接。
- **http/ws**: 使用 Websocket 协议。
- **https/wss**: 在 Websocket 连接方式下显示启用 SSL/TLS 协议。
- **username/password**: 用于创建连接的用户名及密码。
- **host/port**: 指定创建连接的服务器及端口,当不指定服务器地址及端口时 Websocket 连接默认为 `localhost:6041` 。
- **database**: 指定默认连接的数据库名,可选参数。
- **params**:其他可选参数。
一个完整的 DSN 描述字符串示例如下:`taos+ws://localhost:6041/test` 表示使用 Websocket`ws`)方式通过 `6041` 端口连接服务器 `localhost`,并指定默认数据库为 `test`。
#### 基础 API
基础 API 用于完成创建数据库连接等工作,为其它 API 的执行提供运行时环境。
- `char *ws_get_client_info()`
- **接口说明**:获取客户端版本信息。
- **返回值**:返回客户端版本信息。
- `WS_TAOS *ws_connect(const char *dsn)`
- **接口说明**:创建数据库连接,初始化连接上下文。
- **参数说明**
- dsn[入参] 连接信息,见上文 DSN 章节。
- **返回值**:返回数据库连接,返回值为空表示失败。应用程序需要保存返回的参数,以便后续使用。
:::info
同一进程可以根据不同的 dsn 连接多个 TDengine 集群
:::
- `const char *ws_get_server_info(WS_TAOS *taos)`
- **接口说明**:获取服务端版本信息。
- **参数说明**
- taos[入参] 指向数据库连接的指针,数据库连接是通过 `ws_connect()` 函数建立。
- **返回值**:返回获取服务端版本信息。
- `int32_t ws_select_db(WS_TAOS *taos, const char *db)`
- **接口说明**:将当前的缺省数据库设置为 `db`。
- **参数说明**
- taos[入参] 指向数据库连接的指针,数据库连接是通过 `ws_connect()` 函数建立。
- db[入参] 数据库名称。
- **返回值**`0`:成功,非 `0`:失败,详情请参考错误码页面。
- `int32_t ws_get_current_db(WS_TAOS *taos, char *database, int len, int *required)`
- **接口说明**:获取当前数据库名称。
- **参数说明**
- taos[入参] 指向数据库连接的指针,数据库连接是通过 `ws_connect()` 函数建立。
- database[出参] 存储当前数据库名称。
- len[入参] database 的空间大小。
- required[出参] 存储当前数据库名称所需的空间(包含最后的'\0')。
- **返回值**`0`:成功,`-1`:失败,可调用函数 ws_errstr(NULL) 获取更详细的错误信息。
- 如果database == NULL 或者 len\<=0 返回失败。
- 如果len 小于 存储数据库名称所需的空间(包含最后的'\0'返回失败database 里赋值截断的数据,以'\0'结尾。
- 如果len 大于等于 存储数据库名称所需的空间(包含最后的'\0'返回成功database 里赋值以'\0结尾数据库名称。
- `int32_t ws_close(WS_TAOS *taos);`
- **接口说明**:关闭连接。
- **参数说明**
- taos[入参] 指向数据库连接的指针,数据库连接是通过 `ws_connect()` 函数建立。
- **返回值**`0`:成功,非 `0`:失败,详情请参考错误码页面。
#### 同步查询
本小节介绍 API 均属于同步接口。应用调用后,会阻塞等待响应,直到获得返回结果或错误信息。
- `WS_RES *ws_query(WS_TAOS *taos, const char *sql)`
- **接口说明**:执行 SQL 语句,可以是 DQL、DML 或 DDL 语句。
- **参数说明**
- taos[入参] 指向数据库连接的指针,数据库连接是通过 `ws_connect()` 函数建立。
- sql[入参] 需要执行 SQL 语句。
- **返回值**:不能通过返回值是否是 `NULL` 来判断执行结果是否失败,而是需要调用 `ws_errno()` 函数解析结果集中的错误代码来进行判断。
- ws_errno 返回值:`0`:成功,`-1`:失败,详情请调用 ws_errstr 函数来获取错误提示。
- `int32_t ws_result_precision(const WS_RES *rs)`
- **接口说明**:返回结果集时间戳字段的精度类别。
- **参数说明**
- res[入参] 结果集。
- **返回值**`0`:毫秒,`1`:微秒,`2`:纳秒。
- `WS_ROW ws_fetch_row(WS_RES *rs)`
- **接口说明**:按行获取查询结果集中的数据。
- **参数说明**
- res[入参] 结果集。
- **返回值**:非 `NULL`:成功,`NULL`:失败,可调用函数 ws_errstr(NULL) 获取更详细的错误信息。
- `int32_t ws_fetch_raw_block(WS_RES *rs, const void **pData, int32_t *numOfRows)`
- **接口说明**:批量获取查询结果集中的数据。
- **参数说明**
- res[入参] 结果集。
- pData[出参] 用于存储从结果集中获取一个数据块。
- numOfRows[出参] 用于存储从结果集中获取数据块包含的行数。
- **返回值**`0`:成功。非 `0`:失败,详情请参考错误码页面。
- `int32_t ws_num_fields(const WS_RES *rs)` 和 `int32_t ws_field_count(const WS_RES *rs)`
- **接口说明**:这两个 API 等价,用于获取查询结果集中的列数。
- **参数说明**
- res[入参] 结果集。
- **返回值**:返回值为结果集中列的数量。
- `int32_t ws_affected_rows(const WS_RES *rs)`
- **接口说明**:获取被所执行的 SQL 语句影响的行数。
- **参数说明**
- res[入参] 结果集。
- **返回值**:返回值表示受影响的行数。
- `int64_t ws_affected_rows64(const WS_RES *rs)`
- **接口说明**:获取被所执行的 SQL 语句影响的行数。
- **参数说明**
- res[入参] 结果集。
- **返回值**:返回值表示受影响的行数。
- `const struct WS_FIELD *ws_fetch_fields(WS_RES *rs)`
- **接口说明**:获取查询结果集每列数据的属性(列的名称、列的数据类型、列的长度),与 `ws_num_fields()` 配合使用,可用来解析 `ws_fetch_row()` 返回的一个元组(一行)的数据。
- **参数说明**
- res[入参] 结果集。
- **返回值**:非 `NULL`:成功,返回一个指向 WS_FIELD 结构体的指针,每个元素代表一列的元数据。`NULL`:失败。
- `int32_t ws_stop_query(WS_RES *rs)`
- **接口说明**:停止当前查询的执行。
- **参数说明**
- res[入参] 结果集。
- **返回值**`0`:成功。非 `0`:失败,详情请参考错误码页面。
- `int32_t ws_free_result(WS_RES *rs)`
- **接口说明**:释放查询结果集以及相关的资源。查询完成后,务必调用该 API 释放资源,否则可能导致应用内存泄露。但也需注意,释放资源后,如果再调用 `ws_fetch_fields()` 等获取查询结果的函数,将导致应用崩溃。
- **参数说明**
- res[入参] 结果集。
- **返回值**`0`:成功。非 `0`:失败,详情请参考错误码页面。
- `const char *ws_errstr(WS_RES *rs)`
- **接口说明**:获取最近一次 API 调用失败的原因,返回值为字符串标识的错误提示信息。
- **参数说明**
- res[入参] 结果集。
- **返回值**:字符串标识的错误提示信息。
- `int32_t ws_errno(WS_RES *rs)`
- **接口说明**:获取最近一次 API 调用失败的原因,返回值为错误代码。
- **参数说明**
- res[入参] 结果集。
- **返回值**:字符串标识的错误提示信息。
:::note
TDengine 推荐数据库应用的每个线程都建立一个独立的连接,或基于线程建立连接池。不要在应用中将该连接 (WS_TAOS\*) 结构体传递到不同的线程共享使用。
另一个需要注意的是,在上述同步 API 执行过程中,不能调用类似 pthread_cancel 之类的 API 来强制结束线程,因为涉及一些模块的同步操作,如果强制结束线程有可能造成包括但不限于死锁等异常状况。
:::
#### 参数绑定
除了直接调用 `ws_query()` 通过执行 SQL 进行数据写入TDengine 也提供了支持参数绑定的 Prepare API风格与 MySQL 类似,目前也仅支持用问号 `?` 来代表待绑定的参数。
通过参数绑定接口写入数据时,可以避免 SQL 语法解析的资源消耗,从而在绝大多数情况下显著提升写入性能。此时的典型操作步骤如下:
1. 调用 `ws_stmt_init()` 创建参数绑定对象;
2. 调用 `ws_stmt_prepare()` 解析 INSERT 语句;
3. 如果 INSERT 语句中预留了表名但没有预留 TAGS那么调用 `ws_stmt_set_tbname()` 来设置表名;
4. 如果 INSERT 语句中既预留了表名又预留了 TAGS例如 INSERT 语句采取的是自动建表的方式),那么调用 `ws_stmt_set_tbname_tags()` 来设置表名和 TAGS 的值;
5. 调用 `ws_stmt_bind_param_batch()` 以多行的方式设置 VALUES 的值;
6. 调用 `ws_stmt_add_batch()` 把当前绑定的参数加入批处理;
7. 可以重复第 3 6 步,为批处理加入更多的数据行;
8. 调用 `ws_stmt_execute()` 执行已经准备好的批处理指令;
9. 执行完毕,调用 `ws_stmt_close()` 释放所有资源。
说明:如果 `ws_stmt_execute()` 执行成功,假如不需要改变 SQL 语句的话,那么是可以复用 `ws_stmt_prepare()` 的解析结果,直接进行第 3 6 步绑定新数据的。但如果执行出错,那么并不建议继续在当前的环境上下文下继续工作,而是建议释放资源,然后从 `ws_stmt_init()` 步骤重新开始。
接口相关的具体函数如下(也可以参考 [stmt_insert_demo.c](https://github.com/taosdata/TDengine/blob/develop/docs/examples/c-ws/stmt_insert_demo.c) 文件中使用对应函数的方式):
- `WS_STMT *ws_stmt_init(const WS_TAOS *taos)`
- **接口说明**:初始化一个预编译的 SQL 语句对象。
- **参数说明**
- taos[入参] 指向数据库连接的指针,数据库连接是通过 `ws_connect()` 函数建立。
- **返回值**:非 `NULL`:成功,返回一个指向 WS_STMT 结构体的指针,该结构体表示预编译的 SQL 语句对象。`NULL`:失败,详情请调用 ws_stmt_errstr() 函数来获取错误提示。
- `int ws_stmt_prepare(WS_STMT *stmt, const char *sql, unsigned long len)`
- **接口说明**:解析一条预编译的 SQL 语句,将解析结果和参数信息绑定到 stmt 上。
- **参数说明**
- stmt[入参] 指向一个有效的预编译的 SQL 语句对象指针。
- sql[入参] 需要解析的 SQL 语句。
- len[入参] 参数 sql 的长度。如果参数 len 大于 0将使用此参数作为 SQL 语句的长度,如等于 0将自动判断 SQL 语句的长度。
- **返回值**`0`:成功。非 `0`:失败,详情请参考错误码页面。
- `int ws_stmt_bind_param_batch(WS_STMT *stmt, const WS_MULTI_BIND *bind, uint32_t len)`
- **接口说明**:以多列的方式传递待绑定的数据,需要保证这里传递的数据列的顺序、列的数量与 SQL 语句中的 VALUES 参数完全一致。
- **参数说明**
- stmt[入参] 指向一个有效的预编译的 SQL 语句对象指针。
- bind[入参] 指向一个有效的 WS_MULTI_BIND 结构体指针,该结构体包含了要批量绑定到 SQL 语句中的参数列表。
- len: [入参] bind 数组的元素个数。
- **返回值**`0`:成功。非 `0`:失败,详情请参考错误码页面。
- `int ws_stmt_set_tbname(WS_STMT *stmt, const char *name)`
- **接口说明**:(仅支持用于替换 INSERT 语句中的参数值)当 SQL 语句中的表名使用了 `?` 占位时,可以使用此函数绑定一个具体的表名。
- **参数说明**
- stmt[入参] 指向一个有效的预编译的 SQL 语句对象指针。
- name[入参] 指向一个包含子表名称的字符串常量。
- **返回值**`0`:成功。非 `0`:失败,详情请参考错误码页面。
- `int ws_stmt_set_tbname_tags(WS_STMT *stmt,
const char *name,
const WS_MULTI_BIND *bind,
uint32_t len);`
- **接口说明**:(仅支持用于替换 INSERT 语句中的参数值)当 SQL 语句中的表名和 TAGS 都使用了 `?` 占位时,可以使用此函数绑定具体的表名和具体的 TAGS 取值。最典型的使用场景是使用了自动建表功能的 INSERT 语句(目前版本不支持指定具体的 TAGS 列。TAGS 参数中的列数量需要与 SQL 语句中要求的 TAGS 数量完全一致。
- **参数说明**
- stmt[入参] 指向一个有效的预编译的 SQL 语句对象指针。
- name[入参] 指向一个包含子表名称的字符串常量。
- tags[入参] 指向一个有效的 WS_MULTI_BIND 结构体指针,该结构体包含了子表标签的值。
- len[入参] bind 数组的元素个数。
- **返回值**`0`:成功。非 `0`:失败,详情请参考错误码页面。
- `int ws_stmt_add_batch(WS_STMT *stmt)`
- **接口说明**:将当前绑定的参数加入批处理中,调用此函数后,可以再次调用 `ws_stmt_bind_param_batch()` 绑定新的参数。需要注意,此函数仅支持 INSERT/IMPORT 语句,如果是 SELECT 等其他 SQL 语句,将返回错误。
- stmt[入参] 指向一个有效的预编译的 SQL 语句对象指针。
- **返回值**`0`:成功。非 `0`:失败,详情请参考错误码页面。
- `int ws_stmt_execute(WS_STMT *stmt, int32_t *affected_rows)`
- **接口说明**:执行准备好的语句。目前,一条语句只能执行一次。
- stmt[入参] 指向一个有效的预编译的 SQL 语句对象指针。
- affected_rows[出参] 成功写入的行数。
- **返回值**`0`:成功。非 `0`:失败,详情请参考错误码页面。
- `int ws_stmt_affected_rows(WS_STMT *stmt)`
- **接口说明**:获取执行预编译 SQL 语句后受影响的行数。
- stmt[入参] 指向一个有效的预编译的 SQL 语句对象指针。
- **返回值**:返回受影响的行数。
- `int ws_stmt_affected_rows_once(WS_STMT *stmt)`
- **接口说明**:获取执行一次绑定语句影响的行数。
- stmt[入参] 指向一个有效的预编译的 SQL 语句对象指针。
- **返回值**:返回受影响的行数。
- `int32_t ws_stmt_close(WS_STMT *stmt)`
- **接口说明**:执行完毕,释放所有资源。
- stmt[入参] 指向一个有效的预编译的 SQL 语句对象指针。
- **返回值**`0`:成功。非 `0`:失败,详情请参考错误码页面。
- `const char *ws_stmt_errstr(WS_STMT *stmt)`
- **接口说明**:用于在其他 STMT API 返回错误(返回错误码或空指针)时获取错误信息。
- stmt[入参] 指向一个有效的预编译的 SQL 语句对象指针。
- **返回值**:返回一个指向包含错误信息的字符串的指针。
#### 无模式写入
除了使用 SQL 方式或者使用参数绑定 API 写入数据外,还可以使用 Schemaless 的方式完成写入。Schemaless 可以免于预先创建超级表/数据子表的数据结构而是可以直接写入数据TDengine 系统会根据写入的数据内容自动创建和维护所需要的表结构。Schemaless 的使用方式详见 [Schemaless 写入](../../../develop/schemaless/) 章节,这里介绍与之配套使用的 C/C++ API。
- `WS_RES *ws_schemaless_insert_raw(WS_TAOS *taos,
const char *lines,
int len,
int32_t *totalRows,
int protocal,
int precision)`
- **接口说明**:执行无模式的批量插入操作,将行协议的文本数据写入到 TDengine 中。通过传递的参数lines指针和长度len来表示数据为了解决原始接口数据包含'\0'而被截断的问题。
- taos[入参] 指向数据库连接的指针,数据库连接是通过 `ws_connect()` 函数建立。
- lines[入参] 文本数据。满足解析格式要求的无模式文本字符串。
- len[入参] 数据缓冲区 lines 的总长度(字节数)。
- totalRows[出参] 指向一个整数指针,用于返回成功插入的记录总数。
- protocol[入参] 行协议类型,用于标识文本数据格式。
- precision[入参] 文本数据中的时间戳精度字符串。
- **返回值**:返回一个指向 WS_RES 结构体的指针,该结构体包含了插入操作的结果。应用可以通过使用 `ws_errstr()` 获得错误信息,也可以使用 `ws_errno()` 获得错误码。在某些情况下,返回的 WS_RES 为 `NULL`,此时仍然可以调用 `ws_errno()` 来安全地获得错误码信息。
返回的 WS_RES 需要调用方来负责释放,否则会出现内存泄漏。
**说明**
协议类型是枚举类型,包含以下三种格式:
- WS_TSDB_SML_LINE_PROTOCOLInfluxDB 行协议Line Protocol)
- WS_TSDB_SML_TELNET_PROTOCOL: OpenTSDB Telnet 文本行协议
- WS_TSDB_SML_JSON_PROTOCOL: OpenTSDB Json 协议格式
时间戳分辨率的定义,定义在 `taosws.h` 文件中,具体内容如下:
- WS_TSDB_SML_TIMESTAMP_NOT_CONFIGURED = 0,
- WS_TSDB_SML_TIMESTAMP_HOURS,
- WS_TSDB_SML_TIMESTAMP_MINUTES,
- WS_TSDB_SML_TIMESTAMP_SECONDS,
- WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS,
- WS_TSDB_SML_TIMESTAMP_MICRO_SECONDS,
- WS_TSDB_SML_TIMESTAMP_NANO_SECONDS
需要注意的是,时间戳分辨率参数只在协议类型为 `WS_SML_LINE_PROTOCOL` 的时候生效。
对于 OpenTSDB 的文本协议,时间戳的解析遵循其官方解析规则 — 按照时间戳包含的字符的数量来确认时间精度。
**schemaless 其他相关的接口**
- `WS_RES *ws_schemaless_insert_raw_with_reqid(WS_TAOS *taos,
const char *lines,
int len,
int32_t *totalRows,
int protocal,
int precision,
uint64_t reqid)`
- **接口说明**:执行无模式的批量插入操作,将行协议的文本数据写入到 TDengine 中。通过传递的参数lines指针和长度len来表示数据为了解决原始接口数据包含'\0'而被截断的问题。通过传递参数reqid来跟踪整个的函数调用链情况。
- taos[入参] 指向数据库连接的指针,数据库连接是通过 `ws_connect()` 函数建立。
- lines[入参] 文本数据。满足解析格式要求的无模式文本字符串。
- len[入参] 数据缓冲区 lines 的总长度(字节数)。
- totalRows[出参] 指向一个整数指针,用于返回成功插入的记录总数。
- protocol[入参] 行协议类型,用于标识文本数据格式。
- precision[入参] 文本数据中的时间戳精度字符串。
- reqid[入参] 指定的请求 ID用于跟踪调用请求。请求 ID (reqid) 可以用于在客户端和服务器端之间建立请求和响应之间的关联,对于分布式系统中的跟踪和调试非常有用。
- **返回值**:返回一个指向 WS_RES 结构体的指针,该结构体包含了插入操作的结果。应用可以通过使用 `ws_errstr()` 获得错误信息,也可以使用 `ws_errno()` 获得错误码。在某些情况下,返回的 WS_RES 为 `NULL`,此时仍然可以调用 `ws_errno()` 来安全地获得错误码信息。
返回的 WS_RES 需要调用方来负责释放,否则会出现内存泄漏。
- `WS_RES *ws_schemaless_insert_raw_ttl(WS_TAOS *taos,
const char *lines,
int len,
int32_t *totalRows,
int protocal,
int precision,
int ttl)`
- **接口说明**:执行无模式的批量插入操作,将行协议的文本数据写入到 TDengine 中。通过传递的参数lines指针和长度len来表示数据为了解决原始接口数据包含'\0'而被截断的问题。通过传递ttl参数来控制建表的ttl到期时间。
- taos[入参] 指向数据库连接的指针,数据库连接是通过 `ws_connect()` 函数建立。
- lines[入参] 文本数据。满足解析格式要求的无模式文本字符串。
- len[入参] 数据缓冲区 lines 的总长度(字节数)。
- totalRows[出参] 指向一个整数指针,用于返回成功插入的记录总数。
- protocol[入参] 行协议类型,用于标识文本数据格式。
- precision[入参] 文本数据中的时间戳精度字符串。
- ttl[入参] 指定的生存时间TTL单位为天。记录在超过这个生存时间后会被自动删除。
- **返回值**:返回一个指向 WS_RES 结构体的指针,该结构体包含了插入操作的结果。应用可以通过使用 `ws_errstr()` 获得错误信息,也可以使用 `ws_errno()` 获得错误码。在某些情况下,返回的 WS_RES 为 `NULL`,此时仍然可以调用 `ws_errno()` 来安全地获得错误码信息。
返回的 WS_RES 需要调用方来负责释放,否则会出现内存泄漏。
- `WS_RES *ws_schemaless_insert_raw_ttl_with_reqid(WS_TAOS *taos,
const char *lines,
int len,
int32_t *totalRows,
int protocal,
int precision,
int ttl,
uint64_t reqid)`
- **接口说明**:执行无模式的批量插入操作,将行协议的文本数据写入到 TDengine 中。通过传递的参数lines指针和长度len来表示数据为了解决原始接口数据包含'\0'而被截断的问题。通过传递ttl参数来控制建表的ttl到期时间。通过传递参数reqid来跟踪整个的函数调用链情况。
- taos[入参] 指向数据库连接的指针,数据库连接是通过 `ws_connect()` 函数建立。
- lines[入参] 文本数据。满足解析格式要求的无模式文本字符串。
- len[入参] 数据缓冲区 lines 的总长度(字节数)。
- totalRows[出参] 指向一个整数指针,用于返回成功插入的记录总数。
- protocol[入参] 行协议类型,用于标识文本数据格式。
- precision[入参] 文本数据中的时间戳精度字符串。
- ttl[入参] 指定的生存时间TTL单位为天。记录在超过这个生存时间后会被自动删除。
- reqid[入参] 指定的请求 ID用于跟踪调用请求。请求 ID (reqid) 可以用于在客户端和服务器端之间建立请求和响应之间的关联,对于分布式系统中的跟踪和调试非常有用。
- **返回值**:返回一个指向 WS_RES 结构体的指针,该结构体包含了插入操作的结果。应用可以通过使用 `ws_errstr()` 获得错误信息,也可以使用 `ws_errno()` 获得错误码。在某些情况下,返回的 WS_RES 为 `NULL`,此时仍然可以调用 `ws_errno()` 来安全地获得错误码信息。
返回的 WS_RES 需要调用方来负责释放,否则会出现内存泄漏。
**说明**
- 上面这3个接口是扩展接口主要用于在 schemaless 写入时传递 ttl、reqid 参数,可以根据需要使用。
- 带 ttl 的接口可以传递 ttl 参数来控制建表的ttl到期时间。
- 带 reqid 的接口可以通过传递 reqid 参数来追踪整个的调用链。
#### 数据订阅
- `const char *ws_tmq_errstr(ws_tmq_t *tmq)`
- **接口说明**:用于获取数据订阅的错误信息。
- tmq[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- **返回值**返回一个指向包含错误信息字符串的指针返回值非NULL但是错误信息可能为空字符串。
- `ws_tmq_conf_t *ws_tmq_conf_new(void);`
- **接口说明**:创建一个新的 TMQ 配置对象。
- **返回值**:非 `NULL`:成功,返回一个指向 ws_tmq_conf_t 结构体的指针,该结构体用于配置 TMQ 的行为和特性。`NULL`:失败,可调用函数 ws_errstr(NULL) 获取更详细的错误信息。
- `enum ws_tmq_conf_res_t ws_tmq_conf_set(ws_tmq_conf_t *conf, const char *key, const char *value)`
- **接口说明**:设置 TMQ 配置对象中的配置项,用于配置消费参数。
- conf[入参] 指向一个有效的 ws_tmq_conf_t 结构体指针,该结构体代表一个 TMQ 配置对象。
- key[入参] 数配置项的键名。
- value[入参] 配置项的值。
- **返回值**:返回一个 ws_tmq_conf_res_t 枚举值,表示配置设置的结果。
- WS_TMQ_CONF_OK成功设置配置项。
- WS_TMQ_CONF_INVALID_KEY键值无效。
- WS_TMQ_CONF_UNKNOWN键名无效。
- `int32_t ws_tmq_conf_destroy(ws_tmq_conf_t *conf)`
- **接口说明**:销毁一个 TMQ 配置对象并释放相关资源。
- conf[入参] 指向一个有效的 ws_tmq_conf_t 结构体指针,该结构体代表一个 TMQ 配置对象。
- **返回值**`0`:成功。非 `0`:失败,可调用函数 `ws_tmq_errstr(NULL)` 获取更详细的错误信息。
- `ws_tmq_list_t *ws_tmq_list_new(void)`
- **接口说明**:用于创建一个 ws_tmq_list_t 结构体,用于存储订阅的 topic。
- **返回值**:非 `NULL`:成功,返回一个指向 ws_tmq_list_t 结构体的指针。`NULL`:失败,可调用函数 `ws_tmq_errstr(NULL)` 获取更详细的错误信息。
- `int32_t ws_tmq_list_append(ws_tmq_list_t *list, const char *topic)`
- **接口说明**:用于向 ws_tmq_list_t 结构体中添加一个 topic。
- list[入参] 指向一个有效的 ws_tmq_list_t 结构体指针,该结构体代表一个 TMQ 列表对象。
- topic[入参] topic 名称。
- **返回值**`0`:成功。非 `0`:失败,可调用函数 `ws_tmq_errstr(NULL)` 获取更详细的错误信息。
- `int32_t ws_tmq_list_destroy(ws_tmq_list_t *list);`
- **接口说明**:用于销毁 ws_tmq_list_t 结构体ws_tmq_list_new 的结果需要通过该接口销毁。
- list[入参] 指向一个有效的 ws_tmq_list_t 结构体指针,该结构体代表一个 TMQ 列表对象。
- **返回值**`0`:成功。非 `0`:失败,可调用函数 `ws_tmq_errstr(NULL)` 获取更详细的错误信息。
- `int32_t ws_tmq_list_get_size(ws_tmq_list_t *list);`
- **接口说明**:用于获取 ws_tmq_list_t 结构体中 topic 的个数。
- list[入参] 指向一个有效的 ws_tmq_list_t 结构体指针,该结构体代表一个 TMQ 列表对象。
- **返回值**`>=0`:成功,返回 ws_tmq_list_t 结构体中 topic 的个数。`-1`:失败,表示输入参数 list 为 NULL 。
- `char **ws_tmq_list_to_c_array(const ws_tmq_list_t *list, uint32_t *topic_num);`
- **接口说明**:用于将 ws_tmq_list_t 结构体转换为 C 数组,数组每个元素为字符串指针。
- list[入参] 指向一个有效的 ws_tmq_list_t 结构体指针,该结构体代表一个 TMQ 列表对象。
- topic_num[入参] list 的元素个数。
- **返回值**:非 `NULL`:成功,返回 c 数组, 每个元素是字符串指针,代表一个 topic 名称。`NULL`:失败,表示输入参数 list 为 NULL 。
- `ws_tmq_t *ws_tmq_consumer_new(ws_tmq_conf_t *conf, const char *dsn, char *errstr, int errstr_len)`
- **接口说明**:用于创建一个 ws_tmq_t 结构体,用于消费数据,消费完数据后需调用 tmq_consumer_close 关闭消费者。
- conf[入参] 指向一个有效的 ws_tmq_conf_t 结构体指针,该结构体代表一个 TMQ 配置对象。
- dsn[入参] dsn 信息字符串,具体可参考上面 DSN 章节。一个常见的合法 dsn 为 "tmq+ws://root:taosdata@localhost:6041"。
- errstr[出参] 指向一个有效的字符缓冲区指针,用于接收创建过程中可能产生的错误信息。内存的申请/释放由调用者负责。
- errstrLen[入参] 指定 errstr 缓冲区的大小(以字节为单位)。
- **返回值**:非 `NULL`:成功,返回一个指向 ws_tmq_t 结构体的指针,该结构体代表一个 TMQ 消费者对象。。`NULL`:失败,错误信息存储在参数 errstr 中 。
- `int32_t ws_tmq_subscribe(ws_tmq_t *tmq, const ws_tmq_list_t *topic_list)`
- **接口说明**:用于订阅 topic 列表,消费完数据后,需调用 ws_tmq_subscribe 取消订阅。
- tmq[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- topic_list[入参] 指向一个有效的 ws_tmq_list_t 结构体指针,该结构体包含一个或多个主题名称,目前仅支持一个主题名称。
- **返回值**`0`:成功。非 `0`:失败,可调用函数 `ws_tmq_errstr(tmq)` 获取更详细的错误信息。
- `int32_t ws_tmq_unsubscribe(ws_tmq_t *tmq)`
- **接口说明**:用于取消订阅的 topic 列表。需与 ws_tmq_subscribe 配合使用。
- tmq[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- **返回值**`0`:成功。非 `0`:失败,可调用函数 `ws_tmq_errstr(tmq)` 获取更详细的错误信息。
- `WS_RES *ws_tmq_consumer_poll(ws_tmq_t *tmq, int64_t timeout)`
- **接口说明**:用于轮询消费数据,每一个消费者,只能单线程调用该接口。
- tmq[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- timeout[入参] 轮询的超时时间单位为毫秒负数表示默认超时1秒。
- **返回值**:非 `NULL`:成功,返回一个指向 WS_RES 结构体的指针,该结构体包含了接收到的消息。`NULL`失败表示没有数据。WS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 WS_RES 里的信息,比如 schema 等。
- `int32_t ws_tmq_consumer_close(ws_tmq_t *tmq)`
- **接口说明**:用于关闭 ws_tmq_t 结构体。需与 ws_tmq_consumer_new 配合使用。
- tmq[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- **返回值**`0`:成功。非 `0`:失败,可调用函数 `ws_tmq_errstr(tmq)` 获取更详细的错误信息。
- `int32_t ws_tmq_get_topic_assignment(ws_tmq_t *tmq,
const char *pTopicName,
struct ws_tmq_topic_assignment **assignment,
int32_t *numOfAssignment)`
- **接口说明**:返回当前 consumer 分配的 vgroup 的信息,每个 vgroup 的信息包括 vgIdwal 的最大最小 offset以及当前消费到的 offset。
- tmq[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- pTopicName[入参] 要查询分配信息的主题名称。
- assignment[出参] 指向一个 tmq_topic_assignment 结构体指针的指针,用于接收分配信息。数据大小为 numOfAssignment需要通过 tmq_free_assignment 接口释放。
- numOfAssignment[出参] 指向一个整数指针,用于接收分配给该 consumer 有效的 vgroup 个数。
- **返回值**`0`:成功。非 `0`:失败,可调用函数 `ws_tmq_errstr(tmq)` 获取更详细的错误信息。
- `int32_t ws_tmq_free_assignment(struct ws_tmq_topic_assignment *pAssignment, int32_t numOfAssignment)`
- **接口说明**返回当前consumer分配的vgroup的信息每个vgroup的信息包括vgIdwal的最大最小offset以及当前消费到的offset。
- pAssignment[入参] 指向一个有效的 ws_tmq_topic_assignment 结构体数组的指针,该数组包含了 vgroup 分配信息。
- numOfAssignment[入参] pAssignment 指向的数组元素个数。
- **返回值**`0`:成功。非 `0`:失败,可调用函数 `ws_tmq_errstr(tmq)` 获取更详细的错误信息。
- `int64_t ws_tmq_committed(ws_tmq_t *tmq, const char *pTopicName, int32_t vgId)`
- **接口说明**:获取 TMQ 消费者对象对特定 topic 和 vgroup 的已提交偏移量。
- tmq[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- pTopicName[入参] 要查询已提交偏移量的主题名称。
- vgId[入参] vgroup 的 ID。
- **返回值**`>=0`:成功,返回一个 int64_t 类型的值,表示已提交的偏移量。`<0`:失败,返回值就是错误码,可调用函数 `ws_tmq_errstr(tmq)` 获取更详细的错误信息。
- `int32_t ws_tmq_commit_sync(ws_tmq_t *tmq, const WS_RES *rs)`
- **接口说明**:同步提交 TMQ 消费者对象处理的消息偏移量。
- tmq[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- rs[入参] 指向一个有效的 WS_RES 结构体指针,该结构体包含了已处理的消息。如果为 NULL提交当前 consumer 所有消费的 vgroup 的当前进度。
- **返回值**`0`:成功,已经成功提交偏移量。非 `0`:失败,可调用函数 `ws_tmq_errstr(tmq)` 获取更详细的错误信息。
- `int32_t ws_tmq_commit_offset_sync(ws_tmq_t *tmq,
const char *pTopicName,
int32_t vgId,
int64_t offset)`
- **接口说明**:同步提交 TMQ 消费者对象的特定主题和 vgroup 的偏移量。
- tmq[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- pTopicName[入参] 要提交偏移量的主题名称。
- vgId[入参] 虚拟组 vgroup 的 ID。
- offset[入参] 要提交的偏移量。
- **返回值**`0`:成功,已经成功提交偏移量。非 `0`:失败,可调用函数 `ws_tmq_errstr(tmq)` 获取更详细的错误信息。
- `int64_t ws_tmq_position(ws_tmq_t *tmq, const char *pTopicName, int32_t vgId)`
- **接口说明**:获取当前消费位置,即已消费到的数据位置的下一个位置.
- tmq[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- pTopicName[入参] 要查询当前位置的主题名称。
- vgId[入参] 虚拟组 vgroup 的 ID。
- **返回值**`>=0`:成功,返回一个 int64_t 类型的值,表示当前位置的偏移量。`<0`:失败,返回值就是错误码,可调用函数 `ws_tmq_errstr(tmq)` 获取更详细的错误信息。
- `int32_t ws_tmq_offset_seek(ws_tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
- **接口说明**:将 TMQ 消费者对象在某个特定 topic 和 vgroup 的偏移量设置到指定的位置。
- tmq[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- pTopicName[入参] 要查询当前位置的主题名称。
- vgId[入参] 虚拟组 vgroup 的 ID。
- offset[入参] 虚拟组 vgroup 的 ID。
- **返回值**`0`:成功,非 `0`:失败,可调用函数 `ws_tmq_errstr(tmq)` 获取更详细的错误信息。
- `int64_t ws_tmq_get_vgroup_offset(const WS_RES *rs)`
- **接口说明**:从 TMQ 消费者获取的消息结果中提取虚拟组vgroup的当前消费数据位置的偏移量。
- res[入参] 指向一个有效的 WS_RES 结构体指针,该结构体包含了从 TMQ 消费者轮询得到的消息。
- **返回值**`>=0`:成功,返回一个 int64_t 类型的值,表示当前消费位置的偏移量。`<0`:失败,返回值就是错误码,可调用函数 `ws_tmq_errstr(tmq)` 获取更详细的错误信息。
- `int32_t ws_tmq_get_vgroup_id(const WS_RES *rs)`
- **接口说明**:从 TMQ 消费者获取的消息结果中提取所属虚拟组vgroup的 ID。
- res[入参] 指向一个有效的 WS_RES 结构体指针,该结构体包含了从 TMQ 消费者轮询得到的消息。
- **返回值**`>=0`:成功,返回一个 int32_t 类型的值表示虚拟组vgroup的 ID。`<0`:失败,返回值就是错误码,可调用函数 `ws_tmq_errstr(tmq)` 获取更详细的错误信息。
- `const char *ws_tmq_get_table_name(const WS_RES *rs)`
- **接口说明**:从 TMQ 消费者获取的消息结果中获取所属的表名。
- res[入参] 指向一个有效的 WS_RES 结构体指针,该结构体包含了从 TMQ 消费者轮询得到的消息。
- **返回值**:非 `NULL`:成功,返回一个 const char * 类型的指针,指向表名字符串。`NULL`:失败,非法的输入参数。
- `enum ws_tmq_res_t ws_tmq_get_res_type(const WS_RES *rs)`
- **接口说明**:从 TMQ 消费者获取的消息结果中获取消息类型。
- res[入参] 指向一个有效的 WS_RES 结构体指针,该结构体包含了从 TMQ 消费者轮询得到的消息。
- **返回值**:返回一个 ws_tmq_res_t 类型的枚举值,表示消息类型。
- ws_tmq_res_t 表示消费到的数据类型,定义如下:
```
typedef enum ws_tmq_res_t {
WS_TMQ_RES_INVALID = -1, // 无效
WS_TMQ_RES_DATA = 1, // 数据类型
WS_TMQ_RES_TABLE_META = 2, // 元数据类型
WS_TMQ_RES_METADATA = 3 // 既有元数据类型又有数据类型,即自动建表
} tmq_res_t;
```
- `const char *ws_tmq_get_topic_name(const WS_RES *rs)`
- **接口说明**:从 TMQ 消费者获取的消息结果中获取所属的 topic 名称。
- res[入参] 指向一个有效的 WS_RES 结构体指针,该结构体包含了从 TMQ 消费者轮询得到的消息。
- **返回值**:非 `NULL`:成功,返回一个 const char * 类型的指针,指向 topic 名称字符串。`NULL`:失败,非法的输入参数。
- `const char *ws_tmq_get_db_name(const WS_RES *rs)`
- **接口说明**:从 TMQ 消费者获取的消息结果中获取所属的数据库名称。
- res[入参] 指向一个有效的 WS_RES 结构体指针,该结构体包含了从 TMQ 消费者轮询得到的消息。
- **返回值**:非 `NULL`:成功,返回一个 const char * 类型的指针,指向数据库名称字符串。`NULL`:失败,非法的输入参数。
## 原生连接方式
原生连接方式需要使用 taos.h 头文件和 taos 动态库。
```c
#include <taos.h>
```
@ -22,21 +629,21 @@ TDengine 客户端驱动的动态库位于:
- Windows: `C:\TDengine\taos.dll`
- macOS: `/usr/local/lib/libtaos.dylib`
## 支持的平台
### 支持的平台
请参考[支持的平台列表](../#支持的平台)
## 支持的版本
### 支持的版本
TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一对应的强对应关系,建议使用与 TDengine 服务端完全相同的客户端驱动。虽然低版本的客户端驱动在前三段版本号一致(即仅第四段版本号不同)的情况下也能够与高版本的服务端相兼容,但这并非推荐用法。强烈不建议使用高版本的客户端驱动访问低版本的服务端。
## 错误码
### 错误码
在 C 接口的设计中,错误码采用整数类型表示,每个错误码都对应一个特定的错误状态。如未特别说明,当 API 的返回值是整数时_0_ 代表成功,其它是代表失败原因的错误码,当返回值是指针时, _NULL_ 表示失败。
所有的错误码以及对应的原因描述在 `taoserror.h` 文件中。
详细的错误码说明参考:[错误码](../../../reference/error-code)
## 示例程序
### 示例程序
本节展示了使用客户端驱动访问 TDengine 集群的常见访问方式的示例代码。
@ -57,11 +664,11 @@ TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一
:::
## API 参考
### API 参考
以下分别介绍 TDengine 客户端驱动的基础 API、同步 API、异步 API、订阅 API 和无模式写入 API。
以下分别介绍 TDengine 客户端驱动的基础 API、同步 API、异步 API、参数绑定 API无模式写入 API 和数据订阅 API。
### 基础 API
#### 基础 API
基础 API 用于完成创建数据库连接等工作,为其它 API 的执行提供运行时环境。
@ -145,7 +752,7 @@ TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一
- **参数说明**
- taos[入参] 指向数据库连接的指针,数据库连接是通过 `taos_connect()` 函数建立。
### 同步查询
#### 同步查询
本小节介绍 API 均属于同步接口。应用调用后,会阻塞等待响应,直到获得返回结果或错误信息。
@ -228,7 +835,7 @@ TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一
:::
### 异步查询
#### 异步查询
TDengine 还提供性能更高的异步 API 处理数据插入、查询操作。在软硬件环境相同的情况下,异步 API 处理数据插入的速度比同步 API 快 2 4 倍。异步 API 采用非阻塞式的调用方式,在系统真正完成某个具体数据库操作前,立即返回。调用的线程可以去处理其他工作,从而可以提升整个应用的性能。异步 API 在网络延迟严重的情况下,优势尤为突出。
@ -252,7 +859,7 @@ TDengine 还提供性能更高的异步 API 处理数据插入、查询操作。
TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多线程同时打开多张表,并可以同时对每张打开的表进行查询或者插入操作。需要指出的是,**客户端应用必须确保对同一张表的操作完全串行化**,即对同一个表的插入或查询操作未完成时(未返回时),不能够执行第二个插入或查询操作。
### 参数绑定
#### 参数绑定
除了直接调用 `taos_query()` 进行查询TDengine 也提供了支持参数绑定的 Prepare API风格与 MySQL 类似,目前也仅支持用问号 `?` 来代表待绑定的参数。
@ -350,7 +957,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- stmt[入参] 指向一个有效的预编译的 SQL 语句对象指针。
- **返回值**:返回一个指向包含错误信息的字符串的指针。
### 无模式写入
#### 无模式写入
除了使用 SQL 方式或者使用参数绑定 API 写入数据外,还可以使用 Schemaless 的方式完成写入。Schemaless 可以免于预先创建超级表/数据子表的数据结构而是可以直接写入数据TDengine 系统会根据写入的数据内容自动创建和维护所需要的表结构。Schemaless 的使用方式详见 [Schemaless 写入](../../../develop/schemaless/) 章节,这里介绍与之配套使用的 C/C++ API。
@ -474,7 +1081,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- 带_ttl的接口可以传递ttl参数来控制建表的ttl到期时间。
- 带_reqid的接口可以通过传递reqid参数来追踪整个的调用链。
### 数据订阅
#### 数据订阅
- `const char *tmq_err2str(int32_t code)`
- **接口说明**:用于将数据订阅的错误码转换为错误信息。
- code[入参] 数据订阅的错误码。
@ -584,7 +1191,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- **接口说明**:获取 TMQ 消费者对象对特定 topic 和 vgroup 的已提交偏移量。
- tmq[入参] 指向一个有效的 tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- pTopicName[入参] 要查询已提交偏移量的主题名称。
- assignment[入参] vgroup 的 ID。
- vgId[入参] vgroup 的 ID。
- **返回值**`>=0`:成功,返回一个 int64_t 类型的值,表示已提交的偏移量。`<0`:失败,返回值就是错误码,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
- `int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg)`
@ -604,6 +1211,8 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- **接口说明**:同步提交 TMQ 消费者对象的特定主题和 vgroup 的偏移量。
- tmq[入参] 指向一个有效的 tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- pTopicName[入参] 要提交偏移量的主题名称。
- vgId[入参] 虚拟组 vgroup 的 ID。
- offset[入参] 要提交的偏移量。
- **返回值**`0`:成功,已经成功提交偏移量。非 `0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
- `void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param)`

View File

@ -113,11 +113,11 @@ DSN 描述字符串基本结构如下:
各部分意义见下表:
- **driver**: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
- **taos**: 表名使用 TDengine 连接器驱动。
- **taos**: 使用 TDengine 连接器驱动,默认是使用 taos 驱动。
- **tmq**: 使用 TMQ 订阅数据。
- **protocol**: 显示指定以何种方式建立连接,例如:`taos+ws://localhost:6041` 指定以 Websocket 方式建立连接。
- **http/ws**: 使用 Websocket 创建连接。
- **https/wss**: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
- **protocol**: 显示指定以何种方式建立连接,例如:`taos+ws://localhost:6041` 指定以 Websocket 方式建立连接。
- **username/password**: 用于创建连接的用户名及密码。
- **host/port**: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(`taos://`),原生连接默认为 `localhost:6030`Websocket 连接默认为 `localhost:6041` 。
- **database**: 指定默认连接的数据库名,可选参数。

View File

@ -28,14 +28,14 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
TDengine 版本更新往往会增加新的功能特性,列表中的连接器版本为连接器最佳适配版本。
| **TDengine 版本** | **Java** | **Python** | **Go** | **C#** | **Node.js** | **Rust** |
| ---------------------- | ----------- | ------------------------------------------- | ------------ | ------------- | --------------- | -------- |
| **3.3.0.0 及以上** | 3.3.0及以上 | taospy 2.7.15及以上taos-ws-py 0.3.2及以上 | 3.5.5及以上 | 3.1.3及以上 | 3.1.0及以上 | 当前版本 |
| **3.0.0.0 及以上** | 3.0.2以上 | 当前版本 | 3.0 分支 | 3.0.0 | 3.1.0 | 当前版本 |
| **2.4.0.14 及以上** | 2.0.38 | 当前版本 | develop 分支 | 1.0.2 - 1.0.6 | 2.0.10 - 2.0.12 | 当前版本 |
| **2.4.0.4 - 2.4.0.13** | 2.0.37 | 当前版本 | develop 分支 | 1.0.2 - 1.0.6 | 2.0.10 - 2.0.12 | 当前版本 |
| **2.2.x.x ** | 2.0.36 | 当前版本 | master 分支 | n/a | 2.0.7 - 2.0.9 | 当前版本 |
| **2.0.x.x ** | 2.0.34 | 当前版本 | master 分支 | n/a | 2.0.1 - 2.0.6 | 当前版本 |
| **TDengine 版本** | **Java** | **Python** | **Go** | **C#** | **Node.js** | **Rust** | **C/C++** |
| ---------------------- | ----------- | ------------------------------------------- | ------------ | ------------- | --------------- | -------- | -------------------- |
| **3.3.0.0 及以上** | 3.3.0及以上 | taospy 2.7.15及以上taos-ws-py 0.3.2及以上 | 3.5.5及以上 | 3.1.3及以上 | 3.1.0及以上 | 当前版本 | 与 TDengine 相同版本 |
| **3.0.0.0 及以上** | 3.0.2以上 | 当前版本 | 3.0 分支 | 3.0.0 | 3.1.0 | 当前版本 | 与 TDengine 相同版本 |
| **2.4.0.14 及以上** | 2.0.38 | 当前版本 | develop 分支 | 1.0.2 - 1.0.6 | 2.0.10 - 2.0.12 | 当前版本 | 与 TDengine 相同版本 |
| **2.4.0.4 - 2.4.0.13** | 2.0.37 | 当前版本 | develop 分支 | 1.0.2 - 1.0.6 | 2.0.10 - 2.0.12 | 当前版本 | 与 TDengine 相同版本 |
| **2.2.x.x ** | 2.0.36 | 当前版本 | master 分支 | n/a | 2.0.7 - 2.0.9 | 当前版本 | 与 TDengine 相同版本 |
| **2.0.x.x ** | 2.0.34 | 当前版本 | master 分支 | n/a | 2.0.1 - 2.0.6 | 当前版本 | 与 TDengine 相同版本 |
## 功能特性
@ -43,13 +43,13 @@ TDengine 版本更新往往会增加新的功能特性,列表中的连接器
### 使用原生接口taosc
| **功能特性** | **Java** | **Python** | **Go** | **C#** | **Rust** |
| ------------------- | -------- | ---------- | ------ | ------ | -------- |
| **连接管理** | 支持 | 支持 | 支持 | 支持 | 支持 |
| **执行 SQL** | 支持 | 支持 | 支持 | 支持 | 支持 |
| **参数绑定** | 支持 | 支持 | 支持 | 支持 | 支持 |
| **数据订阅TMQ** | 支持 | 支持 | 支持 | 支持 | 支持 |
| **无模式写入** | 支持 | 支持 | 支持 | 支持 | 支持 |
| **功能特性** | **Java** | **Python** | **Go** | **C#** | **Rust** | **C/C++** |
| ------------------- | -------- | ---------- | ------ | ------ | -------- | --------- |
| **连接管理** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **执行 SQL** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **参数绑定** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **数据订阅TMQ** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **无模式写入** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
:::info
由于不同编程语言数据库框架规范不同,并不意味着所有 C/C++ 接口都需要对应封装支持。
@ -64,13 +64,13 @@ TDengine 版本更新往往会增加新的功能特性,列表中的连接器
### 使用 Websocket 接口
| **功能特性** | **Java** | **Python** | **Go** | **C#** | **Node.js** | **Rust** |
| ------------------- | -------- | ---------- | ------ | ------ | ----------- | -------- |
| **连接管理** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **执行 SQL** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **参数绑定** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **数据订阅TMQ** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **无模式写入** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **功能特性** | **Java** | **Python** | **Go** | **C#** | **Node.js** | **Rust** | **C/C++** |
| ------------------- | -------- | ---------- | ------ | ------ | ----------- | -------- | --------- |
| **连接管理** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **执行 SQL** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **参数绑定** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **数据订阅TMQ** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **无模式写入** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
:::warning
- 无论选用何种编程语言的连接器2.0 及以上版本的 TDengine 推荐数据库应用的每个线程都建立一个独立的连接或基于线程建立连接池以避免连接内的“USE statement”状态量在线程之间相互干扰但连接的查询和写入操作都是线程安全的

View File

@ -7,11 +7,11 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表"
## TDengine 服务端支持的平台列表
| | **Windows server 2016/2019** | **Windows 10/11** | **CentOS 7.9/8** | **Ubuntu 18 以上** | **统信 UOS** | **银河/中标麒麟** | **凝思 V60/V80** | **macOS** |
| ------------ | ---------------------------- | ----------------- | ---------------- | ---------------- | ------------ | ----------------- | ---------------- | --------- |
| X64 | ●/E | ●/E | ● | ● | ●/E | ●/E | ●/E | ● |
| 树莓派 ARM64 | | | ● | | | | | |
| 华为云 ARM64 | | | | ● | | | | |
| M1 | | | | | | | | ● |
| ------------ | ---------------------------- | ----------------- | ---------------- | ------------------ | ------------ | ----------------- | ---------------- | --------- |
| X64 | ●/E | ●/E | ● | ● | ●/E | ●/E | ●/E | ● |
| 树莓派 ARM64 | | | ● | | | | | |
| 华为云 ARM64 | | | | ● | | | | |
| M1 | | | | | | | | ● |
1) ● 表示经过官方测试验证, ○ 表示非官方测试验证E 表示仅企业版支持。
2) 社区版仅支持主流操作系统的较新版本,包括 Ubuntu 18+/CentOS 7+/RedHat/Debian/CoreOS/FreeBSD/OpenSUSE/SUSE Linux/Fedora/macOS 等。如果有其他操作系统及版本的需求,请联系企业版支持。
@ -31,6 +31,7 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表"
| **Go** | ● | ● | ● | ● | ● |
| **NodeJs** | ● | ● | ● | ○ | ○ |
| **C#** | ● | ● | ○ | ○ | ○ |
| **Rust** | ● | ● | ○ | ● | ● |
| **RESTful** | ● | ● | ● | ● | ● |
注:● 表示官方测试验证通过,○ 表示非官方测试验证通过,-- 表示未经验证。