事务消息

事务消息方案

  • 在事务消息方案中(如 RocketMQ 事务消息),消息的发送和数据库的本地事务是原子操作,它们在同一个事务内完成。
  • 具体来说,事务消息的工作流程是:
    1. 生产者首先向消息队列发送一个半消息,这不会立即对消费者可见。
    2. 生产者开始执行本地事务(如数据库写入操作)。
    3. 如果本地事务成功提交,生产者会向消息队列发送确认消息,使半消息变为可消费状态;如果本地事务失败,则发送回滚指令,删除半消息。
  • 不存在消息发送失败的情况:在这种模式下,由于消息和数据库事务在同一个事务上下文中,确保了消息发送与数据写入的原子性。要么两者都成功,要么两者都失败。因此,在事务消息方案中,不会出现数据库存储成功但消息发送失败的情况。

RocketMQ 事务消息方案中,事务消息和本地事务通过一个事务上下文实现原子性。以下是一个基于 RocketMQ 实现的事务消息代码示例:

1. 添加 RocketMQ 依赖

xmlCopy code<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.2</version> <!-- 选择适合的版本 -->
</dependency>

2. 配置 RocketMQ 事务生产者

javaCopy codeimport org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.LocalTransactionState;

public class RocketMQTransactionProducer {

    public static void main(String[] args) throws Exception {
        // 1. 创建事务消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("localhost:9876"); // 替换为 RocketMQ NameServer 地址

        // 2. 设置事务监听器
        producer.setTransactionListener(new TransactionListener() {

            // 处理本地事务
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                try {
                    // 3.1 执行本地事务逻辑,如数据库写入操作
                    boolean transactionResult = executeLocalDBTransaction();
                    
                    // 3.2 根据本地事务执行结果决定消息状态
                    if (transactionResult) {
                        // 事务成功,提交消息
                        return LocalTransactionState.COMMIT_MESSAGE;
                    } else {
                        // 事务失败,回滚消息
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                } catch (Exception e) {
                    // 本地事务未知状态
                    return LocalTransactionState.UNKNOW;
                }
            }

            // RocketMQ 事务回查,确保事务一致性
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 4. 根据业务唯一标识检查本地事务状态(如查询数据库状态)
                boolean transactionStatus = checkLocalTransactionStatus(msg);
                if (transactionStatus) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
        });

        // 5. 启动生产者
        producer.start();

        // 6. 发送事务消息
        try {
            Message msg = new Message("TopicTest", "TagA", "orderID123", "order data".getBytes());
            TransactionSendResult result = producer.sendMessageInTransaction(msg, null);

            System.out.printf("%s%n", result);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // 7. 保持应用运行
        Thread.sleep(10000);
        producer.shutdown();
    }

    // 模拟本地事务执行
    private static boolean executeLocalDBTransaction() {
        // 例如:在数据库中插入订单数据
        return true; // 返回 true 表示事务成功;false 表示失败
    }

    // 模拟事务状态检查
    private static boolean checkLocalTransactionStatus(MessageExt msg) {
        // 根据 msg 的唯一标识检查本地事务状态
        return true; // 返回 true 表示事务已提交;false 表示未提交或回滚
    }
}

代码解析

  1. 创建事务生产者(TransactionMQProducer)
    • 创建 TransactionMQProducer 对象,用于发送事务消息。
    • 设置 NameServer 地址,让生产者能与 RocketMQ 服务端进行通信。
  2. 设置事务监听器(TransactionListener)
    • executeLocalTransaction:在事务消息发送后调用该方法。
      • 这里执行本地业务事务操作(如数据库写入)。
      • 根据本地事务的执行结果返回不同的事务状态:COMMIT_MESSAGEROLLBACK_MESSAGEUNKNOW
    • checkLocalTransaction:当 RocketMQ 未收到生产者的确认状态时,会进行事务回查。
      • 根据消息的唯一标识(如订单号)查询本地事务状态,决定是否提交或回滚消息。
  3. 发送事务消息(sendMessageInTransaction)
    • 调用 sendMessageInTransaction 方法发送半消息,启动事务处理。
    • 半消息不会立即对消费者可见,只有在本地事务成功提交后,消息才会被标记为可消费状态。
  4. 事务状态的确认和补偿
    • 如果本地事务执行成功,发送确认消息。
    • 如果本地事务失败,发送回滚消息。
    • 如果 RocketMQ 未在指定时间内收到确认,将通过回查接口检查事务状态,以确保最终一致性。

优缺点

