springboot整合mq

rabbitMQ概念篇—-参考

rabbitMQ实战篇—-参考

使用场景

业务服务模块解耦、异步通信、高并发限流、超时业务、数据延迟处理等

演变过程

名词解释

队列:消息的暂存区/存储区

交换机:消息的中转站,用于接收分发消息。其中有 fanout、direct、topic、headers 四种

路由:相当于密钥/第三者,与交换机绑定即可路由消息到指定的队列!



mq交换机介绍

消息推送到接收的流程

(1).Direct Exchange (直连型交换机)

根据消息携带的路由键将消息投递给对应队列。

大致流程:有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key ,然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。

(2).Fanout Exchange(扇型交换机)

没有路由键概念,绑了路由键也是无视的。

在接收到消息后,会直接转发到绑定到它上面的所有队列。

(3).Topic Exchange(主题交换机)

跟直连交换机流程差不多

特点:在它的路由键和绑定键之间是有规则的。

(4).Header Exchange(头交换机)

简单地介绍下规则:

* 用来表示一个单词 (必须出现的)

# 用来表示任意数量(零个或多个)单词

举个小例子:(
若队列Q1 绑定键为 .TT. ; 队列Q2绑定键为 TT.#)

如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;

如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;

主题交换机是非常强大的,为啥这么膨胀?

当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。

当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。

系统实战

1.秒杀系统mq使用

系统特征:在某个时刻会有成百上千万的请求到达我们的接口,即瞬间巨大的流量将涌入我们的系统。

开始秒杀”、“开始抢单”的时刻,此时系统可能会出现这样的几种现象:

(1).应用系统配置承载不了这股瞬间流量,导致系统直接挂掉,即传说中的“宕机”现象;

(2).接口逻辑没有考虑并发情况,数据库读写锁发生冲突,导致最终处理结果跟理论上的结果数据不一致(如商品存库量只有 100,但是高并发情况下,实际表记录的抢到的用户记录数据量却远远大于 100);

(3).应用占据服务器的资源直接飙高,如 CPU、内存、宽带等瞬间直接飙升,导致同库同表甚至可能同 host 的其他服务或者系统出现卡顿或者挂掉的现象;

如何解决上述问题(常用解决方案)?

(1).将处理抢单的整体业务逻辑独立、服务化并做集群部署;

(2).将那股巨大的流量拒在系统的上层,即将其转移至 MQ 而不直接涌入我们的接口,从而减少数据库读写锁冲突的发生以及由于接口逻辑的复杂出现线程堵塞而导致应用占据服务器资源飙升;

(3).将抢单业务所在系统的其他同数据源甚至同表的业务拆分独立出去服务化,并基于某种 RPC 协议走 HTTP 通信进行数据交互、服务通信等等;

(4).采用分布式锁解决同一时间同个手机号、同一时间同个 IP 刷单的现象

逻辑处理: “请求” -> "处理抢单业务的接口" 中间架一层消息中间件做“缓冲”、“缓压”处理,

RabbitMQ 来实战上述的第二点流程图如下

具体实战演示参考

MQ监控消息是否成功

1.服务端监听

可参考如下: springboot整合rabbitMQ (fanout交换机)第4部分.server服务端发送消息

使用详情

2.客户端监听

和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。

消息接收的确认机制主要存在三种模式:

①自动确认(默认的消息确认)。 AcknowledgeMode.NONE

RabbitMQ成功将消息发出(消息成功写入TCP Socket),立即认为消息已经被正确处理,不管消费者端是否成功处理本次投递。
当消费端消费逻辑抛出异常(消费端没有成功处理这条消息),相当于丢失了消息。

一般使用try catch捕捉异常后,打印日志追踪数据,找对应数据后续处理。

② 不确认, 这个不做介绍

③ 手动确认,(接收消息确认机制时,多数选择的模式)。

消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。

(1).basic.ack用于肯定确认

(2).basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)

(3).basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息

消费者端3个方法都表示消息已经被正确投递,
basic.ack表示消息已经被正确处理,
basic.nack,basic.reject表示没有被正确处理,但是RabbitMQ中仍然需要删除这条消息。

springboot整合rabbitMQ (fanout交换机)

1.pom.xml

<!-- rabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>

2.application.yml

spring: 
  rabbitmq:
    host: 172.16.0.109
    port: 5672
    username: root
    password: 123456
    virtual-host: /
    listener:             
      concurrency: 10   //并发消费者的初始化值
      max-concurrency: 20  //并发消费者的最大值
      prefetch: 5 //每个消费者每次监听时可拉取处理的消息数量

3.server服务端与client客户端(公用部分)

public class MQUtils {
    public static final String fanoutexchangename = "DG_Exchange";
    public static final String queueName_1 = "test_1";
}    

4.server服务端发送消息

@Component
public class RabbitMQConfig {

    private static Logger logger = LoggerFactory.getLogger(RabbitMQConfig.class);

