Merge branch 'docs/sheyj-3.0' of github.com:taosdata/TDengine into docs/sheyj-3.0

This commit is contained in:
sheyanjie-qq 2024-08-16 17:47:17 +08:00
commit 72ed6b52ec
12 changed files with 177 additions and 157 deletions

View File

@ -12,11 +12,11 @@ int main() {
uint16_t port = 6030; // 0 means use the default port uint16_t port = 6030; // 0 means use the default port
TAOS *taos = taos_connect(host, user, passwd, db, port); TAOS *taos = taos_connect(host, user, passwd, db, port);
if (taos == NULL) { if (taos == NULL) {
printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
printf("Connected to %s:%hu successfully.\n", host, port); fprintf(stdout, "Connected to %s:%hu successfully.\n", host, port);
/* put your code here for read and write */ /* put your code here for read and write */

View File

@ -33,7 +33,7 @@ static int DemoCreateDB() {
// connect // connect
TAOS *taos = taos_connect(host, user, password, NULL, port); TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) { if (taos == NULL) {
printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL)); taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return -1;
@ -43,13 +43,13 @@ static int DemoCreateDB() {
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power"); TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result)); fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
taos_free_result(result); taos_free_result(result);
printf("Create database power successfully.\n"); fprintf(stdout, "Create database power successfully.\n");
// create table // create table
const char *sql = const char *sql =
@ -58,13 +58,13 @@ static int DemoCreateDB() {
result = taos_query(taos, sql); result = taos_query(taos, sql);
code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("Failed to create stable power.meters, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result)); fprintf(stderr, "Failed to create stable power.meters, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
taos_free_result(result); taos_free_result(result);
printf("Create stable power.meters successfully.\n"); fprintf(stdout, "Create stable power.meters successfully.\n");
// close & clean // close & clean
taos_close(taos); taos_close(taos);

View File

@ -33,7 +33,7 @@ static int DemoInsertData() {
// connect // connect
TAOS *taos = taos_connect(host, user, password, NULL, port); TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) { if (taos == NULL) {
printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL)); taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return -1;
@ -53,7 +53,7 @@ static int DemoInsertData() {
TAOS_RES *result = taos_query(taos, sql); TAOS_RES *result = taos_query(taos, sql);
code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result)); fprintf(stderr, "Failed to insert data to power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
@ -62,7 +62,7 @@ static int DemoInsertData() {
// you can check affectedRows here // you can check affectedRows here
int rows = taos_affected_rows(result); int rows = taos_affected_rows(result);
printf("Successfully inserted %d rows into power.meters.\n", rows); fprintf(stdout, "Successfully inserted %d rows into power.meters.\n", rows);
// close & clean // close & clean
taos_close(taos); taos_close(taos);

View File

@ -33,7 +33,7 @@ static int DemoQueryData() {
// connect // connect
TAOS *taos = taos_connect(host, user, password, NULL, port); TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) { if (taos == NULL) {
printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL)); taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return -1;
@ -44,7 +44,7 @@ static int DemoQueryData() {
TAOS_RES *result = taos_query(taos, sql); TAOS_RES *result = taos_query(taos, sql);
code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("Failed to query data from power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code, fprintf(stderr, "Failed to query data from power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code,
taos_errstr(result)); taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
@ -56,20 +56,15 @@ static int DemoQueryData() {
int num_fields = taos_field_count(result); int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result); TAOS_FIELD *fields = taos_fetch_fields(result);
printf("fields: %d\n", num_fields); fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);
printf("sql: %s, result:\n", sql);
// fetch the records row by row // fetch the records row by row
while ((row = taos_fetch_row(result))) { while ((row = taos_fetch_row(result))) {
char temp[1024] = {0}; // Add your data processing logic here
if (taos_print_row(temp, row, fields, num_fields) < 0) {
printf("Failed to print row\n");
break;
}
printf("%s\n", temp);
rows++; rows++;
} }
printf("total rows: %d\n", rows); fprintf(stdout, "total rows: %d\n", rows);
taos_free_result(result); taos_free_result(result);
// close & clean // close & clean

View File

@ -32,7 +32,7 @@ static int DemoSmlInsert() {
// connect // connect
TAOS *taos = taos_connect(host, user, password, NULL, port); TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) { if (taos == NULL) {
printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL)); taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return -1;
@ -42,7 +42,7 @@ static int DemoSmlInsert() {
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power"); TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result)); fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
@ -53,7 +53,7 @@ static int DemoSmlInsert() {
result = taos_query(taos, "USE power"); result = taos_query(taos, "USE power");
code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result)); fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
@ -74,7 +74,7 @@ static int DemoSmlInsert() {
result = taos_schemaless_insert(taos, lines, 1, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); result = taos_schemaless_insert(taos, lines, 1, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", line_demo, code, fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", line_demo, code,
taos_errstr(result)); taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
@ -82,7 +82,7 @@ static int DemoSmlInsert() {
} }
int rows = taos_affected_rows(result); int rows = taos_affected_rows(result);
printf("Insert %d rows of schemaless line data successfully.\n", rows); fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", rows);
taos_free_result(result); taos_free_result(result);
// opentsdb telnet protocol // opentsdb telnet protocol
@ -90,7 +90,7 @@ static int DemoSmlInsert() {
result = taos_schemaless_insert(taos, telnets, 1, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); result = taos_schemaless_insert(taos, telnets, 1, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", telnet_demo, code, fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", telnet_demo, code,
taos_errstr(result)); taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
@ -98,15 +98,16 @@ static int DemoSmlInsert() {
} }
rows = taos_affected_rows(result); rows = taos_affected_rows(result);
printf("Insert %d rows of schemaless telnet data successfully.\n", rows); fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", rows);
taos_free_result(result); taos_free_result(result);
// opentsdb json protocol // opentsdb json protocol
char *jsons[1] = {0}; char *jsons[1] = {0};
// allocate memory for json data. can not use static memory. // allocate memory for json data. can not use static memory.
jsons[0] = malloc(1024); size_t size = 1024;
jsons[0] = malloc(size);
if (jsons[0] == NULL) { if (jsons[0] == NULL) {
printf("Failed to allocate memory\n"); fprintf(stderr, "Failed to allocate memory: %zu bytes.\n", size);
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
@ -116,7 +117,7 @@ static int DemoSmlInsert() {
code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
free(jsons[0]); free(jsons[0]);
printf("Failed to insert schemaless json data, Server: %s, ErrCode: 0x%x, ErrMessage: %s\n.", json_demo, code, fprintf(stderr, "Failed to insert schemaless json data, Server: %s, ErrCode: 0x%x, ErrMessage: %s\n.", json_demo, code,
taos_errstr(result)); taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
@ -125,7 +126,7 @@ static int DemoSmlInsert() {
free(jsons[0]); free(jsons[0]);
rows = taos_affected_rows(result); rows = taos_affected_rows(result);
printf("Insert %d rows of schemaless json data successfully.\n", rows); fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", rows);
taos_free_result(result); taos_free_result(result);
// close & clean // close & clean

View File

@ -32,7 +32,7 @@ void executeSQL(TAOS *taos, const char *sql) {
TAOS_RES *res = taos_query(taos, sql); TAOS_RES *res = taos_query(taos, sql);
int code = taos_errno(res); int code = taos_errno(res);
if (code != 0) { if (code != 0) {
printf("%s\n", taos_errstr(res)); fprintf(stderr, "%s\n", taos_errstr(res));
taos_free_result(res); taos_free_result(res);
taos_close(taos); taos_close(taos);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
@ -49,7 +49,7 @@ void executeSQL(TAOS *taos, const char *sql) {
*/ */
void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) { void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
if (code != 0) { if (code != 0) {
printf("%s. code: %d, error: %s\n", msg,code,taos_stmt_errstr(stmt)); fprintf(stderr, "%s. code: %d, error: %s\n", msg,code,taos_stmt_errstr(stmt));
taos_stmt_close(stmt); taos_stmt_close(stmt);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
@ -74,7 +74,7 @@ void insertData(TAOS *taos) {
// init // init
TAOS_STMT *stmt = taos_stmt_init(taos); TAOS_STMT *stmt = taos_stmt_init(taos);
if (stmt == NULL) { if (stmt == NULL) {
printf("Failed to init taos_stmt, error: %s\n", taos_stmt_errstr(NULL)); fprintf(stderr, "Failed to init taos_stmt, error: %s\n", taos_stmt_errstr(NULL));
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
// prepare // prepare
@ -159,7 +159,7 @@ void insertData(TAOS *taos) {
int affected = taos_stmt_affected_rows_once(stmt); int affected = taos_stmt_affected_rows_once(stmt);
total_affected += affected; total_affected += affected;
} }
printf("Successfully inserted %d rows to power.meters.\n", total_affected); fprintf(stdout, "Successfully inserted %d rows to power.meters.\n", total_affected);
taos_stmt_close(stmt); taos_stmt_close(stmt);
} }
@ -170,7 +170,7 @@ int main() {
uint16_t port = 6030; uint16_t port = 6030;
TAOS *taos = taos_connect(host, user, password, NULL, port); TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) { if (taos == NULL) {
printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }

View File

@ -26,6 +26,7 @@
volatile int thread_stop = 0; volatile int thread_stop = 0;
static int running = 1; static int running = 1;
static int count = 0;
const char* topic_name = "topic_meters"; const char* topic_name = "topic_meters";
typedef struct { typedef struct {
@ -40,6 +41,18 @@ typedef struct {
const char* auto_offset_reset; const char* auto_offset_reset;
} ConsumerConfig; } ConsumerConfig;
ConsumerConfig config = {
.enable_auto_commit = "true",
.auto_commit_interval_ms = "1000",
.group_id = "group1",
.client_id = "client1",
.td_connect_host = "localhost",
.td_connect_port = "6030",
.td_connect_user = "root",
.td_connect_pass = "taosdata",
.auto_offset_reset = "latest"
};
void* prepare_data(void* arg) { void* prepare_data(void* arg) {
const char* host = "localhost"; const char* host = "localhost";
const char* user = "root"; const char* user = "root";
@ -48,8 +61,7 @@ void* prepare_data(void* arg) {
int code = 0; int code = 0;
TAOS* pConn = taos_connect(host, user, password, NULL, port); TAOS* pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) { if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return NULL; return NULL;
} }
@ -69,8 +81,7 @@ void* prepare_data(void* arg) {
pRes = taos_query(pConn, buf); pRes = taos_query(pConn, buf);
code = taos_errno(pRes); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
taos_errstr(pRes));
} }
taos_free_result(pRes); taos_free_result(pRes);
sleep(1); sleep(1);
@ -80,43 +91,28 @@ void* prepare_data(void* arg) {
} }
// ANCHOR: msg_process // ANCHOR: msg_process
static int32_t msg_process(TAOS_RES* msg) { int32_t msg_process(TAOS_RES* msg) {
char buf[1024]; // buf to store the row content char buf[1024]; // buf to store the row content
int32_t rows = 0; int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg); const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg); const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg); int32_t vgroupId = tmq_get_vgroup_id(msg);
fprintf(stdout, "topic: %s\n", topicName); while (true) {
fprintf(stdout, "db: %s\n", dbName);
fprintf(stdout, "vgroup id: %d\n", vgroupId);
while (1) {
// get one row data from message // get one row data from message
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; if (row == NULL) break;
// get the field information // Add your data processing logic here
TAOS_FIELD* fields = taos_fetch_fields(msg);
// get the number of fields
int32_t numOfFields = taos_field_count(msg);
// get the precision of the result
int32_t precision = taos_result_precision(msg);
rows++; rows++;
// print the row content
if (taos_print_row(buf, row, fields, numOfFields) < 0) {
fprintf(stderr, "Failed to print row\n");
break;
}
// print the precision and row content to the console
fprintf(stdout, "precision: %d, data: %s\n", precision, buf);
} }
return rows; return rows;
} }
// ANCHOR_END: msg_process // ANCHOR_END: msg_process
static int32_t init_env() { TAOS* init_env() {
const char* host = "localhost"; const char* host = "localhost";
const char* user = "root"; const char* user = "root";
const char* password = "taosdata"; const char* password = "taosdata";
@ -124,10 +120,9 @@ static int32_t init_env() {
int code = 0; int code = 0;
TAOS* pConn = taos_connect(host, user, password, NULL, port); TAOS* pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) { if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return NULL;
} }
TAOS_RES* pRes; TAOS_RES* pRes;
@ -136,6 +131,7 @@ static int32_t init_env() {
code = taos_errno(pRes); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
@ -143,6 +139,7 @@ static int32_t init_env() {
code = taos_errno(pRes); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
@ -150,7 +147,7 @@ static int32_t init_env() {
pRes = taos_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600"); pRes = taos_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600");
code = taos_errno(pRes); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
fprintf(stderr, "Failed to create tmqdb, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); fprintf(stderr, "Failed to create power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
@ -165,56 +162,78 @@ static int32_t init_env() {
fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn);
return 0; return pConn;
END: END:
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
return -1; return NULL;
} }
int32_t create_topic() { void deinit_env(TAOS* pConn) {
if (pConn)
taos_close(pConn);
}
int32_t create_topic(TAOS* pConn) {
TAOS_RES* pRes; TAOS_RES* pRes;
const char* host = "localhost";
const char* user = "root";
const char* password = "taosdata";
uint16_t port = 6030;
int code = 0; int code = 0;
TAOS* pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) { if (!pConn) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), fprintf(stderr, "Invalid input parameter.\n");
taos_errstr(NULL));
taos_cleanup();
return -1; return -1;
} }
pRes = taos_query(pConn, "USE POWER"); pRes = taos_query(pConn, "USE power");
code = taos_errno(pRes); code = taos_errno(pRes);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
fprintf(stderr, "Failed to use tmqdb, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query( pRes = taos_query(pConn, "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
pConn,
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
code = taos_errno(pRes); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
return 0;
}
taos_close(pConn); int32_t drop_topic(TAOS* pConn) {
TAOS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = taos_query(pConn, "USE power");
code = taos_errno(pRes);
if (taos_errno(pRes) != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0; return 0;
} }
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param); count +=1;
fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p, count: %d.\n", code, tmq, param, count);
} }
// ANCHOR: create_consumer_1 // ANCHOR: create_consumer_1
@ -289,7 +308,6 @@ _end:
tmq_list_t* build_topic_list() { tmq_list_t* build_topic_list() {
// create a empty topic list // create a empty topic list
tmq_list_t* topicList = tmq_list_new(); tmq_list_t* topicList = tmq_list_new();
const char* topic_name = "topic_meters";
// append topic name to the list // append topic name to the list
int32_t code = tmq_list_append(topicList, topic_name); int32_t code = tmq_list_append(topicList, topic_name);
@ -315,8 +333,10 @@ void basic_consume_loop(tmq_t* tmq) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout); TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) { if (tmqmsg) {
msgCnt++; msgCnt++;
// process the message
// Add your data processing logic here
totalRows += msg_process(tmqmsg); totalRows += msg_process(tmqmsg);
// free the message // free the message
taos_free_result(tmqmsg); taos_free_result(tmqmsg);
} }
@ -349,12 +369,13 @@ void consume_repeatly(tmq_t* tmq) {
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin); code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) { if (code != 0) {
fprintf(stderr, "Failed to seek assignment %d to beginning %ld, ErrCode: 0x%x, ErrMessage: %s.\n", i, p->begin, fprintf(stderr, "Failed to seek offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
code, tmq_err2str(code)); topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else { break;
fprintf(stdout, "Seek assignment %d to beginning %ld successfully.\n", i, p->begin);
} }
} }
if (code == 0)
fprintf(stdout, "Assignment seek to beginning successfully.\n");
// free the assignment array // free the assignment array
tmq_free_assignment(pAssign); tmq_free_assignment(pAssign);
@ -380,7 +401,8 @@ void manual_commit(tmq_t* tmq) {
// commit the message // commit the message
int32_t code = tmq_commit_sync(tmq, tmqmsg); int32_t code = tmq_commit_sync(tmq, tmqmsg);
if (code) { if (code) {
fprintf(stderr, "Failed to commit message, ErrCode: 0x%x, ErrMessage: %s.\n", code, tmq_err2str(code)); fprintf(stderr, "Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
// free the message // free the message
taos_free_result(tmqmsg); taos_free_result(tmqmsg);
break; break;
@ -405,11 +427,14 @@ int main(int argc, char* argv[]) {
int32_t code; int32_t code;
pthread_t thread_id; pthread_t thread_id;
if (init_env() < 0) { TAOS* pConn = init_env();
if (pConn == NULL) {
fprintf(stderr, "Failed to init env.\n");
return -1; return -1;
} }
if (create_topic() < 0) { if (create_topic(pConn) < 0) {
fprintf(stderr, "Failed to create topic.\n");
return -1; return -1;
} }
@ -418,16 +443,6 @@ int main(int argc, char* argv[]) {
return -1; return -1;
} }
ConsumerConfig config = {.enable_auto_commit = "true",
.auto_commit_interval_ms = "1000",
.group_id = "group1",
.client_id = "client1",
.td_connect_host = "localhost",
.td_connect_port = "6030",
.td_connect_user = "root",
.td_connect_pass = "taosdata",
.auto_offset_reset = "latest"};
// ANCHOR: create_consumer_2 // ANCHOR: create_consumer_2
tmq_t* tmq = build_consumer(&config); tmq_t* tmq = build_consumer(&config);
if (NULL == tmq) { if (NULL == tmq) {
@ -435,8 +450,8 @@ int main(int argc, char* argv[]) {
config.td_connect_host, config.group_id, config.client_id); config.td_connect_host, config.group_id, config.client_id);
return -1; return -1;
} else { } else {
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, , clientId: %s.\n", config.td_connect_host, fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n",
config.group_id, config.client_id); config.td_connect_host, config.group_id, config.client_id);
} }
// ANCHOR_END: create_consumer_2 // ANCHOR_END: create_consumer_2
@ -467,7 +482,8 @@ int main(int argc, char* argv[]) {
// unsubscribe the topic // unsubscribe the topic
code = tmq_unsubscribe(tmq); code = tmq_unsubscribe(tmq);
if (code) { if (code) {
fprintf(stderr, "Failed to unsubscribe consumer, ErrCode: 0x%x, ErrMessage: %s.\n", code, tmq_err2str(code)); fprintf(stderr, "Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else { } else {
fprintf(stderr, "Consumer unsubscribed successfully.\n"); fprintf(stderr, "Consumer unsubscribed successfully.\n");
} }
@ -484,5 +500,11 @@ int main(int argc, char* argv[]) {
thread_stop = 1; thread_stop = 1;
pthread_join(thread_id, NULL); pthread_join(thread_id, NULL);
if (drop_topic(pConn) < 0) {
fprintf(stderr, "Failed to drop topic.\n");
return -1;
}
deinit_env(pConn);
return 0; return 0;
} }

View File

@ -33,8 +33,7 @@ static int DemoWithReqId() {
// connect // connect
TAOS *taos = taos_connect(host, user, password, NULL, port); TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) { if (taos == NULL) {
printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
@ -45,7 +44,7 @@ static int DemoWithReqId() {
TAOS_RES *result = taos_query_with_reqid(taos, sql, reqid); TAOS_RES *result = taos_query_with_reqid(taos, sql, reqid);
code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("Failed to execute sql with reqId: %ld, ErrCode: 0x%x, ErrMessage: %s\n.", reqid, code, taos_errstr(result)); fprintf(stderr, "Failed to execute sql with reqId: %ld, ErrCode: 0x%x, ErrMessage: %s\n.", reqid, code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
@ -56,20 +55,16 @@ static int DemoWithReqId() {
int num_fields = taos_field_count(result); int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result); TAOS_FIELD *fields = taos_fetch_fields(result);
printf("fields: %d\n", num_fields); fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql);
printf("sql: %s, result:\n", sql);
// fetch the records row by row // fetch the records row by row
while ((row = taos_fetch_row(result))) { while ((row = taos_fetch_row(result))) {
char temp[1024] = {0}; // Add your data processing logic here
if (taos_print_row(temp, row, fields, num_fields) < 0) {
printf("Failed to print row\n");
break;
}
printf("%s\n", temp);
rows++; rows++;
} }
printf("total rows: %d\n", rows); fprintf(stdout, "total rows: %d\n", rows);
taos_free_result(result); taos_free_result(result);
// close & clean // close & clean

View File

@ -111,11 +111,12 @@ namespace TMQExample
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine($"Failed to create native consumer, " + Console.WriteLine(
$"host: {_host}, " + $"Failed to create native consumer, " +
$"groupId: {_groupId}, " + $"host: {_host}, " +
$"clientId: {_clientId}, " + $"groupId: {_groupId}, " +
$"ErrMessage: {e.Message}"); $"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
@ -163,11 +164,12 @@ namespace TMQExample
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine($"Failed to poll data, " + Console.WriteLine(
$"topic: {_topic}, " + $"Failed to poll data, " +
$"groupId: {_groupId}, " + $"topic: {_topic}, " +
$"clientId: {_clientId}, " + $"groupId: {_groupId}, " +
$"ErrMessage: {e.Message}"); $"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
// ANCHOR_END: subscribe // ANCHOR_END: subscribe
@ -186,13 +188,14 @@ namespace TMQExample
{ {
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0)); consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
} }
Console.WriteLine("Assignment seek to beginning successfully"); Console.WriteLine("Assignment seek to beginning successfully");
} }
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine( Console.WriteLine(
$"Failed to execute seek example, " + $"Failed to seek offset, " +
$"topic: {_topic}, " + $"topic: {_topic}, " +
$"groupId: {_groupId}, " + $"groupId: {_groupId}, " +
$"clientId: {_clientId}, " + $"clientId: {_clientId}, " +
@ -204,12 +207,13 @@ namespace TMQExample
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine($"Failed to execute seek example, " + Console.WriteLine(
$"topic: {_topic}, " + $"Failed to seek offset, " +
$"groupId: {_groupId}, " + $"topic: {_topic}, " +
$"clientId: {_clientId}, " + $"groupId: {_groupId}, " +
$"offset: 0, " + $"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}"); $"offset: 0, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
// ANCHOR_END: seek // ANCHOR_END: seek
@ -240,7 +244,7 @@ namespace TMQExample
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine( Console.WriteLine(
$"Failed to execute commit example, " + $"Failed to commit offset, " +
$"topic: {_topic}, " + $"topic: {_topic}, " +
$"groupId: {_groupId}, " + $"groupId: {_groupId}, " +
$"clientId: {_clientId}, " + $"clientId: {_clientId}, " +
@ -253,7 +257,7 @@ namespace TMQExample
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine( Console.WriteLine(
$"Failed to execute commit example, " + $"Failed to commit offset, " +
$"topic: {_topic}, " + $"topic: {_topic}, " +
$"groupId: {_groupId}, " + $"groupId: {_groupId}, " +
$"clientId: {_clientId}, " + $"clientId: {_clientId}, " +

View File

@ -116,11 +116,12 @@ namespace TMQExample
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine($"Failed to create native consumer, " + Console.WriteLine(
$"host: {_host}, " + $"Failed to create native consumer, " +
$"groupId: {_groupId}, " + $"host: {_host}, " +
$"clientId: {_clientId}, " + $"groupId: {_groupId}, " +
$"ErrMessage: {e.Message}"); $"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
@ -191,13 +192,14 @@ namespace TMQExample
{ {
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0)); consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
} }
Console.WriteLine("Assignment seek to beginning successfully"); Console.WriteLine("Assignment seek to beginning successfully");
} }
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine( Console.WriteLine(
$"Failed to execute seek example, " + $"Failed to seek offset, " +
$"topic: {_topic}, " + $"topic: {_topic}, " +
$"groupId: {_groupId}, " + $"groupId: {_groupId}, " +
$"clientId: {_clientId}, " + $"clientId: {_clientId}, " +
@ -209,12 +211,13 @@ namespace TMQExample
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine($"Failed to execute seek example, " + Console.WriteLine(
$"topic: {_topic}, " + $"Failed to seek offset, " +
$"groupId: {_groupId}, " + $"topic: {_topic}, " +
$"clientId: {_clientId}, " + $"groupId: {_groupId}, " +
$"offset: 0, " + $"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}"); $"offset: 0, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
// ANCHOR_END: seek // ANCHOR_END: seek
@ -245,7 +248,7 @@ namespace TMQExample
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine( Console.WriteLine(
$"Failed to execute commit example, " + $"Failed to commit offset, " +
$"topic: {_topic}, " + $"topic: {_topic}, " +
$"groupId: {_groupId}, " + $"groupId: {_groupId}, " +
$"clientId: {_clientId}, " + $"clientId: {_clientId}, " +
@ -258,7 +261,7 @@ namespace TMQExample
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine( Console.WriteLine(
$"Failed to execute commit example, " + $"Failed to commit offset, " +
$"topic: {_topic}, " + $"topic: {_topic}, " +
$"groupId: {_groupId}, " + $"groupId: {_groupId}, " +
$"clientId: {_clientId}, " + $"clientId: {_clientId}, " +

View File

@ -112,7 +112,7 @@ func main() {
}, 0) }, 0)
if err != nil { if err != nil {
log.Fatalf( log.Fatalf(
"Failed to execute seek example, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n", "Failed to execute seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
topic, topic,
groupID, groupID,
clientID, clientID,

View File

@ -130,7 +130,7 @@ func main() {
}, 0) }, 0)
if err != nil { if err != nil {
log.Fatalf( log.Fatalf(
"Failed to execute seek example, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n", "Failed to seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
topic, topic,
groupID, groupID,
clientID, clientID,