  • 优点
    • 通过事务消息机制,将消息发送和数据库操作绑定在一个原子事务中,实现消息与数据库的一致性。
    • RocketMQ 提供了事务回查机制,确保在网络分区或生产者故障时仍能保证一致性。
  • 缺点
    • 实现相对复杂,需处理回查、幂等性、网络超时等问题。
    • 存在延迟:事务回查和确认的延迟可能会影响消息的及时性。

适用场景

  • 金融场景:如支付、转账、订单等涉及高一致性的场景。
  • 跨服务事务:如跨多个微服务的订单和库存操作,通过事务消息确保最终一致性。

4o

You said:

如果boolean transactionResult = executeLocalDBTransaction();这一行执行完后,服务被 kill,此时会怎样?数据库写入了吗?消息肯定没发送出去吧

ChatGPT said:

ChatGPT

在事务消息方案中,boolean transactionResult = executeLocalDBTransaction(); 这一行代表本地数据库事务的执行。假设此行执行完后,服务被 kill,这将影响分布式事务的一致性。以下是可能的情况:

1. 数据库写入完成了

  • 在执行 executeLocalDBTransaction() 这行代码时,如果服务在完成本地事务的提交操作(如 commit)后立即被 kill,则数据库写入是成功的。这意味着订单或其他业务数据已经在数据库中完成了提交。

2. 消息没发送出去

  • 在 RocketMQ 的事务消息机制中,只有在本地事务成功后,生产者才会发送确认消息,将半消息转为可消费状态。
  • 如果服务在完成本地事务后被 kill,还没来得及发送确认消息,此时半消息仍处于未确认状态。RocketMQ 的默认处理逻辑是等待一段时间后调用回查接口,以检查本地事务的实际状态。

3. 事务回查机制的作用

  • 由于生产者未发送确认消息,RocketMQ 会调用生产者的回查接口checkLocalTransaction()),以确认本地事务的状态。
  • 回查接口的实现通常是通过查询数据库状态来确定事务是否成功。
    • 如果数据库写入成功:回查接口会返回 COMMIT_MESSAGE,RocketMQ 会将半消息转为可消费状态,消费者可以正常处理消息。
    • 如果数据库写入失败或状态未知:回查接口会返回 ROLLBACK_MESSAGE,RocketMQ 会回滚半消息,不会对消费者可见。

总结

  1. 数据库写入已完成:如果 executeLocalDBTransaction() 在服务被 kill 之前完成了本地事务提交,则数据写入成功。
  2. 消息未发送:由于服务被 kill,确认消息未能及时发送,RocketMQ 将依赖回查机制来确保一致性。
  3. RocketMQ 的回查机制:确保在服务异常时,通过数据库状态检查,决定半消息的最终状态,保证消息和数据库的一致性。

方案的关键点

  • 回查机制是事务消息方案中最重要的一部分,它确保即便生产者服务在本地事务完成后崩溃,也能通过查询数据库状态来确认消息的最终状态,确保分布式事务的一致性。
  • 延迟一致性:由于回查机制存在延迟,在极端情况下会出现短暂的不一致,但最终会恢复到一致状态。

实际应用中的注意事项

  • 回查实现需要保证幂等性:确保重复调用回查不会引发副作用。
  • 回查策略需合理设置:根据业务需要调整回查时间和频率,以尽量减少一致性延迟。

这种设计是事务消息方案的核心逻辑,通过引入回查机制解决了服务被 kill 后导致的分布式事务不一致问题。

0 0 投票数
Article Rating
订阅评论
提醒
guest
0 评论
最旧
最新 最多投票
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x