labchy 发布的文章

具有商人思维的程序员很稀缺,绝大多数都是农民思维

在技术圈,“码农”的自嘲背后,隐含两种截然不同的思维模式:一是依赖线性付出的“农民思维”,二是着眼全局、善用整合放大的“商人思维”,二者直接决定了程序员是执行者还是创造者。

一、关于投入:时间与资源的使用逻辑

1. 时间观:堆砌工时 vs. 购买时间

  • 农民思维:信奉“天道酬勤”,认为解决问题的唯一途径是投入更多个人时间,常通过加班、熬夜敲代码换取线性、有限的回报(工资、奖金),本质是“出售自己的时间”。
  • 商人思维:视“时间为最大成本”,遇到问题优先思考:是否有现成开源项目可用?能否让更专业的人外包/协作?能否用脚本/AI自动化?通过花钱或利用资源“购买他人时间”,解放自己去思考架构、产品、商业模式等高价值问题。

2. 本金观:囤积技术 vs. 杠杆资源

  • 农民思维:像攒粮食一样囤积技术栈,追求掌握所有新技术,认为“个人价值=懂的技术”,学习过程漫长且焦虑,技术变革(如AI兴起)可能让过往知识归零。
  • 商人思维:明确“个人价值=利用资源解决问题的能力”,本金不是知识,而是信用、人脉与整合能力。善于借助云服务(杠杆算力)、开源社区(杠杆代码)、合作伙伴(杠杆技能),以最小自有资源快速搭建产品、撬动大项目。

二、关于产出:追求代码交付 vs. 创造核心价值

3. 回报观:稳定优先 vs. 增长优先

  • 农民思维:追求稳定工资与明确需求,厌恶不确定性,希望产品需求不变更、技术栈不迭代,核心目标是完成交付、拿到“当年收成”(工资)。
  • 商人思维:拥抱变化,愿为高回报承担风险。不满足于完成需求,更关注:功能能否带来用户增长?技术方案能否形成专利/核心壁垒?能否做成SaaS产品?追求指数级回报。

4. 价值观:重视技术资产 vs. 重视数据现金流

  • 农民思维:将写的代码、搭建的系统视为宝贵“资产”,倾向于保护,甚至通过重复造轮子证明自己。
  • 商人思维:认为代码会贬值,用户、数据、现金流才是核心。追求快速迭代验证,代码可重写、系统可重构,只要用户与现金流持续增长即可。

5. 产品观:项目思维 vs. 产品矩阵思维

  • 农民思维:专注于当前任务或项目,追求代码完美性与技术先进性,如同精心培育单一稻米品种的农民。
  • 商人思维:具备产品化与矩阵思维。开发工具时会思考:能否做成开源项目积累声望?能否包装成API服务出售?能否基于它提供咨询培训?设计的是商业闭环,而非单一项目。

三、关于风险:防御规避 vs. 主动管理

6. 风险观:规避风险 vs. 管理风险

  • 农民思维:核心风险是“线上出bug”“技术被淘汰”,应对方式以防御为主:写详细测试、做冗余备份、学稳定技术,祈祷坏事不发生。
  • 商人思维:核心风险是“错过市场机会”,主动进入新兴、不成熟但潜力大的领域(如早期AI、Web3)套利。通过快速试错与迭代将不确定性转化为优势,而非单纯规避风险。

7. 决策观:经验主义 vs. 数据驱动

  • 农民思维:依赖过往技术经验决策,如“以前用MySQL好,这次还选MySQL”“熟悉这个架构,风险低”。
  • 商人思维:依靠数据与概率决策,通过A/B测试、用户行为分析、市场调研,计算不同技术方案或产品方向的成功率与期望值,做出更优选择。

四、关于关系:自给自足 vs. 开放共赢

8. 资源观:自给自足 vs. 开放协作

  • 农民思维:希望所有功能自己开发、所有问题自己解决,对外部库或服务充满不信任,追求技术“纯洁性”与“可控性”。
  • 商人思维:信奉“万物皆可为我所用”,积极拥抱开源、SaaS服务、云原生,善用最佳资源,仅专注于核心、差异化竞争力的部分。

9. 博弈观:零和博弈 vs. 正和博弈

  • 农民思维:易陷入内部竞争,认为技术资源、晋升名额有限,“你上我就没机会”,视为零和游戏。
  • 商人思维:致力于创造正和游戏,通过打造开源项目、分享技术博客、建立行业连接扩大影响力,帮助他人成功以给自己创造更多机会,实现共赢。

关键启示

  1. 从“资产”到“资本”:编程能力若仅用于打工,只是固定资产;需将其产品化(开发自有软件、工具包)、资本化(投资技术项目、做顾问)、杠杆化(组建团队放大输出),让价值流动。
  2. 打破“工程师基因枷锁”:追求完美、技术至上、厌恶风险等工程师特质,在商业世界可能成为阻碍;需转变为拥抱市场不确定性、精于价值计算、善用资源的商人。

结论

写代码的本质是创造,但创造的价值归属取决于思维模式。是做辛勤的“码农”,还是成为“自己代码的CEO”,答案藏在每一次思考与选择中。

MQTT 与 Kafka|物联网消息与流数据集成实践

一、MQTT 如何与 Kafka 一起使用?

  • 技术定位:MQTT(Message Queuing Telemetry Transport)是轻量级消息传输协议,专为受限网络环境下的设备通信设计;Apache Kafka 是分布式流处理平台,旨在处理大规模实时数据流。
  • 协同价值:二者为互补技术,结合使用可构建强大物联网架构,实现设备与平台间的稳定连接、高效数据传输,同时支持高吞吐量数据的实时处理与分析。
  • 适用场景:网联汽车和车联网、智能城市基础设施、工业物联网监控、物流管理等物联网场景。

二、Kafka 和 MQTT 可以解决哪些物联网挑战?

物联网平台架构设计需应对以下核心挑战,MQTT 与 Kafka 的组合可针对性解决:

  1. 连接性和网络弹性:应对网联汽车等场景中网络不稳定、延迟等问题,确保数据稳定传输。
  2. 扩展性:支持设备数量增长,处理不断增加的物联网设备产生的大量数据。
  3. 消息吞吐量:适配物联网设备实时产生的传感器读数、位置信息等海量数据,保障数据有效采集、处理与分发。
  4. 数据存储:提供高效的数据流存储与管理方案,应对物联网设备持续产生的数据。

三、为什么需要在物联网架构中集成 MQTT 与 Kafka?

1. Kafka 在物联网场景中的不足

  • 不可靠的连接:Kafka 客户端需稳定 IP 连接,无法适配移动网络等不稳定环境,易导致通信中断。
  • 客户端复杂性与资源密集性:Kafka 客户端复杂且消耗资源多,不适用于资源受限的小型物联网设备。
  • 主题可扩展性限制:处理大量主题时存在瓶颈,难以适配物联网应用中“多设备+多主题”的场景。

2. MQTT 弥补 Kafka 不足的核心优势

  • 可靠的连接:专为不稳定网络环境设计,保障物联网设备间消息传输可靠性。
  • 轻量级客户端:客户端资源消耗低,适合资源受限的物联网设备。
  • 海量主题扩展:高效处理大量业务主题,可将海量 MQTT 主题汇聚后映射到 Kafka 主题,实现数据汇聚处理。

四、几种可行的 MQTT-Kafka 集成解决方案对比

1. EMQX Kafka 数据集成

  • 核心机制:EMQX(流行 MQTT Broker)通过内置 Kafka 数据集成功能,作为二者桥梁实现无缝通信。
  • 关键特性:支持以生产者(向 Kafka 发消息)、消费者(从 Kafka 收消息)两种角色创建数据桥接;具备双向数据传输能力,架构设计灵活;低延迟、高吞吐量,保障数据桥接高效可靠。
  • 参考文档:可访问 EMQX 文档“集成 Kafka”了解详情。

2. Confluent MQTT 代理

  • 提供方:Kafka 商业运营公司 Confluent。
  • 核心机制:提供 MQTT 协议代理模块,连接 MQTT 客户端与 Kafka Broker,简化客户端发布/订阅 Kafka 主题的流程,抽象直接通信的复杂性,避免多余复制与延迟。
  • 局限性:仅支持 MQTT 3.1.1 版本;MQTT 客户端连接性能可能影响数据吞吐量。

