跳转至

Rabbit mq

基础知识

1. rmq的工作模式⭐⭐

RabbitMQ提供了多种工作模式:简单模式,work模式 ,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式等

rmq01

1. 简单模式:

使用默认的 direct exchange(空字符串 ""),消息直接通过 routingKey = queueName 投递到指定队列。

@Autowired  
private RabbitTemplate rabbitTemplate;  
// 生产者 
@Test  
public void testSend(){  
    rabbitTemplate.convertAndSend("","queue_simple","hello world");  
}

// 消费者
@RabbitListener(queues = {"queue_simple"})  
public void testSimpleConsumer(String msg){  
    System.out.println("消费者收到消息:" + msg);  
}
2. work 模式

工作(work)模式:一个生产者,一个队列,多个消费者,一个消息只能被一个消费者消费,消费者存在竞争关系,

// 生产者
@Test  
public void testWorkerSend(){  
    rabbitTemplate.convertAndSend("","queue_worker1","hello world1");  
    rabbitTemplate.convertAndSend("","queue_worker1","hello world2");  
}
// 消费者1
@RabbitListener(queues = {"queue_worker1"})  
public void testWorkerConsumer1(String msg){  
    System.out.println("消费者1收到消息:" + msg);  
}  
// 消费者2
@RabbitListener(queues = {"queue_worker1"})  
public void testWorkerConsumer2(String msg){  
    System.out.println("消费者2收到消息:" + msg);  
}
3. 发布订阅(广播)模式

一个生产者,多个消费者,并且有多个队列,路由器会把消息发送给所有队列。也就是一个生产者生产的数据供所有消费者使用

注:记得传构建一个广播路由以及两个队列,然后把该路由和这两个队列绑定起来,路由键设置为 "" 即可。

// p
@Test  
public void testPublishAndSubscribeSend(){  
    rabbitTemplate.convertAndSend("exchange_ps","","hello world");  
}
// c1
@RabbitListener(queues = {"queue_ps1"})  
public void testPsConsumer1(String msg){  
    System.out.println("消费者1收到消息:" + msg);  
}  
// c2
@RabbitListener(queues = {"queue_ps2"})  
public void testPsConsumer2(String msg){  
    System.out.println("消费者2收到消息:" + msg);  
}
4. 路由模式

rmq02

使用 direct 交换机,根据精确匹配的 routing key 进行路由。

实际上就是不同消息队列订阅的路由键不同,因此根据生产者投递时传入的路由键不同,投递到的消息队列也就不同。

5. topic 模式

使用 topic 交换机,支持 通配符匹配 routing key。 - *:匹配一个单词(如 user.* 匹配 user.login,但不匹配 user.auth.login) - #:匹配零个或多个单词(如 log.# 匹配 log.infolog.error.db

2. rmq的作用与使用方法 ⭐⭐⭐

1)服务解耦:还是用户注册场景下,没有 RMQ 的时候,注册成功后还需再注册代码调用短信和邮件通知服务,而有了 RMQ,只需要发送消息到 RMQ 即可,不需要调用其他服务。

2)异步调用:用户注册下,当用户注册成功,我们要发送邮件和短信通知,如果没有 rmq,用户注册成功后,还得等待系统完成邮件和短信服务才能看到响应,当但有了 RMQ 以后,就可以实现一旦注册成功,只需要将消息发送给消息队列,然后直接就可以响应用户,而邮件和短信通知都是异步进行的,这极大提高了响应速度。

3)流量削峰:秒杀活动下,数据库每秒只能处理 2000 个请求,但秒杀瞬间可能有 10 万个请求涌入。直接压到数据库,系统直接崩掉。

4)消息通信:利用 MQ 的发布/订阅模式(Pub/Sub)实现点对点或广播式通信。

spring:
  rabbitmq:
    ...
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1000  # 设置每次最多从消息队列服务器取回多少消息

