Python random sample with a generator / iterable / iterator

Do you know whether there is a way to get Python's random.sample to work with a generator object? I am trying to take a random sample from a very large text corpus. The problem is that random.sample() raises the following error:

 TypeError: object of type 'generator' has no len() 

I was thinking that maybe there is some way of doing this with itertools, but couldn't find anything with a bit of searching.

A somewhat made-up example:

    import random

    def list_item(ls):
        for item in ls:
            yield item

    random.sample(list_item(range(100)), 20)


UPDATE


As Martijn Pieters requested, I did some timing of the three methods proposed so far. The results are as follows.

    Sampling 1000 from 10000
    Using iterSample             0.0163 s
    Using sample_from_iterable   0.0098 s
    Using iter_sample_fast       0.0148 s

    Sampling 10000 from 100000
    Using iterSample             0.1786 s
    Using sample_from_iterable   0.1320 s
    Using iter_sample_fast       0.1576 s

    Sampling 100000 from 1000000
    Using iterSample             3.2740 s
    Using sample_from_iterable   1.9860 s
    Using iter_sample_fast       1.4586 s

    Sampling 200000 from 1000000
    Using iterSample             7.6115 s
    Using sample_from_iterable   3.0663 s
    Using iter_sample_fast       1.4101 s

    Sampling 500000 from 1000000
    Using iterSample             39.2595 s
    Using sample_from_iterable   4.9994 s
    Using iter_sample_fast       1.2178 s

    Sampling 2000000 from 5000000
    Using iterSample             798.8016 s
    Using sample_from_iterable   28.6618 s
    Using iter_sample_fast       6.6482 s

So it turns out that list.insert has a serious drawback when large sample sizes are involved. The code I used to time the methods:

    from heapq import nlargest
    import random
    import timeit


    def iterSample(iterable, samplesize):
        results = []
        for i, v in enumerate(iterable):
            r = random.randint(0, i)
            if r < samplesize:
                if i < samplesize:
                    results.insert(r, v)  # add first samplesize items in random order
                else:
                    results[r] = v  # at a decreasing rate, replace random items

        if len(results) < samplesize:
            raise ValueError("Sample larger than population.")

        return results


    def sample_from_iterable(iterable, samplesize):
        return (x for _, x in nlargest(samplesize, ((random.random(), x) for x in iterable)))


    def iter_sample_fast(iterable, samplesize):
        results = []
        iterator = iter(iterable)
        # Fill in the first samplesize elements:
        for _ in xrange(samplesize):
            results.append(iterator.next())
        random.shuffle(results)  # Randomize their positions
        for i, v in enumerate(iterator, samplesize):
            r = random.randint(0, i)
            if r < samplesize:
                results[r] = v  # at a decreasing rate, replace random items

        if len(results) < samplesize:
            raise ValueError("Sample larger than population.")

        return results


    if __name__ == '__main__':
        pop_sizes = [int(10e+3), int(10e+4), int(10e+5), int(10e+5), int(10e+5), int(10e+5)*5]
        k_sizes = [int(10e+2), int(10e+3), int(10e+4), int(10e+4)*2, int(10e+4)*5, int(10e+5)*2]

        for pop_size, k_size in zip(pop_sizes, k_sizes):
            pop = xrange(pop_size)
            k = k_size
            t1 = timeit.Timer(stmt='iterSample(pop, %i)' % (k_size),
                              setup='from __main__ import iterSample,pop')
            t2 = timeit.Timer(stmt='sample_from_iterable(pop, %i)' % (k_size),
                              setup='from __main__ import sample_from_iterable,pop')
            t3 = timeit.Timer(stmt='iter_sample_fast(pop, %i)' % (k_size),
                              setup='from __main__ import iter_sample_fast,pop')

            print 'Sampling', k, 'from', pop_size
            print 'Using iterSample', '%1.4f s' % (t1.timeit(number=100) / 100.0)
            print 'Using sample_from_iterable', '%1.4f s' % (t2.timeit(number=100) / 100.0)
            print 'Using iter_sample_fast', '%1.4f s' % (t3.timeit(number=100) / 100.0)
            print ''

I also ran a test to check that all the methods indeed take an unbiased sample of the generator. For each method I sampled 1000 elements from 10000, 100000 times, and computed the average frequency of occurrence of each item in the population, which turned out to be what one would expect (~0.1) for all three methods.
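For reference, a minimal sketch of that kind of bias check (assuming the iter_sample_fast function defined above, and far fewer repetitions than the 100000 runs mentioned, so it finishes quickly):

    from collections import Counter

    repetitions = 1000
    counts = Counter()
    for _ in xrange(repetitions):
        counts.update(iter_sample_fast(xrange(10000), 1000))

    # With an unbiased sampler every item should be drawn with frequency
    # close to 1000 / 10000 = 0.1.
    freqs = [counts[i] / float(repetitions) for i in xrange(10000)]
    print min(freqs), max(freqs), sum(freqs) / len(freqs)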

While the answer of Martijn Pieters is correct, it does slow down when samplesize becomes large, because using list.insert in a loop may have quadratic complexity.
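A minimal sketch of why that is (purely illustrative, not from the original answer): every list.insert at a random position has to shift the elements that follow it, so filling the first samplesize slots costs roughly quadratic time in samplesize:

    import random
    import timeit

    def fill_by_insert(n):
        results = []
        for i in xrange(n):
            results.insert(random.randint(0, i), i)

    for n in (10000, 20000, 40000):
        t = timeit.timeit(lambda: fill_by_insert(n), number=3) / 3.0
        print n, '%.3f s' % t  # time roughly quadruples as n doubles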

Here is, in my opinion, an alternative that preserves uniformity while improving performance:

    import random

    def iter_sample_fast(iterable, samplesize):
        results = []
        iterator = iter(iterable)
        # Fill in the first samplesize elements:
        try:
            for _ in xrange(samplesize):
                results.append(iterator.next())
        except StopIteration:
            raise ValueError("Sample larger than population.")
        random.shuffle(results)  # Randomize their positions
        for i, v in enumerate(iterator, samplesize):
            r = random.randint(0, i)
            if r < samplesize:
                results[r] = v  # at a decreasing rate, replace random items
        return results

The difference starts to show for samplesize values above 10000. Times for calling with (1000000, 100000):

  • iterSample: 5.05 s
  • iter_sample_fast: 2.64 s

You can't.

You have two options: read the whole generator into a list and sample from that list, or use a method that reads the generator one item at a time and picks the sample from it as it goes:

    import random

    def iterSample(iterable, samplesize):
        results = []
        for i, v in enumerate(iterable):
            r = random.randint(0, i)
            if r < samplesize:
                if i < samplesize:
                    results.insert(r, v)  # add first samplesize items in random order
                else:
                    results[r] = v  # at a decreasing rate, replace random items

        if len(results) < samplesize:
            raise ValueError("Sample larger than population.")

        return results

This method adjusts the chance that the next item becomes part of the sample based on the number of items seen in the iterable so far. It never needs to hold more than samplesize items in memory.
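As a quick usage sketch of the two options (corpus_lines is a made-up stand-in for a real corpus generator):

    import random

    def corpus_lines():
        for i in xrange(1000000):
            yield 'line %d' % i

    # Option 1: materialize the whole generator, then sample (O(n) memory)
    sample1 = random.sample(list(corpus_lines()), 20)

    # Option 2: sample while iterating, holding only samplesize items in memory
    sample2 = iterSample(corpus_lines(), 20)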

The solution isn't mine; it was provided as part of another answer here on SO.

For what it's worth, here is a one-liner that samples k elements without replacement from the n items generated, in O(n lg k) time:

    from heapq import nlargest
    import random

    def sample_from_iterable(it, k):
        return (x for _, x in nlargest(k, ((random.random(), x) for x in it)))
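Note that the one-liner returns a generator expression, so materialize it if you need a list; a quick usage sketch (population chosen arbitrarily):

    sample = list(sample_from_iterable(xrange(100), 10))
    print sample  # 10 distinct items drawn from 0..99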

If the number of items in the iterator is known (by counting the items elsewhere), another approach is:

    import random

    def iter_sample(iterable, iterlen, samplesize):
        if iterlen < samplesize:
            raise ValueError("Sample larger than population.")
        # Draw samplesize distinct random indexes into the iterable
        # (randint's upper bound is inclusive, so use iterlen - 1).
        indexes = set()
        while len(indexes) < samplesize:
            indexes.add(random.randint(0, iterlen - 1))
        indexesiter = iter(sorted(indexes))
        current = indexesiter.next()
        ret = []
        for i, item in enumerate(iterable):
            if i == current:
                ret.append(item)
                try:
                    current = indexesiter.next()
                except StopIteration:
                    break
        random.shuffle(ret)
        return ret

I find this quicker, especially when samplesize is small relative to iterlen, since picking the random indexes is cheap compared to scanning the whole iterable for every method. When the whole, or close to the whole, population is asked for, however, problems arise, as drawing distinct random indexes into a set needs more and more retries as it approaches iterlen:

  • iterlen=10000, samplesize=100: iter_sample 1 ms, iter_sample_fast 15 ms
  • iterlen=1000000, samplesize=100: iter_sample 65 ms, iter_sample_fast 1477 ms
  • iterlen=1000000, samplesize=1000: iter_sample 64 ms, iter_sample_fast 1459 ms
  • iterlen=1000000, samplesize=10000: iter_sample 86 ms, iter_sample_fast 1480 ms
  • iterlen=1000000, samplesize=100000: iter_sample 388 ms, iter_sample_fast 1521 ms
  • iterlen=1000000, samplesize=1000000: iter_sample 25359 ms, iter_sample_fast 2178 ms

The fastest method, until proven otherwise, when you have an idea of how long the generator is (and the sample will be asymptotically uniformly distributed):

    import numpy

    def gen_sample(generator_list, sample_size, iterlen):
        num = 0
        # Pre-compute a boolean mask with roughly sample_size True entries
        inds = numpy.random.random(iterlen) <= (sample_size * 1.0 / iterlen)
        results = []
        iterator = iter(generator_list)
        gotten = 0
        while gotten < sample_size:
            try:
                b = iterator.next()
                if inds[num]:
                    results.append(b)
                    gotten += 1
                num += 1
            except StopIteration:
                # Ran out of items before collecting enough: start over with a
                # fresh mask sized for the remaining items needed (requires a
                # re-iterable input, not a one-shot generator).
                num = 0
                iterator = iter(generator_list)
                inds = numpy.random.random(iterlen) <= ((sample_size - gotten) * 1.0 / iterlen)
        return results

It is the fastest on both the small iterable and the huge one (and probably everything in between too):

    # Huge
    res = gen_sample(xrange(5000000), 200000, 5000000)
    # timing: 1.22 s

    # Small
    z = gen_sample(xrange(10000), 1000, 10000)
    # timing: 0.000441 s