如何使用固定数量的工作线程来实现简单线程

我正在寻找最简单,最直接的方法来实现以下内容:

  • 主程序实例化工作线程来完成一项任务。
  • 只有n任务可以同时运行。
  • 当达到n时,不再有工人开始工作,直到正在运行的线程的数量回落到n以下。

我认为, Executors.newFixedThreadPool适合您的要求。 有多种不同的方法可以使用生成的ExecutorService,具体取决于是否要将结果返回给主线程,或者任务是否完全独立,以及是否有一组任务要执行,或者是否任务排队响应某些事件。

  Collection<YourTask> tasks = new ArrayList<YourTask>(); YourTask yt1 = new YourTask(); ... tasks.add(yt1); ... ExecutorService exec = Executors.newFixedThreadPool(5); List<Future<YourResultType>> results = exec.invokeAll(tasks); 

或者,如果您有一个新的asynchronous任务来响应某个事件,您可能只想使用ExecutorService的简单execute(Runnable)方法。

 /* Get an executor service that will run a maximum of 5 threads at a time: */ ExecutorService exec = Executors.newFixedThreadPool(5); /* For all the 100 tasks to be done altogether... */ for (int i = 0; i < 100; i++) { /* ...execute the task to run concurrently as a runnable: */ exec.execute(new Runnable() { public void run() { /* do the work to be done in its own thread */ System.out.println("Running in: " + Thread.currentThread()); } }); } /* Tell the executor that after these 100 steps above, we will be done: */ exec.shutdown(); try { /* The tasks are now running concurrently. We wait until all work is done, * with a timeout of 50 seconds: */ boolean b = exec.awaitTermination(50, TimeUnit.SECONDS); /* If the execution timed out, false is returned: */ System.out.println("All done: " + b); } catch (InterruptedException e) { e.printStackTrace(); } 

Executors.newFixedThreadPool(INT)

 Executor executor = Executors.newFixedThreadPool(n); Runnable runnable = new Runnable() { public void run() { // do your thing here } } executor.execute(runnable); 

使用Executor框架; 即newFixedThreadPool(N)

  1. 如果你的任务队列不会是无界的,任务可以在更短的时间间隔内完成,你可以使用Executors.newFixedThreadPool(n) ; 如专家所build议的那样。

    此解决scheme的唯一缺点是无限的任务队列大小。 你无法控制它。 任务队列中的巨大堆积会降低应用程序的性能,并可能在某些情况下导致内存不足。

  2. 如果要使用ExecutorService并启用work stealing机制,其中空闲工作线程通过窃取任务队列中的任务来从繁忙工作线程共享工作负载。 它将返回ForkJoinPooltypes的Executor服务。

    公共静态ExecutorService newWorkStealingPool (int并行)

    创build一个线程池,维护足够的线程来支持给定的并行性级别,并可以使用多个队列来减less争用。 并行性级别对应于主动参与或可从事任务处理的线程的最大数量。 线程的实际数量可能会dynamic增长和减less。 偷工减料不能保证提交任务的执行顺序。

  3. 由于API的灵活性,我更喜欢ThreadPoolExecutor来控制许多参数,这些参数控制stream任务的执行。

     ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 

在你的情况下,设置corePoolSize and maximumPoolSize as N 在这里你可以控制任务队列大小,定义你自己的自定义线程工厂和拒绝处理程序策略。

看看相关的SE问题dynamic控制池大小:

dynamic线程池

如果你想推出自己的:

 private static final int MAX_WORKERS = n; private List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS); private boolean roomLeft() { synchronized (workers) { return (workers.size() < MAX_WORKERS); } } private void addWorker() { synchronized (workers) { workers.add(new Worker(this)); } } public void removeWorker(Worker worker) { synchronized (workers) { workers.remove(worker); } } public Example() { while (true) { if (roomLeft()) { addWorker(); } } } 

工作人员是扩展线程的类。 每个工作人员都会调用这个类的removeWorker方法,当它完成这个事情的时候,将自己作为一个参数传入。

这就是说,Executor框架看起来好多了。

编辑:任何人都在意解释为什么这是如此糟糕,而不是只是downmodding呢?

正如其他人在这里提到的,最好的办法是用Executors类创build一个线程池:

但是,如果你想自己推出,这个代码应该给你一个想法如何进行。 基本上,只需将每个新线程添加到一个线程组,并确保组中不会有多于N个活动线程:

 Task[] tasks = getTasks(); // array of tasks to complete ThreadGroup group = new ThreadGroup(); int i=0; while( i<tasks.length || group.activeCount()>0 ) { if( group.activeCount()<N && i<tasks.length ) { new TaskThread(group, tasks[i]).start(); i++; } else { Thread.sleep(100); } }