模板模式实现分布式锁实战

Posted by zhangtao on Thursday, January 4, 2024

前言

分布式锁相信大家都有用过,常见的分布式锁实现方式例如redis、zookeeper、数据库都可以实现,而我们代码中强引用这些分布式锁的代码,那么当我们以后想替换分布式锁的实现方式时,需要修改代码的成本会很高,于是我们需要借鉴一些设计模式思想来设计,下面我介绍下这三个分布式锁的实现逻辑以及我们项目中是怎么实现

实现方式

数据库实现

首先我们设计一张这样的表

CREATE TABLE `lock` (
  `key` varchar(128) C NOT NULL,
  `created` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `version` int(8) DEFAULT '1',
  PRIMARY KEY (`key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT = '分布式锁表';

首先方法开启事务,当我们需要对某块业务上锁时,确定加锁的颗粒度设置key的值,执行插入语句,这里我们可以设计的全面一点,对比其他的分布式锁实现方式,貌似还缺了点什么,比如这个锁不可重入,无法设置超时时间 其实这些我们也可以解决, 首先可重入方面,version字段记录重入的次数

insert into table (`key`) values (#key#) on duplicate key `version` total = `version` + 1

key是唯一索引,执行完这个语句后,会在这一行添加行锁。这时候后续有两种可能性,如果是当前线程A重复插入锁,那么会更新version字段;如果是其他线程B想要插入这个锁,那么由于A持有的行锁还未释放,所以B会阻塞

超时方面,我们可以设置数据库的innodb_lock_wait_timeout参数来设置超时时间,默认50s,等待时间超时这个时间则会报错

由于insert语句在RR隔离级别会生成间隙锁,在并发较高的情况下会产生死锁的情况,所以建议在RC情况下使用

Redis实现

如果我们自己设计redis实现加锁的话,我们第一个想到的就是setNx语法,它的作用就是当key不存在的情况下,将key 的值设置为 value,同时返回1,如果key已存在,则不设置value,并且返回0

如果还要加上超时时间,那么还需要执行expire语法

setnx lock true
expire lock 10

使用这两个语法,会产生一个问题,就是这两个语法不是原子性的,当执行一个语法后,系统报错了,那么超时时间就无法设置,这样子这个key就无法过期

基于这个问题,Redis官方将这两个指令组合在了一起,解决Redis分布式锁原子性操作的问题

SET key value [EX 过期时间] NX 将key 的值设置为 value,同时返回1,如果key已存在,则不设置value,并且返回0;同时加上超时时间,这是一个原子性操作

由此可见,我们自己实现需要踩很多坑,市面上有成熟的redis实现的分布式锁框架redission,我们直接用就可以了,它帮我们把坑都踩了一遍,不用重复造轮子了,简单讲解下它的原理

首先讲一下它的键值kv结构

  • key为锁的key
  • value为一个hash对象 {key:线程id,value:重入次数}

value为什么这么设计?我们要解决两个问题

1、删除锁时,A线程把B线程持有的锁给删除了?为了解决这个问题,我们需要在锁中记录线程id,删除时就可以判断锁是否是当前线程持有的,一致才可以删除

2、实现可重入的逻辑,所以需要在锁中记录重入次数,每次重入次数+1

不同于上面SET key value [EX 过期时间] NX方式,redission的加锁逻辑是通过一段lua脚本来实现的,redis的lua脚本可以实现原子性的操作,下图是lua脚本

image-20240105170729529

解锁逻辑也是一样的,由于需要判断当前线程才能执行删除,所以也需要通过lua脚本来实现删除的逻辑

当我们设置了过期时间后,如果我们的业务执行时间超过了设定的过期时间,那么锁会提前删除,就会出现各种各样的问题。所以redission实现了一个watch dog逻辑。它是一个后台线程,每10s检查一次,将锁的过期时间延长

redis我们通过都会设置高可用,最常见的方案就是主从或者哨兵,但是它保证了高可用的同时,无法保证高一致性。这样子当redis的主节点挂了,从节点还没有同步到主节点的数据,就变成了主节点,那么锁就会发生丢失

redission提供了RedLock算法,通过使用多个Redis实例,各个实例之间没有主从关系,相互独立,超过一半节点加锁成功才算获取到锁。不过这种算法也不是一个完美的算法,多个实例加锁效率低,同时也会衍生出一些其他问题

Zookeeper实现

Zookeeper有很多种节点种类,其中有一种节点种类叫做临时顺序节点,这个节点有两个特性,首先是当客户端向Zookeeper添加了这个节点后,如果之后客户端挂了,那么这个临时节点会被删除,不会一直存在,其次是这个节点是有序递增的。这些特性很适合用来做分布式锁

加锁就是在Zookeeper上添加临时顺序节点,判断是否是最小节点,如果是最小节点,则无需排队直接执行。如果不是最小的,则往后加一个顺序节点,并且向前一个节点添加一个watch监听,线程阻塞等待排队

当前一个节点删除时,当前节点监听到删除事件并唤醒线程。这样子第一个通知第二个,第二个通知第三个,这种击鼓传花的方式可以避免羊群效应

羊群效应就是前一个节点释放锁后,所有节点被唤醒,这样会给服务器带来巨大压力

哪种更好?

这个问题,我觉得得根据我们的现实情况做判定,当我们的系统只有数据库,又不想依赖其他的中间件,那我们使用数据库实现的方方式就可以了,但是性能会很差,容易出现瓶颈

Redis和Zookeeper性能都比数据库好,这两者相比较而言,Redis作为分布式锁大家使用的会多一些,主要原因我想应该是Zookeeper的cp特性导致单leader节点易出现瓶颈,而redis如果出现瓶颈后弹性伸缩(增加节点)会很方便,所以性能更高

踩坑点

之前我们在代码中加入分布式锁时,碰到过一个坑,就是明明加入了分布式锁,但是没有"锁住"代码。这里有几种原因,我就不一一展开了,我们之前踩过的一个坑就是我们的方法加了事务@Transactional注解,同时方法里面的逻辑加了redis分布式锁,类似下面的代码。

@Transactional
private void test() {
    // 加锁
    // 查询 id =1的记录
    // 更新 id=1的记录
    // 释放锁
}

这时候,当线程A进入了方法,首先进入事务,加锁,然后执行方法里面的逻辑,释放锁,但是事务还未释放。这时候线程B也进入了这个方法,锁因为已经释放了,就直接进入方法逻辑了,但是线程A的事务此时还没有提交,所以线程B查询的id=1的记录是不对的。

这个是踩坑了,解决方法有两种,一种是把事务的方法放在加锁的逻辑里面;另外一种就是释放锁的逻辑改成监听spring 事务提交的事件,实现事务完成后再释放锁,我们最后也是这么改的

@Transactional
private void test() {
    // 加锁
    // 查询 id =1的记录
    // 更新 id=1的记录
    // unlockAfterTransaction方法
}


private void unlockAfterTransaction(LockResult lockResult) {
  //事物完成后释放锁
  TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
      @Override
      public void afterCompletion(int status) {
          super.afterCompletion(status);
          distLockSservice.unlock(lockResult);
      }
  });
}

实战

这里再重审下为什么使用模板模式,因为我们使用分布式锁的实现有多种,如果我们在业务代码中直接依赖某一种分布式锁的话,那么后续我们想替换分布式锁的实现会很麻烦,所有有依赖的类都得替换,所以我们使用模版模式,把分布式锁的实现以及加锁、释放锁的逻辑放到公共代码中,我们业务类只需要实现自己的业务逻辑即可,无需关心分布式锁的相关逻辑,调用方法即可

我们在1.8之前使用模版模式会比较繁琐,我们需要准备一个抽象类,定义一些公共的逻辑,然后子类继承这个抽象类来自定义不同的逻辑。下面我以redission分布式锁的实战为例

public abstract class AbstractLockService {

    RedissonClient redissonClient;

    public void lock(List<String> keyNames, Long timeout,Object... o) throws InterruptedException {
        // 加锁
        RLock[] locks = new RLock[keyNames.size()];
        for(int i = 0; i < keyNames.size(); ++i) {
            locks[i] = this.redissonClient.getLock(keyNames.get(i));
        }
        RLock lock = this.redissonClient.getMultiLock(locks);
        boolean success = lock.tryLock(timeout, TimeUnit.MILLISECONDS);
        // 加锁成功,走业务逻辑
        if (success) {
            this.doBusiness(o);
        }
        // 释放锁
        unlockAfterTransaction(lock);
    }

    // 业务逻辑
    abstract Object doBusiness(Object... o);

    private void unlockAfterTransaction(RLock lock) {
        //事物完成后释放锁
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCompletion(int status) {
                super.afterCompletion(status);
                lock.unlock();
            }
        });
    }
}

当我们使用1.8以上的JDK时,针对模板模式,做了很多优化。我们可以将实现方法作为函数式方法传入模版中

public class LockFunctionService {

    RedissonClient redissonClient;

    public void lock(List<String> keyNames, Long timeout, ILockCallback lockCallback) throws InterruptedException {
        // 加锁
        RLock[] locks = new RLock[keyNames.size()];
        for(int i = 0; i < keyNames.size(); ++i) {
            locks[i] = this.redissonClient.getLock(keyNames.get(i));
        }
        RLock lock = this.redissonClient.getMultiLock(locks);
        boolean success = lock.tryLock(timeout, TimeUnit.MILLISECONDS);
        // 加锁成功,走业务逻辑
        if (success) {
            lockCallback.callback();
        }
        // 释放锁
        unlockAfterTransaction(lock);
    }


    private void unlockAfterTransaction(RLock lock) {
        //事物完成后释放锁
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCompletion(int status) {
                super.afterCompletion(status);
                lock.unlock();
            }
        });
    }

    
}

interface ILockCallback<T> {
    T callback();
}

当我们调用时,比之前方便多了

public static void main(String[] args) throws InterruptedException {
  LockFunctionService lockFunctionService = new LockFunctionService();
  List<String> keys = Lists.newArrayList("lock_1");
  // 需要加锁执行释放
  lockFunctionService.lock(keys, 5L, () -> {
    System.out.println("执行业务逻辑");
    return null;
  });
}