本文共 10214 字,大约阅读时间需要 34 分钟。
引入依赖(版本自选):
com.rabbitmq amqp-client 5.7.0
Exchanges策略模式:
Ps:执行下面的Demo的时候,建议优先执行消费端的代码,RabbitMQ会创建对应的exchange和queue并对应绑定
当然,也可以在RabbitMQ的管理UI上进行add和绑定消息生产者Code:
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer4DirectExchange { public static void main(String[] args) throws Exception { // 1、创建connectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.xxxx.xxx"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("rabbitadmin"); connectionFactory.setPassword("123456"); // 2 创建Connection Connection connection = connectionFactory.newConnection(); // 3 创建Channel Channel channel = connection.createChannel(); // 4 声明 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct"; // 5 发送 for (int i = 0; i < 5; i++) { String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... " + i; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); System.out.println("send: " + msg); } channel.close(); connection.close(); }}
消息消费者 Code:
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;import com.rabbitmq.client.QueueingConsumer.Delivery;public class Consumer4DirectExchange { public static void main(String[] args) throws Exception { // 1、创建connectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.xxx.xxx"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("rabbitadmin"); connectionFactory.setPassword("123456"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 4 声明 String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct"; // 表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 表示声明了一个队列 channel.queueDeclare(queueName, false, false, false, null); // 建立一个绑定关系: channel.queueBind(queueName, exchangeName, routingKey); // durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); // 参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); // 循环获取消息 while (true) { // 获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println(queueName + " 收到消息:" + msg); } }}
生产者输出:
send: Hello World RabbitMQ 4 Direct Exchange Message … 0 send: Hello World RabbitMQ 4 Direct Exchange Message … 1 send: Hello World RabbitMQ 4 Direct Exchange Message … 2 send: Hello World RabbitMQ 4 Direct Exchange Message … 3 send: Hello World RabbitMQ 4 Direct Exchange Message … 4消费者输出:
test_direct_queue 收到消息:Hello World RabbitMQ 4 Direct Exchange Message … 0 test_direct_queue 收到消息:Hello World RabbitMQ 4 Direct Exchange Message … 1 test_direct_queue 收到消息:Hello World RabbitMQ 4 Direct Exchange Message … 2 test_direct_queue 收到消息:Hello World RabbitMQ 4 Direct Exchange Message … 3 test_direct_queue 收到消息:Hello World RabbitMQ 4 Direct Exchange Message … 4** 经测试结果总结 **
如再起一个相同queue名称的消费:RabbitMQ会轮询方式消费,一条消息只有一个queue收到
如Copy上面的消费端代码,只修改String queueName = “test_direct_queue_2”;:RabbitMQ广播模式,1条消息,n个消费者都收到
大白话:
1生产者,n消费者(queue名相同) -> 轮询方式,一条消息只有一个queue收到 1生产者,n消费者(queue名不相同) -> 广播方式,1条消息,n个都收到消息生产者Code:
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer4FanoutExchange { public static void main(String[] args) throws Exception { // 1、创建connectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.204.51"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("rabbitadmin"); connectionFactory.setPassword("123456"); // 2 创建Connection Connection connection = connectionFactory.newConnection(); // 3 创建Channel Channel channel = connection.createChannel(); // 4 声明 String exchangeName = "test_fanout_exchange"; // 5 发送 for (int i = 0; i < 5; i++) { String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ..." + i; channel.basicPublish(exchangeName, "", null, msg.getBytes()); } channel.close(); connection.close(); }}
消息消费者 Code:
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;import com.rabbitmq.client.QueueingConsumer.Delivery;public class Consumer4FanoutExchange { public static void main(String[] args) throws Exception { // 1、创建connectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.xxx.xx"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("rabbitadmin"); connectionFactory.setPassword("123456"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 4 声明 String exchangeName = "test_fanout_exchange"; String exchangeType = "fanout"; String queueName = "test_fanout_queue"; String routingKey = ""; // 不设置路由键 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); // durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); // 参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); // 循环获取消息 while (true) { // 获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println(queueName + " 收到消息:" + msg); } }}
消费者输出:
test_fanout_queue 收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message …0 test_fanout_queue 收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message …1 test_fanout_queue 收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message …2 test_fanout_queue 收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message …3 test_fanout_queue 收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message …4** 经测试结果总结 **
如再起一个相同queue名称的消费:RabbitMQ会轮询方式消费,一条消息只有一个queue收到
如Copy上面的消费端代码,只修改String queueName = “test_fanout_queue_2”;:RabbitMQ广播模式,1条消息,n个消费者都收到
大白话:
1生产者,n消费者(queue名相同) -> 轮询方式,一条消息只有一个queue收到 1生产者,n消费者(queue名不相同) -> 广播方式,1条消息,n个都收到消息生产者Code:
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer4TopicExchange { public static void main(String[] args) throws Exception { // 1、创建connectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.xxx.xx"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("rabbitadmin"); connectionFactory.setPassword("123456"); // 2 创建Connection Connection connection = connectionFactory.newConnection(); // 3 创建Channel Channel channel = connection.createChannel(); // 4 声明 String exchangeName = "test_topic_exchange"; String routingKey1 = "user.save"; // 5 发送 for (int i = 0; i < 5; i++) { String msg = "Hello World RabbitMQ 4 Topic Exchange Message ..." + i; channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes()); } channel.close(); connection.close(); }}
消息消费者 Code:
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;import com.rabbitmq.client.QueueingConsumer.Delivery;public class Consumer4TopicExchange { public static void main(String[] args) throws Exception { // 1、创建connectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.xxx.xx"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("rabbitadmin"); connectionFactory.setPassword("123456"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 4 声明 String exchangeName = "test_topic_exchange"; String exchangeType = "topic"; String queueName = "test_topic_queue"; // String routingKey = "user.*"; String routingKey = "user.*"; // 1 声明交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 2 声明队列 channel.queueDeclare(queueName, false, false, false, null); // 3 建立交换机和队列的绑定关系: channel.queueBind(queueName, exchangeName, routingKey); // durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); // 参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); // 循环获取消息 while (true) { // 获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println(queueName + " 收到消息:" + msg); } }}
消费者输出:
test_topic_queue 收到消息:Hello World RabbitMQ 4 Topic Exchange Message …0 test_topic_queue 收到消息:Hello World RabbitMQ 4 Topic Exchange Message …1 test_topic_queue 收到消息:Hello World RabbitMQ 4 Topic Exchange Message …2 test_topic_queue 收到消息:Hello World RabbitMQ 4 Topic Exchange Message …3 test_topic_queue 收到消息:Hello World RabbitMQ 4 Topic Exchange Message …4** 经测试结果总结 **
如再起一个相同queue名称的消费:RabbitMQ会轮询方式消费,一条消息只有一个queue收到
如Copy上面的消费端代码,只修改String queueName = “test_topic_queue_2”;:RabbitMQ广播模式,1条消息,n个消费者都收到
大白话:
1生产者,n消费者(queue名相同) -> 轮询方式,一条消息只有一个queue收到 1生产者,n消费者(queue名不相同) -> 广播方式,1条消息,n个都收到转载地址:http://wsuws.baihongyu.com/