抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >

分布式事务

  • 传统的应用都是单一数据库事务
  • 所有的业务表都在同一个数据库内
  • 数据库的事务可以很好地得到支持
  • 分布式系统中, 业务被分成多个数据库
  • 多个独立地数据库之间, 无法统一事务
  • 造成数据不一致的情况

CAP 原理

  • C: Consistent, 一致性. 具体指操作成功之后, 所有节点, 在同一时间, 看到的数据都是完全一致的. 一致性说的就是数据的一致性.
  • A: Availability, 可用性. 指服务一致可用, 在规定时间内完成响应.
  • P: Partition Tolerance, 分区容错性. 指分布式系统在遇到某节点或网络分区故障的时候, 仍然能够对外提供服务.

详解:

使用分布式系统, 就是为了在某个节点不可用的情况下, 整个服务对外还是可用的, 这正是满足P(分区容错性). 如果服务不满足P(分区容错性), 那么的系统也就不是分布式系统了, 所以, 在分布式系统中, P(分布容错性)总是成立的.

那么, A(可用性)和C(一致性)能不能同时满足

A和B是两个数据节点, A向B同步数据, 并且作为一个整体对外提供服务. 由于的系统保证了P(分区容错性), 那么A和B的同步, 允许出现故障. 接下来再保证A(可用性), 也就是说A和B同步出现问题时, 客户端还能够访问的系统, 那么客户端既可能访问A也可能访问B, 这时, A和B的数据是不一致的, 所以C(一致性)不能满足.

如果满足C(一致性), 也就是说客户端无论访问A还是访问B, 得到的结果都是一样的, 那么现在A和B的数据不一致, 需要等到A和B的数据一致以后, 也就是同步恢复以后, 才可对外提供服务. 这样虽然满足了C(一致性), 却不能满足A(可用性).
所以, 系统在满足P(分区容错性)的同时, 只能在A(可用性)和C(一致性)当中选择一个不能CAP同时满足. 的分布式系统只能是AP或者CP.

ACID 与 BASE 原理

在关系型数据库中, 最大的特点就是事务处理, 也就是ACID. ACID是事务处理的4个特性.

  • A : Atomicity 原子性, 事务中的操作, 要么都做, 要么都不做.
  • C : Consistency 一致性, 系统必须始终处在强一致状态下.
  • I : Isolation 隔离性, 一个事务执行不能被其他事务所干扰.
  • D : Durability 持久性, 一个已提交的事务对数据库中的改变是永久性的.

ACID强调的是强一致性, 要么全做, 要么全不做, 所有的用户看到的都是一致的数据. 传统的数据库都有ACID特性, 它们在CAP原理中, 保证的是CA. 但是在分布式系统大行其道的今天, 满足CA特性的系统很难生存下去. ACID也逐渐的向BASE转换.

BASE 是 Basically Available 基本可用, Soft-state 软状态, Eventually Consistent 最终一致 的缩写.

  • Basically Available:基本可用是指分布式系统在出现故障的时候, 允许损失部分可用性, 即保证核心可用. 电商大促时, 为了应对访问量激增, 部分用户可能会被引导到降级页面, 服务层也可能只提供降级服务. 这就是损失部分可用性的体现.
  • Soft State:软状态是指允许系统存在中间状态, 而该中间状态不会影响系统整体可用性. 分布式存储中一般一份数据至少会有两到三个副本, 允许不同节点间副本同步的延时就是软状态的体现. mysql replication的异步复制也是一种体现.
  • Eventual Consistency:最终一致性是指系统中的所有数据副本经过一定时间后, 最终能够达到一致的状态. 弱一致性和强一致性相反, 最终一致性是弱一致性的一种特殊情况. BASE模型是传统ACID模型的反面, 不同与ACID, BASE强调牺牲高一致性, 从而获得可用性, 数据允许在一段时间内的不一致, 只要保证最终一致就可以了.

在分布式事务的解决方案中, 它们都是依赖了ACID或者BASE的模型而实现的。基于XA协议的两阶段提交和事务补偿机制就是基于ACID实现的, 而基于本地消息表和基于MQ的最终一致方案都是通过BASE原理实现的。

基于XA协议的两阶段提交

  • XA是由X/Open组织提出的分布式事务规范
  • 由一个**事务管理器(TM)和多个资源管理器(RM)**组成
  • 提交分为两个阶段:prepare和commit
  • 保证了数据的强一致性
  • commit阶段出现问题, 事务出现不一致需要人工处理
  • 效率低下, 性能与本地事务相差10倍
  • MySql5.7及以上版本支持XA协议
  • MySql Connector/J 5.0 以上支持XA协议
  • Java系统中, 数据源采用Atomikos

