Kafka应用点滴(二)——Springboot集成Kafka之三种手工提交的区别

         当spring.kafka.consumer.enable-auto-commit设置为false,并且

          spring.kafka.listener.ack-mode=MANUAL/MANUAL_IMMEDIATE

        时,我们有3种手工提交方式:consumer.commitAsync()、consumer.commitSync()以及acknowledgment.acknowledge()。这3种方式有什么区别呢?

  1. commitAsync()与commitSync()的区别:
  • commitSync

          This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown to the caller).

      同步提交,是一种阻塞式方法,只有在当前消息提交成功或者遇到不可恢复的的错误时,才会继续下一步操作,eg:

1while (true) 2{ 3ConsumerRecords<String, String> records = consumer.poll(100); 4for (ConsumerRecord<String, String> record : records) 5 { 6 System.out.printf("offset = %d, key = %s, value = %s", record.offset(),record.key(), record.value()); consumer.commitSync(); 7 } 8} 9
  • commitAsync

         This is an asynchronous call and will not block. Any errors encountered are either passed to the callback (if provided) or discarded.

       异步提交,是一种非阻塞式方法,即无需等待当前消息是否提交成功或者失败,就会继续下一步操作。对于失败或者成功后续的处理,可以在定义的回调函数中处理commitAsync(callback)

eg:

1while (true) 2{ 3 ConsumerRecords<String, String> records = consumer.poll(100); 4 for (ConsumerRecord<String, String> record : records) 5 { 6 System.out.printf("offset = %d, key = %s, value = %s", record.offset(),record.key(), record.value()); 7 consumer.commitAsync(callback); 8 } 9} 10
  1. acknowledge():

acknowledge()是org.springframework.kafka.support.Acknowledgment的一个方法,也就是说其实际是spring-kafka提供的封装方法,看源码:

 

 

       看到这里,其实我们发现在ack-mode为MANUAL_IMMEDIATE时,实际还是调用的commitSync或者commitAsync。另外其他模式则加入到队列或者计算偏移量,以待异步处理。

 

代码交流 2021