```java
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.*;
import java.util.concurrent.*;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 1. 创建事务消息生产者(注意与普通生产者的区别)
TransactionMQProducer producer = new TransactionMQProducer("Transaction_Producer_Group");
producer.setNamesrvAddr("localhost:9876"); // 设置NameServer地址
// 2. 配置事务监听器(核心组件)
TransactionListener transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
// 3. 配置线程池处理异步回调
ExecutorService executorService = new ThreadPoolExecutor(
2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
r -> {
Thread thread = new Thread(r);
thread.setName("transaction-check-thread");
return thread;
}
);
producer.setExecutorService(executorService);
// 4. 启动生产者
producer.start();
// 5. 构造事务消息(以订单支付场景为例)
Message msg = new Message(
"PAYMENT_TRANSACTION_TOPIC", // 事务主题
"PAY_SUCCESS_TAG", // 消息标签
"ORDER_123456".getBytes() // 业务主键
);
msg.putUserProperty("amount", "500.00"); // 自定义属性
// 6. 发送事务消息(关键方法)
TransactionSendResult result = producer.sendMessageInTransaction(msg, "ORDER_123456");
System.out.println("事务消息发送结果:" + result.getSendStatus());
// 7. 关闭生产者(生产环境需长期运行)
producer.shutdown();
}
}
// 事务监听器实现类(核心业务逻辑)
class TransactionListenerImpl implements TransactionListener {
// 本地事务执行(与消息发送在同一个事务中)
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 模拟本地事务(如更新订单状态)
String orderId = new String(msg.getBody());
System.out.println("执行本地事务,订单ID:" + orderId);
// 真实场景应在此处执行数据库操作
boolean success = updateOrderStatus(orderId);
return success ?
LocalTransactionState.COMMIT_MESSAGE :
LocalTransactionState.ROLLBACK_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.UNKNOW; // 触发事务回查
}
}
// 事务状态回查(Broker主动发起)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = new String(msg.getBody());
System.out.println("回查事务状态,订单ID:" + orderId);
// 查询数据库确认事务状态
boolean isCompleted = checkOrderStatus(orderId);
return isCompleted ?
LocalTransactionState.COMMIT_MESSAGE :
LocalTransactionState.ROLLBACK_MESSAGE;
}
// 模拟数据库操作
private boolean updateOrderStatus(String orderId) {
// 实际应替换为真实的DAO操作
return Math.random() > 0.3; // 70%成功率模拟
}
private boolean checkOrderStatus(String orderId) {
// 实际应查询数据库记录
return true; // 模拟事务已提交
}
}
```
#### 关键机制说明:
1. **两阶段提交**:
• **预提交阶段**:发送半消息(Half Message)到Broker,此时消息对消费者不可见
• **事务确认阶段**:根据本地事务结果提交(COMMIT)或回滚(ROLLBACK)消息
2. **事务回查机制**:
• 当Broker未收到二次确认时,会通过`checkLocalTransaction`方法主动查询事务状态
• 默认回查间隔为60秒,最大回查次数15次(可通过配置调整)
3. **注意事项**:
• 必须使用`TransactionMQProducer`而非普通生产者
• 消息主题需提前创建且类型为TRANSACTION(5.x+版本要求)
• 确保本地事务操作的幂等性(防止网络重试导致重复执行)
#### 典型应用场景:
• 电商支付链路:保证订单状态更新与支付消息发送的原子性
• 库存扣减:确保商品库存扣减与物流通知的最终一致性
完整的事务消息处理流程可参考官方文档,建议配合死信队列(DLQ)和监控告警系统构建高可靠方案。
- 请尽量让自己的回复能够对别人有帮助
- 支持 Markdown 格式, **粗体**、~~删除线~~、
`单行代码`
- 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
- 图片支持拖拽、截图粘贴等方式上传