RabbitMQ简介
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
AMQP就是一个协议,是一个高级抽象层消息通信协议。
虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。也就是说AMQP是异步通讯的一个协议。
RabbitMQ使用场景
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。不过大多数不仅仅是无需即时返回,甚至是执行是否成功都无所谓。如果需要即时返回则可以使用Dubbo,Spring boot与Dubbo集成可以去看Spring boot 集成Dubbox
RabbitMQ依赖
RabbitMQ并不是直接一个简单的jar包(Jar包只是提供一个基本的与RabbitMQ本身通讯的一些功能),和Dubbo相同,RabbitMQ也需要其他软件来运行,以下是RabbitMQ运行所需要的软件
1、Erlang
由于RabbitMQ软件本身是基于Erlang开发的,所以想要运行RabbitMQ必须要先按照Erlang
Erlang官网
Erlang下载地址
RabbitMQ
RabbitMQ才是实现消息队列的核心
RabbitMQ官网
RabbitMQ下载
配置RabbitMQ
安装完成后,需要完成一些配置才能使用RabbitMQ,可以直接用cmd到RabbitMQ的安装目录下的sbin目录通过命令配置,也可以直接在开始菜单中直接找到RabbitMQ Command Prompt (sbin dir)运行直接到达RabbitMQ的安装目录的sbin,为了方便,我们先启用管理插件,执行命令
1
| rabbitmq-plugins.bat enable rabbitmq_management
|
即可,注意,这是在Windows下面,如果是Linux则没有bat后缀
然后我们添加一个用户,因为在外网环境没有用户的情况下是不能连接成功的,执行添加用户命令
1
| rabbitmqctl.bat add_user springboot password
|
springboot是用户名,password是密码
然后为了方便演示,我们给springboot赋予管理员权限,方便登录管理页面
1
| rabbitmqctl.bat set_user_tags springboot administrator
|
给账号赋予虚拟主机权限
1
| rabbitmqctl.bat set_permissions -p / springboot .* .* .*
|
然后启动RabbitMQ服务
访问RabbitMQ管理页面http://localhost:15672即可看见登录页面,如果没有创建用户则可以用guest,guest登录,如果有创建用户则用创建的用户登录
创建Springboot项目
因为创建spring boot项目在前面的文章已经说过很多次了,所以这里就不多说了,如果不会可以去看其他关于spring boot的博客
添加RabbitMQ相关依赖
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
每错,就是点配置,不过这样可能有点不理解,我还是把全部配置贴出来吧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>wang.raye.rabbitmq</groupId> <artifactId>demo1</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging>
<name>demo1</name> <url>http://maven.apache.org</url>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.0.RELEASE</version> </parent>
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> </project>
|
因为没有做其他操作,所以目前项目主要是依赖2个模块,一个Sprig boot,一个RabbitMQ
添加配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
| package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitMQConfig { public static final String EXCHANGE = "my-mq-exchange"; public static final String ROUTINGKEY1 = "queue_one_key1"; public static final String ROUTINGKEY2 = "queue_one_key2";
@Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1",5672); connectionFactory.setUsername("springboot"); connectionFactory.setPassword("password"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; }
@Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE, true, false); }
@Bean public Queue queue() { return new Queue("queue_one", true); }
@Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(RabbitMQConfig.ROUTINGKEY1); }
@Bean public Queue queue1() { return new Queue("queue_one1", true); }
@Bean public Binding binding1() { return BindingBuilder.bind(queue1()).to(defaultExchange()).with(RabbitMQConfig.ROUTINGKEY2); }
@Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("收到消息 : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }); return container; }
@Bean public SimpleMessageListenerContainer messageContainer2() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue1()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("queue1 收到消息 : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }); return container; } }
|
注意,为了更好的展示如何配置,我配置了2个消息队列,而本类除了链接配置哪里,其他都是针对消息消费者的,当然不管消息消费者和消息生产者都需要配置链接信息,而为了方便,所以本项目的消息消费者和生产者都在本项目,一般实际项目中不会在同一项目,由于注释很详细,我就不多说了
发送消息
为了方便发送消息,所以我直接写了一个Controller,通过访问接口的形式来调用发送消息的方法,话不多说,上代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| package wang.raye.rabbitmq.demo1;
import java.util.UUID;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
@RestController public class SendController implements RabbitTemplate.ConfirmCallback{ private RabbitTemplate rabbitTemplate;
public SendController(RabbitTemplate rabbitTemplate){ this.rabbitTemplate = rabbitTemplate; this.rabbitTemplate.setConfirmCallback(this); }
@RequestMapping("send1") public String send1(String msg){ String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTINGKEY1, msg, correlationId); return null; }
@RequestMapping("send2") public String send2(String msg){ String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTINGKEY2, msg, correlationId); return null; }
public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(" 回调id:" + correlationData); if (ack) { System.out.println("消息成功消费"); } else { System.out.println("消息消费失败:" + cause+"\n重新发送"); } } }
|
需要注意的是消息回调只能代表消息成功发送到RabbitMQ服务器
然后我们启动项目,访问http://localhost:8082/send1?msg=aaaa
会发现控制台输出了
1 2 3
| 收到消息 : aaaa 回调id:CorrelationData [id=37e6e913-835a-4eca-98d1-807325c5900f] 消息成功消费
|
当然回调id可能不同,如果我们访问http://localhost:8082/send2?msg=bbbb 则输出
1 2 3
| queue1 收到消息 : bbbb 回调id:CorrelationData [id=0cec7500-3117-4aa2-9ea5-4790879812d4] 消息成功消费
|
最后说两句
因为本文主要是说明如何从零到springboot集成RabbitMQ,所以对于RabbitMQ的很多信息和用法没有说明,如果对RabbitMQ本身不太熟悉的可以去看看其他关于RabbitMQ的文章,附上本文demo