ForkJoinPool.java中pool方法源码如下:
final ForkJoinTask<?> poll() {
int b, k, cap; ForkJoinTask<?>[] a;
while ((a = array) != null && (cap = a.length) > 0 &&
top - (b = base) > 0) {
ForkJoinTask<?> t = (ForkJoinTask<?>)
QA.getAcquire(a, k = (cap - 1) & b);
if (base == b++) {
if (t == null)
Thread.yield(); // await index advance
else if (QA.compareAndSet(a, k, t, null)) {
BASE.setOpaque(this, b);
return t;
}
}
}
return null;
}
poll
方法,用于从当前线程的工作队列中获取并返回一个任务。该方法的核心逻辑是从任务数组中取出一个任务,并确保线程安全地移除该任务。在特定条件下,Thread.yield()忙等待导致线程不断让出 CPU 资源,而无法继续执行其他任务。
1. 方法签名和参数
final ForkJoinTask<?> poll()
poll
:这是一个WorkQueue
类中的方法,用于从当前工作队列中获取并返回一个任务。- 返回值:如果成功获取到任务,则返回该任务;否则返回
null
。
2. 主要逻辑分析
2.1 检查任务数组
while ((a = array) != null && (cap = a.length) > 0 &&
top - (b = base) > 0) {
array
:这是当前工作队列的任务数组,存储了尚未处理的任务。cap = a.length
:任务数组的容量。top - b > 0
:检查任务数组中是否有未处理的任务。top
表示任务数组中的任务数量,base
表示已经处理的任务数量。如果top - base > 0
,说明队列中有未处理的任务。
2.2 获取任务
ForkJoinTask<?> t = (ForkJoinTask<?>) QA.getAcquire(a, k = (cap - 1) & b);
k = (cap - 1) & b
:计算任务数组中的索引k
,使用位运算将base
映射到数组的有效索引范围内。QA.getAcquire(a, k)
:从任务数组中获取任务t
。QA
是ForkJoinTask
的静态字段,表示任务数组的访问器(Accessor),getAcquire
是一种内存屏障操作,确保读取任务时的可见性。
2.3 检查并更新 base
if (base == b++) {
base == b
:确保在获取任务后,base
没有被其他线程修改。b++
是后置递增操作,意味着在比较后base
会被递增,表示已经处理了一个任务。
2.4 处理任务为空的情况
if (t == null)
Thread.yield(); // await index advance
t == null
:如果从任务数组中获取的任务t
为null
,说明任务已经被其他线程移除或尚未准备好。Thread.yield()
:在这种情况下,线程会调用Thread.yield()
,主动让出 CPU 资源,等待其他线程完成任务的插入或移除操作,从而使base
或top
发生变化。
2.5 使用 CAS 移除任务
else if (QA.compareAndSet(a, k, t, null)) {
BASE.setOpaque(this, b);
return t;
}
QA.compareAndSet(a, k, t, null)
:使用 CAS(Compare-And-Swap)操作将任务t
从任务数组中移除。这确保了多线程环境下的线程安全。BASE.setOpaque(this, b)
:更新当前工作队列的base
,表示已经处理了一个任务。return t
:返回窃取到的任务t
,结束poll
方法。
2.6 返回 null
return null;
- 如果任务数组为空或没有可处理的任务,
poll
方法返回null
,表示当前工作队列中没有任务可以处理。
3. 为什么线程会一直执行 Thread.yield()
?
线程会一直执行 Thread.yield()
的主要原因是在某些情况下,base
和 top
之间的差值大于 0,但实际从任务数组中获取的任务 t
为 null
。这可能是由于以下几种情况:
3.1 任务已被其他线程移除
- 当多个线程同时访问同一个任务数组时,可能会发生竞争条件。例如,一个线程刚刚检查了
top - base > 0
,但在它尝试获取任务t
之前,另一个线程已经通过 CAS 操作移除了该任务。因此,当前线程获取到的任务t
为null
。 - 在这种情况下,当前线程会调用
Thread.yield()
,主动让出 CPU 资源,等待其他线程完成任务的插入或移除操作,从而使base
或top
发生变化。
3.2 任务尚未准备好
- 在某些情况下,任务可能还没有完全准备好,或者任务数组中的某个位置暂时没有任务。例如,任务可能正在被其他线程插入,但尚未完成插入操作(oom可能导致插入线程异常,进而导致状态异常)。
- 在这种情况下,
poll
方法会不断尝试获取任务,但由于任务尚未准备好,t
仍然为null
,导致线程不断调用Thread.yield()
。
3.3 任务数组的竞争
ForkJoinPool
中的工作队列是共享的,多个线程可能会同时访问同一个任务数组。当多个线程同时尝试从同一个任务数组中获取任务时,可能会发生竞争,导致某些线程频繁获取到null
。- 这种竞争会导致线程不断调用
Thread.yield()
,试图让出 CPU 资源,等待其他线程完成任务的插入或移除操作。
3.4 死锁或活锁
- 在极端情况下,如果多个线程之间存在复杂的依赖关系,可能会导致死锁或活锁。例如,线程 A 等待线程 B 完成任务插入,而线程 B 又等待线程 A 完成任务移除。这种情况下,线程可能会陷入无限循环,不断调用
Thread.yield()
,但实际上没有任何进展。
4. 如何避免线程频繁调用 Thread.yield()
?
为了避免线程频繁调用 Thread.yield()
,可以考虑以下优化策略:
4.1 减少任务数组的竞争
- 增加任务队列的数量:通过增加
ForkJoinPool
的并行度,可以减少每个线程共享同一个任务数组的情况,从而减少竞争。 - 使用独立的任务队列:尽量让每个线程有自己的任务队列,减少多个线程同时访问同一个任务数组的机会。
4.2 优化任务插入和移除的顺序
- 批量插入任务:如果可能,尽量批量插入任务,而不是逐个插入。这样可以减少任务数组的访问频率,降低竞争的可能性。
- 批量移除任务:类似地,尽量批量移除任务,减少 CAS 操作的次数。
4.3 使用更高效的同步机制
- 使用无锁数据结构:考虑使用更高效的无锁数据结构(如
ConcurrentLinkedQueue
或Deque
)来管理任务队列,减少 CAS 操作的开销。 - 减少 CAS 操作的频率:尽量减少 CAS 操作的频率,避免频繁的失败重试。
4.4 调整 Thread.yield()
的调用频率
- 引入自旋等待:可以在
Thread.yield()
之前引入短暂的自旋等待(spin-wait),给其他线程更多的时间完成任务的插入或移除操作。例如,可以使用LockSupport.parkNanos()
来实现微秒级别的等待。 - 动态调整
Thread.yield()
的调用频率:可以根据当前任务队列的状态动态调整Thread.yield()
的调用频率。例如,如果多次连续获取到null
,可以增加Thread.yield()
的调用频率,反之则减少。
5. 总结
poll
方法的作用:poll
方法用于从当前线程的工作队列中获取并返回一个任务。它通过 CAS 操作确保线程安全地移除任务,并在任务为空时调用Thread.yield()
让出 CPU 资源。Thread.yield()
的原因:线程会一直执行Thread.yield()
的原因是任务数组中可能存在竞争条件,导致多个线程同时访问同一个任务数组,或者任务尚未准备好。此外,任务数组的竞争、死锁或活锁也可能导致线程频繁调用Thread.yield()
。- 优化建议:为了减少线程频繁调用
Thread.yield()
,可以考虑减少任务数组的竞争、优化任务插入和移除的顺序、使用更高效的同步机制,以及调整Thread.yield()
的调用频率。