fork / join框架如何比线程池更好?

使用新的fork / join框架的好处是,只需简单地将大任务分解为N个子任务,然后将它们发送到caching线程池(从Executors )并等待每个任务完成? 我看不出如何使用fork / join抽象来简化问题,或者使得解决scheme从我们多年以来的效率中获得更高的效率。

例如, 教程示例中的并行模糊algorithm可以像这样实现:

public class Blur implements Runnable { private int[] mSource; private int mStart; private int mLength; private int[] mDestination; private int mBlurWidth = 15; // Processing window size, should be odd. public ForkBlur(int[] src, int start, int length, int[] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst; } public void run() { computeDirectly(); } protected void computeDirectly() { // As in the example, omitted for brevity } } 

开始分割并将任务发送到线程池:

 // source image pixels are in src // destination image pixels are in dst // threadPool is a (cached) thread pool int maxSize = 100000; // analogous to FJ's "sThreshold" List<Future> futures = new ArrayList<Future>(); // Send stuff to thread pool: for (int i = 0; i < src.length; i+= maxSize) { int size = Math.min(maxSize, src.length - i); ForkBlur task = new ForkBlur(src, i, size, dst); Future f = threadPool.submit(task); futures.add(f); } // Wait for all sent tasks to complete: for (Future future : futures) { future.get(); } // Done! 

任务转到线程池的队列,在工作线程可用时从中执行它们。 只要分割足够精细(以避免必须特别等待最后一个任务)并且线程池有足够的(至less有N个处理器)线程,所有处理器都在全速运行,直到完成整个计算。

我错过了什么吗? 使用fork / join框架的附加价值是什么?

我认为基本的误解是,叉/join的例子显示工作窃取,而只是某种标准的分而治之。

偷工作会是这样的:工人B已经完成了他的工作。 他是一个善良的人,所以他环顾四周,看到工人A还在努力工作。 他漫步,问道:“嘿,伙计,我可以帮你一把。” 答复。 “很酷,我有这1000个单位的任务,到目前为止我已经完成了345个,剩下655个。请问673到1000的工作,我会做346到672。 B说:“好吧,我们开始吧,我们可以早点去酒吧。”

你看 – 即使在开始真正的工作时,工作人员也必须相互沟通。 这是例子中缺less的部分。

另一方面的例子只显示“使用分包商”这样的东西:

工人 – 答:当时,我有1000个工作单位,对我来说太多了,我自己做500个工作,把500个工作分包给别人。 这一直持续下去,直到把大任务分解成每个10个单元的小包。 这些将由可用的工作人员执行。 但是,如果一个小包是一种毒丸,并且比其他小包花费的时间要长得多 – 运气不好,分化阶段就结束了。

Fork / Join和分离任务之间唯一的区别就是这样的:当事先分开时,你的工作队列从头开始就是完整的。 例如:1000单位,门槛是10,所以队列有100个条目。 这些数据包被分配给线程池成员。

Fork / Join更复杂,并且尝试保持队列中的数据包数量更小:

