如果使用带有大对象的枚举,Parallel.ForEach可能会导致“内存不足”exception

我试图迁移一个数据库,其中存储在数据库中的图像到数据库中的一条logging指向硬盘上的一个文件。 我正在尝试使用Parallel.ForEach来加快使用此方法查询数据的过程。

但是我注意到我正在得到一个OutOfMemoryexception。 我知道Parallel.ForEach将查询一批枚举types,以减less开销的代价,如果有一个用于间隔查询(所以如果你一次做了一堆查询,你的源代码更有可能将下一条loggingcaching在内存中把它们隔开)。 这个问题是由于我返回的logging是一个1-4Mb字节的数组,caching导致整个地址空间用完(程序必须以x86模式运行,因为目标平台将是32位机器)

有什么办法可以禁用caching或使TPL更小?


这里是一个示例程序来展示这个问题。 这必须在x86模式下进行编译,以显示问题,如果需要花费很长时间,或者在您的机器上没有发生问题,则会增加arrays的大小(我发现1 << 20在我的机器上需要大约30秒, 4 << 20几乎是瞬间的)

 class Program { static void Main(string[] args) { Parallel.ForEach(CreateData(), (data) => { data[0] = 1; }); } static IEnumerable<byte[]> CreateData() { while (true) { yield return new byte[1 << 20]; //1Mb array } } } 

Parallel.ForEach的默认选项只在任务受CPU限制并线性缩放时才能正常工作 。 当任务是CPU绑定的时候,一切正常。 如果您有一个四核,而没有其他进程在运行,那么Parallel.ForEach将使用全部四个处理器。 如果您的计算机上有四核处理器,并且其他进程正在使用一个完整的CPU,则Parallel.ForEach将使用大约三个处理器。

但是,如果任务不受CPU限制,那么Parallel.ForEach将保持启动任务,尽力保持所有CPU忙碌。 然而,无论并行运行多less任务,总会有更多未使用的CPU马力,因此不断创造任务。

你怎么知道你的任务是CPU限制? 希望只是通过检查。 如果你是素数的因素,这是显而易见的。 但其他情况并不明显。 判断你的任务是否受CPU限制的经验性方法是用ParallelOptions.MaximumDegreeOfParallelism限制最大并行度,并观察你的程序的行为。 如果你的任务是CPU限制,那么你应该在四核系统上看到这样的模式:

  • ParallelOptions.MaximumDegreeOfParallelism = 1 :使用一个完整的CPU或25%的CPU利用率
  • ParallelOptions.MaximumDegreeOfParallelism = 2 :使用两个CPU或50%的CPU使用率
  • ParallelOptions.MaximumDegreeOfParallelism = 4 :使用所有CPU或100%的CPU利用率

如果它的行为如此,那么你可以使用默认的Parallel.ForEach选项,并获得好的结果。 线性CPU利用率意味着很好的任务调度

但是,如果我在Intel i7上运行示例应用程序,无论设置的最大并行度如何,我都可以获得大约20%的CPU利用率。 为什么是这样? 垃圾收集器阻塞线程的内存太多了。 应用程序是资源绑定的,资源是内存。

同样,对数据库服务器执行长时间运行查询的I / O绑定任务也将永远无法有效地利用本地计算机上可用的所有CPU资源。 而在这种情况下,任务调度程序无法“知道何时停止”开始新的任务。

如果您的任务不受CPU限制或者CPU利用率不能以最大并行度进行线性调整,那么您应该build议Parallel.ForEach不要一次启动太多的任务。 最简单的方法是指定一个允许一些并行性来重叠I / O绑定任务的数字,但不能太多以至于压倒本地计算机对资源的需求,或者使任何远程服务器负担过重。 试验和错误是为了得到最好的结果:

 static void Main(string[] args) { Parallel.ForEach(CreateData(), new ParallelOptions { MaxDegreeOfParallelism = 4 }, (data) => { data[0] = 1; }); } 

所以,尽pipe里克所说的确是一个重要的观点,但我认为另外一个缺失的是关于分区的讨论。

Parallel::ForEach将使用默认的Partitioner<T>实现,对于没有已知长度的IEnumerable<T>将使用块分区策略。 这意味着Parallel::ForEach将用于处理数据集的每个工作线程将从IEnumerable<T>读取一些元素,然后仅由该线程处理(忽略现在的工作窃取) 。 这样做是为了节省不断的开销,分配一些新的工作,并为另一个工作线程安排时间。 所以,通常这是一件好事。但是,在您的具体情况下,想象一下您正在使用四核,并且已经将MaxDegreeOfParallelism设置为4个线程用于您的工作,现在每个线程都会从您的工作中MaxDegreeOfParallelism 100个元素IEnumerable<T> 。 那么,那就是100-400兆只是为了那个特定的工作者线程吧?

那么你如何解决这个问题呢? 很简单,你写一个自定义的Partitioner<T>实现 。 现在,分块在你的情况下仍然是有用的,所以你可能不想用一个单一的元素分区策略,因为那样你会引入开销和所有需要的任务协调。 相反,我会写一个可configuration的版本,你可以调整通过一个appsetting,直到你find你的工作量的最佳平衡。 好消息是,虽然编写这样一个实现非常简单,但实际上你甚至不必自己写这个实现,因为PFX团队已经做到了,并把它放到并行编程示例项目中 。

这个问题与分区程序有关,与并行程度无关。 解决scheme是实现一个自定义的数据分区。

如果数据集很大,看起来TPL的单声道实现保证内存不足。最近发生了这种情况(本质上是我正在运行上述循环,发现内存线性增加,直到它给我一个OOMexception)。

追查问题后,我发现,默认情况下,mono将使用EnumerablePartitioner类来分隔枚举器。 这个类有一个行为,每次数据输出到一个任务时,它都会以一个不断增加的(不可改变的)因子“分块”数据。因此,当任务第一次请求数据时,会得到一个大小的块1,下一次大小2 * 1 = 2,下一次2 * 2 = 4,然后2 * 4 = 8,等等。结果是交给任务的数据量,因此存储在内存同时增加,随着任务长度的增加,如果大量的数据正在处理,内存不足的例外不可避免地会发生。

据推测,这种行为的原因是,它想要避免每个线程多次返回来获取数据,但它似乎是基于所有正在处理的数据可以适应内存的假设(而不是从大文件)。

如前所述,可以使用自定义分区程序来避免此问题。 一个简单的返回数据到每个任务一个项目的一个通用的例子是在这里:

https://gist.github.com/evolvedmicrobe/7997971

只要首先实例化该类,然后将其交给Parallel.For而不是枚举本身