将发生器拆分成块,不需要预先走线

(这个问题与这个和这个有关,但是那些是在发生器前面走的,这正是我想要避免的)

我想分成一个大块的发电机。 要求是:

  • 不要填充块:如果剩余元素的数量小于块大小,最后一个块必须更小。
  • 不要事先走生成器:计算元素是昂贵的,而且它只能由消费函数来完成,而不是由chunker
  • 这意味着,当然:不要在内存中积累(没有列表)

我已经尝试了下面的代码:

def head(iterable, max=10): for cnt, el in enumerate(iterable): yield el if cnt >= max: break def chunks(iterable, size=10): i = iter(iterable) while True: yield head(i, size) # Sample generator: the real data is much more complex, and expensive to compute els = xrange(7) for n, chunk in enumerate(chunks(els, 3)): for el in chunk: print 'Chunk %3d, value %d' % (n, el) 

这不知何故起作用:

 Chunk 0, value 0 Chunk 0, value 1 Chunk 0, value 2 Chunk 1, value 3 Chunk 1, value 4 Chunk 1, value 5 Chunk 2, value 6 ^CTraceback (most recent call last): File "xxxx.py", line 15, in <module> for el in chunk: File "xxxx.py", line 2, in head for cnt, el in enumerate(iterable): KeyboardInterrupt 

Buuuut …它永远不会停止(我必须按^C ),因为这个while True 。 我想停止这个循环,每当发生器被消耗,但我不知道如何检测这种情况。 我曾经尝试过举例:

 class NoMoreData(Exception): pass def head(iterable, max=10): for cnt, el in enumerate(iterable): yield el if cnt >= max: break if cnt == 0 : raise NoMoreData() def chunks(iterable, size=10): i = iter(iterable) while True: try: yield head(i, size) except NoMoreData: break # Sample generator: the real data is much more complex, and expensive to compute els = xrange(7) for n, chunk in enumerate(chunks(els, 2)): for el in chunk: print 'Chunk %3d, value %d' % (n, el) 

但是,这个例外只是在消费者的情况下才产生的,这不是我想要的(我想保持消费者代码的清洁)

 Chunk 0, value 0 Chunk 0, value 1 Chunk 0, value 2 Chunk 1, value 3 Chunk 1, value 4 Chunk 1, value 5 Chunk 2, value 6 Traceback (most recent call last): File "xxxx.py", line 22, in <module> for el in chunk: File "xxxx.py", line 9, in head if cnt == 0 : raise NoMoreData __main__.NoMoreData() 

如何检测发生器在chunksfunction中耗尽,而不是走路?

一种方法是查看第一个元素(如果有),然后创build并返回实际的生成器。

 def head(iterable, max=10): first = next(iterable) # raise exception when depleted def head_inner(): yield first # yield the extracted first element for cnt, el in enumerate(iterable): yield el if cnt + 1 >= max: # cnt + 1 to include first break return head_inner() 

只需在你的chunk生成器中使用它,并像捕获自定义exception一样捕获StopIterationexception。


更新:这是另一个版本,使用itertools.islice来replace大部分head函数和一个for循环。 这个简单的for循环实际上和原始代码中那些难以处理的while-try-next-except-break结构完全一样 ,所以结果更具可读性。

 def chunks(iterable, size=10): iterator = iter(iterable) for first in iterator: # stops when iterator is depleted def chunk(): # construct generator for next chunk yield first # yield element from for loop for more in islice(iterator, size - 1): yield more # yield more elements from the iterator yield chunk() # in outer generator, yield next chunk 

而且我们可以使用itertools.chain来取代内部的生成器:

 def chunks(iterable, size=10): iterator = iter(iterable) for first in iterator: yield chain([first], islice(iterator, size - 1)) 

创build组/组块的另一种方式是在使用itertools.count对象的键函数上使用itertools.count 。 由于count对象独立于可迭代 ,所以可以容易地生成块,而不需要知道可迭代是什么。

