c language sample tmq program optimization

This commit is contained in:
Yaming Pei 2024-08-16 20:11:18 +08:00
parent 177112b659
commit 5011a05e11
1 changed files with 14 additions and 9 deletions

View File

@ -314,7 +314,8 @@ tmq_list_t* build_topic_list() {
if (code) {
// if failed, destroy the list and return NULL
tmq_list_destroy(topicList);
fprintf(stderr, "Failed to create topic_list, ErrCode: 0x%x, ErrMessage: %s.\n", code, tmq_err2str(code));
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
return NULL;
}
// if success, return the list
@ -347,7 +348,7 @@ void basic_consume_loop(tmq_t* tmq) {
}
// print the result: total messages and total rows consumed
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
fprintf(stdout, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
// ANCHOR_END: basic_consume_loop
@ -359,7 +360,8 @@ void consume_repeatly(tmq_t* tmq) {
// get the topic assignment
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
fprintf(stderr, "Failed to get assignment, ErrCode: 0x%x, ErrMessage: %s.\n", code, tmq_err2str(code));
fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
return;
}
@ -419,7 +421,7 @@ void manual_commit(tmq_t* tmq) {
}
// print the result: total messages and total rows consumed
fprintf(stderr, "%d msg consumed, include %d rows.\n", msgCnt, totalRows);
fprintf(stdout, "%d msg consumed, include %d rows.\n", msgCnt, totalRows);
}
// ANCHOR_END: manual_commit
@ -459,12 +461,14 @@ int main(int argc, char* argv[]) {
// ANCHOR: subscribe_3
tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list.\n");
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n",
topic_name, config.group_id, config.client_id);
return -1;
}
if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "Failed to subscribe topic_list, ErrCode: 0x%x, ErrMessage: %s.\n", code, tmq_err2str(code));
fprintf(stderr, "Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Subscribe topics successfully.\n");
}
@ -485,15 +489,16 @@ int main(int argc, char* argv[]) {
fprintf(stderr, "Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stderr, "Consumer unsubscribed successfully.\n");
fprintf(stdout, "Consumer unsubscribed successfully.\n");
}
// close the consumer
code = tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "Failed to close consumer: %s.\n", tmq_err2str(code));
fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stderr, "Consumer closed successfully.\n");
fprintf(stdout, "Consumer closed successfully.\n");
}
// ANCHOR_END: unsubscribe_and_close