Redis分布式锁的实现方式

一、分布式锁是什么

分布式锁是 满足分布式系统或集群模式下多进程可见并且互斥的锁。

基于Redis实现分布式锁:

1、获取锁

  • 互斥:确保只能有一个线程获取锁;
  • 非阻塞:尝试获取锁,成功返回true,失败返回false;

添加锁过期时间,避免服务宕机引起死锁。

SET lock thread1 NX EX 10

2、释放锁

  • 手动释放;DEL key1
  • 超时释放,获取锁时添加一个超时锁;

二、代码实例

  1. package com.guor.utils;
  2.  
  3. import org.springframework.data.redis.core.StringRedisTemplate;
  4.  
  5. import Java.util.concurrent.TimeUnit;
  6.  
  7. public class RedisLock implements ILock{
  8.  
  9.      private String name;
  10.      private StringRedisTemplate stringRedisTemplate;
  11.  
  12.      public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {
  13.          this.name = name;
  14.          this.stringRedisTemplate = stringRedisTemplate;
  15.      }
  16.  
  17.      private static final String KEY_PREFIX = “lock:”;
  18.  
  19.      @Override
  20.      public boolean tryLock(long timeout) {
  21.          // 获取线程唯一标识
  22.          long threadId = Thread.currentThread().getId();
  23.          // 获取锁
  24.          Boolean success = stringRedisTemplate.opsForValue()
  25.                  .setIfAbsent(KEY_PREFIX + name, threadId+””, timeout, TimeUnit.SECONDS);
  26.          // 防止拆箱的空指针异常
  27.          return Boolean.TRUE.equals(success);
  28.      }
  29.  
  30.      @Override
  31.      public void unlock() {
  32.          stringRedisTemplate.delete(KEY_PREFIX + name);
  33.      }
  34. }

上面代码存在锁误删问题:

  1. 如果线程1获取锁,但线程1发生了阻塞,导致Redis超时释放锁;
  2. 此时,线程2尝试获取锁,成功,并执行业务;
  3. 此时,线程1重新开始执行任务,并执行完毕,执行释放锁(即删除锁);
  4. 但是,线程1删除的锁,和线程2的锁是同一把锁,这就是分布式锁误删问题

在释放锁时,释放线程自己的分布式锁,就可以解决这个问题。

  1. package com.guor.utils;
  2.  
  3. import cn.hutool.core.lang.UUID;
  4. import org.springframework.data.redis.core.StringRedisTemplate;
  5.  
  6. import java.util.concurrent.TimeUnit;
  7.  
  8. public class RedisLock implements ILock{
  9.  
  10.      private String name;
  11.      private StringRedisTemplate stringRedisTemplate;
  12.  
  13.      public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {
  14.          this.name = name;
  15.          this.stringRedisTemplate = stringRedisTemplate;
  16.      }
  17.  
  18.      private static final String KEY_PREFIX = “lock:”;
  19.      private static final String UUID_PREFIX = UUID.randomUUID().toString(true) + “-“;
  20.  
  21.      @Override
  22.      public boolean tryLock(long timeout) {
  23.          // 获取线程唯一标识
  24.          String threadId = UUID_PREFIX + Thread.currentThread().getId();
  25.          // 获取锁
  26.          Boolean success = stringRedisTemplate.opsForValue()
  27.                  .setIfAbsent(KEY_PREFIX + name, threadId, timeout, TimeUnit.SECONDS);
  28.          // 防止拆箱的空指针异常
  29.          return Boolean.TRUE.equals(success);
  30.      }
  31.  
  32.      @Override
  33.      public void unlock() {
  34.          // 获取线程唯一标识
  35.          String threadId = UUID_PREFIX + Thread.currentThread().getId();
  36.          // 获取锁中的标识
  37.          String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
  38.          // 判断标示是否一致
  39.          if(threadId.equals(id)) {
  40.              // 释放锁
  41.              stringRedisTemplate.delete(KEY_PREFIX + name);
  42.          }
  43.      }
  44. }

三、基于SETNX实现的分布式锁存在下面几个问题

1、不可重入

同一个线程无法多次获取同一把锁。

2、不可重试

获取锁只尝试一次就返回false,没有重试机制。

3、超时释放

锁的超时释放虽然可以避免死锁,但如果业务执行耗时较长,也会导致锁释放,存在安全隐患。

4、主从一致性

如果Redis是集群部署的,主从同步存在延迟,当主机宕机时,此时会选一个从作为主机,但是此时的从没有锁标识,此时,其它线程可能会获取到锁,导致安全问题。

四、Redisson实现分布式锁

Redisson是一个在Redis的基础上实现的Java驻内存数据网格。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中包含各种分布式锁的实现。

1、pom

  1. <!–redisson–>
  2. <dependency>
  3.      <groupId>org.redisson</groupId>
  4.      <artifactId>redisson</artifactId>
  5.      <version>3.13.6</version>
  6. </dependency>