3. 对开源 MQTT Broker 和 Kafka 进行定制开发

  • 实现方式:使用开源 MQTT Broker,自行开发桥接服务——通过 MQTT 客户端从 MQTT Broker 订阅数据,利用 Kafka Producer API 将数据发送到 Kafka。
  • 注意事项:需自行开发与维护桥接服务,同时需考虑可靠性与扩展性问题。

五、使用 EMQX 将 MQTT 数据集成到 Kafka

1. 核心架构优势

EMQX 作为高度可扩展的 MQTT Broker,结合 Kafka 高吞吐量、持久化数据处理能力,为物联网构建完善数据基础设施,支持海量设备连接。
请输入图片描述

2. 关键功能

  • 双向连接:既可以将设备 MQTT 消息批量转发到 Kafka,也能从后端系统订阅 Kafka 消息并下发到物联网客户端。
  • 灵活的主题映射:支持一对一、一对多、多对多等多种 MQTT-Kafka 主题映射方式,同时兼容 MQTT 主题过滤器(通配符)。
  • 多写入模式:Kafka 生产者支持同步/异步写入,可根据场景平衡延迟与吞吐量。
  • 数据处理与监控:提供消息总数、成功/失败交付数、消息速率等实时指标;可结合 SQL 规则,在消息推送至 Kafka 或设备前完成数据提取、过滤、丰富与转换。

六、应用场景示例:MQTT 和 Kafka 赋能网联汽车和车联网

请输入图片描述
MQTT + Kafka 架构在网联汽车与车联网领域应用广泛,核心场景包括:

  1. 车载信息系统和车辆数据分析:实现海量实时车辆数据(传感器读数、GPS 位置、油耗、驾驶行为)的云端接入、流式处理与分析,用于车辆性能监控、预测性维护、车队管理,提升运营效率。
  2. 智能交通管理:获取并处理网联汽车、交通传感器、基础设施等多源交通数据,开发智能交通管理系统,实现实时交通监控、拥堵检测、路线优化、智能交通信号控制。
  3. 远程诊断:支持高吞吐量数据传输,用于车辆远程诊断与故障排除,实现主动维护与快速问题解决。
  4. 能源效率和环境影响:实现网联汽车与智能电网系统、能源管理平台的双向数据交互,包括实时监测能源消耗、实施需求响应机制、优化电动汽车充电策略。
  5. 预测性维护:持续跟踪车辆健康与性能数据,通过高吞吐量实时车载数据收集、异常检测和预测性维护算法,帮助车主及时发现潜在问题并安排维护。

七、结语

MQTT + Kafka 架构适用于需实时数据收集、高扩展性、高可靠性及物联网集成能力的应用场景,可实现数据流畅传输、高效沟通与创新应用(如网联汽车生态系统功能)。二者结合是理想的物联网架构解决方案,能实现物联网设备与云之间的无缝端到端集成,保障双向通信可靠性。

我工作中用MQ的10种场景

前言

在分布式系统开发中,消息队列(MQ)是解决系统解耦、异步通信、流量削峰等问题的核心组件。我在过往的项目开发(电商秒杀、物流调度、支付对账等场景)中,多次用到RabbitMQ、Kafka、RocketMQ等不同类型的MQ,总结出10种高频实用场景,每种场景均包含“业务痛点、MQ解决方案、技术要点、架构优势”,希望能给大家提供参考。

场景一:系统解耦

业务痛点

早期电商系统中,订单创建后需同步调用库存系统、支付系统、物流系统的接口。若其中某一个系统接口超时(如物流系统服务宕机),会导致整个订单流程失败,系统耦合度极高,一处故障影响全局。

MQ解决方案

  1. 订单系统在创建订单后,不再直接调用其他系统接口,而是向MQ发送一条“订单创建成功”的消息(包含订单ID、商品ID、用户ID、订单金额等核心字段);
  2. 库存系统、支付系统、物流系统分别作为消费者,订阅MQ中“订单创建成功”的消息队列;
  3. 各系统接收到消息后,异步执行自身业务逻辑(库存扣减、支付状态同步、物流单生成),执行结果通过MQ反馈给订单系统(如“库存扣减成功”“支付完成”)。

技术要点

  • 消息队列选择:RabbitMQ(支持交换机路由,可按系统类型路由消息);
  • 消息可靠性:开启消息持久化(durable=true)、生产者确认机制(publisher confirm)、消费者手动ACK,避免消息丢失;
  • 代码示例(Java + RabbitMQ):

    // 订单系统生产者发送消息
    @Service
    public class OrderService {
      @Autowired
      private RabbitTemplate rabbitTemplate;
    
      public void createOrder(Order order) {
          // 1. 保存订单到数据库
          orderMapper.insert(order);
          // 2. 发送订单创建消息到MQ
          String message = JSON.toJSONString(order);
          rabbitTemplate.convertAndSend("order_exchange", "order.created", message, new CorrelationData(order.getOrderId()));
          System.out.println("订单创建消息发送成功,订单ID:" + order.getOrderId());
      }
    }
    
    // 库存系统消费者接收消息
    @Component
    public class InventoryConsumer {
      @Autowired
      private InventoryMapper inventoryMapper;
    
      @RabbitListener(queues = "inventory_queue")
      public void handleOrderCreated(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
          try {
              Order order = JSON.parseObject(message, Order.class);
              // 执行库存扣减逻辑
              inventoryMapper.deductStock(order.getProductId(), order.getQuantity());
              // 手动ACK确认消息消费成功
              channel.basicAck(deliveryTag, false);
              System.out.println("库存扣减成功,商品ID:" + order.getProductId());
          } catch (Exception e) {
              // 消费失败,消息重新入队(或进入死信队列)
              channel.basicNack(deliveryTag, false, true);
              e.printStackTrace();
          }
      }
    }

架构优势

  • 解耦:各系统仅依赖MQ消息,无需感知其他系统存在,新增系统(如积分系统)只需订阅消息即可,无需修改订单系统代码;
  • 容错:某一系统故障时,消息暂存MQ,待系统恢复后重新消费,避免流程中断;
  • 可扩展性:各系统可独立扩容(如库存系统压力大时,增加消费者实例),不影响其他系统。

场景二:异步通信

业务痛点

用户在电商平台下单支付后,系统需完成“订单状态更新、发送短信通知、生成积分、同步会员等级”4个操作。若采用同步调用,每个操作耗时100ms,总耗时400ms,用户需等待400ms才能看到支付成功页面,体验较差。

MQ解决方案

  1. 支付系统完成支付后,同步更新订单状态(耗时100ms),随后向MQ发送“支付成功”消息;
  2. 短信服务、积分服务、会员服务作为消费者,异步订阅“支付成功”消息,分别执行短信发送、积分增加、会员等级更新操作;
  3. 用户无需等待后续3个操作完成,支付成功后立即看到结果,后续操作由MQ异步驱动。

技术要点

  • 消息投递模式:采用“扇出交换机(Fanout Exchange)”,让多个消费者同时接收同一条消息;
  • 异步非阻塞:消费者业务逻辑采用异步线程池处理,避免单条消息消费耗时过长阻塞队列;
  • 耗时对比:同步调用总耗时400ms → 异步调用总耗时100ms(仅订单状态更新),用户等待时间缩短75%。

架构优势

  • 提升用户体验:核心流程(支付+订单更新)快速响应,非核心流程异步执行;
  • 提高系统吞吐量:同步调用时系统TPS受限于最慢操作,异步后各操作并行处理,TPS提升3-5倍。

场景三:流量削峰

业务痛点

电商秒杀活动中,每秒请求量可达10000+,但后端库存系统、订单系统的最大处理能力仅为2000 TPS。若直接将请求打向后端,会导致系统过载、数据库连接耗尽,甚至服务宕机。

MQ解决方案

  1. 秒杀活动开始前,提前在MQ中创建“秒杀请求队列”,设置队列最大长度(如10万条,避免消息堆积过多);
  2. 用户秒杀请求先发送至MQ,MQ作为“缓冲器”接收所有请求,后端系统按自身处理能力(2000 TPS)从MQ中拉取请求;
  3. 超过MQ队列长度的请求直接返回“秒杀拥挤,请稍后重试”,避免后端压力过大。

