我工作中用MQ的10种场景

前言

在分布式系统开发中,消息队列(MQ)是解决系统解耦、异步通信、流量削峰等问题的核心组件。我在过往的项目开发(电商秒杀、物流调度、支付对账等场景)中,多次用到RabbitMQ、Kafka、RocketMQ等不同类型的MQ,总结出10种高频实用场景,每种场景均包含“业务痛点、MQ解决方案、技术要点、架构优势”,希望能给大家提供参考。

场景一:系统解耦

业务痛点

早期电商系统中,订单创建后需同步调用库存系统、支付系统、物流系统的接口。若其中某一个系统接口超时(如物流系统服务宕机),会导致整个订单流程失败,系统耦合度极高,一处故障影响全局。

MQ解决方案

  1. 订单系统在创建订单后,不再直接调用其他系统接口,而是向MQ发送一条“订单创建成功”的消息(包含订单ID、商品ID、用户ID、订单金额等核心字段);
  2. 库存系统、支付系统、物流系统分别作为消费者,订阅MQ中“订单创建成功”的消息队列;
  3. 各系统接收到消息后,异步执行自身业务逻辑(库存扣减、支付状态同步、物流单生成),执行结果通过MQ反馈给订单系统(如“库存扣减成功”“支付完成”)。

技术要点

  • 消息队列选择:RabbitMQ(支持交换机路由,可按系统类型路由消息);
  • 消息可靠性:开启消息持久化(durable=true)、生产者确认机制(publisher confirm)、消费者手动ACK,避免消息丢失;
  • 代码示例(Java + RabbitMQ):

    // 订单系统生产者发送消息
    @Service
    public class OrderService {
      @Autowired
      private RabbitTemplate rabbitTemplate;
    
      public void createOrder(Order order) {
          // 1. 保存订单到数据库
          orderMapper.insert(order);
          // 2. 发送订单创建消息到MQ
          String message = JSON.toJSONString(order);
          rabbitTemplate.convertAndSend("order_exchange", "order.created", message, new CorrelationData(order.getOrderId()));
          System.out.println("订单创建消息发送成功,订单ID:" + order.getOrderId());
      }
    }
    
    // 库存系统消费者接收消息
    @Component
    public class InventoryConsumer {
      @Autowired
      private InventoryMapper inventoryMapper;
    
      @RabbitListener(queues = "inventory_queue")
      public void handleOrderCreated(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
          try {
              Order order = JSON.parseObject(message, Order.class);
              // 执行库存扣减逻辑
              inventoryMapper.deductStock(order.getProductId(), order.getQuantity());
              // 手动ACK确认消息消费成功
              channel.basicAck(deliveryTag, false);
              System.out.println("库存扣减成功,商品ID:" + order.getProductId());
          } catch (Exception e) {
              // 消费失败,消息重新入队(或进入死信队列)
              channel.basicNack(deliveryTag, false, true);
              e.printStackTrace();
          }
      }
    }

架构优势

  • 解耦:各系统仅依赖MQ消息,无需感知其他系统存在,新增系统(如积分系统)只需订阅消息即可,无需修改订单系统代码;
  • 容错:某一系统故障时,消息暂存MQ,待系统恢复后重新消费,避免流程中断;
  • 可扩展性:各系统可独立扩容(如库存系统压力大时,增加消费者实例),不影响其他系统。

场景二:异步通信

业务痛点

用户在电商平台下单支付后,系统需完成“订单状态更新、发送短信通知、生成积分、同步会员等级”4个操作。若采用同步调用,每个操作耗时100ms,总耗时400ms,用户需等待400ms才能看到支付成功页面,体验较差。

MQ解决方案

  1. 支付系统完成支付后,同步更新订单状态(耗时100ms),随后向MQ发送“支付成功”消息;
  2. 短信服务、积分服务、会员服务作为消费者,异步订阅“支付成功”消息,分别执行短信发送、积分增加、会员等级更新操作;
  3. 用户无需等待后续3个操作完成,支付成功后立即看到结果,后续操作由MQ异步驱动。

技术要点

  • 消息投递模式:采用“扇出交换机(Fanout Exchange)”,让多个消费者同时接收同一条消息;
  • 异步非阻塞:消费者业务逻辑采用异步线程池处理,避免单条消息消费耗时过长阻塞队列;
  • 耗时对比:同步调用总耗时400ms → 异步调用总耗时100ms(仅订单状态更新),用户等待时间缩短75%。

架构优势

  • 提升用户体验:核心流程(支付+订单更新)快速响应,非核心流程异步执行;
  • 提高系统吞吐量:同步调用时系统TPS受限于最慢操作,异步后各操作并行处理,TPS提升3-5倍。

场景三:流量削峰

业务痛点

电商秒杀活动中,每秒请求量可达10000+,但后端库存系统、订单系统的最大处理能力仅为2000 TPS。若直接将请求打向后端,会导致系统过载、数据库连接耗尽,甚至服务宕机。

MQ解决方案

  1. 秒杀活动开始前,提前在MQ中创建“秒杀请求队列”,设置队列最大长度(如10万条,避免消息堆积过多);
  2. 用户秒杀请求先发送至MQ,MQ作为“缓冲器”接收所有请求,后端系统按自身处理能力(2000 TPS)从MQ中拉取请求;
  3. 超过MQ队列长度的请求直接返回“秒杀拥挤,请稍后重试”,避免后端压力过大。