2、配置类

  1. package com.guor.config;
  2.  
  3. import org.redisson.Redisson;
  4. import org.redisson.api.RedissonClient;
  5. import org.redisson.config.Config;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8.  
  9. @Configuration
  10. public class RedissonConfig {
  11.  
  12.      @Bean
  13.      public RedissonClient redissonClient(){
  14.          // 配置
  15.          Config config = new Config();
  16.  
  17.          /**
  18.              * 单点地址useSingleServer,集群地址useClusterServers
  19.              */
  20.          config.useSingleServer().setAddress(“redis://127.0.0.1:6379”).setPassword(“123456”);
  21.          // 创建RedissonClient对象
  22.          return Redisson.create(config);
  23.      }
  24. }

3、测试类

  1. package com.guor;
  2.  
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.junit.jupiter.api.BeforeEach;
  5. import org.junit.jupiter.api.Test;
  6. import org.redisson.api.RLock;
  7. import org.redisson.api.RedissonClient;
  8. import org.springframework.boot.test.context.SpringBootTest;
  9.  
  10. import javax.annotation.Resource;
  11. import java.util.concurrent.TimeUnit;
  12.  
  13. @Slf4j
  14. @SpringBootTest
  15. class RedissonTest {
  16.  
  17.      @Resource
  18.      private RedissonClient redissonClient;
  19.  
  20.      private RLock lock;
  21.  
  22.      @BeforeEach
  23.      void setUp() {
  24.      // 获取指定名称的锁
  25.          lock = redissonClient.getLock(“nezha”);
  26.      }
  27.  
  28.      @Test
  29.      void test() throws InterruptedException {
  30.          // 尝试获取锁
  31.          boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
  32.          if (!isLock) {
  33.              log.error(“获取锁失败”);
  34.              return;
  35.          }
  36.          try {
  37.              log.info(“哪吒最帅,哈哈哈”);
  38.          } finally {
  39.              // 释放锁
  40.              lock.unlock();
  41.          }
  42.      }
  43. }

五、探索tryLock源码

1、tryLock源码

尝试获取锁

  1. public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  2.      // 最大等待时间
  3.      long time = unit.toMillis(waitTime);
  4.      long current = System.currentTimeMillis();
  5.      long threadId = Thread.currentThread().getId();
  6.      Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
  7.      if (ttl == null) {
  8.          return true;
  9.      } else {
  10.          // 剩余等待时间 = 最大等待时间 – 获取锁失败消耗的时间
  11.          time -= System.currentTimeMillis()  current;
  12.          if (time <= 0L) {// 获取锁失败
  13.              this.acquireFailed(waitTime, unit, threadId);
  14.              return false;
  15.          } else {
  16.              // 再次尝试获取锁
  17.              current = System.currentTimeMillis();
  18.              // subscribe订阅其它释放锁的信号
  19.              RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
  20.              // 当Future在等待指定时间time内完成时,返回true
  21.              if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
  22.                  if (!subscribeFuture.cancel(false)) {
  23.                      subscribeFuture.onComplete((res, e) -> {
  24.                      if (== null) {
  25.                      // 取消订阅
  26.                      this.unsubscribe(subscribeFuture, threadId);
  27.                      }
  28.  
  29.                      });
  30.                  }
  31.  
  32.                  this.acquireFailed(waitTime, unit, threadId);
  33.                  return false;// 获取锁失败
  34.              } else {
  35.                  try {
  36.                      // 剩余等待时间 = 剩余等待时间 – 获取锁失败消耗的时间
  37.                      time -= System.currentTimeMillis()  current;
  38.                      if (time <= 0L) {
  39.                      this.acquireFailed(waitTime, unit, threadId);
  40.                      boolean var20 = false;
  41.                      return var20;
  42.                      } else {
  43.                      boolean var16;
  44.                      do {
  45.                      long currentTime = System.currentTimeMillis();
  46.                      // 重试获取锁
  47.                      ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
  48.                      if (ttl == null) {
  49.                      var16 = true;
  50.                      return var16;
  51.                      }
  52.                      // 再次失败了,再看一下剩余时间
  53.                      time -= System.currentTimeMillis()  currentTime;
  54.                      if (time <= 0L) {
  55.                      this.acquireFailed(waitTime, unit, threadId);
  56.                      var16 = false;
  57.                      return var16;
  58.                      }
  59.                      // 再重试获取锁
  60.                      currentTime = System.currentTimeMillis();
  61.                      if (ttl >= 0L && ttl < time) {
  62.                      // 通过信号量的方式尝试获取信号,如果等待时间内,依然没有结果,会返回false
  63.                      ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  64.                      } else {
  65.                      ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
  66.                      }
  67.                      time -= System.currentTimeMillis()  currentTime;
  68.                      } while(time > 0L);
  69.  
  70.                      this.acquireFailed(waitTime, unit, threadId);
  71.                      var16 = false;
  72.                      return var16;
  73.                      }
  74.                  } finally {
  75.                      this.unsubscribe(subscribeFuture, threadId);
  76.                  }
  77.              }
  78.          }
  79.      }
  80. }