技术要点

  • MQ选择:Kafka(高吞吐量,每秒可处理百万级消息,适合秒杀场景);
  • 流量控制:设置Kafka消费者“每次拉取消息数量”(如每次拉取200条),配合线程池控制并发消费速度;
  • 防重复提交:消息中携带用户ID+商品ID,后端系统通过Redis实现“幂等性校验”,避免重复创建订单;
  • 代码示例(Kafka消费者限流):

    @Service
    public class SeckillConsumer {
      @Autowired
      private SeckillService seckillService;
      @Autowired
      private StringRedisTemplate redisTemplate;
    
      // 配置Kafka消费者,每次拉取200条消息,线程池核心线程数10
      @KafkaListener(topics = "seckill_topic", properties = {
              "max.poll.records=200",
              "fetch.min.bytes=1024",
              "fetch.max.wait.ms=500"
      })
      public void consumeSeckillRequest(ConsumerRecord<String, String> record) {
          String message = record.value();
          SeckillRequest request = JSON.parseObject(message, SeckillRequest.class);
          String userId = request.getUserId();
          String productId = request.getProductId();
          String key = "seckill:order:" + userId + ":" + productId;
    
          // 幂等性校验:判断用户是否已秒杀成功
          if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
              System.out.println("用户" + userId + "已秒杀过商品" + productId,跳过重复请求");
              return;
          }
    
          // 执行秒杀逻辑
          boolean success = seckillService.doSeckill(userId, productId);
          if (success) {
              // 秒杀成功,记录用户秒杀记录(过期时间24小时)
              redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS);
              System.out.println("用户" + userId + "秒杀商品" + productId + "成功");
          } else {
              System.out.println("用户" + userId + "秒杀商品" + productId + "失败,库存不足");
          }
      }
    }

架构优势

  • 流量缓冲:MQ承接瞬时高峰流量,避免后端系统被“冲垮”;
  • 削峰填谷:将秒杀1小时内的高峰流量,分散到后续2-3小时内处理(若库存未售罄),充分利用后端资源;
  • 保护核心系统:仅让符合处理能力的请求进入后端,确保系统稳定运行。

场景四:数据同步

业务痛点

电商系统中,用户在APP端修改个人信息(昵称、手机号、地址)后,需同步更新到“用户中心数据库”“搜索服务索引库”“推荐系统用户画像库”3个数据源。若采用代码中逐个调用接口同步,存在“同步失败数据不一致”“新增数据源需改代码”等问题。

MQ解决方案

  1. 用户中心系统在用户信息修改后,向MQ发送“用户信息更新”消息(包含用户ID、修改字段、新值);
  2. 搜索服务、推荐服务分别订阅该消息,接收到消息后:

    • 搜索服务:更新Elasticsearch中的用户索引;
    • 推荐服务:更新Redis或MySQL中的用户画像数据;
  3. 所有数据源通过MQ消息异步同步,确保数据最终一致性。

技术要点

  • 消息格式:采用“增量更新”格式,仅携带修改的字段(如{userId:123, nickname:"新昵称"}),减少消息体积;
  • 数据一致性:设置消息重试机制(如重试3次),重试失败后进入死信队列,人工介入处理;
  • 同步延迟:通过监控平台统计消息从发送到消费的延迟(目标<1s),确保各数据源同步及时。

架构优势

  • 数据一致性:避免手动同步导致的数据源不一致问题;
  • 扩展性:新增数据源(如数据分析平台)只需订阅消息,无需修改用户中心代码;
  • 可追溯:MQ记录所有同步消息,便于排查数据不一致原因(如某条消息消费失败)。

场景五:事务消息

业务痛点

电商“下单扣库存”场景中,存在“订单创建成功但库存扣减失败”的问题:若先创建订单,再扣库存,库存扣减失败会导致订单无库存却存在;若先扣库存,再创建订单,订单创建失败会导致库存被扣却无订单,出现数据不一致。

MQ解决方案

采用RocketMQ的“事务消息”机制,实现“订单创建”与“库存扣减”的原子性操作,流程如下:

  1. 订单系统发送“半事务消息”到MQ(此时消息处于“待确认”状态,消费者无法消费);
  2. MQ收到半事务消息后,向订单系统返回“消息发送成功”;
  3. 订单系统执行本地事务(创建订单+扣减库存,在同一个数据库事务中):

    • 若本地事务执行成功,向MQ发送“确认消息”,MQ将半事务消息标记为“可消费”,库存系统(或其他消费者)可消费消息;
    • 若本地事务执行失败,向MQ发送“回滚消息”,MQ删除半事务消息,避免消息被消费;
  4. 若订单系统因网络问题未及时发送确认/回滚消息,MQ会定时向订单系统发起“事务状态回查”,订单系统查询本地事务状态后,补发送确认/回滚消息。

技术要点

  • MQ选择:RocketMQ(原生支持事务消息,RabbitMQ需通过自定义逻辑实现);
  • 本地事务:订单创建与库存扣减需在同一个数据库事务中,确保两者同时成功或同时失败;
  • 回查机制:设置RocketMQ事务回查次数(如3次),回查间隔(如10s),避免无限回查;
  • 代码示例(RocketMQ事务消息):

    // 1. 订单系统发送半事务消息
    @Service
    public class OrderTransactionService {
      @Autowired
      private RocketMQTemplate rocketMQTemplate;
      @Autowired
      private OrderMapper orderMapper;
      @Autowired
      private InventoryMapper inventoryMapper;
    
      public void createOrderWithTransaction(Order order) {
          // 构造半事务消息
          String message = JSON.toJSONString(order);
          Message<String> msg = MessageBuilder.withPayload(message)
                  .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
                  .build();
    
          // 发送半事务消息,指定事务监听器
          rocketMQTemplate.sendMessageInTransaction(
                  "order_transaction_topic",  // 事务消息主题
                  "order_tag",                // 消息标签
                  msg,                        // 消息内容
                  order                       // 本地事务参数(传递给事务监听器)
          );
      }
    }
    
    // 2. 事务监听器(执行本地事务+回查事务状态)
    @Component
    @RocketMQTransactionListener(txProducerGroup = "order_producer_group")
    public class OrderTransactionListener implements RocketMQLocalTransactionListener {
      @Autowired
      private OrderMapper orderMapper;
      @Autowired
      private InventoryMapper inventoryMapper;
      @Autowired
      private OrderStatusMapper orderStatusMapper;
    
      // 执行本地事务
      @Override
      public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
          Order order = (Order) arg;
          Connection connection = null;
          try {
              // 获取数据库连接,开启手动事务
              connection = DruidDataSourceFactory.createDataSource().getConnection();
              connection.setAutoCommit(false);
    
              // 执行订单创建
              orderMapper.insert(order);
              // 执行库存扣减
              int rows = inventoryMapper.deductStock(order.getProductId(), order.getQuantity());
              if (rows == 0) {
                  // 库存不足,回滚本地事务
                  connection.rollback();
                  return RocketMQLocalTransactionState.ROLLBACK;
              }
    
              // 提交本地事务
              connection.commit();
              // 记录订单事务状态(用于回查)
              orderStatusMapper.insert(order.getOrderId(), "SUCCESS");
              return RocketMQLocalTransactionState.COMMIT;
          } catch (Exception e) {
              // 本地事务执行异常,回滚
              try {
                  if (connection != null) connection.rollback();
              } catch (SQLException ex) {
                  ex.printStackTrace();
              }
              e.printStackTrace();
              return RocketMQLocalTransactionState.ROLLBACK;
          } finally {
              try {
                  if (connection != null) connection.close();
              } catch (SQLException e) {
                  e.printStackTrace();
              }
          }
      }
    
      // 事务状态回查
      @Override
      public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
          String message = new String((byte[]) msg.getPayload());
          Order order = JSON.parseObject(message, Order.class);
          // 查询本地订单事务状态
          String status = orderStatusMapper.select(order.getOrderId());
          if ("SUCCESS".equals(status)) {
              return RocketMQLocalTransactionState.COMMIT;
          } else if ("FAIL".equals(status)) {
              return RocketMQLocalTransactionState.ROLLBACK;
          } else {
              // 状态未知,继续回查(最多回查3次)
              return RocketMQLocalTransactionState.UNKNOWN;
          }
      }
    }

架构优势

  • 事务一致性:确保“订单创建”与“库存扣减”等操作原子性,避免数据不一致;
  • 可靠性:通过回查机制解决网络异常导致的事务状态未知问题;
  • 低耦合:相比分布式事务(如Seata),无需引入额外框架,依赖MQ原生能力即可实现。

场景六:日志收集

业务痛点

