RabbitMQ-从入门到生产落地

张开发
2026/5/17 18:36:02 15 分钟阅读
RabbitMQ-从入门到生产落地
一、什么是 RabbitMQ为什么我们需要消息队列RabbitMQ 是一个开源的、基于 AMQP 协议的、高性能的消息队列中间件。它是目前 Java 生态中最流行的消息队列之一被广泛应用于各大互联网公司。消息队列的核心作用简单来说消息队列就是一个 缓冲区用来在不同的服务之间传递消息。它主要解决以下四个问题解耦服务之间不需要直接调用只需要发送消息到队列降低了服务之间的耦合度异步将非核心业务逻辑异步处理提升系统的响应速度削峰在流量高峰期将请求缓存到队列中后端服务按照自己的处理能力消费避免系统被打垮广播一个消息可以被多个消费者同时消费实现服务之间的广播通信举个最常见的例子用户下单流程没有消息队列时下单流程是这样的plaintext用户下单 → 扣减库存 → 生成订单 → 发送短信通知 → 发送邮件通知 → 返回成功整个流程需要同步执行用户需要等待所有步骤完成才能看到结果。如果短信服务挂了整个下单流程都会失败。有了消息队列后下单流程变成了这样plaintext用户下单 → 扣减库存 → 生成订单 → 发送订单创建成功消息到队列 → 返回成功 ↓ 短信服务消费消息 邮件服务消费消息 物流服务消费消息用户只需要等待核心业务完成就能看到结果非核心业务由消息队列异步处理。即使短信服务挂了也不会影响下单流程消息会在队列中等待直到短信服务恢复。二、RabbitMQ 的核心概念与工作原理要真正用好 RabbitMQ必须先搞懂它的核心概念和工作原理。RabbitMQ 的整体架构plaintext┌─────────────────────────────────────────────────────────┐ │ Producer生产者 │ └───────────────────────────┬─────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────┐ │ RabbitMQ Server │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ Exchange │───▶│ Queue │───▶│ Consumer │ │ │ │ 交换机 │ │ 队列 │ │ 消费者 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────┘核心组件详解Producer生产者发送消息的应用程序Consumer消费者接收消息的应用程序Broker消息代理就是 RabbitMQ 服务器本身负责接收和转发消息Virtual Host虚拟主机相当于 RabbitMQ 中的 租户不同的虚拟主机之间相互隔离有自己的交换机、队列和权限Exchange交换机接收生产者发送的消息并根据路由键将消息路由到对应的队列Queue队列存储消息的地方消息最终会被发送到队列中等待消费者消费Binding绑定将交换机和队列绑定在一起同时指定一个路由键Routing Key路由键生产者发送消息时指定的一个键交换机根据这个键来决定将消息发送到哪个队列交换机的四种类型RabbitMQ 有四种常用的交换机类型每种类型对应不同的路由规则表格交换机类型路由规则适用场景Direct直连消息的路由键必须与绑定的路由键完全匹配一对一的消息传递Topic主题消息的路由键与绑定的路由键进行模式匹配发布订阅模式多条件路由Fanout广播忽略路由键将消息广播到所有绑定的队列一对多的消息广播Headers头根据消息头中的属性进行路由复杂的路由规则最常用的是 Direct 和 Topic 交换机Fanout 交换机适用于广播场景Headers 交换机很少使用。三、Spring Boot 集成 RabbitMQ 完整教程下面我就以最常用的 Spring Boot 框架为例教你如何快速集成和使用 RabbitMQ。第一步引入依赖在pom.xml中引入 Spring AMQP 依赖xmldependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId /dependency第二步配置 RabbitMQ在application.yml中配置 RabbitMQ 连接信息yamlspring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / # 生产者配置 publisher-confirm-type: correlated # 开启生产者确认 publisher-returns: true # 开启消息退回 # 消费者配置 listener: simple: acknowledge-mode: manual # 手动确认消息 prefetch: 1 # 每次只消费一条消息 retry: enabled: true # 开启消费者重试 max-attempts: 3 # 最大重试次数 initial-interval: 1000ms # 初始重试间隔第三步配置交换机、队列和绑定java运行Configuration public class RabbitMQConfig { // 订单交换机 public static final String ORDER_EXCHANGE order.exchange; // 订单队列 public static final String ORDER_QUEUE order.queue; // 订单路由键 public static final String ORDER_ROUTING_KEY order.create; // 声明交换机 Bean public DirectExchange orderExchange() { return new DirectExchange(ORDER_EXCHANGE, true, false); } // 声明队列 Bean public Queue orderQueue() { return new Queue(ORDER_QUEUE, true); } // 绑定交换机和队列 Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()) .to(orderExchange()) .with(ORDER_ROUTING_KEY); } }第四步发送消息生产者java运行Service public class OrderProducer { Autowired private RabbitTemplate rabbitTemplate; public void sendOrderMessage(Order order) { // 发送消息 rabbitTemplate.convertAndSend( RabbitMQConfig.ORDER_EXCHANGE, RabbitMQConfig.ORDER_ROUTING_KEY, order, new CorrelationData(UUID.randomUUID().toString()) ); System.out.println(订单消息发送成功 order.getId()); } }第五步接收消息消费者java运行Service public class OrderConsumer { RabbitListener(queues RabbitMQConfig.ORDER_QUEUE) public void receiveOrderMessage(Message message, Channel channel) throws IOException { try { // 解析消息 String body new String(message.getBody()); Order order new ObjectMapper().readValue(body, Order.class); // 处理业务逻辑 System.out.println(收到订单消息 order.getId()); processOrder(order); // 手动确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 处理异常拒绝消息并重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); e.printStackTrace(); } } private void processOrder(Order order) { // 处理订单逻辑发送短信、通知物流等 } }四、RabbitMQ 三大核心问题与解决方案这是本文最重要的部分。在生产环境中使用 RabbitMQ最常见的三个问题是消息丢失、重复消费、消息堆积。这三个问题如果处理不好会导致严重的数据一致性问题和系统故障。问题 1消息丢失消息丢失可能发生在三个环节生产者发送消息、RabbitMQ 存储消息、消费者消费消息。1.1 生产者消息丢失问题描述生产者发送消息后消息没有到达 RabbitMQ 服务器。解决方案开启生产者确认机制Publisher Confirm。当消息成功到达交换机时RabbitMQ 会发送一个确认消息给生产者如果消息没有到达交换机RabbitMQ 会发送一个 nack 消息给生产者生产者可以根据确认结果决定是否重发消息代码实现java运行Configuration public class RabbitMQConfirmConfig { Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory); // 开启生产者确认 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - { if (ack) { System.out.println(消息发送成功 correlationData.getId()); } else { System.out.println(消息发送失败 cause); // 重发消息 } }); // 开启消息退回消息到达交换机但没有到达队列时触发 rabbitTemplate.setReturnsCallback(returned - { System.out.println(消息被退回 returned.getMessage()); // 处理退回的消息 }); return rabbitTemplate; } }1.2 RabbitMQ 消息丢失问题描述消息到达 RabbitMQ 后RabbitMQ 服务器宕机消息丢失。解决方案开启持久化交换机、队列和消息都要设置为持久化开启镜像队列在集群环境下将队列的消息复制到多个节点上避免单节点故障导致消息丢失1.3 消费者消息丢失问题描述消费者收到消息后还没处理完就宕机了消息丢失。解决方案使用手动确认机制Manual Acknowledge。消费者收到消息后RabbitMQ 不会立即删除消息只有当消费者发送 ack 确认消息后RabbitMQ 才会删除消息如果消费者宕机消息会重新入队等待其他消费者消费问题 2重复消费问题描述同一个消息被消费者消费了多次。产生原因消费者处理完消息后还没来得及发送 ack 就宕机了网络延迟导致 ack 没有到达 RabbitMQ消息重试机制导致重复发送解决方案保证消费的幂等性。幂等性是指同一个操作执行多次和执行一次的结果是一样的。常见的幂等性实现方式唯一 ID 去重表给每个消息生成一个唯一 ID消费前先查询去重表如果已经消费过就直接返回乐观锁在数据库表中加一个 version 字段更新时判断 version 是否一致分布式锁使用 Redis 分布式锁保证同一时间只有一个消费者能处理这个消息代码示例唯一 ID 去重表java运行RabbitListener(queues RabbitMQConfig.ORDER_QUEUE) public void receiveOrderMessage(Message message, Channel channel) throws IOException { String messageId message.getMessageProperties().getMessageId(); // 1. 判断消息是否已经消费过 if (redisTemplate.hasKey(mq:consumed: messageId)) { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); return; } try { // 2. 处理业务逻辑 String body new String(message.getBody()); Order order new ObjectMapper().readValue(body, Order.class); processOrder(order); // 3. 标记消息为已消费 redisTemplate.opsForValue().set(mq:consumed: messageId, 1, 24, TimeUnit.HOURS); // 4. 确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); e.printStackTrace(); } }问题 3消息堆积问题描述生产者发送消息的速度远大于消费者消费消息的速度导致队列中堆积了大量的消息。产生原因消费者处理能力不足消费者宕机流量突增解决方案增加消费者数量水平扩展消费者实例提高消费能力优化消费者逻辑减少消费者处理消息的时间批量消费一次消费多条消息减少网络 IO死信队列将处理失败的消息转移到死信队列避免影响正常消息的消费限流对生产者进行限流控制消息发送的速度五、RabbitMQ 生产环境最佳实践最后我分享几个我在生产环境中踩过无数坑总结出来的最佳实践永远使用手动确认不要使用自动确认自动确认会导致消息丢失设置合理的 prefetch 值prefetch1 是最安全的避免一个消费者堆积太多消息开启生产者确认和消息退回保证消息不丢失所有消息都要设置过期时间避免死消息长期占用内存使用死信队列处理消费失败的消息避免消息无限重试不要在消费者中做耗时操作耗时操作会导致消费速度变慢引发消息堆积监控 RabbitMQ 状态监控队列长度、消息发送速率、消费速率、连接数等指标定期清理无用的队列和交换机释放资源避免使用默认的虚拟主机和用户生产环境要创建独立的虚拟主机和用户并设置最小权限集群部署生产环境一定要部署 RabbitMQ 集群保证高可用

更多文章