如何编写asynchronousLINQ查询?

在我读了一堆LINQ相关的东西之后,我突然意识到没有文章介绍如何编写asynchronousLINQ查询。

假设我们使用LINQ to SQL,下面的语句是明确的。 但是,如果SQL数据库响应缓慢,则使用此代码块的线程将受到阻碍。

var result = from item in Products where item.Price > 3 select item.Name; foreach (var name in result) { Console.WriteLine(name); } 

似乎目前的LINQ查询规范不提供支持。

有什么办法做asynchronous编程的LINQ? 它的工作原理就好像有一个callback通知,当结果准备好使用时,在I / O上没有任何阻塞延迟。

虽然LINQ本身并没有这个function,但框架本身就是这样的……你可以很容易地把你自己的asynchronous查询执行程序放在30行左右……实际上,我只是把它们扔在一起:)

编辑:通过写这个,我发现他们为什么没有实现它。 它不能处理匿名types,因为它们是本地作用域。 因此,你没有办法定义你的callback函数。 这是一个非常重要的事情,因为很多linq到sql的东西在select子句中创build它们。 下面的任何build议遭受同样的命运,所以我仍然认为这是最容易使用!

编辑:唯一的解决办法是不使用匿名types。 您可以将callback声明为仅使用IEnumerable(无types参数),并使用reflection来访问字段(ICK !!)。 另一种方法是将callback声明为“dynamic”…哦…等待…这还没有结束。 :)这是如何使用dynamic的另一个体面的例子。 有些人可能称之为滥用。