分布式系统中,服务部署在多台服务器上(如订单服务部署5台、支付服务部署3台),日志分散在各服务器的本地文件中。当系统出现故障时,运维人员需逐台登录服务器查看日志,定位问题效率极低(如查找某条订单的日志,需打开5台服务器的日志文件搜索)。

MQ解决方案

采用“MQ + ELK”架构(Kafka + Elasticsearch + Logstash + Kibana),实现日志集中收集与分析:

  1. 各服务通过日志框架(如Logback、Log4j2)将日志输出到本地文件,同时通过“MQ日志Appender”将日志发送到Kafka队列;
  2. Logstash作为消费者,从Kafka中拉取日志,对日志进行清洗(如提取字段、过滤无用日志)、结构化处理(如转为JSON格式);
  3. Logstash将处理后的日志写入Elasticsearch(搜索引擎),构建日志索引;
  4. 运维人员通过Kibana(可视化工具)查询Elasticsearch中的日志,支持按“服务名、时间、日志级别、订单ID”等多维度搜索,快速定位问题。

技术要点

  • MQ选择:Kafka(高吞吐量,适合日志等大量非实时数据传输);
  • 日志结构化:Logstash配置过滤规则,将非结构化日志(如“2024-05-20 10:30:00 [INFO] OrderService: create order success, orderId=123”)转为JSON格式({"time":"2024-05-20 10:30:00","level":"INFO","service":"OrderService","content":"create order success","orderId":"123"});
  • 日志分片:Kafka按“服务名”分区(如订单服务日志写入order_service分区,支付服务日志写入pay_service分区),便于后续按服务查询;
  • 配置示例(Logback对接Kafka):

    <!-- Logback配置文件:logback-spring.xml -->
    <configuration>
      <!-- 1. 输出日志到控制台 -->
      <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
          <encoder>
              <pattern>%d{yyyy-MM-dd HH:mm:ss} [%level] %logger{50} - %msg%n</pattern>
          </encoder>
      </appender>
    
      <!-- 2. 输出日志到Kafka -->
      <appender name="KAFKA" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
          <!-- Kafka地址(Logstash监听地址) -->
          <destination>192.168.1.100:9500</destination>
          <encoder class="net.logstash.logback.encoder.LogstashEncoder">
              <!-- 自定义日志字段 -->
              <customFields>{"service":"order_service","env":"prod"}</customFields>
              <fieldNames>
                  <timestamp>time</timestamp>
                  <message>content</message>
                  <logger>logger</logger>
                  <thread>thread</thread>
                  <level>level</level>
              </fieldNames>
          </encoder>
      </appender>
    
      <!-- 3. 根日志配置,同时输出到控制台和Kafka -->
      <root level="INFO">
          <appender-ref ref="CONSOLE" />
          <appender-ref ref="KAFKA" />
      </root>
    </configuration>

架构优势

  • 集中化:所有服务日志统一存储到Elasticsearch,无需逐台查看服务器;
  • 高效查询:支持多维度搜索(如“orderId=123”“level=ERROR”),定位问题时间从小时级缩短到分钟级;
  • 可分析:通过Kibana生成日志统计报表(如各服务ERROR日志数量趋势、请求耗时分布),提前发现系统隐患。

场景七:延迟消息

业务痛点

电商订单创建后,若用户未在30分钟内支付,需自动取消订单并释放库存。早期实现方式是“定时任务每隔1分钟扫描未支付订单”,存在两个问题:一是延迟(最快1分钟后取消,最慢2分钟),二是压力大(订单量多时,扫描SQL执行耗时久,占用数据库资源)。

MQ解决方案

采用MQ的“延迟消息”功能,实现订单30分钟未支付自动取消:

  1. 订单系统创建未支付订单后,向MQ发送一条“延迟30分钟”的消息(消息内容包含订单ID);
  2. MQ接收到延迟消息后,暂存消息,不立即投递;
  3. 30分钟后,MQ将消息投递到“订单取消队列”;
  4. 订单系统作为消费者,接收到消息后,查询订单支付状态:

    • 若未支付:取消订单,释放库存;
    • 若已支付:忽略消息(通过幂等性校验避免重复取消)。

技术要点

  • MQ选择:

    • RabbitMQ:通过“死信队列(DLX)+ 消息过期时间(TTL)”实现延迟消息;
    • RocketMQ:原生支持延迟消息,提供1s、5s、10s、30s、1min、5min等预设延迟级别,也可自定义延迟时间;
  • 幂等性:消费者处理消息时,必须先查询订单状态,避免已支付订单被取消;
  • 消息可靠性:开启消息持久化,避免MQ宕机导致延迟消息丢失;
  • 代码示例(RabbitMQ延迟消息):

    // 1. 配置RabbitMQ延迟队列(死信队列)
    @Configuration
    public class RabbitMQDelayConfig {
      // 普通队列(接收延迟消息,消息过期后转发到死信队列)
      @Bean
      public Queue orderDelayQueue() {
          Map<String, Object> args = new HashMap<>();
          // 设置消息过期时间(30分钟=1800000ms)
          args.put("x-message-ttl", 1800000);
          // 设置死信交换机
          args.put("x-dead-letter-exchange", "order_dead_exchange");
          // 设置死信路由键
          args.put("x-dead-letter-routing-key", "order.dead.cancel");
          return QueueBuilder.durable("order_delay_queue").withArguments(args).build();
      }
    
      // 死信交换机
      @Bean
      public DirectExchange orderDeadExchange() {
          return ExchangeBuilder.directExchange("order_dead_exchange").durable(true).build();
      }
    
      // 死信队列(实际处理订单取消的队列)
      @Bean
      public Queue orderDeadQueue() {
          return QueueBuilder.durable("order_dead_queue").build();
      }
    
      // 绑定死信交换机与死信队列
      @Bean
      public Binding deadExchangeBindingDeadQueue() {
          return BindingBuilder.bind(orderDeadQueue())
                  .to(orderDeadExchange())
                  .with("order.dead.cancel");
      }
    
      // 绑定普通交换机与普通延迟队列
      @Bean
      public DirectExchange orderDelayExchange() {
          return ExchangeBuilder.directExchange("order_delay_exchange").durable(true).build();
      }
    
      @Bean
      public Binding delayExchangeBindingDelayQueue() {
          return BindingBuilder.bind(orderDelayQueue())
                  .to(orderDelayExchange())
                  .with("order.delay.create");
      }
    }
    
    // 2. 订单系统发送延迟消息
    @Service
    public class OrderDelayService {
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Autowired
      private OrderMapper orderMapper;
    
      public void createUnpaidOrder(Order order) {
          // 1. 保存未支付订单
          order.setStatus("UNPAID");
          orderMapper.insert(order);
          // 2. 发送30分钟延迟消息
          String message = JSON.toJSONString(order.getOrderId());
          rabbitTemplate.convertAndSend("order_delay_exchange", "order.delay.create", message);
          System.out.println("未支付订单创建成功,订单ID:" + order.getOrderId() + ",延迟30分钟后检查支付状态");
      }
    }
    
    // 3. 订单系统消费死信队列消息,取消未支付订单
    @Component
    public class OrderCancelConsumer {
      @Autowired
      private OrderMapper orderMapper;
      @Autowired
      private InventoryMapper inventoryMapper;
    
      @RabbitListener(queues = "order_dead_queue")
      public void handleOrderCancel(String orderId) {
          // 查询订单当前状态
          Order order = orderMapper.selectById(orderId);
          if (order == null) {
              System.out.println("订单不存在,订单ID:" + orderId);
              return;
          }
          // 若未支付,取消订单并释放库存
          if ("UNPAID".equals(order.getStatus())) {
              order.setStatus("CANCELLED");
              orderMapper.updateById(order);
              // 释放库存
              inventoryMapper.restoreStock(order.getProductId(), order.getQuantity());
              System.out.println("订单30分钟未支付已取消,订单ID:" + orderId + ",库存已释放");
          } else {
              // 已支付,忽略消息
              System.out.println("订单已支付,无需取消,订单ID:" + orderId);
          }
      }
    }

架构优势

  • 精准延迟:消息延迟时间可精确到秒级(RocketMQ),避免定时任务的扫描延迟;
  • 低压力:无需定时扫描数据库,减少数据库查询压力(订单量10万时,定时任务需扫描10万条记录,延迟消息仅需处理未支付的订单);
  • 可扩展:支持其他延迟场景(如“订单支付后24小时未发货提醒”“会员到期前3天通知”),只需发送对应延迟级别的消息。

