在一个十亿档案中find最大的百位数字

我今天去面试了,被问到这个问题:

假设你有一个十亿个未分类的整数在一个磁盘文件中。 你如何确定最大的百位数字?

我甚至不确定我会从这个问题开始。 什么是最有效的过程来给出正确的结果? 我是否需要通过磁盘文件百次抓取尚未列入清单的最高号码,还是有更好的办法?

这是我的初始algorithm:

create array of size 100 [0..99]. read first 100 numbers and put into array. sort array in ascending order. while more numbers in file: get next number N. if N > array[0]: if N > array[99]: shift array[1..99] to array[0..98]. set array[99] to N. else find, using binary search, first index i where N <= array[i]. shift array[1..i-1] to array[0..i-2]. set array[i-1] to N. endif endif endwhile 

这个(非常轻微的)优势在于前100个元素没有O(n ^ 2)混洗,只是一个O(n log n)sorting,而且你很快识别并丢弃那些太小的元素。 它也使用二进制search(7个最大比较)来find正确的插入点,而不是50(平均),为一个简单的线性search(不是我build议任何人提供这样的解决scheme,只是这可能会打动采访者)。

你甚至可以获得奖励积分,build议使用像C的memcpy这样的优化shift操作,前提是你可以确定重叠不是问题。


你可能要考虑的另一个可能性是保持三个列表(每个列表最多100个整数):

 read first hundred numbers into array 1 and sort them descending. while more numbers: read up to next hundred numbers into array 2 and sort them descending. merge-sort lists 1 and 2 into list 3 (only first (largest) 100 numbers). if more numbers: read up to next hundred numbers into array 2 and sort them descending. merge-sort lists 3 and 2 into list 1 (only first (largest) 100 numbers). else copy list 3 to list 1. endif endwhile 

我不确定,但最终可能比持续的洗牌更有效率。

合并sorting是一个简单的select(用于合并sorting列表1和2到3):

 list3.clear() while list3.size() < 100: while list1.peek() >= list2.peek(): list3.add(list1.pop()) endwhile while list2.peek() >= list1.peek(): list3.add(list2.pop()) endwhile endwhile 

简而言之,由于它们已经按降序排列,因此将组合列表中的前100个值排除在外。 我没有详细检查这是否会更有效率,我只是提供这种可能性。

我怀疑面试官会对“开箱即用”思想的潜力以及你说过应该对绩效进行评估的事实留下深刻的印象。

与大多数采访一样,技术技能是他们正在研究的事情之一

面试官很明显要你指出两个关键的事实:

  • 你不能将整个整数列表读入内存,因为它太大了。 所以你必须逐一阅读。
  • 您需要一个有效的数据结构来保存100个最大的元素。 这个数据结构必须支持以下操作:
    • Get-Size :获取容器中的值的数量。
    • Find-Min :获取最小的值。
    • Delete-Min :删除最小的值,将其replace为新的更大的值。
    • Insert :将另一个元素插入到容器中。

通过评估数据结构的要求,一位计算机科学教授希望您推荐使用堆 (最小堆),因为它旨在支持我们在这里需要的操作。

例如,对于Fibonacci堆 ,操作Get-SizeFind-MinInsert all都是O(1)Delete-MinO(log n) (在这种情况下n <= 100 )。

