RocketMQ 事务消息

zhidiantech · · 29 次点击 · · 开始浏览    
```java import org.apache.rocketmq.client.producer.*; import org.apache.rocketmq.common.message.*; import java.util.concurrent.*; public class TransactionProducer { public static void main(String[] args) throws Exception { // 1. 创建事务消息生产者(注意与普通生产者的区别) TransactionMQProducer producer = new TransactionMQProducer("Transaction_Producer_Group"); producer.setNamesrvAddr("localhost:9876"); // 设置NameServer地址 // 2. 配置事务监听器(核心组件) TransactionListener transactionListener = new TransactionListenerImpl(); producer.setTransactionListener(transactionListener); // 3. 配置线程池处理异步回调 ExecutorService executorService = new ThreadPoolExecutor( 2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> { Thread thread = new Thread(r); thread.setName("transaction-check-thread"); return thread; } ); producer.setExecutorService(executorService); // 4. 启动生产者 producer.start(); // 5. 构造事务消息(以订单支付场景为例) Message msg = new Message( "PAYMENT_TRANSACTION_TOPIC", // 事务主题 "PAY_SUCCESS_TAG", // 消息标签 "ORDER_123456".getBytes() // 业务主键 ); msg.putUserProperty("amount", "500.00"); // 自定义属性 // 6. 发送事务消息(关键方法) TransactionSendResult result = producer.sendMessageInTransaction(msg, "ORDER_123456"); System.out.println("事务消息发送结果:" + result.getSendStatus()); // 7. 关闭生产者(生产环境需长期运行) producer.shutdown(); } } // 事务监听器实现类(核心业务逻辑) class TransactionListenerImpl implements TransactionListener { // 本地事务执行(与消息发送在同一个事务中) @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 模拟本地事务(如更新订单状态) String orderId = new String(msg.getBody()); System.out.println("执行本地事务,订单ID:" + orderId); // 真实场景应在此处执行数据库操作 boolean success = updateOrderStatus(orderId); return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE; } catch (Exception e) { return LocalTransactionState.UNKNOW; // 触发事务回查 } } // 事务状态回查(Broker主动发起) @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId = new String(msg.getBody()); System.out.println("回查事务状态,订单ID:" + orderId); // 查询数据库确认事务状态 boolean isCompleted = checkOrderStatus(orderId); return isCompleted ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE; } // 模拟数据库操作 private boolean updateOrderStatus(String orderId) { // 实际应替换为真实的DAO操作 return Math.random() > 0.3; // 70%成功率模拟 } private boolean checkOrderStatus(String orderId) { // 实际应查询数据库记录 return true; // 模拟事务已提交 } } ``` #### 关键机制说明: 1. **两阶段提交**: • **预提交阶段**:发送半消息(Half Message)到Broker,此时消息对消费者不可见 • **事务确认阶段**:根据本地事务结果提交(COMMIT)或回滚(ROLLBACK)消息 2. **事务回查机制**: • 当Broker未收到二次确认时,会通过`checkLocalTransaction`方法主动查询事务状态 • 默认回查间隔为60秒,最大回查次数15次(可通过配置调整) 3. **注意事项**: • 必须使用`TransactionMQProducer`而非普通生产者 • 消息主题需提前创建且类型为TRANSACTION(5.x+版本要求) • 确保本地事务操作的幂等性(防止网络重试导致重复执行) #### 典型应用场景: • 电商支付链路:保证订单状态更新与支付消息发送的原子性 • 库存扣减:确保商品库存扣减与物流通知的最终一致性 完整的事务消息处理流程可参考官方文档,建议配合死信队列(DLQ)和监控告警系统构建高可靠方案。
29 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传