是否有可能从一个InputStream超时读取?

具体来说,问题是写一个这样的方法:

int maybeRead(InputStream in, long timeout) 

如果数据在“超时”毫秒内可用,则返回值与in.read()相同,否则为-2。 在方法返回之前,所有派生的线程都必须退出。

为了避免参数,这里的主题java.io.InputStream,由Sun(任何Java版本)所记载。 请注意,这不像看起来那么简单。 以下是Sun的文档直接支持的一些事实。

  1. in.read()方法可能是不可中断的。

  2. 在Reader或InterruptibleChannel中包装InputStream并没有帮助,因为所有这些类都可以做的是InputStream的调用方法。 如果可以使用这些类,那么可以编写一个解决scheme,直接在InputStream上执行相同的逻辑。

  3. in.available()返回0总是可以接受的。

  4. in.close()方法可能会阻塞或者什么也不做。

  5. 没有通用的方法来杀死另一个线程。

使用inputStream.available()

System.in.available()返回0总是可以接受的。

我发现相反 – 它始终返回可用字节数的最佳值。 用于InputStream.available() Javadoc:

 Returns an estimate of the number of bytes that can be read (or skipped over) from this input stream without blocking by the next invocation of a method for this input stream. 

估计是不可避免的,由于时间/陈旧。 这个数字可能是一次性低估,因为新数据不断到达。 然而它在下次通话中总是“赶上” – 它应该logging所有到达的数据,在新通话的时刻到达。 有数据时永久返回0不符合上述条件。

First警告:InputStream的具体子类负责可用()

InputStream是一个抽象类。 它没有数据源。 它有可用的数据是没有意义的。 因此, available() javadoc也说明:

 The available method for class InputStream always returns 0. This method should be overridden by subclasses. 

实际上,具体的inputstream类会覆盖available(),提供有意义的值,而不是常量0。

第二个警告:确保您在Windows中inputinput时使用回车符。

如果使用System.in ,你的程序只有在你的命令行程序交给你的时候才会收到input。 如果你正在使用文件redirect/pipe道(例如somefile> java myJavaApp或somecommand | java myJavaApp),那么input数据通常会立即移交。 但是,如果您手动键入input,则数据切换可能会延迟。 例如使用windows cmd.exeshell,数据在cmd.exeshell中缓冲。 数据仅在回车(control-m或<enter> )后传递给正在执行的java程序。 这是执行环境的限制。 当然,只要shell缓冲数据,InputStream.available()将返回0,这是正确的行为; 目前没有可用的数据。 一旦数据从shell中获得,该方法返回一个值> 0.注意:Cygwin也使用cmd.exe。

最简单的解决scheme(无阻塞,所以不需要超时)

只需使用这个:

  byte[] inputData = new byte[1024]; int result = is.read(inputData, 0, is.available()); // result will indicate number of bytes read; -1 for EOF with no data read. 

或者等同地,

  BufferedReader br = new BufferedReader(new InputStreamReader(System.in, Charset.forName("ISO-8859-1")),1024); // ... // inside some iteration / processing logic: if (br.ready()) { int readCount = br.read(inputData, bufferOffset, inputData.length-bufferOffset); } 

更丰富的解决scheme(在超时期限内最大限度地填充缓冲区)

