Merge branch 'docs/sheyj-3.0' of github.com:taosdata/TDengine into docs/sheyj-3.0
This commit is contained in:
commit
cf53c3de96
|
@ -28,6 +28,18 @@ volatile int thread_stop = 0;
|
|||
static int running = 1;
|
||||
const char* topic_name = "topic_meters";
|
||||
|
||||
typedef struct {
|
||||
const char* enable_auto_commit;
|
||||
const char* auto_commit_interval_ms;
|
||||
const char* group_id;
|
||||
const char* client_id;
|
||||
const char* td_connect_host;
|
||||
const char* td_connect_port;
|
||||
const char* td_connect_user;
|
||||
const char* td_connect_pass;
|
||||
const char* auto_offset_reset;
|
||||
} ConsumerConfig;
|
||||
|
||||
void* prepare_data(void* arg) {
|
||||
const char *host = "localhost";
|
||||
const char *user = "root";
|
||||
|
@ -205,7 +217,7 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
|||
}
|
||||
|
||||
// ANCHOR: create_consumer_1
|
||||
tmq_t* build_consumer() {
|
||||
tmq_t* build_consumer(const ConsumerConfig* config) {
|
||||
tmq_conf_res_t code;
|
||||
tmq_t* tmq = NULL;
|
||||
|
||||
|
@ -213,37 +225,47 @@ tmq_t* build_consumer() {
|
|||
tmq_conf_t* conf = tmq_conf_new();
|
||||
|
||||
// set the configuration parameters
|
||||
code = tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
code = tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||
code = tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "group.id", "group1");
|
||||
code = tmq_conf_set(conf, "group.id", config->group_id);
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "client.id", "client1");
|
||||
code = tmq_conf_set(conf, "client.id", config->client_id);
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "td.connect.user", "root");
|
||||
code = tmq_conf_set(conf, "td.connect.ip", config->td_connect_host);
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
code = tmq_conf_set(conf, "td.connect.port", config->td_connect_port);
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "auto.offset.reset", "latest");
|
||||
code = tmq_conf_set(conf, "td.connect.user", config->td_connect_user);
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "td.connect.pass", config->td_connect_pass);
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
|
@ -392,16 +414,29 @@ int main(int argc, char* argv[]) {
|
|||
|
||||
if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) {
|
||||
fprintf(stderr, "Failed to create thread.\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
// ANCHOR: create_consumer_2
|
||||
tmq_t* tmq = build_consumer();
|
||||
if (NULL == tmq) {
|
||||
fprintf(stderr, "Failed to create consumer.\n");
|
||||
return -1;
|
||||
}
|
||||
fprintf(stdout, "Create consumer successfully.\n");
|
||||
|
||||
ConsumerConfig config = {
|
||||
.enable_auto_commit = "true",
|
||||
.auto_commit_interval_ms = "1000",
|
||||
.group_id = "group1",
|
||||
.client_id = "client1",
|
||||
.td_connect_host = "localhost",
|
||||
.td_connect_port = "6030",
|
||||
.td_connect_user = "root",
|
||||
.td_connect_pass = "taosdata",
|
||||
.auto_offset_reset = "latest"
|
||||
};
|
||||
|
||||
// ANCHOR: create_consumer_2
|
||||
tmq_t* tmq = build_consumer(&config);
|
||||
if (NULL == tmq) {
|
||||
fprintf(stderr, "Failed to create native consumer, host: %s, groupId: %s, , clientId: %s.\n", config.td_connect_host, config.group_id, config.client_id);
|
||||
return -1;
|
||||
} else {
|
||||
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, , clientId: %s.\n", config.td_connect_host, config.group_id, config.client_id);
|
||||
}
|
||||
|
||||
// ANCHOR_END: create_consumer_2
|
||||
|
||||
|
|
Loading…
Reference in New Issue