Merge branch 'docs/sheyj-3.0' of github.com:taosdata/TDengine into docs/sheyj-3.0

This commit is contained in:
menshibin 2024-08-12 19:34:07 +08:00
commit 15ef067070
13 changed files with 247 additions and 192 deletions

View File

@ -8,19 +8,15 @@ int main() {
const char *host = "localhost"; const char *host = "localhost";
const char *user = "root"; const char *user = "root";
const char *passwd = "taosdata"; const char *passwd = "taosdata";
// if don't want to connect to a default db, set it to NULL or "" const char *db = NULL; // if don't want to connect to a default db, set it to NULL or ""
const char *db = NULL; uint16_t port = 6030; // 0 means use the default port
uint16_t port = 0; // 0 means use the default port
TAOS *taos = taos_connect(host, user, passwd, db, port); TAOS *taos = taos_connect(host, user, passwd, db, port);
if (taos == NULL) { if (taos == NULL) {
int errno = taos_errno(NULL); printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
const char *msg = taos_errstr(NULL);
printf("%d, %s\n", errno, msg);
printf("failed to connect to server %s, errno: %d, msg: %s\n", host, errno, msg);
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
printf("success to connect server %s\n", host); printf("Connected to %s:%hu successfully.\n", host, port);
/* put your code here for read and write */ /* put your code here for read and write */

View File

@ -25,47 +25,44 @@
static int DemoCreateDB() { static int DemoCreateDB() {
// ANCHOR: create_db_and_table // ANCHOR: create_db_and_table
const char *ip = "localhost"; const char *host = "localhost";
const char *user = "root"; const char *user = "root";
const char *password = "taosdata"; const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
// connect // connect
TAOS *taos = taos_connect(ip, user, password, NULL, 0); TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) { if (taos == NULL) {
printf("failed to connect to server %s, reason: %s\n", ip, taos_errstr(NULL)); printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
printf("success to connect server %s\n", ip);
// create database // create database
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power"); TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
int code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("failed to create database power, reason: %s\n", taos_errstr(result)); printf("Failed to create database power, Server: %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
taos_free_result(result); taos_free_result(result);
printf("success to create database power\n"); printf("Create database power successfully.\n");
// use database
result = taos_query(taos, "USE power");
taos_free_result(result);
// create table // create table
const char* sql = "CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"; const char* sql = "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))";
result = taos_query(taos, sql); result = taos_query(taos, sql);
code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("failed to create stable meters, reason: %s\n", taos_errstr(result)); printf("Failed to create stable power.meters, Server: %s:%hu, ErrCode: 0x%x, ErrMessage: %s\n.", host, port, code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
taos_free_result(result); taos_free_result(result);
printf("success to create table meters\n"); printf("Create stable meters successfully.\n");
// close & clean // close & clean
taos_close(taos); taos_close(taos);

View File

@ -24,22 +24,19 @@
static int DemoInsertData() { static int DemoInsertData() {
// ANCHOR: insert_data // ANCHOR: insert_data
const char *ip = "localhost"; const char *host = "localhost";
const char *user = "root"; const char *user = "root";
const char *password = "taosdata"; const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
// connect // connect
TAOS *taos = taos_connect(ip, user, password, NULL, 0); TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) { if (taos == NULL) {
printf("failed to connect to server %s, reason: %s\n", ip, taos_errstr(NULL)); printf("Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
printf("success to connect server %s\n", ip);
// use database
TAOS_RES *result = taos_query(taos, "USE power");
taos_free_result(result);
// insert data, please make sure the database and table are already created // insert data, please make sure the database and table are already created
const char* sql = "INSERT INTO " \ const char* sql = "INSERT INTO " \
@ -51,10 +48,10 @@ const char* sql = "INSERT INTO "
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " \ "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " \
"VALUES " \ "VALUES " \
"(NOW + 1a, 10.30000, 218, 0.25000) "; "(NOW + 1a, 10.30000, 218, 0.25000) ";
result = taos_query(taos, sql); TAOS_RES *result = taos_query(taos, sql);
int code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("failed to insert data to power.meters, ip: %s, reason: %s\n", ip, taos_errstr(result)); printf("Failed to insert data to power.meters, Server: %s:%hu; ErrCode: 0x%x; ErrMessage: %s\n.", host, port, code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
@ -63,7 +60,7 @@ taos_free_result(result);
// you can check affectedRows here // you can check affectedRows here
int rows = taos_affected_rows(result); int rows = taos_affected_rows(result);
printf("success to insert %d rows data to power.meters\n", rows); printf("Successfully inserted %d rows into power.meters.\n", rows);
// close & clean // close & clean
taos_close(taos); taos_close(taos);

View File

@ -25,29 +25,28 @@
static int DemoQueryData() { static int DemoQueryData() {
// ANCHOR: query_data // ANCHOR: query_data
const char *ip = "localhost"; const char *host = "localhost";
const char *user = "root"; const char *user = "root";
const char *password = "taosdata"; const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
// connect // connect
TAOS *taos = taos_connect(ip, user, password, NULL, 0); TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) { if (taos == NULL) {
printf("failed to connect to server %s, reason: %s\n", ip, taos_errstr(NULL)); printf("Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
printf("success to connect server %s\n", ip);
// use database
TAOS_RES *result = taos_query(taos, "USE power");
taos_free_result(result);
// query data, please make sure the database and table are already created // query data, please make sure the database and table are already created
const char* sql = "SELECT ts, current, location FROM power.meters limit 100"; const char* sql = "SELECT ts, current, location FROM power.meters limit 100";
result = taos_query(taos, sql); TAOS_RES *result = taos_query(taos, sql);
int code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("failed to query data from power.meters, ip: %s, reason: %s\n", ip, taos_errstr(result)); printf("Failed to query data from power.meters, Server: %s:%hu; ErrCode: 0x%x; ErrMessage: %s\n.", host, port, code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
@ -70,7 +69,6 @@ while ((row = taos_fetch_row(result))) {
} }
printf("total rows: %d\n", rows); printf("total rows: %d\n", rows);
taos_free_result(result); taos_free_result(result);
printf("success to query data from power.meters\n");
// close & clean // close & clean
taos_close(taos); taos_close(taos);

View File

@ -24,33 +24,41 @@
static int DemoSmlInsert() { static int DemoSmlInsert() {
// ANCHOR: schemaless // ANCHOR: schemaless
const char *ip = "localhost"; const char *host = "localhost";
const char *user = "root"; const char *user = "root";
const char *password = "taosdata"; const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
// connect // connect
TAOS *taos = taos_connect(ip, user, password, NULL, 0); TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) { if (taos == NULL) {
printf("failed to connect to server %s, reason: %s\n", ip, taos_errstr(NULL)); printf("Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
printf("success to connect server %s\n", ip);
// create database // create database
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power"); TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
int code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
printf("failed to create database power, reason: %s\n", taos_errstr(result)); printf("Failed to create database power, Server: %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
taos_free_result(result); taos_free_result(result);
printf("success to create database power\n"); printf("Create database power successfully.\n");
// use database // use database
result = taos_query(taos, "USE power"); result = taos_query(taos, "USE power");
code = taos_errno(result);
if (code != 0) {
printf("Failed to execute use power, Server: %s:%hu; ErrCode: 0x%x; ErrMessage: %s\n.", host, port, code, taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
taos_free_result(result); taos_free_result(result);
// schemaless demo data // schemaless demo data
@ -61,29 +69,31 @@ char * json_demo = "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"val
// influxdb line protocol // influxdb line protocol
char *lines[] = {line_demo}; char *lines[] = {line_demo};
result = taos_schemaless_insert(taos, lines, 1, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); result = taos_schemaless_insert(taos, lines, 1, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
if (taos_errno(result) != 0) { code = taos_errno(result);
printf("failed to insert schemaless line data, reason: %s\n", taos_errstr(result)); if (code != 0) {
printf("Failed to insert schemaless line data, Server: %s:%hu; ErrCode: 0x%x; ErrMessage: %s\n.", host, port, code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
int rows = taos_affected_rows(result); int rows = taos_affected_rows(result);
printf("success to insert %d rows of schemaless line data\n", rows); printf("Insert %d rows of schemaless line data successfully.\n", rows);
taos_free_result(result); taos_free_result(result);
// opentsdb telnet protocol // opentsdb telnet protocol
char *telnets[] = {telnet_demo}; char *telnets[] = {telnet_demo};
result = taos_schemaless_insert(taos, telnets, 1, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); result = taos_schemaless_insert(taos, telnets, 1, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
if (taos_errno(result) != 0) { code = taos_errno(result);
printf("failed to insert schemaless telnet data, reason: %s\n", taos_errstr(result)); if (code != 0) {
printf("Failed to insert schemaless telnet data, Server: %s:%hu; ErrCode: 0x%x; ErrMessage: %s\n.", host, port, code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
rows = taos_affected_rows(result); rows = taos_affected_rows(result);
printf("success to insert %d rows of schemaless telnet data\n", rows); printf("Insert %d rows of schemaless telnet data successfully.\n", rows);
taos_free_result(result); taos_free_result(result);
// opentsdb json protocol // opentsdb json protocol
@ -91,16 +101,17 @@ char *jsons[1] = {0};
// allocate memory for json data. can not use static memory. // allocate memory for json data. can not use static memory.
jsons[0] = malloc(1024); jsons[0] = malloc(1024);
if (jsons[0] == NULL) { if (jsons[0] == NULL) {
printf("failed to allocate memory\n"); printf("Failed to allocate memory\n");
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
(void)strncpy(jsons[0], json_demo, 1023); (void)strncpy(jsons[0], json_demo, 1023);
result = taos_schemaless_insert(taos, jsons, 1, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NOT_CONFIGURED); result = taos_schemaless_insert(taos, jsons, 1, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
if (taos_errno(result) != 0) { code = taos_errno(result);
if (code != 0) {
free(jsons[0]); free(jsons[0]);
printf("failed to insert schemaless json data, reason: %s\n", taos_errstr(result)); printf("Failed to insert schemaless json data, Server: %s:%hu; ErrCode: 0x%x; ErrMessage: %s\n.", host, port, code, taos_errstr(result));
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
return -1; return -1;
@ -108,7 +119,7 @@ if (taos_errno(result) != 0) {
free(jsons[0]); free(jsons[0]);
rows = taos_affected_rows(result); rows = taos_affected_rows(result);
printf("success to insert %d rows of schemaless json data\n", rows); printf("Insert %d rows of schemaless json data successfully.\n", rows);
taos_free_result(result); taos_free_result(result);
// close & clean // close & clean

View File

@ -64,6 +64,7 @@ typedef struct {
int num_of_sub_table = 10; int num_of_sub_table = 10;
int num_of_row = 10; int num_of_row = 10;
int total_affected = 0;
/** /**
* @brief insert data using stmt API * @brief insert data using stmt API
* *
@ -72,10 +73,14 @@ int num_of_row = 10;
void insertData(TAOS *taos) { void insertData(TAOS *taos) {
// init // init
TAOS_STMT *stmt = taos_stmt_init(taos); TAOS_STMT *stmt = taos_stmt_init(taos);
if (stmt == NULL) {
printf("Failed to init taos_stmt, error: %s\n", taos_stmt_errstr(NULL));
exit(EXIT_FAILURE);
}
// prepare // prepare
const char *sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"; const char *sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
int code = taos_stmt_prepare(stmt, sql, 0); int code = taos_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare"); checkErrorCode(stmt, code, "Failed to execute taos_stmt_prepare");
for (int i = 1; i <= num_of_sub_table; i++) { for (int i = 1; i <= num_of_sub_table; i++) {
char table_name[20]; char table_name[20];
sprintf(table_name, "d_bind_%d", i); sprintf(table_name, "d_bind_%d", i);
@ -99,7 +104,7 @@ void insertData(TAOS *taos) {
tags[1].is_null = NULL; tags[1].is_null = NULL;
tags[1].num = 1; tags[1].num = 1;
code = taos_stmt_set_tbname_tags(stmt, table_name, tags); code = taos_stmt_set_tbname_tags(stmt, table_name, tags);
checkErrorCode(stmt, code, "failed to set table name and tags\n"); checkErrorCode(stmt, code, "Failed to set table name and tags\n");
// insert rows // insert rows
TAOS_MULTI_BIND params[4]; TAOS_MULTI_BIND params[4];
@ -142,25 +147,31 @@ void insertData(TAOS *taos) {
params[3].buffer = &phase; params[3].buffer = &phase;
// bind param // bind param
code = taos_stmt_bind_param(stmt, params); code = taos_stmt_bind_param(stmt, params);
checkErrorCode(stmt, code, "failed to bind param"); checkErrorCode(stmt, code, "Failed to bind param");
} }
// add batch // add batch
code = taos_stmt_add_batch(stmt); code = taos_stmt_add_batch(stmt);
checkErrorCode(stmt, code, "failed to add batch"); checkErrorCode(stmt, code, "Failed to add batch");
// execute batch // execute batch
code = taos_stmt_execute(stmt); code = taos_stmt_execute(stmt);
checkErrorCode(stmt, code, "failed to exec stmt"); checkErrorCode(stmt, code, "Failed to exec stmt");
// get affected rows // get affected rows
int affected = taos_stmt_affected_rows_once(stmt); int affected = taos_stmt_affected_rows_once(stmt);
printf("table %s insert %d rows.\n", table_name, affected); total_affected += affected;
} }
printf("Successfully inserted %d rows to power.meters.\n", total_affected);
taos_stmt_close(stmt); taos_stmt_close(stmt);
} }
int main() { int main() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030); const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) { if (taos == NULL) {
printf("failed to connect to server\n"); printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
// create database and table // create database and table

View File

@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// to compile: gcc -o tmq_demo tmq_demo.c -ltaos -lpthread
#include <assert.h> #include <assert.h>
#include <pthread.h> #include <pthread.h>
#include <stdio.h> #include <stdio.h>
@ -27,8 +29,15 @@ static int running = 1;
const char* topic_name = "topic_meters"; const char* topic_name = "topic_meters";
void* prepare_data(void* arg) { void* prepare_data(void* arg) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
TAOS *pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) { if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
return NULL; return NULL;
} }
@ -45,13 +54,14 @@ void* prepare_data(void* arg) {
i); i);
pRes = taos_query(pConn, buf); pRes = taos_query(pConn, buf);
if (taos_errno(pRes) != 0) { code = taos_errno(pRes);
printf("error in insert data to power.meters, reason:%s\n", taos_errstr(pRes)); if (code != 0) {
fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
} }
taos_free_result(pRes); taos_free_result(pRes);
sleep(1); sleep(1);
} }
printf("prepare data thread exit\n"); fprintf(stdout, "Prepare data thread exit\n");
return NULL; return NULL;
} }
@ -63,9 +73,9 @@ static int32_t msg_process(TAOS_RES* msg) {
const char* dbName = tmq_get_db_name(msg); const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg); int32_t vgroupId = tmq_get_vgroup_id(msg);
printf("topic: %s\n", topicName); fprintf(stdout, "topic: %s\n", topicName);
printf("db: %s\n", dbName); fprintf(stdout, "db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId); fprintf(stdout, "vgroup id: %d\n", vgroupId);
while (1) { while (1) {
// get one row data from message // get one row data from message
@ -81,11 +91,11 @@ static int32_t msg_process(TAOS_RES* msg) {
rows++; rows++;
// print the row content // print the row content
if (taos_print_row(buf, row, fields, numOfFields) < 0) { if (taos_print_row(buf, row, fields, numOfFields) < 0) {
printf("failed to print row\n"); fprintf(stderr, "Failed to print row\n");
break; break;
} }
// print the precision and row content to the console // print the precision and row content to the console
printf("precision: %d, row content: %s\n", precision, buf); fprintf(stdout, "precision: %d, data: %s\n", precision, buf);
} }
return rows; return rows;
@ -93,42 +103,53 @@ static int32_t msg_process(TAOS_RES* msg) {
// ANCHOR_END: msg_process // ANCHOR_END: msg_process
static int32_t init_env() { static int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
TAOS *pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) { if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
return -1; return -1;
} }
TAOS_RES* pRes; TAOS_RES* pRes;
// drop database if exists // drop database if exists
printf("create database\n"); fprintf(stdout, "Create database.\n");
pRes = taos_query(pConn, "drop topic if exists topic_meters"); pRes = taos_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
if (taos_errno(pRes) != 0) { code = taos_errno(pRes);
printf("error in drop topic_meters, reason:%s\n", taos_errstr(pRes)); if (code != 0) {
fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists power"); pRes = taos_query(pConn, "DROP DATABASE IF EXISTS power");
if (taos_errno(pRes) != 0) { code = taos_errno(pRes);
printf("error in drop power, reason:%s\n", taos_errstr(pRes)); if (code != 0) {
fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
} }
taos_free_result(pRes); taos_free_result(pRes);
// create database // create database
pRes = taos_query(pConn, "create database power precision 'ms' WAL_RETENTION_PERIOD 3600"); pRes = taos_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600");
if (taos_errno(pRes) != 0) { code = taos_errno(pRes);
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes)); if (code != 0) {
fprintf(stderr, "Failed to create tmqdb, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
// create super table // create super table
printf("create super table\n"); fprintf(stdout, "Create super table.\n");
pRes = taos_query( pRes = taos_query(
pConn, pConn,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS " "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))"); "(groupId INT, location BINARY(24))");
if (taos_errno(pRes) != 0) { code = taos_errno(pRes);
printf("failed to create super table meters, reason:%s\n", taos_errstr(pRes)); if (code != 0) {
fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END; goto END;
} }
@ -143,16 +164,24 @@ END:
} }
int32_t create_topic() { int32_t create_topic() {
printf("create topic\n"); fprintf(stdout, "Create topic.\n");
TAOS_RES* pRes; TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
TAOS *pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) { if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
return -1; return -1;
} }
pRes = taos_query(pConn, "use power"); pRes = taos_query(pConn, "USE POWER");
code = taos_errno(pRes);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes)); fprintf(stderr, "Failed to use tmqdb, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
@ -160,8 +189,9 @@ int32_t create_topic() {
pRes = taos_query( pRes = taos_query(
pConn, pConn,
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
if (taos_errno(pRes) != 0) { code = taos_errno(pRes);
printf("failed to create topic topic_meters, reason:%s\n", taos_errstr(pRes)); if (code != 0) {
fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
@ -171,7 +201,7 @@ int32_t create_topic() {
} }
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param); fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
} }
// ANCHOR: create_consumer_1 // ANCHOR: create_consumer_1
@ -243,6 +273,7 @@ tmq_list_t* build_topic_list() {
if (code) { if (code) {
// if failed, destroy the list and return NULL // if failed, destroy the list and return NULL
tmq_list_destroy(topicList); tmq_list_destroy(topicList);
fprintf(stderr, "Failed to create topic_list, ErrCode: 0x%x, ErrMessage: %s.\n", code, tmq_err2str(code));
return NULL; return NULL;
} }
// if success, return the list // if success, return the list
@ -285,7 +316,7 @@ void consume_repeatly(tmq_t* tmq) {
// get the topic assignment // get the topic assignment
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment); int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0 || pAssign == NULL || numOfAssignment == 0) { if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
fprintf(stderr, "failed to get assignment, reason:%s", tmq_err2str(code)); fprintf(stderr, "Failed to get assignment, ErrCode: 0x%x, ErrMessage: %s.\n", code, tmq_err2str(code));
return; return;
} }
@ -295,7 +326,9 @@ void consume_repeatly(tmq_t* tmq) {
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin); code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) { if (code != 0) {
fprintf(stderr, "failed to seek to %d, reason:%s", (int)p->begin, tmq_err2str(code)); fprintf(stderr, "Failed to seek assignment %d to beginning %ld, ErrCode: 0x%x, ErrMessage: %s.\n", i, p->begin, code, tmq_err2str(code));
} else {
fprintf(stdout, "Seek assignment %d to beginning %ld successfully.\n", i, p->begin);
} }
} }
@ -324,10 +357,12 @@ void manual_commit(tmq_t* tmq) {
int32_t code = tmq_commit_sync(tmq, tmqmsg); int32_t code = tmq_commit_sync(tmq, tmqmsg);
if (code) { if (code) {
fprintf(stderr, "Failed to commit message: %s\n", tmq_err2str(code)); fprintf(stderr, "Failed to commit message, ErrCode: 0x%x, ErrMessage: %s.\n", code, tmq_err2str(code));
// free the message // free the message
taos_free_result(tmqmsg); taos_free_result(tmqmsg);
break; break;
} else {
fprintf(stdout, "Commit offset manually successfully.\n");
} }
// free the message // free the message
taos_free_result(tmqmsg); taos_free_result(tmqmsg);
@ -339,7 +374,7 @@ void manual_commit(tmq_t* tmq) {
} }
// print the result: total messages and total rows consumed // print the result: total messages and total rows consumed
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); fprintf(stderr, "%d msg consumed, include %d rows.\n", msgCnt, totalRows);
} }
// ANCHOR_END: manual_commit // ANCHOR_END: manual_commit
@ -356,28 +391,31 @@ int main(int argc, char* argv[]) {
} }
if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) { if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) {
fprintf(stderr, "create thread failed\n"); fprintf(stderr, "Failed to create thread.\n");
return 1; return 1;
} }
// ANCHOR: create_consumer_2 // ANCHOR: create_consumer_2
tmq_t* tmq = build_consumer(); tmq_t* tmq = build_consumer();
if (NULL == tmq) { if (NULL == tmq) {
fprintf(stderr, "build consumer to localhost fail!\n"); fprintf(stderr, "Failed to create consumer.\n");
return -1; return -1;
} }
printf("build consumer to localhost successfully \n"); fprintf(stdout, "Create consumer successfully.\n");
// ANCHOR_END: create_consumer_2 // ANCHOR_END: create_consumer_2
// ANCHOR: subscribe_3 // ANCHOR: subscribe_3
tmq_list_t* topic_list = build_topic_list(); tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) { if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list.\n");
return -1; return -1;
} }
if ((code = tmq_subscribe(tmq, topic_list))) { if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code)); fprintf(stderr, "Failed to subscribe topic_list, ErrCode: 0x%x, ErrMessage: %s.\n", code, tmq_err2str(code));
} else {
fprintf(stdout, "Subscribe topics successfully.\n");
} }
tmq_list_destroy(topic_list); tmq_list_destroy(topic_list);
@ -393,13 +431,15 @@ int main(int argc, char* argv[]) {
// unsubscribe the topic // unsubscribe the topic
code = tmq_unsubscribe(tmq); code = tmq_unsubscribe(tmq);
if (code) { if (code) {
fprintf(stderr, "Failed to tmq_unsubscribe(): %s\n", tmq_err2str(code)); fprintf(stderr, "Failed to unsubscribe consumer, ErrCode: 0x%x, ErrMessage: %s.\n", code, tmq_err2str(code));
} else {
fprintf(stderr, "Consumer unsubscribed successfully.\n");
} }
fprintf(stderr, "Unsubscribed consumer successfully.\n");
// close the consumer // close the consumer
code = tmq_consumer_close(tmq); code = tmq_consumer_close(tmq);
if (code) { if (code) {
fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code)); fprintf(stderr, "Failed to close consumer: %s.\n", tmq_err2str(code));
} else { } else {
fprintf(stderr, "Consumer closed successfully.\n"); fprintf(stderr, "Consumer closed successfully.\n");
} }

View File

@ -23,55 +23,59 @@
#include "taos.h" #include "taos.h"
static int DemoWithReqId() { static int DemoWithReqId() {
// ANCHOR: with_reqid // ANCHOR: with_reqid
const char *ip = "localhost"; const char *host = "localhost";
const char *user = "root"; const char *user = "root";
const char *password = "taosdata"; const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
// connect // connect
TAOS *taos = taos_connect(ip, user, password, NULL, 0); TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) { if (taos == NULL) {
printf("failed to connect to server %s, reason: %s\n", ip, taos_errstr(NULL)); printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
return -1;
}
printf("success to connect server %s\n", ip);
const char *sql = "SELECT ts, current, location FROM power.meters limit 1";
// query data with reqid
TAOS_RES *result = taos_query_with_reqid(taos, sql, 3L);
int code = taos_errno(result);
if (code != 0) {
printf("failed to query data from power.meters, ip: %s, reason: %s\n", ip, taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
TAOS_ROW row = NULL;
int rows = 0;
int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
printf("fields: %d\n", num_fields);
printf("sql: %s, result:\n", sql);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[1024] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);
}
printf("total rows: %d\n", rows);
taos_free_result(result);
printf("success to query data from power.meters\n");
// close & clean
taos_close(taos);
taos_cleanup(); taos_cleanup();
return 0; return -1;
// ANCHOR_END: with_reqid }
const char *sql = "SELECT ts, current, location FROM power.meters limit 1";
// query data with reqid
long reqid = 3L;
TAOS_RES *result = taos_query_with_reqid(taos, sql, reqid);
code = taos_errno(result);
if (code != 0) {
printf("Failed to execute sql with reqId: %ld, Server: %s:%hu, ErrCode: 0x%x, ErrMessage: %s\n.", reqid, host, port, code, taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
TAOS_ROW row = NULL;
int rows = 0;
int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
printf("fields: %d\n", num_fields);
printf("sql: %s, result:\n", sql);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[1024] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);
}
printf("total rows: %d\n", rows);
taos_free_result(result);
// close & clean
taos_close(taos);
taos_cleanup();
return 0;
// ANCHOR_END: with_reqid
}
int main(int argc, char *argv[]) {
return DemoWithReqId();
} }
int main(int argc, char *argv[]) { return DemoWithReqId(); }

View File

@ -35,7 +35,7 @@ func main() {
} }
rowsAffected, err = res.RowsAffected() rowsAffected, err = res.RowsAffected()
if err != nil { if err != nil {
log.Fatalln("Failed to get create create rowsAffected, url:" + taosDSN + "; ErrMessage: " + err.Error()) log.Fatalln("Failed to get create db rowsAffected, url:" + taosDSN + "; ErrMessage: " + err.Error())
} }
// you can check rowsAffected here // you can check rowsAffected here
fmt.Println("Create stable power.meters successfully, rowsAffected:", rowsAffected) fmt.Println("Create stable power.meters successfully, rowsAffected:", rowsAffected)
@ -66,7 +66,7 @@ func main() {
// query data, make sure the database and table are created before // query data, make sure the database and table are created before
rows, err := db.Query("SELECT ts, current, location FROM power.meters limit 100") rows, err := db.Query("SELECT ts, current, location FROM power.meters limit 100")
if err != nil { if err != nil {
log.Fatal("query data failed:", err) log.Fatal("Failed to query data from power.meters, url:" + taosDSN + "; ErrMessage: " + err.Error())
} }
for rows.Next() { for rows.Next() {
var ( var (
@ -76,7 +76,7 @@ func main() {
) )
err = rows.Scan(&ts, &current, &location) err = rows.Scan(&ts, &current, &location)
if err != nil { if err != nil {
log.Fatal("scan data failed:", err) log.Fatal("Failed to scan data, url:" + taosDSN + "; ErrMessage: " + err.Error())
} }
// you can check data here // you can check data here
fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location) fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location)

View File

@ -61,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
// ANCHOR: consume // ANCHOR: consume
match consumer.subscribe(["topic_meters"]).await{ match consumer.subscribe(["topic_meters"]).await{
Ok(_) => println!("subscribe topics successfully."), Ok(_) => println!("Subscribe topics successfully."),
Err(err) => { Err(err) => {
eprintln!("Failed to subscribe topic_meters, dsn: {}; ErrMessage: {}", dsn, err); eprintln!("Failed to subscribe topic_meters, dsn: {}; ErrMessage: {}", dsn, err);
return Err(err.into()); return Err(err.into());
@ -123,7 +123,7 @@ async fn main() -> anyhow::Result<()> {
} }
// commit offset manually when you have processed the message. // commit offset manually when you have processed the message.
match consumer.commit(offset).await{ match consumer.commit(offset).await{
Ok(_) => println!("commit offset manually successfully."), Ok(_) => println!("Commit offset manually successfully."),
Err(err) => { Err(err) => {
eprintln!("Failed to commit offset manually, dsn: {}; ErrMessage: {}", dsn, err); eprintln!("Failed to commit offset manually, dsn: {}; ErrMessage: {}", dsn, err);
return Err(err.into()); return Err(err.into());
@ -140,7 +140,7 @@ async fn main() -> anyhow::Result<()> {
// ANCHOR: seek_offset // ANCHOR: seek_offset
let assignments = consumer.assignments().await.unwrap(); let assignments = consumer.assignments().await.unwrap();
println!("assignments: {:?}", assignments); println!("Now assignments: {:?}", assignments);
// seek offset // seek offset
for topic_vec_assignment in assignments { for topic_vec_assignment in assignments {
@ -163,23 +163,24 @@ async fn main() -> anyhow::Result<()> {
match consumer.offset_seek(topic, vgroup_id, begin).await{ match consumer.offset_seek(topic, vgroup_id, begin).await{
Ok(_) => (), Ok(_) => (),
Err(err) => { Err(err) => {
eprintln!("seek example failed; ErrMessage: {}", err); eprintln!("Seek example failed; ErrMessage: {}", err);
return Err(err.into()); return Err(err.into());
} }
} }
} }
let topic_assignment = consumer.topic_assignment(topic).await; let topic_assignment = consumer.topic_assignment(topic).await;
println!("topic assignment: {:?}", topic_assignment); println!("Topic assignment: {:?}", topic_assignment);
} }
println!("assignment seek to beginning successfully."); println!("Assignment seek to beginning successfully.");
// after seek offset // after seek offset
let assignments = consumer.assignments().await.unwrap(); let assignments = consumer.assignments().await.unwrap();
println!("after seek offset assignments: {:?}", assignments); println!("After seek offset assignments: {:?}", assignments);
// ANCHOR_END: seek_offset // ANCHOR_END: seek_offset
// ANCHOR: unsubscribe // ANCHOR: unsubscribe
consumer.unsubscribe().await; consumer.unsubscribe().await;
println!("Consumer unsubscribed successfully.");
// ANCHOR_END: unsubscribe // ANCHOR_END: unsubscribe
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;

View File

@ -61,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
// ANCHOR: consume // ANCHOR: consume
match consumer.subscribe(["topic_meters"]).await{ match consumer.subscribe(["topic_meters"]).await{
Ok(_) => println!("subscribe topics successfully."), Ok(_) => println!("Subscribe topics successfully."),
Err(err) => { Err(err) => {
eprintln!("Failed to subscribe topic_meters, dsn: {}; ErrMessage: {}", dsn, err); eprintln!("Failed to subscribe topic_meters, dsn: {}; ErrMessage: {}", dsn, err);
return Err(err.into()); return Err(err.into());
@ -123,7 +123,7 @@ async fn main() -> anyhow::Result<()> {
} }
// commit offset manually when you have processed the message. // commit offset manually when you have processed the message.
match consumer.commit(offset).await{ match consumer.commit(offset).await{
Ok(_) => println!("commit offset manually successfully."), Ok(_) => println!("Commit offset manually successfully."),
Err(err) => { Err(err) => {
eprintln!("Failed to commit offset manually, dsn: {}; ErrMessage: {}", dsn, err); eprintln!("Failed to commit offset manually, dsn: {}; ErrMessage: {}", dsn, err);
return Err(err.into()); return Err(err.into());
@ -172,15 +172,15 @@ async fn main() -> anyhow::Result<()> {
let topic_assignment = consumer.topic_assignment(topic).await; let topic_assignment = consumer.topic_assignment(topic).await;
println!("topic assignment: {:?}", topic_assignment); println!("topic assignment: {:?}", topic_assignment);
} }
println!("assignment seek to beginning successfully."); println!("Assignment seek to beginning successfully.");
// after seek offset // after seek offset
let assignments = consumer.assignments().await.unwrap(); let assignments = consumer.assignments().await.unwrap();
println!("after seek offset assignments: {:?}", assignments); println!("After seek offset assignments: {:?}", assignments);
// ANCHOR_END: seek_offset // ANCHOR_END: seek_offset
// ANCHOR: unsubscribe // ANCHOR: unsubscribe
consumer.unsubscribe().await; consumer.unsubscribe().await;
println!("consumer unsubscribed successfully."); println!("Consumer unsubscribed successfully.");
// ANCHOR_END: unsubscribe // ANCHOR_END: unsubscribe
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;

View File

@ -59,7 +59,7 @@ public class ConsumerLoopFull {
// subscribe to the topics // subscribe to the topics
consumer.subscribe(topics); consumer.subscribe(topics);
System.out.println("subscribe topics successfully."); System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) { for (int i = 0; i < 50; i++) {
// poll data // poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100)); ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
@ -88,9 +88,9 @@ public class ConsumerLoopFull {
// subscribe to the topics // subscribe to the topics
consumer.subscribe(topics); consumer.subscribe(topics);
System.out.println("subscribe topics successfully."); System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment(); Set<TopicPartition> assignment = consumer.assignment();
System.out.println("now assignment: " + JSON.toJSONString(assignment)); System.out.println("Now assignment: " + JSON.toJSONString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord(); ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data // make sure we have got some data
@ -99,13 +99,13 @@ public class ConsumerLoopFull {
} }
consumer.seekToBeginning(assignment); consumer.seekToBeginning(assignment);
System.out.println("assignment seek to beginning successfully."); System.out.println("Assignment seek to beginning successfully.");
} catch (SQLException ex) { } catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info // handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); System.out.println("Seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("seek example failed", ex); throw new SQLException("seek example failed", ex);
} catch (Exception ex) { } catch (Exception ex) {
System.out.println("seek example failed; ErrMessage: " + ex.getMessage()); System.out.println("Seek example failed; ErrMessage: " + ex.getMessage());
throw new SQLException("seek example failed", ex); throw new SQLException("seek example failed", ex);
} }
// ANCHOR_END: consumer_seek // ANCHOR_END: consumer_seek
@ -128,7 +128,7 @@ public class ConsumerLoopFull {
if (!records.isEmpty()) { if (!records.isEmpty()) {
// after processing the data, commit the offset manually // after processing the data, commit the offset manually
consumer.commitSync(); consumer.commitSync();
System.out.println("commit offset manually successfully."); System.out.println("Commit offset manually successfully.");
} }
} }
} catch (SQLException ex) { } catch (SQLException ex) {
@ -149,7 +149,7 @@ public class ConsumerLoopFull {
try { try {
// unsubscribe the consumer // unsubscribe the consumer
consumer.unsubscribe(); consumer.unsubscribe();
System.out.println("consumer unsubscribed successfully."); System.out.println("Consumer unsubscribed successfully.");
} catch (SQLException ex) { } catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info // handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
@ -161,7 +161,7 @@ public class ConsumerLoopFull {
finally { finally {
// close the consumer // close the consumer
consumer.close(); consumer.close();
System.out.println("consumer closed successfully."); System.out.println("Consumer closed successfully.");
} }
// ANCHOR_END: unsubscribe_data_code_piece // ANCHOR_END: unsubscribe_data_code_piece
} }

View File

@ -57,7 +57,7 @@ public class WsConsumerLoopFull {
// subscribe to the topics // subscribe to the topics
consumer.subscribe(topics); consumer.subscribe(topics);
System.out.println("subscribe topics successfully."); System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) { for (int i = 0; i < 50; i++) {
// poll data // poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100)); ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
@ -86,9 +86,9 @@ public class WsConsumerLoopFull {
// subscribe to the topics // subscribe to the topics
consumer.subscribe(topics); consumer.subscribe(topics);
System.out.println("subscribe topics successfully."); System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment(); Set<TopicPartition> assignment = consumer.assignment();
System.out.println("now assignment: " + JSON.toJSONString(assignment)); System.out.println("Now assignment: " + JSON.toJSONString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord(); ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data // make sure we have got some data
@ -97,13 +97,13 @@ public class WsConsumerLoopFull {
} }
consumer.seekToBeginning(assignment); consumer.seekToBeginning(assignment);
System.out.println("assignment seek to beginning successfully."); System.out.println("Assignment seek to beginning successfully.");
} catch (SQLException ex) { } catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info // handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); System.out.println("Seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("seek example failed", ex); throw new SQLException("seek example failed", ex);
} catch (Exception ex) { } catch (Exception ex) {
System.out.println("seek example failed; ErrMessage: " + ex.getMessage()); System.out.println("Seek example failed; ErrMessage: " + ex.getMessage());
throw new SQLException("seek example failed", ex); throw new SQLException("seek example failed", ex);
} }
// ANCHOR_END: consumer_seek // ANCHOR_END: consumer_seek
@ -126,7 +126,7 @@ public class WsConsumerLoopFull {
if (!records.isEmpty()) { if (!records.isEmpty()) {
// after processing the data, commit the offset manually // after processing the data, commit the offset manually
consumer.commitSync(); consumer.commitSync();
System.out.println("commit offset manually successfully."); System.out.println("Commit offset manually successfully.");
} }
} }
} catch (SQLException ex) { } catch (SQLException ex) {
@ -147,7 +147,7 @@ public class WsConsumerLoopFull {
try { try {
// unsubscribe the consumer // unsubscribe the consumer
consumer.unsubscribe(); consumer.unsubscribe();
System.out.println("consumer unsubscribed successfully."); System.out.println("Consumer unsubscribed successfully.");
} catch (SQLException ex) { } catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info // handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
@ -159,7 +159,7 @@ public class WsConsumerLoopFull {
finally { finally {
// close the consumer // close the consumer
consumer.close(); consumer.close();
System.out.println("consumer closed successfully."); System.out.println("Consumer closed successfully.");
} }
// ANCHOR_END: unsubscribe_data_code_piece // ANCHOR_END: unsubscribe_data_code_piece
} }