扔在你的公用事业库:

 public static class AsynchronousQueryExecutor { public static void Call<T>(IEnumerable<T> query, Action<IEnumerable<T>> callback, Action<Exception> errorCallback) { Func<IEnumerable<T>, IEnumerable<T>> func = new Func<IEnumerable<T>, IEnumerable<T>>(InnerEnumerate<T>); IEnumerable<T> result = null; IAsyncResult ar = func.BeginInvoke( query, new AsyncCallback(delegate(IAsyncResult arr) { try { result = ((Func<IEnumerable<T>, IEnumerable<T>>)((AsyncResult)arr).AsyncDelegate).EndInvoke(arr); } catch (Exception ex) { if (errorCallback != null) { errorCallback(ex); } return; } //errors from inside here are the callbacks problem //I think it would be confusing to report them callback(result); }), null); } private static IEnumerable<T> InnerEnumerate<T>(IEnumerable<T> query) { foreach (var item in query) //the method hangs here while the query executes { yield return item; } } } 

你可以像这样使用它:

 class Program { public static void Main(string[] args) { //this could be your linq query var qry = TestSlowLoadingEnumerable(); //We begin the call and give it our callback delegate //and a delegate to an error handler AsynchronousQueryExecutor.Call(qry, HandleResults, HandleError); Console.WriteLine("Call began on seperate thread, execution continued"); Console.ReadLine(); } public static void HandleResults(IEnumerable<int> results) { //the results are available in here foreach (var item in results) { Console.WriteLine(item); } } public static void HandleError(Exception ex) { Console.WriteLine("error"); } //just a sample lazy loading enumerable public static IEnumerable<int> TestSlowLoadingEnumerable() { Thread.Sleep(5000); foreach (var i in new int[] { 1, 2, 3, 4, 5, 6 }) { yield return i; } } } 

现在去把它放在我的博客上,非常方便。

SoftwareJedi和ulrikb的(也称为user316318)解决scheme适用于任何LINQtypes,但是(如Chris Moschini所指出的)不会委托使用Windows I / O完成端口的底层asynchronous调用。

Wesley Bakker的asynchronousDataContext文章(由Scott Hanselman的博客文章触发)描述了LINQ to SQL的类,该类使用了使用Windows I / O完成端口的sqlCommand.BeginExecuteReader / sqlCommand.EndExecuteReader。

I / O完成端口为处理多处理器系统上的多个asynchronousI / O请求提供了高效的线程模型。

基于Michael Freidgeim的回答,并提到了Scott Hansellman的博客文章,以及可以使用async / await事实,可以实现可重用的ExecuteAsync<T>(...)方法,该方法asynchronous执行底层的SqlCommand

 protected static async Task<IEnumerable<T>> ExecuteAsync<T>(IQueryable<T> query, DataContext ctx, CancellationToken token = default(CancellationToken)) { var cmd = (SqlCommand)ctx.GetCommand(query); if (cmd.Connection.State == ConnectionState.Closed) await cmd.Connection.OpenAsync(token); var reader = await cmd.ExecuteReaderAsync(token); return ctx.Translate<T>(reader); } 

然后你可以(重新)像这样使用它:

 public async Task WriteNamesToConsoleAsync(string connectionString, CancellationToken token = default(CancellationToken)) { using (var ctx = new DataContext(connectionString)) { var query = from item in Products where item.Price > 3 select item.Name; var result = await ExecuteAsync(query, ctx, token); foreach (var name in result) { Console.WriteLine(name); } } } 

我开始了一个名为Asynq的简单github项目来执行asynchronous的LINQ到SQL查询。 这个想法很简单,尽pipe在这个阶段“脆弱”(截至2011年8月16日):

  1. 让LINQ-to-SQL通过DataContext.GetCommand()将你的IQueryable翻译成一个DbCommand
  2. 对于从GetCommand()获得的抽象DbCommand实例来build立SQL 200 [058]以获得SqlCommand 。 如果你使用的是SQL CE,那么你的运气不好,因为SqlCeCommand不会公开BeginExecuteReaderEndExecuteReader的asynchronous模式。
  3. 使用BeginExecuteReaderEndExecuteReaderclosuresSqlCommand使用标准的.NET框架asynchronousI / O模式,在传递给BeginExecuteReader方法的完成callback委托中获得一个DbDataReader
  4. 现在我们有了一个DbDataReader ,我们不知道它包含的是什么列,也不知道如何将这些值映射回IQueryableElementType (最可能是连接的匿名types)。 当然,在这一点上,您可以手写您自己的列映射器,将其结果转化为匿名types或其他内容。 您必须为每个查询结果types编写一个新的查询,具体取决于LINQ-to-SQL如何处理您的IQueryable以及它生成的SQL代码。 这是一个非常讨厌的选项,我不推荐它,因为它不可维护,也不总是正确的。 LINQ to SQL可以根据传入的参数值更改查询forms,例如query.Take(10).Skip(0)产生与query.Take(10).Skip(10)不同的SQL一个不同的结果集模式。 你最好的select是以编程方式处理这个实现问题:
  5. “重新实现”一个简单的运行时对象实现器,根据IQueryableElementType Type的LINQ-to-SQL映射属性,按照定义的顺序将列从DbDataReader中提取出来。 正确实施这可能是此解决scheme中最具挑战性的部分。

正如其他人所发现的, DataContext.Translate()方法不处理匿名types,只能将DbDataReader直接映射到正确属性的LINQ-to-SQL代理对象。 由于大多数值得在LINQ中编写的查询将涉及复杂的连接,这最终需要匿名types作为最终的select子句,所以使用这个提供的泛化DataContext.Translate()方法是毫无意义的。

利用现有的成熟的LINQ-to-SQL IQueryable提供程序时,此解决scheme存在一些小缺陷:

  1. 你不能将一个对象实例映射到IQueryable的最后一个select子句中的多个匿名types属性,例如from x in db.Table1 select new { a = x, b = x } 。 LINQ-to-SQL在内部跟踪哪些列标题映射到哪些属性; 它不会将此信息公开给最终用户,因此您不知道DbDataReader中的哪些列被重用,哪些列是“不同的”。
  2. 你不能在最后的select子句中包含常量值 – 这些值不会被转换成SQL,而会从DbDataReader缺失,所以你必须构build自定义的逻辑来从IQueryableExpression树中取出这些常量值,相当麻烦,根本不合理。

我确定还有其他的查询模式可能会被破坏,但是这些是我能想到的两个最大的问题,可能会导致现有的LINQ到SQL数据访问层出现问题。

这些问题很容易失败 – 只是不要在你的查询中做这些事情,因为这两个模式都不能为查询的最终结果提供任何好处。 希望这个build议适用于所有可能导致对象实现问题的查询模式:-P。 解决不能访问LINQ-to-SQL的列映射信息是一个很难的问题。

解决问题的一个更“完整”的方法是有效地重新实现几乎所有的LINQ到SQL,这是更加耗时的:-P。 从一个高质量的开源LINQ到SQL提供者的实现将是一个很好的方法来到这里。 您需要重新实现的原因是,您可以访问所有用于将DbDataReader结果物化为对象实例的列映射信息,而不会丢失任何信息。