groupby每一次迭代调用count对象的next方法,并通过用块的大小对当前计数值进行整数除法来生成组/块密钥 (随后是块中的项目)。

 from itertools import groupby, count def chunks(iterable, size=10): c = count() for _, g in groupby(iterable, lambda _: next(c)//size): yield g 

由生成器函数生成的每个组/块都是一个迭代器。 但是,由于groupby对所有组使用共享迭代器,因此组迭代器不能存储在列表或任何容器中,每个组迭代器应该在下一个之前被使用。

我可以想出最快的解决scheme,感谢(在CPython中)使用纯粹的C级内置函数。 通过这样做,不需要Python字节码来产生每个块(除非底层的生成器是在Python中实现的),这具有巨大的性能优势。 在返回它之前它会遍历每个 ,但是它不会在它将要返回的块之外进行任何预先走动:

 # Py2 only to get generator based map from future_builtins import map from itertools import islice, repeat, starmap, takewhile def chunker(n, iterable): # n is size of each chunk; last chunk may be smaller return takewhile(bool, map(tuple, starmap(islice, repeat((iter(iterable), n))))) 

由于这是一个有点密集,展开版本的插图:

 def chunker(n, iterable): iterable = iter(iterable) while True: x = tuple(islice(iterable, n)) if not x: return yield x 

enumerate包装一个调用chunker会让你在需要的时候编号。

 from itertools import islice def chunk(it, n): ''' # returns chunks of n elements each >>> list(chunk(range(10), 3)) [ [0, 1, 2, ], [3, 4, 5, ], [6, 7, 8, ], [9, ] ] >>> list(chunk(list(range(10)), 3)) [ [0, 1, 2, ], [3, 4, 5, ], [6, 7, 8, ], [9, ] ] ''' def _w(g): return lambda: tuple(islice(g, n)) return iter(_w(iter(it)), ()) 

如何使用itertools.islice

 import itertools els = iter(xrange(7)) print list(itertools.islice(els, 2)) print list(itertools.islice(els, 2)) print list(itertools.islice(els, 2)) print list(itertools.islice(els, 2)) 

这使:

 [0, 1] [2, 3] [4, 5] [6] 

我有同样的问题,但发现比这里提到的更简单的解决scheme:

 def chunker(iterable, chunk_size): els = iter(iterable) while True: next_el = next(els) yield chain([next_el], islice(els, chunk_size - 1)) for i, chunk in enumerate(chunker(range(11), 2)): for el in chunk: print(i, el) # Prints the following: 0 0 0 1 1 2 1 3 2 4 2 5 3 6 3 7 4 8 4 9 5 10 

开始实现这个场景的实用性,以更快的速度制定500k +行的数据库插入解决scheme。

一个生成器处理来自数据源的数据并逐行“收益” 然后另一个生成器将输出按块分组,然后按块“分块”输出。 第二个生成器只知道块大小,没有更多。

下面是一个突出概念的例子:

 #!/usr/bin/python def firstn_gen(n): num = 0 while num < n: yield num num += 1 def chunk_gen(some_gen, chunk_size=7): res_chunk = [] for count, item in enumerate(some_gen, 1): res_chunk.append(item) if count % chunk_size == 0: yield res_chunk res_chunk[:] = [] else: yield res_chunk if __name__ == '__main__': for a_chunk in chunk_gen(firstn_gen(33)): print(a_chunk) 

在Python 2.7.12中testing:

 [0, 1, 2, 3, 4, 5, 6] [7, 8, 9, 10, 11, 12, 13] [14, 15, 16, 17, 18, 19, 20] [21, 22, 23, 24, 25, 26, 27] [28, 29, 30, 31, 32] 

你说过你不想把东西存储在内存中,那么这是否意味着你不能为当前块build立一个中间列表?

为什么不遍历生成器并在块之间插入标记值? 消费者(或合适的包装)可以忽略哨兵:

 class Sentinel(object): pass def chunk(els, size): for i, el in enumerate(els): yield el if i > 0 and i % size == 0: yield Sentinel 

编辑其他解决scheme与发电机的发电机

你不应该在你的迭代器中做一个while True ,而只需迭代它,并在每次迭代时更新块号:

 def chunk(it, maxv): n = 0 for i in it: yield n // mavx, i n += 1 

如果你想要一个发生器的发电机,你可以有:

 def chunk(a, maxv): def inner(it, maxv, l): l[0] = False for i in range(maxv): yield next(it) l[0] = True raise StopIteration it = iter(a) l = [True] while l[0] == True: yield inner(it, maxv, l) raise StopIteration 

与一个可迭代的。

testing:在python 2.7和3.4上:

 for i in chunk(range(7), 3): print 'CHUNK' for a in i: print a 

给出:

 CHUNK 0 1 2 CHUNK 3 4 5 CHUNK 6 

2.7:

 for i in chunk(xrange(7), 3): print 'CHUNK' for a in i: print a 

给出相同的结果。

但是请注意:在2.7和3.4 list(chunk(range(7))