原创

RabbitMQ快速入门

温馨提示:
本文最后更新于 2024年01月21日,已超过 367 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

RabbitMQ 简介

RabbitMQ 是基于Erlang语言编写的消息代理和队列服务器,主要用于在分布式系统中实现应用程序之间的异步通信。它具有以下特点:

  • 高可靠性:RabbitMQ能够确保消息的可靠传输,即使在出现故障的情况下也能保证消息不会丢失
  • 易扩展性:随着系统需求的增长,RabbitMQ可以方便地进行水平扩展以处理更多的消息流量
  • 高可用性:通过集群等方式,RabbitMQ可以实现高可用性,从而减少单点故障的风险
  • 功能丰富:除了基本的消息队列功能,RabbitMQ还提供了多种高级功能,如发布/订阅模式、路由等
  • 多语言支持:RabbitMQ支持Java、Python、Ruby、.NET、PHP、C/C++、Node.js等多种编程语言的客户端,这使得它可以在不同的开发环境中使用

RabbitMQ 消息模型

RabbitMQ 提供了7种消息模型:
RabbitMQ 消息模型

  • P:生产者,负责生产并发送消息
  • C:消费者,消息的接收方,监听并接收处理生产者生产的消息
  • Queue: 消息队列,是存储消息的容器
  • X:交换机,负责将消息路由到指定队列

示例

1.引入AMQP依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.在项目中添加yml配置:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: guest
    password: guest

基本消息模型

1.实现消息生产者代码:

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest implements Serializable {
    private static final long serialVersionUID = -3086376022023672067L;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendSimpleQueue() {
        rabbitTemplate.convertAndSend("simple.queue", "hello Word");
    }
}

2.在MQ管理界面查看消息发送情况
mq队列
3.实现消息消费者代码:

@Slf4j
@Component
public class RabbitMQListener implements Serializable {
    private static final long serialVersionUID = 9128812682227124956L;

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) {
        log.info("MQ基本消费队列消费者收到消息:{}", msg);
    }
}

工作队列模式

可以提高消息处理速度,避免消息堆积
1.实现消息生产者代码:

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest implements Serializable {
    private static final long serialVersionUID = -3086376022023672067L;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendWorkQueue() {
        for (int index = 1; index <= 10; index++) {
            rabbitTemplate.convertAndSend("work.queue", "work message" + index);
        }
    }
}

2.实现消息消费者代码:

@Slf4j
@Component
public class RabbitMQListener implements Serializable {
    private static final long serialVersionUID = 9128812682227124956L;

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueueMessage1(String msg) throws InterruptedException {
        log.info("MQ工作消费队列消费者1收到消息:{}", msg);
        Thread.sleep(500);
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueueMessage2(String msg) throws InterruptedException {
        log.info("MQ工作消费队列消费者2收到消息:{}", msg);
        Thread.sleep(50);
    }
}

3.消费者日志
工作队列消息

4.限制消费者消息预取数量

  • 我们可以看到,虽然两个消费者消费能力不同,但消费的消息数量却相等,这种方式并未对消费者的消费能力进行考虑,而是有序的将消息均匀地发送给每一个消费者,显然我们更希望我们的消费者能者多劳,以最快的速度将消息消费完

  • 所以我们对配置文件稍作修改,增加prefetch配置,这个参数用于控制消息的预取数量,即消费者在等待确认消息(acknowledgement)之前可以从队列中获取多少条消息,它定义了消费者手中还未确认的消息最大值