  • 步骤1:将一个包含(1 … 1000)的数据包放入队列
  • 步骤2:一名工人popup数据包(1 … 1000),并用两个数据包(1 … 500)和(501 … 1000)replace数据包。
  • 步骤3:一个工人popup数据包(500 … 1000)并按下(500 … 750)和(751 … 1000)。
  • 步骤n:堆栈包含这些数据包:(1..500),(500 … 750),(750 … 875)…(991..1000)
  • 步骤n + 1:数据包(991..1000)被popup并执行
  • 步骤n + 2:数据包(981..990)被popup并执行
  • 步骤n + 3:分组(961..980)被popup并分成(961 … 970)和(971..980)。 ….

您会看到:在Fork / Join中,队列变小(在本例中为6),“拆分”和“工作”阶段交错排列。

当多名工作人员同时出现时,相互之间的交stream并不那么清楚。

如果你有n个繁忙的线程全部独立工作,这将比叉式join(FJ)池中的n个线程更好。 但它从来没有这样的方式。

可能无法精确地将问题分成n个相同的部分。 即使你这样做,线程调度也有一些不公平的地方。 你会最终等待最慢的线程。 如果你有多个任务,那么每个任务都可以以less于n路并行(通常更高效)的方式运行,而当其他任务完成时,它们可以以n路的方式运行。

那么为什么我们不把问题简化成FJ大小的块,并且有一个线程池。 典型的FJ用法将问题切成小块。 以随机顺序进行这些操作,需要在硬件层面进行多方面的协调。 pipe理费用将是一个杀手。 在FJ中,任务被放到一个队列中,线程以后进先出的顺序(LIFO /堆栈)读取,而偷工(通常在核心工作中)是先进先出(FIFO /“队列”)。 结果是长arrays处理可以很大程度上依次完成,即使它被分成很小的块。 (也有可能把这个问题分解成一个个大的小块,把它们分解成几个小块,也许并不是微不足道的。

结论:FJ允许在不均匀的情况下更有效地使用硬件线程,如果您拥有多个线程,则总会有这种情况。

fork / join与线程池不同,因为它实现了偷工。 从叉/join

与任何ExecutorService一样,fork / join框架将任务分配给线程池中的工作线程。 fork / join框架是不同的,因为它使用工作偷algorithm。 工作线程耗尽的事情可以从其他仍然繁忙的线程中窃取任务。

假设你有两个线程,4个任务a,b,c,d分别取1,1,5和6秒。 最初,a和b分配给线程1,c和d分配给线程2.在线程池中,这需要11秒。 使用fork / join,线程1完成并可以从线程2窃取工作,因此任务d最终将由线程1执行。线程1执行a,b和d,线程2只是c。 总体时间:8秒,不是11。

编辑:如Joonas指出,任务不一定预先分配给一个线程。 fork / join的思想是一个线程可以select将一个任务分成多个子部分。 所以要重申以上内容:

我们有两个任务(ab)和(cd)分别需要2秒和11秒。 线程1开始执行ab并将其分成两个子任务a&b。 与线程2类似,它分成两个子任务c&d。 当线程1完成a&b时,它可以从线程2中窃取d。

在这个例子中,Fork / Join不会增加任何值,因为不需要分叉,并且工作线程中的工作量被平均分配。 分叉/连接只会增加开销。

这是一个关于这个问题的好文章 。 引用:

总的来说,我们可以说ThreadPoolExecutor是在工作线程中平均分配工作负载的首选。 为了能够保证这一点,你需要确切地知道input数据是什么样的。 相比之下,无论input数据如何,ForkJoinPool都能提供良好的性能,因此是一个更加稳健的解决scheme。

上面的每个人都是正确的,偷工作带来的好处是可以实现的,但要扩大为什么这样做。

主要的好处是工作者线程之间的有效协调。 这项工作必须分解和重新组合,这需要进行协调。 正如你在AH的回答中所看到的,每个线程都有自己的工作列表。 这个列表的一个重要属性是它被sorting(大任务在顶部和小任务在底部)。 每个线程执行列表底部的任务,并从其他线程列表的顶部窃取任务。

结果是:

  • 任务列表的头部和尾部可以独立同步,减less列表上的争用。
  • 重要的工作子树被拆分并由同一个线程重新组装,所以这些子树不需要线程间的协调。
  • 当一个线程窃取工作,它需要一个大块,然后再细分到自己的名单
  • 工作钢化意味着线程几乎被完全利用,直到过程结束。

大多数使用线程池的分治策略需要更多的线程间通信和协调。

线程池和Fork / Join的最终目标是一致的:两者都希望尽可能利用可用的CPU功率,以实现最大吞吐量。 最大吞吐量意味着尽可能多的任务应该在很长一段时间内完成。 需要做什么? (以下我们将假定不缺less计算任务:对于100%的CPU利用率,总是足够的,另外,在超线程的情况下,对于内核或者虚拟内核,我使用“CPU”)。

  1. 至less需要有多less个CPU可用的线程运行,因为运行较less的线程会使一个内核不被使用。
  2. 最多的线程数量必须与CPU数量一样多,因为运行更多的线程会给Scheduler分配额外的负载,而这个负载将CPU分配给不同的线程,从而导致一些CPU时间进入调度器,而不是我们的计算任务。

因此,我们发现为了获得最大的吞吐量,我们需要拥有与CPU相同数量的线程。 在Oracle的模糊例子中,您可以使用线程数量等于可用CPU数量的固定大小的线程池,也可以使用线程池。 这不会有所作为,你是对的!

那么你什么时候会遇到线程池问题呢? 那就是如果一个线程阻塞 ,因为你的线程正在等待另一个任务完成。 假设下面的例子:

 class AbcAlgorithm implements Runnable { public void run() { Future<StepAResult> aFuture = threadPool.submit(new ATask()); StepBResult bResult = stepB(); StepAResult aResult = aFuture.get(); stepC(aResult, bResult); } } 

我们在这里看到的是一个由三个步骤A,B和C组成的algorithm.A和B可以彼此独立执行,但是步骤C需要步骤A和B的结果。这个algorithm做的是提交任务A到线程池并直接执行任务b。 之后,线程将等待任务A完成,并继续步骤C.如果A和B同时完成,那么一切都很好。 但是如果A需要比B长的时间呢? 这可能是因为任务A的性质决定了它,但也可能是这种情况,因为在开始时没有任务A的线程可用,并且任务A需要等待。 (如果只有一个CPU可用,因此你的线程池只有一个线程,这甚至会导致死锁,但现在除了这点之外)。 重点是刚刚执行任务B 的线程会阻塞整个线程 。 由于我们拥有与CPU相同的线程数量,并且一个线程被阻塞,这意味着一个CPU空闲

Fork / Join解决了这个问题:在fork / join框架中,你可以编写如下相同的algorithm:

 class AbcAlgorithm implements Runnable { public void run() { ATask aTask = new ATask()); aTask.fork(); StepBResult bResult = stepB(); StepAResult aResult = aTask.join(); stepC(aResult, bResult); } } 

看起来一样,不是吗? 然而,线索是一个aTask.join 不会阻止 。 相反,这里是偷工减料的地方:线程会环顾过去分叉的其他任务,并继续执行这些任务。 首先检查它已经分叉的任务是否已经开始处理。 所以如果A还没有被另一个线程启动,那么它将会执行A,否则它将检查其他线程的队列并窃取他们的工作。 一旦另一个线程的这个其他任务完成,它将检查A是否现在完成。 如果是以上algorithm可以调用stepC 。 否则,它会寻找另一个偷取任务。 因此, fork / join池可以实现100%的CPU利用率,即使面对阻塞操作

但是有一个陷阱:偷工减料只能用于ForkJoinTaskjoin调用。 对于等待另一个线程或等待I / O操作的外部阻止操作,无法完成。 那么等待I / O完成是一件常见的任务呢? 在这种情况下,如果我们可以将一个额外的线程添加到Fork / Join池中,阻塞操作完成后将立即再次停止,这将是第二好的事情。 如果我们使用ManagedBlockerForkJoinPool实际上可以做到这一点。

斐波那契

在JavaDoc for RecursiveTask中是一个用Fork / Join计算斐波那契数的例子。 对于经典的recursion解决scheme,请参阅:

 public static int fib(int n) { if (n <= 1) { return n; } return fib(n - 1) + fib(n - 2); } 

正如JavaDocs中所解释的那样,这是计算斐波那契数的一个很好的转储方法,因为这个algorithm具有O(2 ^ n)的复杂性,而更简单的方法是可能的。 但是这个algorithm非常简单易懂,所以我们坚持下去。 假设我们想用Fork / Join加快速度。 一个天真的实现看起来像这样:

 class Fibonacci extends RecursiveTask<Long> { private final long n; Fibonacci(long n) { this.n = n; } public Long compute() { if (n <= 1) { return n; } Fibonacci f1 = new Fibonacci(n - 1); f1.fork(); Fibonacci f2 = new Fibonacci(n - 2); return f2.compute() + f1.join(); } } 

这个任务分成的步骤太短了,因此这将会执行非常糟糕的事情,但是你可以看到框架一般工作得很好:两个加法可以独立计算,但是我们需要两个加法来构build最终的结果。 所以一半是在另一个线程完成的。 玩得开心的做线程池没有得到死锁(可能的,但不是那么简单)。

只是为了完整:如果你真的想要使用这种recursion方法计算斐波纳契数字,这里是一个优化版本:

 class FibonacciBigSubtasks extends RecursiveTask<Long> { private final long n; FibonacciBigSubtasks(long n) { this.n = n; } public Long compute() { return fib(n); } private long fib(long n) { if (n <= 1) { return 1; } if (n > 10 && getSurplusQueuedTaskCount() < 2) { final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1); final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2); f1.fork(); return f2.compute() + f1.join(); } else { return fib(n - 1) + fib(n - 2); } } } 

