RabbitMQ — HelloWorld程序

  • 时间:
  • 来源:

   

消息队列按使用者角色可以分为消息的生产者和消息的消费者,下面写一个HelloWorld的例子,例子主要实现了消息的生产者publish一条消息到RabbitMQ,然后消息的消费者获取这条消息。

引用jar包

002UASMrzy7605pjKJv15&690.jpg

Java使用RabbitMQ需要引用amqp-client.jar:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.net.bysoft</groupId> <artifactId>rabbitmqapp</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>war</packaging> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies> </project> 生产者代码

消息的生产者从代码的角度来看,主要做了如下步骤:

连接到RabbitMQ 获取信道 声明交换器 创建消息队列 发布消息 关闭信道 关闭连接

代码如下:

package cn.net.bysoft.rabbitmqapp.lesson1; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 消息的生产者 */ public class Producer { private final static String QUEUE_NAME = "hello-world"; public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1 帐号 factory.setUsername("guest"); // 1.2 密码 factory.setPassword("guest"); // 1.3 地址 factory.setHost("127.0.0.1"); // 1.4 虚拟主机,"/"是默认的 factory.setVirtualHost("/"); // 1.5 端口 factory.setPort(5672); // 1.6 连接超时 factory.setConnectionTimeout(10000); // 1.7 实例化连接 Connection connection = factory.newConnection(); // 2.创建信道 Channel channel = connection.createChannel(); // 3.声明交换器(参数:交换器名称、交换器类型) channel.exchangeDeclare("hello-exchange", "direct"); // 4.声明队列(参数:队列名称、是否持久化、是否是私有的、是否自动删除、参数对象) channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 5.发布消息(参数:交换器名称、队列名称、属性对象、发送内容) String msg = "Hello World"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("Send " + msg); // 6.关闭连接 channel.close(); connection.close(); } }

 

 

 

执行上面的程序,消息已经发送到了RabbitMQ,下面编写消费者代码。

消费者代码

还是从程序的角度看,消费者主要操作了如下步骤:

连接到RabbitMQ; 获得信道; 声明交换器; 声明队列; 消费消息; 关闭信道; 关闭连接;

代码如下:

package cn.net.bysoft.rabbitmqapp.lesson1; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; /** * 消息的消费者 */ public class Consumer { private final static String QUEUE_NAME = "hello-world"; public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { // 1.创建连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1 帐号 factory.setUsername("guest"); // 1.2 密码 factory.setPassword("guest"); // 1.3 地址 factory.setHost("127.0.0.1"); // 1.4 虚拟主机,"/"是默认的 factory.setVirtualHost("/"); // 1.5 端口 factory.setPort(5672); // 1.6 连接超时 factory.setConnectionTimeout(10000); // 1.7 实例化连接 Connection connection = factory.newConnection(); // 2.创建信道 Channel channel = connection.createChannel(); // 3.声明交换器(参数:交换器名称、交换器类型) channel.exchangeDeclare("hello-exchange", "direct"); // 4.声明队列(参数:队列名称、是否持久化、是否是私有的、是否自动删除、参数对象) QueueingConsumer qc = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, qc); // 5.消费消息 System.out.println("begin reading..."); while (true) { QueueingConsumer.Delivery delivery = qc.nextDelivery(); if (delivery != null) { String message = new String(delivery.getBody()); System.out.println(message); } } } }