2、重置锁的有效期

  1. private void scheduleExpirationRenewal(long threadId) {
  2.      RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
  3.      // this.getEntryName():锁的名字,一个锁对应一个entry
  4.      // putIfAbsent:如果不存在,将锁和entry放到map里
  5.      RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
  6.      if (oldEntry != null) {
  7.          // 同一个线程多次获取锁,相当于重入
  8.          oldEntry.addThreadId(threadId);
  9.      } else {
  10.          // 如果是第一次
  11.          entry.addThreadId(threadId);
  12.          // 更新有效期
  13.          this.renewExpiration();
  14.      }
  15. }

更新有效期,递归调用更新有效期,永不过期

  1. private void renewExpiration() {
  2.      // 从map中得到当前锁的entry
  3.      RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
  4.      if (ee != null) {
  5.          // 开启延时任务
  6.          Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
  7.              public void run(Timeout timeout) throws Exception {
  8.                  RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
  9.                  if (ent != null) {
  10.                      // 取出线程id
  11.                      Long threadId = ent.getFirstThreadId();
  12.                      if (threadId != null) {
  13.                      // 刷新有效期
  14.                      RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
  15.                      future.onComplete((res, e) -> {
  16.                      if (!= null) {
  17.                      RedissonLock.log.error(“Can’t update lock “ + RedissonLock.this.getName() + ” expiration”, e);
  18.                      } else {
  19.                      if (res) {
  20.                      // 递归调用更新有效期,永不过期
  21.                      RedissonLock.this.renewExpiration();
  22.                      }
  23.                      }
  24.                      });
  25.                      }
  26.                  }
  27.              }
  28.          }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);// 10S
  29.          ee.setTimeout(task);
  30.      }
  31. }

更新有效期

  1. protected RFuture<Boolean> renewExpirationAsync(long threadId) {
  2.      return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  3.      // 判断当前线程的锁是否是当前线程
  4.      “if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) then
  5.          // 更新有效期
  6.          redis.call(‘pexpire’, KEYS[1], ARGV[1]);
  7.          return 1;
  8.          end;
  9.          return 0;”,
  10.          Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
  11. }

3、调用lua脚本

  1. <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  2.      // 锁释放时间
  3.      this.internalLockLeaseTime = unit.toMillis(leaseTime);
  4.      return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
  5.          // 判断锁成功
  6.          “if (redis.call(‘exists’, KEYS[1]) == 0) then
  7.              redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); // 如果不存在,记录锁标识,次数+1
  8.              redis.call(‘pexpire’, KEYS[1], ARGV[1]); // 设置锁有效期
  9.              return nil; // 相当于Java的null
  10.          end;
  11.          if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) then
  12.              redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); // 如果存在,判断锁标识是否是自己的,次数+1
  13.              redis.call(‘pexpire’, KEYS[1], ARGV[1]); // 设置锁有效期
  14.              return nil;
  15.          end;
  16.          // 判断锁失败,pttl:指定锁剩余有效期,单位毫秒,KEYS[1]:锁的名称
  17.          return redis.call(‘pttl’, KEYS[1]);”,
  18.              Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
  19. }

六、释放锁unlock源码

1、取消更新任务

  1. public RFuture<Void> unlockAsync(long threadId) {
  2.      RPromise<Void> result = new RedissonPromise();
  3.      RFuture<Boolean> future = this.unlockInnerAsync(threadId);
  4.      future.onComplete((opStatus, e) -> {
  5.          // 取消更新任务
  6.          this.cancelExpirationRenewal(threadId);
  7.          if (!= null) {
  8.              result.tryFailure(e);
  9.          } else if (opStatus == null) {
  10.              IllegalMonitorStateException cause = new IllegalMonitorStateException(“attempt to unlock lock, not locked by current thread by node id: “ + this.id + ” thread-id: “ + threadId);
  11.              result.tryFailure(cause);
  12.          } else {
  13.              result.trySuccess((Object)null);
  14.          }
  15.      });
  16.      return result;
  17. }

2、删除定时任务

  1. void cancelExpirationRenewal(Long threadId) {
  2.      // 从map中取出当前锁的定时任务entry
  3.      RedissonLock.ExpirationEntry task = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
  4.      if (task != null) {
  5.          if (threadId != null) {
  6.              task.removeThreadId(threadId);
  7.          }
  8.          // 删除定时任务
  9.          if (threadId == null || task.hasNoThreads()) {
  10.              Timeout timeout = task.getTimeout();
  11.              if (timeout != null) {
  12.                  timeout.cancel();
  13.              }
  14.  
  15.              EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
  16.          }
  17.      }
  18. }

以上就是Redis分布式锁的实现方式的详细内容,更多关于Redis实现分布式锁的资料请关注我们其它相关文章!

标签

发表评论