From 34787a8211a220e50016a67101a59781635d32cf Mon Sep 17 00:00:00 2001 From: Yaming Pei Date: Mon, 12 Aug 2024 20:54:11 +0800 Subject: [PATCH 1/2] mod c tmq sample code --- docs/examples/c/tmq_demo.c | 67 +++++++++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 16 deletions(-) diff --git a/docs/examples/c/tmq_demo.c b/docs/examples/c/tmq_demo.c index d3285cb2d0..e7a2425c7d 100644 --- a/docs/examples/c/tmq_demo.c +++ b/docs/examples/c/tmq_demo.c @@ -28,6 +28,18 @@ volatile int thread_stop = 0; static int running = 1; const char* topic_name = "topic_meters"; +typedef struct { + const char* enable_auto_commit; + const char* auto_commit_interval_ms; + const char* group_id; + const char* client_id; + const char* td_connect_host; + const char* td_connect_port; + const char* td_connect_user; + const char* td_connect_pass; + const char* auto_offset_reset; +} ConsumerConfig; + void* prepare_data(void* arg) { const char *host = "localhost"; const char *user = "root"; @@ -205,7 +217,7 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { } // ANCHOR: create_consumer_1 -tmq_t* build_consumer() { +tmq_t* build_consumer(const ConsumerConfig* config) { tmq_conf_res_t code; tmq_t* tmq = NULL; @@ -213,37 +225,47 @@ tmq_t* build_consumer() { tmq_conf_t* conf = tmq_conf_new(); // set the configuration parameters - code = tmq_conf_set(conf, "enable.auto.commit", "true"); + code = tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } - code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); + code = tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } - code = tmq_conf_set(conf, "group.id", "group1"); + code = tmq_conf_set(conf, "group.id", config->group_id); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } - code = tmq_conf_set(conf, "client.id", "client1"); + code = tmq_conf_set(conf, "client.id", config->client_id); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } - code = tmq_conf_set(conf, "td.connect.user", "root"); + code = tmq_conf_set(conf, "td.connect.ip", config->td_connect_host); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } - code = tmq_conf_set(conf, "td.connect.pass", "taosdata"); + code = tmq_conf_set(conf, "td.connect.port", config->td_connect_port); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } - code = tmq_conf_set(conf, "auto.offset.reset", "latest"); + code = tmq_conf_set(conf, "td.connect.user", config->td_connect_user); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "td.connect.pass", config->td_connect_pass); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; @@ -392,16 +414,29 @@ int main(int argc, char* argv[]) { if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) { fprintf(stderr, "Failed to create thread.\n"); - return 1; - } - - // ANCHOR: create_consumer_2 - tmq_t* tmq = build_consumer(); - if (NULL == tmq) { - fprintf(stderr, "Failed to create consumer.\n"); return -1; } - fprintf(stdout, "Create consumer successfully.\n"); + + ConsumerConfig config = { + .enable_auto_commit = "true", + .auto_commit_interval_ms = "1000", + .group_id = "group1", + .client_id = "client1", + .td_connect_host = "localhost", + .td_connect_port = "6030", + .td_connect_user = "root", + .td_connect_pass = "taosdata", + .auto_offset_reset = "latest" + }; + + // ANCHOR: create_consumer_2 + tmq_t* tmq = build_consumer(&config); + if (NULL == tmq) { + fprintf(stderr, "Failed to create native consumer, host: %s, groupId: %s, , clientId: %s.\n", config.td_connect_host, config.group_id, config.client_id); + return -1; + } else { + fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, , clientId: %s.\n", config.td_connect_host, config.group_id, config.client_id); + } // ANCHOR_END: create_consumer_2 From c6e7d69bb4ee540063fa9bb0b1a35df5fb6dcd7b Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Mon, 12 Aug 2024 20:56:20 +0800 Subject: [PATCH 2/2] update log --- docs/examples/c/create_db_demo.c | 92 ++++++++++--------- .../node/websocketexample/tmq_example.js | 2 +- .../node/websocketexample/tmq_seek_example.js | 14 +-- docs/examples/python/reqid_native.py | 4 +- docs/examples/python/reqid_rest.py | 2 +- docs/examples/python/reqid_ws.py | 6 +- .../taosdata/example/ConsumerLoopFull.java | 4 +- .../taosdata/example/WsConsumerLoopFull.java | 4 +- 8 files changed, 65 insertions(+), 63 deletions(-) diff --git a/docs/examples/c/create_db_demo.c b/docs/examples/c/create_db_demo.c index ff1f4e62fd..45d4a17c5c 100644 --- a/docs/examples/c/create_db_demo.c +++ b/docs/examples/c/create_db_demo.c @@ -22,55 +22,57 @@ #include #include "taos.h" - static int DemoCreateDB() { -// ANCHOR: create_db_and_table -const char *host = "localhost"; -const char *user = "root"; -const char *password = "taosdata"; -uint16_t port = 6030; -int code = 0; + // ANCHOR: create_db_and_table + const char *host = "localhost"; + const char *user = "root"; + const char *password = "taosdata"; + uint16_t port = 6030; + int code = 0; -// connect -TAOS *taos = taos_connect(host, user, password, NULL, port); -if (taos == NULL) { - printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); - taos_cleanup(); - return -1; -} + // connect + TAOS *taos = taos_connect(host, user, password, NULL, port); + if (taos == NULL) { + printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), + taos_errstr(NULL)); + taos_cleanup(); + return -1; + } -// create database -TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power"); -code = taos_errno(result); -if (code != 0) { - printf("Failed to create database power, Server: %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, code, taos_errstr(result)); + // create database + TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power"); + code = taos_errno(result); + if (code != 0) { + printf("Failed to create database power, Server: %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, code, + taos_errstr(result)); + taos_close(taos); + taos_cleanup(); + return -1; + } + taos_free_result(result); + printf("Create database power successfully.\n"); + + // create table + const char *sql = + "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId " + "INT, location BINARY(24))"; + result = taos_query(taos, sql); + code = taos_errno(result); + if (code != 0) { + printf("Failed to create stable power.meters, Server: %s:%hu, ErrCode: 0x%x, ErrMessage: %s\n.", host, port, code, + taos_errstr(result)); + taos_close(taos); + taos_cleanup(); + return -1; + } + taos_free_result(result); + printf("Create stable power.meters successfully.\n"); + + // close & clean taos_close(taos); taos_cleanup(); - return -1; -} -taos_free_result(result); -printf("Create database power successfully.\n"); - -// create table -const char* sql = "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"; -result = taos_query(taos, sql); -code = taos_errno(result); -if (code != 0) { - printf("Failed to create stable power.meters, Server: %s:%hu, ErrCode: 0x%x, ErrMessage: %s\n.", host, port, code, taos_errstr(result)); - taos_close(taos); - taos_cleanup(); - return -1; -} -taos_free_result(result); -printf("Create stable meters successfully.\n"); - -// close & clean -taos_close(taos); -taos_cleanup(); -return 0; -// ANCHOR_END: create_db_and_table + return 0; + // ANCHOR_END: create_db_and_table } -int main(int argc, char *argv[]) { - return DemoCreateDB(); -} +int main(int argc, char *argv[]) { return DemoCreateDB(); } diff --git a/docs/examples/node/websocketexample/tmq_example.js b/docs/examples/node/websocketexample/tmq_example.js index da0bffc0aa..ff676fa972 100644 --- a/docs/examples/node/websocketexample/tmq_example.js +++ b/docs/examples/node/websocketexample/tmq_example.js @@ -66,7 +66,7 @@ async function subscribe(consumer) { console.log("Commit offset manually successfully."); } } catch (err) { - console.error("Failed to poll data; err.code, ErrCode:" + err.code + "; ErrMessage: " + err.message); + console.error("Failed to poll data; ErrCode:" + err.code + "; ErrMessage: " + err.message); throw err; } // ANCHOR_END: commit diff --git a/docs/examples/node/websocketexample/tmq_seek_example.js b/docs/examples/node/websocketexample/tmq_seek_example.js index ef34cb2d16..1ae729534c 100644 --- a/docs/examples/node/websocketexample/tmq_seek_example.js +++ b/docs/examples/node/websocketexample/tmq_seek_example.js @@ -18,11 +18,11 @@ async function createConsumer() { ]); try { return await taos.tmqConnect(configMap); - }catch (err) { + } catch (err) { console.log(err); throw err; } - + } // ANCHOR_END: create_consumer @@ -33,7 +33,7 @@ async function prepare() { conf.setDb('power'); const createDB = `CREATE DATABASE IF NOT EXISTS POWER ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`; const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`; - + let wsSql = await taos.sqlConnect(conf); await wsSql.exec(createDB); await wsSql.exec(createStable); @@ -57,9 +57,9 @@ async function subscribe(consumer) { for (let [key, value] of res) { console.log(key, value); } - } - }catch (err) { - console.error("Failed to poll data; err.code, ErrCode:" + err.code + "; ErrMessage: " + err.message); + } + } catch (err) { + console.error("Failed to poll data; ErrCode:" + err.code + "; ErrMessage: " + err.message); throw err; } @@ -76,7 +76,7 @@ async function test() { let res = new Map(); while (res.size == 0) { res = await consumer.poll(100); - } + } let assignment = await consumer.assignment(); await consumer.seekToBeginning(assignment); diff --git a/docs/examples/python/reqid_native.py b/docs/examples/python/reqid_native.py index 73fc05af53..8f5bb3538c 100644 --- a/docs/examples/python/reqid_native.py +++ b/docs/examples/python/reqid_native.py @@ -1,7 +1,7 @@ import taos conn = None -reqId = 1 +reqId = 3 host="localhost" port=6030 try: @@ -21,4 +21,4 @@ except Exception as err: finally: if conn: - conn.close() \ No newline at end of file + conn.close() diff --git a/docs/examples/python/reqid_rest.py b/docs/examples/python/reqid_rest.py index a4a8999f3a..76ba735234 100644 --- a/docs/examples/python/reqid_rest.py +++ b/docs/examples/python/reqid_rest.py @@ -2,7 +2,7 @@ import taosrest client = None url="http://localhost:6041" -reqId = 1 +reqId = 3 try: client = taosrest.RestClient(url=url, user="root", diff --git a/docs/examples/python/reqid_ws.py b/docs/examples/python/reqid_ws.py index a97410c2e4..691dde710f 100644 --- a/docs/examples/python/reqid_ws.py +++ b/docs/examples/python/reqid_ws.py @@ -1,7 +1,7 @@ import taosws conn = None -reqId = 1 +reqId = 3 host="localhost" port=6041 try: @@ -12,7 +12,7 @@ try: port=port, ) - result = conn.query_with_req_id("SELECT ts, current, location FROM power.meters limit 100", req_id=1) + result = conn.query_with_req_id("SELECT ts, current, location FROM power.meters limit 100", req_id=3) # Get data from result as list of tuple for row in result: print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}") @@ -21,4 +21,4 @@ except Exception as err: print(f"Failed to execute sql with reqId:{reqId}, db addr:{host}:{port} ; ErrMessage:{err}") finally: if conn: - conn.close() \ No newline at end of file + conn.close() diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java index 3c0798d198..34629fb8d2 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java @@ -42,10 +42,10 @@ public class ConsumerLoopFull { return consumer; } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to create native consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + System.out.println("Failed to create native consumer, host: " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to create consumer", ex); } catch (Exception ex) { - System.out.println("Failed to create native consumer, host : " + config.getProperty("bootstrap.servers") + System.out.println("Failed to create native consumer, host: " + config.getProperty("bootstrap.servers") + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to create consumer", ex); } diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java index d7207ffe71..241ab2df76 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java @@ -40,10 +40,10 @@ public class WsConsumerLoopFull { return consumer; } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + System.out.println("Failed to create websocket consumer, host: " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to create consumer", ex); } catch (Exception ex) { - System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + System.out.println("Failed to create websocket consumer, host: " + config.getProperty("bootstrap.servers") + "; ErrMessage: " + ex.getMessage()); throw new SQLException("Failed to create consumer", ex); }