    /**
     * queue 队列的名称。
     * durable: 设置是否持久化。为 true 则设置队列为持久化。持久化的队列会存盘,在 服务器重启的时候可以保证不丢失相关信息。
     * exclusive 设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排 他队列,该队列仅对首次声明它的连接可见,
     *                 并在连接断开时自动删除。这里需要注意 三点:排他队列是基于连接( Connection) 可见的,同 个连接的不
     *                 同信道 (Channel) 是可以同时访问同一连接创建的排他队列; "首次"是指如果 个连接己经声明了 排他队列,
     *                 其他连接是不允许建立同名的排他队列的,这个与普通队列不同:即使该队 列是持久化的,一旦连接关闭或者客户端退出,
     *                 该排他队列都会被自动删除,这种队列 适用于一个客户端同时发送和读取消息的应用场景。
     * autoDelete: 设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是: 至少有一个消费者连接到这个队列,
     *                 之后所有与这个队列连接的消费者都断开时,才会 自动删除。不能把这个参数错误地理解为: "当连接到此队列的
     *                 所有客户端断开时,这 个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队 列连接时,
     *                 都不会自动删除这个队列。
     *  argurnents: 设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、
     *               x-dead-letter-exchange、 x-deadletter-routing-key 、 x-rnax-priority 等
     * 
     * @param name
     * @return
     */
    @Bean
    public Queue setQueue1(){
        return new Queue(MQUtils.queueName_1,true,false,false,null);
    }

    /**
     * name 交换机名称
     * durable 是否持久化
     * autoDelete 是否自动删除
     * Map<String, Object> arguments 设置交换机的其他一些参数
     * @param exchanageName
     * @return
     */
    @Bean
    public FanoutExchange setFanoutExchange(){
        return new FanoutExchange(MQUtils.fanoutexchangename,true,false,null);
    }

    @Bean
    public Binding setQueueToExchange(){
        return BindingBuilder.bind(setQueue1()).to(setFanoutExchange());
    }

