|

一、RabbitMq基础知识
0、概述
消息队列的作用就是接收消息生产者的消息,然后将消息发送到消费者
1、信道channel
我的理解是生产者/消费者和rabbitmq交互的一个通道,负责交换机、队列管理;消息发布和消费管理;事务管理等
2、交换机
四种交换机:
direct:可以用一个或者多个key绑定到一个或者多个队列上
topic:支持路由的适配符 # *
Fanout广播:将消息发送给所有的队列
Header头交换机:自定义通过头消息属性来定义路由的匹配
3、队列:保存消息的队列
4、消费者:消息的接收者
5、生产者:消息的发送者
二、 使用com.rabbitmq.client.*操作mq
2.1、基本操作
0、环境和依赖
<!-- 环境 * jdk 1.8 * idea * springboot 2.2.6 --> <!-- 依赖 这里只导入这个包,其中包含了Rabbit client的包--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
1、创建连接和信道
//获取连接 ConnectionFactory factory = new ConnectionFactory; factory.setHost("localhost");//mq主机地址 factory.setPort(5672);//端口,默认时5672 factory.setUsername("leyou"); factory.setPassword("leyou"); factory.setVirtualHost("/leyou"); Connection connection = factory.newConnection; //获取信道 Channel channel = connection..createChannel;
2、申明交换机 / 队列 / 绑定交换机和队列
//交换机名,交换机类型 channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT); /** * 第一个参数是queue:要创建的队列名 * 第二个参数是durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息 * 第三个参数是exclusive:true表示一个队列只能被一个消费者占有并消费 * 第四个参数是autoDelete:true表示服务器不在使用这个队列是会自动删除它 * 第五个参数是arguments:包括死信队列,队列的ttl */ channel.queueDeclare(QUEUE_ONE,true,false,false,null); //绑定交换机和队列 队列名,交换机名,routekey channel.queueBind(QUEUE_ONE,EXCHANGE,GIRL);
3、发布消息
//1、交换机名 2、routekey 3、mandatory强制(需要return回调时必须设置为true) 4、发布消息参数 5、消息 channel.basicPublish(EXCHANGE,GIRL,true,null,"xxx降价了".getBytes);
4、接收消息
//接收消息前也需要获取连接和channel,申明队列 //接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte body) throws IOException { //拿到消息 System.out.println(new String(body,"utf-8")); } }; /** * 参数说明 * 1:队列名字 * 2:是否自动应答 autoACk,为false时需要手动ack * 3:消费者,当接收到消费者时会调用给对象中的 handleDelivery 方法 */ channel.basicConsume(QUEUE_ONE,true,consumer);
2.2、基本应用
1、功能:
有两个人小明和小华,小明对美女感兴趣,小华对股票和没事感兴趣,使用消息队列将他们感兴趣的消息发送给他们两个
2、实现:
(1)写一个类来提供创建连接和信道;(2)生产者(发送消息方)类发送消息(3)消费者(接收消息)类接收消息
[list,
[*,连接类
[/list,public class ConnectionUtil { /** * 使用原始的rabbitmq client api 操作mq */ private static ConnectionFactory factory = new ConnectionFactory; private static Connection connection; /* 获取连接 注意导包:需要导client下面的包 */ public static Connection getConnection throws IOException, TimeoutException { // factory.setHost("localhost"); // factory.setPort(5672); factory.setUsername("leyou"); factory.setPassword("leyou"); factory.setVirtualHost("/leyou"); connection = factory.newConnection; return connection; } public static void close throws IOException { connection.close; } /* 创建信道 */ public static Channel getChannel throws IOException, TimeoutException { return getConnection.createChannel; } }
[list,
[*,生产者
[/list,//生产者 public class provice{ public void producerMsg throws IOException, TimeoutException, InterruptedException { Channel channel = ConnectionUtil.getChannel; String EXCHANGE = "direct_exchange"; channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT); //定义两个队列名 String QUEUE_ONE = "beauty_queue"; String QUEUE_TWO = "food_queue"; channel.queueDeclare(QUEUE_ONE,true,false,false,null); channel.queueDeclare(QUEUE_TWO,true,false,false,null); //定义三个key String GIRL = "girl"; String SHARE = "share"; String FOOD = "food"; //绑定 channel.queueBind(QUEUE_ONE,EXCHANGE,GIRL); channel.queueBind(QUEUE_TWO,EXCHANGE,SHARE); channel.queueBind(QUEUE_TWO,EXCHANGE,FOOD); //发送消息 /** * 参数:1交换机,2routekey 3 mandatory:强制;(需要return回调时必须设置为true) * 3参数,4消息字节数据 */ channel.basicPublish(EXCHANGE,GIRL,true,null,"快看,是她".getBytes); channel.basicPublish(EXCHANGE,SHARE,true,null,"股票涨了".getBytes); channel.basicPublish(EXCHANGE,FOOD,true,null,"肯德基降价了".getBytes); //关闭连接 channel.close; ConnectionUtil.close; } }
[list,
[*,消费者
[/list,public class ConsumerMq { // 消费消息 /** * 使用原始的rabbitmq client api 操作mq */ String EXCHANGE = "direct_exchange"; String QUEUE_ONE = "beauty_queue"; String QUEUE_TWO = "food_queue"; //key String GIRL = "girl"; String SHARE = "share"; String FOOD = "food"; public void consumer throws IOException, TimeoutException { Channel channel = ConnectionUtil.getChannel; /** * 第一个参数是queue:要创建的队列名 * 第二个参数是durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息 * 第三个参数是exclusive:true表示一个队列只能被一个消费者占有并消费 * 第四个参数是autoDelete:true表示服务器不在使用这个队列是会自动删除它 * 第五个参数是arguments:包括死信队列,队列的ttl, */ channel.queueDeclare(QUEUE_ONE,true,false,false,null); channel.queueDeclare(QUEUE_TWO,true,false,false,null); //在生产者绑定了交换机和队列,在这里就不需要绑定 //channel.queueBind(QUEUE_ONE,EXCHANGE,GIRL); //channel.queueBind(QUEUE_TWO,EXCHANGE,SHARE); //channel.queueBind(QUEUE_TWO,EXCHANGE,FOOD); //接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte body) throws IOException { System.out.println(new String(body,"utf-8")); //手动应答ack可以在该方法中进行;参数:1.消息tag,2.是否批量ack channel.basicAck(envelope.getDeliveryTag,false); } }; /** * 参数说明 * 1:队列名字 * 2:是否自动应答 autoACk 为false时需要手动ack * 3:消费者,当接收到消费者时会调用给对象中的 handleDelivery 方法 */ channel.basicConsume(QUEUE_ONE,false,consumer); channel.basicConsume(QUEUE_TWO,false,consumer); } }
2.3、mq事务,发送方确认,和消息回调
概述
消息的发送链路 生产者 -> exchange --> queue --> 消费者;为确保消息发送到rabbitmq,amqp协议提供了三个机制来保证:事务,发送方确认(ack),消息回调(returncallback);事务的方式和数据库的事务类似,这里不做详细介绍;发送方确认是当消息发送到交换机时, broker(实现amqp协议的服务端,这里指rabbitmq)会回调发送者的一个固定方法来确认消息成功发送;消息回调是发生在交换机通过路由key转发到队列的过程中,如果消息不能通过key找到对应的queue则回调一个固定方法将消息返回给生产者,确保消息不丢失
1、mq事务
[list,
[*,rabbitMq是支持事务的,但是使用事务的效率很低,在消息数量很大的情况下影响性能
[/list,2、发送方确认
对于固定消息体大小和线程数,如果消息持久化,生产者confirm(或者采用事务机制),消费者ack那么对性能有很大的影响.
消息持久化的优化没有太好方法,用更好的物理存储(SAS, SSD, RAID卡)总会带来改善。生产者confirm这一环节的优化则主要在于客户端程序的优化之上。归纳起来,客户端实现生产者confirm有三种编程方式:
[list=1,
[*,普通confirm模式:每发送一条消息后,调用waitForConfirms方法,等待服务器端confirm。实际上是一种串行confirm了。
[*,批量confirm模式:每发送一批消息后,调用waitForConfirms方法,等待服务器端confirm。
[*,异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。
[/list,
[list,
[*, 普通confirm模式
[/list,//要点 //第1种 //普通confirm模式最简单,publish一条消息后,等待服务器端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。 //1.发消息前 channel.confirmSelect; //2.发消息后 //判断消息发送是否成功 if(channel.waitForConfirms){ System.out.println("消息发送成功"); }
[list,
[*, 批量confirm模式
[/list,批量confirm模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish消息,然后等待服务器端confirm, 相比普通confirm模式,批量极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。
channel.confirmSelect; for(int i=0;i<batchCount;i++){ channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes); } if(!channel.waitForConfirms){ System.out.println("send message failed."); }
异步confirm模式
异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms方法也是通过SortedSet维护消息序号的。关键代码:
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>); //别忘这行代码 channel.confirmSelect; //添加监听器 channel.addConfirmListener(new ConfirmListener { public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { confirmSet.headSet(deliveryTag + 1).clear; } else { confirmSet.remove(deliveryTag); } } public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Nack, SeqNo: " + deliveryTag + "] multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag + 1).clear; } else { confirmSet.remove(deliveryTag); } } }); while (true) { long nextSeqNo = channel.getNextPublishSeqNo; channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes); confirmSet.add(nextSeqNo); }
3、消息回调
//要点 //1.发送消息是将第三个参数mandatory设置为true channel.basicPublish(EXCHANGE,FOOD,true,null,"肯德基降价了".getBytes); //2.添加消息回调监听器 channel.addReturnListener(new ReturnListener { @Override public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte bytes) throws IOException { System.out.println("消息不可路由"+new String(bytes,"utf-8")); } }); //注意:开启回调不能关闭连接和信道,
2.4、接收方确认
1、概述
接收方ack分为手动和自动,在接收消息时设置
//第二个参数就是指定是否手动ack false时为手动 channel.basicConsume(QUEUE_ONE,false,consumer);
手动ack有三种
[list,
[*,单个确认
[*,单个拒绝
[*,批量拒绝
[/list,2、代码实现
单个确认ack
//接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte body) throws IOException { System.out.println(new String(body,"utf-8")); //手动应答ack可以在该方法中进行;参数:1.消息tag,2.是否批量ack channel.basicAck(envelope.getDeliveryTag,false); //拒绝消息;参数:1.消息tag;2.消息是否重新入队,当只有一个消费者时,会引起重复消费 channel.basicReject(envelope.getDeliveryTag,false); //批量ack消息;参数:1.消息tag;2.是否批量ack消息,3.是否重回队列 channel.basicNack(envelope.getDeliveryTag,true,false); } }; //这里只需要条应答的语句,我这里知识都列出来 channel.basicConsume(QUEUE_ONE,false,consumer); //注意上面第二个参数要为false才能手动ack
2.5、消息TTL和队列TTL、死信队列、延迟队列
这一块暂时不使用原始RabbitMq Client API实现,后面再研究,但是会使用下面的org.springframework.amqp来实现
三、使用org.springframework.amqp操作mq
3.1、前言:
Spring对RabbitMp进行了抽象,将交换机,队列,消息,绑定,连接等抽象出实体类,方便操作,还提供了RabbitAdmit 和RabbitTemplate 来方便交换机队列的管理以及消息的发送接收等
3.2、基本实例
0、环境和依赖
<!-- 环境 * jdk 1.8 * idea * springboot 2.2.6 --> <!-- 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
1、实例
发送消息告诉消费者超时打折了快来购物
[list,
[*,配置类
[/list,@Configuration public class RabbitConfig { private final static Logger log = LoggerFactory.getLogger(RabbitConfig.class); private final static String EXCHANGE_NAME = "verification_code_exchange"; private final static String VERIFICATION_CODE_QUEUE = "verification_code_queue"; private final static String VERIFICATION_CODE_ROUTE_KEY = "verification_code_key"; //死信交换机和队列和key private final static String DLX_EXCHANGE_NAME = "dlx-exchange"; private final static String DLX_KEY = "verification_code_key"; @Bean public CachingConnectionFactory connectionFactory{ CachingConnectionFactory conn = new CachingConnectionFactory; conn.setUsername("leyou"); conn.setPassword("leyou"); conn.setVirtualHost("/leyou"); //消息发送到mq发送确认消息给生产者 conn.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); //消息发送到mq,通过绑定的key找不到queue,则发送消息给生产者 conn.setPublisherReturns(true); return conn; } @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //设置消息序列化 rabbitTemplate.setMessageConverter(converter); //消息的确认回调 // rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback { // @Override // public void confirm(CorrelationData correlationData, boolean b, String s) { // // } // }); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { //ack为确认消息是否成功发送到mq if(ack){ //成功发送 log.info("消息发送成功"); } }); //改标志位设置位true时,当交换机根据自身类型和routeKey无法找到对应的队列时, // 则mq会将消息返还给生产者 //当为false时则mq会将消息直接删除 rabbitTemplate.setMandatory(true); //消息,返回码,返回内容,交换机,路由key rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)->{ //消息 log.info("message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}"]message,replyCode,replyText,exchange,routingKey); }); return rabbitTemplate; } /** * 注入rabbitadmin 用来申明交换机和队列,主要作用是代替原始的使用channl申明的做法,全部交给这个对象来完成 * @param connectionFactory * @return */ @Bean public RabbitAdmin rabbitAdmit(CachingConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); return rabbitAdmin; } /** * 消息序化对象 * 默认使用的是JDK的序列化,这里配置了后就可以将消息序列化为json格式 */ @Bean public MessageConverter converter { return new Jackson2JsonMessageConverter; } /** * 申明一个交换机 */ @Bean public DirectExchange verificationCodeExchange(RabbitAdmin rabbitAdmin){ DirectExchange exchange = new DirectExchange(EXCHANGE_NAME); rabbitAdmin.declareExchange(exchange); return exchange; } /** * 申明一个队列 * @param rabbitAdmin * @return */ @Bean public Queue getQueue(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(VERIFICATION_CODE_QUEUE, true,false,false,null); rabbitAdmin.declareQueue(queue); return queue; } /** * 申明一个绑定 * @param rabbitAdmin * @param verificationCodeExchange * @return */ @Bean public Binding bindingQueue(RabbitAdmin rabbitAdmin,DirectExchange verificationCodeExchange){ Binding with = BindingBuilder.bind(getQueue(rabbitAdmin)).to(verificationCodeExchange).with(VERIFICATION_CODE_ROUTE_KEY); rabbitAdmin.declareBinding(with); return with; } }
说明:上面用到了生产者confirm和消息回调机制1、生产者confirm关键代码:
//1、创建连接时 conn.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); //2、创建rabbitTemplate时 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { //ack为确认消息是否成功发送到mq if(ack){ //成功发送 log.info("消息发送成功"); } });
2、消息回调机制关键代码:
//1、创建连接时 conn.setPublisherReturns(true); //2、创建rabbitTemplate时 //改标志位设置位true时,当交换机根据自身类型和routeKey无法找到对应的队列时, // 则mq会将消息返还给生产者 //当为false时则mq会将消息直接删除 rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)->{ //消息 log.info("message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}"]message,replyCode,replyText,exchange,routingKey); });
生产者:
@Component public class RabbitSender { //注入rabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg{ //构建消息 Message message = MessageBuilder.withBody( JSONObject.toJSONString(MessageModel.builder.id(msgId).context("超市打折,快来抢购!").build).getBytes).build; //消息持久化 message.getMessageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); //消息的媒体类型 message.getMessageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); //消息的自定义关联id CorrelationData correlationData = new CorrelationData(String.valueOf(msgId)); rabbitTemplate.convertAndSend(exchange,routingKey,message,new MessagePostProcessor{ //消息后置处理器,可以在下面这个方法中对消息进行相关属性的设置 @Override public Message postProcessMessage(Message message) throws AmqpException { //比如可以设置上面 这些属性等 //message.getMessageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化问题 //message.getMessageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);//消息的媒体类型 return message; } },correlationData); } }
消费者
@Component public class RabbitReceive { @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE_NAME, type = ExchangeTypes.DIRECT)] key = VERIFICATION_CODE_ROUTE_KEY, value = @Queue(value = VERIFICATION_CODE_QUEUE, autoDelete = "false")] ignoreDeclarationExceptions = "true")] concurrency = "1"] // 指定监听该队列的消费者个数 ackMode = "MANUAL"// 手动ack ) public void receiveCode(Channel channel, Message msg, @Headers Map<String, Object> headers) throws IOException, InterruptedException { String msgId = (String) headers.get("spring_listener_return_correlation"); long tag = msg.getMessageProperties.getDeliveryTag; channel.basicAck(tag, false); } }
其中:发送方确认(生产者confirm)、消息回调上面代码都包含了;消费者ack则和原始方法是一样的
下面介绍消息TTL,队列TTL,死信队列,延迟队列
[list,
[*,消息和队列的TTL
[/list,//消息ttl //在构建消息时设置消息的过期时间 Message message = MessageBuilder.withBody( JSONObject.toJSONString(MessageModel.builder.id(msgId).context("超市打折,快来抢购!").build).getBytes).build; //消息的过期时间 message.getMessageProperties.setExpiration("5000"); //队列的ttl //在创建队列时通过参数设置 Map<String, Object> args = new HashMap<>; //指定死信交换机 args.put("x-dead-letter-exchange"] DLX_EXCHANGE_NAME); //指定死信队列的key args.put("x-dead-letter-routing-key"] DLX_KEY); //设置队列中消息的过期时间 ms args.put("x-message-ttl"]10000); //整个队列的过期时间,过期后整个队列会被删除 //args.put("x-expires"]10000); Queue queue = new Queue(VERIFICATION_CODE_QUEUE, true,false,false,args);
上面还包括死信队列的属性设置,和死信队列key,关于死信队列的配置,还需要配置一个死信交换机和一个死信队列;当有消息或队列的ttl过期,消息超过队列最大长度,消息被拒绝且设置不重新回队列,则消息会被转发到死信交换机,再转发到死信队列。
[list,
[*,关于延迟队列的实现方法有两种
[/list,[list=1,
[*,使用死信队列,用一个设置了ttl的队列来存放消息,该队列不需要消费者监听,然后给该队列配置死信交换机和队列,消费者监听死信队列,这样就能达到时间达到延迟收到消息的目的
[*,使用rabbitmq插件的方式实现,这里先不写,放到下一篇笔记中
[/list,最后
感谢你看到这里,看完有什么的不懂的可以在评论区问我,觉得文章对你有帮助的话记得给我点个赞,每天都会分享java相关技术文章或行业资讯,欢迎大家关注和转发文章!
|
|