场景八:广播通知

业务痛点

分布式系统中,配置中心(如Nacos、Apollo)更新某一配置(如数据库连接池大小、接口超时时间)后,需通知所有依赖该配置的服务(如订单服务、支付服务、库存服务)重新加载配置。若采用“配置中心逐个调用服务接口”的方式,服务数量多时(如20个),调用效率低且易出现调用失败。

MQ解决方案

  1. 配置中心更新配置后,向MQ发送一条“配置更新”广播消息(包含配置Key、新值、版本号);
  2. 所有依赖该配置的服务作为消费者,订阅MQ的“配置更新”广播队列;
  3. 各服务接收到消息后,检查消息中的配置Key是否为自身依赖的配置:

    • 若是:重新加载配置(如更新数据库连接池、接口超时时间),并记录配置版本号;
    • 若否:忽略消息。

技术要点

  • MQ投递模式:采用“扇出交换机(Fanout Exchange)”或“主题交换机(Topic Exchange)”,实现消息广播;

    • 扇出交换机:所有绑定该交换机的队列都会收到消息,适合全量广播;
    • 主题交换机:通过通配符(如“config.order.#”“config.pay.#”)实现按服务类型广播;
  • 配置版本号:消息中携带配置版本号,服务端对比本地版本号,避免重复加载(如配置中心重试发送消息时);
  • 代码示例(RabbitMQ扇出交换机广播):

    // 1. 配置扇出交换机与消费者队列
    @Configuration
    public class ConfigBroadcastConfig {
      // 扇出交换机(配置更新广播)
      @Bean
      public FanoutExchange configFanoutExchange() {
          return ExchangeBuilder.fanoutExchange("config_broadcast_exchange").durable(true).build();
      }
    
      // 订单服务队列(绑定扇出交换机)
      @Bean
      public Queue orderConfigQueue() {
          return QueueBuilder.durable("order_config_queue").build();
      }
    
      // 支付服务队列(绑定扇出交换机)
      @Bean
      public Queue payConfigQueue() {
          return QueueBuilder.durable("pay_config_queue").build();
      }
    
      // 库存服务队列(绑定扇出交换机)
      @Bean
      public Queue inventoryConfigQueue() {
          return QueueBuilder.durable("inventory_config_queue").build();
      }
    
      // 绑定交换机与各队列
      @Bean
      public Binding orderConfigBinding() {
          return BindingBuilder.bind(orderConfigQueue()).to(configFanoutExchange());
      }
    
      @Bean
      public Binding payConfigBinding() {
          return BindingBuilder.bind(payConfigQueue()).to(configFanoutExchange());
      }
    
      @Bean
      public Binding inventoryConfigBinding() {
          return BindingBuilder.bind(inventoryConfigQueue()).to(configFanoutExchange());
      }
    }
    
    // 2. 配置中心发送广播消息
    @Service
    public class ConfigCenterService {
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Autowired
      private ConfigMapper configMapper;
    
      public void updateConfig(String configKey, String newValue) {
          // 1. 更新配置中心数据库
          Config config = configMapper.selectByKey(configKey);
          if (config == null) {
              config = new Config();
              config.setConfigKey(configKey);
              config.setConfigValue(newValue);
              config.setVersion(1);
              configMapper.insert(config);
          } else {
              config.setConfigValue(newValue);
              config.setVersion(config.getVersion() + 1);
              configMapper.updateById(config);
          }
    
          // 2. 发送配置更新广播消息
          ConfigUpdateMessage message = new ConfigUpdateMessage();
          message.setConfigKey(configKey);
          message.setNewValue(newValue);
          message.setVersion(config.getVersion());
          rabbitTemplate.convertAndSend("config_broadcast_exchange", "", JSON.toJSONString(message));
          System.out.println("配置更新广播消息发送成功,配置Key:" + configKey + ",版本号:" + config.getVersion());
      }
    }
    
    // 3. 订单服务消费广播消息,重新加载配置
    @Component
    public class OrderConfigConsumer {
      @Autowired
      private ConfigService configService;
      // 本地缓存配置版本号
      private Map<String, Integer> localConfigVersion = new ConcurrentHashMap<>();
    
      @RabbitListener(queues = "order_config_queue")
      public void handleConfigUpdate(String message) {
          ConfigUpdateMessage updateMessage = JSON.parseObject(message, ConfigUpdateMessage.class);
          String configKey = updateMessage.getConfigKey();
          int newVersion = updateMessage.getVersion();
          String newValue = updateMessage.getNewValue();
    
          // 对比本地版本号,避免重复加载
          Integer localVersion = localConfigVersion.getOrDefault(configKey, 0);
          if (newVersion <= localVersion) {
              System.out.println("配置版本号未更新,忽略消息,配置Key:" + configKey);
              return;
          }
    
          // 重新加载配置(以数据库连接池为例)
          if ("db.pool.maxActive".equals(configKey)) {
              configService.updateDbPoolMaxActive(Integer.parseInt(newValue));
              localConfigVersion.put(configKey, newVersion);
              System.out.println("订单服务更新数据库连接池配置,maxActive:" + newValue + ",版本号:" + newVersion);
          } else if ("api.timeout.order".equals(configKey)) {
              configService.updateOrderApiTimeout(Integer.parseInt(newValue));
              localConfigVersion.put(configKey, newVersion);
              System.out.println("订单服务更新接口超时配置,timeout:" + newValue + "ms,版本号:" + newVersion);
          }
          // 其他配置Key的处理逻辑...
      }
    }

架构优势

  • 高效广播:一次发送消息,所有服务同时接收,避免逐个调用的效率问题;
  • 低耦合:配置中心无需知道有多少服务依赖配置,新增服务只需绑定队列即可;
  • 可靠通知:通过MQ消息重试机制,确保所有服务都能收到配置更新消息(即使服务暂时宕机,恢复后也能消费)。

场景九:限流熔断

业务痛点

分布式系统中,服务间存在依赖关系(如订单服务依赖支付服务)。当支付服务因故障响应缓慢(如接口超时从100ms变为5000ms)或返回错误(如500错误)时,订单服务会因等待支付服务响应而产生大量线程阻塞,进而导致订单服务线程池耗尽,无法处理新请求,引发“级联故障”。

MQ解决方案

采用“MQ + 限流熔断”组合方案,保护订单服务不被依赖服务的故障影响:

  1. 订单服务调用支付服务前,先检查“支付服务健康状态”(通过熔断器统计调用成功率、超时率);
  2. 若支付服务健康(成功率>90%,超时率<5%):直接调用支付服务接口;
  3. 若支付服务不健康(成功率<80%,超时率>10%):触发熔断,订单服务不直接调用支付服务,而是向MQ发送“支付请求”消息,同时返回“支付处理中,请稍后查询结果”给用户;
  4. 支付服务恢复健康后,从MQ中拉取“支付请求”消息,异步处理支付,处理完成后通过MQ反馈结果给订单服务;
  5. 订单服务接收到支付结果后,更新订单状态,并通过短信/APP推送通知用户。

技术要点

  • 熔断器选择:Sentinel(阿里开源,支持限流、熔断、降级)或Resilience4j(轻量级,适合微服务);
  • MQ角色:熔断时作为“请求缓冲池”,暂存支付请求,避免订单服务线程阻塞;
  • 健康检查:熔断器定时统计支付服务调用指标(成功率、超时率、错误率),动态调整熔断状态(闭合→半开→打开);
  • 代码示例(Sentinel熔断 + RabbitMQ缓冲):

    // 1. 配置Sentinel熔断器(支付服务调用熔断规则)
    @Configuration
    public class SentinelConfig {
      @PostConstruct
      public void initFlowRules() {
          // 熔断规则:支付服务调用接口
          DegradeRule rule = new DegradeRule();
          rule.setResource("payServiceCall");  // 资源名(支付服务调用接口)
          rule.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO);  // 按异常比例熔断
          rule.setCount(0.1);  // 异常比例阈值(10%)
          rule.setTimeWindow(60);  // 熔断时间窗口(60秒)
          rule.setMinRequestAmount(10);  // 最小请求数(10次请求后才开始统计)
          DegradeRuleManager.loadRules(Collections.singletonList(rule));
    
          // 限流规则:订单服务接口限流(1000 TPS)
          FlowRule flowRule = new FlowRule();
          flowRule.setResource("createOrderApi");
          flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
          flowRule.setCount(1000);
          FlowRuleManager.loadRules(Collections.singletonList(flowRule));
      }
    }
    
    // 2. 订单服务调用支付服务(带熔断与MQ缓冲)
    @Service
    public class OrderPayService {
      @Autowired
      private PayServiceFeignClient payServiceFeignClient;  // Feign调用支付服务
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Autowired
      private OrderMapper orderMapper;
    
      // 订单创建接口(带限流)
      @SentinelResource(value = "createOrderApi", blockHandler = "createOrderBlockHandler")
      public String createOrder(Order order) {
          // 保存订单
          orderMapper.insert(order);
          // 调用支付服务(带熔断)
          String payResult = callPayServiceWithDegrade(order);
          return payResult;
      }
    
      // 支付服务调用(带熔断)
      @SentinelResource(value = "payServiceCall", fallback = "payServiceFallback")
      private String callPayServiceWithDegrade(Order order) {
          // 直接调用支付服务Feign接口
          PayRequest request = new PayRequest();
          request.setOrderId(order.getOrderId());
          request.setAmount(order.getAmount());
          PayResponse response = payServiceFeignClient.pay(request);
          if ("SUCCESS".equals(response.getCode())) {
              order.setStatus("PAID");
              orderMapper.updateById(order);
              return "订单创建成功,已支付";
          } else {
              return "订单创建成功,支付失败:" + response.getMsg();
          }
      }
    
      // 支付服务熔断降级方法(触发熔断时调用)
      private String payServiceFallback(Order order, Throwable e) {
          // 向MQ发送支付请求消息,异步处理
          PayRequestMessage message = new PayRequestMessage();
          message.setOrderId(order.getOrderId());
          message.setAmount(order.getAmount());
          message.setUserId(order.getUserId());
          rabbitTemplate.convertAndSend("pay_request_queue", JSON.toJSONString(message));
          // 更新订单状态为“支付处理中”
          order.setStatus("PAYING");
          orderMapper.updateById(order);
          // 返回用户提示
          return "订单创建成功,支付处理中,请稍后在【我的订单】中查询结果";
      }
    
      // 订单接口限流降级方法
      public String createOrderBlockHandler(Order order, BlockException e) {
          return "当前订单创建人数过多,请稍后重试";
      }
    }
    
    // 3. 支付服务恢复后,消费MQ消息处理支付
    @Component
    public class PayRequestConsumer {
      @Autowired
      private PayService payService;
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Autowired
      private OrderFeignClient orderFeignClient;
    
      @RabbitListener(queues = "pay_request_queue")
      public void handlePayRequest(String message) {
          PayRequestMessage requestMessage = JSON.parseObject(message, PayRequestMessage.class);
          String orderId = requestMessage.getOrderId();
          BigDecimal amount = requestMessage.getAmount();
    
          try {
              // 处理支付逻辑
              PayResult result = payService.processPay(orderId, amount);
              // 发送支付结果消息给订单服务
              PayResultMessage resultMessage = new PayResultMessage();
              resultMessage.setOrderId(orderId);
              resultMessage.setPayStatus(result.isSuccess() ? "PAID" : "PAY_FAILED");
              resultMessage.setPayTime(new Date());
              rabbitTemplate.convertAndSend("pay_result_queue", JSON.toJSONString(resultMessage));
              System.out.println("支付请求处理完成,订单ID:" + orderId + ",支付状态:" + resultMessage.getPayStatus());
          } catch (Exception e) {
              // 支付处理失败,消息重新入队(重试3次后进入死信队列)
              throw new AmqpRejectAndDontRequeueException("支付处理失败,订单ID:" + orderId, e);
          }
      }
    }
    
    // 4. 订单服务消费支付结果消息,更新订单状态
    @Component
    public class PayResultConsumer {
      @Autowired
      private OrderMapper orderMapper;
      @Autowired
      private SmsService smsService;
    
      @RabbitListener(queues = "pay_result_queue")
      public void handlePayResult(String message) {
          PayResultMessage resultMessage = JSON.parseObject(message, PayResultMessage.class);
          String orderId = resultMessage.getOrderId();
          String payStatus = resultMessage.getPayStatus();
    
          // 更新订单状态
          Order order = orderMapper.selectById(orderId);
          if (order == null) {
              System.out.println("订单不存在,订单ID:" + orderId);
              return;
          }
          order.setStatus(payStatus);
          order.setPayTime(resultMessage.getPayTime());
          orderMapper.updateById(order);
    
          // 发送支付结果短信通知
          String smsContent = "您的订单(ID:" + orderId + ")" + 
                  ("PAID".equals(payStatus) ? "已支付成功" : "支付失败,请重新尝试") + 
                  ",详情可在APP【我的订单】中查看。";
          smsService.sendSms(order.getUserId(), smsContent);
          System.out.println("订单状态更新完成,订单ID:" + orderId + ",支付状态:" + payStatus);
      }
    }