技术要点

  • MQ选择:Kafka(高吞吐量,每秒可处理百万级消息,适合秒杀场景);
  • 流量控制:设置Kafka消费者“每次拉取消息数量”(如每次拉取200条),配合线程池控制并发消费速度;
  • 防重复提交:消息中携带用户ID+商品ID,后端系统通过Redis实现“幂等性校验”,避免重复创建订单;
  • 代码示例(Kafka消费者限流):

    @Service
    public class SeckillConsumer {
      @Autowired
      private SeckillService seckillService;
      @Autowired
      private StringRedisTemplate redisTemplate;
    
      // 配置Kafka消费者,每次拉取200条消息,线程池核心线程数10
      @KafkaListener(topics = "seckill_topic", properties = {
              "max.poll.records=200",
              "fetch.min.bytes=1024",
              "fetch.max.wait.ms=500"
      })
      public void consumeSeckillRequest(ConsumerRecord<String, String> record) {
          String message = record.value();
          SeckillRequest request = JSON.parseObject(message, SeckillRequest.class);
          String userId = request.getUserId();
          String productId = request.getProductId();
          String key = "seckill:order:" + userId + ":" + productId;
    
          // 幂等性校验:判断用户是否已秒杀成功
          if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
              System.out.println("用户" + userId + "已秒杀过商品" + productId,跳过重复请求");
              return;
          }
    
          // 执行秒杀逻辑
          boolean success = seckillService.doSeckill(userId, productId);
          if (success) {
              // 秒杀成功,记录用户秒杀记录(过期时间24小时)
              redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS);
              System.out.println("用户" + userId + "秒杀商品" + productId + "成功");
          } else {
              System.out.println("用户" + userId + "秒杀商品" + productId + "失败,库存不足");
          }
      }
    }

架构优势

  • 流量缓冲:MQ承接瞬时高峰流量,避免后端系统被“冲垮”;
  • 削峰填谷:将秒杀1小时内的高峰流量,分散到后续2-3小时内处理(若库存未售罄),充分利用后端资源;
  • 保护核心系统:仅让符合处理能力的请求进入后端,确保系统稳定运行。

场景四:数据同步

业务痛点

电商系统中,用户在APP端修改个人信息(昵称、手机号、地址)后,需同步更新到“用户中心数据库”“搜索服务索引库”“推荐系统用户画像库”3个数据源。若采用代码中逐个调用接口同步,存在“同步失败数据不一致”“新增数据源需改代码”等问题。

MQ解决方案

  1. 用户中心系统在用户信息修改后,向MQ发送“用户信息更新”消息(包含用户ID、修改字段、新值);
  2. 搜索服务、推荐服务分别订阅该消息,接收到消息后:

    • 搜索服务:更新Elasticsearch中的用户索引;
    • 推荐服务:更新Redis或MySQL中的用户画像数据;
  3. 所有数据源通过MQ消息异步同步,确保数据最终一致性。

技术要点

  • 消息格式:采用“增量更新”格式,仅携带修改的字段(如{userId:123, nickname:"新昵称"}),减少消息体积;
  • 数据一致性:设置消息重试机制(如重试3次),重试失败后进入死信队列,人工介入处理;
  • 同步延迟:通过监控平台统计消息从发送到消费的延迟(目标<1s),确保各数据源同步及时。

架构优势

  • 数据一致性:避免手动同步导致的数据源不一致问题;
  • 扩展性:新增数据源(如数据分析平台)只需订阅消息,无需修改用户中心代码;
  • 可追溯:MQ记录所有同步消息,便于排查数据不一致原因(如某条消息消费失败)。

场景五:事务消息

业务痛点

电商“下单扣库存”场景中,存在“订单创建成功但库存扣减失败”的问题:若先创建订单,再扣库存,库存扣减失败会导致订单无库存却存在;若先扣库存,再创建订单,订单创建失败会导致库存被扣却无订单,出现数据不一致。

MQ解决方案

采用RocketMQ的“事务消息”机制,实现“订单创建”与“库存扣减”的原子性操作,流程如下:

  1. 订单系统发送“半事务消息”到MQ(此时消息处于“待确认”状态,消费者无法消费);
  2. MQ收到半事务消息后,向订单系统返回“消息发送成功”;
  3. 订单系统执行本地事务(创建订单+扣减库存,在同一个数据库事务中):

    • 若本地事务执行成功,向MQ发送“确认消息”,MQ将半事务消息标记为“可消费”,库存系统(或其他消费者)可消费消息;
    • 若本地事务执行失败,向MQ发送“回滚消息”,MQ删除半事务消息,避免消息被消费;
  4. 若订单系统因网络问题未及时发送确认/回滚消息,MQ会定时向订单系统发起“事务状态回查”,订单系统查询本地事务状态后,补发送确认/回滚消息。

