improve log
This commit is contained in:
parent
0eba0ff308
commit
edcc5f5b67
|
@ -61,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
// ANCHOR: consume
|
// ANCHOR: consume
|
||||||
match consumer.subscribe(["topic_meters"]).await{
|
match consumer.subscribe(["topic_meters"]).await{
|
||||||
Ok(_) => println!("subscribe topics successfully."),
|
Ok(_) => println!("Subscribe topics successfully."),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprintln!("Failed to subscribe topic_meters, dsn: {}; ErrMessage: {}", dsn, err);
|
eprintln!("Failed to subscribe topic_meters, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
return Err(err.into());
|
return Err(err.into());
|
||||||
|
@ -123,7 +123,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
// commit offset manually when you have processed the message.
|
// commit offset manually when you have processed the message.
|
||||||
match consumer.commit(offset).await{
|
match consumer.commit(offset).await{
|
||||||
Ok(_) => println!("commit offset manually successfully."),
|
Ok(_) => println!("Commit offset manually successfully."),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprintln!("Failed to commit offset manually, dsn: {}; ErrMessage: {}", dsn, err);
|
eprintln!("Failed to commit offset manually, dsn: {}; ErrMessage: {}", dsn, err);
|
||||||
return Err(err.into());
|
return Err(err.into());
|
||||||
|
@ -140,7 +140,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
// ANCHOR: seek_offset
|
// ANCHOR: seek_offset
|
||||||
let assignments = consumer.assignments().await.unwrap();
|
let assignments = consumer.assignments().await.unwrap();
|
||||||
println!("assignments: {:?}", assignments);
|
println!("Now assignments: {:?}", assignments);
|
||||||
|
|
||||||
// seek offset
|
// seek offset
|
||||||
for topic_vec_assignment in assignments {
|
for topic_vec_assignment in assignments {
|
||||||
|
@ -163,23 +163,24 @@ async fn main() -> anyhow::Result<()> {
|
||||||
match consumer.offset_seek(topic, vgroup_id, begin).await{
|
match consumer.offset_seek(topic, vgroup_id, begin).await{
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprintln!("seek example failed; ErrMessage: {}", err);
|
eprintln!("Seek example failed; ErrMessage: {}", err);
|
||||||
return Err(err.into());
|
return Err(err.into());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let topic_assignment = consumer.topic_assignment(topic).await;
|
let topic_assignment = consumer.topic_assignment(topic).await;
|
||||||
println!("topic assignment: {:?}", topic_assignment);
|
println!("Topic assignment: {:?}", topic_assignment);
|
||||||
}
|
}
|
||||||
println!("assignment seek to beginning successfully.");
|
println!("Assignment seek to beginning successfully.");
|
||||||
// after seek offset
|
// after seek offset
|
||||||
let assignments = consumer.assignments().await.unwrap();
|
let assignments = consumer.assignments().await.unwrap();
|
||||||
println!("after seek offset assignments: {:?}", assignments);
|
println!("After seek offset assignments: {:?}", assignments);
|
||||||
// ANCHOR_END: seek_offset
|
// ANCHOR_END: seek_offset
|
||||||
|
|
||||||
// ANCHOR: unsubscribe
|
// ANCHOR: unsubscribe
|
||||||
consumer.unsubscribe().await;
|
consumer.unsubscribe().await;
|
||||||
|
println!("Consumer unsubscribed successfully.");
|
||||||
// ANCHOR_END: unsubscribe
|
// ANCHOR_END: unsubscribe
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
|
|
|
@ -172,15 +172,15 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let topic_assignment = consumer.topic_assignment(topic).await;
|
let topic_assignment = consumer.topic_assignment(topic).await;
|
||||||
println!("topic assignment: {:?}", topic_assignment);
|
println!("topic assignment: {:?}", topic_assignment);
|
||||||
}
|
}
|
||||||
println!("assignment seek to beginning successfully.");
|
println!("Assignment seek to beginning successfully.");
|
||||||
// after seek offset
|
// after seek offset
|
||||||
let assignments = consumer.assignments().await.unwrap();
|
let assignments = consumer.assignments().await.unwrap();
|
||||||
println!("after seek offset assignments: {:?}", assignments);
|
println!("After seek offset assignments: {:?}", assignments);
|
||||||
// ANCHOR_END: seek_offset
|
// ANCHOR_END: seek_offset
|
||||||
|
|
||||||
// ANCHOR: unsubscribe
|
// ANCHOR: unsubscribe
|
||||||
consumer.unsubscribe().await;
|
consumer.unsubscribe().await;
|
||||||
println!("consumer unsubscribed successfully.");
|
println!("Consumer unsubscribed successfully.");
|
||||||
// ANCHOR_END: unsubscribe
|
// ANCHOR_END: unsubscribe
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
|
|
|
@ -59,7 +59,7 @@ public class ConsumerLoopFull {
|
||||||
|
|
||||||
// subscribe to the topics
|
// subscribe to the topics
|
||||||
consumer.subscribe(topics);
|
consumer.subscribe(topics);
|
||||||
System.out.println("subscribe topics successfully.");
|
System.out.println("Subscribe topics successfully.");
|
||||||
for (int i = 0; i < 50; i++) {
|
for (int i = 0; i < 50; i++) {
|
||||||
// poll data
|
// poll data
|
||||||
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
|
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
|
||||||
|
@ -88,9 +88,9 @@ public class ConsumerLoopFull {
|
||||||
|
|
||||||
// subscribe to the topics
|
// subscribe to the topics
|
||||||
consumer.subscribe(topics);
|
consumer.subscribe(topics);
|
||||||
System.out.println("subscribe topics successfully.");
|
System.out.println("Subscribe topics successfully.");
|
||||||
Set<TopicPartition> assignment = consumer.assignment();
|
Set<TopicPartition> assignment = consumer.assignment();
|
||||||
System.out.println("now assignment: " + JSON.toJSONString(assignment));
|
System.out.println("Now assignment: " + JSON.toJSONString(assignment));
|
||||||
|
|
||||||
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
|
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
|
||||||
// make sure we have got some data
|
// make sure we have got some data
|
||||||
|
@ -99,13 +99,13 @@ public class ConsumerLoopFull {
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer.seekToBeginning(assignment);
|
consumer.seekToBeginning(assignment);
|
||||||
System.out.println("assignment seek to beginning successfully.");
|
System.out.println("Assignment seek to beginning successfully.");
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("seek example failed", ex);
|
throw new SQLException("seek example failed", ex);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
System.out.println("seek example failed; ErrMessage: " + ex.getMessage());
|
System.out.println("Seek example failed; ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("seek example failed", ex);
|
throw new SQLException("seek example failed", ex);
|
||||||
}
|
}
|
||||||
// ANCHOR_END: consumer_seek
|
// ANCHOR_END: consumer_seek
|
||||||
|
@ -128,7 +128,7 @@ public class ConsumerLoopFull {
|
||||||
if (!records.isEmpty()) {
|
if (!records.isEmpty()) {
|
||||||
// after processing the data, commit the offset manually
|
// after processing the data, commit the offset manually
|
||||||
consumer.commitSync();
|
consumer.commitSync();
|
||||||
System.out.println("commit offset manually successfully.");
|
System.out.println("Commit offset manually successfully.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
|
@ -149,7 +149,7 @@ public class ConsumerLoopFull {
|
||||||
try {
|
try {
|
||||||
// unsubscribe the consumer
|
// unsubscribe the consumer
|
||||||
consumer.unsubscribe();
|
consumer.unsubscribe();
|
||||||
System.out.println("consumer unsubscribed successfully.");
|
System.out.println("Consumer unsubscribed successfully.");
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
|
@ -161,7 +161,7 @@ public class ConsumerLoopFull {
|
||||||
finally {
|
finally {
|
||||||
// close the consumer
|
// close the consumer
|
||||||
consumer.close();
|
consumer.close();
|
||||||
System.out.println("consumer closed successfully.");
|
System.out.println("Consumer closed successfully.");
|
||||||
}
|
}
|
||||||
// ANCHOR_END: unsubscribe_data_code_piece
|
// ANCHOR_END: unsubscribe_data_code_piece
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class WsConsumerLoopFull {
|
||||||
|
|
||||||
// subscribe to the topics
|
// subscribe to the topics
|
||||||
consumer.subscribe(topics);
|
consumer.subscribe(topics);
|
||||||
System.out.println("subscribe topics successfully.");
|
System.out.println("Subscribe topics successfully.");
|
||||||
for (int i = 0; i < 50; i++) {
|
for (int i = 0; i < 50; i++) {
|
||||||
// poll data
|
// poll data
|
||||||
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
|
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
|
||||||
|
@ -86,9 +86,9 @@ public class WsConsumerLoopFull {
|
||||||
|
|
||||||
// subscribe to the topics
|
// subscribe to the topics
|
||||||
consumer.subscribe(topics);
|
consumer.subscribe(topics);
|
||||||
System.out.println("subscribe topics successfully.");
|
System.out.println("Subscribe topics successfully.");
|
||||||
Set<TopicPartition> assignment = consumer.assignment();
|
Set<TopicPartition> assignment = consumer.assignment();
|
||||||
System.out.println("now assignment: " + JSON.toJSONString(assignment));
|
System.out.println("Now assignment: " + JSON.toJSONString(assignment));
|
||||||
|
|
||||||
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
|
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
|
||||||
// make sure we have got some data
|
// make sure we have got some data
|
||||||
|
@ -97,13 +97,13 @@ public class WsConsumerLoopFull {
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer.seekToBeginning(assignment);
|
consumer.seekToBeginning(assignment);
|
||||||
System.out.println("assignment seek to beginning successfully.");
|
System.out.println("Assignment seek to beginning successfully.");
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("seek example failed", ex);
|
throw new SQLException("seek example failed", ex);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
System.out.println("seek example failed; ErrMessage: " + ex.getMessage());
|
System.out.println("Seek example failed; ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("seek example failed", ex);
|
throw new SQLException("seek example failed", ex);
|
||||||
}
|
}
|
||||||
// ANCHOR_END: consumer_seek
|
// ANCHOR_END: consumer_seek
|
||||||
|
@ -126,7 +126,7 @@ public class WsConsumerLoopFull {
|
||||||
if (!records.isEmpty()) {
|
if (!records.isEmpty()) {
|
||||||
// after processing the data, commit the offset manually
|
// after processing the data, commit the offset manually
|
||||||
consumer.commitSync();
|
consumer.commitSync();
|
||||||
System.out.println("commit offset manually successfully.");
|
System.out.println("Commit offset manually successfully.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
|
@ -147,7 +147,7 @@ public class WsConsumerLoopFull {
|
||||||
try {
|
try {
|
||||||
// unsubscribe the consumer
|
// unsubscribe the consumer
|
||||||
consumer.unsubscribe();
|
consumer.unsubscribe();
|
||||||
System.out.println("consumer unsubscribed successfully.");
|
System.out.println("Consumer unsubscribed successfully.");
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
|
@ -159,7 +159,7 @@ public class WsConsumerLoopFull {
|
||||||
finally {
|
finally {
|
||||||
// close the consumer
|
// close the consumer
|
||||||
consumer.close();
|
consumer.close();
|
||||||
System.out.println("consumer closed successfully.");
|
System.out.println("Consumer closed successfully.");
|
||||||
}
|
}
|
||||||
// ANCHOR_END: unsubscribe_data_code_piece
|
// ANCHOR_END: unsubscribe_data_code_piece
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue