.NETasynchronousstream读/写

我一直在试图解决这个“并行编程”考试练习(在C#中):

知道Stream类包含int Read(byte[] buffer, int offset, int size)void Write(byte[] buffer, int offset, int size)方法,在C#中实现NetToFile方法,将所有从NetworkStream net实例添加到FileStream file实例。 要执行传输,请使用asynchronous读取和同步写入,避免在读取操作期间阻塞一个线程。 当net读取操作返回值0时,传输结束。为简化操作,不需要支持受控的取消操作。

 void NetToFile(NetworkStream net, FileStream file); 

我一直在试图解决这个问题,但是我正在努力解决与问题本身有关的问题。 但首先,这是我的代码:

 public static void NetToFile(NetworkStream net, FileStream file) { byte[] buffer = new byte[4096]; // buffer with 4 kB dimension int offset = 0; // read/write offset int nBytesRead = 0; // number of bytes read on each cycle IAsyncResult ar; do { // read partial content of net (asynchronously) ar = net.BeginRead(buffer,offset,buffer.Length,null,null); // wait until read is completed ar.AsyncWaitHandle.WaitOne(); // get number of bytes read on each cycle nBytesRead = net.EndRead(ar); // write partial content to file (synchronously) fs.Write(buffer,offset,nBytesRead); // update offset offset += nBytesRead; } while( nBytesRead > 0); } 

我的问题是,在问题陈述中说:

要执行传输,请使用asynchronous读取和同步写入,避免在读取操作期间阻塞一个线程

我不确定我的解决scheme是否能够完成本练习中的所需,因为我正在使用AsyncWaitHandle.WaitOne()等待asynchronous读取完成。

另一方面,我并没有真正弄清楚在这种情况下什么是“非阻塞”的解决scheme,因为FileStream写入是为了同步进行的……为了做到这一点,我必须等待直到NetworkStream读取完成才能继续进行FileStream写入,不是吗?

你能帮我解决这个问题吗?


[编辑1] 使用callback解决scheme

好吧,如果我理解了Mitchel Sellers和Willvv的回复,我build议使用callback方法将其变为“非阻塞”解决scheme。 这是我的代码,然后:

 byte[] buffer; // buffer public static void NetToFile(NetworkStream net, FileStream file) { // buffer with same dimension as file stream data buffer = new byte[file.Length]; //start asynchronous read net.BeginRead(buffer,0,buffer.Length,OnEndRead,net); } //asynchronous callback static void OnEndRead(IAsyncResult ar) { //NetworkStream retrieve NetworkStream net = (NetworkStream) ar.IAsyncState; //get number of bytes read int nBytesRead = net.EndRead(ar); //write content to file //... and now, how do I write to FileStream instance without //having its reference?? //fs.Write(buffer,0,nBytesRead); } 

正如你可能已经注意到,我卡在callback方法,因为我没有一个引用到我想要调用“写(…)”方法的FileStream实例。

此外,这不是线程安全的解决scheme,因为byte[]字段是公开的,并可以在并发NetToFile调用中共享。 我不知道如何解决这个问题,而不暴露外部范围内的这个byte[]字段…我几乎可以肯定它可能不会被这样暴露。

我不想使用lambda或匿名方法解决scheme,因为这不在“Concurrent Programing”课程的课程中。

您将需要使用NetStream读取的callback来处理这个问题。 坦率地说,将复制逻辑封装到自己的类中可能会更容易,因此您可以维护活动stream的实例。

这是我如何处理它(未testing):

 public class Assignment1 { public static void NetToFile(NetworkStream net, FileStream file) { var copier = new AsyncStreamCopier(net, file); copier.Start(); } public static void NetToFile_Option2(NetworkStream net, FileStream file) { var completedEvent = new ManualResetEvent(false); // copy as usual but listen for completion var copier = new AsyncStreamCopier(net, file); copier.Completed += (s, e) => completedEvent.Set(); copier.Start(); completedEvent.WaitOne(); } /// <summary> /// The Async Copier class reads the input Stream Async and writes Synchronously /// </summary> public class AsyncStreamCopier { public event EventHandler Completed; private readonly Stream input; private readonly Stream output; private byte[] buffer = new byte[4096]; public AsyncStreamCopier(Stream input, Stream output) { this.input = input; this.output = output; } public void Start() { GetNextChunk(); } private void GetNextChunk() { input.BeginRead(buffer, 0, buffer.Length, InputReadComplete, null); } private void InputReadComplete(IAsyncResult ar) { // input read asynchronously completed int bytesRead = input.EndRead(ar); if (bytesRead == 0) { RaiseCompleted(); return; } // write synchronously output.Write(buffer, 0, bytesRead); // get next GetNextChunk(); } private void RaiseCompleted() { if (Completed != null) { Completed(this, EventArgs.Empty); } } } } 

即使这样做是为了帮助人们做家庭作业,因为这已经有一年多的时间了,但这是完成这个任务的正确方法。 所有你需要重叠你的读/写操作 – 没有额外的线程产卵,或其他任何东西是必需的。

 public static class StreamExtensions { private const int DEFAULT_BUFFER_SIZE = short.MaxValue ; // +32767 public static void CopyTo( this Stream input , Stream output ) { input.CopyTo( output , DEFAULT_BUFFER_SIZE ) ; return ; } public static void CopyTo( this Stream input , Stream output , int bufferSize ) { if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" ); if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" ); byte[][] buf = { new byte[bufferSize] , new byte[bufferSize] } ; int[] bufl = { 0 , 0 } ; int bufno = 0 ; IAsyncResult read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ; IAsyncResult write = null ; while ( true ) { // wait for the read operation to complete read.AsyncWaitHandle.WaitOne() ; bufl[bufno] = input.EndRead(read) ; // if zero bytes read, the copy is complete if ( bufl[bufno] == 0 ) { break ; } // wait for the in-flight write operation, if one exists, to complete // the only time one won't exist is after the very first read operation completes if ( write != null ) { write.AsyncWaitHandle.WaitOne() ; output.EndWrite(write) ; } // start the new write operation write = output.BeginWrite( buf[bufno] , 0 , bufl[bufno] , null , null ) ; // toggle the current, in-use buffer // and start the read operation on the new buffer. // // Changed to use XOR to toggle between 0 and 1. // A little speedier than using a ternary expression. bufno ^= 1 ; // bufno = ( bufno == 0 ? 1 : 0 ) ; read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ; } // wait for the final in-flight write operation, if one exists, to complete // the only time one won't exist is if the input stream is empty. if ( write != null ) { write.AsyncWaitHandle.WaitOne() ; output.EndWrite(write) ; } output.Flush() ; // return to the caller ; return ; } public static async Task CopyToAsync( this Stream input , Stream output ) { await input.CopyToAsync( output , DEFAULT_BUFFER_SIZE ) ; return; } public static async Task CopyToAsync( this Stream input , Stream output , int bufferSize ) { if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" ); if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" ); byte[][] buf = { new byte[bufferSize] , new byte[bufferSize] } ; int[] bufl = { 0 , 0 } ; int bufno = 0 ; Task<int> read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ) ; Task write = null ; while ( true ) { await read ; bufl[bufno] = read.Result ; // if zero bytes read, the copy is complete if ( bufl[bufno] == 0 ) { break; } // wait for the in-flight write operation, if one exists, to complete // the only time one won't exist is after the very first read operation completes if ( write != null ) { await write ; } // start the new write operation write = output.WriteAsync( buf[bufno] , 0 , bufl[bufno] ) ; // toggle the current, in-use buffer // and start the read operation on the new buffer. // // Changed to use XOR to toggle between 0 and 1. // A little speedier than using a ternary expression. bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ; read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ); } // wait for the final in-flight write operation, if one exists, to complete // the only time one won't exist is if the input stream is empty. if ( write != null ) { await write; } output.Flush(); // return to the caller ; return; } } 

干杯。

哇,这些都很复杂! 这是我的asynchronous解决scheme,它只是一个function。 Read()和BeginWrite()同时运行。

 /// <summary> /// Copies a stream. /// </summary> /// <param name="source">The stream containing the source data.</param> /// <param name="target">The stream that will receive the source data.</param> /// <remarks> /// This function copies until no more can be read from the stream /// and does not close the stream when done.<br/> /// Read and write are performed simultaneously to improve throughput.<br/> /// If no data can be read for 60 seconds, the copy will time-out. /// </remarks> public static void CopyStream(Stream source, Stream target) { // This stream copy supports a source-read happening at the same time // as target-write. A simpler implementation would be to use just // Write() instead of BeginWrite(), at the cost of speed. byte[] readbuffer = new byte[4096]; byte[] writebuffer = new byte[4096]; IAsyncResult asyncResult = null; for (; ; ) { // Read data into the readbuffer. The previous call to BeginWrite, if any, // is executing in the background.. int read = source.Read(readbuffer, 0, readbuffer.Length); // Ok, we have read some data and we're ready to write it, so wait here // to make sure that the previous write is done before we write again. if (asyncResult != null) { // This should work down to ~0.01kb/sec asyncResult.AsyncWaitHandle.WaitOne(60000); target.EndWrite(asyncResult); // Last step to the 'write'. if (!asyncResult.IsCompleted) // Make sure the write really completed. throw new IOException("Stream write failed."); } if (read <= 0) return; // source stream says we're done - nothing else to read. // Swap the read and write buffers so we can write what we read, and we can // use the then use the other buffer for our next read. byte[] tbuf = writebuffer; writebuffer = readbuffer; readbuffer = tbuf; // Asynchronously write the data, asyncResult.AsyncWaitHandle will // be set when done. asyncResult = target.BeginWrite(writebuffer, 0, read, null, null); } } 

我怀疑这是最快的代码(从.NET任务抽象有一些开销),但我认为这是一个更清洁的方法来整个asynchronous复制的东西。

我需要一个CopyTransformAsync ,我可以通过一个委托来做一些事情,因为块通过复制操作。 例如在复制时计算消息摘要。 这就是为什么我有兴趣滚动我自己的select。

发现:

  • CopyToAsync bufferSize是敏感的(需要一个大的缓冲区)
  • FileOptions.Asynchronous – >使其非常慢(不知道这是为什么)
  • FileStream对象的bufferSize可以更小(这不是那么重要)
  • Serialtesting显然是最快和最耗费资源的

这是我find的和我用来testing这个程序的完整的源代码 。 在我的机器上,这些testing是在SSD磁盘上运行的,相当于一个文件副本。 通常情况下,你不想用这个来复制文件,而是当你有一个networkingstream(这是我的用例),那就是当你想要使用这样的东西。

 4K buffer Serial... in 0.474s CopyToAsync... timed out CopyToAsync (Asynchronous)... timed out CopyTransformAsync... timed out CopyTransformAsync (Asynchronous)... timed out 8K buffer Serial... in 0.344s CopyToAsync... timed out CopyToAsync (Asynchronous)... timed out CopyTransformAsync... in 1.116s CopyTransformAsync (Asynchronous)... timed out 40K buffer Serial... in 0.195s CopyToAsync... in 0.624s CopyToAsync (Asynchronous)... timed out CopyTransformAsync... in 0.378s CopyTransformAsync (Asynchronous)... timed out 80K buffer Serial... in 0.190s CopyToAsync... in 0.355s CopyToAsync (Asynchronous)... in 1.196s CopyTransformAsync... in 0.300s CopyTransformAsync (Asynchronous)... in 0.886s 160K buffer Serial... in 0.432s CopyToAsync... in 0.252s CopyToAsync (Asynchronous)... in 0.454s CopyTransformAsync... in 0.447s CopyTransformAsync (Asynchronous)... in 0.555s 

在这里,您可以看到Process Explorer,性能图表作为testing运行。 基本上每个顶部 (在三个图中的较低者)是串行testing的开始。 随着缓冲区大小的增加,您可以清楚地看到吞吐量如何急剧增加。 看起来好像它计划在80K左右,这是.NET框架CopyToAsync方法在内部使用的地方。

性能图

这里的好处是最终的实现并不复杂:

 static Task CompletedTask = ((Task)Task.FromResult(0)); static async Task CopyTransformAsync(Stream inputStream , Stream outputStream , Func<ArraySegment<byte>, ArraySegment<byte>> transform = null ) { var temp = new byte[bufferSize]; var temp2 = new byte[bufferSize]; int i = 0; var readTask = inputStream .ReadAsync(temp, 0, bufferSize) .ConfigureAwait(false); var writeTask = CompletedTask.ConfigureAwait(false); for (; ; ) { // synchronize read int read = await readTask; if (read == 0) { break; } if (i++ > 0) { // synchronize write await writeTask; } var chunk = new ArraySegment<byte>(temp, 0, read); // do transform (if any) if (!(transform == null)) { chunk = transform(chunk); } // queue write writeTask = outputStream .WriteAsync(chunk.Array, chunk.Offset, chunk.Count) .ConfigureAwait(false); // queue read readTask = inputStream .ReadAsync(temp2, 0, bufferSize) .ConfigureAwait(false); // swap buffer var temp3 = temp; temp = temp2; temp2 = temp3; } await writeTask; // complete any lingering write task } 

尽pipe存在巨大的缓冲区,这种交织读/写的方法比BCL CopyToAsync快18%。

出于好奇,我改变了asynchronous调用,以典型的开始/结束asynchronous模式调用,并没有改善情况一点,使情况变得更糟。 对于所有我喜欢的任务抽象开销,当你使用async / await关键字编写代码时,他们会做一些漂亮的事情,阅读代码更好!

很奇怪,没有人提到TPL。
PFX团队(Stephen Toub)非常高兴地发表了关于如何实现并发asynchronousstream拷贝的内容。 这个post包含了过时的样本,所以这里有一个:
从code.msdn然后获取并行扩展附加

 var task = sourceStream.CopyStreamToStreamAsync(destinationStream); // do what you want with the task, for example wait when it finishes: task.Wait(); 

另外考虑使用J.Richer的AsyncEnumerator 。

你是对的,你所做的基本上是同步阅读,因为你使用WaitOne()方法,它只是停止执行,直到数据准备好,这基本上是使用Read()而不是BeginRead )和EndRead()。

你需要做的就是使用BeginRead()方法中的callback参数,在这个方法中,你定义了一个callback方法(或者一个lambdaexpression式),这个方法将在信息被读取时调用(在callback方法中必须检查stream的结束,并写入输出stream),这样你就不会阻塞主线程(你不需要WaitOne()和EndRead()。

希望这可以帮助。