技术要点

  • MQ选择:RocketMQ(原生支持事务消息,RabbitMQ需通过自定义逻辑实现);
  • 本地事务:订单创建与库存扣减需在同一个数据库事务中,确保两者同时成功或同时失败;
  • 回查机制:设置RocketMQ事务回查次数(如3次),回查间隔(如10s),避免无限回查;
  • 代码示例(RocketMQ事务消息):

    // 1. 订单系统发送半事务消息
    @Service
    public class OrderTransactionService {
      @Autowired
      private RocketMQTemplate rocketMQTemplate;
      @Autowired
      private OrderMapper orderMapper;
      @Autowired
      private InventoryMapper inventoryMapper;
    
      public void createOrderWithTransaction(Order order) {
          // 构造半事务消息
          String message = JSON.toJSONString(order);
          Message<String> msg = MessageBuilder.withPayload(message)
                  .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
                  .build();
    
          // 发送半事务消息,指定事务监听器
          rocketMQTemplate.sendMessageInTransaction(
                  "order_transaction_topic",  // 事务消息主题
                  "order_tag",                // 消息标签
                  msg,                        // 消息内容
                  order                       // 本地事务参数(传递给事务监听器)
          );
      }
    }
    
    // 2. 事务监听器(执行本地事务+回查事务状态)
    @Component
    @RocketMQTransactionListener(txProducerGroup = "order_producer_group")
    public class OrderTransactionListener implements RocketMQLocalTransactionListener {
      @Autowired
      private OrderMapper orderMapper;
      @Autowired
      private InventoryMapper inventoryMapper;
      @Autowired
      private OrderStatusMapper orderStatusMapper;
    
      // 执行本地事务
      @Override
      public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
          Order order = (Order) arg;
          Connection connection = null;
          try {
              // 获取数据库连接,开启手动事务
              connection = DruidDataSourceFactory.createDataSource().getConnection();
              connection.setAutoCommit(false);
    
              // 执行订单创建
              orderMapper.insert(order);
              // 执行库存扣减
              int rows = inventoryMapper.deductStock(order.getProductId(), order.getQuantity());
              if (rows == 0) {
                  // 库存不足,回滚本地事务
                  connection.rollback();
                  return RocketMQLocalTransactionState.ROLLBACK;
              }
    
              // 提交本地事务
              connection.commit();
              // 记录订单事务状态(用于回查)
              orderStatusMapper.insert(order.getOrderId(), "SUCCESS");
              return RocketMQLocalTransactionState.COMMIT;
          } catch (Exception e) {
              // 本地事务执行异常,回滚
              try {
                  if (connection != null) connection.rollback();
              } catch (SQLException ex) {
                  ex.printStackTrace();
              }
              e.printStackTrace();
              return RocketMQLocalTransactionState.ROLLBACK;
          } finally {
              try {
                  if (connection != null) connection.close();
              } catch (SQLException e) {
                  e.printStackTrace();
              }
          }
      }
    
      // 事务状态回查
      @Override
      public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
          String message = new String((byte[]) msg.getPayload());
          Order order = JSON.parseObject(message, Order.class);
          // 查询本地订单事务状态
          String status = orderStatusMapper.select(order.getOrderId());
          if ("SUCCESS".equals(status)) {
              return RocketMQLocalTransactionState.COMMIT;
          } else if ("FAIL".equals(status)) {
              return RocketMQLocalTransactionState.ROLLBACK;
          } else {
              // 状态未知,继续回查(最多回查3次)
              return RocketMQLocalTransactionState.UNKNOWN;
          }
      }
    }

架构优势

  • 事务一致性:确保“订单创建”与“库存扣减”等操作原子性,避免数据不一致;
  • 可靠性:通过回查机制解决网络异常导致的事务状态未知问题;
  • 低耦合:相比分布式事务(如Seata),无需引入额外框架,依赖MQ原生能力即可实现。

场景六:日志收集

业务痛点

分布式系统中,服务部署在多台服务器上(如订单服务部署5台、支付服务部署3台),日志分散在各服务器的本地文件中。当系统出现故障时,运维人员需逐台登录服务器查看日志,定位问题效率极低(如查找某条订单的日志,需打开5台服务器的日志文件搜索)。

MQ解决方案

采用“MQ + ELK”架构(Kafka + Elasticsearch + Logstash + Kibana),实现日志集中收集与分析:

  1. 各服务通过日志框架(如Logback、Log4j2)将日志输出到本地文件,同时通过“MQ日志Appender”将日志发送到Kafka队列;
  2. Logstash作为消费者,从Kafka中拉取日志,对日志进行清洗(如提取字段、过滤无用日志)、结构化处理(如转为JSON格式);
  3. Logstash将处理后的日志写入Elasticsearch(搜索引擎),构建日志索引;
  4. 运维人员通过Kibana(可视化工具)查询Elasticsearch中的日志,支持按“服务名、时间、日志级别、订单ID”等多维度搜索,快速定位问题。

技术要点

  • MQ选择:Kafka(高吞吐量,适合日志等大量非实时数据传输);
  • 日志结构化:Logstash配置过滤规则,将非结构化日志(如“2024-05-20 10:30:00 [INFO] OrderService: create order success, orderId=123”)转为JSON格式({"time":"2024-05-20 10:30:00","level":"INFO","service":"OrderService","content":"create order success","orderId":"123"});
  • 日志分片:Kafka按“服务名”分区(如订单服务日志写入order_service分区,支付服务日志写入pay_service分区),便于后续按服务查询;
  • 配置示例(Logback对接Kafka):

    <!-- Logback配置文件:logback-spring.xml -->
    <configuration>
      <!-- 1. 输出日志到控制台 -->
      <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
          <encoder>
              <pattern>%d{yyyy-MM-dd HH:mm:ss} [%level] %logger{50} - %msg%n</pattern>
          </encoder>
      </appender>
    
      <!-- 2. 输出日志到Kafka -->
      <appender name="KAFKA" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
          <!-- Kafka地址(Logstash监听地址) -->
          <destination>192.168.1.100:9500</destination>
          <encoder class="net.logstash.logback.encoder.LogstashEncoder">
              <!-- 自定义日志字段 -->
              <customFields>{"service":"order_service","env":"prod"}</customFields>
              <fieldNames>
                  <timestamp>time</timestamp>
                  <message>content</message>
                  <logger>logger</logger>
                  <thread>thread</thread>
                  <level>level</level>
              </fieldNames>
          </encoder>
      </appender>
    
      <!-- 3. 根日志配置,同时输出到控制台和Kafka -->
      <root level="INFO">
          <appender-ref ref="CONSOLE" />
          <appender-ref ref="KAFKA" />
      </root>
    </configuration>

架构优势

  • 集中化:所有服务日志统一存储到Elasticsearch,无需逐台查看服务器;
  • 高效查询:支持多维度搜索(如“orderId=123”“level=ERROR”),定位问题时间从小时级缩短到分钟级;
  • 可分析:通过Kibana生成日志统计报表(如各服务ERROR日志数量趋势、请求耗时分布),提前发现系统隐患。

场景七:延迟消息

业务痛点

电商订单创建后,若用户未在30分钟内支付,需自动取消订单并释放库存。早期实现方式是“定时任务每隔1分钟扫描未支付订单”,存在两个问题:一是延迟(最快1分钟后取消,最慢2分钟),二是压力大(订单量多时,扫描SQL执行耗时久,占用数据库资源)。

MQ解决方案

采用MQ的“延迟消息”功能,实现订单30分钟未支付自动取消:

  1. 订单系统创建未支付订单后,向MQ发送一条“延迟30分钟”的消息(消息内容包含订单ID);
  2. MQ接收到延迟消息后,暂存消息,不立即投递;
  3. 30分钟后,MQ将消息投递到“订单取消队列”;
  4. 订单系统作为消费者,接收到消息后,查询订单支付状态:

    • 若未支付:取消订单,释放库存;
    • 若已支付:忽略消息(通过幂等性校验避免重复取消)。

技术要点

  • MQ选择:

    • RabbitMQ:通过“死信队列(DLX)+ 消息过期时间(TTL)”实现延迟消息;
    • RocketMQ:原生支持延迟消息,提供1s、5s、10s、30s、1min、5min等预设延迟级别,也可自定义延迟时间;
  • 幂等性:消费者处理消息时,必须先查询订单状态,避免已支付订单被取消;
  • 消息可靠性:开启消息持久化,避免MQ宕机导致延迟消息丢失;
  • 代码示例(RabbitMQ延迟消息):

    // 1. 配置RabbitMQ延迟队列(死信队列)
    @Configuration
    public class RabbitMQDelayConfig {
      // 普通队列(接收延迟消息,消息过期后转发到死信队列)
      @Bean
      public Queue orderDelayQueue() {
          Map<String, Object> args = new HashMap<>();
          // 设置消息过期时间(30分钟=1800000ms)
          args.put("x-message-ttl", 1800000);
          // 设置死信交换机
          args.put("x-dead-letter-exchange", "order_dead_exchange");
          // 设置死信路由键
          args.put("x-dead-letter-routing-key", "order.dead.cancel");
          return QueueBuilder.durable("order_delay_queue").withArguments(args).build();
      }
    
      // 死信交换机
      @Bean
      public DirectExchange orderDeadExchange() {
          return ExchangeBuilder.directExchange("order_dead_exchange").durable(true).build();
      }
    
      // 死信队列(实际处理订单取消的队列)
      @Bean
      public Queue orderDeadQueue() {
          return QueueBuilder.durable("order_dead_queue").build();
      }
    
      // 绑定死信交换机与死信队列
      @Bean
      public Binding deadExchangeBindingDeadQueue() {
          return BindingBuilder.bind(orderDeadQueue())
                  .to(orderDeadExchange())
                  .with("order.dead.cancel");
      }
    
      // 绑定普通交换机与普通延迟队列
      @Bean
      public DirectExchange orderDelayExchange() {
          return ExchangeBuilder.directExchange("order_delay_exchange").durable(true).build();
      }
    
      @Bean
      public Binding delayExchangeBindingDelayQueue() {
          return BindingBuilder.bind(orderDelayQueue())
                  .to(orderDelayExchange())
                  .with("order.delay.create");
      }
    }
    
    // 2. 订单系统发送延迟消息
    @Service
    public class OrderDelayService {
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Autowired
      private OrderMapper orderMapper;
    
      public void createUnpaidOrder(Order order) {
          // 1. 保存未支付订单
          order.setStatus("UNPAID");
          orderMapper.insert(order);
          // 2. 发送30分钟延迟消息
          String message = JSON.toJSONString(order.getOrderId());
          rabbitTemplate.convertAndSend("order_delay_exchange", "order.delay.create", message);
          System.out.println("未支付订单创建成功,订单ID:" + order.getOrderId() + ",延迟30分钟后检查支付状态");
      }
    }
    
    // 3. 订单系统消费死信队列消息,取消未支付订单
    @Component
    public class OrderCancelConsumer {
      @Autowired
      private OrderMapper orderMapper;
      @Autowired
      private InventoryMapper inventoryMapper;
    
      @RabbitListener(queues = "order_dead_queue")
      public void handleOrderCancel(String orderId) {
          // 查询订单当前状态
          Order order = orderMapper.selectById(orderId);
          if (order == null) {
              System.out.println("订单不存在,订单ID:" + orderId);
              return;
          }
          // 若未支付,取消订单并释放库存
          if ("UNPAID".equals(order.getStatus())) {
              order.setStatus("CANCELLED");
              orderMapper.updateById(order);
              // 释放库存
              inventoryMapper.restoreStock(order.getProductId(), order.getQuantity());
              System.out.println("订单30分钟未支付已取消,订单ID:" + orderId + ",库存已释放");
          } else {
              // 已支付,忽略消息
              System.out.println("订单已支付,无需取消,订单ID:" + orderId);
          }
      }
    }

架构优势

  • 精准延迟:消息延迟时间可精确到秒级(RocketMQ),避免定时任务的扫描延迟;
  • 低压力:无需定时扫描数据库,减少数据库查询压力(订单量10万时,定时任务需扫描10万条记录,延迟消息仅需处理未支付的订单);
  • 可扩展:支持其他延迟场景(如“订单支付后24小时未发货提醒”“会员到期前3天通知”),只需发送对应延迟级别的消息。

场景八:广播通知

业务痛点

分布式系统中,配置中心(如Nacos、Apollo)更新某一配置(如数据库连接池大小、接口超时时间)后,需通知所有依赖该配置的服务(如订单服务、支付服务、库存服务)重新加载配置。若采用“配置中心逐个调用服务接口”的方式,服务数量多时(如20个),调用效率低且易出现调用失败。

MQ解决方案

  1. 配置中心更新配置后,向MQ发送一条“配置更新”广播消息(包含配置Key、新值、版本号);
  2. 所有依赖该配置的服务作为消费者,订阅MQ的“配置更新”广播队列;
  3. 各服务接收到消息后,检查消息中的配置Key是否为自身依赖的配置:

    • 若是:重新加载配置(如更新数据库连接池、接口超时时间),并记录配置版本号;
    • 若否:忽略消息。

技术要点

  • MQ投递模式:采用“扇出交换机(Fanout Exchange)”或“主题交换机(Topic Exchange)”,实现消息广播;

    • 扇出交换机:所有绑定该交换机的队列都会收到消息,适合全量广播;
    • 主题交换机:通过通配符(如“config.order.#”“config.pay.#”)实现按服务类型广播;
  • 配置版本号:消息中携带配置版本号,服务端对比本地版本号,避免重复加载(如配置中心重试发送消息时);
  • 代码示例(RabbitMQ扇出交换机广播):

    // 1. 配置扇出交换机与消费者队列
    @Configuration
    public class ConfigBroadcastConfig {
      // 扇出交换机(配置更新广播)
      @Bean
      public FanoutExchange configFanoutExchange() {
          return ExchangeBuilder.fanoutExchange("config_broadcast_exchange").durable(true).build();
      }
    
      // 订单服务队列(绑定扇出交换机)
      @Bean
      public Queue orderConfigQueue() {
          return QueueBuilder.durable("order_config_queue").build();
      }
    
      // 支付服务队列(绑定扇出交换机)
      @Bean
      public Queue payConfigQueue() {
          return QueueBuilder.durable("pay_config_queue").build();
      }
    
      // 库存服务队列(绑定扇出交换机)
      @Bean
      public Queue inventoryConfigQueue() {
          return QueueBuilder.durable("inventory_config_queue").build();
      }
    
      // 绑定交换机与各队列
      @Bean
      public Binding orderConfigBinding() {
          return BindingBuilder.bind(orderConfigQueue()).to(configFanoutExchange());
      }
    
      @Bean
      public Binding payConfigBinding() {
          return BindingBuilder.bind(payConfigQueue()).to(configFanoutExchange());
      }
    
      @Bean
      public Binding inventoryConfigBinding() {
          return BindingBuilder.bind(inventoryConfigQueue()).to(configFanoutExchange());
      }
    }
    
    // 2. 配置中心发送广播消息
    @Service
    public class ConfigCenterService {
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Autowired
      private ConfigMapper configMapper;
    
      public void updateConfig(String configKey, String newValue) {
          // 1. 更新配置中心数据库
          Config config = configMapper.selectByKey(configKey);
          if (config == null) {
              config = new Config();
              config.setConfigKey(configKey);
              config.setConfigValue(newValue);
              config.setVersion(1);
              configMapper.insert(config);
          } else {
              config.setConfigValue(newValue);
              config.setVersion(config.getVersion() + 1);
              configMapper.updateById(config);
          }
    
          // 2. 发送配置更新广播消息
          ConfigUpdateMessage message = new ConfigUpdateMessage();
          message.setConfigKey(configKey);
          message.setNewValue(newValue);
          message.setVersion(config.getVersion());
          rabbitTemplate.convertAndSend("config_broadcast_exchange", "", JSON.toJSONString(message));
          System.out.println("配置更新广播消息发送成功,配置Key:" + configKey + ",版本号:" + config.getVersion());
      }
    }
    
    // 3. 订单服务消费广播消息,重新加载配置
    @Component
    public class OrderConfigConsumer {
      @Autowired
      private ConfigService configService;
      // 本地缓存配置版本号
      private Map<String, Integer> localConfigVersion = new ConcurrentHashMap<>();
    
      @RabbitListener(queues = "order_config_queue")
      public void handleConfigUpdate(String message) {
          ConfigUpdateMessage updateMessage = JSON.parseObject(message, ConfigUpdateMessage.class);
          String configKey = updateMessage.getConfigKey();
          int newVersion = updateMessage.getVersion();
          String newValue = updateMessage.getNewValue();
    
          // 对比本地版本号,避免重复加载
          Integer localVersion = localConfigVersion.getOrDefault(configKey, 0);
          if (newVersion <= localVersion) {
              System.out.println("配置版本号未更新,忽略消息,配置Key:" + configKey);
              return;
          }
    
          // 重新加载配置(以数据库连接池为例)
          if ("db.pool.maxActive".equals(configKey)) {
              configService.updateDbPoolMaxActive(Integer.parseInt(newValue));
              localConfigVersion.put(configKey, newVersion);
              System.out.println("订单服务更新数据库连接池配置,maxActive:" + newValue + ",版本号:" + newVersion);
          } else if ("api.timeout.order".equals(configKey)) {
              configService.updateOrderApiTimeout(Integer.parseInt(newValue));
              localConfigVersion.put(configKey, newVersion);
              System.out.println("订单服务更新接口超时配置,timeout:" + newValue + "ms,版本号:" + newVersion);
          }
          // 其他配置Key的处理逻辑...
      }
    }

架构优势

  • 高效广播:一次发送消息,所有服务同时接收,避免逐个调用的效率问题;
  • 低耦合:配置中心无需知道有多少服务依赖配置,新增服务只需绑定队列即可;
  • 可靠通知:通过MQ消息重试机制,确保所有服务都能收到配置更新消息(即使服务暂时宕机,恢复后也能消费)。

场景九:限流熔断

业务痛点

分布式系统中,服务间存在依赖关系(如订单服务依赖支付服务)。当支付服务因故障响应缓慢(如接口超时从100ms变为5000ms)或返回错误(如500错误)时,订单服务会因等待支付服务响应而产生大量线程阻塞,进而导致订单服务线程池耗尽,无法处理新请求,引发“级联故障”。

MQ解决方案

采用“MQ + 限流熔断”组合方案,保护订单服务不被依赖服务的故障影响:

  1. 订单服务调用支付服务前,先检查“支付服务健康状态”(通过熔断器统计调用成功率、超时率);
  2. 若支付服务健康(成功率>90%,超时率<5%):直接调用支付服务接口;
  3. 若支付服务不健康(成功率<80%,超时率>10%):触发熔断,订单服务不直接调用支付服务,而是向MQ发送“支付请求”消息,同时返回“支付处理中,请稍后查询结果”给用户;
  4. 支付服务恢复健康后,从MQ中拉取“支付请求”消息,异步处理支付,处理完成后通过MQ反馈结果给订单服务;
  5. 订单服务接收到支付结果后,更新订单状态,并通过短信/APP推送通知用户。

技术要点

  • 熔断器选择:Sentinel(阿里开源,支持限流、熔断、降级)或Resilience4j(轻量级,适合微服务);
  • MQ角色:熔断时作为“请求缓冲池”,暂存支付请求,避免订单服务线程阻塞;
  • 健康检查:熔断器定时统计支付服务调用指标(成功率、超时率、错误率),动态调整熔断状态(闭合→半开→打开);
  • 代码示例(Sentinel熔断 + RabbitMQ缓冲):

    // 1. 配置Sentinel熔断器(支付服务调用熔断规则)
    @Configuration
    public class SentinelConfig {
      @PostConstruct
      public void initFlowRules() {
          // 熔断规则:支付服务调用接口
          DegradeRule rule = new DegradeRule();
          rule.setResource("payServiceCall");  // 资源名(支付服务调用接口)
          rule.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO);  // 按异常比例熔断
          rule.setCount(0.1);  // 异常比例阈值(10%)
          rule.setTimeWindow(60);  // 熔断时间窗口(60秒)
          rule.setMinRequestAmount(10);  // 最小请求数(10次请求后才开始统计)
          DegradeRuleManager.loadRules(Collections.singletonList(rule));
    
          // 限流规则:订单服务接口限流(1000 TPS)
          FlowRule flowRule = new FlowRule();
          flowRule.setResource("createOrderApi");
          flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
          flowRule.setCount(1000);
          FlowRuleManager.loadRules(Collections.singletonList(flowRule));
      }
    }
    
    // 2. 订单服务调用支付服务(带熔断与MQ缓冲)
    @Service
    public class OrderPayService {
      @Autowired
      private PayServiceFeignClient payServiceFeignClient;  // Feign调用支付服务
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Autowired
      private OrderMapper orderMapper;
    
      // 订单创建接口(带限流)
      @SentinelResource(value = "createOrderApi", blockHandler = "createOrderBlockHandler")
      public String createOrder(Order order) {
          // 保存订单
          orderMapper.insert(order);
          // 调用支付服务(带熔断)
          String payResult = callPayServiceWithDegrade(order);
          return payResult;
      }
    
      // 支付服务调用(带熔断)
      @SentinelResource(value = "payServiceCall", fallback = "payServiceFallback")
      private String callPayServiceWithDegrade(Order order) {
          // 直接调用支付服务Feign接口
          PayRequest request = new PayRequest();
          request.setOrderId(order.getOrderId());
          request.setAmount(order.getAmount());
          PayResponse response = payServiceFeignClient.pay(request);
          if ("SUCCESS".equals(response.getCode())) {
              order.setStatus("PAID");
              orderMapper.updateById(order);
              return "订单创建成功,已支付";
          } else {
              return "订单创建成功,支付失败:" + response.getMsg();
          }
      }
    
      // 支付服务熔断降级方法(触发熔断时调用)
      private String payServiceFallback(Order order, Throwable e) {
          // 向MQ发送支付请求消息,异步处理
          PayRequestMessage message = new PayRequestMessage();
          message.setOrderId(order.getOrderId());
          message.setAmount(order.getAmount());
          message.setUserId(order.getUserId());
          rabbitTemplate.convertAndSend("pay_request_queue", JSON.toJSONString(message));
          // 更新订单状态为“支付处理中”
          order.setStatus("PAYING");
          orderMapper.updateById(order);
          // 返回用户提示
          return "订单创建成功,支付处理中,请稍后在【我的订单】中查询结果";
      }
    
      // 订单接口限流降级方法
      public String createOrderBlockHandler(Order order, BlockException e) {
          return "当前订单创建人数过多,请稍后重试";
      }
    }
    
    // 3. 支付服务恢复后,消费MQ消息处理支付
    @Component
    public class PayRequestConsumer {
      @Autowired
      private PayService payService;
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Autowired
      private OrderFeignClient orderFeignClient;
    
      @RabbitListener(queues = "pay_request_queue")
      public void handlePayRequest(String message) {
          PayRequestMessage requestMessage = JSON.parseObject(message, PayRequestMessage.class);
          String orderId = requestMessage.getOrderId();
          BigDecimal amount = requestMessage.getAmount();
    
          try {
              // 处理支付逻辑
              PayResult result = payService.processPay(orderId, amount);
              // 发送支付结果消息给订单服务
              PayResultMessage resultMessage = new PayResultMessage();
              resultMessage.setOrderId(orderId);
              resultMessage.setPayStatus(result.isSuccess() ? "PAID" : "PAY_FAILED");
              resultMessage.setPayTime(new Date());
              rabbitTemplate.convertAndSend("pay_result_queue", JSON.toJSONString(resultMessage));
              System.out.println("支付请求处理完成,订单ID:" + orderId + ",支付状态:" + resultMessage.getPayStatus());
          } catch (Exception e) {
              // 支付处理失败,消息重新入队(重试3次后进入死信队列)
              throw new AmqpRejectAndDontRequeueException("支付处理失败,订单ID:" + orderId, e);
          }
      }
    }
    
    // 4. 订单服务消费支付结果消息,更新订单状态
    @Component
    public class PayResultConsumer {
      @Autowired
      private OrderMapper orderMapper;
      @Autowired
      private SmsService smsService;
    
      @RabbitListener(queues = "pay_result_queue")
      public void handlePayResult(String message) {
          PayResultMessage resultMessage = JSON.parseObject(message, PayResultMessage.class);
          String orderId = resultMessage.getOrderId();
          String payStatus = resultMessage.getPayStatus();
    
          // 更新订单状态
          Order order = orderMapper.selectById(orderId);
          if (order == null) {
              System.out.println("订单不存在,订单ID:" + orderId);
              return;
          }
          order.setStatus(payStatus);
          order.setPayTime(resultMessage.getPayTime());
          orderMapper.updateById(order);
    
          // 发送支付结果短信通知
          String smsContent = "您的订单(ID:" + orderId + ")" + 
                  ("PAID".equals(payStatus) ? "已支付成功" : "支付失败,请重新尝试") + 
                  ",详情可在APP【我的订单】中查看。";
          smsService.sendSms(order.getUserId(), smsContent);
          System.out.println("订单状态更新完成,订单ID:" + orderId + ",支付状态:" + payStatus);
      }
    }

架构优势

  • 熔断保护:依赖服务故障时,触发熔断,避免订单服务线程阻塞,保障核心订单创建功能可用;
  • 缓冲请求:MQ暂存支付请求,待依赖服务恢复后异步处理,不丢失用户请求;
  • 限流保护:通过Sentinel限流,避免订单服务被瞬时高峰流量冲垮,保障系统稳定。

场景十:分布式事务最终一致性

业务痛点

跨银行转账场景中,用户从A银行转账1000元到B银行,需完成“A银行扣减1000元”和“B银行增加1000元”两个操作。由于A银行和B银行是不同的系统(不同数据库、不同服务),无法使用传统的数据库事务保证原子性,存在“A银行扣钱成功但B银行加钱失败”的风险,导致数据不一致。

MQ解决方案

采用“可靠消息最终一致性”方案,实现跨系统事务的最终一致性,流程如下:

  1. A银行系统执行本地事务(扣减1000元):

    • 若扣减失败:直接返回转账失败给用户,流程结束;
    • 若扣减成功:向MQ发送一条“转账发起”消息(包含转账ID、A银行账户、B银行账户、金额1000元);
  2. MQ收到消息后,向A银行返回“消息发送成功”确认;
  3. B银行系统订阅“转账发起”消息,执行本地事务(增加1000元):

    • 若增加成功:向MQ发送“转账成功”确认消息,MQ删除“转账发起”消息;
    • 若增加失败:不发送确认消息,MQ会定时重新投递“转账发起”消息(重试机制);
  4. A银行系统订阅“转账成功”消息,更新本地转账状态为“成功”;
  5. 若B银行多次重试仍失败(如B银行系统长时间宕机),MQ将“转账发起”消息转入死信队列,A银行系统监控到死信消息后,执行“回滚本地事务”(退还1000元给用户),并通过人工介入处理B银行问题。

