boxmoe_header_banner_img

Hello! 欢迎来到悠悠畅享网!

文章导读

处理 Kafka 批量监听器反序列化错误的重试机制


avatar
作者 2025年9月1日 10

处理 Kafka 批量监听器反序列化错误的重试机制

本文将介绍如何在 kafka 批量监听器中配置和实现反序列化错误的重试机制。如摘要所述,默认情况下,DeserializationException 被认为是致命错误,不会进行重试。但是,通过适当的配置和代码实现,我们可以改变这种行为,使 Kafka 能够在遇到反序列化错误时自动重试,从而提高系统的健壮性。

移除默认的致命异常

spring Kafka 提供了 DefaultErrorHandler 类来处理 Kafka 监听器中的异常。默认情况下,DeserializationException 被包含在不会重试的异常列表中。要启用反序列化错误的重试,首先需要从这个列表中移除 DeserializationException。

@org.springframework.context.annotation.Configuration @EnableKafka public class Configuration {     @Bean("myContainerFactory")      public ConcurrentKafkaListenerContainerFactory<String, String> createFactory(              KafkaProperties properties     ) {         var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();         factory.setConsumerFactory(                 new DefaultKafkaConsumerFactory(                         properties.buildConsumerProperties(),                         new StringDeserializer(),                         new ErrorHandlingDeserializer(new MyDeserializer())                 )         );         factory.getContainerProperties().setAckMode(                 ContainerProperties.AckMode.MANUAL_IMMEDIATE         );         DefaultErrorHandler errorHandler = new DefaultErrorHandler();         errorHandler.removeClassification(DeserializationException.class);         factory.setCommonErrorHandler(errorHandler);         return factory;     }      // this fakes occasional errors which succeed after a retry     static class MyDeserializer implements Deserializer<String> {         int retries = 0;         @Override         public String deserialize(String topic, byte[] bytes) {             String s = new String(bytes);             if (s.contains("7") && retries == 0) {                 retries = 1;                 throw new RuntimeException();             }             retries = 0;             return s;         }     } }

在上面的代码中,我们创建了一个 DefaultErrorHandler 实例,并使用 removeClassification(DeserializationException.class) 方法从不会重试的异常列表中移除了 DeserializationException。然后,我们将这个配置好的 errorHandler 设置到 ConcurrentKafkaListenerContainerFactory 中。

批量监听器中的异常处理

对于批量监听器,当发生反序列化错误时,需要获取到具体的异常信息,并将其重新抛出,才能触发重试机制。可以通过以下两种方式获取异常信息:

  1. Consume List<Message<String>>: 如果监听器消费的是 List<Message<String>>,则可以通过 Message 对象的 header 获取异常信息。 使用 SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER 获取header中的异常信息。

  2. 使用 @Header: 可以在监听器方法中添加一个额外的参数,并使用 @Header 注解来获取异常信息。

   @Component    public class StringListener {        @KafkaListener(                topics = {"string-test"},                groupId = "test",                batch = "true",                containerFactory = "myContainerFactory"        )        public void listen(List<Message<String>> messages, Acknowledgment acknowledgment) {            for (Message<String> message: messages) {                try {                    String s = message.getPayload();                    System.out.println(s);                } catch (Exception e) {                    // 获取反序列化异常                    byte[] exceptionBytes = (byte[]) message.getHeaders().get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);                    DeserializationException deserializationException = byteArrayToDeserializationException(exceptionBytes);                     // 重新抛出异常,触发重试                    throw new ListenerExecutionFailedException("Deserialization failed", deserializationException);                }            }            acknowledgment.acknowledge();        }         private DeserializationException byteArrayToDeserializationException(byte[] bytes) {             ByteArrayInputStream bais = new ByteArrayInputStream(bytes);             ObjectInputStream ois;             try {                 ois = new ObjectInputStream(bais);                 return (DeserializationException) ois.readObject();             } catch (IOException | ClassNotFoundException e) {                 throw new RuntimeException("Failed to deserialize exception from byte array", e);             }         }    }

注意事项:

  • 如果使用@Header方法获取异常,需要确保header中存在SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER。
  • 需要手动将byte数组反序列化为DeserializationException对象。

总结

通过以上步骤,我们可以配置 Kafka 批量监听器,使其在遇到反序列化错误时自动重试。首先,需要从 DefaultErrorHandler 的默认致命异常列表中移除 DeserializationException。然后,在批量监听器中,需要捕获异常,获取异常信息,并将其重新抛出,才能触发重试机制。这种方法可以有效地处理间歇性的反序列化问题,提高 Kafka 消费的稳定性和可靠性。需要注意的是,可以配置重试次数和重试间隔,以避免无限重试。



评论(已关闭)

评论已关闭