在python中处理多个进程中的单个文件

我有一个单一的大文本文件,我想处理每一行(做一些操作)并将它们存储在数据库中。 由于一个简单的程序花费的时间太长,我希望通过多个进程或线程完成。 每个线程/进程应该读取来自单个文件的不同的数据(不同的行),并对他们的数据(行)进行一些操作,并把它们放到数据库中,这样最后我处理了整个数据,数据库被转储我需要的数据。

但我无法弄清楚如何解决这个问题。

您正在寻找的是生产者/消费者模式

基本的线程示例

这是一个使用线程模块的基本示例(而不是多处理)

import threading import Queue import sys def do_work(in_queue, out_queue): while True: item = in_queue.get() # process result = item out_queue.put(result) in_queue.task_done() if __name__ == "__main__": work = Queue.Queue() results = Queue.Queue() total = 20 # start for workers for i in xrange(4): t = threading.Thread(target=do_work, args=(work, results)) t.daemon = True t.start() # produce data for i in xrange(total): work.put(i) work.join() # get the results for i in xrange(total): print results.get() sys.exit() 

你不会与线程共享文件对象。 你将通过提供队列中的数据来为他们创造工作。 然后每个线程都会把它拿起来处理,然后将它返回到队列中。

在多处理模块中内置了一些更高级的工具来共享数据,如列表和特殊types的队列 。 使用多处理vs线程有一些权衡,它取决于你的工作是cpu绑定还是IO绑定。

基本的multiprocessing.Pool示例

这是一个多处理池的一个非常基本的例子

 from multiprocessing import Pool def process_line(line): return "FOO: %s" % line if __name__ == "__main__": pool = Pool(4) with open('file.txt') as source_file: # chunk the work into batches of 4 lines at a time results = pool.map(process_line, source_file, 4) print results 

池是pipe理自己的进程的便利对象。 由于打开的文件可以遍历其行,因此可以将其传递给映射,映射将循环并将行传递给辅助函数。 映射块并在完成时返回整个结果。 请注意,在一个非常简单的例子中, map将在完成工作之前一次性使用您的文件。 所以请注意,如果它更大。 有更高级的方法来devise一个生产者/消费者设置。

手动“池”,限制和行重新sorting

这是一个Pool.map的手动示例,但不是消耗整个迭代,而是可以设置一个队列大小,以便只按照它可以处理的速度逐条馈送它。 我还添加了行号,以便您可以跟踪它们,并在稍后想要时使用它们。

 from multiprocessing import Process, Manager import time import itertools def do_work(in_queue, out_list): while True: item = in_queue.get() line_no, line = item # exit signal if line == None: return # fake work time.sleep(.5) result = (line_no, line) out_list.append(result) if __name__ == "__main__": num_workers = 4 manager = Manager() results = manager.list() work = manager.Queue(num_workers) # start for workers pool = [] for i in xrange(num_workers): p = Process(target=do_work, args=(work, results)) p.start() pool.append(p) # produce data with open("source.txt") as f: iters = itertools.chain(f, (None,)*num_workers) for num_and_line in enumerate(iters): work.put(num_and_line) for p in pool: p.join() # get the results # example: [(1, "foo"), (10, "bar"), (0, "start")] print sorted(results) 

这是我捏造的一个非常愚蠢的例子:

 import os.path import multiprocessing def newlinebefore(f,n): f.seek(n) c=f.read(1) while c!='\n' and n > 0: n-=1 f.seek(n) c=f.read(1) f.seek(n) return n filename='gpdata.dat' #your filename goes here. fsize=os.path.getsize(filename) #size of file (in bytes) #break the file into 20 chunks for processing. nchunks=20 initial_chunks=range(1,fsize,fsize/nchunks) #You could also do something like: #initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too. with open(filename,'r') as f: start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks])) end_byte=[i-1 for i in start_byte] [1:] + [None] def process_piece(filename,start,end): with open(filename,'r') as f: f.seek(start+1) if(end is None): text=f.read() else: nbytes=end-start+1 text=f.read(nbytes) # process text here. createing some object to be returned # You could wrap text into a StringIO object if you want to be able to # read from it the way you would a file. returnobj=text return returnobj def wrapper(args): return process_piece(*args) filename_repeated=[filename]*len(start_byte) args=zip(filename_repeated,start_byte,end_byte) pool=multiprocessing.Pool(4) result=pool.map(wrapper,args) #Now take your results and write them to the database. print "".join(result) #I just print it to make sure I get my file back ... 

这里棘手的部分是确保我们将文件拆分为换行符,以便不会错过任何行(或只读取部分行)。 然后,每个进程读取它是文件的一部分,并返回一个可由主线程放入数据库的对象。 当然,你甚至可能需要大块地做这个部分,所以你不必一次把所有的信息保存在内存中。 (这很容易实现 – 只需将“args”列表拆分为X块并调用pool.map(wrapper,chunk) – 请参阅此处 )

把单个大文件分解成多个较小的文件,然后分别在不同的线程中处理它们。