From 73557046fda718f50a08eb5ade4c89345d16c25a Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Thu, 26 Sep 2024 11:39:02 +0800 Subject: [PATCH] add c websocket sample --- docs/examples/c-ws/.gitignore | 3 + docs/examples/c-ws/connect_example.c | 21 + docs/examples/c-ws/create_db_demo.c | 69 ++++ docs/examples/c-ws/insert_data_demo.c | 75 ++++ docs/examples/c-ws/multi_bind_example.c | 147 +++++++ docs/examples/c-ws/query_data_demo.c | 77 ++++ docs/examples/c-ws/sml_insert_demo.c | 141 +++++++ docs/examples/c-ws/stmt_insert_demo.c | 186 +++++++++ docs/examples/c-ws/tmq_demo.c | 514 ++++++++++++++++++++++++ docs/examples/c-ws/with_reqid_demo.c | 78 ++++ 10 files changed, 1311 insertions(+) create mode 100644 docs/examples/c-ws/.gitignore create mode 100644 docs/examples/c-ws/connect_example.c create mode 100755 docs/examples/c-ws/create_db_demo.c create mode 100644 docs/examples/c-ws/insert_data_demo.c create mode 100644 docs/examples/c-ws/multi_bind_example.c create mode 100644 docs/examples/c-ws/query_data_demo.c create mode 100644 docs/examples/c-ws/sml_insert_demo.c create mode 100644 docs/examples/c-ws/stmt_insert_demo.c create mode 100644 docs/examples/c-ws/tmq_demo.c create mode 100644 docs/examples/c-ws/with_reqid_demo.c diff --git a/docs/examples/c-ws/.gitignore b/docs/examples/c-ws/.gitignore new file mode 100644 index 0000000000..afe9743149 --- /dev/null +++ b/docs/examples/c-ws/.gitignore @@ -0,0 +1,3 @@ +* +!*.c +!.gitignore diff --git a/docs/examples/c-ws/connect_example.c b/docs/examples/c-ws/connect_example.c new file mode 100644 index 0000000000..5a9cd64292 --- /dev/null +++ b/docs/examples/c-ws/connect_example.c @@ -0,0 +1,21 @@ +// compile with +// gcc connect_example.c -o connect_example -ltaos +#include +#include +#include "taosws.h" + +int main() { + ws_enable_log("debug"); + char *dsn = "ws://localhost:6041"; + WS_TAOS *taos = ws_connect(dsn); + if (taos == NULL) { + fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL)); + return -1; + } + fprintf(stdout, "Connected to %s successfully.\n", dsn); + + /* put your code here for read and write */ + + // close & clean + ws_close(taos); +} diff --git a/docs/examples/c-ws/create_db_demo.c b/docs/examples/c-ws/create_db_demo.c new file mode 100755 index 0000000000..13f038f1df --- /dev/null +++ b/docs/examples/c-ws/create_db_demo.c @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// TAOS standard API example. The same syntax as MySQL, but only a subset +// to compile: gcc -o create_db_demo create_db_demo.c -ltaos + +#include +#include +#include +#include +#include "taosws.h" + +static int DemoCreateDB() { + ws_enable_log("debug"); + // ANCHOR: create_db_and_table + int code = 0; + char *dsn = "ws://localhost:6041"; + // connect + WS_TAOS *taos = ws_connect(dsn); + + if (taos == NULL) { + fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL)); + return -1; + } + + // create database + WS_RES *result = ws_query(taos, "CREATE DATABASE IF NOT EXISTS power"); + code = ws_errno(result); + if (code != 0) { + fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result)); + ws_close(taos); + return -1; + } + ws_free_result(result); + fprintf(stdout, "Create database power successfully.\n"); + + // create table + const char *sql = + "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId " + "INT, location BINARY(24))"; + result = ws_query(taos, sql); + code = ws_errno(result); + if (code != 0) { + fprintf(stderr, "Failed to create stable power.meters, ErrCode: 0x%x, ErrMessage: %s\n.", code, ws_errstr(result)); + ws_close(taos); + return -1; + } + ws_free_result(result); + fprintf(stdout, "Create stable power.meters successfully.\n"); + + // close & clean + ws_close(taos); + return 0; + // ANCHOR_END: create_db_and_table +} + +int main(int argc, char *argv[]) { return DemoCreateDB(); } diff --git a/docs/examples/c-ws/insert_data_demo.c b/docs/examples/c-ws/insert_data_demo.c new file mode 100644 index 0000000000..5fd76c01d8 --- /dev/null +++ b/docs/examples/c-ws/insert_data_demo.c @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// TAOS standard API example. The same syntax as MySQL, but only a subset +// to compile: gcc -o insert_data_demo insert_data_demo.c -ltaos + +#include +#include +#include +#include +#include "taosws.h" + +static int DemoInsertData() { + // ANCHOR: insert_data + const char *host = "localhost"; + const char *user = "root"; + const char *password = "taosdata"; + uint16_t port = 6030; + int code = 0; + + // connect + TAOS *taos = taos_connect(host, user, password, NULL, port); + if (taos == NULL) { + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), + taos_errstr(NULL)); + taos_cleanup(); + return -1; + } + + // insert data, please make sure the database and table are already created + const char *sql = + "INSERT INTO " + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + "VALUES " + "(NOW + 1a, 10.30000, 219, 0.31000) " + "(NOW + 2a, 12.60000, 218, 0.33000) " + "(NOW + 3a, 12.30000, 221, 0.31000) " + "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + "VALUES " + "(NOW + 1a, 10.30000, 218, 0.25000) "; + TAOS_RES *result = taos_query(taos, sql); + code = taos_errno(result); + if (code != 0) { + fprintf(stderr, "Failed to insert data to power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code, + taos_errstr(result)); + taos_close(taos); + taos_cleanup(); + return -1; + } + taos_free_result(result); + + // you can check affectedRows here + int rows = taos_affected_rows(result); + fprintf(stdout, "Successfully inserted %d rows into power.meters.\n", rows); + + // close & clean + taos_close(taos); + taos_cleanup(); + return 0; + // ANCHOR_END: insert_data +} + +int main(int argc, char *argv[]) { return DemoInsertData(); } diff --git a/docs/examples/c-ws/multi_bind_example.c b/docs/examples/c-ws/multi_bind_example.c new file mode 100644 index 0000000000..0134899a1d --- /dev/null +++ b/docs/examples/c-ws/multi_bind_example.c @@ -0,0 +1,147 @@ +// compile with +// gcc -o multi_bind_example multi_bind_example.c -ltaos +#include +#include +#include +#include "taos.h" + +/** + * @brief execute sql only and ignore result set + * + * @param taos + * @param sql + */ +void executeSQL(TAOS *taos, const char *sql) { + TAOS_RES *res = taos_query(taos, sql); + int code = taos_errno(res); + if (code != 0) { + printf("%s\n", taos_errstr(res)); + taos_free_result(res); + taos_close(taos); + exit(EXIT_FAILURE); + } + taos_free_result(res); +} + +/** + * @brief exit program when error occur. + * + * @param stmt + * @param code + * @param msg + */ +void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) { + if (code != 0) { + printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt)); + (void)taos_stmt_close(stmt); + exit(EXIT_FAILURE); + } +} + +/** + * @brief insert data using stmt API + * + * @param taos + */ +void insertData(TAOS *taos) { + // init + TAOS_STMT *stmt = taos_stmt_init(taos); + // prepare + const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) values(?, ?, ?, ?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare"); + // bind table name and tags + TAOS_MULTI_BIND tags[2]; + char *location = "California.SanFrancisco"; + int groupId = 2; + tags[0].buffer_type = TSDB_DATA_TYPE_BINARY; + tags[0].buffer_length = strlen(location); + tags[0].length = &tags[0].buffer_length; + tags[0].buffer = location; + tags[0].is_null = NULL; + + tags[1].buffer_type = TSDB_DATA_TYPE_INT; + tags[1].buffer_length = sizeof(int); + tags[1].length = &tags[1].buffer_length; + tags[1].buffer = &groupId; + tags[1].is_null = NULL; + + code = taos_stmt_set_tbname_tags(stmt, "d1001", tags); + checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags"); + + // highlight-start + // insert two rows with multi binds + TAOS_MULTI_BIND params[4]; + // values to bind + int64_t ts[] = {1648432611249, 1648432611749}; + float current[] = {10.3, 12.6}; + int voltage[] = {219, 218}; + float phase[] = {0.31, 0.33}; + // is_null array + char is_null[2] = {0}; + // length array + int32_t int64Len[2] = {sizeof(int64_t)}; + int32_t floatLen[2] = {sizeof(float)}; + int32_t intLen[2] = {sizeof(int)}; + + params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[0].buffer_length = sizeof(int64_t); + params[0].buffer = ts; + params[0].length = int64Len; + params[0].is_null = is_null; + params[0].num = 2; + + params[1].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[1].buffer_length = sizeof(float); + params[1].buffer = current; + params[1].length = floatLen; + params[1].is_null = is_null; + params[1].num = 2; + + params[2].buffer_type = TSDB_DATA_TYPE_INT; + params[2].buffer_length = sizeof(int); + params[2].buffer = voltage; + params[2].length = intLen; + params[2].is_null = is_null; + params[2].num = 2; + + params[3].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[3].buffer_length = sizeof(float); + params[3].buffer = phase; + params[3].length = floatLen; + params[3].is_null = is_null; + params[3].num = 2; + + code = taos_stmt_bind_param_batch(stmt, params); // bind batch + checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch"); + code = taos_stmt_add_batch(stmt); // add batch + checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch"); + // highlight-end + // execute + code = taos_stmt_execute(stmt); + checkErrorCode(stmt, code, "failed to execute taos_stmt_execute"); + int affectedRows = taos_stmt_affected_rows(stmt); + printf("successfully inserted %d rows\n", affectedRows); + // close + (void)taos_stmt_close(stmt); +} + +int main() { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030); + if (taos == NULL) { + printf("failed to connect to server\n"); + exit(EXIT_FAILURE); + } + executeSQL(taos, "DROP DATABASE IF EXISTS power"); + executeSQL(taos, "CREATE DATABASE power"); + executeSQL(taos, "USE power"); + executeSQL(taos, + "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), " + "groupId INT)"); + insertData(taos); + taos_close(taos); + taos_cleanup(); +} + +// output: +// successfully inserted 2 rows diff --git a/docs/examples/c-ws/query_data_demo.c b/docs/examples/c-ws/query_data_demo.c new file mode 100644 index 0000000000..311392ae4e --- /dev/null +++ b/docs/examples/c-ws/query_data_demo.c @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// TAOS standard API example. The same syntax as MySQL, but only a subset +// to compile: gcc -o query_data_demo query_data_demo.c -ltaos + +#include +#include +#include +#include +#include "taosws.h" + +static int DemoQueryData() { + // ANCHOR: query_data + const char *host = "localhost"; + const char *user = "root"; + const char *password = "taosdata"; + uint16_t port = 6030; + int code = 0; + + // connect + TAOS *taos = taos_connect(host, user, password, NULL, port); + if (taos == NULL) { + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), + taos_errstr(NULL)); + taos_cleanup(); + return -1; + } + + // query data, please make sure the database and table are already created + const char *sql = "SELECT ts, current, location FROM power.meters limit 100"; + TAOS_RES *result = taos_query(taos, sql); + code = taos_errno(result); + if (code != 0) { + fprintf(stderr, "Failed to query data from power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code, + taos_errstr(result)); + taos_close(taos); + taos_cleanup(); + return -1; + } + + TAOS_ROW row = NULL; + int rows = 0; + int num_fields = taos_field_count(result); + TAOS_FIELD *fields = taos_fetch_fields(result); + + fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql); + + // fetch the records row by row + while ((row = taos_fetch_row(result))) { + // Add your data processing logic here + + rows++; + } + fprintf(stdout, "total rows: %d\n", rows); + taos_free_result(result); + + // close & clean + taos_close(taos); + taos_cleanup(); + return 0; + // ANCHOR_END: query_data +} + +int main(int argc, char *argv[]) { return DemoQueryData(); } diff --git a/docs/examples/c-ws/sml_insert_demo.c b/docs/examples/c-ws/sml_insert_demo.c new file mode 100644 index 0000000000..0ffde0ff05 --- /dev/null +++ b/docs/examples/c-ws/sml_insert_demo.c @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// TAOS standard API example. The same syntax as MySQL, but only a subset +// to compile: gcc -o sml_insert_demo sml_insert_demo.c -ltaos + +#include +#include +#include +#include "taosws.h" + +static int DemoSmlInsert() { + // ANCHOR: schemaless + const char *host = "localhost"; + const char *user = "root"; + const char *password = "taosdata"; + uint16_t port = 6030; + int code = 0; + + // connect + TAOS *taos = taos_connect(host, user, password, NULL, port); + if (taos == NULL) { + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), + taos_errstr(NULL)); + taos_cleanup(); + return -1; + } + + // create database + TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power"); + code = taos_errno(result); + if (code != 0) { + fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result)); + taos_close(taos); + taos_cleanup(); + return -1; + } + taos_free_result(result); + + // use database + result = taos_query(taos, "USE power"); + code = taos_errno(result); + if (code != 0) { + fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result)); + taos_close(taos); + taos_cleanup(); + return -1; + } + taos_free_result(result); + + // schemaless demo data + char *line_demo = + "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 " + "1626006833639"; + char *telnet_demo = "metric_telnet 1707095283260 4 host=host0 interface=eth0"; + char *json_demo = + "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, " + "\"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"; + + // influxdb line protocol + char *lines[] = {line_demo}; + result = taos_schemaless_insert(taos, lines, 1, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); + code = taos_errno(result); + if (code != 0) { + fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", line_demo, code, + taos_errstr(result)); + taos_close(taos); + taos_cleanup(); + return -1; + } + + int rows = taos_affected_rows(result); + fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", rows); + taos_free_result(result); + + // opentsdb telnet protocol + char *telnets[] = {telnet_demo}; + result = taos_schemaless_insert(taos, telnets, 1, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); + code = taos_errno(result); + if (code != 0) { + fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", telnet_demo, code, + taos_errstr(result)); + taos_close(taos); + taos_cleanup(); + return -1; + } + + rows = taos_affected_rows(result); + fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", rows); + taos_free_result(result); + + // opentsdb json protocol + char *jsons[1] = {0}; + // allocate memory for json data. can not use static memory. + size_t size = 1024; + jsons[0] = malloc(size); + if (jsons[0] == NULL) { + fprintf(stderr, "Failed to allocate memory: %zu bytes.\n", size); + taos_close(taos); + taos_cleanup(); + return -1; + } + (void)strncpy(jsons[0], json_demo, 1023); + result = taos_schemaless_insert(taos, jsons, 1, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NOT_CONFIGURED); + code = taos_errno(result); + if (code != 0) { + free(jsons[0]); + fprintf(stderr, "Failed to insert schemaless json data, Server: %s, ErrCode: 0x%x, ErrMessage: %s\n.", json_demo, code, + taos_errstr(result)); + taos_close(taos); + taos_cleanup(); + return -1; + } + free(jsons[0]); + + rows = taos_affected_rows(result); + fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", rows); + taos_free_result(result); + + // close & clean + taos_close(taos); + taos_cleanup(); + return 0; + // ANCHOR_END: schemaless +} + +int main(int argc, char *argv[]) { + return DemoSmlInsert(); +} diff --git a/docs/examples/c-ws/stmt_insert_demo.c b/docs/examples/c-ws/stmt_insert_demo.c new file mode 100644 index 0000000000..2f45a5b653 --- /dev/null +++ b/docs/examples/c-ws/stmt_insert_demo.c @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// TAOS standard API example. The same syntax as MySQL, but only a subset +// to compile: gcc -o stmt_insert_demo stmt_insert_demo.c -ltaos + +#include +#include +#include +#include +#include "taosws.h" + +/** + * @brief execute sql only. + * + * @param taos + * @param sql + */ +void executeSQL(TAOS *taos, const char *sql) { + TAOS_RES *res = taos_query(taos, sql); + int code = taos_errno(res); + if (code != 0) { + fprintf(stderr, "%s\n", taos_errstr(res)); + taos_free_result(res); + taos_close(taos); + exit(EXIT_FAILURE); + } + taos_free_result(res); +} + +/** + * @brief check return status and exit program when error occur. + * + * @param stmt + * @param code + * @param msg + */ +void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) { + if (code != 0) { + fprintf(stderr, "%s. code: %d, error: %s\n", msg,code,taos_stmt_errstr(stmt)); + taos_stmt_close(stmt); + exit(EXIT_FAILURE); + } +} + +typedef struct { + int64_t ts; + float current; + int voltage; + float phase; +} Row; + +int num_of_sub_table = 10; +int num_of_row = 10; +int total_affected = 0; +/** + * @brief insert data using stmt API + * + * @param taos + */ +void insertData(TAOS *taos) { + // init + TAOS_STMT *stmt = taos_stmt_init(taos); + if (stmt == NULL) { + fprintf(stderr, "Failed to init taos_stmt, error: %s\n", taos_stmt_errstr(NULL)); + exit(EXIT_FAILURE); + } + // prepare + const char *sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + checkErrorCode(stmt, code, "Failed to execute taos_stmt_prepare"); + for (int i = 1; i <= num_of_sub_table; i++) { + char table_name[20]; + sprintf(table_name, "d_bind_%d", i); + char location[20]; + sprintf(location, "location_%d", i); + + // set table name and tags + TAOS_MULTI_BIND tags[2]; + // groupId + tags[0].buffer_type = TSDB_DATA_TYPE_INT; + tags[0].buffer_length = sizeof(int); + tags[0].length = (int32_t *)&tags[0].buffer_length; + tags[0].buffer = &i; + tags[0].is_null = NULL; + tags[0].num = 1; + // location + tags[1].buffer_type = TSDB_DATA_TYPE_BINARY; + tags[1].buffer_length = strlen(location); + tags[1].length =(int32_t *) &tags[1].buffer_length; + tags[1].buffer = location; + tags[1].is_null = NULL; + tags[1].num = 1; + code = taos_stmt_set_tbname_tags(stmt, table_name, tags); + checkErrorCode(stmt, code, "Failed to set table name and tags\n"); + + // insert rows + TAOS_MULTI_BIND params[4]; + // ts + params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[0].buffer_length = sizeof(int64_t); + params[0].length = (int32_t *)¶ms[0].buffer_length; + params[0].is_null = NULL; + params[0].num = 1; + // current + params[1].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[1].buffer_length = sizeof(float); + params[1].length = (int32_t *)¶ms[1].buffer_length; + params[1].is_null = NULL; + params[1].num = 1; + // voltage + params[2].buffer_type = TSDB_DATA_TYPE_INT; + params[2].buffer_length = sizeof(int); + params[2].length = (int32_t *)¶ms[2].buffer_length; + params[2].is_null = NULL; + params[2].num = 1; + // phase + params[3].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[3].buffer_length = sizeof(float); + params[3].length = (int32_t *)¶ms[3].buffer_length; + params[3].is_null = NULL; + params[3].num = 1; + + for (int j = 0; j < num_of_row; j++) { + struct timeval tv; + gettimeofday(&tv, NULL); + long long milliseconds = tv.tv_sec * 1000LL + tv.tv_usec / 1000; // current timestamp in milliseconds + int64_t ts = milliseconds + j; + float current = (float)rand() / RAND_MAX * 30; + int voltage = rand() % 300; + float phase = (float)rand() / RAND_MAX; + params[0].buffer = &ts; + params[1].buffer = ¤t; + params[2].buffer = &voltage; + params[3].buffer = &phase; + // bind param + code = taos_stmt_bind_param(stmt, params); + checkErrorCode(stmt, code, "Failed to bind param"); + } + // add batch + code = taos_stmt_add_batch(stmt); + checkErrorCode(stmt, code, "Failed to add batch"); + // execute batch + code = taos_stmt_execute(stmt); + checkErrorCode(stmt, code, "Failed to exec stmt"); + // get affected rows + int affected = taos_stmt_affected_rows_once(stmt); + total_affected += affected; + } + fprintf(stdout, "Successfully inserted %d rows to power.meters.\n", total_affected); + taos_stmt_close(stmt); +} + +int main() { + const char *host = "localhost"; + const char *user = "root"; + const char *password = "taosdata"; + uint16_t port = 6030; + TAOS *taos = taos_connect(host, user, password, NULL, port); + if (taos == NULL) { + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); + taos_cleanup(); + exit(EXIT_FAILURE); + } + // create database and table + executeSQL(taos, "CREATE DATABASE IF NOT EXISTS power"); + executeSQL(taos, "USE power"); + executeSQL(taos, + "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS " + "(groupId INT, location BINARY(24))"); + insertData(taos); + taos_close(taos); + taos_cleanup(); +} diff --git a/docs/examples/c-ws/tmq_demo.c b/docs/examples/c-ws/tmq_demo.c new file mode 100644 index 0000000000..5739a44c14 --- /dev/null +++ b/docs/examples/c-ws/tmq_demo.c @@ -0,0 +1,514 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// to compile: gcc -o tmq_demo tmq_demo.c -ltaos -lpthread + +#include +#include +#include +#include +#include +#include +#include +#include "taosws.h" + +volatile int thread_stop = 0; +static int running = 1; +static int count = 0; +const char* topic_name = "topic_meters"; + +typedef struct { + const char* enable_auto_commit; + const char* auto_commit_interval_ms; + const char* group_id; + const char* client_id; + const char* td_connect_host; + const char* td_connect_port; + const char* td_connect_user; + const char* td_connect_pass; + const char* auto_offset_reset; +} ConsumerConfig; + +ConsumerConfig config = { + .enable_auto_commit = "true", + .auto_commit_interval_ms = "1000", + .group_id = "group1", + .client_id = "client1", + .td_connect_host = "localhost", + .td_connect_port = "6030", + .td_connect_user = "root", + .td_connect_pass = "taosdata", + .auto_offset_reset = "latest" +}; + +void* prepare_data(void* arg) { + const char* host = "localhost"; + const char* user = "root"; + const char* password = "taosdata"; + uint16_t port = 6030; + int code = 0; + TAOS* pConn = taos_connect(host, user, password, NULL, port); + if (pConn == NULL) { + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); + taos_cleanup(); + return NULL; + } + + TAOS_RES* pRes; + int i = 1; + + while (!thread_stop) { + char buf[200] = {0}; + i++; + snprintf( + buf, sizeof(buf), + "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + %da, 10.30000, " + "219, 0.31000)", + i); + + pRes = taos_query(pConn, buf); + code = taos_errno(pRes); + if (code != 0) { + fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + } + taos_free_result(pRes); + sleep(1); + } + fprintf(stdout, "Prepare data thread exit\n"); + return NULL; +} + +// ANCHOR: msg_process +int32_t msg_process(TAOS_RES* msg) { + int32_t rows = 0; + const char* topicName = tmq_get_topic_name(msg); + const char* dbName = tmq_get_db_name(msg); + int32_t vgroupId = tmq_get_vgroup_id(msg); + + while (true) { + // get one row data from message + TAOS_ROW row = taos_fetch_row(msg); + if (row == NULL) break; + + // Add your data processing logic here + + rows++; + } + + return rows; +} +// ANCHOR_END: msg_process + +TAOS* init_env() { + const char* host = "localhost"; + const char* user = "root"; + const char* password = "taosdata"; + uint16_t port = 6030; + int code = 0; + TAOS* pConn = taos_connect(host, user, password, NULL, port); + if (pConn == NULL) { + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); + taos_cleanup(); + return NULL; + } + + TAOS_RES* pRes; + // drop database if exists + pRes = taos_query(pConn, "DROP TOPIC IF EXISTS topic_meters"); + code = taos_errno(pRes); + if (code != 0) { + fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "DROP DATABASE IF EXISTS power"); + code = taos_errno(pRes); + if (code != 0) { + fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + + // create database + pRes = taos_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600"); + code = taos_errno(pRes); + if (code != 0) { + fprintf(stderr, "Failed to create power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + + // create super table + pRes = taos_query( + pConn, + "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS " + "(groupId INT, location BINARY(24))"); + code = taos_errno(pRes); + if (code != 0) { + fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + + return pConn; + +END: + taos_free_result(pRes); + taos_close(pConn); + return NULL; +} + +void deinit_env(TAOS* pConn) { + if (pConn) + taos_close(pConn); +} + +int32_t create_topic(TAOS* pConn) { + TAOS_RES* pRes; + int code = 0; + + if (!pConn) { + fprintf(stderr, "Invalid input parameter.\n"); + return -1; + } + + pRes = taos_query(pConn, "USE power"); + code = taos_errno(pRes); + if (taos_errno(pRes) != 0) { + fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); + code = taos_errno(pRes); + if (code != 0) { + fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + return 0; +} + +int32_t drop_topic(TAOS* pConn) { + TAOS_RES* pRes; + int code = 0; + + if (!pConn) { + fprintf(stderr, "Invalid input parameter.\n"); + return -1; + } + + pRes = taos_query(pConn, "USE power"); + code = taos_errno(pRes); + if (taos_errno(pRes) != 0) { + fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "DROP TOPIC IF EXISTS topic_meters"); + code = taos_errno(pRes); + if (code != 0) { + fprintf(stderr, "Failed to drop topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + return 0; +} + +void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + count +=1; + fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p, count: %d.\n", code, tmq, param, count); +} + +// ANCHOR: create_consumer_1 +tmq_t* build_consumer(const ConsumerConfig* config) { + tmq_conf_res_t code; + tmq_t* tmq = NULL; + + // create a configuration object + tmq_conf_t* conf = tmq_conf_new(); + + // set the configuration parameters + code = tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "group.id", config->group_id); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "client.id", config->client_id); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "td.connect.ip", config->td_connect_host); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "td.connect.port", config->td_connect_port); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "td.connect.user", config->td_connect_user); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "td.connect.pass", config->td_connect_pass); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + + // set the callback function for auto commit + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + // create a consumer object + tmq = tmq_consumer_new(conf, NULL, 0); + +_end: + // destroy the configuration object + tmq_conf_destroy(conf); + return tmq; +} +// ANCHOR_END: create_consumer_1 + +// ANCHOR: build_topic_list +// build a topic list used to subscribe +tmq_list_t* build_topic_list() { + // create a empty topic list + tmq_list_t* topicList = tmq_list_new(); + + // append topic name to the list + int32_t code = tmq_list_append(topicList, topic_name); + if (code) { + // if failed, destroy the list and return NULL + tmq_list_destroy(topicList); + fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n", + topic_name, config.group_id, config.client_id, code, tmq_err2str(code)); + return NULL; + } + // if success, return the list + return topicList; +} +// ANCHOR_END: build_topic_list + +// ANCHOR: basic_consume_loop +void basic_consume_loop(tmq_t* tmq) { + int32_t totalRows = 0; // total rows consumed + int32_t msgCnt = 0; // total messages consumed + int32_t timeout = 5000; // poll timeout + + while (running) { + // poll message from TDengine + TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout); + if (tmqmsg) { + msgCnt++; + + // Add your data processing logic here + totalRows += msg_process(tmqmsg); + + // free the message + taos_free_result(tmqmsg); + } + if (msgCnt > 50) { + // consume 50 messages and break + break; + } + } + + // print the result: total messages and total rows consumed + fprintf(stdout, "%d msg consumed, include %d rows\n", msgCnt, totalRows); +} +// ANCHOR_END: basic_consume_loop + +// ANCHOR: consume_repeatly +void consume_repeatly(tmq_t* tmq) { + int32_t numOfAssignment = 0; + tmq_topic_assignment* pAssign = NULL; + + // get the topic assignment + int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment); + if (code != 0 || pAssign == NULL || numOfAssignment == 0) { + fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n", + topic_name, config.group_id, config.client_id, code, tmq_err2str(code)); + return; + } + + // seek to the earliest offset + for (int32_t i = 0; i < numOfAssignment; ++i) { + tmq_topic_assignment* p = &pAssign[i]; + + code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin); + if (code != 0) { + fprintf(stderr, "Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n", + topic_name, config.group_id, config.client_id, p->vgId, code, tmq_err2str(code)); + break; + } + } + if (code == 0) + fprintf(stdout, "Assignment seek to beginning successfully.\n"); + + // free the assignment array + tmq_free_assignment(pAssign); + + // let's consume the messages again + basic_consume_loop(tmq); +} +// ANCHOR_END: consume_repeatly + +// ANCHOR: manual_commit +void manual_commit(tmq_t* tmq) { + int32_t totalRows = 0; // total rows consumed + int32_t msgCnt = 0; // total messages consumed + int32_t timeout = 5000; // poll timeout + + while (running) { + // poll message from TDengine + TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout); + if (tmqmsg) { + msgCnt++; + // process the message + totalRows += msg_process(tmqmsg); + // commit the message + int32_t code = tmq_commit_sync(tmq, tmqmsg); + if (code) { + fprintf(stderr, "Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n", + topic_name, config.group_id, config.client_id, code, tmq_err2str(code)); + // free the message + taos_free_result(tmqmsg); + break; + } else { + fprintf(stdout, "Commit offset manually successfully.\n"); + } + // free the message + taos_free_result(tmqmsg); + } + if (msgCnt > 50) { + // consume 50 messages and break + break; + } + } + + // print the result: total messages and total rows consumed + fprintf(stdout, "%d msg consumed, include %d rows.\n", msgCnt, totalRows); +} +// ANCHOR_END: manual_commit + +int main(int argc, char* argv[]) { + int32_t code; + pthread_t thread_id; + + TAOS* pConn = init_env(); + if (pConn == NULL) { + fprintf(stderr, "Failed to init env.\n"); + return -1; + } + + if (create_topic(pConn) < 0) { + fprintf(stderr, "Failed to create topic.\n"); + return -1; + } + + if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) { + fprintf(stderr, "Failed to create thread.\n"); + return -1; + } + + // ANCHOR: create_consumer_2 + tmq_t* tmq = build_consumer(&config); + if (NULL == tmq) { + fprintf(stderr, "Failed to create native consumer, host: %s, groupId: %s, , clientId: %s.\n", + config.td_connect_host, config.group_id, config.client_id); + return -1; + } else { + fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n", + config.td_connect_host, config.group_id, config.client_id); + } + + // ANCHOR_END: create_consumer_2 + + // ANCHOR: subscribe_3 + tmq_list_t* topic_list = build_topic_list(); + if (NULL == topic_list) { + fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n", + topic_name, config.group_id, config.client_id); + return -1; + } + + if ((code = tmq_subscribe(tmq, topic_list))) { + fprintf(stderr, "Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n", + topic_name, config.group_id, config.client_id, code, tmq_err2str(code)); + } else { + fprintf(stdout, "Subscribe topics successfully.\n"); + } + + tmq_list_destroy(topic_list); + + basic_consume_loop(tmq); + // ANCHOR_END: subscribe_3 + + consume_repeatly(tmq); + + manual_commit(tmq); + + // ANCHOR: unsubscribe_and_close + // unsubscribe the topic + code = tmq_unsubscribe(tmq); + if (code) { + fprintf(stderr, "Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n", + topic_name, config.group_id, config.client_id, code, tmq_err2str(code)); + } else { + fprintf(stdout, "Consumer unsubscribed successfully.\n"); + } + + // close the consumer + code = tmq_consumer_close(tmq); + if (code) { + fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n", + topic_name, config.group_id, config.client_id, code, tmq_err2str(code)); + } else { + fprintf(stdout, "Consumer closed successfully.\n"); + } + // ANCHOR_END: unsubscribe_and_close + + thread_stop = 1; + pthread_join(thread_id, NULL); + + if (drop_topic(pConn) < 0) { + fprintf(stderr, "Failed to drop topic.\n"); + return -1; + } + + deinit_env(pConn); + return 0; +} diff --git a/docs/examples/c-ws/with_reqid_demo.c b/docs/examples/c-ws/with_reqid_demo.c new file mode 100644 index 0000000000..3c748b876e --- /dev/null +++ b/docs/examples/c-ws/with_reqid_demo.c @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// TAOS standard API example. The same syntax as MySQL, but only a subset +// to compile: gcc -o with_reqid_demo with_reqid_demo.c -ltaos + +#include +#include +#include +#include +#include "taosws.h" + +static int DemoWithReqId() { + // ANCHOR: with_reqid + const char *host = "localhost"; + const char *user = "root"; + const char *password = "taosdata"; + uint16_t port = 6030; + int code = 0; + + // connect + TAOS *taos = taos_connect(host, user, password, NULL, port); + if (taos == NULL) { + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), + taos_errstr(NULL)); + taos_cleanup(); + return -1; + } + + const char *sql = "SELECT ts, current, location FROM power.meters limit 1"; + // query data with reqid + long reqid = 3L; + TAOS_RES *result = taos_query_with_reqid(taos, sql, reqid); + code = taos_errno(result); + if (code != 0) { + fprintf(stderr, "Failed to execute sql withQID: %ld, ErrCode: 0x%x, ErrMessage: %s\n.", reqid, code, + taos_errstr(result)); + taos_close(taos); + taos_cleanup(); + return -1; + } + + TAOS_ROW row = NULL; + int rows = 0; + int num_fields = taos_field_count(result); + TAOS_FIELD *fields = taos_fetch_fields(result); + + fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql); + + // fetch the records row by row + while ((row = taos_fetch_row(result))) { + // Add your data processing logic here + + rows++; + } + fprintf(stdout, "total rows: %d\n", rows); + taos_free_result(result); + + // close & clean + taos_close(taos); + taos_cleanup(); + return 0; + // ANCHOR_END: with_reqid +} + +int main(int argc, char *argv[]) { return DemoWithReqId(); }