diff --git a/docs/examples/c/connect_example.c b/docs/examples/c/connect_example.c index 53d2d0d59b..ef07c54122 100644 --- a/docs/examples/c/connect_example.c +++ b/docs/examples/c/connect_example.c @@ -12,11 +12,11 @@ int main() { uint16_t port = 6030; // 0 means use the default port TAOS *taos = taos_connect(host, user, passwd, db, port); if (taos == NULL) { - printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); taos_cleanup(); return -1; } - printf("Connected to %s:%hu successfully.\n", host, port); + fprintf(stdout, "Connected to %s:%hu successfully.\n", host, port); /* put your code here for read and write */ diff --git a/docs/examples/c/create_db_demo.c b/docs/examples/c/create_db_demo.c index 44960defa5..7ae41da65c 100644 --- a/docs/examples/c/create_db_demo.c +++ b/docs/examples/c/create_db_demo.c @@ -33,7 +33,7 @@ static int DemoCreateDB() { // connect TAOS *taos = taos_connect(host, user, password, NULL, port); if (taos == NULL) { - printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); taos_cleanup(); return -1; @@ -43,13 +43,13 @@ static int DemoCreateDB() { TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power"); code = taos_errno(result); if (code != 0) { - printf("Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result)); + fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result)); taos_close(taos); taos_cleanup(); return -1; } taos_free_result(result); - printf("Create database power successfully.\n"); + fprintf(stdout, "Create database power successfully.\n"); // create table const char *sql = @@ -58,13 +58,13 @@ static int DemoCreateDB() { result = taos_query(taos, sql); code = taos_errno(result); if (code != 0) { - printf("Failed to create stable power.meters, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result)); + fprintf(stderr, "Failed to create stable power.meters, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result)); taos_close(taos); taos_cleanup(); return -1; } taos_free_result(result); - printf("Create stable power.meters successfully.\n"); + fprintf(stdout, "Create stable power.meters successfully.\n"); // close & clean taos_close(taos); diff --git a/docs/examples/c/insert_data_demo.c b/docs/examples/c/insert_data_demo.c index e880af0cd6..7570af02ad 100644 --- a/docs/examples/c/insert_data_demo.c +++ b/docs/examples/c/insert_data_demo.c @@ -33,7 +33,7 @@ static int DemoInsertData() { // connect TAOS *taos = taos_connect(host, user, password, NULL, port); if (taos == NULL) { - printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); taos_cleanup(); return -1; @@ -53,7 +53,7 @@ static int DemoInsertData() { TAOS_RES *result = taos_query(taos, sql); code = taos_errno(result); if (code != 0) { - printf("Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result)); + fprintf(stderr, "Failed to insert data to power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code, taos_errstr(result)); taos_close(taos); taos_cleanup(); return -1; @@ -62,7 +62,7 @@ static int DemoInsertData() { // you can check affectedRows here int rows = taos_affected_rows(result); - printf("Successfully inserted %d rows into power.meters.\n", rows); + fprintf(stdout, "Successfully inserted %d rows into power.meters.\n", rows); // close & clean taos_close(taos); diff --git a/docs/examples/c/query_data_demo.c b/docs/examples/c/query_data_demo.c index 097a6b99b8..0e13f57e3f 100644 --- a/docs/examples/c/query_data_demo.c +++ b/docs/examples/c/query_data_demo.c @@ -33,7 +33,7 @@ static int DemoQueryData() { // connect TAOS *taos = taos_connect(host, user, password, NULL, port); if (taos == NULL) { - printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); taos_cleanup(); return -1; @@ -44,7 +44,7 @@ static int DemoQueryData() { TAOS_RES *result = taos_query(taos, sql); code = taos_errno(result); if (code != 0) { - printf("Failed to query data from power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code, + fprintf(stderr, "Failed to query data from power.meters, sql: %s, ErrCode: 0x%x, ErrMessage: %s\n.", sql, code, taos_errstr(result)); taos_close(taos); taos_cleanup(); @@ -56,20 +56,15 @@ static int DemoQueryData() { int num_fields = taos_field_count(result); TAOS_FIELD *fields = taos_fetch_fields(result); - printf("fields: %d\n", num_fields); - printf("sql: %s, result:\n", sql); + fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql); // fetch the records row by row while ((row = taos_fetch_row(result))) { - char temp[1024] = {0}; - if (taos_print_row(temp, row, fields, num_fields) < 0) { - printf("Failed to print row\n"); - break; - } - printf("%s\n", temp); + // Add your data processing logic here + rows++; } - printf("total rows: %d\n", rows); + fprintf(stdout, "total rows: %d\n", rows); taos_free_result(result); // close & clean diff --git a/docs/examples/c/sml_insert_demo.c b/docs/examples/c/sml_insert_demo.c index 9adbb28f87..63870d90b6 100644 --- a/docs/examples/c/sml_insert_demo.c +++ b/docs/examples/c/sml_insert_demo.c @@ -32,7 +32,7 @@ static int DemoSmlInsert() { // connect TAOS *taos = taos_connect(host, user, password, NULL, port); if (taos == NULL) { - printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); taos_cleanup(); return -1; @@ -42,7 +42,7 @@ static int DemoSmlInsert() { TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power"); code = taos_errno(result); if (code != 0) { - printf("Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result)); + fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result)); taos_close(taos); taos_cleanup(); return -1; @@ -53,7 +53,7 @@ static int DemoSmlInsert() { result = taos_query(taos, "USE power"); code = taos_errno(result); if (code != 0) { - printf("Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result)); + fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s\n.", code, taos_errstr(result)); taos_close(taos); taos_cleanup(); return -1; @@ -74,7 +74,7 @@ static int DemoSmlInsert() { result = taos_schemaless_insert(taos, lines, 1, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); code = taos_errno(result); if (code != 0) { - printf("Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", line_demo, code, + fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", line_demo, code, taos_errstr(result)); taos_close(taos); taos_cleanup(); @@ -82,7 +82,7 @@ static int DemoSmlInsert() { } int rows = taos_affected_rows(result); - printf("Insert %d rows of schemaless line data successfully.\n", rows); + fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", rows); taos_free_result(result); // opentsdb telnet protocol @@ -90,7 +90,7 @@ static int DemoSmlInsert() { result = taos_schemaless_insert(taos, telnets, 1, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); code = taos_errno(result); if (code != 0) { - printf("Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", telnet_demo, code, + fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s\n.", telnet_demo, code, taos_errstr(result)); taos_close(taos); taos_cleanup(); @@ -98,15 +98,16 @@ static int DemoSmlInsert() { } rows = taos_affected_rows(result); - printf("Insert %d rows of schemaless telnet data successfully.\n", rows); + fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", rows); taos_free_result(result); // opentsdb json protocol char *jsons[1] = {0}; // allocate memory for json data. can not use static memory. - jsons[0] = malloc(1024); + size_t size = 1024; + jsons[0] = malloc(size); if (jsons[0] == NULL) { - printf("Failed to allocate memory\n"); + fprintf(stderr, "Failed to allocate memory: %zu bytes.\n", size); taos_close(taos); taos_cleanup(); return -1; @@ -116,7 +117,7 @@ static int DemoSmlInsert() { code = taos_errno(result); if (code != 0) { free(jsons[0]); - printf("Failed to insert schemaless json data, Server: %s, ErrCode: 0x%x, ErrMessage: %s\n.", json_demo, code, + fprintf(stderr, "Failed to insert schemaless json data, Server: %s, ErrCode: 0x%x, ErrMessage: %s\n.", json_demo, code, taos_errstr(result)); taos_close(taos); taos_cleanup(); @@ -125,7 +126,7 @@ static int DemoSmlInsert() { free(jsons[0]); rows = taos_affected_rows(result); - printf("Insert %d rows of schemaless json data successfully.\n", rows); + fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", rows); taos_free_result(result); // close & clean diff --git a/docs/examples/c/stmt_insert_demo.c b/docs/examples/c/stmt_insert_demo.c index 854c9f86e2..f89a0fc8da 100644 --- a/docs/examples/c/stmt_insert_demo.c +++ b/docs/examples/c/stmt_insert_demo.c @@ -32,7 +32,7 @@ void executeSQL(TAOS *taos, const char *sql) { TAOS_RES *res = taos_query(taos, sql); int code = taos_errno(res); if (code != 0) { - printf("%s\n", taos_errstr(res)); + fprintf(stderr, "%s\n", taos_errstr(res)); taos_free_result(res); taos_close(taos); exit(EXIT_FAILURE); @@ -49,7 +49,7 @@ void executeSQL(TAOS *taos, const char *sql) { */ void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) { if (code != 0) { - printf("%s. code: %d, error: %s\n", msg,code,taos_stmt_errstr(stmt)); + fprintf(stderr, "%s. code: %d, error: %s\n", msg,code,taos_stmt_errstr(stmt)); taos_stmt_close(stmt); exit(EXIT_FAILURE); } @@ -74,7 +74,7 @@ void insertData(TAOS *taos) { // init TAOS_STMT *stmt = taos_stmt_init(taos); if (stmt == NULL) { - printf("Failed to init taos_stmt, error: %s\n", taos_stmt_errstr(NULL)); + fprintf(stderr, "Failed to init taos_stmt, error: %s\n", taos_stmt_errstr(NULL)); exit(EXIT_FAILURE); } // prepare @@ -159,7 +159,7 @@ void insertData(TAOS *taos) { int affected = taos_stmt_affected_rows_once(stmt); total_affected += affected; } - printf("Successfully inserted %d rows to power.meters.\n", total_affected); + fprintf(stdout, "Successfully inserted %d rows to power.meters.\n", total_affected); taos_stmt_close(stmt); } @@ -170,7 +170,7 @@ int main() { uint16_t port = 6030; TAOS *taos = taos_connect(host, user, password, NULL, port); if (taos == NULL) { - printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); taos_cleanup(); exit(EXIT_FAILURE); } diff --git a/docs/examples/c/tmq_demo.c b/docs/examples/c/tmq_demo.c index e8e28e1df4..9deff9add5 100644 --- a/docs/examples/c/tmq_demo.c +++ b/docs/examples/c/tmq_demo.c @@ -26,6 +26,7 @@ volatile int thread_stop = 0; static int running = 1; +static int count = 0; const char* topic_name = "topic_meters"; typedef struct { @@ -40,6 +41,18 @@ typedef struct { const char* auto_offset_reset; } ConsumerConfig; +ConsumerConfig config = { + .enable_auto_commit = "true", + .auto_commit_interval_ms = "1000", + .group_id = "group1", + .client_id = "client1", + .td_connect_host = "localhost", + .td_connect_port = "6030", + .td_connect_user = "root", + .td_connect_pass = "taosdata", + .auto_offset_reset = "latest" +}; + void* prepare_data(void* arg) { const char* host = "localhost"; const char* user = "root"; @@ -48,8 +61,7 @@ void* prepare_data(void* arg) { int code = 0; TAOS* pConn = taos_connect(host, user, password, NULL, port); if (pConn == NULL) { - fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), - taos_errstr(NULL)); + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); taos_cleanup(); return NULL; } @@ -69,8 +81,7 @@ void* prepare_data(void* arg) { pRes = taos_query(pConn, buf); code = taos_errno(pRes); if (code != 0) { - fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, - taos_errstr(pRes)); + fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); } taos_free_result(pRes); sleep(1); @@ -80,43 +91,28 @@ void* prepare_data(void* arg) { } // ANCHOR: msg_process -static int32_t msg_process(TAOS_RES* msg) { +int32_t msg_process(TAOS_RES* msg) { char buf[1024]; // buf to store the row content int32_t rows = 0; const char* topicName = tmq_get_topic_name(msg); - const char* dbName = tmq_get_db_name(msg); - int32_t vgroupId = tmq_get_vgroup_id(msg); + const char* dbName = tmq_get_db_name(msg); + int32_t vgroupId = tmq_get_vgroup_id(msg); - fprintf(stdout, "topic: %s\n", topicName); - fprintf(stdout, "db: %s\n", dbName); - fprintf(stdout, "vgroup id: %d\n", vgroupId); - - while (1) { + while (true) { // get one row data from message TAOS_ROW row = taos_fetch_row(msg); if (row == NULL) break; - // get the field information - TAOS_FIELD* fields = taos_fetch_fields(msg); - // get the number of fields - int32_t numOfFields = taos_field_count(msg); - // get the precision of the result - int32_t precision = taos_result_precision(msg); + // Add your data processing logic here + rows++; - // print the row content - if (taos_print_row(buf, row, fields, numOfFields) < 0) { - fprintf(stderr, "Failed to print row\n"); - break; - } - // print the precision and row content to the console - fprintf(stdout, "precision: %d, data: %s\n", precision, buf); } return rows; } // ANCHOR_END: msg_process -static int32_t init_env() { +TAOS* init_env() { const char* host = "localhost"; const char* user = "root"; const char* password = "taosdata"; @@ -124,10 +120,9 @@ static int32_t init_env() { int code = 0; TAOS* pConn = taos_connect(host, user, password, NULL, port); if (pConn == NULL) { - fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), - taos_errstr(NULL)); + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); taos_cleanup(); - return -1; + return NULL; } TAOS_RES* pRes; @@ -136,6 +131,7 @@ static int32_t init_env() { code = taos_errno(pRes); if (code != 0) { fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + goto END; } taos_free_result(pRes); @@ -143,6 +139,7 @@ static int32_t init_env() { code = taos_errno(pRes); if (code != 0) { fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + goto END; } taos_free_result(pRes); @@ -150,7 +147,7 @@ static int32_t init_env() { pRes = taos_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600"); code = taos_errno(pRes); if (code != 0) { - fprintf(stderr, "Failed to create tmqdb, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + fprintf(stderr, "Failed to create power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); goto END; } taos_free_result(pRes); @@ -165,56 +162,78 @@ static int32_t init_env() { fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); goto END; } - taos_free_result(pRes); - taos_close(pConn); - return 0; + + return pConn; END: taos_free_result(pRes); taos_close(pConn); - return -1; + return NULL; } -int32_t create_topic() { +void deinit_env(TAOS* pConn) { + if (pConn) + taos_close(pConn); +} + +int32_t create_topic(TAOS* pConn) { TAOS_RES* pRes; - const char* host = "localhost"; - const char* user = "root"; - const char* password = "taosdata"; - uint16_t port = 6030; int code = 0; - TAOS* pConn = taos_connect(host, user, password, NULL, port); - if (pConn == NULL) { - fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), - taos_errstr(NULL)); - taos_cleanup(); + + if (!pConn) { + fprintf(stderr, "Invalid input parameter.\n"); return -1; } - pRes = taos_query(pConn, "USE POWER"); + pRes = taos_query(pConn, "USE power"); code = taos_errno(pRes); if (taos_errno(pRes) != 0) { - fprintf(stderr, "Failed to use tmqdb, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); return -1; } taos_free_result(pRes); - pRes = taos_query( - pConn, - "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); + pRes = taos_query(pConn, "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); code = taos_errno(pRes); if (code != 0) { fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); return -1; } taos_free_result(pRes); + return 0; +} - taos_close(pConn); +int32_t drop_topic(TAOS* pConn) { + TAOS_RES* pRes; + int code = 0; + + if (!pConn) { + fprintf(stderr, "Invalid input parameter.\n"); + return -1; + } + + pRes = taos_query(pConn, "USE power"); + code = taos_errno(pRes); + if (taos_errno(pRes) != 0) { + fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "DROP TOPIC IF EXISTS topic_meters"); + code = taos_errno(pRes); + if (code != 0) { + fprintf(stderr, "Failed to drop topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); return 0; } void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { - fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param); + count +=1; + fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p, count: %d.\n", code, tmq, param, count); } // ANCHOR: create_consumer_1 @@ -289,7 +308,6 @@ _end: tmq_list_t* build_topic_list() { // create a empty topic list tmq_list_t* topicList = tmq_list_new(); - const char* topic_name = "topic_meters"; // append topic name to the list int32_t code = tmq_list_append(topicList, topic_name); @@ -315,8 +333,10 @@ void basic_consume_loop(tmq_t* tmq) { TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout); if (tmqmsg) { msgCnt++; - // process the message + + // Add your data processing logic here totalRows += msg_process(tmqmsg); + // free the message taos_free_result(tmqmsg); } @@ -349,12 +369,13 @@ void consume_repeatly(tmq_t* tmq) { code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin); if (code != 0) { - fprintf(stderr, "Failed to seek assignment %d to beginning %ld, ErrCode: 0x%x, ErrMessage: %s.\n", i, p->begin, - code, tmq_err2str(code)); - } else { - fprintf(stdout, "Seek assignment %d to beginning %ld successfully.\n", i, p->begin); + fprintf(stderr, "Failed to seek offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n", + topic_name, config.group_id, config.client_id, code, tmq_err2str(code)); + break; } } + if (code == 0) + fprintf(stdout, "Assignment seek to beginning successfully.\n"); // free the assignment array tmq_free_assignment(pAssign); @@ -380,7 +401,8 @@ void manual_commit(tmq_t* tmq) { // commit the message int32_t code = tmq_commit_sync(tmq, tmqmsg); if (code) { - fprintf(stderr, "Failed to commit message, ErrCode: 0x%x, ErrMessage: %s.\n", code, tmq_err2str(code)); + fprintf(stderr, "Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n", + topic_name, config.group_id, config.client_id, code, tmq_err2str(code)); // free the message taos_free_result(tmqmsg); break; @@ -405,11 +427,14 @@ int main(int argc, char* argv[]) { int32_t code; pthread_t thread_id; - if (init_env() < 0) { + TAOS* pConn = init_env(); + if (pConn == NULL) { + fprintf(stderr, "Failed to init env.\n"); return -1; } - if (create_topic() < 0) { + if (create_topic(pConn) < 0) { + fprintf(stderr, "Failed to create topic.\n"); return -1; } @@ -418,16 +443,6 @@ int main(int argc, char* argv[]) { return -1; } - ConsumerConfig config = {.enable_auto_commit = "true", - .auto_commit_interval_ms = "1000", - .group_id = "group1", - .client_id = "client1", - .td_connect_host = "localhost", - .td_connect_port = "6030", - .td_connect_user = "root", - .td_connect_pass = "taosdata", - .auto_offset_reset = "latest"}; - // ANCHOR: create_consumer_2 tmq_t* tmq = build_consumer(&config); if (NULL == tmq) { @@ -435,8 +450,8 @@ int main(int argc, char* argv[]) { config.td_connect_host, config.group_id, config.client_id); return -1; } else { - fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, , clientId: %s.\n", config.td_connect_host, - config.group_id, config.client_id); + fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n", + config.td_connect_host, config.group_id, config.client_id); } // ANCHOR_END: create_consumer_2 @@ -467,7 +482,8 @@ int main(int argc, char* argv[]) { // unsubscribe the topic code = tmq_unsubscribe(tmq); if (code) { - fprintf(stderr, "Failed to unsubscribe consumer, ErrCode: 0x%x, ErrMessage: %s.\n", code, tmq_err2str(code)); + fprintf(stderr, "Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n", + topic_name, config.group_id, config.client_id, code, tmq_err2str(code)); } else { fprintf(stderr, "Consumer unsubscribed successfully.\n"); } @@ -484,5 +500,11 @@ int main(int argc, char* argv[]) { thread_stop = 1; pthread_join(thread_id, NULL); + if (drop_topic(pConn) < 0) { + fprintf(stderr, "Failed to drop topic.\n"); + return -1; + } + + deinit_env(pConn); return 0; } diff --git a/docs/examples/c/with_reqid_demo.c b/docs/examples/c/with_reqid_demo.c index c6e87686e9..8942077f67 100644 --- a/docs/examples/c/with_reqid_demo.c +++ b/docs/examples/c/with_reqid_demo.c @@ -33,8 +33,7 @@ static int DemoWithReqId() { // connect TAOS *taos = taos_connect(host, user, password, NULL, port); if (taos == NULL) { - printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), - taos_errstr(NULL)); + fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); taos_cleanup(); return -1; } @@ -45,7 +44,7 @@ static int DemoWithReqId() { TAOS_RES *result = taos_query_with_reqid(taos, sql, reqid); code = taos_errno(result); if (code != 0) { - printf("Failed to execute sql with reqId: %ld, ErrCode: 0x%x, ErrMessage: %s\n.", reqid, code, taos_errstr(result)); + fprintf(stderr, "Failed to execute sql with reqId: %ld, ErrCode: 0x%x, ErrMessage: %s\n.", reqid, code, taos_errstr(result)); taos_close(taos); taos_cleanup(); return -1; @@ -56,20 +55,16 @@ static int DemoWithReqId() { int num_fields = taos_field_count(result); TAOS_FIELD *fields = taos_fetch_fields(result); - printf("fields: %d\n", num_fields); - printf("sql: %s, result:\n", sql); + fprintf(stdout, "query successfully, got %d fields, the sql is: %s.\n", num_fields, sql); + // fetch the records row by row while ((row = taos_fetch_row(result))) { - char temp[1024] = {0}; - if (taos_print_row(temp, row, fields, num_fields) < 0) { - printf("Failed to print row\n"); - break; - } - printf("%s\n", temp); + // Add your data processing logic here + rows++; } - printf("total rows: %d\n", rows); + fprintf(stdout, "total rows: %d\n", rows); taos_free_result(result); // close & clean diff --git a/docs/examples/csharp/subscribe/Program.cs b/docs/examples/csharp/subscribe/Program.cs index 2ec73aae48..50988d0c5d 100644 --- a/docs/examples/csharp/subscribe/Program.cs +++ b/docs/examples/csharp/subscribe/Program.cs @@ -111,11 +111,12 @@ namespace TMQExample catch (Exception e) { // handle other exceptions - Console.WriteLine($"Failed to create native consumer, " + - $"host: {_host}, " + - $"groupId: {_groupId}, " + - $"clientId: {_clientId}, " + - $"ErrMessage: {e.Message}"); + Console.WriteLine( + $"Failed to create native consumer, " + + $"host: {_host}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrMessage: {e.Message}"); throw; } @@ -163,11 +164,12 @@ namespace TMQExample catch (Exception e) { // handle other exceptions - Console.WriteLine($"Failed to poll data, " + - $"topic: {_topic}, " + - $"groupId: {_groupId}, " + - $"clientId: {_clientId}, " + - $"ErrMessage: {e.Message}"); + Console.WriteLine( + $"Failed to poll data, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrMessage: {e.Message}"); throw; } // ANCHOR_END: subscribe @@ -186,13 +188,14 @@ namespace TMQExample { consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0)); } + Console.WriteLine("Assignment seek to beginning successfully"); } catch (TDengineError e) { // handle TDengine error Console.WriteLine( - $"Failed to execute seek example, " + + $"Failed to seek offset, " + $"topic: {_topic}, " + $"groupId: {_groupId}, " + $"clientId: {_clientId}, " + @@ -204,12 +207,13 @@ namespace TMQExample catch (Exception e) { // handle other exceptions - Console.WriteLine($"Failed to execute seek example, " + - $"topic: {_topic}, " + - $"groupId: {_groupId}, " + - $"clientId: {_clientId}, " + - $"offset: 0, " + - $"ErrMessage: {e.Message}"); + Console.WriteLine( + $"Failed to seek offset, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"offset: 0, " + + $"ErrMessage: {e.Message}"); throw; } // ANCHOR_END: seek @@ -240,7 +244,7 @@ namespace TMQExample { // handle TDengine error Console.WriteLine( - $"Failed to execute commit example, " + + $"Failed to commit offset, " + $"topic: {_topic}, " + $"groupId: {_groupId}, " + $"clientId: {_clientId}, " + @@ -253,7 +257,7 @@ namespace TMQExample { // handle other exceptions Console.WriteLine( - $"Failed to execute commit example, " + + $"Failed to commit offset, " + $"topic: {_topic}, " + $"groupId: {_groupId}, " + $"clientId: {_clientId}, " + diff --git a/docs/examples/csharp/wssubscribe/Program.cs b/docs/examples/csharp/wssubscribe/Program.cs index 5f5afc575a..939189cabd 100644 --- a/docs/examples/csharp/wssubscribe/Program.cs +++ b/docs/examples/csharp/wssubscribe/Program.cs @@ -116,11 +116,12 @@ namespace TMQExample catch (Exception e) { // handle other exceptions - Console.WriteLine($"Failed to create native consumer, " + - $"host: {_host}, " + - $"groupId: {_groupId}, " + - $"clientId: {_clientId}, " + - $"ErrMessage: {e.Message}"); + Console.WriteLine( + $"Failed to create native consumer, " + + $"host: {_host}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrMessage: {e.Message}"); throw; } @@ -191,13 +192,14 @@ namespace TMQExample { consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0)); } + Console.WriteLine("Assignment seek to beginning successfully"); } catch (TDengineError e) { // handle TDengine error Console.WriteLine( - $"Failed to execute seek example, " + + $"Failed to seek offset, " + $"topic: {_topic}, " + $"groupId: {_groupId}, " + $"clientId: {_clientId}, " + @@ -209,12 +211,13 @@ namespace TMQExample catch (Exception e) { // handle other exceptions - Console.WriteLine($"Failed to execute seek example, " + - $"topic: {_topic}, " + - $"groupId: {_groupId}, " + - $"clientId: {_clientId}, " + - $"offset: 0, " + - $"ErrMessage: {e.Message}"); + Console.WriteLine( + $"Failed to seek offset, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"offset: 0, " + + $"ErrMessage: {e.Message}"); throw; } // ANCHOR_END: seek @@ -245,7 +248,7 @@ namespace TMQExample { // handle TDengine error Console.WriteLine( - $"Failed to execute commit example, " + + $"Failed to commit offset, " + $"topic: {_topic}, " + $"groupId: {_groupId}, " + $"clientId: {_clientId}, " + @@ -258,7 +261,7 @@ namespace TMQExample { // handle other exceptions Console.WriteLine( - $"Failed to execute commit example, " + + $"Failed to commit offset, " + $"topic: {_topic}, " + $"groupId: {_groupId}, " + $"clientId: {_clientId}, " + diff --git a/docs/examples/go/tmq/native/main.go b/docs/examples/go/tmq/native/main.go index 1d4a22b880..8d667abc18 100644 --- a/docs/examples/go/tmq/native/main.go +++ b/docs/examples/go/tmq/native/main.go @@ -112,7 +112,7 @@ func main() { }, 0) if err != nil { log.Fatalf( - "Failed to execute seek example, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n", + "Failed to execute seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n", topic, groupID, clientID, diff --git a/docs/examples/go/tmq/ws/main.go b/docs/examples/go/tmq/ws/main.go index aaed8395e0..9ea4d72b39 100644 --- a/docs/examples/go/tmq/ws/main.go +++ b/docs/examples/go/tmq/ws/main.go @@ -130,7 +130,7 @@ func main() { }, 0) if err != nil { log.Fatalf( - "Failed to execute seek example, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n", + "Failed to seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n", topic, groupID, clientID,