6.异常处理
文章目录
- 6.异常处理
- 1.异常分类与处理原则
- 2.生产者异常处理
- 1. 同步发送捕获异常
- 2. 异步发送回调处理
- 3.消费者异常处理
- 1.全局异常处理器
- 2.方法级处理
- 3.重试yml配置
- 4.死信队列(DLQ)配置
- 1. 启用死信队列
- 2. 手动发送到DLQ
- 5.事务场景异常处理
- 1. 声明式事务
- 2. 事务异常回滚
- 6.监控与告警
- 1. Actuator 健康检查
- 2. Prometheus 指标
- 7.完整异常处理流程
- 8.最佳实践总结
来源参考的deepseek,如有侵权联系立删
1.异常分类与处理原则
异常类型 | 典型场景 | 处理建议 |
---|---|---|
可恢复异常 | 网络抖动、数据库锁冲突 | 重试机制(有限次数 + 退避策略) |
不可恢复异常 | 消息格式错误、权限不足 | 直接记录日志并进入死信队列 |
事务异常 | 事务超时、生产者ID冲突 | 终止事务并回滚操作 |
2.生产者异常处理
1. 同步发送捕获异常
public void sendSync(String topic, String message) {
try {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.get(5, TimeUnit.SECONDS); // 阻塞等待结果
} catch (InterruptedException | ExecutionException | TimeoutException e) {
// 记录日志并触发补偿逻辑
log.error("消息发送失败: {}", e.getMessage());
throw new BusinessException("消息发送失败", e);
}
}
2. 异步发送回调处理
public void sendAsync(String topic, String message) {
kafkaTemplate.send(topic, message).addCallback(
result -> {
// 发送成功处理
log.info("消息发送成功: topic={}", result.getRecordMetadata().topic());
},
ex -> {
// 发送失败处理
log.error("消息发送失败", ex);
if (ex instanceof RetriableException) {
// 可重试异常(如网络问题)
retrySend(topic, message);
} else {
// 不可重试异常(如消息过大)
deadLetterService.saveToDlq(topic, message);
}
}
);
}
3.消费者异常处理
1.全局异常处理器
@Configuration
public class KafkaGlobalErrorConfig {
// 定义全局错误处理器(支持批量/单消息模式)
@Bean
public CommonErrorHandler globalErrorHandler(KafkaTemplate<String, Object> template) {
// 重试策略:3次重试,间隔5秒
DefaultErrorHandler handler = new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(template), // 死信队列恢复器
new FixedBackOff(5000L, 3)
);
// 指定可重试异常类型
handler.addRetryableExceptions(NetworkException.class);
handler.addNotRetryableExceptions(SerializationException.class);
// 偏移量提交策略
handler.setCommitRecovered(true);
return handler;
}
// 容器工厂绑定全局处理器
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
ConsumerFactory<String, Object> consumerFactory,
CommonErrorHandler globalErrorHandler) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(globalErrorHandler);
return factory;
}
}
2.方法级处理
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.serializer.DeserializationException;
@Slf4j
@Configuration
public class KafkaExceptionConfig {
/**
* 自定义异常处理器
*/
@Bean
public ConsumerAwareListenerErrorHandler orderErrorHandler() {
return (message, exception, consumer) -> {
// 业务相关错误处理(如库存不足)
/* if (exception instanceof InventoryException) {
retryService.scheduleRetry(message.getPayload());
}*/
System.out.println("异常执行:"+exception);
return null;
};
}
/**
* 注册全局异常处理器
*/
@Bean
public ConsumerAwareListenerErrorHandler globalExceptionHandler() {
return (message, exception, consumer) -> {
log.error("捕获消费异常: topic={}, message={}",
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
message.getPayload(),
exception);
// 反序列化异常特殊处理
if (exception.getCause() instanceof DeserializationException) {
// 跳过错消息并提交偏移量
return null;
}
throw exception; // 其他异常继续抛出
};
}
}
@KafkaListener(topics = "test", groupId = "spring-group",errorHandler = "globalExceptionHandler")
public void listenBatch(List<String> messages, Acknowledgment ack) {
messages.forEach(msg -> System.out.println("批量消息:" + msg));
//异常测试
int i = 1/0;
ack.acknowledge();
}
3.重试yml配置
spring:
kafka:
listener:
retry:
max-attempts: 3 # 最大重试次数
backoff:
initial-interval: 1000 # 初始间隔(毫秒)
multiplier: 2.0 # 间隔倍数
exclude-exceptions: # 不重试的异常
- javax.validation.ValidationException
4.死信队列(DLQ)配置
1. 启用死信队列
spring:
kafka:
listener:
dead-letter-publish:
enable: true # 自动发布到死信队列
dead-letter-topic: dlq-${topic} # 死信队列命名规则
2. 手动发送到DLQ
@KafkaListener(topics = "payments")
public void handlePayment(PaymentEvent event, Acknowledgment ack) {
try {
paymentService.process(event);
ack.acknowledge();
} catch (InvalidPaymentException ex) {
// 手动发送到DLQ
kafkaTemplate.send("dlq-payments", event);
ack.acknowledge(); // 避免重复消费
}
}
5.事务场景异常处理
1. 声明式事务
@Transactional
public void processWithTransaction(Order order) {
// 数据库操作
orderRepository.save(order);
// Kafka事务消息
kafkaTemplate.send("orders", order.toEvent());
// 其他业务...
}
2. 事务异常回滚
@Bean
public KafkaTransactionManager<String, Object> transactionManager(ProducerFactory<String, Object> pf) {
return new KafkaTransactionManager<>(pf);
}
@Transactional(rollbackFor = {KafkaException.class, SQLException.class})
public void transactionalProcess() {
// 数据库与Kafka操作
}
6.监控与告警
1. Actuator 健康检查
management:
endpoints:
web:
exposure:
include: health,kafka
health:
kafka:
enabled: true
2. Prometheus 指标
@Bean
public MicrometerConsumerListener<K, V> consumerMetrics() {
return new MicrometerConsumerListener<>("kafka.consumer");
}
@Bean
public MicrometerProducerListener<K, V> producerMetrics() {
return new MicrometerProducerListener<>("kafka.producer");
}
7.完整异常处理流程
- 捕获异常 → 2. 分类判断 → 3. 重试/记录/DLQ → 4. 提交Offset → 5. 监控告警
8.最佳实践总结
- 分层处理:全局处理器兜底 + 方法级精细控制
- 幂等消费:确保消息重复消费时的数据安全性
- 监控覆盖:跟踪重试次数、DLQ堆积等关键指标
- 事务隔离:
@Transactional
+read_committed
保证数据一致性