add c websocket sample

This commit is contained in:
sheyanjie-qq 2024-09-26 11:39:02 +08:00
parent cd68e55457
commit 73557046fd
10 changed files with 1311 additions and 0 deletions

3
docs/examples/c-ws/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
*
!*.c
!.gitignore

View File

@ -0,0 +1,21 @@
// compile with
// gcc connect_example.c -o connect_example -ltaos
#include <stdio.h>
#include <stdlib.h>
#include "taosws.h"
int main() {
ws_enable_log("debug");
char *dsn = "ws://localhost:6041";
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return -1;
}
fprintf(stdout, "Connected to %s successfully.\n", dsn);
/* put your code here for read and write */
// close & clean
ws_close(taos);
}

View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o create_db_demo create_db_demo.c -ltaos
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosws.h"
static int DemoCreateDB() {
ws_enable_log("debug");
// ANCHOR: create_db_and_table
int code = 0;
char *dsn = "ws://localhost:6041";
// connect
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return -1;
}
// create database
WS_RES *result = ws_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result));
ws_close(taos);
return -1;
}
ws_free_result(result);
fprintf(stdout, "Create database power successfully.\n");
// create table
const char *sql =
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId "
"INT, location BINARY(24))";
result = ws_query(taos, sql);
code = ws_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to create stable power.meters, ErrCode: 0x%x, ErrMessage: %s\n.", code, ws_errstr(result));
ws_close(taos);
return -1;
}
ws_free_result(result);
fprintf(stdout, "Create stable power.meters successfully.\n");
// close & clean
ws_close(taos);
return 0;
// ANCHOR_END: create_db_and_table
}
int main(int argc, char *argv[]) { return DemoCreateDB(); }

View File

@ -0,0 +1,75 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o insert_data_demo insert_data_demo.c -ltaos
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosws.h"
static int DemoInsertData() {
// ANCHOR: insert_data
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL));
taos_cleanup();
return -1;
}
// insert data, please make sure the database and table are already created
const char *sql =
"INSERT INTO "
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') "
"VALUES "
"(NOW + 1a, 10.30000, 219, 0.31000) "
"(NOW + 2a, 12.60000, 218, 0.33000) "
"(NOW + 3a, 12.30000, 221, 0.31000) "
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') "
"VALUES "
"(NOW + 1a, 10.30000, 218, 0.25000) ";
TAOS_RES *result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert data to power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
taos_free_result(result);
// you can check affectedRows here
int rows = taos_affected_rows(result);
fprintf(stdout, "Successfully inserted %d rows into power.meters.\n", rows);
// close & clean
taos_close(taos);
taos_cleanup();
return 0;
// ANCHOR_END: insert_data
}
int main(int argc, char *argv[]) { return DemoInsertData(); }

View File

