diff --git a/docs/examples/c/tmq_demo.c b/docs/examples/c/tmq_demo.c index d3285cb2d0..e7a2425c7d 100644 --- a/docs/examples/c/tmq_demo.c +++ b/docs/examples/c/tmq_demo.c @@ -28,6 +28,18 @@ volatile int thread_stop = 0; static int running = 1; const char* topic_name = "topic_meters"; +typedef struct { + const char* enable_auto_commit; + const char* auto_commit_interval_ms; + const char* group_id; + const char* client_id; + const char* td_connect_host; + const char* td_connect_port; + const char* td_connect_user; + const char* td_connect_pass; + const char* auto_offset_reset; +} ConsumerConfig; + void* prepare_data(void* arg) { const char *host = "localhost"; const char *user = "root"; @@ -205,7 +217,7 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { } // ANCHOR: create_consumer_1 -tmq_t* build_consumer() { +tmq_t* build_consumer(const ConsumerConfig* config) { tmq_conf_res_t code; tmq_t* tmq = NULL; @@ -213,37 +225,47 @@ tmq_t* build_consumer() { tmq_conf_t* conf = tmq_conf_new(); // set the configuration parameters - code = tmq_conf_set(conf, "enable.auto.commit", "true"); + code = tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } - code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); + code = tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } - code = tmq_conf_set(conf, "group.id", "group1"); + code = tmq_conf_set(conf, "group.id", config->group_id); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } - code = tmq_conf_set(conf, "client.id", "client1"); + code = tmq_conf_set(conf, "client.id", config->client_id); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } - code = tmq_conf_set(conf, "td.connect.user", "root"); + code = tmq_conf_set(conf, "td.connect.ip", config->td_connect_host); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } - code = tmq_conf_set(conf, "td.connect.pass", "taosdata"); + code = tmq_conf_set(conf, "td.connect.port", config->td_connect_port); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } - code = tmq_conf_set(conf, "auto.offset.reset", "latest"); + code = tmq_conf_set(conf, "td.connect.user", config->td_connect_user); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "td.connect.pass", config->td_connect_pass); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; @@ -392,16 +414,29 @@ int main(int argc, char* argv[]) { if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) { fprintf(stderr, "Failed to create thread.\n"); - return 1; - } - - // ANCHOR: create_consumer_2 - tmq_t* tmq = build_consumer(); - if (NULL == tmq) { - fprintf(stderr, "Failed to create consumer.\n"); return -1; } - fprintf(stdout, "Create consumer successfully.\n"); + + ConsumerConfig config = { + .enable_auto_commit = "true", + .auto_commit_interval_ms = "1000", + .group_id = "group1", + .client_id = "client1", + .td_connect_host = "localhost", + .td_connect_port = "6030", + .td_connect_user = "root", + .td_connect_pass = "taosdata", + .auto_offset_reset = "latest" + }; + + // ANCHOR: create_consumer_2 + tmq_t* tmq = build_consumer(&config); + if (NULL == tmq) { + fprintf(stderr, "Failed to create native consumer, host: %s, groupId: %s, , clientId: %s.\n", config.td_connect_host, config.group_id, config.client_id); + return -1; + } else { + fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, , clientId: %s.\n", config.td_connect_host, config.group_id, config.client_id); + } // ANCHOR_END: create_consumer_2