SpringBoot 整合 环境准备
rabbitmq 版本:3.8.8
springboot 版本:2.2.4.RELEASE
订阅模式-Fanout 生产者 新建maven项目,导入依赖 pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 2.2.4.RELEASE</version > </parent > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > </dependencies >
编写配置类 配置类:(声明交换机,队列,绑定关系)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Configuration public class RabbitConfig { public final static String FANOUT_EXCHANGE = "boot.exchange.fanout"; public final static String FANOUT_QUEUE = "boot.queue.fanout"; // 1.声明交换机(交换机类型为fanout) @Bean public Exchange fanoutExchange() { return ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE).durable(true).build(); } // 2.声明队列 @Bean public Queue fanoutQueue() { return QueueBuilder.durable(FANOUT_QUEUE).build(); } /** * 3.队列和交换机绑定 * 需要知道哪个队列 * 需要知道哪个交换机 * 需要知道路由键(因为交换机类型是fanout,所以路由键为空) */ @Bean public Binding fanoutBinding() { return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange()).with("").noargs(); } }
发送消息代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @RunWith(SpringRunner.class) @SpringBootTest(classes = {Application.class}) public class ProducerTest { @Resource private RabbitTemplate rabbitTemplate; @Test public void directTest () { rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, "123" , "boot hello" ); } }
消费者 新建maven项目,导入依赖 pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 2.2.4.RELEASE</version > </parent > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > </dependencies >
监听类(监听指定队列,消费消息) 1 2 3 4 5 6 7 8 9 10 11 @Component public class BootListener { @RabbitListener(queues = "boot.queue.fanout") public void listener (Message message) { System.out.println("message:" + message); } }
执行,查看结果 这里不截图了,启动消费者进行监听,生产者发送消息后,消费者成功消费到消息。
小结
使用 springboot 整合 rabbitmq,将组件提供配置方式声明,简化编码
生产者使用 spring 提供的 rabbitTemplate 发送消息
消费者使用 spring 提供的 @RabbitListener 注解监听队列,消费消息
订阅模式-Direct(rabbitAdmin 与 注解方式) 生产者 配置类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Configuration public class RabbitConfig { public final static String DIRECT_EXCHANGE = "boot.exchange.direct" ; public final static String Direct_QUEUE = "boot.queue.direct" ; @Bean public RabbitAdmin rabbitAdmin (RabbitTemplate rabbitTemplate) { RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate); Exchange exchange = new DirectExchange(DIRECT_EXCHANGE, true , false , null ); rabbitAdmin.declareExchange(exchange); return rabbitAdmin; } }
生产者发送消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @RunWith(SpringRunner.class) @SpringBootTest(classes = {Application.class}) public class ProducerTest { @Resource private RabbitTemplate rabbitTemplate; @Test public void directTest () { rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, "123" , "boot hello" ); } }
消费者 监听类(监听指定队列,消费消息) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Component public class BootListener { @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "boot.queue.direct"), exchange = @Exchange(name = "boot.exchange.direct", type = ExchangeTypes.DIRECT), key = "123")) public void listener (@Payload String body, Message message) { System.out.println("body:" + body); System.out.println("message:" + message); } }
执行,查看结果 这里不截图了,启动消费者进行监听,生产者发送消息后,消费者成功消费到消息。 因为当前是 direct 交换机,如果将消息的路由键改成456 ,那么消费者就不会有消息。
@RabbitListener 使用 简介 在 SpringBoot 环境下,消费可以说比较简单了,借助@RabbitListener注解,基本上可以满足你 90% 以上的业务开发需求。 下面我们来看一下@RabbitListener的最最常用使用姿势。
前提: 对于消费者而言其实是不需要管理 exchange 的创建/销毁的,它是由发送者定义的; 一般来讲,消费者更关注的是自己的 queue,包括定义 queue 并与 exchange 绑定,而这一套过程是可以直接通过 rabbitmq 的控制台操作的哦。
queue,exchange, binding 已存在 所以实际开发过程中,exchange 和 queue 以及对应的绑定关系已经存在的可能性是很高的,并不需要再代码中额外处理; 在这种场景下,消费数据,可以说非常非常简单了,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component public class BootListener { @RabbitListener(queues = "boot.queue.direct") public void listener (Message message) { System.out.println("body:" + new String(message.getBody())); System.out.println("message:" + message); } }
queue 不存在 不存在 Queue 的情况下,就需要我们来主动创建 Queue,并建立与 Exchange 的绑定关系,下面给出@RabbitListener的推荐使用姿势:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Component public class BootListener { @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "boot.queue.direct"), exchange = @Exchange(name = "boot.exchange.direct", type = ExchangeTypes.DIRECT), key = "123")) public void listener (Message message) { System.out.println("body:" + new String(message.getBody())); System.out.println("message:" + message); } }
一个注解,内部声明了队列,并建立绑定关系,就是这么神奇!!!以上,就是在队列不存在时的使用姿势,看起来也不复杂。
注意@QueueBinding注解的三个属性:
value: @Queue 注解,用于声明队列,value 为 queueName, durable 表示队列是否持久化, autoDelete 表示没有消费者之后队列是否自动删除
exchange: @Exchange 注解,用于声明 exchange, type 指定消息投递策略,我们这里用的 direct 方式
key: 在 direct 方式下,这个就是我们熟知的 routingKey
ack(消息应答) rabbitmq 的核心知识点学习过程中,我们知道有一个消息应答机制,主要是针对消费者的。 默认情况下,消息应答机制是自动ack,我们可以通过 ack 方式(noack, auto, manual),进行修改,可以如下处理:
1 2 3 4 5 6 7 8 9 10 11 12 @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "boot.queue.direct"), exchange = @Exchange(name = "boot.exchange.direct", type = ExchangeTypes.DIRECT), key = "123"), ackMode = "MANUAL") public void listener (Message message, Channel channel) throws Exception { System.out.println("body:" + new String(message.getBody())); System.out.println("message:" + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); }
请注意,这里使用了Channel和DeliveryTag:
Channel:mq 和 consumer 之间的信道,通过它来 ack/nak
DeliveryTag:消息的唯一标识,用于 mq 辨别是哪个消息被 ack/nak 了
当我们正确消费时,通过调用 basicAck 方法即可
1 2 channel.basicAck(deliveryTag, false );
当我们消费失败,需要将消息重新塞入队列,等待重新消费时,可以使用 basicNack
1 2 channel.basicNack(deliveryTag, false , true );
当我们消费失败,想直接丢弃这个消息,可以使用 basicReject
1 2 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false );
并发消费 当消息很多,一个消费者吭哧吭哧的消费太慢,但是我的机器性能又杠杠的,这个时候我就希望并行消费,相当于同时有多个消费者来处理数据.
要支持并行消费,如下设置即可:
1 2 3 4 5 6 7 @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "boot.queue.direct"), exchange = @Exchange(name = "boot.exchange.direct", type = ExchangeTypes.DIRECT), key = "123"), concurrency = "4") public void listener (Message message, Channel channel) throws Exception { System.out.println("body:" + new String(message.getBody())); System.out.println("message:" + message); }
请注意注解中的concurrency = “4”属性,表示固定 4 个消费者;
使用 @Payload 和 @Headers 注解可以消息中的 body 与 headers 信息
1 2 3 4 5 @RabbitListener(queues = "debug") public void listener (@Payload String body, @Headers Map<String, Object> headers) { System.out.println("body:" + body); System.out.println("headers:" + headers); }
也可以获取单个 Header 属性
1 2 3 4 5 @RabbitListener(queues = "debug") public void listener (@Payload String body, @Headers String token) { System.out.println("body:" + body); System.out.println("token:" + token); }
@RabbitListener 和 @RabbitHandler 搭配使用 @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用 @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Component @RabbitListener(queues = "consumer_queue") public class Receiver { @RabbitHandler public void processMessage1(String message) { System.out.println(message); } @RabbitHandler public void processMessage2(byte[] message) { System.out.println(new String(message)); } }
RabbitAdmin 使用 简介 RabbitAdmin主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等。
创建RabbitAdmin 查看源码发现,要创建RabbitAdmin,需要传递一个 ConnectionFactory 或 RabbitTemplate 即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public RabbitAdmin (ConnectionFactory connectionFactory) { Assert.notNull(connectionFactory, "ConnectionFactory must not be null" ); this .connectionFactory = connectionFactory; this .rabbitTemplate = new RabbitTemplate(connectionFactory); } public RabbitAdmin (RabbitTemplate rabbitTemplate) { Assert.notNull(rabbitTemplate, "RabbitTemplate must not be null" ); Assert.notNull(rabbitTemplate.getConnectionFactory(), "RabbitTemplate's ConnectionFactory must not be null" ); this .connectionFactory = rabbitTemplate.getConnectionFactory(); this .rabbitTemplate = rabbitTemplate; }
RabbitAdmin创建交换机 1 2 3 4 rabbitAdmin.declareExchange(new FanoutExchange("test.exchange.fanout" , true , false )); rabbitAdmin.declareExchange(new DirectExchange("test.exchange.direct" , true , false )); rabbitAdmin.declareExchange(new TopicExchange("test.exchange.topic" , true , false ));
RabbitAdmin创建队列 1 2 rabbitAdmin.declareQueue(new Queue("test.queue" ));
RabbitAdmin绑定交换机与队列 1 2 rabbitAdmin.declareBinding(new Binding("test.queue" , Binding.DestinationType.QUEUE, "test.exchange.topic" , "#" , new HashMap<String, Object>()));
RabbitAdmin发送消息 1 2 rabbitAdmin.getRabbitTemplate().convertAndSend("test.exchange.topic" , "hello" , "abc123" );