@ -0,0 +1,147 @@
// compile with
// gcc -o multi_bind_example multi_bind_example.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taos.h"
/**
* @brief execute sql only and ignore result set
*
* @param taos
* @param sql
*/
void executeSQL(TAOS *taos, const char *sql) {
TAOS_RES *res = taos_query(taos, sql);
int code = taos_errno(res);
if (code != 0) {
printf("%s\n", taos_errstr(res));
taos_free_result(res);
taos_close(taos);
exit(EXIT_FAILURE);
}
taos_free_result(res);
}
/**
* @brief exit program when error occur.
*
* @param stmt
* @param code
* @param msg
*/
void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
if (code != 0) {
printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt));
(void)taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
}
/**
* @brief insert data using stmt API
*
* @param taos
*/
void insertData(TAOS *taos) {
// init
TAOS_STMT *stmt = taos_stmt_init(taos);
// prepare
const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) values(?, ?, ?, ?)";
int code = taos_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");
// bind table name and tags
TAOS_MULTI_BIND tags[2];
char *location = "California.SanFrancisco";
int groupId = 2;
tags[0].buffer_type = TSDB_DATA_TYPE_BINARY;
tags[0].buffer_length = strlen(location);
tags[0].length = &tags[0].buffer_length;
tags[0].buffer = location;
tags[0].is_null = NULL;
tags[1].buffer_type = TSDB_DATA_TYPE_INT;
tags[1].buffer_length = sizeof(int);
tags[1].length = &tags[1].buffer_length;
tags[1].buffer = &groupId;
tags[1].is_null = NULL;
code = taos_stmt_set_tbname_tags(stmt, "d1001", tags);
checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags");
// highlight-start
// insert two rows with multi binds
TAOS_MULTI_BIND params[4];
// values to bind
int64_t ts[] = {1648432611249, 1648432611749};
float current[] = {10.3, 12.6};
int voltage[] = {219, 218};
float phase[] = {0.31, 0.33};
// is_null array
char is_null[2] = {0};
// length array
int32_t int64Len[2] = {sizeof(int64_t)};
int32_t floatLen[2] = {sizeof(float)};
int32_t intLen[2] = {sizeof(int)};
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(int64_t);
params[0].buffer = ts;
params[0].length = int64Len;
params[0].is_null = is_null;
params[0].num = 2;
params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[1].buffer_length = sizeof(float);
params[1].buffer = current;
params[1].length = floatLen;
params[1].is_null = is_null;
params[1].num = 2;
params[2].buffer_type = TSDB_DATA_TYPE_INT;
params[2].buffer_length = sizeof(int);
params[2].buffer = voltage;
params[2].length = intLen;
params[2].is_null = is_null;
params[2].num = 2;
params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[3].buffer_length = sizeof(float);
params[3].buffer = phase;
params[3].length = floatLen;
params[3].is_null = is_null;
params[3].num = 2;
code = taos_stmt_bind_param_batch(stmt, params); // bind batch
checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch");
code = taos_stmt_add_batch(stmt); // add batch
checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch");
// highlight-end
// execute
code = taos_stmt_execute(stmt);
checkErrorCode(stmt, code, "failed to execute taos_stmt_execute");
int affectedRows = taos_stmt_affected_rows(stmt);
printf("successfully inserted %d rows\n", affectedRows);
// close
(void)taos_stmt_close(stmt);
}
int main() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030);
if (taos == NULL) {
printf("failed to connect to server\n");
exit(EXIT_FAILURE);
}
executeSQL(taos, "DROP DATABASE IF EXISTS power");
executeSQL(taos, "CREATE DATABASE power");
executeSQL(taos, "USE power");
executeSQL(taos,
"CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), "
"groupId INT)");
insertData(taos);
taos_close(taos);
taos_cleanup();
}
// output:
// successfully inserted 2 rows

View File

@ -0,0 +1,77 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o query_data_demo query_data_demo.c -ltaos
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosws.h"
static int DemoQueryData() {
// ANCHOR: query_data
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL));
taos_cleanup();
return -1;
}
// query data, please make sure the database and table are already created
const char *sql = "SELECT ts, current, location FROM power.meters limit 100";
TAOS_RES *result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to query data from power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
TAOS_ROW row = NULL;
int rows = 0;
int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
// Add your data processing logic here
rows++;
}
fprintf(stdout, "total rows: %d\n", rows);
taos_free_result(result);
// close & clean
taos_close(taos);
taos_cleanup();
return 0;
// ANCHOR_END: query_data
}
int main(int argc, char *argv[]) { return DemoQueryData(); }

View File