Spring文档: [https://docs.spring.io/spring-boot/docs/current/reference/html/]

基于SpringBoot的XA协议配置

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    <version>2.4.1</version>
</dependency>

配置数据源, 也就是资源管理器(RM), 使用 Atomikos 作为数据源

package org.lgq.xademo.config;

import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;

import javax.sql.DataSource;
import java.io.IOException;

/**
 * @author DevLGQ
 * @version 1.0
 */
@Configuration
@MapperScan(value = "org.lgq.xademo.dao.db26", sqlSessionFactoryRef = "sqlSessionFactoryBean26")
public class ConfigDb26 {

    @Bean("db26")
    public DataSource db26() {
        MysqlXADataSource xaDataSource = new MysqlXADataSource();
        xaDataSource.setUser("root");
        xaDataSource.setPassword("lgq2020");
        xaDataSource.setUrl("jdbc:mysql://192.168.123.26:3306/xa_26?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&" +
                "allowPublicKeyRetrieval=true&useSSL=false");
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSource(xaDataSource);
        atomikosDataSourceBean.setUniqueResourceName("db26");
        return atomikosDataSourceBean;
    }

    @Bean("sqlSessionFactoryBean26")
    public SqlSessionFactoryBean sqlSessionFactoryBean26(@Qualifier("db26") DataSource dataSource) throws IOException {
        SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
        sqlSessionFactory.setDataSource(dataSource);
        ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
        sqlSessionFactory.setMapperLocations(resourceResolver.getResources("mybatis/db26/*.xml"));
        return sqlSessionFactory;
    }

}

配置事务管理器(TM)

package org.lgq.xademo.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.UserTransaction;


/**
 * 事务管理器
 *
 * @author DevLGQ
 * @version 1.0
 */
@Configuration
public class TransactionManager {

    @Bean("xaTransaction")
    public JtaTransactionManager jtaTransactionManager() {
        UserTransaction userTransaction = new UserTransactionImp();
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        return new JtaTransactionManager(userTransaction, userTransactionManager);
    }

}

使用时指定事务管理器

@Service
public class XAService {
    @Resource
    private XA26Mapper xa26Mapper;
    @Resource
    private XA128Mapper xa128Mapper;
    @Transactional(transactionManager = "xaTransaction")
    public void testXA(){
        XA26 xa26 = new XA26();
        xa26.setId(1);
        xa26.setName("lgq1");
        this.xa26Mapper.insert(xa26);

        XA128 xa128 = new XA128();
        xa128.setId(2);
        xa128.setName("lgq2");
        this.xa128Mapper.insert(xa128);
    }
}

MyCat 事务配置

配置 server.xml

<!-- 分布式事务开关, 0为不过滤分布式事务, 1为过滤分布式事务(如果分布式事务内只涉及全局表, 则不过滤), 2为不过滤分布式事务,但是记录分布式事务日志 -->
<property name="handleDistributedTransactions">0</property>
set autocommit = 0;
-- 开启XA事务
set xa=on;
insert into `user`(id, username) values (1, 'lgq1'),(2, 'lgq2');
commit;

Spring项目中直接使用@Transactional(rollbackFor = Exception.class)注解即可.

ShardingJdbc 事务配置

默认就支持, 直接使用注解 @Transactional 即可.

事务补偿机制

  • 针对每个操作, 都要注册一个与其对应的补偿(撤销)操作.
  • 在执行失败时, 调用补偿操作, 撤销之前的操作.

例子:

  • A给B转账, A和B在两家不同的银行
  • A账户减200元, B账户加200元
  • 两个操作要保证原子性, 要么都成功, 要么都失败
  • 转账的接口需要提供补偿机制
  • B在增加余额的过程中, 出现问题了, 要调用A的补偿接口
  • A之前的扣减操作, 得到了补偿, 进行撤销
  • 保证了A和B的帐是没有问题的
  • 注意补偿如果失败了要重试, 超过一定次数的话要人为处理

优点

  • 逻辑清晰, 流程简单

缺点

  • 数据一致性比XA还差, 可能出错的点比较多

TCC 属于应用层面的一种补偿方式, 需要程序实现大量代码。

事务补偿机制实现示例

核心逻辑

public class AccountService {
    @Resource
    private AccountAMapper accountAMapper;
    @Resource
    private AccountBMapper accountBMapper;

    @Transactional(transactionManager = "tm26")
    public void transferAccount() {
        AccountA accountA = this.accountAMapper.selectByPrimaryKey(1);
        accountA.setBalance(accountA.getBalance().subtract(new BigDecimal(200)));
        this.accountAMapper.updateByPrimaryKey(accountA);
        AccountB accountB = this.accountBMapper.selectByPrimaryKey(2);
        accountB.setBalance(accountB.getBalance().add(new BigDecimal(200)));
        this.accountBMapper.updateByPrimaryKey(accountB);
        try {
            int i = 1 / 0;
        } catch (Exception e) {
            e.printStackTrace();
            // 补偿
            AccountB accountb = this.accountBMapper.selectByPrimaryKey(2);
            accountb.setBalance(accountB.getBalance().subtract(new BigDecimal(200)));
            throw e;
        }
    }

}

具体查看工程 tcc-demo

基于本地消息表的最终一致方案

  • 采用BASE原理, 保证事务最终一致
  • 在一致性方法, 允许一段时间内不一致, 但最终会一致
  • 在实际的系统当中, 要根据具体情况, 判断是否采用
  • 基于本地消息表的方案中, 将本事务外操作, 记录在消息表中
  • 其他事务, 提供操作接口
  • 定时任务轮询本地消息表, 将未执行的消息发送给操作接口
  • 操作接口处理成功, 返回成功标识, 处理失败返回失败标识
  • 定时任务接到标识, 更新消息的状态
  • 定时任务按照一定的中周期反复执行
  • 对于屡次失败的消息, 可以设置最大失败次数
  • 超过最大失败次数的消息, 不再进行接口调用
  • 等待人工处理

优点

  • 避免了分布式事务, 实现了最终一致性

缺点

  • 要注意重试时的幂等性操作, 就是说每次再重试的时候, 不能进行重复的操作

数据库设计

192.168.123.26 是支付数据库, 192.168.123.128 是订单数据库

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for payment_msg
-- ----------------------------
DROP TABLE IF EXISTS `payment_msg`;
CREATE TABLE `payment_msg`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `order_id` int(11) NOT NULL COMMENT '订单Id',
  `status` int(1) NOT NULL COMMENT '0: 未发送 1: 发送成功 2: 超过最大发送次数',
  `fail_count` int(1) NOT NULL COMMENT '失败次数, 最大5次',
  `create_time` datetime(0) NOT NULL COMMENT '创建时间',
  `create_user` int(11) NOT NULL COMMENT '创建人',
  `update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
  `update_user` int(11) NOT NULL COMMENT '更新人',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of payment_msg
-- ----------------------------

SET FOREIGN_KEY_CHECKS = 1;

支付接口

@Transactional(rollbackFor = Exception.class, transactionManager = "tm26")
public int pay(int userId, int orderId, BigDecimal amount) {
    // 支付操作
    AccountA accountA = this.accountAMapper.selectByPrimaryKey(userId);
    if (accountA == null) {
        return 1;
    }
    if (accountA.getBalance().compareTo(amount) < 0) {
        return 2;
    }
    accountA.setBalance(accountA.getBalance().subtract(amount));
    this.accountAMapper.updateByPrimaryKey(accountA);
    // 本地消息表
    PaymentMsg paymentMsg = new PaymentMsg();
    paymentMsg.setOrderId(orderId);
    paymentMsg.setStatus(0); // 未支付
    paymentMsg.setFailCount(0);
    paymentMsg.setCreateTime(new Date());
    paymentMsg.setCreateUser(userId);
    paymentMsg.setUpdateTime(new Date());
    paymentMsg.setUpdateUser(userId);
    this.paymentMsgMapper.insertSelective(paymentMsg);

    return 0;
}

订单回调接口

/**
 * 订单回调接口,修改订单状态
 *
 * @param orderId
 * @return 1: 订单不存在  0: 成功
 */
public int handleOrder(int orderId) {
    Order order = this.orderMapper.selectByPrimaryKey(orderId);
    if (order == null) {
        return 1;
    }
    // 更新订单支付状态
    order.setOrderStatus(1);
    order.setUpdateTime(new Date());
    order.setUpdateUser(0);
    this.orderMapper.updateByPrimaryKey(order);
    return 0;
}

定时任务,查询订单状态,根据状态修改本地消息表

@Scheduled(cron = "0/10 * * * * ?")
public void orderNotify() {
    log.info("定时任务执行中....");
    // 获取本地消息表
    PaymentMsgExample paymentMsgExample = new PaymentMsgExample();
    paymentMsgExample.createCriteria().andStatusEqualTo(0);
    List<PaymentMsg> paymentMsgs = this.paymentMsgMapper.selectByExample(paymentMsgExample);
    if (paymentMsgs == null || paymentMsgs.isEmpty()) {
        return;
    }
    CloseableHttpClient httpClient = HttpClientBuilder.create().build();
    paymentMsgs.forEach(paymentMsg -> {
        Integer orderId = paymentMsg.getOrderId();
        log.info("获取到订单id:{}", orderId);
        HttpGet getRequest = new HttpGet("http://localhost:8080/order?orderId=" + orderId);
        try (httpClient) {
            CloseableHttpResponse response = httpClient.execute(getRequest);
            String responseStr = EntityUtils.toString(response.getEntity());
            if ("success".equals(responseStr)) {
                // 设置为支付成功
                paymentMsg.setStatus(1);
            } else {
                Integer failCount = paymentMsg.getFailCount();
                failCount++;
                if (failCount > 5) {
                    paymentMsg.setStatus(2);
                }
                paymentMsg.setFailCount(failCount);
            }
            paymentMsg.setUpdateUser(0);
            paymentMsg.setUpdateTime(new Date());
            // 更新本地消息表
            paymentMsgMapper.updateByPrimaryKey(paymentMsg);
        } catch (IOException e) {
            e.printStackTrace();
        }
    });
}

具体实现查看工程local-msg-demo

基于MQ的最终一致方案

原理、流程和本地消息表相似

不同点:

  • 本地消息表改为MQ
  • 定时任务改为消费者

优点

  • 不依赖定时任务, 基于MQ更高效, 更可靠
  • 适合于公司内的系统
  • 不同公司之间无法基于MQ, 本地消息表更适合

RocketMQ 安装

# 下载二进制包
wget https://mirror.bit.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
unzip rocketmq-all-4.8.0-bin-release.zip
mv rocketmq-all-4.8.0-bin-release /usr/local/rocketmq-4.8.0

# 下载源码编译安装
wget https://mirror.bit.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-source-release.zip
unzip rocketmq-all-4.8.0-source-release.zip
cd rocketmq-all-4.8.0
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0

# 启动 nameserver
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

# 启动 broker -n 指定nameserver地址和端口
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log

# 一些报错问题
mkdir /root/store/consumequeue -p
mkdir /root/store/commitlog -p

# 测试发送消息
cd /bin
export NAMESRV_ADDR=localhost:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
# 测试接收消息
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

实现Demo

支付接口

@Transactional(rollbackFor = Exception.class, transactionManager = "tm26")
public int pay(int userId, int orderId, BigDecimal amount) throws Exception {
    AccountA accountA = this.accountAMapper.selectByPrimaryKey(userId);
    if (accountA == null) {
        return 1;
    }
    if (accountA.getBalance().compareTo(amount) < 0) {
        return 2;
    }
    accountA.setBalance(accountA.getBalance().subtract(amount));
    this.accountAMapper.updateByPrimaryKey(accountA);
    Message message = new Message();
    message.setTopic(RocketMQCfg.PAYMENT);
    message.setKeys(orderId + "");
    message.setBody("订单已支付".getBytes(StandardCharsets.UTF_8));
    try {
        // 发送消息到消息队列
        SendResult result = this.producer.send(message);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            return 0;
        } else {
            throw new Exception("消息发送失败");
        }
    } catch (Exception e) {
        e.printStackTrace();
        throw e;
    }
}

消费者服务,修改订单状态

@Component("messageListener")
public class ChangeOrderStatus implements MessageListenerConcurrently {

    private static final Logger log = LoggerFactory.getLogger(ChangeOrderStatus.class);
    @Resource
    private OrderMapper orderMapper;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        if (msgs == null || msgs.isEmpty()) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        for (MessageExt msg : msgs) {
            String orderId = msg.getKeys();
            String orderMsg = new String(msg.getBody());
            log.info("order message: {}", orderMsg);
            Order order = this.orderMapper.selectByPrimaryKey(Integer.parseInt(orderId));
            if (order == null) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            try {
                order.setOrderStatus(1);
                order.setUpdateTime(new Date());
                order.setUpdateUser(0);
                this.orderMapper.updateByPrimaryKey(order);
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

具体查看项目rocketmq-demo

评论