架构优势

  • 熔断保护:依赖服务故障时,触发熔断,避免订单服务线程阻塞,保障核心订单创建功能可用;
  • 缓冲请求:MQ暂存支付请求,待依赖服务恢复后异步处理,不丢失用户请求;
  • 限流保护:通过Sentinel限流,避免订单服务被瞬时高峰流量冲垮,保障系统稳定。

场景十:分布式事务最终一致性

业务痛点

跨银行转账场景中,用户从A银行转账1000元到B银行,需完成“A银行扣减1000元”和“B银行增加1000元”两个操作。由于A银行和B银行是不同的系统(不同数据库、不同服务),无法使用传统的数据库事务保证原子性,存在“A银行扣钱成功但B银行加钱失败”的风险,导致数据不一致。

MQ解决方案

采用“可靠消息最终一致性”方案,实现跨系统事务的最终一致性,流程如下:

  1. A银行系统执行本地事务(扣减1000元):

    • 若扣减失败:直接返回转账失败给用户,流程结束;
    • 若扣减成功:向MQ发送一条“转账发起”消息(包含转账ID、A银行账户、B银行账户、金额1000元);
  2. MQ收到消息后,向A银行返回“消息发送成功”确认;
  3. B银行系统订阅“转账发起”消息,执行本地事务(增加1000元):

    • 若增加成功:向MQ发送“转账成功”确认消息,MQ删除“转账发起”消息;
    • 若增加失败:不发送确认消息,MQ会定时重新投递“转账发起”消息(重试机制);
  4. A银行系统订阅“转账成功”消息,更新本地转账状态为“成功”;
  5. 若B银行多次重试仍失败(如B银行系统长时间宕机),MQ将“转账发起”消息转入死信队列,A银行系统监控到死信消息后,执行“回滚本地事务”(退还1000元给用户),并通过人工介入处理B银行问题。

