之前秒殺專案中就用到了這個 Redisson 分散式鎖 👇,這篇就一起來看看原始碼吧!
tryLock 加鎖 流程
// RedissonLock.java @Override public boolean tryLock() { return get(tryLockAsync()); } @Override public RFuture<Boolean> tryLockAsync() { return tryLockAsync(Thread.currentThread().getId()); } @Override public RFuture<Boolean> tryLockAsync(long threadId) { return tryAcquireOnceAsync(-1, -1, null, threadId); } private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Boolean> acquiredFuture; // 續租時間:鎖的過期時間(沒有設定的話就用預設的 internalLockLeaseTime 看門狗時間) if (leaseTime > 0) { acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } else { acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> { // lock acquired if (acquired) { if (leaseTime > 0) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { // 沒配置過期時間就執行這裏 scheduleExpirationRenewal(threadId); } } return acquired; }); return new CompletableFutureWrapper<>(f); }
程式碼很長,主要看
tryLockInnerAsync
和scheduleExpirationRenewal
方法。
前置知識
// EVAL 命令,用於在 Redis 伺服器端執行 Lua 指令碼。 RedisStrictCommand<Boolean> EVAL_NULL_BOOLEAN = new RedisStrictCommand<Boolean>("EVAL", new BooleanNullReplayConvertor()); // BooleanNullReplayConvertor 判斷是不是 NULL。 public class BooleanNullReplayConvertor implements Convertor<Boolean> { @Override public Boolean convert(Object obj) { return obj == null; } }
tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { // getRawName 即 鎖的名稱 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, // 鎖不存在,新增 hash 資料,可重入次數加一,毫秒級別過期時間,返回 null "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 鎖存在,可重入次數加一,毫秒級別過期時間,返回 null "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 鎖被別人佔有, 返回毫秒級別過期時間 "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
ARGV[1] 過期時間
ARGV[2] 即 getLockName(threadId) ,這裏是 redisson 客戶端id + 這個執行緒 ID , 如下 👇
scheduleExpirationRenewal (看門狗機制)
上面加鎖完,就來到這段程式碼。
沒有設定過期時間的話,預設給你設定 30 s 過期,並每隔 10s 自動續期,確保鎖不會在使用過程中過期。
同時,防止客戶端宕機,留下死鎖。
// RedissonBaseLock.java protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); try { // 看這裏 renewExpiration(); } finally { if (Thread.currentThread().isInterrupted()) { cancelExpirationRenewal(threadId); } } } } private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 延時任務,10s 續期一次。 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 續期操作 CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { // reschedule itself renewExpiration(); } else { cancelExpirationRenewal(null); } }); } // 三分之一時間,30s /3= 10s }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); } // 續期指令碼 protected CompletionStage<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
get
上面的加鎖操作,最終返回的是 return new CompletableFutureWrapper<>(f);
這個非同步操作。
還記得上面的 BooleanNullReplayConvertor 嗎,當 eval 執行加鎖指令碼時,成功會返回 null,並在這裏轉成 True 。
@Override public <V> V get(RFuture<V> future) { if (Thread.currentThread().getName().startsWith("redisson-netty")) { throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners"); } try { return future.toCompletableFuture().get(); } catch (InterruptedException e) { future.cancel(true); Thread.currentThread().interrupt(); throw new RedisException(e); } catch (ExecutionException e) { throw convertException(e); } }
那麼,加鎖的部分到這裏就結束, 解鎖 的就簡單過一下 👇
unlock 解鎖
// RedissonLock.java protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 不存在,直接返回 null "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // 減一 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 大於0,設定毫秒級過期時間,並返回0 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + // 刪除鎖,並向指定channel釋出 0 這個訊息,並返回1 "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + // 返回 null "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
KEYS[1] 為鎖名,KEYS[2] channel 名 👇
ARGV[1] 為0 👇, ARGV[2] 過期時間,ARGV[3] 為 redisson 客戶端id + 這個執行緒 ID
解鎖後,取消續期任務。
結尾
透過原始碼,我們瞭解到上文提到的 redisson 框架的幾個特點:自動續期,可重入鎖, lua指令碼。
當然,它不止這些功能,小夥伴們可以在這裏查閱 👇