还原案发现场
简单看一下下面的代码,死锁形成的原因是,线程A嵌套了一个任务线程B,A在执行任务线程B之前获取了一个锁,执行完B之后才会释放,而B线程执行的时候也需要获取锁,好巧不巧,他使用的锁跟A使用的是同一个,bang,死锁了。
public class DeadLockNested { static ReentrantLock lock = new ReentrantLock(); private static Lock getLock() { // 实际生产环境不会这样写,这里是为了还原问题 return lock; } public static void main(String[] args) { Lock mainLock = getLock(); mainLock.lock(); try { FutureTask<String> futureTask = new FutureTask<String>(() -> { String result = ""; // 不巧的是,taskLock 跟 mainLock 是同一个锁 Lock taskLock = getLock(); taskLock.lock(); try { result += "hello"; } catch (Exception e) { System.out.println("Error"); System.out.println(e.getMessage()); // throw new RuntimeException(e); } finally { taskLock.unlock(); } return result; }); new Thread(futureTask).start(); futureTask.get(); } catch (Exception e) { System.out.println("Error"); System.out.println(e.getMessage()); // throw new RuntimeException(e); } finally { mainLock.unlock(); } } }
我们在taskLock.lock();
这一行打断点,可以明显看到,目前这个锁,是main线程持有的,线程ID为1,(exclusiveOwnerThread字段)。
然后我们在左侧的线程下拉列表中查找这个线程,找"main"@1
即可
这里的线程的格式为
"ThreadName"@ThreadID
。当我们通过IDEA调试(或者远程调试)进入断点的时候,其实我们是可以看到当前JVM中的所有线程的,选中任意一个线程,我们就可以看到在当前断点下,此线程此时运行到哪一行,以及在这个调用栈中相关变量的值。
点击这个线程,可以看到,main线程的栈,起始位置是DeadLockNested
的40行
点击进入这个栈,可以看到,main线程正在等待futureTask任务执行完成,好啦,闭环形成,main线程正在等futureTask完成,futureTask正在等main线程释放锁,最终结果,死锁。
其实问题的症结也就在于此,如果在main线程中不调用futureTask.get();
的话,即使两个线程使用了同一个锁,任务线程也可以等待mian线程结束释放锁之后获得锁,并不会死锁。
public class DeadLockNested { static ReentrantLock lock = new ReentrantLock(); private static Lock getLock() { // 实际生产环境不会这样写,这里是为了还原问题 return lock; } public static void main(String[] args) { Lock mainLock = getLock(); mainLock.lock(); try { FutureTask<String> futureTask = new FutureTask<String>(() -> { String result = ""; // 不巧的是,taskLock 跟 mainLock 是同一个锁 Lock taskLock = getLock(); taskLock.lock(); try { result += "hello"; Thread.sleep(5000); } catch (Exception e) { System.out.println("Error"); System.out.println(e.getMessage()); // throw new RuntimeException(e); } finally { taskLock.unlock(); } return result; }); new Thread(futureTask).start(); // futureTask.get(); } catch (Exception e) { System.out.println("Error"); System.out.println(e.getMessage()); // throw new RuntimeException(e); } finally { mainLock.unlock(); } } }
以上代码等待任务线程执行完毕之后(等待5s后)自动结束。
所以造成死锁的根本原因是:两线程使用同一个锁+主线程等待子线程执行完成。
原因分析
这次死锁的原因是因为,线程B和线程A使用了同一个锁,为什么会这样?getLock
方法不可能犯这样的错误。
实际上,在生产环境中,getLock
还真的就返回了同一个锁。为什么?
我们使用了Spring提供的org.springframework.integration.support.locks.LockRegistry
作为锁的注册中心,通过其来获取锁,同时,我们没有使用自定义实现,而是使用的默认的实现DefaultLockRegistry
,我们来看看自定义的代码
public final class DefaultLockRegistry implements LockRegistry { // 内部数组,用于保存所有的锁 private final Lock[] lockTable; // 哈希掩码 private final int mask; public DefaultLockRegistry() { // 注意 mask 默认只有256,这不是一个很大的数子 this(0xFF); // NOSONAR magic number } public DefaultLockRegistry(int mask) { String bits = Integer.toBinaryString(mask); Assert.isTrue(bits.length() < 32 && (mask == 0 || bits.lastIndexOf('0') < bits.indexOf('1')), "Mask must be a power of 2 - 1"); // NOSONAR magic number this.mask = mask; int arraySize = this.mask + 1; // 根据掩码的大小来创建内部锁数组,方便后续根据掩码来进行映射映射 this.lockTable = new ReentrantLock[arraySize]; for (int i = 0; i < arraySize; i++) { this.lockTable[i] = new ReentrantLock(); } } @Override public Lock obtain(Object lockKey) { Assert.notNull(lockKey, "'lockKey' must not be null"); // 将lockKey的哈希码跟DefaultLockRegistry内部的哈希掩码进行 与 运算,这个运算的本质,就是将对应的二进制码进行掩码长度的截断,将结果作为取得锁的索引 Integer lockIndex = lockKey.hashCode() & this.mask; return this.lockTable[lockIndex]; } }
我们可以很容易看到这里面的风险点,根据lockKey获取锁的索引的方式,是将lockKey的哈希码跟DefaultLockRegistry内部的哈希掩码进行&
运算,这个运算的本质,就是将对应的二进制码进行掩码长度的截断,将结果作为取得锁的索引,也就是说,如果对象的哈希码的最后8位是一样的,那么通过这种方式获取的锁的索引就是一致的,我们很容易找到这样的例子:
public static void main(String[] args) { final int mask = 0xFF; System.out.println(Integer.toBinaryString(1726183714)); System.out.println(Integer.toBinaryString(1726184738)); // 可以看到他们的最后 8 位是一致的, // 因此他们进行 & mask 运算的结果也是一样的 int result1 = 1726183714 & mask; int result2 = 1726184738 & mask; System.out.println(result1); System.out.println(result2); }
输出
1100110111000110111100100100010 1100110111000110111110100100010 34 34
获取的锁的索引是一致的,那么最终返回的锁就是同一个,其实这样也不会有问题,只要这两个获取了同一个锁的线程没有关系,那也不会出错,怕就怕,是上一节中提到的,两个相互嵌套的场景获取了同一个锁,那样就会出现死锁了。
解决和优化
再回过头来看DefaultLockRegistry
的设计,为什么要这样设计DefaultLockRegistry
?我们可以看到在一开始就把所有的锁都初始化好了,这样可以提升锁的获取效率,同时因为数组的长度定死了,因此避免了创建任意个锁的可能,避免了内存泄漏。
那我们可以怎么优化呢?
用一个Map来保存锁,根据对象的hashCode来获取锁,但是这样的话,频繁地调用obtain这个方法可能会让这个Map变得无限大,造成内存问题,因为,我们需要做两个工作
限制Map的大小,给一个初始化大小,比如1000。如果达到了这个容量上限,就开始阻塞,并进行清理,清理已经释放了的锁。
如何判断锁是否已经释放,调用立即返回的lock()方法,如果能立即获得锁,说明锁已经没有人使用了,就可以立即被回收
清理完Map之后,再往Map里添加锁