我工作中用MQ的10种场景
前言
在分布式系统开发中,消息队列(MQ)是解决系统解耦、异步通信、流量削峰等问题的核心组件。我在过往的项目开发(电商秒杀、物流调度、支付对账等场景)中,多次用到RabbitMQ、Kafka、RocketMQ等不同类型的MQ,总结出10种高频实用场景,每种场景均包含“业务痛点、MQ解决方案、技术要点、架构优势”,希望能给大家提供参考。
场景一:系统解耦
业务痛点
早期电商系统中,订单创建后需同步调用库存系统、支付系统、物流系统的接口。若其中某一个系统接口超时(如物流系统服务宕机),会导致整个订单流程失败,系统耦合度极高,一处故障影响全局。
MQ解决方案
- 订单系统在创建订单后,不再直接调用其他系统接口,而是向MQ发送一条“订单创建成功”的消息(包含订单ID、商品ID、用户ID、订单金额等核心字段);
- 库存系统、支付系统、物流系统分别作为消费者,订阅MQ中“订单创建成功”的消息队列;
- 各系统接收到消息后,异步执行自身业务逻辑(库存扣减、支付状态同步、物流单生成),执行结果通过MQ反馈给订单系统(如“库存扣减成功”“支付完成”)。
技术要点
- 消息队列选择:RabbitMQ(支持交换机路由,可按系统类型路由消息);
- 消息可靠性:开启消息持久化(durable=true)、生产者确认机制(publisher confirm)、消费者手动ACK,避免消息丢失;
代码示例(Java + RabbitMQ):
// 订单系统生产者发送消息 @Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; public void createOrder(Order order) { // 1. 保存订单到数据库 orderMapper.insert(order); // 2. 发送订单创建消息到MQ String message = JSON.toJSONString(order); rabbitTemplate.convertAndSend("order_exchange", "order.created", message, new CorrelationData(order.getOrderId())); System.out.println("订单创建消息发送成功,订单ID:" + order.getOrderId()); } } // 库存系统消费者接收消息 @Component public class InventoryConsumer { @Autowired private InventoryMapper inventoryMapper; @RabbitListener(queues = "inventory_queue") public void handleOrderCreated(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException { try { Order order = JSON.parseObject(message, Order.class); // 执行库存扣减逻辑 inventoryMapper.deductStock(order.getProductId(), order.getQuantity()); // 手动ACK确认消息消费成功 channel.basicAck(deliveryTag, false); System.out.println("库存扣减成功,商品ID:" + order.getProductId()); } catch (Exception e) { // 消费失败,消息重新入队(或进入死信队列) channel.basicNack(deliveryTag, false, true); e.printStackTrace(); } } }
架构优势
- 解耦:各系统仅依赖MQ消息,无需感知其他系统存在,新增系统(如积分系统)只需订阅消息即可,无需修改订单系统代码;
- 容错:某一系统故障时,消息暂存MQ,待系统恢复后重新消费,避免流程中断;
- 可扩展性:各系统可独立扩容(如库存系统压力大时,增加消费者实例),不影响其他系统。
场景二:异步通信
业务痛点
用户在电商平台下单支付后,系统需完成“订单状态更新、发送短信通知、生成积分、同步会员等级”4个操作。若采用同步调用,每个操作耗时100ms,总耗时400ms,用户需等待400ms才能看到支付成功页面,体验较差。
MQ解决方案
- 支付系统完成支付后,同步更新订单状态(耗时100ms),随后向MQ发送“支付成功”消息;
- 短信服务、积分服务、会员服务作为消费者,异步订阅“支付成功”消息,分别执行短信发送、积分增加、会员等级更新操作;
- 用户无需等待后续3个操作完成,支付成功后立即看到结果,后续操作由MQ异步驱动。
技术要点
- 消息投递模式:采用“扇出交换机(Fanout Exchange)”,让多个消费者同时接收同一条消息;
- 异步非阻塞:消费者业务逻辑采用异步线程池处理,避免单条消息消费耗时过长阻塞队列;
- 耗时对比:同步调用总耗时400ms → 异步调用总耗时100ms(仅订单状态更新),用户等待时间缩短75%。
架构优势
- 提升用户体验:核心流程(支付+订单更新)快速响应,非核心流程异步执行;
- 提高系统吞吐量:同步调用时系统TPS受限于最慢操作,异步后各操作并行处理,TPS提升3-5倍。
场景三:流量削峰
业务痛点
电商秒杀活动中,每秒请求量可达10000+,但后端库存系统、订单系统的最大处理能力仅为2000 TPS。若直接将请求打向后端,会导致系统过载、数据库连接耗尽,甚至服务宕机。
MQ解决方案
- 秒杀活动开始前,提前在MQ中创建“秒杀请求队列”,设置队列最大长度(如10万条,避免消息堆积过多);
- 用户秒杀请求先发送至MQ,MQ作为“缓冲器”接收所有请求,后端系统按自身处理能力(2000 TPS)从MQ中拉取请求;
- 超过MQ队列长度的请求直接返回“秒杀拥挤,请稍后重试”,避免后端压力过大。
技术要点
- MQ选择:Kafka(高吞吐量,每秒可处理百万级消息,适合秒杀场景);
- 流量控制:设置Kafka消费者“每次拉取消息数量”(如每次拉取200条),配合线程池控制并发消费速度;
- 防重复提交:消息中携带用户ID+商品ID,后端系统通过Redis实现“幂等性校验”,避免重复创建订单;
代码示例(Kafka消费者限流):
@Service public class SeckillConsumer { @Autowired private SeckillService seckillService; @Autowired private StringRedisTemplate redisTemplate; // 配置Kafka消费者,每次拉取200条消息,线程池核心线程数10 @KafkaListener(topics = "seckill_topic", properties = { "max.poll.records=200", "fetch.min.bytes=1024", "fetch.max.wait.ms=500" }) public void consumeSeckillRequest(ConsumerRecord<String, String> record) { String message = record.value(); SeckillRequest request = JSON.parseObject(message, SeckillRequest.class); String userId = request.getUserId(); String productId = request.getProductId(); String key = "seckill:order:" + userId + ":" + productId; // 幂等性校验:判断用户是否已秒杀成功 if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) { System.out.println("用户" + userId + "已秒杀过商品" + productId,跳过重复请求"); return; } // 执行秒杀逻辑 boolean success = seckillService.doSeckill(userId, productId); if (success) { // 秒杀成功,记录用户秒杀记录(过期时间24小时) redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS); System.out.println("用户" + userId + "秒杀商品" + productId + "成功"); } else { System.out.println("用户" + userId + "秒杀商品" + productId + "失败,库存不足"); } } }
架构优势
- 流量缓冲:MQ承接瞬时高峰流量,避免后端系统被“冲垮”;
- 削峰填谷:将秒杀1小时内的高峰流量,分散到后续2-3小时内处理(若库存未售罄),充分利用后端资源;
- 保护核心系统:仅让符合处理能力的请求进入后端,确保系统稳定运行。
场景四:数据同步
业务痛点
电商系统中,用户在APP端修改个人信息(昵称、手机号、地址)后,需同步更新到“用户中心数据库”“搜索服务索引库”“推荐系统用户画像库”3个数据源。若采用代码中逐个调用接口同步,存在“同步失败数据不一致”“新增数据源需改代码”等问题。
MQ解决方案
- 用户中心系统在用户信息修改后,向MQ发送“用户信息更新”消息(包含用户ID、修改字段、新值);
搜索服务、推荐服务分别订阅该消息,接收到消息后:
- 搜索服务:更新Elasticsearch中的用户索引;
- 推荐服务:更新Redis或MySQL中的用户画像数据;
- 所有数据源通过MQ消息异步同步,确保数据最终一致性。
技术要点
- 消息格式:采用“增量更新”格式,仅携带修改的字段(如{userId:123, nickname:"新昵称"}),减少消息体积;
- 数据一致性:设置消息重试机制(如重试3次),重试失败后进入死信队列,人工介入处理;
- 同步延迟:通过监控平台统计消息从发送到消费的延迟(目标<1s),确保各数据源同步及时。
架构优势
- 数据一致性:避免手动同步导致的数据源不一致问题;
- 扩展性:新增数据源(如数据分析平台)只需订阅消息,无需修改用户中心代码;
- 可追溯:MQ记录所有同步消息,便于排查数据不一致原因(如某条消息消费失败)。
场景五:事务消息
业务痛点
电商“下单扣库存”场景中,存在“订单创建成功但库存扣减失败”的问题:若先创建订单,再扣库存,库存扣减失败会导致订单无库存却存在;若先扣库存,再创建订单,订单创建失败会导致库存被扣却无订单,出现数据不一致。
MQ解决方案
采用RocketMQ的“事务消息”机制,实现“订单创建”与“库存扣减”的原子性操作,流程如下:
- 订单系统发送“半事务消息”到MQ(此时消息处于“待确认”状态,消费者无法消费);
- MQ收到半事务消息后,向订单系统返回“消息发送成功”;
订单系统执行本地事务(创建订单+扣减库存,在同一个数据库事务中):
- 若本地事务执行成功,向MQ发送“确认消息”,MQ将半事务消息标记为“可消费”,库存系统(或其他消费者)可消费消息;
- 若本地事务执行失败,向MQ发送“回滚消息”,MQ删除半事务消息,避免消息被消费;
- 若订单系统因网络问题未及时发送确认/回滚消息,MQ会定时向订单系统发起“事务状态回查”,订单系统查询本地事务状态后,补发送确认/回滚消息。
技术要点
- MQ选择:RocketMQ(原生支持事务消息,RabbitMQ需通过自定义逻辑实现);
- 本地事务:订单创建与库存扣减需在同一个数据库事务中,确保两者同时成功或同时失败;
- 回查机制:设置RocketMQ事务回查次数(如3次),回查间隔(如10s),避免无限回查;
代码示例(RocketMQ事务消息):
// 1. 订单系统发送半事务消息 @Service public class OrderTransactionService { @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private OrderMapper orderMapper; @Autowired private InventoryMapper inventoryMapper; public void createOrderWithTransaction(Order order) { // 构造半事务消息 String message = JSON.toJSONString(order); Message<String> msg = MessageBuilder.withPayload(message) .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString()) .build(); // 发送半事务消息,指定事务监听器 rocketMQTemplate.sendMessageInTransaction( "order_transaction_topic", // 事务消息主题 "order_tag", // 消息标签 msg, // 消息内容 order // 本地事务参数(传递给事务监听器) ); } } // 2. 事务监听器(执行本地事务+回查事务状态) @Component @RocketMQTransactionListener(txProducerGroup = "order_producer_group") public class OrderTransactionListener implements RocketMQLocalTransactionListener { @Autowired private OrderMapper orderMapper; @Autowired private InventoryMapper inventoryMapper; @Autowired private OrderStatusMapper orderStatusMapper; // 执行本地事务 @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { Order order = (Order) arg; Connection connection = null; try { // 获取数据库连接,开启手动事务 connection = DruidDataSourceFactory.createDataSource().getConnection(); connection.setAutoCommit(false); // 执行订单创建 orderMapper.insert(order); // 执行库存扣减 int rows = inventoryMapper.deductStock(order.getProductId(), order.getQuantity()); if (rows == 0) { // 库存不足,回滚本地事务 connection.rollback(); return RocketMQLocalTransactionState.ROLLBACK; } // 提交本地事务 connection.commit(); // 记录订单事务状态(用于回查) orderStatusMapper.insert(order.getOrderId(), "SUCCESS"); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { // 本地事务执行异常,回滚 try { if (connection != null) connection.rollback(); } catch (SQLException ex) { ex.printStackTrace(); } e.printStackTrace(); return RocketMQLocalTransactionState.ROLLBACK; } finally { try { if (connection != null) connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } // 事务状态回查 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String message = new String((byte[]) msg.getPayload()); Order order = JSON.parseObject(message, Order.class); // 查询本地订单事务状态 String status = orderStatusMapper.select(order.getOrderId()); if ("SUCCESS".equals(status)) { return RocketMQLocalTransactionState.COMMIT; } else if ("FAIL".equals(status)) { return RocketMQLocalTransactionState.ROLLBACK; } else { // 状态未知,继续回查(最多回查3次) return RocketMQLocalTransactionState.UNKNOWN; } } }
架构优势
- 事务一致性:确保“订单创建”与“库存扣减”等操作原子性,避免数据不一致;
- 可靠性:通过回查机制解决网络异常导致的事务状态未知问题;
- 低耦合:相比分布式事务(如Seata),无需引入额外框架,依赖MQ原生能力即可实现。
场景六:日志收集
业务痛点
分布式系统中,服务部署在多台服务器上(如订单服务部署5台、支付服务部署3台),日志分散在各服务器的本地文件中。当系统出现故障时,运维人员需逐台登录服务器查看日志,定位问题效率极低(如查找某条订单的日志,需打开5台服务器的日志文件搜索)。
MQ解决方案
采用“MQ + ELK”架构(Kafka + Elasticsearch + Logstash + Kibana),实现日志集中收集与分析:
- 各服务通过日志框架(如Logback、Log4j2)将日志输出到本地文件,同时通过“MQ日志Appender”将日志发送到Kafka队列;
- Logstash作为消费者,从Kafka中拉取日志,对日志进行清洗(如提取字段、过滤无用日志)、结构化处理(如转为JSON格式);
- Logstash将处理后的日志写入Elasticsearch(搜索引擎),构建日志索引;
- 运维人员通过Kibana(可视化工具)查询Elasticsearch中的日志,支持按“服务名、时间、日志级别、订单ID”等多维度搜索,快速定位问题。
技术要点
- MQ选择:Kafka(高吞吐量,适合日志等大量非实时数据传输);
- 日志结构化:Logstash配置过滤规则,将非结构化日志(如“2024-05-20 10:30:00 [INFO] OrderService: create order success, orderId=123”)转为JSON格式({"time":"2024-05-20 10:30:00","level":"INFO","service":"OrderService","content":"create order success","orderId":"123"});
- 日志分片:Kafka按“服务名”分区(如订单服务日志写入order_service分区,支付服务日志写入pay_service分区),便于后续按服务查询;
配置示例(Logback对接Kafka):
<!-- Logback配置文件:logback-spring.xml --> <configuration> <!-- 1. 输出日志到控制台 --> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss} [%level] %logger{50} - %msg%n</pattern> </encoder> </appender> <!-- 2. 输出日志到Kafka --> <appender name="KAFKA" class="net.logstash.logback.appender.LogstashTcpSocketAppender"> <!-- Kafka地址(Logstash监听地址) --> <destination>192.168.1.100:9500</destination> <encoder class="net.logstash.logback.encoder.LogstashEncoder"> <!-- 自定义日志字段 --> <customFields>{"service":"order_service","env":"prod"}</customFields> <fieldNames> <timestamp>time</timestamp> <message>content</message> <logger>logger</logger> <thread>thread</thread> <level>level</level> </fieldNames> </encoder> </appender> <!-- 3. 根日志配置,同时输出到控制台和Kafka --> <root level="INFO"> <appender-ref ref="CONSOLE" /> <appender-ref ref="KAFKA" /> </root> </configuration>
架构优势
- 集中化:所有服务日志统一存储到Elasticsearch,无需逐台查看服务器;
- 高效查询:支持多维度搜索(如“orderId=123”“level=ERROR”),定位问题时间从小时级缩短到分钟级;
- 可分析:通过Kibana生成日志统计报表(如各服务ERROR日志数量趋势、请求耗时分布),提前发现系统隐患。
场景七:延迟消息
业务痛点
电商订单创建后,若用户未在30分钟内支付,需自动取消订单并释放库存。早期实现方式是“定时任务每隔1分钟扫描未支付订单”,存在两个问题:一是延迟(最快1分钟后取消,最慢2分钟),二是压力大(订单量多时,扫描SQL执行耗时久,占用数据库资源)。
MQ解决方案
采用MQ的“延迟消息”功能,实现订单30分钟未支付自动取消:
- 订单系统创建未支付订单后,向MQ发送一条“延迟30分钟”的消息(消息内容包含订单ID);
- MQ接收到延迟消息后,暂存消息,不立即投递;
- 30分钟后,MQ将消息投递到“订单取消队列”;
订单系统作为消费者,接收到消息后,查询订单支付状态:
- 若未支付:取消订单,释放库存;
- 若已支付:忽略消息(通过幂等性校验避免重复取消)。
技术要点
MQ选择:
- RabbitMQ:通过“死信队列(DLX)+ 消息过期时间(TTL)”实现延迟消息;
- RocketMQ:原生支持延迟消息,提供1s、5s、10s、30s、1min、5min等预设延迟级别,也可自定义延迟时间;
- 幂等性:消费者处理消息时,必须先查询订单状态,避免已支付订单被取消;
- 消息可靠性:开启消息持久化,避免MQ宕机导致延迟消息丢失;
代码示例(RabbitMQ延迟消息):
// 1. 配置RabbitMQ延迟队列(死信队列) @Configuration public class RabbitMQDelayConfig { // 普通队列(接收延迟消息,消息过期后转发到死信队列) @Bean public Queue orderDelayQueue() { Map<String, Object> args = new HashMap<>(); // 设置消息过期时间(30分钟=1800000ms) args.put("x-message-ttl", 1800000); // 设置死信交换机 args.put("x-dead-letter-exchange", "order_dead_exchange"); // 设置死信路由键 args.put("x-dead-letter-routing-key", "order.dead.cancel"); return QueueBuilder.durable("order_delay_queue").withArguments(args).build(); } // 死信交换机 @Bean public DirectExchange orderDeadExchange() { return ExchangeBuilder.directExchange("order_dead_exchange").durable(true).build(); } // 死信队列(实际处理订单取消的队列) @Bean public Queue orderDeadQueue() { return QueueBuilder.durable("order_dead_queue").build(); } // 绑定死信交换机与死信队列 @Bean public Binding deadExchangeBindingDeadQueue() { return BindingBuilder.bind(orderDeadQueue()) .to(orderDeadExchange()) .with("order.dead.cancel"); } // 绑定普通交换机与普通延迟队列 @Bean public DirectExchange orderDelayExchange() { return ExchangeBuilder.directExchange("order_delay_exchange").durable(true).build(); } @Bean public Binding delayExchangeBindingDelayQueue() { return BindingBuilder.bind(orderDelayQueue()) .to(orderDelayExchange()) .with("order.delay.create"); } } // 2. 订单系统发送延迟消息 @Service public class OrderDelayService { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private OrderMapper orderMapper; public void createUnpaidOrder(Order order) { // 1. 保存未支付订单 order.setStatus("UNPAID"); orderMapper.insert(order); // 2. 发送30分钟延迟消息 String message = JSON.toJSONString(order.getOrderId()); rabbitTemplate.convertAndSend("order_delay_exchange", "order.delay.create", message); System.out.println("未支付订单创建成功,订单ID:" + order.getOrderId() + ",延迟30分钟后检查支付状态"); } } // 3. 订单系统消费死信队列消息,取消未支付订单 @Component public class OrderCancelConsumer { @Autowired private OrderMapper orderMapper; @Autowired private InventoryMapper inventoryMapper; @RabbitListener(queues = "order_dead_queue") public void handleOrderCancel(String orderId) { // 查询订单当前状态 Order order = orderMapper.selectById(orderId); if (order == null) { System.out.println("订单不存在,订单ID:" + orderId); return; } // 若未支付,取消订单并释放库存 if ("UNPAID".equals(order.getStatus())) { order.setStatus("CANCELLED"); orderMapper.updateById(order); // 释放库存 inventoryMapper.restoreStock(order.getProductId(), order.getQuantity()); System.out.println("订单30分钟未支付已取消,订单ID:" + orderId + ",库存已释放"); } else { // 已支付,忽略消息 System.out.println("订单已支付,无需取消,订单ID:" + orderId); } } }
架构优势
- 精准延迟:消息延迟时间可精确到秒级(RocketMQ),避免定时任务的扫描延迟;
- 低压力:无需定时扫描数据库,减少数据库查询压力(订单量10万时,定时任务需扫描10万条记录,延迟消息仅需处理未支付的订单);
- 可扩展:支持其他延迟场景(如“订单支付后24小时未发货提醒”“会员到期前3天通知”),只需发送对应延迟级别的消息。
场景八:广播通知
业务痛点
分布式系统中,配置中心(如Nacos、Apollo)更新某一配置(如数据库连接池大小、接口超时时间)后,需通知所有依赖该配置的服务(如订单服务、支付服务、库存服务)重新加载配置。若采用“配置中心逐个调用服务接口”的方式,服务数量多时(如20个),调用效率低且易出现调用失败。
MQ解决方案
- 配置中心更新配置后,向MQ发送一条“配置更新”广播消息(包含配置Key、新值、版本号);
- 所有依赖该配置的服务作为消费者,订阅MQ的“配置更新”广播队列;
各服务接收到消息后,检查消息中的配置Key是否为自身依赖的配置:
- 若是:重新加载配置(如更新数据库连接池、接口超时时间),并记录配置版本号;
- 若否:忽略消息。
技术要点
MQ投递模式:采用“扇出交换机(Fanout Exchange)”或“主题交换机(Topic Exchange)”,实现消息广播;
- 扇出交换机:所有绑定该交换机的队列都会收到消息,适合全量广播;
- 主题交换机:通过通配符(如“config.order.#”“config.pay.#”)实现按服务类型广播;
- 配置版本号:消息中携带配置版本号,服务端对比本地版本号,避免重复加载(如配置中心重试发送消息时);
代码示例(RabbitMQ扇出交换机广播):
// 1. 配置扇出交换机与消费者队列 @Configuration public class ConfigBroadcastConfig { // 扇出交换机(配置更新广播) @Bean public FanoutExchange configFanoutExchange() { return ExchangeBuilder.fanoutExchange("config_broadcast_exchange").durable(true).build(); } // 订单服务队列(绑定扇出交换机) @Bean public Queue orderConfigQueue() { return QueueBuilder.durable("order_config_queue").build(); } // 支付服务队列(绑定扇出交换机) @Bean public Queue payConfigQueue() { return QueueBuilder.durable("pay_config_queue").build(); } // 库存服务队列(绑定扇出交换机) @Bean public Queue inventoryConfigQueue() { return QueueBuilder.durable("inventory_config_queue").build(); } // 绑定交换机与各队列 @Bean public Binding orderConfigBinding() { return BindingBuilder.bind(orderConfigQueue()).to(configFanoutExchange()); } @Bean public Binding payConfigBinding() { return BindingBuilder.bind(payConfigQueue()).to(configFanoutExchange()); } @Bean public Binding inventoryConfigBinding() { return BindingBuilder.bind(inventoryConfigQueue()).to(configFanoutExchange()); } } // 2. 配置中心发送广播消息 @Service public class ConfigCenterService { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ConfigMapper configMapper; public void updateConfig(String configKey, String newValue) { // 1. 更新配置中心数据库 Config config = configMapper.selectByKey(configKey); if (config == null) { config = new Config(); config.setConfigKey(configKey); config.setConfigValue(newValue); config.setVersion(1); configMapper.insert(config); } else { config.setConfigValue(newValue); config.setVersion(config.getVersion() + 1); configMapper.updateById(config); } // 2. 发送配置更新广播消息 ConfigUpdateMessage message = new ConfigUpdateMessage(); message.setConfigKey(configKey); message.setNewValue(newValue); message.setVersion(config.getVersion()); rabbitTemplate.convertAndSend("config_broadcast_exchange", "", JSON.toJSONString(message)); System.out.println("配置更新广播消息发送成功,配置Key:" + configKey + ",版本号:" + config.getVersion()); } } // 3. 订单服务消费广播消息,重新加载配置 @Component public class OrderConfigConsumer { @Autowired private ConfigService configService; // 本地缓存配置版本号 private Map<String, Integer> localConfigVersion = new ConcurrentHashMap<>(); @RabbitListener(queues = "order_config_queue") public void handleConfigUpdate(String message) { ConfigUpdateMessage updateMessage = JSON.parseObject(message, ConfigUpdateMessage.class); String configKey = updateMessage.getConfigKey(); int newVersion = updateMessage.getVersion(); String newValue = updateMessage.getNewValue(); // 对比本地版本号,避免重复加载 Integer localVersion = localConfigVersion.getOrDefault(configKey, 0); if (newVersion <= localVersion) { System.out.println("配置版本号未更新,忽略消息,配置Key:" + configKey); return; } // 重新加载配置(以数据库连接池为例) if ("db.pool.maxActive".equals(configKey)) { configService.updateDbPoolMaxActive(Integer.parseInt(newValue)); localConfigVersion.put(configKey, newVersion); System.out.println("订单服务更新数据库连接池配置,maxActive:" + newValue + ",版本号:" + newVersion); } else if ("api.timeout.order".equals(configKey)) { configService.updateOrderApiTimeout(Integer.parseInt(newValue)); localConfigVersion.put(configKey, newVersion); System.out.println("订单服务更新接口超时配置,timeout:" + newValue + "ms,版本号:" + newVersion); } // 其他配置Key的处理逻辑... } }
架构优势
- 高效广播:一次发送消息,所有服务同时接收,避免逐个调用的效率问题;
- 低耦合:配置中心无需知道有多少服务依赖配置,新增服务只需绑定队列即可;
- 可靠通知:通过MQ消息重试机制,确保所有服务都能收到配置更新消息(即使服务暂时宕机,恢复后也能消费)。
场景九:限流熔断
业务痛点
分布式系统中,服务间存在依赖关系(如订单服务依赖支付服务)。当支付服务因故障响应缓慢(如接口超时从100ms变为5000ms)或返回错误(如500错误)时,订单服务会因等待支付服务响应而产生大量线程阻塞,进而导致订单服务线程池耗尽,无法处理新请求,引发“级联故障”。
MQ解决方案
采用“MQ + 限流熔断”组合方案,保护订单服务不被依赖服务的故障影响:
- 订单服务调用支付服务前,先检查“支付服务健康状态”(通过熔断器统计调用成功率、超时率);
- 若支付服务健康(成功率>90%,超时率<5%):直接调用支付服务接口;
- 若支付服务不健康(成功率<80%,超时率>10%):触发熔断,订单服务不直接调用支付服务,而是向MQ发送“支付请求”消息,同时返回“支付处理中,请稍后查询结果”给用户;
- 支付服务恢复健康后,从MQ中拉取“支付请求”消息,异步处理支付,处理完成后通过MQ反馈结果给订单服务;
- 订单服务接收到支付结果后,更新订单状态,并通过短信/APP推送通知用户。
技术要点
- 熔断器选择:Sentinel(阿里开源,支持限流、熔断、降级)或Resilience4j(轻量级,适合微服务);
- MQ角色:熔断时作为“请求缓冲池”,暂存支付请求,避免订单服务线程阻塞;
- 健康检查:熔断器定时统计支付服务调用指标(成功率、超时率、错误率),动态调整熔断状态(闭合→半开→打开);
代码示例(Sentinel熔断 + RabbitMQ缓冲):
// 1. 配置Sentinel熔断器(支付服务调用熔断规则) @Configuration public class SentinelConfig { @PostConstruct public void initFlowRules() { // 熔断规则:支付服务调用接口 DegradeRule rule = new DegradeRule(); rule.setResource("payServiceCall"); // 资源名(支付服务调用接口) rule.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO); // 按异常比例熔断 rule.setCount(0.1); // 异常比例阈值(10%) rule.setTimeWindow(60); // 熔断时间窗口(60秒) rule.setMinRequestAmount(10); // 最小请求数(10次请求后才开始统计) DegradeRuleManager.loadRules(Collections.singletonList(rule)); // 限流规则:订单服务接口限流(1000 TPS) FlowRule flowRule = new FlowRule(); flowRule.setResource("createOrderApi"); flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS); flowRule.setCount(1000); FlowRuleManager.loadRules(Collections.singletonList(flowRule)); } } // 2. 订单服务调用支付服务(带熔断与MQ缓冲) @Service public class OrderPayService { @Autowired private PayServiceFeignClient payServiceFeignClient; // Feign调用支付服务 @Autowired private RabbitTemplate rabbitTemplate; @Autowired private OrderMapper orderMapper; // 订单创建接口(带限流) @SentinelResource(value = "createOrderApi", blockHandler = "createOrderBlockHandler") public String createOrder(Order order) { // 保存订单 orderMapper.insert(order); // 调用支付服务(带熔断) String payResult = callPayServiceWithDegrade(order); return payResult; } // 支付服务调用(带熔断) @SentinelResource(value = "payServiceCall", fallback = "payServiceFallback") private String callPayServiceWithDegrade(Order order) { // 直接调用支付服务Feign接口 PayRequest request = new PayRequest(); request.setOrderId(order.getOrderId()); request.setAmount(order.getAmount()); PayResponse response = payServiceFeignClient.pay(request); if ("SUCCESS".equals(response.getCode())) { order.setStatus("PAID"); orderMapper.updateById(order); return "订单创建成功,已支付"; } else { return "订单创建成功,支付失败:" + response.getMsg(); } } // 支付服务熔断降级方法(触发熔断时调用) private String payServiceFallback(Order order, Throwable e) { // 向MQ发送支付请求消息,异步处理 PayRequestMessage message = new PayRequestMessage(); message.setOrderId(order.getOrderId()); message.setAmount(order.getAmount()); message.setUserId(order.getUserId()); rabbitTemplate.convertAndSend("pay_request_queue", JSON.toJSONString(message)); // 更新订单状态为“支付处理中” order.setStatus("PAYING"); orderMapper.updateById(order); // 返回用户提示 return "订单创建成功,支付处理中,请稍后在【我的订单】中查询结果"; } // 订单接口限流降级方法 public String createOrderBlockHandler(Order order, BlockException e) { return "当前订单创建人数过多,请稍后重试"; } } // 3. 支付服务恢复后,消费MQ消息处理支付 @Component public class PayRequestConsumer { @Autowired private PayService payService; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private OrderFeignClient orderFeignClient; @RabbitListener(queues = "pay_request_queue") public void handlePayRequest(String message) { PayRequestMessage requestMessage = JSON.parseObject(message, PayRequestMessage.class); String orderId = requestMessage.getOrderId(); BigDecimal amount = requestMessage.getAmount(); try { // 处理支付逻辑 PayResult result = payService.processPay(orderId, amount); // 发送支付结果消息给订单服务 PayResultMessage resultMessage = new PayResultMessage(); resultMessage.setOrderId(orderId); resultMessage.setPayStatus(result.isSuccess() ? "PAID" : "PAY_FAILED"); resultMessage.setPayTime(new Date()); rabbitTemplate.convertAndSend("pay_result_queue", JSON.toJSONString(resultMessage)); System.out.println("支付请求处理完成,订单ID:" + orderId + ",支付状态:" + resultMessage.getPayStatus()); } catch (Exception e) { // 支付处理失败,消息重新入队(重试3次后进入死信队列) throw new AmqpRejectAndDontRequeueException("支付处理失败,订单ID:" + orderId, e); } } } // 4. 订单服务消费支付结果消息,更新订单状态 @Component public class PayResultConsumer { @Autowired private OrderMapper orderMapper; @Autowired private SmsService smsService; @RabbitListener(queues = "pay_result_queue") public void handlePayResult(String message) { PayResultMessage resultMessage = JSON.parseObject(message, PayResultMessage.class); String orderId = resultMessage.getOrderId(); String payStatus = resultMessage.getPayStatus(); // 更新订单状态 Order order = orderMapper.selectById(orderId); if (order == null) { System.out.println("订单不存在,订单ID:" + orderId); return; } order.setStatus(payStatus); order.setPayTime(resultMessage.getPayTime()); orderMapper.updateById(order); // 发送支付结果短信通知 String smsContent = "您的订单(ID:" + orderId + ")" + ("PAID".equals(payStatus) ? "已支付成功" : "支付失败,请重新尝试") + ",详情可在APP【我的订单】中查看。"; smsService.sendSms(order.getUserId(), smsContent); System.out.println("订单状态更新完成,订单ID:" + orderId + ",支付状态:" + payStatus); } }
架构优势
- 熔断保护:依赖服务故障时,触发熔断,避免订单服务线程阻塞,保障核心订单创建功能可用;
- 缓冲请求:MQ暂存支付请求,待依赖服务恢复后异步处理,不丢失用户请求;
- 限流保护:通过Sentinel限流,避免订单服务被瞬时高峰流量冲垮,保障系统稳定。
场景十:分布式事务最终一致性
业务痛点
跨银行转账场景中,用户从A银行转账1000元到B银行,需完成“A银行扣减1000元”和“B银行增加1000元”两个操作。由于A银行和B银行是不同的系统(不同数据库、不同服务),无法使用传统的数据库事务保证原子性,存在“A银行扣钱成功但B银行加钱失败”的风险,导致数据不一致。
MQ解决方案
采用“可靠消息最终一致性”方案,实现跨系统事务的最终一致性,流程如下:
A银行系统执行本地事务(扣减1000元):
- 若扣减失败:直接返回转账失败给用户,流程结束;
- 若扣减成功:向MQ发送一条“转账发起”消息(包含转账ID、A银行账户、B银行账户、金额1000元);
- MQ收到消息后,向A银行返回“消息发送成功”确认;
B银行系统订阅“转账发起”消息,执行本地事务(增加1000元):
- 若增加成功:向MQ发送“转账成功”确认消息,MQ删除“转账发起”消息;
- 若增加失败:不发送确认消息,MQ会定时重新投递“转账发起”消息(重试机制);
- A银行系统订阅“转账成功”消息,更新本地转账状态为“成功”;
- 若B银行多次重试仍失败(如B银行系统长时间宕机),MQ将“转账发起”消息转入死信队列,A银行系统监控到死信消息后,执行“回滚本地事务”(退还1000元给用户),并通过人工介入处理B银行问题。
技术要点
- 消息可靠性:开启MQ消息持久化、生产者确认、消费者手动ACK,确保消息不丢失;
- 重试机制:设置MQ消息重试次数(如5次),重试间隔(如10s、30s、1min、5min、10min),避免频繁重试加重B银行系统负担;
- 死信处理:死信队列需关联监控告警(如短信通知运维人员),确保异常情况及时被发现;
- 幂等性:B银行系统处理“转账发起”消息时,需通过“转账ID”做幂等性校验(如查询是否已处理过该转账ID),避免重复加钱;
代码示例(核心流程):
// 1. A银行系统执行本地事务并发送消息 @Service public class ABankTransferService { @Autowired private ABankAccountMapper accountMapper; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private TransferRecordMapper transferRecordMapper; @Transactional public String transferToBBank(String fromAccount, String toBBankAccount, BigDecimal amount) { // 1. 生成唯一转账ID String transferId = UUID.randomUUID().toString(); // 2. 扣减A银行账户余额(本地事务) int rows = accountMapper.deductBalance(fromAccount, amount); if (rows == 0) { throw new RuntimeException("A银行账户余额不足,转账失败"); } // 3. 记录转账记录(状态:处理中) TransferRecord record = new TransferRecord(); record.setTransferId(transferId); record.setFromAccount(fromAccount); record.setToAccount(toBBankAccount); record.setAmount(amount); record.setStatus("PROCESSING"); record.setCreateTime(new Date()); transferRecordMapper.insert(record); // 4. 发送转账发起消息(开启生产者确认) try { TransferMessage message = new TransferMessage(); message.setTransferId(transferId); message.setFromAccount(fromAccount); message.setToBBankAccount(toBBankAccount); message.setAmount(amount); // 同步等待生产者确认 rabbitTemplate.invoke(new RabbitOperations.Callback<String>() { @Override public String doInRabbit(RabbitOperations operations) throws AmqpException { operations.convertAndSend("transfer_exchange", "transfer.to.bbank", JSON.toJSONString(message), new CorrelationData(transferId)); return null; } }); System.out.println("A银行转账发起,转账ID:" + transferId + ",已扣减余额:" + amount); return "转账处理中,转账ID:" + transferId; } catch (Exception e) { // 消息发送失败,回滚本地事务(Spring事务管理自动回滚) throw new RuntimeException("转账消息发送失败,已回滚A银行余额", e); } } // 5. 消费B银行转账成功消息,更新转账状态 @RabbitListener(queues = "abank_transfer_success_queue") public void handleTransferSuccess(String message) { TransferSuccessMessage successMessage = JSON.parseObject(message, TransferSuccessMessage.class); String transferId = successMessage.getTransferId(); // 更新转账记录状态为成功 TransferRecord record = transferRecordMapper.selectByTransferId(transferId); if (record != null && "PROCESSING".equals(record.getStatus())) { record.setStatus("SUCCESS"); record.setCompleteTime(new Date()); transferRecordMapper.updateById(record); System.out.println("A银行转账成功,转账ID:" + transferId); } } // 6. 消费死信消息,回滚本地事务 @RabbitListener(queues = "transfer_dead_queue") public void handleTransferDeadLetter(String message) { TransferMessage transferMessage = JSON.parseObject(message, TransferMessage.class); String transferId = transferMessage.getTransferId(); String fromAccount = transferMessage.getFromAccount(); BigDecimal amount = transferMessage.getAmount(); // 回滚A银行余额 accountMapper.restoreBalance(fromAccount, amount); // 更新转账记录状态为失败 TransferRecord record = transferRecordMapper.selectByTransferId(transferId); if (record != null) { record.setStatus("FAIL"); record.setFailReason("B银行处理失败,已回滚"); transferRecordMapper.updateById(record); } // 发送告警短信给运维人员 String alarmContent = "转账ID:" + transferId + " 进入死信队列,已回滚A银行余额 " + amount + ",请检查B银行系统"; SmsUtils.sendSms("13800138000", alarmContent); System.out.println("转账失败已回滚,转账ID:" + transferId + ",告警已发送"); } } // 2. B银行系统消费转账发起消息,执行本地事务 @Component public class BBankTransferConsumer { @Autowired private BBankAccountMapper accountMapper; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private BBankTransferRecordMapper transferRecordMapper; @RabbitListener(queues = "bbank_transfer_queue") public void handleTransferRequest(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException { TransferMessage transferMessage = JSON.parseObject(message, TransferMessage.class); String transferId = transferMessage.getTransferId(); String toAccount = transferMessage.getToBBankAccount(); BigDecimal amount = transferMessage.getAmount(); try { // 1. 幂等性校验:判断是否已处理过该转账ID BBankTransferRecord record = transferRecordMapper.selectByTransferId(transferId); if (record != null && "SUCCESS".equals(record.getStatus())) { // 已处理过,手动ACK channel.basicAck(deliveryTag, false); System.out.println("B银行已处理该转账,转账ID:" + transferId + ",忽略消息"); return; } // 2. 执行本地事务:增加B银行账户余额 accountMapper.increaseBalance(toAccount, amount); // 3. 记录B银行转账记录 if (record == null) { record = new BBankTransferRecord(); record.setTransferId(transferId); record.setToAccount(toAccount); record.setAmount(amount); record.setStatus("SUCCESS"); record.setCreateTime(new Date()); transferRecordMapper.insert(record); } else { record.setStatus("SUCCESS"); transferRecordMapper.updateById(record); } // 4. 发送转账成功消息给A银行 TransferSuccessMessage successMessage = new TransferSuccessMessage(); successMessage.setTransferId(transferId); rabbitTemplate.convertAndSend("transfer_exchange", "transfer.success.to.abank", JSON.toJSONString(successMessage)); // 5. 手动ACK确认消息消费成功 channel.basicAck(deliveryTag, false); System.out.println("B银行转账处理成功,转账ID:" + transferId + ",已增加余额:" + amount); } catch (Exception e) { // 6. 处理失败,判断是否重试(重试5次后进入死信队列) Integer retryCount = (Integer) channel.getMessageProperties().getHeaders().get("retry-count"); if (retryCount == null) retryCount = 0; if (retryCount < 5) { // 重新入队,重试次数+1 channel.getMessageProperties().getHeaders().put("retry-count", retryCount + 1); channel.basicNack(deliveryTag, false, true); System.out.println("B银行转账处理失败,重试次数:" + (retryCount + 1) + ",转账ID:" + transferId); } else { // 重试5次失败,进入死信队列 channel.basicNack(deliveryTag, false, false); System.out.println("B银行转账处理失败,重试5次后进入死信队列,转账ID:" + transferId); } e.printStackTrace(); } } }
架构优势
- 最终一致性:通过MQ消息重试与死信处理,确保“扣钱”与“加钱”两个操作最终同时成功或同时失败,避免数据不一致;
- 可靠性:消息不丢失,即使跨系统、跨网络,也能通过重试机制保障事务推进;
- 低耦合:A银行与B银行仅通过MQ消息交互,无需感知对方系统细节,便于维护与扩展。
最后说一句
MQ的核心价值在于“解耦、异步、削峰”,但并非所有场景都适合用MQ——若业务逻辑简单、无跨系统交互、对实时性要求极高(如毫秒级响应),直接调用接口可能更高效。在实际项目中,需结合业务痛点、技术成本、维护难度综合判断是否使用MQ,以及选择哪种MQ(RabbitMQ/Kafka/RocketMQ)。希望以上10种场景能给大家带来启发,避免在项目中滥用或误用MQ。
评论已关闭