SpringBoot2.x 整合 RabbitMQ 入门

这里用到的SpringBoot版本为2.1.7.RELEASE,RabbitMQ使用版本为3.6.6。

1. 使用Spring Initializr创建一个父项目(rabbitmq-demo)。

image.png

2. 创建消息发送者(Producer)模块

  • 引入依赖

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • 修改配置文件 application.yml

    # rabbitmq服务地址
    spring.rabbitmq.addresses=10.211.55.4:5672
    # 用户名
    spring.rabbitmq.username=guest
    # 密码
    spring.rabbitmq.password=guest
    # 虚拟地址, 虚拟隔离, 通常用来隔离不同项目
    spring.rabbitmq.virtual-host=/
    # 连接超时时间
    spring.rabbitmq.connection-timeout=15000
  • 创建消息发送器及消息类

    
    package top.gradual.domain;

import java.io.Serializable;

import lombok.Data;

/**

  • @Description: 消息对象
  • @Author: gradual
  • @Date: 2019-08-29 17:04
    */
    @Data
    public class Message implements Serializable {

    private Long id;

    private String message;
    }

package top.gradual.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.gradual.domain.Message;

/**
 * @Description: 消息发送器
 * @Author: gradual
 * @Date: 2019-08-29 17:01
 */
@Slf4j
@Component
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(Message message) throws Exception {
        log.info("发送信息: {}", message);
        rabbitTemplate.convertAndSend(
                "rabbitmq-demo-exchange",    //指定exchange
                "rabbitmq-demo-message",     //指定 Routing Key
                message,
                new CorrelationData(message.getId().toString())
        );
    }
}
  • 创建exchange及queue, 并且将两者通过Routing Key绑定起来

创建exchange

image.png

创建queue

image.png

两者绑定

image.png

绑定之后会在exchange上显示绑定信息
image.png

  • 创建测试用例, 测试是否能够发送数据到RabbitMQ上
package top.gradual;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import top.gradual.domain.Message;
import top.gradual.producer.MessageSender;

/**
 * @Description:
 * @Author: gradual
 * @Date: 2019-08-29 17:16
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducerApplicationTest {

    @Autowired
    private MessageSender messageSender;

    @Test
    public void testSend() {
        Message message = new Message();
        message.setId(1L);
        message.setMessage("测试消息1");
        try {
            messageSender.sendMessage(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

此时测试testSend()方法;

进入RabbitMQ后台就能看到收到的消息,此时并没有消息接受者消费当前消息
image.png

创建消息接收者(Consumer)模块

  • 导入依赖
    
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

<!–添加web依赖让项目一直在跑–>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>


- 添加配置
```properties
# rabbitmq服务地址
spring.rabbitmq.addresses=10.211.55.4:5672
# 用户名
spring.rabbitmq.username=guest
# 密码
spring.rabbitmq.password=guest
# 虚拟地址, 虚拟隔离, 通常用来隔离不同项目
spring.rabbitmq.virtual-host=/
# 连接超时时间
spring.rabbitmq.connection-timeout=15000

# 消费端配置

## 消费者数量
spring.rabbitmq.listener.simple.concurrency=5

## 最大消费者数量
spring.jms.listener.max-concurrency=10

## 设置是自动签收(auto)还是手动签收(manual)
spring.rabbitmq.listener.simple.acknowledge-mode=manual

## 限流,单次消费数量
spring.rabbitmq.listener.simple.prefetch=1

server.port=8002
  • 创建消费类
package top.gradual.consumer;

import java.util.Map;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import top.gradual.domain.Message;

/**
 * @Description: 信息接收
 * @Author: gradual
 * @Date: 2019-08-29 17:42
 */
@Slf4j
@Component
public class MessageReceiver {

    //如果监听的queue、exchange不存在则会自动创建,并且为其绑定
    @RabbitListener(
            bindings = @QueueBinding(
                    //监听的队列
                    value = @Queue(value = "message-queue", durable = "true"),
                    //监听的exchange
                    exchange = @Exchange(name = "rabbitmq-demo-exchange", durable = "true", type = "topic"),
                    //监听的Routing Key
                    key = "rabbitmq-demo-message"
            )
    )
    //指定该方法处理消息
    @RabbitHandler
    public void receiverMessage(@Payload Message message,   //消息内容对象
                                @Headers Map<String, Object> headers, //头
                                Channel channel) throws Exception {
        //消费者操作
        log.info("收到消息: {}", message);

        //手动签收调用ACK回送
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }
}

消费者在此告一段落。

集成测试

  1. 将消息接收者(Consumer)项目跑起来

  2. 回到消息发送者(Producer)的测试类,执行testSend方法

可以看到Consumer接收到并且在控制台打印出从Producer发出的一条信息
image.png

发表评论