Seata分布式事务失败通知

一、背景

在我们使用Seata作为分布式事务时,有些时候我们的分布式时候并不是每次都可以成功的,而对于这些失败的分布式事务就需要进行通知。这篇文章简单记录一下如何实现通知。

二、功能实现

  1. 此处模拟邮件通知,但是不真正发送邮件,只是简单记录一个日志。

三、注意事项

1、 假设我们的分布式事务回滚失败,在AT模式中是会锁定表记录数据的。后期需要获取这条记录的全局锁操作,都会失败。

举例:

假设存在如下数据表记录数据

账号 金额
zhangsan 100

zhangsan这条记录参与分布式事务。

  1. 在分布式事务中将 zhangsan 的金额修改成 90.
  2. 另外一个同事,直接操作数据库,将 zhangsan 的记录修改成 80. (此时是可以修改成功的,因为 Seata 是二阶段提交,在第一阶段结束后,会释放记录的本地锁,不释放记录的全局锁)
  3. 接下来继续别的分布式事务操作,但是发生了异常,此时分布式事务会回滚失败,因为 zhangsan 的金额变成了80,和之前的对应不上。
  4. 如果再次对 zhangsan 进行分布式事务操作或者需要获取全局锁的操作,那么都会失败。

四、实现步骤

1、编写一个类实现FailureHandler接口

FailureHandler 的全类名为:io.seata.tm.api.FailureHandler,它有一个默认的实现DefaultFailureHandlerImpl, 此处我们写一个类继承这个类。

package com.huan.seata.handler;

import io.seata.tm.api.DefaultFailureHandlerImpl;
import io.seata.tm.api.GlobalTransaction;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Seata 分布式事物失败的处理
 *
 * @author huan.fu 2021/10/8 - 下午1:51
 */
@Component("failureHandler")
@Slf4j
public class EmailSeataFailureHandler extends DefaultFailureHandlerImpl {
    
    @Override
    public void onBeginFailure(GlobalTransaction tx, Throwable cause) {
        super.onBeginFailure(tx, cause);
        log.warn("邮件通知:分布式事物出现异常:[onBeginFailure],xid:[{}]", tx.getXid());
    }
    
    @Override
    public void onCommitFailure(GlobalTransaction tx, Throwable cause) {
        super.onCommitFailure(tx, cause);
        log.warn("邮件通知:分布式事物出现异常:[onCommitFailure],xid:[{}]", tx.getXid());
    }
    
    @Override
    public void onRollbackFailure(GlobalTransaction tx, Throwable originalException) {
        super.onRollbackFailure(tx, originalException);
        log.warn("邮件通知:分布式事物出现异常:[onRollbackFailure],xid:[{}]", tx.getXid());
    }
    
    @Override
    public void onRollbackRetrying(GlobalTransaction tx, Throwable originalException) {
        super.onRollbackRetrying(tx, originalException);
        log.warn("邮件通知:分布式事物出现异常:[onRollbackRetrying],xid:[{}]", tx.getXid());
    }
}

2、加入到Spring中的BeanName的值

在测试的时候发现FailureHandler加入到Spring中的beanName必须是failureHandler,否则报错,是seata的自动配置中如下代码决定的。

BEAN_NAME_FAILURE_HANDLER 常量的值为 failureHandler


/**
 * 源码
 * The type Seata auto configuration
 */
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

}

五、测试

访问curl -X GET http://localhost:50027/createOrder\?accountId\=1\&amount\=10\&hasException\=true,然后手动打断点,修改数据库数据,使数据不一致,然后代码跑出异常,此处回滚失败,因为undo_log表中的数据对不起来。

Seata分布式事务失败通知_第1张图片

六、完整代码

https://gitee.com/huan1993/spring-cloud-parent/tree/master/seata/seata-springboot-failure-handler

你可能感兴趣的