    /**
     * 配置生产者消息监控确认
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate createRabbitTemplate (ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();

        rabbitTemplate.setConnectionFactory(connectionFactory);

        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                logger.info("-----ConfirmCallback:   data -->"+correlationData);
                logger.info("-----ConfirmCallback:   ackFlag -->"+ack);
                logger.info("-----ConfirmCallback:   causemessage -->"+cause);
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                // TODO Auto-generated method stub
                logger.info("-----ReturnCallback:   responseMessage -->"+message);
                logger.info("-----ReturnCallback:   responseCode -->"+replyCode);
                logger.info("-----ReturnCallback:   responseText -->"+replyText);
                logger.info("-----ReturnCallback:   exchangeName -->"+exchange);
                logger.info("-----ReturnCallback:   routingKey -->"+routingKey);
            }
        });

        return rabbitTemplate;
    }

}    

发送消息处理类

@Component
public class MQSendMessage{
    private static Logger logger = LoggerFactory.getLogger(MQSendMessage.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    public boolean sendMessage(String exchangeName,String message){

        boolean flag = false;
        try {
            rabbitTemplate.convertAndSend(MQUtils.fanoutexchangename,null,message);

            logger.info(String.format("消息存入rabbitMQ中,{  exchangeName = [%s],queueName = [%s],message = [%s]",MQUtils.fanoutexchangename,MQUtils.queueName_1,message));
            flag = true;
        } catch (Exception e) {
            // TODO: handle exception
            flag = false;
            e.printStackTrace();
            logger.error("初始化rabbitMQ,增加消息异常",e);
        }
        return flag;
    }
}

5.client客户端接收消息

@Configuration
public class RabbitMQConfig {

    @Bean
    public Queue getQueue1(){
        return new Queue(MQUtils.queueName_1, true, false, false, null);
    }

    @Bean
    public FanoutExchange getFanoutExchange(){
        return new FanoutExchange(MQUtils.fanoutexchangename, true, false, null);
    }

    @Bean
    public Binding setBinding(){
        return BindingBuilder.bind(getQueue1()).to(getFanoutExchange());
    }

    @Autowired
    MQReceiveMessage mQReceiveMessage;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(CachingConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        //并发配置
        container.setConcurrentConsumers(1); //并发消费者的初始化值
        container.setMaxConcurrentConsumers(1);  //并发消费者的最大值
        container.setPrefetchCount(1); //每个消费者每次监听时可拉取处理的消息数量

        //消息确认机制
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
        container.setQueues(getQueue1());
        container.setMessageListener(mQReceiveMessage);

        //如下配置监听多个队列和接收消息
//        container.addQueues(queueA());
//        container.setMessageListener(fanoutMessageReceiverA);
        return container;
    }
}

接收消息处理类

@Component
public class MQReceiveMessage implements ChannelAwareMessageListener {
    private static Logger logger = LoggerFactory.getLogger(MQReceiveMessage.class);

    public void onMessage(Message message, Channel channel) throws Exception {
        // TODO Auto-generated method stub
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            String receiveMessage = new String(message.getBody(),Charset.forName("UTF-8"));

            logger.info("监听到rabbitMQ消费消息成功:"+receiveMessage);

            channel.basicAck(deliveryTag, true);

        } catch (Exception e) {
            // TODO: handle exception
            logger.info("监听到rabbitMQ消费消息异常:",e);
            channel.basicReject(deliveryTag, false);
        }
    }
}

direct exchange (直连交换机)

1.pom.xml

<!-- rabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>

2.application.yml

spring: 
  rabbitmq:
    host: 172.16.0.109
    port: 5672
    username: root
    password: 123456
    virtual-host: /
    listener:             
      concurrency: 10   //并发消费者的初始化值
      max-concurrency: 20  //并发消费者的最大值
      prefetch: 5 //每个消费者每次监听时可拉取处理的消息数量

3.server服务端与client客户端(公用部分)

public class MQUtils {
    //direct直连交换机
    public static final String direct_exchangeName = "direct_exchange";
    public static final String direct_queueName_1 = "direct_queue_1";
    public static final String direct_route_1 = "direct_route_1";
}

@Component
public class RabbitDirectExchangeConfig {

    @Bean
    public Queue getQueue(){
        return new Queue(MQUtils.direct_queueName_1, true);
    }

    @Bean
    public DirectExchange getDirectExchange(){
        return new DirectExchange(MQUtils.direct_exchangeName, true, false);
    }

    @Bean
    public Binding bindQueueToExchange(){
        return BindingBuilder.bind(getQueue()).to(getDirectExchange()).with(MQUtils.direct_route_1);
    }

}

4.server服务端发送消息

@Component
public class MQSendMessage{
    private static Logger logger = LoggerFactory.getLogger(MQSendMessage.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendMessage(String message){

        rabbitTemplate.convertAndSend(MQUtils.direct_exchangeName,MQUtils.direct_route_1,message);
    }
}

5.client客户端接收消息

@Component
@RabbitListener(queues=MQUtils.direct_queueName_1)
public class DirectReveiveMessage {

    @RabbitHandler
    public void procee(String message){
        System.out.println("message");
    }
}

topic exchange (主题交换机)

1.pom.xml

<!-- rabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>

2.application.yml

spring: 
  rabbitmq:
    host: 172.16.0.109
    port: 5672
    username: root
    password: 123456
    virtual-host: /
    listener:             
      concurrency: 10   //并发消费者的初始化值
      max-concurrency: 20  //并发消费者的最大值
      prefetch: 5 //每个消费者每次监听时可拉取处理的消息数量

3.server服务端与client客户端(公用部分)

public class MQUtils {
    //topic主题交换机
    public static final String topic_exchangeName = "topic_exchange_test1";
    public static final String topic_queueName_1 = "topic_queue_11";
    public static final String topic_queueName_2 = "topic_queue_22";
    public static final String topic_route_1 = "topic_route.man";
    public static final String topic_route_2 = "topic_route.woman";
}

@Component
public class RabbitTopicExchange {
    @Bean
    public Queue getQueue1(){
        return new Queue(MQUtils.topic_queueName_1, true);
    }

    @Bean
    public Queue getQueue2(){
        return new Queue(MQUtils.topic_queueName_2, true);
    }

    @Bean
    public TopicExchange getTopicExchange(){
        return new TopicExchange(MQUtils.topic_exchangeName, true, false);
    }

    @Bean
    public Binding bindQueueToExchange1(){
        return BindingBuilder.bind(getQueue1()).to(getTopicExchange()).with(MQUtils.topic_route_1);
    }

    @Bean
    public Binding bindQueueToExchange2(){
        return BindingBuilder.bind(getQueue2()).to(getTopicExchange()).with("topic_route.#");
    }

}

4.server服务端发送消息

@Component
public class MQSendMessage1{
    private static Logger logger = LoggerFactory.getLogger(MQSendMessage.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendMessage1(String message){

        rabbitTemplate.convertAndSend(MQUtils.topic_exchangeName,MQUtils.topic_route_1,"topic message 1");
    }

    public void sendMessage2(String message){

        rabbitTemplate.convertAndSend(MQUtils.topic_exchangeName,MQUtils.topic_route_2,"topic message 2");
    }
}

5.client客户端接收消息

@Component
public class TopicReceiveMessage {

    @RabbitListener(queues = MQUtils.topic_queueName_1)
    public void process(String message){
        System.out.println("topic_queue_1 : "+message);
    }

    @RabbitListener(queues = MQUtils.topic_queueName_2)
    public void process2(String message){
        System.out.println("topic_queue_2 : "+message);
    }
}

文章标题:springboot整合mq

发布时间:2019-12-04, 16:55:33

最后更新:2019-12-04, 16:55:33