技术要点

  • 消息可靠性:开启MQ消息持久化、生产者确认、消费者手动ACK,确保消息不丢失;
  • 重试机制:设置MQ消息重试次数(如5次),重试间隔(如10s、30s、1min、5min、10min),避免频繁重试加重B银行系统负担;
  • 死信处理:死信队列需关联监控告警(如短信通知运维人员),确保异常情况及时被发现;
  • 幂等性:B银行系统处理“转账发起”消息时,需通过“转账ID”做幂等性校验(如查询是否已处理过该转账ID),避免重复加钱;
  • 代码示例(核心流程):

    // 1. A银行系统执行本地事务并发送消息
    @Service
    public class ABankTransferService {
      @Autowired
      private ABankAccountMapper accountMapper;
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Autowired
      private TransferRecordMapper transferRecordMapper;
    
      @Transactional
      public String transferToBBank(String fromAccount, String toBBankAccount, BigDecimal amount) {
          // 1. 生成唯一转账ID
          String transferId = UUID.randomUUID().toString();
          // 2. 扣减A银行账户余额(本地事务)
          int rows = accountMapper.deductBalance(fromAccount, amount);
          if (rows == 0) {
              throw new RuntimeException("A银行账户余额不足,转账失败");
          }
          // 3. 记录转账记录(状态:处理中)
          TransferRecord record = new TransferRecord();
          record.setTransferId(transferId);
          record.setFromAccount(fromAccount);
          record.setToAccount(toBBankAccount);
          record.setAmount(amount);
          record.setStatus("PROCESSING");
          record.setCreateTime(new Date());
          transferRecordMapper.insert(record);
          // 4. 发送转账发起消息(开启生产者确认)
          try {
              TransferMessage message = new TransferMessage();
              message.setTransferId(transferId);
              message.setFromAccount(fromAccount);
              message.setToBBankAccount(toBBankAccount);
              message.setAmount(amount);
              // 同步等待生产者确认
              rabbitTemplate.invoke(new RabbitOperations.Callback<String>() {
                  @Override
                  public String doInRabbit(RabbitOperations operations) throws AmqpException {
                      operations.convertAndSend("transfer_exchange", "transfer.to.bbank", JSON.toJSONString(message),
                              new CorrelationData(transferId));
                      return null;
                  }
              });
              System.out.println("A银行转账发起,转账ID:" + transferId + ",已扣减余额:" + amount);
              return "转账处理中,转账ID:" + transferId;
          } catch (Exception e) {
              // 消息发送失败,回滚本地事务(Spring事务管理自动回滚)
              throw new RuntimeException("转账消息发送失败,已回滚A银行余额", e);
          }
      }
    
      // 5. 消费B银行转账成功消息,更新转账状态
      @RabbitListener(queues = "abank_transfer_success_queue")
      public void handleTransferSuccess(String message) {
          TransferSuccessMessage successMessage = JSON.parseObject(message, TransferSuccessMessage.class);
          String transferId = successMessage.getTransferId();
          // 更新转账记录状态为成功
          TransferRecord record = transferRecordMapper.selectByTransferId(transferId);
          if (record != null && "PROCESSING".equals(record.getStatus())) {
              record.setStatus("SUCCESS");
              record.setCompleteTime(new Date());
              transferRecordMapper.updateById(record);
              System.out.println("A银行转账成功,转账ID:" + transferId);
          }
      }
    
      // 6. 消费死信消息,回滚本地事务
      @RabbitListener(queues = "transfer_dead_queue")
      public void handleTransferDeadLetter(String message) {
          TransferMessage transferMessage = JSON.parseObject(message, TransferMessage.class);
          String transferId = transferMessage.getTransferId();
          String fromAccount = transferMessage.getFromAccount();
          BigDecimal amount = transferMessage.getAmount();
          // 回滚A银行余额
          accountMapper.restoreBalance(fromAccount, amount);
          // 更新转账记录状态为失败
          TransferRecord record = transferRecordMapper.selectByTransferId(transferId);
          if (record != null) {
              record.setStatus("FAIL");
              record.setFailReason("B银行处理失败,已回滚");
              transferRecordMapper.updateById(record);
          }
          // 发送告警短信给运维人员
          String alarmContent = "转账ID:" + transferId + " 进入死信队列,已回滚A银行余额 " + amount + ",请检查B银行系统";
          SmsUtils.sendSms("13800138000", alarmContent);
          System.out.println("转账失败已回滚,转账ID:" + transferId + ",告警已发送");
      }
    }
    
    // 2. B银行系统消费转账发起消息,执行本地事务
    @Component
    public class BBankTransferConsumer {
      @Autowired
      private BBankAccountMapper accountMapper;
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Autowired
      private BBankTransferRecordMapper transferRecordMapper;
    
      @RabbitListener(queues = "bbank_transfer_queue")
      public void handleTransferRequest(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
          TransferMessage transferMessage = JSON.parseObject(message, TransferMessage.class);
          String transferId = transferMessage.getTransferId();
          String toAccount = transferMessage.getToBBankAccount();
          BigDecimal amount = transferMessage.getAmount();
    
          try {
              // 1. 幂等性校验:判断是否已处理过该转账ID
              BBankTransferRecord record = transferRecordMapper.selectByTransferId(transferId);
              if (record != null && "SUCCESS".equals(record.getStatus())) {
                  // 已处理过,手动ACK
                  channel.basicAck(deliveryTag, false);
                  System.out.println("B银行已处理该转账,转账ID:" + transferId + ",忽略消息");
                  return;
              }
    
              // 2. 执行本地事务:增加B银行账户余额
              accountMapper.increaseBalance(toAccount, amount);
              // 3. 记录B银行转账记录
              if (record == null) {
                  record = new BBankTransferRecord();
                  record.setTransferId(transferId);
                  record.setToAccount(toAccount);
                  record.setAmount(amount);
                  record.setStatus("SUCCESS");
                  record.setCreateTime(new Date());
                  transferRecordMapper.insert(record);
              } else {
                  record.setStatus("SUCCESS");
                  transferRecordMapper.updateById(record);
              }
    
              // 4. 发送转账成功消息给A银行
              TransferSuccessMessage successMessage = new TransferSuccessMessage();
              successMessage.setTransferId(transferId);
              rabbitTemplate.convertAndSend("transfer_exchange", "transfer.success.to.abank", JSON.toJSONString(successMessage));
    
              // 5. 手动ACK确认消息消费成功
              channel.basicAck(deliveryTag, false);
              System.out.println("B银行转账处理成功,转账ID:" + transferId + ",已增加余额:" + amount);
          } catch (Exception e) {
              // 6. 处理失败,判断是否重试(重试5次后进入死信队列)
              Integer retryCount = (Integer) channel.getMessageProperties().getHeaders().get("retry-count");
              if (retryCount == null) retryCount = 0;
              if (retryCount < 5) {
                  // 重新入队,重试次数+1
                  channel.getMessageProperties().getHeaders().put("retry-count", retryCount + 1);
                  channel.basicNack(deliveryTag, false, true);
                  System.out.println("B银行转账处理失败,重试次数:" + (retryCount + 1) + ",转账ID:" + transferId);
              } else {
                  // 重试5次失败,进入死信队列
                  channel.basicNack(deliveryTag, false, false);
                  System.out.println("B银行转账处理失败,重试5次后进入死信队列,转账ID:" + transferId);
              }
              e.printStackTrace();
          }
      }
    }

架构优势

  • 最终一致性:通过MQ消息重试与死信处理,确保“扣钱”与“加钱”两个操作最终同时成功或同时失败,避免数据不一致;
  • 可靠性:消息不丢失,即使跨系统、跨网络,也能通过重试机制保障事务推进;
  • 低耦合:A银行与B银行仅通过MQ消息交互,无需感知对方系统细节,便于维护与扩展。

最后说一句

MQ的核心价值在于“解耦、异步、削峰”,但并非所有场景都适合用MQ——若业务逻辑简单、无跨系统交互、对实时性要求极高(如毫秒级响应),直接调用接口可能更高效。在实际项目中,需结合业务痛点、技术成本、维护难度综合判断是否使用MQ,以及选择哪种MQ(RabbitMQ/Kafka/RocketMQ)。希望以上10种场景能给大家带来启发,避免在项目中滥用或误用MQ。

民企艰难、外包泛滥、平台垄断:中国信息化行业怎么走成了现在这样?

曾几何时,中国的信息化行业,是民营企业最具上升通道、最能“拼技术吃饭”的少数赛道之一。靠着一根网线、一台服务器、几个做系统的工程师,一批民营IT公司白手起家,成就了从零到一的辉煌。而如今,一种新的势力正悄然改写这个行业的秩序。地方一家公司悄然中标“城市大脑”;另一家公司拿下“全域感知平台”;再一家拿走了“政务云”“医保数据治理”“交通一网统管”……你看过去的招投标记录,会发现一个个叫“数科”“数智”“信创”“数字城市”的新名字横扫政务信息化项目。而这些公司背后的共同标签只有一个:地方国资。信息化行业的逻辑和玩法,真的变了。

  1. 1995—2005:民营IT的黄金时代

故事的起点,是外资软硬件产品的引入。在信息化的萌芽期,一批最早接触IT的中国企业家,做起了国外产品的代理:IBM服务器、Oracle数据库、Cisco交换机、HP打印机、微软Windows。最开始只是“帮人装设备”,后来开始“帮人做系统”,于是系统集成应运而生。神州数码是联想拆分出来的代理龙头;金山、东软、中软、用友、金蝶,则迅速转型做软件开发与解决方案的佼佼者;细分行业应用领域如东华、南威、瑞星、明源、广联达、深信服……全国各地软件企业群星璀璨。那是一个“拼能力、拼项目、拼客户”的年代,那是一个努力就有回报的年代。很多现在耳熟能详的政务系统——金税、金财、金保、金农——背后都有这些民营或半民营公司参与的身影。只要你方案写得好、产品技术强、落地做得快,就能赢得客户。技术是王牌,服务是生命线。

  1. 2005—2010:科研院所的“市场化突围”

