還原案發現場
簡單看一下下面的程式碼,死鎖形成的原因是,執行緒A巢狀了一個任務執行緒B,A在執行任務執行緒B之前獲取了一個鎖,執行完B之後纔會釋放,而B執行緒執行的時候也需要獲取鎖,好巧不巧,他使用的鎖跟A使用的是同一個,bang,死鎖了。
public class DeadLockNested { static ReentrantLock lock = new ReentrantLock(); private static Lock getLock() { // 實際生產環境不會這樣寫,這裏是爲了還原問題 return lock; } public static void main(String[] args) { Lock mainLock = getLock(); mainLock.lock(); try { FutureTask<String> futureTask = new FutureTask<String>(() -> { String result = ""; // 不巧的是,taskLock 跟 mainLock 是同一個鎖 Lock taskLock = getLock(); taskLock.lock(); try { result += "hello"; } catch (Exception e) { System.out.println("Error"); System.out.println(e.getMessage()); // throw new RuntimeException(e); } finally { taskLock.unlock(); } return result; }); new Thread(futureTask).start(); futureTask.get(); } catch (Exception e) { System.out.println("Error"); System.out.println(e.getMessage()); // throw new RuntimeException(e); } finally { mainLock.unlock(); } } }
我們在taskLock.lock();
這一行打斷點,可以明顯看到,目前這個鎖,是main執行緒持有的,執行緒ID為1,(exclusiveOwnerThread欄位)。
然後我們在左側的執行緒下拉選單中查詢這個執行緒,找"main"@1
即可
這裏的執行緒的格式為
"ThreadName"@ThreadID
。當我們透過IDEA除錯(或者遠端除錯)進入斷點的時候,其實我們是可以看到當前JVM中的所有執行緒的,選中任意一個執行緒,我們就可以看到在當前斷點下,此執行緒此時執行到哪一行,以及在這個呼叫棧中相關變數的值。
點選這個執行緒,可以看到,main執行緒的棧,起始位置是DeadLockNested
的40行
點選進入這個棧,可以看到,main執行緒正在等待futureTask任務執行完成,好啦,閉環形成,main執行緒正在等futureTask完成,futureTask正在等main執行緒釋放鎖,最終結果,死鎖。
其實問題的癥結也就在於此,如果在main執行緒中不呼叫futureTask.get();
的話,即使兩個執行緒使用了同一個鎖,任務執行緒也可以等待mian執行緒結束釋放鎖之後獲得鎖,並不會死鎖。
public class DeadLockNested { static ReentrantLock lock = new ReentrantLock(); private static Lock getLock() { // 實際生產環境不會這樣寫,這裏是爲了還原問題 return lock; } public static void main(String[] args) { Lock mainLock = getLock(); mainLock.lock(); try { FutureTask<String> futureTask = new FutureTask<String>(() -> { String result = ""; // 不巧的是,taskLock 跟 mainLock 是同一個鎖 Lock taskLock = getLock(); taskLock.lock(); try { result += "hello"; Thread.sleep(5000); } catch (Exception e) { System.out.println("Error"); System.out.println(e.getMessage()); // throw new RuntimeException(e); } finally { taskLock.unlock(); } return result; }); new Thread(futureTask).start(); // futureTask.get(); } catch (Exception e) { System.out.println("Error"); System.out.println(e.getMessage()); // throw new RuntimeException(e); } finally { mainLock.unlock(); } } }
以上程式碼等待任務執行緒執行完畢之後(等待5s後)自動結束。
所以造成死鎖的根本原因是:兩執行緒使用同一個鎖+主執行緒等待子執行緒執行完成。
原因分析
這次死鎖的原因是因為,執行緒B和執行緒A使用了同一個鎖,為什麼會這樣?getLock
方法不可能犯這樣的錯誤。
實際上,在生產環境中,getLock
還真的就返回了同一個鎖。為什麼?
我們使用了Spring提供的org.springframework.integration.support.locks.LockRegistry
作為鎖的註冊中心,透過其來獲取鎖,同時,我們沒有使用自定義實現,而是使用的預設的實現DefaultLockRegistry
,我們來看看自定義的程式碼
public final class DefaultLockRegistry implements LockRegistry { // 內部陣列,用於儲存所有的鎖 private final Lock[] lockTable; // 雜湊掩碼 private final int mask; public DefaultLockRegistry() { // 注意 mask 預設只有256,這不是一個很大的數子 this(0xFF); // NOSONAR magic number } public DefaultLockRegistry(int mask) { String bits = Integer.toBinaryString(mask); Assert.isTrue(bits.length() < 32 && (mask == 0 || bits.lastIndexOf('0') < bits.indexOf('1')), "Mask must be a power of 2 - 1"); // NOSONAR magic number this.mask = mask; int arraySize = this.mask + 1; // 根據掩碼的大小來建立內部鎖陣列,方便後續根據掩碼來進行對映對映 this.lockTable = new ReentrantLock[arraySize]; for (int i = 0; i < arraySize; i++) { this.lockTable[i] = new ReentrantLock(); } } @Override public Lock obtain(Object lockKey) { Assert.notNull(lockKey, "'lockKey' must not be null"); // 將lockKey的雜湊碼跟DefaultLockRegistry內部的雜湊掩碼進行 與 運算,這個運算的本質,就是將對應的二進制碼進行掩碼長度的截斷,將結果作為取得鎖的索引 Integer lockIndex = lockKey.hashCode() & this.mask; return this.lockTable[lockIndex]; } }
我們可以很容易看到這裏麵的風險點,根據lockKey獲取鎖的索引的方式,是將lockKey的雜湊碼跟DefaultLockRegistry內部的雜湊掩碼進行&
運算,這個運算的本質,就是將對應的二進制碼進行掩碼長度的截斷,將結果作為取得鎖的索引,也就是說,如果物件的雜湊碼的最後8位是一樣的,那麼透過這種方式獲取的鎖的索引就是一致的,我們很容易找到這樣的例子:
public static void main(String[] args) { final int mask = 0xFF; System.out.println(Integer.toBinaryString(1726183714)); System.out.println(Integer.toBinaryString(1726184738)); // 可以看到他們的最後 8 位是一致的, // 因此他們進行 & mask 運算的結果也是一樣的 int result1 = 1726183714 & mask; int result2 = 1726184738 & mask; System.out.println(result1); System.out.println(result2); }
輸出
1100110111000110111100100100010 1100110111000110111110100100010 34 34
獲取的鎖的索引是一致的,那麼最終返回的鎖就是同一個,其實這樣也不會有問題,只要這兩個獲取了同一個鎖的執行緒沒有關係,那也不會出錯,怕就怕,是上一節中提到的,兩個相互巢狀的場景獲取了同一個鎖,那樣就會出現死鎖了。
解決和最佳化
再回過頭來看DefaultLockRegistry
的設計,為什麼要這樣設計DefaultLockRegistry
?我們可以看到在一開始就把所有的鎖都初始化好了,這樣可以提升鎖的獲取效率,同時因為陣列的長度定死了,因此避免了建立任意個鎖的可能,避免了記憶體洩漏。
那我們可以怎麼最佳化呢?
用一個Map來儲存鎖,根據物件的hashCode來獲取鎖,但是這樣的話,頻繁地呼叫obtain這個方法可能會讓這個Map變得無限大,造成記憶體問題,因為,我們需要做兩個工作
限制Map的大小,給一個初始化大小,比如1000。如果達到了這個容量上限,就開始阻塞,並進行清理,清理已經釋放了的鎖。
如何判斷鎖是否已經釋放,呼叫立即返回的lock()方法,如果能立即獲得鎖,說明鎖已經沒有人使用了,就可以立即被回收
清理完Map之後,再往Map裡新增鎖