前言
在分布式系统开发中,消息队列(MQ)是解决系统解耦、异步通信、流量削峰等问题的核心组件。我在过往的项目开发(电商秒杀、物流调度、支付对账等场景)中,多次用到RabbitMQ、Kafka、RocketMQ等不同类型的MQ,总结出10种高频实用场景,每种场景均包含“业务痛点、MQ解决方案、技术要点、架构优势”,希望能给大家提供参考。
场景一:系统解耦
业务痛点
早期电商系统中,订单创建后需同步调用库存系统、支付系统、物流系统的接口。若其中某一个系统接口超时(如物流系统服务宕机),会导致整个订单流程失败,系统耦合度极高,一处故障影响全局。
MQ解决方案
- 订单系统在创建订单后,不再直接调用其他系统接口,而是向MQ发送一条“订单创建成功”的消息(包含订单ID、商品ID、用户ID、订单金额等核心字段);
- 库存系统、支付系统、物流系统分别作为消费者,订阅MQ中“订单创建成功”的消息队列;
- 各系统接收到消息后,异步执行自身业务逻辑(库存扣减、支付状态同步、物流单生成),执行结果通过MQ反馈给订单系统(如“库存扣减成功”“支付完成”)。
技术要点
- 消息队列选择:RabbitMQ(支持交换机路由,可按系统类型路由消息);
- 消息可靠性:开启消息持久化(durable=true)、生产者确认机制(publisher confirm)、消费者手动ACK,避免消息丢失;
代码示例(Java + RabbitMQ):
// 订单系统生产者发送消息
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void createOrder(Order order) {
// 1. 保存订单到数据库
orderMapper.insert(order);
// 2. 发送订单创建消息到MQ
String message = JSON.toJSONString(order);
rabbitTemplate.convertAndSend("order_exchange", "order.created", message, new CorrelationData(order.getOrderId()));
System.out.println("订单创建消息发送成功,订单ID:" + order.getOrderId());
}
}
// 库存系统消费者接收消息
@Component
public class InventoryConsumer {
@Autowired
private InventoryMapper inventoryMapper;
@RabbitListener(queues = "inventory_queue")
public void handleOrderCreated(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
try {
Order order = JSON.parseObject(message, Order.class);
// 执行库存扣减逻辑
inventoryMapper.deductStock(order.getProductId(), order.getQuantity());
// 手动ACK确认消息消费成功
channel.basicAck(deliveryTag, false);
System.out.println("库存扣减成功,商品ID:" + order.getProductId());
} catch (Exception e) {
// 消费失败,消息重新入队(或进入死信队列)
channel.basicNack(deliveryTag, false, true);
e.printStackTrace();
}
}
}
架构优势
- 解耦:各系统仅依赖MQ消息,无需感知其他系统存在,新增系统(如积分系统)只需订阅消息即可,无需修改订单系统代码;
- 容错:某一系统故障时,消息暂存MQ,待系统恢复后重新消费,避免流程中断;
- 可扩展性:各系统可独立扩容(如库存系统压力大时,增加消费者实例),不影响其他系统。
场景二:异步通信
业务痛点
用户在电商平台下单支付后,系统需完成“订单状态更新、发送短信通知、生成积分、同步会员等级”4个操作。若采用同步调用,每个操作耗时100ms,总耗时400ms,用户需等待400ms才能看到支付成功页面,体验较差。
MQ解决方案
- 支付系统完成支付后,同步更新订单状态(耗时100ms),随后向MQ发送“支付成功”消息;
- 短信服务、积分服务、会员服务作为消费者,异步订阅“支付成功”消息,分别执行短信发送、积分增加、会员等级更新操作;
- 用户无需等待后续3个操作完成,支付成功后立即看到结果,后续操作由MQ异步驱动。
技术要点
- 消息投递模式:采用“扇出交换机(Fanout Exchange)”,让多个消费者同时接收同一条消息;
- 异步非阻塞:消费者业务逻辑采用异步线程池处理,避免单条消息消费耗时过长阻塞队列;
- 耗时对比:同步调用总耗时400ms → 异步调用总耗时100ms(仅订单状态更新),用户等待时间缩短75%。
架构优势
- 提升用户体验:核心流程(支付+订单更新)快速响应,非核心流程异步执行;
- 提高系统吞吐量:同步调用时系统TPS受限于最慢操作,异步后各操作并行处理,TPS提升3-5倍。
场景三:流量削峰
业务痛点
电商秒杀活动中,每秒请求量可达10000+,但后端库存系统、订单系统的最大处理能力仅为2000 TPS。若直接将请求打向后端,会导致系统过载、数据库连接耗尽,甚至服务宕机。
MQ解决方案
- 秒杀活动开始前,提前在MQ中创建“秒杀请求队列”,设置队列最大长度(如10万条,避免消息堆积过多);
- 用户秒杀请求先发送至MQ,MQ作为“缓冲器”接收所有请求,后端系统按自身处理能力(2000 TPS)从MQ中拉取请求;
- 超过MQ队列长度的请求直接返回“秒杀拥挤,请稍后重试”,避免后端压力过大。
技术要点
- MQ选择:Kafka(高吞吐量,每秒可处理百万级消息,适合秒杀场景);
- 流量控制:设置Kafka消费者“每次拉取消息数量”(如每次拉取200条),配合线程池控制并发消费速度;
- 防重复提交:消息中携带用户ID+商品ID,后端系统通过Redis实现“幂等性校验”,避免重复创建订单;
代码示例(Kafka消费者限流):
@Service
public class SeckillConsumer {
@Autowired
private SeckillService seckillService;
@Autowired
private StringRedisTemplate redisTemplate;
// 配置Kafka消费者,每次拉取200条消息,线程池核心线程数10
@KafkaListener(topics = "seckill_topic", properties = {
"max.poll.records=200",
"fetch.min.bytes=1024",
"fetch.max.wait.ms=500"
})
public void consumeSeckillRequest(ConsumerRecord<String, String> record) {
String message = record.value();
SeckillRequest request = JSON.parseObject(message, SeckillRequest.class);
String userId = request.getUserId();
String productId = request.getProductId();
String key = "seckill:order:" + userId + ":" + productId;
// 幂等性校验:判断用户是否已秒杀成功
if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
System.out.println("用户" + userId + "已秒杀过商品" + productId,跳过重复请求");
return;
}
// 执行秒杀逻辑
boolean success = seckillService.doSeckill(userId, productId);
if (success) {
// 秒杀成功,记录用户秒杀记录(过期时间24小时)
redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS);
System.out.println("用户" + userId + "秒杀商品" + productId + "成功");
} else {
System.out.println("用户" + userId + "秒杀商品" + productId + "失败,库存不足");
}
}
}
架构优势
- 流量缓冲:MQ承接瞬时高峰流量,避免后端系统被“冲垮”;
- 削峰填谷:将秒杀1小时内的高峰流量,分散到后续2-3小时内处理(若库存未售罄),充分利用后端资源;
- 保护核心系统:仅让符合处理能力的请求进入后端,确保系统稳定运行。
场景四:数据同步
业务痛点
电商系统中,用户在APP端修改个人信息(昵称、手机号、地址)后,需同步更新到“用户中心数据库”“搜索服务索引库”“推荐系统用户画像库”3个数据源。若采用代码中逐个调用接口同步,存在“同步失败数据不一致”“新增数据源需改代码”等问题。
MQ解决方案
- 用户中心系统在用户信息修改后,向MQ发送“用户信息更新”消息(包含用户ID、修改字段、新值);
搜索服务、推荐服务分别订阅该消息,接收到消息后:
- 搜索服务:更新Elasticsearch中的用户索引;
- 推荐服务:更新Redis或MySQL中的用户画像数据;
- 所有数据源通过MQ消息异步同步,确保数据最终一致性。
技术要点
- 消息格式:采用“增量更新”格式,仅携带修改的字段(如{userId:123, nickname:"新昵称"}),减少消息体积;
- 数据一致性:设置消息重试机制(如重试3次),重试失败后进入死信队列,人工介入处理;
- 同步延迟:通过监控平台统计消息从发送到消费的延迟(目标<1s),确保各数据源同步及时。
架构优势
- 数据一致性:避免手动同步导致的数据源不一致问题;
- 扩展性:新增数据源(如数据分析平台)只需订阅消息,无需修改用户中心代码;
- 可追溯:MQ记录所有同步消息,便于排查数据不一致原因(如某条消息消费失败)。
场景五:事务消息
业务痛点
电商“下单扣库存”场景中,存在“订单创建成功但库存扣减失败”的问题:若先创建订单,再扣库存,库存扣减失败会导致订单无库存却存在;若先扣库存,再创建订单,订单创建失败会导致库存被扣却无订单,出现数据不一致。
MQ解决方案
采用RocketMQ的“事务消息”机制,实现“订单创建”与“库存扣减”的原子性操作,流程如下:
- 订单系统发送“半事务消息”到MQ(此时消息处于“待确认”状态,消费者无法消费);
- MQ收到半事务消息后,向订单系统返回“消息发送成功”;
订单系统执行本地事务(创建订单+扣减库存,在同一个数据库事务中):
- 若本地事务执行成功,向MQ发送“确认消息”,MQ将半事务消息标记为“可消费”,库存系统(或其他消费者)可消费消息;
- 若本地事务执行失败,向MQ发送“回滚消息”,MQ删除半事务消息,避免消息被消费;
- 若订单系统因网络问题未及时发送确认/回滚消息,MQ会定时向订单系统发起“事务状态回查”,订单系统查询本地事务状态后,补发送确认/回滚消息。
技术要点
- MQ选择:RocketMQ(原生支持事务消息,RabbitMQ需通过自定义逻辑实现);
- 本地事务:订单创建与库存扣减需在同一个数据库事务中,确保两者同时成功或同时失败;
- 回查机制:设置RocketMQ事务回查次数(如3次),回查间隔(如10s),避免无限回查;
代码示例(RocketMQ事务消息):
// 1. 订单系统发送半事务消息
@Service
public class OrderTransactionService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderMapper orderMapper;
@Autowired
private InventoryMapper inventoryMapper;
public void createOrderWithTransaction(Order order) {
// 构造半事务消息
String message = JSON.toJSONString(order);
Message<String> msg = MessageBuilder.withPayload(message)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
.build();
// 发送半事务消息,指定事务监听器
rocketMQTemplate.sendMessageInTransaction(
"order_transaction_topic", // 事务消息主题
"order_tag", // 消息标签
msg, // 消息内容
order // 本地事务参数(传递给事务监听器)
);
}
}
// 2. 事务监听器(执行本地事务+回查事务状态)
@Component
@RocketMQTransactionListener(txProducerGroup = "order_producer_group")
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderMapper orderMapper;
@Autowired
private InventoryMapper inventoryMapper;
@Autowired
private OrderStatusMapper orderStatusMapper;
// 执行本地事务
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Order order = (Order) arg;
Connection connection = null;
try {
// 获取数据库连接,开启手动事务
connection = DruidDataSourceFactory.createDataSource().getConnection();
connection.setAutoCommit(false);
// 执行订单创建
orderMapper.insert(order);
// 执行库存扣减
int rows = inventoryMapper.deductStock(order.getProductId(), order.getQuantity());
if (rows == 0) {
// 库存不足,回滚本地事务
connection.rollback();
return RocketMQLocalTransactionState.ROLLBACK;
}
// 提交本地事务
connection.commit();
// 记录订单事务状态(用于回查)
orderStatusMapper.insert(order.getOrderId(), "SUCCESS");
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
// 本地事务执行异常,回滚
try {
if (connection != null) connection.rollback();
} catch (SQLException ex) {
ex.printStackTrace();
}
e.printStackTrace();
return RocketMQLocalTransactionState.ROLLBACK;
} finally {
try {
if (connection != null) connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
// 事务状态回查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String message = new String((byte[]) msg.getPayload());
Order order = JSON.parseObject(message, Order.class);
// 查询本地订单事务状态
String status = orderStatusMapper.select(order.getOrderId());
if ("SUCCESS".equals(status)) {
return RocketMQLocalTransactionState.COMMIT;
} else if ("FAIL".equals(status)) {
return RocketMQLocalTransactionState.ROLLBACK;
} else {
// 状态未知,继续回查(最多回查3次)
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
架构优势
- 事务一致性:确保“订单创建”与“库存扣减”等操作原子性,避免数据不一致;
- 可靠性:通过回查机制解决网络异常导致的事务状态未知问题;
- 低耦合:相比分布式事务(如Seata),无需引入额外框架,依赖MQ原生能力即可实现。
场景六:日志收集
业务痛点
分布式系统中,服务部署在多台服务器上(如订单服务部署5台、支付服务部署3台),日志分散在各服务器的本地文件中。当系统出现故障时,运维人员需逐台登录服务器查看日志,定位问题效率极低(如查找某条订单的日志,需打开5台服务器的日志文件搜索)。
MQ解决方案
采用“MQ + ELK”架构(Kafka + Elasticsearch + Logstash + Kibana),实现日志集中收集与分析:
- 各服务通过日志框架(如Logback、Log4j2)将日志输出到本地文件,同时通过“MQ日志Appender”将日志发送到Kafka队列;
- Logstash作为消费者,从Kafka中拉取日志,对日志进行清洗(如提取字段、过滤无用日志)、结构化处理(如转为JSON格式);
- Logstash将处理后的日志写入Elasticsearch(搜索引擎),构建日志索引;
- 运维人员通过Kibana(可视化工具)查询Elasticsearch中的日志,支持按“服务名、时间、日志级别、订单ID”等多维度搜索,快速定位问题。
技术要点
- MQ选择:Kafka(高吞吐量,适合日志等大量非实时数据传输);
- 日志结构化:Logstash配置过滤规则,将非结构化日志(如“2024-05-20 10:30:00 [INFO] OrderService: create order success, orderId=123”)转为JSON格式({"time":"2024-05-20 10:30:00","level":"INFO","service":"OrderService","content":"create order success","orderId":"123"});
- 日志分片:Kafka按“服务名”分区(如订单服务日志写入order_service分区,支付服务日志写入pay_service分区),便于后续按服务查询;
配置示例(Logback对接Kafka):
<!-- Logback配置文件:logback-spring.xml -->
<configuration>
<!-- 1. 输出日志到控制台 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%level] %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<!-- 2. 输出日志到Kafka -->
<appender name="KAFKA" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<!-- Kafka地址(Logstash监听地址) -->
<destination>192.168.1.100:9500</destination>
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<!-- 自定义日志字段 -->
<customFields>{"service":"order_service","env":"prod"}</customFields>
<fieldNames>
<timestamp>time</timestamp>
<message>content</message>
<logger>logger</logger>
<thread>thread</thread>
<level>level</level>
</fieldNames>
</encoder>
</appender>
<!-- 3. 根日志配置,同时输出到控制台和Kafka -->
<root level="INFO">
<appender-ref ref="CONSOLE" />
<appender-ref ref="KAFKA" />
</root>
</configuration>
架构优势
- 集中化:所有服务日志统一存储到Elasticsearch,无需逐台查看服务器;
- 高效查询:支持多维度搜索(如“orderId=123”“level=ERROR”),定位问题时间从小时级缩短到分钟级;
- 可分析:通过Kibana生成日志统计报表(如各服务ERROR日志数量趋势、请求耗时分布),提前发现系统隐患。
场景七:延迟消息
业务痛点
电商订单创建后,若用户未在30分钟内支付,需自动取消订单并释放库存。早期实现方式是“定时任务每隔1分钟扫描未支付订单”,存在两个问题:一是延迟(最快1分钟后取消,最慢2分钟),二是压力大(订单量多时,扫描SQL执行耗时久,占用数据库资源)。
MQ解决方案
采用MQ的“延迟消息”功能,实现订单30分钟未支付自动取消:
- 订单系统创建未支付订单后,向MQ发送一条“延迟30分钟”的消息(消息内容包含订单ID);
- MQ接收到延迟消息后,暂存消息,不立即投递;
- 30分钟后,MQ将消息投递到“订单取消队列”;
订单系统作为消费者,接收到消息后,查询订单支付状态:
- 若未支付:取消订单,释放库存;
- 若已支付:忽略消息(通过幂等性校验避免重复取消)。
技术要点
MQ选择:
- RabbitMQ:通过“死信队列(DLX)+ 消息过期时间(TTL)”实现延迟消息;
- RocketMQ:原生支持延迟消息,提供1s、5s、10s、30s、1min、5min等预设延迟级别,也可自定义延迟时间;
- 幂等性:消费者处理消息时,必须先查询订单状态,避免已支付订单被取消;
- 消息可靠性:开启消息持久化,避免MQ宕机导致延迟消息丢失;
代码示例(RabbitMQ延迟消息):
// 1. 配置RabbitMQ延迟队列(死信队列)
@Configuration
public class RabbitMQDelayConfig {
// 普通队列(接收延迟消息,消息过期后转发到死信队列)
@Bean
public Queue orderDelayQueue() {
Map<String, Object> args = new HashMap<>();
// 设置消息过期时间(30分钟=1800000ms)
args.put("x-message-ttl", 1800000);
// 设置死信交换机
args.put("x-dead-letter-exchange", "order_dead_exchange");
// 设置死信路由键
args.put("x-dead-letter-routing-key", "order.dead.cancel");
return QueueBuilder.durable("order_delay_queue").withArguments(args).build();
}
// 死信交换机
@Bean
public DirectExchange orderDeadExchange() {
return ExchangeBuilder.directExchange("order_dead_exchange").durable(true).build();
}
// 死信队列(实际处理订单取消的队列)
@Bean
public Queue orderDeadQueue() {
return QueueBuilder.durable("order_dead_queue").build();
}
// 绑定死信交换机与死信队列
@Bean
public Binding deadExchangeBindingDeadQueue() {
return BindingBuilder.bind(orderDeadQueue())
.to(orderDeadExchange())
.with("order.dead.cancel");
}
// 绑定普通交换机与普通延迟队列
@Bean
public DirectExchange orderDelayExchange() {
return ExchangeBuilder.directExchange("order_delay_exchange").durable(true).build();
}
@Bean
public Binding delayExchangeBindingDelayQueue() {
return BindingBuilder.bind(orderDelayQueue())
.to(orderDelayExchange())
.with("order.delay.create");
}
}
// 2. 订单系统发送延迟消息
@Service
public class OrderDelayService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderMapper orderMapper;
public void createUnpaidOrder(Order order) {
// 1. 保存未支付订单
order.setStatus("UNPAID");
orderMapper.insert(order);
// 2. 发送30分钟延迟消息
String message = JSON.toJSONString(order.getOrderId());
rabbitTemplate.convertAndSend("order_delay_exchange", "order.delay.create", message);
System.out.println("未支付订单创建成功,订单ID:" + order.getOrderId() + ",延迟30分钟后检查支付状态");
}
}
// 3. 订单系统消费死信队列消息,取消未支付订单
@Component
public class OrderCancelConsumer {
@Autowired
private OrderMapper orderMapper;
@Autowired
private InventoryMapper inventoryMapper;
@RabbitListener(queues = "order_dead_queue")
public void handleOrderCancel(String orderId) {
// 查询订单当前状态
Order order = orderMapper.selectById(orderId);
if (order == null) {
System.out.println("订单不存在,订单ID:" + orderId);
return;
}
// 若未支付,取消订单并释放库存
if ("UNPAID".equals(order.getStatus())) {
order.setStatus("CANCELLED");
orderMapper.updateById(order);
// 释放库存
inventoryMapper.restoreStock(order.getProductId(), order.getQuantity());
System.out.println("订单30分钟未支付已取消,订单ID:" + orderId + ",库存已释放");
} else {
// 已支付,忽略消息
System.out.println("订单已支付,无需取消,订单ID:" + orderId);
}
}
}
架构优势
- 精准延迟:消息延迟时间可精确到秒级(RocketMQ),避免定时任务的扫描延迟;
- 低压力:无需定时扫描数据库,减少数据库查询压力(订单量10万时,定时任务需扫描10万条记录,延迟消息仅需处理未支付的订单);
- 可扩展:支持其他延迟场景(如“订单支付后24小时未发货提醒”“会员到期前3天通知”),只需发送对应延迟级别的消息。
场景八:广播通知
业务痛点
分布式系统中,配置中心(如Nacos、Apollo)更新某一配置(如数据库连接池大小、接口超时时间)后,需通知所有依赖该配置的服务(如订单服务、支付服务、库存服务)重新加载配置。若采用“配置中心逐个调用服务接口”的方式,服务数量多时(如20个),调用效率低且易出现调用失败。
MQ解决方案
- 配置中心更新配置后,向MQ发送一条“配置更新”广播消息(包含配置Key、新值、版本号);
- 所有依赖该配置的服务作为消费者,订阅MQ的“配置更新”广播队列;
各服务接收到消息后,检查消息中的配置Key是否为自身依赖的配置:
- 若是:重新加载配置(如更新数据库连接池、接口超时时间),并记录配置版本号;
- 若否:忽略消息。
技术要点
MQ投递模式:采用“扇出交换机(Fanout Exchange)”或“主题交换机(Topic Exchange)”,实现消息广播;
- 扇出交换机:所有绑定该交换机的队列都会收到消息,适合全量广播;
- 主题交换机:通过通配符(如“config.order.#”“config.pay.#”)实现按服务类型广播;
- 配置版本号:消息中携带配置版本号,服务端对比本地版本号,避免重复加载(如配置中心重试发送消息时);
代码示例(RabbitMQ扇出交换机广播):
// 1. 配置扇出交换机与消费者队列
@Configuration
public class ConfigBroadcastConfig {
// 扇出交换机(配置更新广播)
@Bean
public FanoutExchange configFanoutExchange() {
return ExchangeBuilder.fanoutExchange("config_broadcast_exchange").durable(true).build();
}
// 订单服务队列(绑定扇出交换机)
@Bean
public Queue orderConfigQueue() {
return QueueBuilder.durable("order_config_queue").build();
}
// 支付服务队列(绑定扇出交换机)
@Bean
public Queue payConfigQueue() {
return QueueBuilder.durable("pay_config_queue").build();
}
// 库存服务队列(绑定扇出交换机)
@Bean
public Queue inventoryConfigQueue() {
return QueueBuilder.durable("inventory_config_queue").build();
}
// 绑定交换机与各队列
@Bean
public Binding orderConfigBinding() {
return BindingBuilder.bind(orderConfigQueue()).to(configFanoutExchange());
}
@Bean
public Binding payConfigBinding() {
return BindingBuilder.bind(payConfigQueue()).to(configFanoutExchange());
}
@Bean
public Binding inventoryConfigBinding() {
return BindingBuilder.bind(inventoryConfigQueue()).to(configFanoutExchange());
}
}
// 2. 配置中心发送广播消息
@Service
public class ConfigCenterService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfigMapper configMapper;
public void updateConfig(String configKey, String newValue) {
// 1. 更新配置中心数据库
Config config = configMapper.selectByKey(configKey);
if (config == null) {
config = new Config();
config.setConfigKey(configKey);
config.setConfigValue(newValue);
config.setVersion(1);
configMapper.insert(config);
} else {
config.setConfigValue(newValue);
config.setVersion(config.getVersion() + 1);
configMapper.updateById(config);
}
// 2. 发送配置更新广播消息
ConfigUpdateMessage message = new ConfigUpdateMessage();
message.setConfigKey(configKey);
message.setNewValue(newValue);
message.setVersion(config.getVersion());
rabbitTemplate.convertAndSend("config_broadcast_exchange", "", JSON.toJSONString(message));
System.out.println("配置更新广播消息发送成功,配置Key:" + configKey + ",版本号:" + config.getVersion());
}
}
// 3. 订单服务消费广播消息,重新加载配置
@Component
public class OrderConfigConsumer {
@Autowired
private ConfigService configService;
// 本地缓存配置版本号
private Map<String, Integer> localConfigVersion = new ConcurrentHashMap<>();
@RabbitListener(queues = "order_config_queue")
public void handleConfigUpdate(String message) {
ConfigUpdateMessage updateMessage = JSON.parseObject(message, ConfigUpdateMessage.class);
String configKey = updateMessage.getConfigKey();
int newVersion = updateMessage.getVersion();
String newValue = updateMessage.getNewValue();
// 对比本地版本号,避免重复加载
Integer localVersion = localConfigVersion.getOrDefault(configKey, 0);
if (newVersion <= localVersion) {
System.out.println("配置版本号未更新,忽略消息,配置Key:" + configKey);
return;
}
// 重新加载配置(以数据库连接池为例)
if ("db.pool.maxActive".equals(configKey)) {
configService.updateDbPoolMaxActive(Integer.parseInt(newValue));
localConfigVersion.put(configKey, newVersion);
System.out.println("订单服务更新数据库连接池配置,maxActive:" + newValue + ",版本号:" + newVersion);
} else if ("api.timeout.order".equals(configKey)) {
configService.updateOrderApiTimeout(Integer.parseInt(newValue));
localConfigVersion.put(configKey, newVersion);
System.out.println("订单服务更新接口超时配置,timeout:" + newValue + "ms,版本号:" + newVersion);
}
// 其他配置Key的处理逻辑...
}
}
架构优势
- 高效广播:一次发送消息,所有服务同时接收,避免逐个调用的效率问题;
- 低耦合:配置中心无需知道有多少服务依赖配置,新增服务只需绑定队列即可;
- 可靠通知:通过MQ消息重试机制,确保所有服务都能收到配置更新消息(即使服务暂时宕机,恢复后也能消费)。
场景九:限流熔断
业务痛点
分布式系统中,服务间存在依赖关系(如订单服务依赖支付服务)。当支付服务因故障响应缓慢(如接口超时从100ms变为5000ms)或返回错误(如500错误)时,订单服务会因等待支付服务响应而产生大量线程阻塞,进而导致订单服务线程池耗尽,无法处理新请求,引发“级联故障”。
MQ解决方案
采用“MQ + 限流熔断”组合方案,保护订单服务不被依赖服务的故障影响:
- 订单服务调用支付服务前,先检查“支付服务健康状态”(通过熔断器统计调用成功率、超时率);
- 若支付服务健康(成功率>90%,超时率<5%):直接调用支付服务接口;
- 若支付服务不健康(成功率<80%,超时率>10%):触发熔断,订单服务不直接调用支付服务,而是向MQ发送“支付请求”消息,同时返回“支付处理中,请稍后查询结果”给用户;
- 支付服务恢复健康后,从MQ中拉取“支付请求”消息,异步处理支付,处理完成后通过MQ反馈结果给订单服务;
- 订单服务接收到支付结果后,更新订单状态,并通过短信/APP推送通知用户。
技术要点
- 熔断器选择:Sentinel(阿里开源,支持限流、熔断、降级)或Resilience4j(轻量级,适合微服务);
- MQ角色:熔断时作为“请求缓冲池”,暂存支付请求,避免订单服务线程阻塞;
- 健康检查:熔断器定时统计支付服务调用指标(成功率、超时率、错误率),动态调整熔断状态(闭合→半开→打开);
代码示例(Sentinel熔断 + RabbitMQ缓冲):
// 1. 配置Sentinel熔断器(支付服务调用熔断规则)
@Configuration
public class SentinelConfig {
@PostConstruct
public void initFlowRules() {
// 熔断规则:支付服务调用接口
DegradeRule rule = new DegradeRule();
rule.setResource("payServiceCall"); // 资源名(支付服务调用接口)
rule.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO); // 按异常比例熔断
rule.setCount(0.1); // 异常比例阈值(10%)
rule.setTimeWindow(60); // 熔断时间窗口(60秒)
rule.setMinRequestAmount(10); // 最小请求数(10次请求后才开始统计)
DegradeRuleManager.loadRules(Collections.singletonList(rule));
// 限流规则:订单服务接口限流(1000 TPS)
FlowRule flowRule = new FlowRule();
flowRule.setResource("createOrderApi");
flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
flowRule.setCount(1000);
FlowRuleManager.loadRules(Collections.singletonList(flowRule));
}
}
// 2. 订单服务调用支付服务(带熔断与MQ缓冲)
@Service
public class OrderPayService {
@Autowired
private PayServiceFeignClient payServiceFeignClient; // Feign调用支付服务
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderMapper orderMapper;
// 订单创建接口(带限流)
@SentinelResource(value = "createOrderApi", blockHandler = "createOrderBlockHandler")
public String createOrder(Order order) {
// 保存订单
orderMapper.insert(order);
// 调用支付服务(带熔断)
String payResult = callPayServiceWithDegrade(order);
return payResult;
}
// 支付服务调用(带熔断)
@SentinelResource(value = "payServiceCall", fallback = "payServiceFallback")
private String callPayServiceWithDegrade(Order order) {
// 直接调用支付服务Feign接口
PayRequest request = new PayRequest();
request.setOrderId(order.getOrderId());
request.setAmount(order.getAmount());
PayResponse response = payServiceFeignClient.pay(request);
if ("SUCCESS".equals(response.getCode())) {
order.setStatus("PAID");
orderMapper.updateById(order);
return "订单创建成功,已支付";
} else {
return "订单创建成功,支付失败:" + response.getMsg();
}
}
// 支付服务熔断降级方法(触发熔断时调用)
private String payServiceFallback(Order order, Throwable e) {
// 向MQ发送支付请求消息,异步处理
PayRequestMessage message = new PayRequestMessage();
message.setOrderId(order.getOrderId());
message.setAmount(order.getAmount());
message.setUserId(order.getUserId());
rabbitTemplate.convertAndSend("pay_request_queue", JSON.toJSONString(message));
// 更新订单状态为“支付处理中”
order.setStatus("PAYING");
orderMapper.updateById(order);
// 返回用户提示
return "订单创建成功,支付处理中,请稍后在【我的订单】中查询结果";
}
// 订单接口限流降级方法
public String createOrderBlockHandler(Order order, BlockException e) {
return "当前订单创建人数过多,请稍后重试";
}
}
// 3. 支付服务恢复后,消费MQ消息处理支付
@Component
public class PayRequestConsumer {
@Autowired
private PayService payService;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderFeignClient orderFeignClient;
@RabbitListener(queues = "pay_request_queue")
public void handlePayRequest(String message) {
PayRequestMessage requestMessage = JSON.parseObject(message, PayRequestMessage.class);
String orderId = requestMessage.getOrderId();
BigDecimal amount = requestMessage.getAmount();
try {
// 处理支付逻辑
PayResult result = payService.processPay(orderId, amount);
// 发送支付结果消息给订单服务
PayResultMessage resultMessage = new PayResultMessage();
resultMessage.setOrderId(orderId);
resultMessage.setPayStatus(result.isSuccess() ? "PAID" : "PAY_FAILED");
resultMessage.setPayTime(new Date());
rabbitTemplate.convertAndSend("pay_result_queue", JSON.toJSONString(resultMessage));
System.out.println("支付请求处理完成,订单ID:" + orderId + ",支付状态:" + resultMessage.getPayStatus());
} catch (Exception e) {
// 支付处理失败,消息重新入队(重试3次后进入死信队列)
throw new AmqpRejectAndDontRequeueException("支付处理失败,订单ID:" + orderId, e);
}
}
}
// 4. 订单服务消费支付结果消息,更新订单状态
@Component
public class PayResultConsumer {
@Autowired
private OrderMapper orderMapper;
@Autowired
private SmsService smsService;
@RabbitListener(queues = "pay_result_queue")
public void handlePayResult(String message) {
PayResultMessage resultMessage = JSON.parseObject(message, PayResultMessage.class);
String orderId = resultMessage.getOrderId();
String payStatus = resultMessage.getPayStatus();
// 更新订单状态
Order order = orderMapper.selectById(orderId);
if (order == null) {
System.out.println("订单不存在,订单ID:" + orderId);
return;
}
order.setStatus(payStatus);
order.setPayTime(resultMessage.getPayTime());
orderMapper.updateById(order);
// 发送支付结果短信通知
String smsContent = "您的订单(ID:" + orderId + ")" +
("PAID".equals(payStatus) ? "已支付成功" : "支付失败,请重新尝试") +
",详情可在APP【我的订单】中查看。";
smsService.sendSms(order.getUserId(), smsContent);
System.out.println("订单状态更新完成,订单ID:" + orderId + ",支付状态:" + payStatus);
}
}
架构优势
- 熔断保护:依赖服务故障时,触发熔断,避免订单服务线程阻塞,保障核心订单创建功能可用;
- 缓冲请求:MQ暂存支付请求,待依赖服务恢复后异步处理,不丢失用户请求;
- 限流保护:通过Sentinel限流,避免订单服务被瞬时高峰流量冲垮,保障系统稳定。
场景十:分布式事务最终一致性
业务痛点
跨银行转账场景中,用户从A银行转账1000元到B银行,需完成“A银行扣减1000元”和“B银行增加1000元”两个操作。由于A银行和B银行是不同的系统(不同数据库、不同服务),无法使用传统的数据库事务保证原子性,存在“A银行扣钱成功但B银行加钱失败”的风险,导致数据不一致。
MQ解决方案
采用“可靠消息最终一致性”方案,实现跨系统事务的最终一致性,流程如下:
A银行系统执行本地事务(扣减1000元):
- 若扣减失败:直接返回转账失败给用户,流程结束;
- 若扣减成功:向MQ发送一条“转账发起”消息(包含转账ID、A银行账户、B银行账户、金额1000元);
- MQ收到消息后,向A银行返回“消息发送成功”确认;
B银行系统订阅“转账发起”消息,执行本地事务(增加1000元):
- 若增加成功:向MQ发送“转账成功”确认消息,MQ删除“转账发起”消息;
- 若增加失败:不发送确认消息,MQ会定时重新投递“转账发起”消息(重试机制);
- A银行系统订阅“转账成功”消息,更新本地转账状态为“成功”;
- 若B银行多次重试仍失败(如B银行系统长时间宕机),MQ将“转账发起”消息转入死信队列,A银行系统监控到死信消息后,执行“回滚本地事务”(退还1000元给用户),并通过人工介入处理B银行问题。
技术要点
- 消息可靠性:开启MQ消息持久化、生产者确认、消费者手动ACK,确保消息不丢失;
- 重试机制:设置MQ消息重试次数(如5次),重试间隔(如10s、30s、1min、5min、10min),避免频繁重试加重B银行系统负担;
- 死信处理:死信队列需关联监控告警(如短信通知运维人员),确保异常情况及时被发现;
- 幂等性:B银行系统处理“转账发起”消息时,需通过“转账ID”做幂等性校验(如查询是否已处理过该转账ID),避免重复加钱;
代码示例(核心流程):
// 1. A银行系统执行本地事务并发送消息
@Service
public class ABankTransferService {
@Autowired
private ABankAccountMapper accountMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private TransferRecordMapper transferRecordMapper;
@Transactional
public String transferToBBank(String fromAccount, String toBBankAccount, BigDecimal amount) {
// 1. 生成唯一转账ID
String transferId = UUID.randomUUID().toString();
// 2. 扣减A银行账户余额(本地事务)
int rows = accountMapper.deductBalance(fromAccount, amount);
if (rows == 0) {
throw new RuntimeException("A银行账户余额不足,转账失败");
}
// 3. 记录转账记录(状态:处理中)
TransferRecord record = new TransferRecord();
record.setTransferId(transferId);
record.setFromAccount(fromAccount);
record.setToAccount(toBBankAccount);
record.setAmount(amount);
record.setStatus("PROCESSING");
record.setCreateTime(new Date());
transferRecordMapper.insert(record);
// 4. 发送转账发起消息(开启生产者确认)
try {
TransferMessage message = new TransferMessage();
message.setTransferId(transferId);
message.setFromAccount(fromAccount);
message.setToBBankAccount(toBBankAccount);
message.setAmount(amount);
// 同步等待生产者确认
rabbitTemplate.invoke(new RabbitOperations.Callback<String>() {
@Override
public String doInRabbit(RabbitOperations operations) throws AmqpException {
operations.convertAndSend("transfer_exchange", "transfer.to.bbank", JSON.toJSONString(message),
new CorrelationData(transferId));
return null;
}
});
System.out.println("A银行转账发起,转账ID:" + transferId + ",已扣减余额:" + amount);
return "转账处理中,转账ID:" + transferId;
} catch (Exception e) {
// 消息发送失败,回滚本地事务(Spring事务管理自动回滚)
throw new RuntimeException("转账消息发送失败,已回滚A银行余额", e);
}
}
// 5. 消费B银行转账成功消息,更新转账状态
@RabbitListener(queues = "abank_transfer_success_queue")
public void handleTransferSuccess(String message) {
TransferSuccessMessage successMessage = JSON.parseObject(message, TransferSuccessMessage.class);
String transferId = successMessage.getTransferId();
// 更新转账记录状态为成功
TransferRecord record = transferRecordMapper.selectByTransferId(transferId);
if (record != null && "PROCESSING".equals(record.getStatus())) {
record.setStatus("SUCCESS");
record.setCompleteTime(new Date());
transferRecordMapper.updateById(record);
System.out.println("A银行转账成功,转账ID:" + transferId);
}
}
// 6. 消费死信消息,回滚本地事务
@RabbitListener(queues = "transfer_dead_queue")
public void handleTransferDeadLetter(String message) {
TransferMessage transferMessage = JSON.parseObject(message, TransferMessage.class);
String transferId = transferMessage.getTransferId();
String fromAccount = transferMessage.getFromAccount();
BigDecimal amount = transferMessage.getAmount();
// 回滚A银行余额
accountMapper.restoreBalance(fromAccount, amount);
// 更新转账记录状态为失败
TransferRecord record = transferRecordMapper.selectByTransferId(transferId);
if (record != null) {
record.setStatus("FAIL");
record.setFailReason("B银行处理失败,已回滚");
transferRecordMapper.updateById(record);
}
// 发送告警短信给运维人员
String alarmContent = "转账ID:" + transferId + " 进入死信队列,已回滚A银行余额 " + amount + ",请检查B银行系统";
SmsUtils.sendSms("13800138000", alarmContent);
System.out.println("转账失败已回滚,转账ID:" + transferId + ",告警已发送");
}
}
// 2. B银行系统消费转账发起消息,执行本地事务
@Component
public class BBankTransferConsumer {
@Autowired
private BBankAccountMapper accountMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private BBankTransferRecordMapper transferRecordMapper;
@RabbitListener(queues = "bbank_transfer_queue")
public void handleTransferRequest(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
TransferMessage transferMessage = JSON.parseObject(message, TransferMessage.class);
String transferId = transferMessage.getTransferId();
String toAccount = transferMessage.getToBBankAccount();
BigDecimal amount = transferMessage.getAmount();
try {
// 1. 幂等性校验:判断是否已处理过该转账ID
BBankTransferRecord record = transferRecordMapper.selectByTransferId(transferId);
if (record != null && "SUCCESS".equals(record.getStatus())) {
// 已处理过,手动ACK
channel.basicAck(deliveryTag, false);
System.out.println("B银行已处理该转账,转账ID:" + transferId + ",忽略消息");
return;
}
// 2. 执行本地事务:增加B银行账户余额
accountMapper.increaseBalance(toAccount, amount);
// 3. 记录B银行转账记录
if (record == null) {
record = new BBankTransferRecord();
record.setTransferId(transferId);
record.setToAccount(toAccount);
record.setAmount(amount);
record.setStatus("SUCCESS");
record.setCreateTime(new Date());
transferRecordMapper.insert(record);
} else {
record.setStatus("SUCCESS");
transferRecordMapper.updateById(record);
}
// 4. 发送转账成功消息给A银行
TransferSuccessMessage successMessage = new TransferSuccessMessage();
successMessage.setTransferId(transferId);
rabbitTemplate.convertAndSend("transfer_exchange", "transfer.success.to.abank", JSON.toJSONString(successMessage));
// 5. 手动ACK确认消息消费成功
channel.basicAck(deliveryTag, false);
System.out.println("B银行转账处理成功,转账ID:" + transferId + ",已增加余额:" + amount);
} catch (Exception e) {
// 6. 处理失败,判断是否重试(重试5次后进入死信队列)
Integer retryCount = (Integer) channel.getMessageProperties().getHeaders().get("retry-count");
if (retryCount == null) retryCount = 0;
if (retryCount < 5) {
// 重新入队,重试次数+1
channel.getMessageProperties().getHeaders().put("retry-count", retryCount + 1);
channel.basicNack(deliveryTag, false, true);
System.out.println("B银行转账处理失败,重试次数:" + (retryCount + 1) + ",转账ID:" + transferId);
} else {
// 重试5次失败,进入死信队列
channel.basicNack(deliveryTag, false, false);
System.out.println("B银行转账处理失败,重试5次后进入死信队列,转账ID:" + transferId);
}
e.printStackTrace();
}
}
}
架构优势
- 最终一致性:通过MQ消息重试与死信处理,确保“扣钱”与“加钱”两个操作最终同时成功或同时失败,避免数据不一致;
- 可靠性:消息不丢失,即使跨系统、跨网络,也能通过重试机制保障事务推进;
- 低耦合:A银行与B银行仅通过MQ消息交互,无需感知对方系统细节,便于维护与扩展。
最后说一句
MQ的核心价值在于“解耦、异步、削峰”,但并非所有场景都适合用MQ——若业务逻辑简单、无跨系统交互、对实时性要求极高(如毫秒级响应),直接调用接口可能更高效。在实际项目中,需结合业务痛点、技术成本、维护难度综合判断是否使用MQ,以及选择哪种MQ(RabbitMQ/Kafka/RocketMQ)。希望以上10种场景能给大家带来启发,避免在项目中滥用或误用MQ。