技术要点

  • 消息可靠性:开启MQ消息持久化、生产者确认、消费者手动ACK,确保消息不丢失;
  • 重试机制:设置MQ消息重试次数(如5次),重试间隔(如10s、30s、1min、5min、10min),避免频繁重试加重B银行系统负担;
  • 死信处理:死信队列需关联监控告警(如短信通知运维人员),确保异常情况及时被发现;
  • 幂等性:B银行系统处理“转账发起”消息时,需通过“转账ID”做幂等性校验(如查询是否已处理过该转账ID),避免重复加钱;
  • 代码示例(核心流程):

    // 1. A银行系统执行本地事务并发送消息
    @Service
    public class ABankTransferService {
      @Autowired
      private ABankAccountMapper accountMapper;
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Autowired
      private TransferRecordMapper transferRecordMapper;
    
      @Transactional
      public String transferToBBank(String fromAccount, String toBBankAccount, BigDecimal amount) {
          // 1. 生成唯一转账ID
          String transferId = UUID.randomUUID().toString();
          // 2. 扣减A银行账户余额(本地事务)
          int rows = accountMapper.deductBalance(fromAccount, amount);
          if (rows == 0) {
              throw new RuntimeException("A银行账户余额不足,转账失败");
          }
          // 3. 记录转账记录(状态:处理中)
          TransferRecord record = new TransferRecord();
          record.setTransferId(transferId);
          record.setFromAccount(fromAccount);
          record.setToAccount(toBBankAccount);
          record.setAmount(amount);
          record.setStatus("PROCESSING");
          record.setCreateTime(new Date());
          transferRecordMapper.insert(record);
          // 4. 发送转账发起消息(开启生产者确认)
          try {
              TransferMessage message = new TransferMessage();
              message.setTransferId(transferId);
              message.setFromAccount(fromAccount);
              message.setToBBankAccount(toBBankAccount);
              message.setAmount(amount);
              // 同步等待生产者确认
              rabbitTemplate.invoke(new RabbitOperations.Callback<String>() {
                  @Override
                  public String doInRabbit(RabbitOperations operations) throws AmqpException {
                      operations.convertAndSend("transfer_exchange", "transfer.to.bbank", JSON.toJSONString(message),
                              new CorrelationData(transferId));
                      return null;
                  }
              });
              System.out.println("A银行转账发起,转账ID:" + transferId + ",已扣减余额:" + amount);
              return "转账处理中,转账ID:" + transferId;
          } catch (Exception e) {
              // 消息发送失败,回滚本地事务(Spring事务管理自动回滚)
              throw new RuntimeException("转账消息发送失败,已回滚A银行余额", e);
          }
      }
    
      // 5. 消费B银行转账成功消息,更新转账状态
      @RabbitListener(queues = "abank_transfer_success_queue")
      public void handleTransferSuccess(String message) {
          TransferSuccessMessage successMessage = JSON.parseObject(message, TransferSuccessMessage.class);
          String transferId = successMessage.getTransferId();
          // 更新转账记录状态为成功
          TransferRecord record = transferRecordMapper.selectByTransferId(transferId);
          if (record != null && "PROCESSING".equals(record.getStatus())) {
              record.setStatus("SUCCESS");
              record.setCompleteTime(new Date());
              transferRecordMapper.updateById(record);
              System.out.println("A银行转账成功,转账ID:" + transferId);
          }
      }
    
      // 6. 消费死信消息,回滚本地事务
      @RabbitListener(queues = "transfer_dead_queue")
      public void handleTransferDeadLetter(String message) {
          TransferMessage transferMessage = JSON.parseObject(message, TransferMessage.class);
          String transferId = transferMessage.getTransferId();
          String fromAccount = transferMessage.getFromAccount();
          BigDecimal amount = transferMessage.getAmount();
          // 回滚A银行余额
          accountMapper.restoreBalance(fromAccount, amount);
          // 更新转账记录状态为失败
          TransferRecord record = transferRecordMapper.selectByTransferId(transferId);
          if (record != null) {
              record.setStatus("FAIL");
              record.setFailReason("B银行处理失败,已回滚");
              transferRecordMapper.updateById(record);
          }
          // 发送告警短信给运维人员
          String alarmContent = "转账ID:" + transferId + " 进入死信队列,已回滚A银行余额 " + amount + ",请检查B银行系统";
          SmsUtils.sendSms("13800138000", alarmContent);
          System.out.println("转账失败已回滚,转账ID:" + transferId + ",告警已发送");
      }
    }
    
    // 2. B银行系统消费转账发起消息,执行本地事务
    @Component
    public class BBankTransferConsumer {
      @Autowired
      private BBankAccountMapper accountMapper;
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Autowired
      private BBankTransferRecordMapper transferRecordMapper;
    
      @RabbitListener(queues = "bbank_transfer_queue")
      public void handleTransferRequest(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
          TransferMessage transferMessage = JSON.parseObject(message, TransferMessage.class);
          String transferId = transferMessage.getTransferId();
          String toAccount = transferMessage.getToBBankAccount();
          BigDecimal amount = transferMessage.getAmount();
    
          try {
              // 1. 幂等性校验:判断是否已处理过该转账ID
              BBankTransferRecord record = transferRecordMapper.selectByTransferId(transferId);
              if (record != null && "SUCCESS".equals(record.getStatus())) {
                  // 已处理过,手动ACK
                  channel.basicAck(deliveryTag, false);
                  System.out.println("B银行已处理该转账,转账ID:" + transferId + ",忽略消息");
                  return;
              }
    
              // 2. 执行本地事务:增加B银行账户余额
              accountMapper.increaseBalance(toAccount, amount);
              // 3. 记录B银行转账记录
              if (record == null) {
                  record = new BBankTransferRecord();
                  record.setTransferId(transferId);
                  record.setToAccount(toAccount);
                  record.setAmount(amount);
                  record.setStatus("SUCCESS");
                  record.setCreateTime(new Date());
                  transferRecordMapper.insert(record);
              } else {
                  record.setStatus("SUCCESS");
                  transferRecordMapper.updateById(record);
              }
    
              // 4. 发送转账成功消息给A银行
              TransferSuccessMessage successMessage = new TransferSuccessMessage();
              successMessage.setTransferId(transferId);
              rabbitTemplate.convertAndSend("transfer_exchange", "transfer.success.to.abank", JSON.toJSONString(successMessage));
    
              // 5. 手动ACK确认消息消费成功
              channel.basicAck(deliveryTag, false);
              System.out.println("B银行转账处理成功,转账ID:" + transferId + ",已增加余额:" + amount);
          } catch (Exception e) {
              // 6. 处理失败,判断是否重试(重试5次后进入死信队列)
              Integer retryCount = (Integer) channel.getMessageProperties().getHeaders().get("retry-count");
              if (retryCount == null) retryCount = 0;
              if (retryCount < 5) {
                  // 重新入队,重试次数+1
                  channel.getMessageProperties().getHeaders().put("retry-count", retryCount + 1);
                  channel.basicNack(deliveryTag, false, true);
                  System.out.println("B银行转账处理失败,重试次数:" + (retryCount + 1) + ",转账ID:" + transferId);
              } else {
                  // 重试5次失败,进入死信队列
                  channel.basicNack(deliveryTag, false, false);
                  System.out.println("B银行转账处理失败,重试5次后进入死信队列,转账ID:" + transferId);
              }
              e.printStackTrace();
          }
      }
    }

架构优势

  • 最终一致性:通过MQ消息重试与死信处理,确保“扣钱”与“加钱”两个操作最终同时成功或同时失败,避免数据不一致;
  • 可靠性:消息不丢失,即使跨系统、跨网络,也能通过重试机制保障事务推进;
  • 低耦合:A银行与B银行仅通过MQ消息交互,无需感知对方系统细节,便于维护与扩展。

最后说一句

MQ的核心价值在于“解耦、异步、削峰”,但并非所有场景都适合用MQ——若业务逻辑简单、无跨系统交互、对实时性要求极高(如毫秒级响应),直接调用接口可能更高效。在实际项目中,需结合业务痛点、技术成本、维护难度综合判断是否使用MQ,以及选择哪种MQ(RabbitMQ/Kafka/RocketMQ)。希望以上10种场景能给大家带来启发,避免在项目中滥用或误用MQ。

标签: none

评论已关闭

/* * @Author: your name * @Date: 2016-09-06 00:00:00 * @LastEditTime: 2020-03-17 18:29:35 * @LastEditors: Please set LastEditors * @Description: In User Settings Edit * @FilePath: \htdocs\usr\themes\default\footer.php */