这使子任务更小,因为它们只在n > 10 && getSurplusQueuedTaskCount() < 2时被分割n > 10 && getSurplusQueuedTaskCount() < 2是真的,这意味着有大量的方法调用要做( n > 10 ),并且已经没有很多的人工任务等待( getSurplusQueuedTaskCount() < 2 )。

在我的电脑上(4核心(计算超线程时为8),Intel(R)Core TM i7-2720QM CPU @ 2.20GHz), fib(50)经典方法需要64秒,而Fork /join方法,虽然不如理论上可行,但这是一个相当明显的收益。

概要

  • 是的,在你的例子中,Fork / Join与传统线程池没有任何优势。
  • 涉及阻塞时,叉/连接可以显着提高性能
  • 叉/join避免了一些死锁问题

另一个重要的区别似乎是FJ,你可以做多个复杂的“join”阶段。 考虑http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.html中的合并sorting,预先分割这个工作将需要太多的编排。; 例如你需要做以下事情:

  • sorting第一季度
  • sorting第二季度
  • 合并前两季
  • 第三季度sorting
  • 整理第四季
  • 合并最后2个季度
  • 合并两半

你如何指定你必须在涉及他们的合并之前进行sorting

我一直在研究如何最好地为每个项目列表做某件事情。 我想我只是预先拆分列表并使用标准的ThreadPool。 FJ似乎最有用的时候,工作不能被预先分割成足够的独立任务,但可以recursion地拆分成彼此独立的任务(例如,sorting半部分是独立的,但是将2个sorting的半部分合并成一个sorting的整体不是)。

当你有昂贵的合并操作时,F / J也有明显的优势。 因为它分裂成一个树结构,你只做log2(n)合并,而不是n合并与线性线程分裂。 (这确实使得理论上的假设是你拥有与线程一样多的处理器,但仍然是一个优势)。对于作业分配,我们必须通过对每个索引处的值进行求和来合并数千个二维数组(所有相同的维度)。 使用fork连接和P处理器时,当P接近无穷大时,时间接近log2(n)。

1 2 3 .. 7 3 1 …. 8 5 4
4 5 6 + 2 4 3 => 6 9 9
7 8 9 .. 1 1 0 …. 8 9 9

如果问题是我们必须等待其他线程完成(例如sorting数组或总和数组),则应该使用fork连接,因为Executor(Executors.newFixedThreadPool(2))会因限制而窒息线程数。 在这种情况下,forkjoin池将创build更多的线程来掩盖被阻塞的线程以保持相同的并行性

资料来源: http //www.oracle.com/technetwork/articles/java/fork-join-422606.html

执行分治algorithm的执行者的问题与创build子任务无关,因为Callable可以自由地向其执行者提交新的子任务并以同步或asynchronous方式等待其结果。 问题在于并行性:Callable等待另一个Callable的结果时,它处于等待状态,从而浪费了处理排队等待执行的另一个Callable的机会。

通过Doug Lea的努力在Java SE 7中添加到java.util.concurrent包中的fork / join框架填补了这一空白

来源: https //docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

池通过dynamic添加,挂起或恢复内部工作线程来尝试维护足够的活动(或可用)线程,即使某些任务停止等待join其他任务。 但是,面对阻塞的IO或其他非托pipe的同步,不能保证这样的调整

public int getPoolSize()返回已经开始但尚未终止的工作线程的数量。 此方法返回的结果可能与getParallelism()创build线程时不同,当其他人协作被阻止时保持并行性。

你会惊讶于ForkJoin在应用程序中的performance,如履带。 这里是你将学习的最好的教程 。

Fork / Join的逻辑非常简单:(1)将每个大任务分离(分叉)为较小的任务; (2)在一个单独的线程中处理每个任务(必要时将这些任务分成更小的任务); (3)join结果。