如何中断阻塞在take()上的BlockingQueue?

我有一个类从BlockingQueue中获取对象,并通过连续循环调用take()来处理它们。 在某些时候,我知道没有更多的对象会被添加到队列中。 如何中断take()方法,使其停止拦截?

以下是处理对象的类:

 public class MyObjHandler implements Runnable { private final BlockingQueue<MyObj> queue; public class MyObjHandler(BlockingQueue queue) { this.queue = queue; } public void run() { try { while (true) { MyObj obj = queue.take(); // process obj here // ... } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } 

这里是使用这个类来处理对象的方法:

 public void testHandler() { BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); MyObjectHandler handler = new MyObjectHandler(queue); new Thread(handler).start(); // get objects for handler to process for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } // what code should go here to tell the handler // to stop waiting for more objects? } 

如果中断线程不是一种select,另一种方法是在队列中放置一个“标记”或“命令”对象,MyObjHandler可以识别这个对象并跳出循环。

 BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); MyObjectHandler handler = new MyObjectHandler(queue); Thread thread = new Thread(handler); thread.start(); for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } thread.interrupt(); 

但是,如果这样做,线程可能会被中断,而队列中还有项目正在等待处理。 您可能要考虑使用poll而不是take ,这将允许处理线程超时,并在等待一段时间而没有新input时终止。

很晚,但希望这也有助于其他,因为我面临类似的问题,并采用了上述埃里克森提出的poll方法,做了一些小的改动,

 class MyObjHandler implements Runnable { private final BlockingQueue<MyObj> queue; public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL public MyObjHandler(BlockingQueue queue) { this.queue = queue; Finished = false; } @Override public void run() { while (true) { try { MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS); if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return { // process obj here // ... } if(Finished && queue.isEmpty()) return; } catch (InterruptedException e) { return; } } } } public void testHandler() { BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); MyObjHandler handler = new MyObjHandler(queue); new Thread(handler).start(); // get objects for handler to process for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } // what code should go here to tell the handler to stop waiting for more objects? handler.Finished = true; //THIS TELLS HIM //If you need you can wait for the termination otherwise remove join myThread.join(); } 

这解决了这两个问题

  1. 标记BlockingQueue以便它知道它不必等待更多的元素
  2. 没有中断之间,所以处理块只有当队列中的所有项目都处理完毕,没有剩余的项目将被添加终止

中断线程:

 thread.interrupt() 

或者不要打断它的讨厌。

  public class MyQueue<T> extends ArrayBlockingQueue<T> { private static final long serialVersionUID = 1L; private boolean done = false; public ParserQueue(int capacity) { super(capacity); } public void done() { done = true; } public boolean isDone() { return done; } /** * May return null if producer ends the production after consumer * has entered the element-await state. */ public T take() throws InterruptedException { T el; while ((el = super.poll()) == null && !done) { synchronized (this) { wait(); } } return el; } } 
  1. 当生产者将对象放入队列时,调用queue.notify() ,如果结束,调用queue.done()
  2. while(!queue.isDone()||!queue.isEmpty())
  3. testingtake()返回null值