  • 预取机制允许消费者在处理当前消息的同时,提前从队列中获取一些额外的消息并准备处理,这样可以减少等待时间,提高整体的消费速度和系统的吞吐量,配置如下:

spring:
    rabbitmq:
        listener:
            simple:
                prefetch: 1

在我们修改了预取机制后,我们看一看到,消费者2明显消费了更多的消息,而消费者1仅消费了一条,而且总耗时也仅400ms,与未改之前耗时2s有很明显的改善
能者多劳

发布订阅模式 - 广播模式(Fanout)

前面两种模式发送的消息都只能被消费一次,而在日常开发中,我们发送的消息经常需要被多方监听和处理。比如常见购物场景,用户下单完成,后续要给用户发送购买成功消息通知、通知商家发货等等。这些操作可能都是由不同的服务来触发,我们希望每个服务都对订单消息进行监听,从而第一时间处理本服务相关业务。接下来我们就来介绍发布订阅模式中的广播模式:

在前面图中我们知道,发布订阅模式相较于前两种模式,增加了新的组件交换机(Exchange)

此模式下,生产者发送的消息将不会直接发送到队列中,而只能发送给交换机,由交换机将消息转发给与之有绑定关系的队列,然后消费者才能从自己监听的队列中获取消息进行消费
1.实现消息生产者

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest implements Serializable {
    private static final long serialVersionUID = -3086376022023672067L;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendFanoutMessage() {
        rabbitTemplate.convertAndSend("fanout.exchange", "", "fanout message");
    }
}

2.实现消息消费者

@Slf4j
@Component
public class RabbitMQListener implements Serializable {
    private static final long serialVersionUID = 9128812682227124956L;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "fanout.queue1"),
            exchange = @Exchange(name = "fanout.exchange", type = ExchangeTypes.FANOUT)
    ))
    public void listenFanoutQueueMessage1(String msg) {
        log.info("MQ发布订阅消费者1收到消息:{}", msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "fanout.queue2"),
            exchange = @Exchange(name = "fanout.exchange", type = ExchangeTypes.FANOUT)
    ))
    public void listenFanoutQueueMessage2(String msg) {
        log.info("MQ发布订阅消费者2收到消息:{}", msg);
    }
}

4.日志:
发布订阅广播模式
我们可以看到,每个消费者都分别收到了一份属于自己的消息,这样就可以实现每个消费者分别处理属于自己的业务而不会对其它消费者产生影响,也可以更大程度的对业务系统进行解耦

发布订阅模式 - Direct(直连)

在 广播模式(Fanout)下,只要队列绑定了我们的交换机,经过该交换机的所有消息每个队列都能够消费。而在某些业务场景下,我们更希望将消息根据指定规则发送给对应队列,因此也称为路由模式
1.实现消息生产者

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest implements Serializable {
    private static final long serialVersionUID = -3086376022023672067L;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendDirectMessage() {
        rabbitTemplate.convertAndSend("direct.exchange", "direct.key1", "direct message1");
        rabbitTemplate.convertAndSend("direct.exchange", "direct.key2", "direct message2");
        rabbitTemplate.convertAndSend("direct.exchange", "direct.key3", "direct message3");
    }
}

2.实现消息消费者

@Slf4j
@Component
public class RabbitMQListener implements Serializable {
    private static final long serialVersionUID = 9128812682227124956L;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "direct.exchange"),
            key = {"direct.key1", "direct.key2"}
    ))
    public void listenDirectQueueMessage1(String msg) {
        log.info("MQ发布订阅Direct消费者1收到消息:{}", msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "direct.exchange"),
            key = {"direct.key1", "direct.key3"}
    ))
    public void listenDirectQueueMessage2(String msg) {
        log.info("MQ发布订阅Direct消费者2收到消息:{}", msg);
    }
}

3.消费者日志
发布订阅直连模式
Direct模式下,每个队列都要与交换机指定一个路由规则(RoutingKey),生产者发送消息时,也需要指定消息的 RoutingKey。交换机不再把消息发送给每个与之绑定的队列,而是根据RoutingKey进行判定发送给指定队列

发布订阅模式 - Topic(话题)

Topic 与 Direct 类似,区别在于 Topic 可以在配置路由规则时可以使用通配符,Topic 模式路由规则通常由多个单词组成,并且以.进行分割,例如:boy.topic,stars.#(#:适配大于等于0个单词),start.:适配一个单词)
1.实现消息生产者

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest implements Serializable {
    private static final long serialVersionUID = -3086376022023672067L;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendTopicMessage() {
        rabbitTemplate.convertAndSend("topic.exchange", "boy.#", "这里是男生话题");
        rabbitTemplate.convertAndSend("topic.exchange", "girl.#", "这里是女生话题");
    }
}

2.实现消息消费者

@Slf4j
@Component
public class RabbitMQListener implements Serializable {
    private static final long serialVersionUID = 9128812682227124956L;

   @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
            key = "boy.#"
    ))
    public void listenTopicQueueMessage1(String msg) {
        log.info("MQ发布订阅Topic消费者1收到消息:{}", msg);
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
            key = "girl.#"
    ))
    public void listenTopicQueueMessage2(String msg) {
        log.info("MQ发布订阅Topic消费者2收到消息:{}", msg);
    }
}

3.消费者日志
话题类型

正文到此结束
本文目录