在实践中,您可以使用您最喜欢的语言的标准库(例如C ++中的#include <queue>中的priority_queue )来使用通常使用堆实现的priority_queue #include <queue>

创build一个100个数字的数组都是-2 ^ 31。

检查从磁盘读取的第一个数字是否大于列表中的第一个数字。 如果是将数组向下复制1个索引并将其更新为新的数字。 如果不在100中检查下一个等等。

当你读完所有10亿位数字后,你应该有100个数组。

任务完成。

我会按顺序遍历这个列表。 当我走了,我添加元素到一个(或多重取决于重复)。 当集合达到100时,只有当值大于集合中的最小值(O(log m))时才会插入。 然后删除最小。

调用列表n中的值的数量和要查找的值的数量m:

这是O(n * log m)

处理algorithm的速度是绝对不相关的(除非它是完全愚蠢的)。

这里的瓶颈是I / O(指定它们在磁盘上)。 所以确保你使用大缓冲区。

保持100个整数的固定数组。 将它们初始化为一个Int.MinValue。 读取时,从10亿个整数,将它们与数组的第一个单元格(索引0)中的数字进行比较。 如果更大,则上移至下一个。 再次如果更大,则向上移动,直到达到最终值或更小的值。 然后将值存储在索引中,并将之前单元格中的所有值移入一个单元格中…执行此操作,您将find100个最大整数。

我相信最快的方法是使用一个非常大的位图来logging哪些数字存在。 为了表示一个32位的整数,这需要是大约== 536MB的2 ^ 32/8个字节。 扫描整数只需设置位图中的相应位。 然后查找最高的100个条目。

注意:如果您看到差异,则会查找最高的100个数字,而不是数字的最高100个实例。

这种方法在你的面试官可能读过的非常好的书“程序珍珠”中讨论过!

你将不得不检查每一个数字,这是没有办法的。

就提供的解决scheme略有改进而言,

给出一个100个号码的列表:

 9595 8505 ... 234 1 

你会检查新的发现值是否>我们数组的最小值,如果是,插入它。 然而,从下往上search可能会相当昂贵,您可以考虑采取分而治之的方法,例如通过评估数组中的第50项并进行比较,然后知道是否需要将值插入前50个项目,或底部50.你可以重复这个过程,以更快的search,因为我们已经消除了50%的search空间。

还要考虑整数的数据types。 如果它们是32位整数,并且你在64位系统上,那么你可能能够做一些聪明的内存处理和按位操作来处理磁盘上的两个数字,如果它们连续存在的话。

我想现在有人应该提到一个优先级队列 。 你只需要保持目前的前100名,知道最低的是什么,并能够用更高的数字来取代。 这是一个优先级队列为你做的 – 有些实现可能会对列表进行sorting,但这不是必需的。

  1. 假设1个账单+ 100个号码适合内存,最好的sortingalgorithm是堆sorting。 形成堆并获得前100个号码。 复杂性o(nlogn + 100(取前100个号码))

    改进解决scheme

    把实现分成两堆(这样插入不那么复杂),而取前100个元素做皇家合并algorithm。

下面是一些python代码,它实现了上面的ferdinand beyer提出的algorithm。 本质上它是一堆,唯一的区别是删除已经与插入操作合并

 import random import math class myds: """ implement a heap to find k greatest numbers out of all that are provided""" k = 0 getnext = None heap = [] def __init__(self, k, getnext ): """ k is the number of integers to return, getnext is a function that is called to get the next number, it returns a string to signal end of stream """ assert k>0 self.k = k self.getnext = getnext def housekeeping_bubbleup(self, index): if index == 0: return() parent_index = int(math.floor((index-1)/2)) if self.heap[parent_index] > self.heap[index]: self.heap[index], self.heap[parent_index] = self.heap[parent_index], self.heap[index] self.housekeeping_bubbleup(parent_index) return() def insertonly_level2(self, n): self.heap.append(n) #pdb.set_trace() self.housekeeping_bubbleup(len(self.heap)-1) def insertonly_level1(self, n): """ runs first k times only, can be as slow as i want """ if len(self.heap) == 0: self.heap.append(n) return() elif n > self.heap[0]: self.insertonly_level2(n) else: return() def housekeeping_bubbledown(self, index, length): child_index_l = 2*index+1 child_index_r = 2*index+2 child_index = None if child_index_l >= length and child_index_r >= length: # No child return() elif child_index_r >= length: #only left child if self.heap[child_index_l] < self.heap[index]: # If the child is smaller child_index = child_index_l else: return() else: #both child if self.heap[ child_index_r] < self.heap[ child_index_l]: child_index = child_index_r else: child_index = child_index_l self.heap[index], self.heap[ child_index] = self.heap[child_index], self.heap[index] self.housekeeping_bubbledown(child_index, length) return() def insertdelete_level1(self, n): self.heap[0] = n self.housekeeping_bubbledown(0, len(self.heap)) return() def insert_to_myds(self, n ): if len(self.heap) < self.k: self.insertonly_level1(n) elif n > self.heap[0]: #pdb.set_trace() self.insertdelete_level1(n) else: return() def run(self ): for n in self.getnext: self.insert_to_myds(n) print(self.heap) # import pdb; pdb.set_trace() return(self.heap) def createinput(n): input_arr = range(n) random.shuffle(input_arr) f = file('input', 'w') for value in input_arr: f.write(str(value)) f.write('\n') input_arr = [] with open('input') as f: input_arr = [int(x) for x in f] myds_object = myds(4, iter(input_arr)) output = myds_object.run() print output 

如果使用快速sortingfind第100个订单统计量,则平均工作量为O(十亿)。 但是我怀疑用这样的数字和由于这种方法所需的随机访问会比O(十亿log(100))更快。

这是另一个解决scheme(关于一个后来,我没有遗憾!)基于@paxdiablo提供的第二个解决scheme。 基本的想法是,只有当它们大于已有的最小值时,才应该读取另外的k个数字,而sorting并不是必须的:

 // your variables n = 100 k = a number > n and << 1 billion create array1[n], array2[k] read first n numbers into array2 find minimum and maximum of array2 while more numbers: if number > maximum: store in array1 if array1 is full: // I don't need contents of array2 anymore array2 = array1 array1 = [] else if number > minimum: store in array2 if array2 is full: x = n - array1.count() find the x largest numbers of array2 and discard the rest find minimum and maximum of array2 else: discard the number endwhile // Finally x = n - array1.count() find the x largest numbers of array2 and discard the rest return merge array1 and array2 

关键的一步是findarray2中最大的x个数的函数。 但是你可以使用这个事实,即你知道最小和最大的加速函数来寻找array2中最大的x个数。

其实,有很多可能的优化,因为你不需要sorting它,你只需要x最大的数字。

而且,如果k足够大,并且有足够的内存,那么甚至可以将它转换为用于查找n个最大数字的recursionalgorithm。

最后,如果数字已经sorting(以任何顺序),algorithm是O(n)。

显然,这只是理论上的,因为在实践中,你会使用标准的sortingalgorithm,瓶颈可能是IO。

有很多聪明的方法(比如优先级队列解决scheme),但是可以做的最简单的事情之一也是快速和高效的。

如果你想要n的前k ,考虑:

 allocate an array of k ints while more input perform insertion sort of next value into the array 

这听起来可能荒谬简单。 你可能会认为这是O(n^2) ,但实际上只有O(k*n) ,如果kn小得多(如问题陈述中所假定的那样),它接近O(n)

你可能会争辩说,恒定的因素太高,因为做平均k/2比较和移动每个input是很多。 但是,大多数价值观在第一次比较中将被拒绝,而迄今为止所看到的第k个最大值。 如果你有十亿个投入,那么只有一小部分可能会比​​第100个大。

(你可以解释一个最坏情况的input,其中每个值都比前一个值大,因此需要k比较并移动每个input,但实际上这是一个sorting后的input,问题说明input未sorting。

即使是二进制search的改进(find插入点),也只是将比较的结果削减到ceil(log_2(k)) ,除非特殊情况下与第k个这样的额外比较,否则你不太可能得到绝大多数投入的微不足道的拒绝。 它并没有减less你需要的移动次数。 给定cachingscheme和分支预测,进行7次非连续比较,然后50次连续移动似乎不会比连续进行50次比较和移动快得多。 这就是为什么许多系统都会放弃Quicksort来支持小尺寸的插入sorting。

还要考虑到这几乎不需要额外的内存,而且algorithm对caching非常友好(对于堆或优先级队列来说,这可能是对的,也可能不对),写入没有错误也是微不足道的。

读取文件的过程可能是主要瓶颈,所以真正的性能提升可能是通过为select做一个简单的解决scheme,你可以集中精力寻找一个好的缓冲策略来最小化I / O。

如果k可以任意大,接近n ,那么考虑一个优先队列或其他更智能的数据结构是有意义的。 另一种select是将input拆分为多个块,并行sorting每个块,然后合并。