进入2000年代中期,原本做科研立项的“国家队”也开始下场市场化。中国电子科技集团(电科)、中国电子信息产业集团(中电)、航天科工等央企,陆续将旗下研究所“公司化”:从研究院、信息中心变成了“科技股份有限公司”“智能系统有限公司”。这些公司有一个天然优势——懂技术、懂标准、懂政策。他们一边承接国家级的信息工程(如金盾工程、国家电子政务平台),一边迅速布局本地化项目,从公安网到政府办公系统,从财政数据到人口档案。而民营企业在这些高安全等级、强监管要求的项目中逐渐被边缘,开始退居到做二包、三包,或者配合交付的位置。那是信息化的第一次“内卷”——能力还重要,但资质更重要了。

  1. 2010—2020:通信运营商主导信息化

真正大规模“统包政务项目”的,是三大运营商。彼时,电信、移动、联通正值3G转4G之际,网络设备升级带来大量收入,而“转型政企”成为高层的核心战略。他们有广覆盖的地市分公司、稳定的人才队伍、与政府打交道的经验,更重要的是——他们是“可信赖的合作方”。于是,政务系统开始走向运营商统包:你出平台,我出数据,服务都交给你们。各地电信政企部、政企云、系统集成公司迅速成长,一跃成为本地政府最常见的“总集成商”。但运营商不是技术公司,它们自己不会开发系统——于是再度分包、转包,民营IT企业依旧承担了大量“脏活累活”:定制开发、系统部署、上线培训、后期运维……只是,这一次,“谁干活谁挣钱”变成了“谁拿资源谁分利”。信息化项目越来越像“建筑工程”:运营商是“包工头”,IT产品和方案上都是“外包工人”。

  1. 2020至今:本地数科公司的登场

真正的剧变,发生在“数字中国”“智慧城市”战略全面推进之后。2020年后,几乎每一个省、地级市、甚至县区,都在成立自己的“数科公司”,名字可能是“智慧城市运营公司”“数字城市集团”“大数据发展公司”“信创科技集团”……它们的出身很简单:? ? 多数由国资委牵头设立;? ? 资金来自财政或平台基金;? ? 董事长、总经理是原来的机关干部或国企高管;? ? 有一张“官方背景的身份证”。这些公司承接的项目也很“高大上”:? ? 城市大脑、数据中台;? ? 一网统管、一网通办;? ? 区域医保云、教育智治平台;? ? 数字哨兵、交通AI中枢……而它们的能力呢?基本无技术、无团队、无产品,靠合作伙伴完成交付,靠“自家身份”稳拿项目。如果外地企业要拿到项目,还要求你跟本地的数科成立合资公司,全国的市场变得极度割裂。这就是今天的信息化生态:项目越来越大,链条越来越长,利润越来越薄,交付越来越急,而真正做事的民企,越来越苦。

  1. 企业的创新力遭系统性打击

当一个城市、一个省份的数字化项目都落到“自家数科公司”手里:? ? 本地有技术的民企,没有登场机会;? ? 外地的专业厂商,被区域保护排斥出局;? ? 垂直领域解决方案供应商,连项目入口都摸不到;? ? 项目交付被层层分包、压价、打散,形成“拼凑式交付”。这不仅让项目质量难以保障,更让曾经的“IT技术积累”彻底断层:? ? 做技术开发的企业无法贴近客户,就没有产品演化;? ? 没有项目主导权,就无法锻炼架构能力;? ? 没有利润空间,就无力招人留人搞研发。创新不是喊口号喊出来的,而是靠客户锤出来的。今天数科公司看起来拿了大量订单,但技术沉淀、产品能力几乎为零。项目验收后就“沉睡”,逐渐沦为角落里没人用、没人看、没人维护的数据“空壳”。

  1. 产业生态进入“形式化数字化”死循环

这是一种结构性扭曲:政府投入的是真金白银,交付上来的,是一堆“平台+大屏+可视化”的形式工程。? ? 数科公司靠资源吃饭,不懂业务、不做产品;? ? 真正干活的分包企业靠低价搏命,做完交付就走人;? ? 真正懂某个领域(如医保、政法、交通)的公司,连门都进不去。整个行业逐步变成“挂靠式创新 + 形式化交付”。而那条从技术出发、靠能力打天下的民营信息化成长路径,正被快速抹去。

  1. 时代的叹息:一个慢慢关上的窗口

回顾这段信息化的发展史,像极了一个窗口慢慢关闭的过程:? ? 最初,凭技术拿项目,民企靠创新起家;? ? 后来,靠资源拿项目,央企科研所转型为主;? ? 再后来,靠关系拿项目,运营商成为总包方;? ? 到今天,靠体制拿项目,地方国资封锁了最后的空间。这个行业最宝贵的东西——技术创新、人才成长、开放竞争,正被一层又一层的“身份”“通道”“行政授权”所取代。而本应推动全社会数字化转型的“数科集团”,却在资源的温床中长出了封闭、自利、不透明的影子。信息化这个词,本该是通向开放的钥匙,而不是权力的壳、垄断的门。

尾声:我们要的“数字中国”,不是这样我们并不反对国资参与数字建设。问题不在于身份,而在于规则是不是公平,竞争是不是开放,价值是不是靠能力决定。当“数科公司”成为信息化入口的唯一渠道,我们三十年积累的IT生态,正被悄悄重写。如果我们今天构建的,是一个数字版本的“计划经济”,那信息化三十年的积累将付诸东流,真正创新有技术的企业将再无动力,社会也将再无信心。“这些央国企数科公司90%会迅速消亡”,一位行业资深大佬说,“也终将沦为时代留下一纸空壳的笑话。”

一些心得,不定期更新[2025-11]

  1. 只要利益不发生冲突,别人说话一般不要去反驳。
  2. 人际交往的高段位技巧,热情大方,一问三不知。
  3. 你真正要做的事,连神明都不要讲,安静的做成功了再说,是以秘城,言以懈息。
  4. 努力克制自己的情绪,学会赞美和闭嘴。
  5. 给别人办事,一定要装的很难办,给领导送礼一定要装的很自然,给爱人花钱,一定要装得很痛快,请别人吃饭,一定要装得很大气,和人谈生意,一定要装得很成功,和人谈感情,一定要装得很纯洁,和人谈工作,一定要装得很专业,不能喝酒时,一定要装得很醉,不能赴约时,一定要装得很失落,不能帮忙时,一定要装的很难过。
  6. 做人一定要有城府,逢人只说三分话,不可全抛一片心。
  7. 你要是想看透人心,只需要落魄一次就够了。也为人在落魄的时候,身边的牛鬼蛇神都会露出真实面目。
  8. 德不配位,财不配身,怎么破,很简单,就是习惯它,升官了,静默半年别折腾,赚钱了,放手里半年不花,就行了。
  9. 处理问题,包括应急事件,狭路相逢和退一步海阔天空都不可取,通用的是沉默是金和让子弹飞,有奇效。
  10. 刷视频,多看看打架斗殴、车祸集锦、意外身亡的,危机意识和避险意识会潜移默化。
  11. 多读历史,人虽然会变得冷漠,但是能够提高生存能力,你死我活才是本质。

社会感悟

1、改命 选妻
2、成事 借势
3、立威 沉默
4、服众 果断
5、上位 贵人
6、守业 制衡
7、破局 舍弃
8、避祸 低调
9、谈判 底牌
10、止损 狠心
11、见识 远行
12、人脉 主动
13、机遇 胆识
14、平安 示弱
15、信任 靠谱
16、人心 让利
17、赚钱 信息差
18、藏锋 糊涂
19、人性 吃亏
20、长久 后路

买房要点

1、判断房主是否着急售卖 看挂牌周期、降价记录; #调研#客观情况#时机
2、不谈缺点,只说适合,表达想要; #输出#同理心
3、哭穷,借的、经济不景气; #输出#同情心
4、不说心理价位,只说越便宜越好; #输出#博弈
5、与人打交道,心理博弈,让别人舒服; #方法论

/* * @Author: your name * @Date: 2016-09-06 00:00:00 * @LastEditTime: 2020-03-17 18:29:35 * @LastEditors: Please set LastEditors * @Description: In User Settings Edit * @FilePath: \htdocs\usr\themes\default\footer.php */