rmq的用法:

  1. 导入依赖
  2. 可选:去设置 rabbitmqTemplate 的回退与确认回调,实现重发等等
  3. 去通过 rabbitmqTemplate 发送消息
  4. 通过 @RabbitListener 去声明消费者

3. 交换机的类型⭐

类型 特点 匹配规则
Direct 精确匹配 routingKey == bindingKey
Fanout 广播 忽略 routing key,发给所有绑定队列
Topic 模糊匹配 支持 *# 通配符
Headers 基于消息头(Header)匹配 不常用,性能较差

4. 什么是幂等性⭐⭐⭐

统一操作执行多次与一次的效果相同。在rmq中,消费者可能因网络、崩溃等原因重复收到并消费同一条消息。如果业务逻辑不具备幂等性,可能会导致:重复扣款,重复发券,数据重复插入等等。 而消费多条消息与消费一条消息的效果相同就叫做幂等性。

5. 如何保证幂等性

数据库唯一索引 redis:setnx

6. RMQ 在java中如何使用?

1、直接监听已存在的队列

@Component
public class SimpleConsumer {

    // queues 参数指定要监听的队列名
    @RabbitListener(queues = "existing.queue")
    public void receiveMessage(String message) {
        System.out.println("收到消息: " + message);
    }
}

2、通过注解动态创建(推荐)

@Component
public class DynamicConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order.queue", durable = "true"), // 创建持久化队列
            exchange = @Exchange(value = "order.exchange", type = "topic"), // 创建Topic交换机
            key = "order.#" // 绑定路由键
    ))
    public void processOrder(String orderData) {
        System.out.println("处理订单消息: " + orderData);
    }
}

3、手动创建队列 + 交换机 + 它们的绑定

@Configuration
public class RabbitConfig {

    // 1. 定义队列
    @Bean
    public Queue myQueue() {
        return new Queue("manual.queue", true);
    }

    // 2. 定义交换机
    @Bean
    public DirectExchange myExchange() {
        return new DirectExchange("manual.exchange");
    }

    // 3. 定义绑定关系
    @Bean
    public Binding binding(Queue myQueue, DirectExchange myExchange) {
        return BindingBuilder.bind(myQueue).to(myExchange).with("manual.key");
    }
}

进阶知识

1. 为什么消息投递可能不成功?⭐️

首先,我们要从三个角度来分析为什么消息投递可能出现问题。

首先是生产者在投递消息时可能无法传到路由器,或者传到了路由器,但是路由器无法将消息路由到消息队列中。

其次就是消息中间件可能出现异常,导致消息丢失。

最后,即使消息成功到了消息队列中,服务器也不会故障,但是还有可能因为消费者无法正常消费,然后消费者默认的确认模式还是自动确认,就也是一拿到消息就确认了,然后 rmq 就会把这条消息从消息队列删除,从而导致这次消息没有成功投递。

2. 如何保证消息投递的可靠性呢?

(1)在生产者端开启确认机制和回退机制,然后设置 rabbitmqTemplate 的确认和回调函数,具体思路就是将要投递的信息另存为相关数据,然后存到redis里面,之后如果出发了确认回调(ack为false),或者回退回调,就拿到本次消息的相关数据的id,然后去redis里面取出来信息,然后再去重发。

(2)在消息中间件开启持久化:rmq默认开启

(3)消费者需要手动确认,而不是一拿到消息就确认,而是当业务正常完成才会去向中间件发送确认 ack,然后中间件才去删除这个消息。

3. 延迟消息如何实现

(1)设置队列的消息ttl + 死信队列 (2)官方插件的延迟交换机(在特定 xxx 时间后才会将消息路由到消息队列中)

4. 什么是死信队列

用来存放死信消息的消息队列,和普通消息队列没什么区别。

5. 消息怎么才会成为死信

  • 拒绝:消费者拒接消息,basicNack()/basicReject(),并且不把消息重新放入原目标队列,requeue=false

  • 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经存储了10条,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变成死信

  • 超时:消息到达超时时间未被消费

6. 在生产者发送数据时,如果是自定义的对象,记得要实现序列化接口

