事务消息方案
- 在事务消息方案中(如 RocketMQ 事务消息),消息的发送和数据库的本地事务是原子操作,它们在同一个事务内完成。
- 具体来说,事务消息的工作流程是:
- 生产者首先向消息队列发送一个半消息,这不会立即对消费者可见。
- 生产者开始执行本地事务(如数据库写入操作)。
- 如果本地事务成功提交,生产者会向消息队列发送确认消息,使半消息变为可消费状态;如果本地事务失败,则发送回滚指令,删除半消息。
- 不存在消息发送失败的情况:在这种模式下,由于消息和数据库事务在同一个事务上下文中,确保了消息发送与数据写入的原子性。要么两者都成功,要么两者都失败。因此,在事务消息方案中,不会出现数据库存储成功但消息发送失败的情况。
在 RocketMQ 事务消息方案中,事务消息和本地事务通过一个事务上下文实现原子性。以下是一个基于 RocketMQ 实现的事务消息代码示例:
1. 添加 RocketMQ 依赖
xmlCopy code<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version> <!-- 选择适合的版本 -->
</dependency>
2. 配置 RocketMQ 事务生产者
javaCopy codeimport org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.LocalTransactionState;
public class RocketMQTransactionProducer {
public static void main(String[] args) throws Exception {
// 1. 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setNamesrvAddr("localhost:9876"); // 替换为 RocketMQ NameServer 地址
// 2. 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
// 处理本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 3.1 执行本地事务逻辑,如数据库写入操作
boolean transactionResult = executeLocalDBTransaction();
// 3.2 根据本地事务执行结果决定消息状态
if (transactionResult) {
// 事务成功,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 事务失败,回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
// 本地事务未知状态
return LocalTransactionState.UNKNOW;
}
}
// RocketMQ 事务回查,确保事务一致性
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 4. 根据业务唯一标识检查本地事务状态(如查询数据库状态)
boolean transactionStatus = checkLocalTransactionStatus(msg);
if (transactionStatus) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});
// 5. 启动生产者
producer.start();
// 6. 发送事务消息
try {
Message msg = new Message("TopicTest", "TagA", "orderID123", "order data".getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", result);
} catch (Exception e) {
e.printStackTrace();
}
// 7. 保持应用运行
Thread.sleep(10000);
producer.shutdown();
}
// 模拟本地事务执行
private static boolean executeLocalDBTransaction() {
// 例如:在数据库中插入订单数据
return true; // 返回 true 表示事务成功;false 表示失败
}
// 模拟事务状态检查
private static boolean checkLocalTransactionStatus(MessageExt msg) {
// 根据 msg 的唯一标识检查本地事务状态
return true; // 返回 true 表示事务已提交;false 表示未提交或回滚
}
}
代码解析
- 创建事务生产者(TransactionMQProducer):
- 创建
TransactionMQProducer
对象,用于发送事务消息。 - 设置 NameServer 地址,让生产者能与 RocketMQ 服务端进行通信。
- 创建
- 设置事务监听器(TransactionListener):
- executeLocalTransaction:在事务消息发送后调用该方法。
- 这里执行本地业务事务操作(如数据库写入)。
- 根据本地事务的执行结果返回不同的事务状态:
COMMIT_MESSAGE
、ROLLBACK_MESSAGE
或UNKNOW
。
- checkLocalTransaction:当 RocketMQ 未收到生产者的确认状态时,会进行事务回查。
- 根据消息的唯一标识(如订单号)查询本地事务状态,决定是否提交或回滚消息。
- executeLocalTransaction:在事务消息发送后调用该方法。
- 发送事务消息(sendMessageInTransaction):
- 调用
sendMessageInTransaction
方法发送半消息,启动事务处理。 - 半消息不会立即对消费者可见,只有在本地事务成功提交后,消息才会被标记为可消费状态。
- 调用
- 事务状态的确认和补偿:
- 如果本地事务执行成功,发送确认消息。
- 如果本地事务失败,发送回滚消息。
- 如果 RocketMQ 未在指定时间内收到确认,将通过回查接口检查事务状态,以确保最终一致性。
优缺点
- 优点:
- 通过事务消息机制,将消息发送和数据库操作绑定在一个原子事务中,实现消息与数据库的一致性。
- RocketMQ 提供了事务回查机制,确保在网络分区或生产者故障时仍能保证一致性。
- 缺点:
- 实现相对复杂,需处理回查、幂等性、网络超时等问题。
- 存在延迟:事务回查和确认的延迟可能会影响消息的及时性。
适用场景
- 金融场景:如支付、转账、订单等涉及高一致性的场景。
- 跨服务事务:如跨多个微服务的订单和库存操作,通过事务消息确保最终一致性。
4o
You said:
如果boolean transactionResult = executeLocalDBTransaction();这一行执行完后,服务被 kill,此时会怎样?数据库写入了吗?消息肯定没发送出去吧
ChatGPT said:
ChatGPT
在事务消息方案中,boolean transactionResult = executeLocalDBTransaction();
这一行代表本地数据库事务的执行。假设此行执行完后,服务被 kill,这将影响分布式事务的一致性。以下是可能的情况:
1. 数据库写入完成了
- 在执行
executeLocalDBTransaction()
这行代码时,如果服务在完成本地事务的提交操作(如commit
)后立即被 kill,则数据库写入是成功的。这意味着订单或其他业务数据已经在数据库中完成了提交。
2. 消息没发送出去
- 在 RocketMQ 的事务消息机制中,只有在本地事务成功后,生产者才会发送确认消息,将半消息转为可消费状态。
- 如果服务在完成本地事务后被 kill,还没来得及发送确认消息,此时半消息仍处于未确认状态。RocketMQ 的默认处理逻辑是等待一段时间后调用回查接口,以检查本地事务的实际状态。
3. 事务回查机制的作用
- 由于生产者未发送确认消息,RocketMQ 会调用生产者的回查接口(
checkLocalTransaction()
),以确认本地事务的状态。 - 回查接口的实现通常是通过查询数据库状态来确定事务是否成功。
- 如果数据库写入成功:回查接口会返回
COMMIT_MESSAGE
,RocketMQ 会将半消息转为可消费状态,消费者可以正常处理消息。 - 如果数据库写入失败或状态未知:回查接口会返回
ROLLBACK_MESSAGE
,RocketMQ 会回滚半消息,不会对消费者可见。
- 如果数据库写入成功:回查接口会返回
总结
- 数据库写入已完成:如果
executeLocalDBTransaction()
在服务被 kill 之前完成了本地事务提交,则数据写入成功。 - 消息未发送:由于服务被 kill,确认消息未能及时发送,RocketMQ 将依赖回查机制来确保一致性。
- RocketMQ 的回查机制:确保在服务异常时,通过数据库状态检查,决定半消息的最终状态,保证消息和数据库的一致性。
方案的关键点
- 回查机制是事务消息方案中最重要的一部分,它确保即便生产者服务在本地事务完成后崩溃,也能通过查询数据库状态来确认消息的最终状态,确保分布式事务的一致性。
- 延迟一致性:由于回查机制存在延迟,在极端情况下会出现短暂的不一致,但最终会恢复到一致状态。
实际应用中的注意事项
- 回查实现需要保证幂等性:确保重复调用回查不会引发副作用。
- 回查策略需合理设置:根据业务需要调整回查时间和频率,以尽量减少一致性延迟。
这种设计是事务消息方案的核心逻辑,通过引入回查机制解决了服务被 kill 后导致的分布式事务不一致问题。