@ -0,0 +1,141 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o sml_insert_demo sml_insert_demo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosws.h"
static int DemoSmlInsert() {
// ANCHOR: schemaless
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL));
taos_cleanup();
return -1;
}
// create database
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
taos_free_result(result);
// use database
result = taos_query(taos, "USE power");
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
taos_free_result(result);
// schemaless demo data
char *line_demo =
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 "
"1626006833639";
char *telnet_demo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
char *json_demo =
"{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, "
"\"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
// influxdb line protocol
char *lines[] = {line_demo};
result = taos_schemaless_insert(taos, lines, 1, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", line_demo, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
int rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", rows);
taos_free_result(result);
// opentsdb telnet protocol
char *telnets[] = {telnet_demo};
result = taos_schemaless_insert(taos, telnets, 1, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", telnet_demo, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", rows);
taos_free_result(result);
// opentsdb json protocol
char *jsons[1] = {0};
// allocate memory for json data. can not use static memory.
size_t size = 1024;
jsons[0] = malloc(size);
if (jsons[0] == NULL) {
fprintf(stderr, "Failed to allocate memory: %zu bytes.\n", size);
taos_close(taos);
taos_cleanup();
return -1;
}
(void)strncpy(jsons[0], json_demo, 1023);
result = taos_schemaless_insert(taos, jsons, 1, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
code = taos_errno(result);
if (code != 0) {
free(jsons[0]);
fprintf(stderr, "Failed to insert schemaless json data, Server: %s, ErrCode: 0x%x, ErrMessage: %s\n.", json_demo, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
free(jsons[0]);
rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", rows);
taos_free_result(result);
// close & clean
taos_close(taos);
taos_cleanup();
return 0;
// ANCHOR_END: schemaless
}
int main(int argc, char *argv[]) {
return DemoSmlInsert();
}

View File

@ -0,0 +1,186 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o stmt_insert_demo stmt_insert_demo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include "taosws.h"
/**
* @brief execute sql only.
*
* @param taos
* @param sql
*/
void executeSQL(TAOS *taos, const char *sql) {
TAOS_RES *res = taos_query(taos, sql);
int code = taos_errno(res);
if (code != 0) {
fprintf(stderr, "%s\n", taos_errstr(res));
taos_free_result(res);
taos_close(taos);
exit(EXIT_FAILURE);
}
taos_free_result(res);
}
/**
* @brief check return status and exit program when error occur.
*
* @param stmt
* @param code
* @param msg
*/
void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
if (code != 0) {
fprintf(stderr, "%s. code: %d, error: %s\n", msg,code,taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
}
typedef struct {
int64_t ts;
float current;
int voltage;
float phase;
} Row;
int num_of_sub_table = 10;
int num_of_row = 10;
int total_affected = 0;
/**
* @brief insert data using stmt API
*
* @param taos
*/
void insertData(TAOS *taos) {
// init
TAOS_STMT *stmt = taos_stmt_init(taos);
if (stmt == NULL) {
fprintf(stderr, "Failed to init taos_stmt, error: %s\n", taos_stmt_errstr(NULL));
exit(EXIT_FAILURE);
}
// prepare
const char *sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
int code = taos_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "Failed to execute taos_stmt_prepare");
for (int i = 1; i <= num_of_sub_table; i++) {
char table_name[20];
sprintf(table_name, "d_bind_%d", i);
char location[20];
sprintf(location, "location_%d", i);
// set table name and tags
TAOS_MULTI_BIND tags[2];
// groupId
tags[0].buffer_type = TSDB_DATA_TYPE_INT;
tags[0].buffer_length = sizeof(int);
tags[0].length = (int32_t *)&tags[0].buffer_length;
tags[0].buffer = &i;
tags[0].is_null = NULL;
tags[0].num = 1;
// location
tags[1].buffer_type = TSDB_DATA_TYPE_BINARY;
tags[1].buffer_length = strlen(location);
tags[1].length =(int32_t *) &tags[1].buffer_length;
tags[1].buffer = location;
tags[1].is_null = NULL;
tags[1].num = 1;
code = taos_stmt_set_tbname_tags(stmt, table_name, tags);
checkErrorCode(stmt, code, "Failed to set table name and tags\n");
// insert rows
TAOS_MULTI_BIND params[4];
// ts
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(int64_t);
params[0].length = (int32_t *)&params[0].buffer_length;
params[0].is_null = NULL;
params[0].num = 1;
// current
params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[1].buffer_length = sizeof(float);
params[1].length = (int32_t *)&params[1].buffer_length;
params[1].is_null = NULL;
params[1].num = 1;
// voltage
params[2].buffer_type = TSDB_DATA_TYPE_INT;
params[2].buffer_length = sizeof(int);
params[2].length = (int32_t *)&params[2].buffer_length;
params[2].is_null = NULL;
params[2].num = 1;
// phase
params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[3].buffer_length = sizeof(float);
params[3].length = (int32_t *)&params[3].buffer_length;
params[3].is_null = NULL;
params[3].num = 1;
for (int j = 0; j < num_of_row; j++) {
struct timeval tv;
gettimeofday(&tv, NULL);
long long milliseconds = tv.tv_sec * 1000LL + tv.tv_usec / 1000; // current timestamp in milliseconds
int64_t ts = milliseconds + j;
float current = (float)rand() / RAND_MAX * 30;
int voltage = rand() % 300;
float phase = (float)rand() / RAND_MAX;
params[0].buffer = &ts;
params[1].buffer = &current;
params[2].buffer = &voltage;
params[3].buffer = &phase;
// bind param
code = taos_stmt_bind_param(stmt, params);
checkErrorCode(stmt, code, "Failed to bind param");
}
// add batch
code = taos_stmt_add_batch(stmt);
checkErrorCode(stmt, code, "Failed to add batch");
// execute batch
code = taos_stmt_execute(stmt);
checkErrorCode(stmt, code, "Failed to exec stmt");
// get affected rows
int affected = taos_stmt_affected_rows_once(stmt);
total_affected += affected;
}
fprintf(stdout, "Successfully inserted %d rows to power.meters.\n", total_affected);
taos_stmt_close(stmt);
}
int main() {
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
exit(EXIT_FAILURE);
}
// create database and table
executeSQL(taos, "CREATE DATABASE IF NOT EXISTS power");
executeSQL(taos, "USE power");
executeSQL(taos,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
insertData(taos);
taos_close(taos);
taos_cleanup();
}

View File

@ -0,0 +1,514 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// to compile: gcc -o tmq_demo tmq_demo.c -ltaos -lpthread
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "taosws.h"
volatile int thread_stop = 0;
static int running = 1;
static int count = 0;
const char* topic_name = "topic_meters";
typedef struct {
const char* enable_auto_commit;
const char* auto_commit_interval_ms;
const char* group_id;
const char* client_id;
const char* td_connect_host;
const char* td_connect_port;
const char* td_connect_user;
const char* td_connect_pass;
const char* auto_offset_reset;
} ConsumerConfig;
ConsumerConfig config = {
.enable_auto_commit = "true",
.auto_commit_interval_ms = "1000",
.group_id = "group1",
.client_id = "client1",
.td_connect_host = "localhost",
.td_connect_port = "6030",
.td_connect_user = "root",
.td_connect_pass = "taosdata",
.auto_offset_reset = "latest"
};
void* prepare_data(void* arg) {
const char* host = "localhost";
const char* user = "root";
const char* password = "taosdata";
uint16_t port = 6030;
int code = 0;
TAOS* pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
return NULL;
}
TAOS_RES* pRes;
int i = 1;
while (!thread_stop) {
char buf[200] = {0};
i++;
snprintf(
buf, sizeof(buf),
"INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + %da, 10.30000, "
"219, 0.31000)",
i);
pRes = taos_query(pConn, buf);
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
}
taos_free_result(pRes);
sleep(1);
}
fprintf(stdout, "Prepare data thread exit\n");
return NULL;
}
// ANCHOR: msg_process
int32_t msg_process(TAOS_RES* msg) {
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
while (true) {
// get one row data from message
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
// Add your data processing logic here
rows++;
}
return rows;
}
// ANCHOR_END: msg_process
TAOS* init_env() {
const char* host = "localhost";
const char* user = "root";
const char* password = "taosdata";
uint16_t port = 6030;
int code = 0;
TAOS* pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
return NULL;
}
TAOS_RES* pRes;
// drop database if exists
pRes = taos_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "DROP DATABASE IF EXISTS power");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
// create database
pRes = taos_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
// create super table
pRes = taos_query(
pConn,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
return pConn;
END:
taos_free_result(pRes);
taos_close(pConn);
return NULL;
}
void deinit_env(TAOS* pConn) {
if (pConn)
taos_close(pConn);
}
int32_t create_topic(TAOS* pConn) {
TAOS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = taos_query(pConn, "USE power");
code = taos_errno(pRes);
if (taos_errno(pRes) != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
int32_t drop_topic(TAOS* pConn) {
TAOS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = taos_query(pConn, "USE power");
code = taos_errno(pRes);
if (taos_errno(pRes) != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
count +=1;
fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p, count: %d.\n", code, tmq, param, count);
}
// ANCHOR: create_consumer_1
tmq_t* build_consumer(const ConsumerConfig* config) {
tmq_conf_res_t code;
tmq_t* tmq = NULL;
// create a configuration object
tmq_conf_t* conf = tmq_conf_new();
// set the configuration parameters
code = tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "group.id", config->group_id);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "client.id", config->client_id);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.ip", config->td_connect_host);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.port", config->td_connect_port);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.user", config->td_connect_user);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.pass", config->td_connect_pass);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
// set the callback function for auto commit
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
// create a consumer object
tmq = tmq_consumer_new(conf, NULL, 0);
_end:
// destroy the configuration object
tmq_conf_destroy(conf);
return tmq;
}
// ANCHOR_END: create_consumer_1
// ANCHOR: build_topic_list
// build a topic list used to subscribe
tmq_list_t* build_topic_list() {
// create a empty topic list
tmq_list_t* topicList = tmq_list_new();
// append topic name to the list
int32_t code = tmq_list_append(topicList, topic_name);
if (code) {
// if failed, destroy the list and return NULL
tmq_list_destroy(topicList);
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
return NULL;
}
// if success, return the list
return topicList;
}
// ANCHOR_END: build_topic_list
// ANCHOR: basic_consume_loop
void basic_consume_loop(tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// Add your data processing logic here
totalRows += msg_process(tmqmsg);
// free the message
taos_free_result(tmqmsg);
}
if (msgCnt > 50) {
// consume 50 messages and break
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
// ANCHOR_END: basic_consume_loop
// ANCHOR: consume_repeatly
void consume_repeatly(tmq_t* tmq) {
int32_t numOfAssignment = 0;
tmq_topic_assignment* pAssign = NULL;
// get the topic assignment
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
return;
}
// seek to the earliest offset
for (int32_t i = 0; i < numOfAssignment; ++i) {
tmq_topic_assignment* p = &pAssign[i];
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr, "Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, p->vgId, code, tmq_err2str(code));
break;
}
}
if (code == 0)
fprintf(stdout, "Assignment seek to beginning successfully.\n");
// free the assignment array
tmq_free_assignment(pAssign);
// let's consume the messages again
basic_consume_loop(tmq);
}
// ANCHOR_END: consume_repeatly
// ANCHOR: manual_commit
void manual_commit(tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// process the message
totalRows += msg_process(tmqmsg);
// commit the message
int32_t code = tmq_commit_sync(tmq, tmqmsg);
if (code) {
fprintf(stderr, "Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
// free the message
taos_free_result(tmqmsg);
break;
} else {
fprintf(stdout, "Commit offset manually successfully.\n");
}
// free the message
taos_free_result(tmqmsg);
}
if (msgCnt > 50) {
// consume 50 messages and break
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows.\n", msgCnt, totalRows);
}
// ANCHOR_END: manual_commit
int main(int argc, char* argv[]) {
int32_t code;
pthread_t thread_id;
TAOS* pConn = init_env();
if (pConn == NULL) {
fprintf(stderr, "Failed to init env.\n");
return -1;
}
if (create_topic(pConn) < 0) {
fprintf(stderr, "Failed to create topic.\n");
return -1;
}
if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) {
fprintf(stderr, "Failed to create thread.\n");
return -1;
}
// ANCHOR: create_consumer_2
tmq_t* tmq = build_consumer(&config);
if (NULL == tmq) {
fprintf(stderr, "Failed to create native consumer, host: %s, groupId: %s, , clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
return -1;
} else {
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
}
// ANCHOR_END: create_consumer_2
// ANCHOR: subscribe_3
tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n",
topic_name, config.group_id, config.client_id);
return -1;
}
if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Subscribe topics successfully.\n");
}
tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
// ANCHOR_END: subscribe_3
consume_repeatly(tmq);
manual_commit(tmq);
// ANCHOR: unsubscribe_and_close
// unsubscribe the topic
code = tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Consumer unsubscribed successfully.\n");
}
// close the consumer
code = tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Consumer closed successfully.\n");
}
// ANCHOR_END: unsubscribe_and_close
thread_stop = 1;
pthread_join(thread_id, NULL);
if (drop_topic(pConn) < 0) {
fprintf(stderr, "Failed to drop topic.\n");
return -1;
}
deinit_env(pConn);
return 0;
}

View File

@ -0,0 +1,78 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o with_reqid_demo with_reqid_demo.c -ltaos
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosws.h"
static int DemoWithReqId() {
// ANCHOR: with_reqid
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL));
taos_cleanup();
return -1;
}
const char *sql = "SELECT ts, current, location FROM power.meters limit 1";
// query data with reqid
long reqid = 3L;
TAOS_RES *result = taos_query_with_reqid(taos, sql, reqid);
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to execute sql withQID: %ld, ErrCode: 0x%x, ErrMessage: %s\n.", reqid, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
TAOS_ROW row = NULL;
int rows = 0;
int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
// Add your data processing logic here
rows++;
}
fprintf(stdout, "total rows: %d\n", rows);
taos_free_result(result);
// close & clean
taos_close(taos);
taos_cleanup();
return 0;
// ANCHOR_END: with_reqid
}
int main(int argc, char *argv[]) { return DemoWithReqId(); }