7. 什么场景下 RMQ 消息会乱序呢?

RabbitMQ 能保证消息在单个 Queue 中是先进先出(FIFO)的。导致乱序的通常是以下两种情况:

  • 多消费者并行:一个 Queue 绑定了多个 Consumer,Consumer A 处理消息 1 比较慢,Consumer B 处理消息 2 比较快,导致消息 2 先入库。
  • 异常重试:消息 1 处理失败触发 nack 并重回队列(requeue),此时消息 2 已经处理完,消息 1 变成了在消息 2 之后执行。

8. RMQ 如何保证消息的顺序性呢?⭐️

方案 A:单队列 + 单消费者(单线程)(最简单,但性能差)
  • 做法:一个 Queue 只允许一个 Consumer 实例监听,且 Consumer 内部不开启多线程处理。
  • 适用:吞吐量要求不高,但顺序性极强的场景。
  • 缺点:无法发挥分布式优势,消费速度受限于单台机器。 单消费者但是多线程,还是会导致消息顺序错误!

rmq03

方案 B:拆分多个队列 + 一致性 Hash(最主流)

核心思想:将需要保证顺序的消息(比如同一个订单的创建、支付、发货),通过 Hash 路由发送到同一个队列中。

  • 创建多个 Queue(如 queue_1, queue_2, ...)。
  • 生产者发送消息时,根据 business_id(如订单 ID)进行取模或 Hash,确保同一业务 ID 的消息进入同一个 Queue
  • 每个 Queue 只对应一个唯一的 Consumer 实例。

9. 如何使用RabbitMQ解决分布式事务⭐️

在分布式系统中,保证两个服务(比如订单服务和库存服务)的数据一致性,最经典且最常用的方案就是 基于 MQ 的可靠消息最终一致性方案

在 RabbitMQ 中,我们通常通过 “本地事务表 + 确认机制” 来实现。

  • 在发送端:利用本地事务表。将业务操作和‘发消息’任务放在一个数据库事务里。通过后台任务配合 RabbitMQ 的 Confirm 机制,确保消息一定能到达交换机。

  • 在消费端:开启手动确认(Manual ACK)。只有当下游业务处理成功后才告知 MQ 删除消息。同时,为了防止重复消费,必须配合幂等性检查(如数据库唯一索引或 Redis 状态位)。

10. 如何解决消息队列的延时以及过期失效问题呢?⭐️

1)延时问题 根因:通常是消费端出了问题(宕机、逻辑卡死、处理太慢)。

2)过期失效问题 RabbitMQ 可以设置消息的过期时间(TTL)。如果消息在队列里积压超过 TTL 还没被消费,会被 丢弃

解决方法:配置死信队列,然后将这些超过 TTL 的消息持久化到数据库,之后重新发送到 MQ

11. 消息队列满了之后改如何处理呢?

当队列达到 max-length(最大长度)且内存/磁盘水位过高时,MQ 会触发流量控制(Flow Control),此时生产者会被挂起(卡住)。

  • 临时丢弃并记录:如果业务允许,可以临时开启消费端,直接读取消息但不做任何业务处理(直接 ACK),同时将消息内容记录到日志中,等高峰期过后通过日志进行人工补偿。

  • 死信队列:确保配置了死信队列(DLX)。如果队列满了,让消息溢出到死信队列,防止主队列直接崩溃。

12. 有几百万条的消息持续积压,分析该如何解决?⭐️

1)通常,消失挤压到这种程度往往说明消费者出现了问题,我们要先去找到消费者的问题并解决!

2)临时紧急扩容(核心考点):创建10个临时队列,然后写一个分流程序,它只负责从原有的积压队列里取消息,然后不做任何业务逻辑,直接转发到刚才建的那 10 个临时队列里。然后让10个消费者并行的去消费,这样就能保证原来10天才能消费完这些积压得消息,现在只需要1天即可。

3)当积压消化完后,我们停止中转程序,然后把增加得服务器删除掉,恢复原来的消费模式即可。