如何在一定数量的执行之后停止Runnable计划重复执行

情况

我有一个Runnable。 我有一个类,调度此Runnable使用scheduleWithFixedDelay ScheduledExecutorService执行。

目标

我想要改变这个类来调度Runnable的固定延迟执行, 无论是无限期的, 或者直到它已经运行了一定的次数,取决于传递给构造函数的一些参数。

如果可能,我想使用相同的Runnable,因为它在概念上应该是“运行”的相同的东西。

可能的方法

方法#1

有两个Runnables,一个执行数量(保持计数)后取消计划,另一个不执行:

public class MyClass{ private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); public enum Mode{ INDEFINITE, FIXED_NO_OF_TIMES } public MyClass(Mode mode){ if(mode == Mode.INDEFINITE){ scheduler.scheduleWithFixedDelay(new DoSomethingTask(), 0, 100, TimeUnit.MILLISECONDS); }else if(mode == Mode.FIXED_NO_OF_TIMES){ scheduler.scheduleWithFixedDelay(new DoSomethingNTimesTask(), 0, 100, TimeUnit.MILLISECONDS); } } private class DoSomethingTask implements Runnable{ @Override public void run(){ doSomething(); } } private class DoSomethingNTimesTask implements Runnable{ private int count = 0; @Override public void run(){ doSomething(); count++; if(count > 42){ // Cancel the scheduling. // Can you do this inside the run method, presumably using // the Future returned by the schedule method? Is it a good idea? } } } private void doSomething(){ // do something } } 

我宁愿只有一个Runnable来执行doSomething方法。 将调度绑定到Runnable感觉不对。 你怎么看待这件事?

方法#2

有一个Runnable来执行我们想要定期运行的代码。 有一个单独的计划runnable,检查第一个Runnable运行了多less次,并在达到一定数量时取消。 这可能不准确,因为它是asynchronous的。 这感觉有点麻烦。 你怎么看待这件事?

方法#3

扩展ScheduledExecutorService并添加一个方法“scheduleWithFixedDelayNTimes”。 也许这样的class级已经存在? 目前,我正在使用Executors.newSingleThreadScheduledExecutor(); 获取我的ScheduledExecutorService实例。 我想大概必须实现类似的function来实例化扩展的ScheduledExecutorService。 这可能是棘手的。 你怎么看待这件事?

没有调度程序的方法[编辑]

我无法使用调度程序。 我可以有这样的东西:

 for(int i = 0; i < numTimesToRun; i++){ doSomething(); Thread.sleep(delay); } 

并在一些线程运行。 你对那个怎么想的? 你可能仍然可以使用runnable并直接调用run方法。


欢迎任何build议。 我正在寻找一场辩论,以find实现我的目标的“最佳做法”。

您可以在Future上使用cancel()方法。 从scheduleAtFixedRate的javadocs

 Otherwise, the task will only terminate via cancellation or termination of the executor 

下面是一些示例代码,它将Runnable包装在另一个跟踪原始运行次数的Runnable中,并在运行N次后取消。

 public void runNTimes(Runnable task, int maxRunCount, long period, TimeUnit unit, ScheduledExecutorService executor) { new FixedExecutionRunnable(task, maxRunCount).runNTimes(executor, period, unit); } class FixedExecutionRunnable implements Runnable { private final AtomicInteger runCount = new AtomicInteger(); private final Runnable delegate; private volatile ScheduledFuture<?> self; private final int maxRunCount; public FixedExecutionRunnable(Runnable delegate, int maxRunCount) { this.delegate = delegate; this.maxRunCount = maxRunCount; } @Override public void run() { delegate.run(); if(runCount.incrementAndGet() == maxRunCount) { boolean interrupted = false; try { while(self == null) { try { Thread.sleep(1); } catch (InterruptedException e) { interrupted = true; } } self.cancel(false); } finally { if(interrupted) { Thread.currentThread().interrupt(); } } } } public void runNTimes(ScheduledExecutorService executor, long period, TimeUnit unit) { self = executor.scheduleAtFixedRate(this, 0, period, unit); } } 

到目前为止, Sbridges解决scheme似乎是最干净的解决scheme,除了你提到的,它将处理执行次数的责任交给了Runnable本身。 不应该担心,重复应该是处理调度的类的一个参数。 为了达到这个目的,我build议下面的devise,为Runnables引入一个新的执行器类。 该类为调度任务提供了两种公共方法,这些方法是标准的Runnables ,具有有限的或无限的重复。 如果需要,可以传递相同的Runnable进行有限和无限的调度(这对于扩展Runnable类以提供有限重复的所有提出的解决scheme来说是不可能的)。 调度器类完全封装了取消有限重复的处理:

 class MaxNScheduler { public enum ScheduleType { FixedRate, FixedDelay } private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); public ScheduledFuture<?> scheduleInfinitely(Runnable task, ScheduleType type, long initialDelay, long period, TimeUnit unit) { return scheduleNTimes(task, -1, type, initialDelay, period, unit); } /** schedule with count repetitions */ public ScheduledFuture<?> scheduleNTimes(Runnable task, int repetitions, ScheduleType type, long initialDelay, long period, TimeUnit unit) { RunnableWrapper wrapper = new RunnableWrapper(task, repetitions); ScheduledFuture<?> future; if(type == ScheduleType.FixedDelay) future = executorService.scheduleWithFixedDelay(wrapper, initialDelay, period, TimeUnit.MILLISECONDS); else future = executorService.scheduleAtFixedRate(wrapper, initialDelay, period, TimeUnit.MILLISECONDS); synchronized(wrapper) { wrapper.self = future; wrapper.notify(); // notify wrapper that it nows about it's future (pun intended) } return future; } private static class RunnableWrapper implements Runnable { private final Runnable realRunnable; private int repetitions = -1; ScheduledFuture<?> self = null; RunnableWrapper(Runnable realRunnable, int repetitions) { this.realRunnable = realRunnable; this.repetitions = repetitions; } private boolean isInfinite() { return repetitions < 0; } private boolean isFinished() { return repetitions == 0; } @Override public void run() { if(!isFinished()) // guard for calls to run when it should be cancelled already { realRunnable.run(); if(!isInfinite()) { repetitions--; if(isFinished()) { synchronized(this) // need to wait until self is actually set { if(self == null) { try { wait(); } catch(Exception e) { /* should not happen... */ } } self.cancel(false); // cancel gracefully (not throwing InterruptedException) } } } } } } } 

公平地说,pipe理重复的逻辑仍然是一个 Runnable ,但它是一个完全在MaxNScheduler内部的Runnable ,而为调度传递的Runnable任务不必关心调度的本质。 如果需要,也可以通过在每次执行RunnableWrapper.run时提供一些callback来轻松地将此问题移出到调度程序中。 这会使代码稍微复杂一些,并且会引入保留一些RunnableWrapper的映射和相应的重复的需要,这就是为什么我select将计数器保留在RunnableWrapper类中的原因。

在设置自我时,我还在包装器上添加了一些同步。 理论上这是需要的,当执行完成时,自己可能还没有被分配(一个相当理论的情况,但是只有1次重复)。

取消是正常处理的,不会抛出InterruptedException并且在执行取消之前,计划另一轮, RunnableWrapper将不会调用底层的Runnable

从API描述( ScheduledExecutorService.scheduleWithFixedDelay )引用:

创build并执行一个定期动作,在给定的初始延迟之后首先启用,然后在一个执行的终止和下一个执行的开始之间给定延迟。 如果任务的任何执行遇到exception,则后续执行被禁止。 否则,任务只会通过取消或终止执行者而终止。

所以,最简单的事情就是“抛出exception” (尽pipe这被认为是不好的做法):

 static class MyTask implements Runnable { private int runs = 0; @Override public void run() { System.out.println(runs); if (++runs >= 20) throw new RuntimeException(); } } public static void main(String[] args) { ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor(); s.scheduleWithFixedDelay(new MyTask(), 0, 100, TimeUnit.MILLISECONDS); } 

你的第一个方法似乎确定。 您可以通过将mode对象传递给其构造函数(或传递-1作为它必须运行的最大次数)来组合这两种types的可运行参数,并使用此模式确定是否必须取消可运行参数:

 private class DoSomethingNTimesTask implements Runnable{ private int count = 0; private final int limit; /** * Constructor for no limit */ private DoSomethingNTimesTask() { this(-1); } /** * Constructor allowing to set a limit * @param limit the limit (negative number for no limit) */ private DoSomethingNTimesTask(int limit) { this.limit = limit; } @Override public void run(){ doSomething(); count++; if(limit >= 0 && count > limit){ // Cancel the scheduling } } } 

你必须把预定的将来传递给你的任务,以便它自己取消,否则你可能会抛出一个exception。

这是我的build议(我相信它可以处理问题中提到的所有情况):

 public class RepeatedScheduled implements Runnable { private int repeatCounter = -1; private boolean infinite; private ScheduledExecutorService ses; private long initialDelay; private long delay; private TimeUnit unit; private final Runnable command; private Future<?> control; public RepeatedScheduled(ScheduledExecutorService ses, Runnable command, long initialDelay, long delay, TimeUnit unit) { this.ses = ses; this.initialDelay = initialDelay; this.delay = delay; this.unit = unit; this.command = command; this.infinite = true; } public RepeatedScheduled(ScheduledExecutorService ses, Runnable command, long initialDelay, long delay, TimeUnit unit, int maxExecutions) { this(ses, command, initialDelay, delay, unit); this.repeatCounter = maxExecutions; this.infinite = false; } public Future<?> submit() { // We submit this, not the received command this.control = this.ses.scheduleWithFixedDelay(this, this.initialDelay, this.delay, this.unit); return this.control; } @Override public synchronized void run() { if ( !this.infinite ) { if ( this.repeatCounter > 0 ) { this.command.run(); this.repeatCounter--; } else { this.control.cancel(false); } } else { this.command.run(); } } } 

另外,它允许外部方停止submit()方法返回的Future

用法:

 Runnable MyRunnable = ...; // Repeat 20 times RepeatedScheduled rs = new RepeatedScheduled( MySes, MyRunnable, 33, 44, TimeUnit.SECONDS, 20); Future<?> MyControl = rs.submit(); ...