Redis分布式锁的实现方式
一、分布式锁是什么
分布式锁是 满足分布式系统或集群模式下多进程可见并且互斥的锁。
基于Redis实现分布式锁:
1、获取锁
- 互斥:确保只能有一个线程获取锁;
- 非阻塞:尝试获取锁,成功返回true,失败返回false;
添加锁过期时间,避免服务宕机引起死锁。
SET lock thread1 NX EX 10
2、释放锁
- 手动释放;
DEL key1
- 超时释放,获取锁时添加一个超时锁;
二、代码实例
- package com.guor.utils;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import Java.util.concurrent.TimeUnit;
- public class RedisLock implements ILock{
- private String name;
- private StringRedisTemplate stringRedisTemplate;
- public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {
- this.name = name;
- this.stringRedisTemplate = stringRedisTemplate;
- }
- private static final String KEY_PREFIX = “lock:”;
- @Override
- public boolean tryLock(long timeout) {
- // 获取线程唯一标识
- long threadId = Thread.currentThread().getId();
- // 获取锁
- Boolean success = stringRedisTemplate.opsForValue()
- .setIfAbsent(KEY_PREFIX + name, threadId+””, timeout, TimeUnit.SECONDS);
- // 防止拆箱的空指针异常
- return Boolean.TRUE.equals(success);
- }
- @Override
- public void unlock() {
- stringRedisTemplate.delete(KEY_PREFIX + name);
- }
- }
上面代码存在锁误删问题:
- 如果线程1获取锁,但线程1发生了阻塞,导致Redis超时释放锁;
- 此时,线程2尝试获取锁,成功,并执行业务;
- 此时,线程1重新开始执行任务,并执行完毕,执行释放锁(即删除锁);
- 但是,线程1删除的锁,和线程2的锁是同一把锁,这就是
分布式锁误删问题
;
在释放锁时,释放线程自己的分布式锁,就可以解决这个问题。
- package com.guor.utils;
- import cn.hutool.core.lang.UUID;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import java.util.concurrent.TimeUnit;
- public class RedisLock implements ILock{
- private String name;
- private StringRedisTemplate stringRedisTemplate;
- public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {
- this.name = name;
- this.stringRedisTemplate = stringRedisTemplate;
- }
- private static final String KEY_PREFIX = “lock:”;
- private static final String UUID_PREFIX = UUID.randomUUID().toString(true) + “-“;
- @Override
- public boolean tryLock(long timeout) {
- // 获取线程唯一标识
- String threadId = UUID_PREFIX + Thread.currentThread().getId();
- // 获取锁
- Boolean success = stringRedisTemplate.opsForValue()
- .setIfAbsent(KEY_PREFIX + name, threadId, timeout, TimeUnit.SECONDS);
- // 防止拆箱的空指针异常
- return Boolean.TRUE.equals(success);
- }
- @Override
- public void unlock() {
- // 获取线程唯一标识
- String threadId = UUID_PREFIX + Thread.currentThread().getId();
- // 获取锁中的标识
- String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
- // 判断标示是否一致
- if(threadId.equals(id)) {
- // 释放锁
- stringRedisTemplate.delete(KEY_PREFIX + name);
- }
- }
- }
三、基于SETNX
实现的分布式锁存在下面几个问题
1、不可重入
同一个线程无法多次获取同一把锁。
2、不可重试
获取锁只尝试一次就返回false,没有重试机制。
3、超时释放
锁的超时释放虽然可以避免死锁,但如果业务执行耗时较长,也会导致锁释放,存在安全隐患。
4、主从一致性
如果Redis是集群部署的,主从同步存在延迟,当主机宕机时,此时会选一个从作为主机,但是此时的从没有锁标识,此时,其它线程可能会获取到锁,导致安全问题。
四、Redisson实现分布式锁
Redisson是一个在Redis的基础上实现的Java驻内存数据网格。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中包含各种分布式锁的实现。
1、pom
- <!–redisson–>
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson</artifactId>
- <version>3.13.6</version>
- </dependency>
2、配置类
- package com.guor.config;
- import org.redisson.Redisson;
- import org.redisson.api.RedissonClient;
- import org.redisson.config.Config;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class RedissonConfig {
- @Bean
- public RedissonClient redissonClient(){
- // 配置
- Config config = new Config();
- /**
- * 单点地址useSingleServer,集群地址useClusterServers
- */
- config.useSingleServer().setAddress(“redis://127.0.0.1:6379”).setPassword(“123456”);
- // 创建RedissonClient对象
- return Redisson.create(config);
- }
- }
3、测试类
- package com.guor;
- import lombok.extern.slf4j.Slf4j;
- import org.junit.jupiter.api.BeforeEach;
- import org.junit.jupiter.api.Test;
- import org.redisson.api.RLock;
- import org.redisson.api.RedissonClient;
- import org.springframework.boot.test.context.SpringBootTest;
- import javax.annotation.Resource;
- import java.util.concurrent.TimeUnit;
- @Slf4j
- @SpringBootTest
- class RedissonTest {
- @Resource
- private RedissonClient redissonClient;
- private RLock lock;
- @BeforeEach
- void setUp() {
- // 获取指定名称的锁
- lock = redissonClient.getLock(“nezha”);
- }
- @Test
- void test() throws InterruptedException {
- // 尝试获取锁
- boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
- if (!isLock) {
- log.error(“获取锁失败”);
- return;
- }
- try {
- log.info(“哪吒最帅,哈哈哈”);
- } finally {
- // 释放锁
- lock.unlock();
- }
- }
- }
五、探索tryLock源码
1、tryLock源码
尝试获取锁
- public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
- // 最大等待时间
- long time = unit.toMillis(waitTime);
- long current = System.currentTimeMillis();
- long threadId = Thread.currentThread().getId();
- Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
- if (ttl == null) {
- return true;
- } else {
- // 剩余等待时间 = 最大等待时间 – 获取锁失败消耗的时间
- time -= System.currentTimeMillis() – current;
- if (time <= 0L) {// 获取锁失败
- this.acquireFailed(waitTime, unit, threadId);
- return false;
- } else {
- // 再次尝试获取锁
- current = System.currentTimeMillis();
- // subscribe订阅其它释放锁的信号
- RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
- // 当Future在等待指定时间time内完成时,返回true
- if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
- if (!subscribeFuture.cancel(false)) {
- subscribeFuture.onComplete((res, e) -> {
- if (e == null) {
- // 取消订阅
- this.unsubscribe(subscribeFuture, threadId);
- }
- });
- }
- this.acquireFailed(waitTime, unit, threadId);
- return false;// 获取锁失败
- } else {
- try {
- // 剩余等待时间 = 剩余等待时间 – 获取锁失败消耗的时间
- time -= System.currentTimeMillis() – current;
- if (time <= 0L) {
- this.acquireFailed(waitTime, unit, threadId);
- boolean var20 = false;
- return var20;
- } else {
- boolean var16;
- do {
- long currentTime = System.currentTimeMillis();
- // 重试获取锁
- ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
- if (ttl == null) {
- var16 = true;
- return var16;
- }
- // 再次失败了,再看一下剩余时间
- time -= System.currentTimeMillis() – currentTime;
- if (time <= 0L) {
- this.acquireFailed(waitTime, unit, threadId);
- var16 = false;
- return var16;
- }
- // 再重试获取锁
- currentTime = System.currentTimeMillis();
- if (ttl >= 0L && ttl < time) {
- // 通过信号量的方式尝试获取信号,如果等待时间内,依然没有结果,会返回false
- ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- } else {
- ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
- }
- time -= System.currentTimeMillis() – currentTime;
- } while(time > 0L);
- this.acquireFailed(waitTime, unit, threadId);
- var16 = false;
- return var16;
- }
- } finally {
- this.unsubscribe(subscribeFuture, threadId);
- }
- }
- }
- }
- }
2、重置锁的有效期
- private void scheduleExpirationRenewal(long threadId) {
- RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
- // this.getEntryName():锁的名字,一个锁对应一个entry
- // putIfAbsent:如果不存在,将锁和entry放到map里
- RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
- if (oldEntry != null) {
- // 同一个线程多次获取锁,相当于重入
- oldEntry.addThreadId(threadId);
- } else {
- // 如果是第一次
- entry.addThreadId(threadId);
- // 更新有效期
- this.renewExpiration();
- }
- }
更新有效期,递归调用更新有效期,永不过期
- private void renewExpiration() {
- // 从map中得到当前锁的entry
- RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
- if (ee != null) {
- // 开启延时任务
- Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
- public void run(Timeout timeout) throws Exception {
- RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
- if (ent != null) {
- // 取出线程id
- Long threadId = ent.getFirstThreadId();
- if (threadId != null) {
- // 刷新有效期
- RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
- future.onComplete((res, e) -> {
- if (e != null) {
- RedissonLock.log.error(“Can’t update lock “ + RedissonLock.this.getName() + ” expiration”, e);
- } else {
- if (res) {
- // 递归调用更新有效期,永不过期
- RedissonLock.this.renewExpiration();
- }
- }
- });
- }
- }
- }
- }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);// 10S
- ee.setTimeout(task);
- }
- }
更新有效期
- protected RFuture<Boolean> renewExpirationAsync(long threadId) {
- return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- // 判断当前线程的锁是否是当前线程
- “if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) then
- // 更新有效期
- redis.call(‘pexpire’, KEYS[1], ARGV[1]);
- return 1;
- end;
- return 0;”,
- Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
- }
3、调用lua脚本
- <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
- // 锁释放时间
- this.internalLockLeaseTime = unit.toMillis(leaseTime);
- return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
- // 判断锁成功
- “if (redis.call(‘exists’, KEYS[1]) == 0) then
- redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); // 如果不存在,记录锁标识,次数+1
- redis.call(‘pexpire’, KEYS[1], ARGV[1]); // 设置锁有效期
- return nil; // 相当于Java的null
- end;
- if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) then
- redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); // 如果存在,判断锁标识是否是自己的,次数+1
- redis.call(‘pexpire’, KEYS[1], ARGV[1]); // 设置锁有效期
- return nil;
- end;
- // 判断锁失败,pttl:指定锁剩余有效期,单位毫秒,KEYS[1]:锁的名称
- return redis.call(‘pttl’, KEYS[1]);”,
- Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
- }
六、释放锁unlock源码
1、取消更新任务
- public RFuture<Void> unlockAsync(long threadId) {
- RPromise<Void> result = new RedissonPromise();
- RFuture<Boolean> future = this.unlockInnerAsync(threadId);
- future.onComplete((opStatus, e) -> {
- // 取消更新任务
- this.cancelExpirationRenewal(threadId);
- if (e != null) {
- result.tryFailure(e);
- } else if (opStatus == null) {
- IllegalMonitorStateException cause = new IllegalMonitorStateException(“attempt to unlock lock, not locked by current thread by node id: “ + this.id + ” thread-id: “ + threadId);
- result.tryFailure(cause);
- } else {
- result.trySuccess((Object)null);
- }
- });
- return result;
- }
2、删除定时任务
- void cancelExpirationRenewal(Long threadId) {
- // 从map中取出当前锁的定时任务entry
- RedissonLock.ExpirationEntry task = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
- if (task != null) {
- if (threadId != null) {
- task.removeThreadId(threadId);
- }
- // 删除定时任务
- if (threadId == null || task.hasNoThreads()) {
- Timeout timeout = task.getTimeout();
- if (timeout != null) {
- timeout.cancel();
- }
- EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
- }
- }
- }
以上就是Redis分布式锁的实现方式的详细内容,更多关于Redis实现分布式锁的资料请关注我们其它相关文章!
发表评论