声明这个:

 public static int readInputStreamWithTimeout(InputStream is, byte[] b, int timeoutMillis) throws IOException { int bufferOffset = 0; long maxTimeMillis = System.currentTimeMillis() + timeoutMillis; while (System.currentTimeMillis() < maxTimeMillis && bufferOffset < b.length) { int readLength = java.lang.Math.min(is.available(),b.length-bufferOffset); // can alternatively use bufferedReader, guarded by isReady(): int readResult = is.read(b, bufferOffset, readLength); if (readResult == -1) break; bufferOffset += readResult; } return bufferOffset; } 

然后使用这个:

  byte[] inputData = new byte[1024]; int readCount = readInputStreamWithTimeout(System.in, inputData, 6000); // 6 second timeout // readCount will indicate number of bytes read; -1 for EOF with no data read. 

假设你的stream没有被套接字支持(所以你不能使用Socket.setSoTimeout() ),我认为解决这类问题的标准方法是使用Future。

假设我有以下执行者和stream:

  ExecutorService executor = Executors.newFixedThreadPool(2); final PipedOutputStream outputStream = new PipedOutputStream(); final PipedInputStream inputStream = new PipedInputStream(outputStream); 

我写了一些数据,然后在写入最后一个数据和closuresstream之前等待5秒钟:

  Runnable writeTask = new Runnable() { @Override public void run() { try { outputStream.write(1); outputStream.write(2); Thread.sleep(5000); outputStream.write(3); outputStream.close(); } catch (Exception e) { e.printStackTrace(); } } }; executor.submit(writeTask); 

正常的阅读方式如下。 读取将无限期阻止数据,所以这在5秒内完成:

  long start = currentTimeMillis(); int readByte = 1; // Read data without timeout while (readByte >= 0) { readByte = inputStream.read(); if (readByte >= 0) System.out.println("Read: " + readByte); } System.out.println("Complete in " + (currentTimeMillis() - start) + "ms"); 

其输出:

 Read: 1 Read: 2 Read: 3 Complete in 5001ms 

如果还有一个更根本的问题,就像作者没有回应,读者会永远屏住。 如果我将来将这个阅读包裹起来,那么我可以如下控制超时:

  int readByte = 1; // Read data with timeout Callable<Integer> readTask = new Callable<Integer>() { @Override public Integer call() throws Exception { return inputStream.read(); } }; while (readByte >= 0) { Future<Integer> future = executor.submit(readTask); readByte = future.get(1000, TimeUnit.MILLISECONDS); if (readByte >= 0) System.out.println("Read: " + readByte); } 

其输出:

 Read: 1 Read: 2 Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228) at java.util.concurrent.FutureTask.get(FutureTask.java:91) at test.InputStreamWithTimeoutTest.main(InputStreamWithTimeoutTest.java:74) 

我可以捕获TimeoutException并做任何我想要的清理。

我会质疑问题陈述,而不是盲目接受。 您只需要从控制台或通过networking超时。 如果后者有Socket.setSoTimeout()HttpURLConnection.setReadTimeout() ,只要你在构build/获取它们的时候正确地设置它们就可以完成所需的工作。 在应用程序稍后的时间将其留在任意位置时,InputStream就是糟糕的devise,导致实现起来非常尴尬。

如果您的InputStream由Socket支持,则可以使用setSoTimeout设置套接字超时(以毫秒为单位)。 如果read()调用在指定的超时时间内没有解锁,则会抛出SocketTimeoutExceptionexception。

只要确保在进行read()调用之前在Socket上调用setSoTimeout即可。

我没有使用Java NIO包中的类,但似乎他们可能在这里有一些帮助。 具体而言, java.nio.channels.Channels和java.nio.channels.InterruptibleChannel 。

这是一种从System.in中获取NIO FileChannel的方法,并使用超时来检查数据的可用性,这是问题中描述的问题的一个特例。 在控制台运行它,不要input任何input,并等待结果。 它在Windows和Linux上的Java 6下成功testing。

 import java.io.FileInputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; public class Main { static final ByteBuffer buf = ByteBuffer.allocate(4096); public static void main(String[] args) { long timeout = 1000 * 5; try { InputStream in = extract(System.in); if (! (in instanceof FileInputStream)) throw new RuntimeException( "Could not extract a FileInputStream from STDIN."); try { int ret = maybeAvailable((FileInputStream)in, timeout); System.out.println( Integer.toString(ret) + " bytes were read."); } finally { in.close(); } } catch (Exception e) { throw new RuntimeException(e); } } /* unravels all layers of FilterInputStream wrappers to get to the * core InputStream */ public static InputStream extract(InputStream in) throws NoSuchFieldException, IllegalAccessException { Field f = FilterInputStream.class.getDeclaredField("in"); f.setAccessible(true); while( in instanceof FilterInputStream ) in = (InputStream)f.get((FilterInputStream)in); return in; } /* Returns the number of bytes which could be read from the stream, * timing out after the specified number of milliseconds. * Returns 0 on timeout (because no bytes could be read) * and -1 for end of stream. */ public static int maybeAvailable(final FileInputStream in, long timeout) throws IOException, InterruptedException { final int[] dataReady = {0}; final IOException[] maybeException = {null}; final Thread reader = new Thread() { public void run() { try { dataReady[0] = in.getChannel().read(buf); } catch (ClosedByInterruptException e) { System.err.println("Reader interrupted."); } catch (IOException e) { maybeException[0] = e; } } }; Thread interruptor = new Thread() { public void run() { reader.interrupt(); } }; reader.start(); for(;;) { reader.join(timeout); if (!reader.isAlive()) break; interruptor.start(); interruptor.join(1000); reader.join(1000); if (!reader.isAlive()) break; System.err.println("We're hung"); System.exit(1); } if ( maybeException[0] != null ) throw maybeException[0]; return dataReady[0]; } } 

有趣的是,在NetBeans 6.5中而不是在控制台上运行程序时,超时根本不起作用,调用System.exit()实际上是杀死僵尸线程所必需的。 会发生什么事是中断线程阻塞(!)在reader.interrupt()调用。 另一个testing程序(此处未显示)另外尝试closures通道,但这也不起作用。

正如jt所说,NIO是最好的(也是正确的)解决scheme。 如果你真的被一个InputStream卡住了,你也可以

  1. 产生一个独占工作的线程是从InputStream中读取并将结果放入一个可以从原始线程中读取而不会被阻塞的缓冲区。 如果您只有一个stream的实例,这应该工作得很好。 否则,您可能会使用Thread类中不推荐使用的方法来终止该线程,尽pipe这可能导致资源泄漏。

  2. 依赖isAvailable来指示可以被读取而不被阻塞的数据。 但是,在某些情况下(例如使用套接字),isAvailable可能会阻塞读取,以报告除0以外的内容。