如何加速读取多个文件并将数据放入数据框?

我有一些文本文件,比如50,我需要读入一个巨大的数据框。 目前,我正在使用以下步骤。

  1. 阅读每个文件,并检查标签是什么。 我需要的信息通常包含在前几行中。 相同的标签只是为文件的其余部分重复,每次都列出不同types的数据。
  2. 用这些标签创build一个数据框。
  3. 再次读取文件并填充dataframe。
  4. 将该dataframe与主dataframe连接起来。

对于文件大小为100 KB的文件来说,这种方法非常有效 – 几分钟,但在50 MB的文件中,只需要几个小时,而且不实用。

我如何优化我的代码? 尤其是 –

  1. 我怎样才能确定哪些function花费最多的时间,我需要优化? 这是文件的阅读吗? 是写入数据框吗? 我的课程在哪里花费时间?
  2. 我应该考虑multithreading还是多处理?
  3. 我可以改进algorithm吗?
    • 也许读一个列表中的整个文件,而不是一行一行,
    • 以块/整个文件parsing数据,而不是逐行parsing,
    • 将数据以块/一次分配给dataframe,而不是逐行分配。
  4. 还有什么我可以做,让我的代码执行更快?

这是一个示例代码。 我自己的代码稍微复杂一些,因为文本文件比较复杂,所以我必须使用大约10个正则expression式和多个while循环来读取数据并将其分配到正确的数组中。 为了保持MWE简单,我还没有在MWE的input文件中使用重复标签,所以它会让我无故读取文件两次。 我希望这是有道理的!

import re import pandas as pd df = pd.DataFrame() paths = ["../gitignore/test1.txt", "../gitignore/test2.txt"] reg_ex = re.compile('^(.+) (.+)\n') # read all files to determine what indices are available for path in paths: file_obj = open(path, 'r') print file_obj.readlines() ['a 1\n', 'b 2\n', 'end'] ['c 3\n', 'd 4\n', 'end'] indices = [] for path in paths: index = [] with open(path, 'r') as file_obj: line = True while line: try: line = file_obj.readline() match = reg_ex.match(line) index += match.group(1) except AttributeError: pass indices.append(index) # read files again and put data into a master dataframe for path, index in zip(paths, indices): subset_df = pd.DataFrame(index=index, columns=["Number"]) with open(path, 'r') as file_obj: line = True while line: try: line = file_obj.readline() match = reg_ex.match(line) subset_df.loc[[match.group(1)]] = match.group(2) except AttributeError: pass df = pd.concat([df, subset_df]).sort_index() print df Number a 1 b 2 c 3 d 4 

我的input文件:

test1.txt的

 a 1 b 2 end 

的test2.txt

 c 3 d 4 end 

在拔出多处理锤之前,你的第一步应该是做一些剖析。 使用cProfile快速查看以确定哪些function需要很长时间。 不幸的是,如果你的线路全部在一个函数调用中,他们将显示为库调用。 line_profiler更好,但需要更多的安装时间。

注意。 如果使用ipython,则可以使用%timeit(timeit模块的magic命令)和%prun(profile模块的magic命令)来为语句和函数定时。 谷歌search将显示一些指南。

pandas是一个非常棒的图书馆,但是我偶尔也是一个糟糕的结果。 尤其要小心append()/ concat()操作。 这可能是你的瓶颈,但你应该确定。 通常,如果不需要执行索引/列alignment,则numpy.vstack()和numpy.hstack()操作会更快。 在你的情况下,看起来你可能能够通过系列或一维numpy ndarrays可以节省时间。

顺便说一句,python中的try块比检查无效条件慢10倍甚至更多,因此确保在将每条线粘贴到循环中时绝对需要它。 这可能是时间的另一个掠夺者。 我想象一下,在match.group(1)失败的情况下,你试图阻止try块来检查AttributeError。 我会先检查有效的比赛。

即使这些小的修改应该足够让您的程序在尝试任何像多处理一样激烈的情况下运行得更快。 这些Python库非常棒,但带来了一系列新的挑战。

我已经使用了很多次,因为它是一个特别简单的多处理实现。

 import pandas as pd from multiprocessing import Pool def reader(filename): return pd.read_excel(filename) def main(): pool = Pool(4) # number of cores you want to use file_list = [file1.xlsx, file2.xlsx, file3.xlsx, ...] df_list = pool.map(reader, file_list) #creates a list of the loaded df's df = pd.concat(df_list) # concatenates all the df's into a single df if __name__ == '__main__': main() 

使用这个,你应该能够在没有太多工作的情况下大幅提高程序的速度。 如果你不知道你有多less处理器,你可以通过拉你的shell和打字来检查

 echo %NUMBER_OF_PROCESSORS% 

编辑:为了使这个运行更快,考虑改变你的文件到CSV和使用pandas函数pandas.read_csv

一般的python注意事项:

首先关于时间测量,你可以使用这样的片段:

 from time import time, sleep class Timer(object): def __init__(self): self.last = time() def __call__(self): old = self.last self.last = time() return self.last - old @property def elapsed(self): return time() - self.last timer = Timer() sleep(2) print timer.elapsed print timer() sleep(1) print timer() 

然后你可以多次testing运行代码,并检查差异。

关于这个,我在线评论:

 with open(path, 'r') as file_obj: line = True while line: #iterate on realdines instead. try: line = file_obj.readline() match = reg_ex.match(line) index += match.group(1) #if match: # index.extend(match.group(1)) # or extend except AttributeError: pass 

你以前的代码不是真正pythonic,你可能想尝试/除了。 然后尝试只在尽可能less的行上做。

相同的通知适用于第二个代码块。

如果您需要多次读取相同的文件。 您可以使用StringIO将它们存储在RAM中,或者更轻松地保留一次只能读取一次的{path:content}字典。

Python的正则expression式很慢,你的数据看起来很简单,你可以考虑在你的input行上使用split和strip方法。

  striped=[l.split() for l in [c.strip() for c in file_desc.readlines()] if l] 

我build议你阅读这个: https : //gist.github.com/JeffPaine/6213790对应的video在这里https://www.youtube.com/watch?v=OSGv2VnC0go

首先,如果你多次阅读这个文件,看来这将是瓶颈。 尝试将文件读入1个string对象,然后多次使用cStringIO

其次,在阅读所有文件之前,你还没有真正显示出build立索引的任何理由。 即使你这样做,你为什么使用pandasIO? 看起来你可以在普通的Python数据结构中build立它(也许使用__slots__ ),然后把它放在主数据框中。 如果你在读取文件Y之前不需要文件X索引(就像第二个循环似乎提示的那样),你只需要循环一次文件。

第三,你可以在string上使用简单的split / strip来分隔空格,或者如果它更复杂(有string引号等),可以使用Python标准库中的CSV模块。 在展示如何真正构build数据之前,很难提出与此相关的修复scheme。

到目前为止所显示的内容可以用简单的方法快速完成

 for path in paths: data = [] with open(path, 'r') as file_obj: for line in file_obj: try: d1, d2 = line.strip().split() except ValueError: pass data.append(d1, int(d2))) index, values = zip(*data) subset_df = pd.DataFrame({"Number": pd.Series(values, index=index)}) 

我在虚拟机上运行的时间与预先分配的磁盘空间不同(生成的文件大小约为24MB),这是不同的:

 import pandas as pd from random import randint from itertools import combinations from posix import fsync outfile = "indexValueInput" for suffix in ('1', '2'): with open(outfile+"_" + suffix, 'w') as f: for i, label in enumerate(combinations([chr(i) for i in range(ord('a'), ord('z')+1)], 8)) : val = randint(1, 1000000) print >>f, "%s %d" % (''.join(label), val) if i > 3999999: break print >>f, "end" fsync(f.fileno()) def readWithPandas(): data = [] with open(outfile + "_2", 'r') as file_obj: for line in file_obj: try: d1, d2 = str.split(line.strip()) except ValueError: pass data.append((d1, int(d2))) index, values = zip(*data) subset_df = pd.DataFrame({"Numbers": pd.Series(values, index=index)}) def readWithoutPandas(): data = [] with open(outfile+"_1", 'r') as file_obj: for line in file_obj: try: d1, d2 = str.split(line.strip()) except ValueError: pass data.append((d1, int(d2))) index, values = zip(*data) def time_func(func, *args): import time print "timing function", str(func.func_name) tStart = time.clock() func(*args) tEnd = time.clock() print "%f seconds " % (tEnd - tStart) time_func(readWithoutPandas) time_func(readWithPandas) 

由此产生的时间是:

 timing function readWithoutPandas 4.616853 seconds timing function readWithPandas 4.931765 seconds 

你可以尝试这些函数与你的索引build设,看看有什么不同的时间。 几乎可以肯定的是,减速来自多个磁盘读取。 由于Pandas没有时间从字典中构build数据框,所以在将数据传递给Pandas之前,最好先弄清楚如何在纯Python中构build索引。 但是不pipe是读取数据还是在1个磁盘读取中build立索引。

我想另外一个警告是,如果你从你的代码里面打印,期望花费大量的时间。 向tty写入纯文本所花费的时间就是读取/写入磁盘所花费的时间。

您可以导入多处理模型,并使用一组工作进程同时打开多个文件作为文件对象,加快代码的加载部分。 要testing时间,请导入date时间函数并使用以下代码:

 import datetime start=datetime.datetime.now() #part of your code goes here execTime1=datetime.datetime.now() print(execTime1-start) #the next part of your code goes here execTime2=datetime.datetime.now() print(execTime2-execTime1) 

至于每个文件只读一次,考虑使用另一个多处理脚本来build立每个文件的行列表,所以你可以检查没有文件I / O操作的匹配。

首先,为你的脚本使用一个分析器( 见这个问题) 。 分析哪一部分消耗更多的时间。 看看你是否可以优化它。

其次,我觉得I / O操作文件读取很可能是瓶颈。 它可以使用并发方法进行优化。 我会build议同时读取文件并创build数据框。 每个线程都可以将新创build​​的dataframe推送到队列中。 主线程监控队列可以从队列中获取dataframe并将其与主dataframe合并。

希望这可以帮助。

1为文件创build一个输出模板(如结果数据框架应该有列A,BC)

2读取每个文件,将其转换为输出模板(在步骤1中build立),并保存文件temp_idxx.csv,这可以并行完成:)

3将这些temp_idxx.csv文件连接成一个大文件并删除临时文件

这个程序的优点是它可以并行运行,并且不会占用所有的内存,因为它们正在创build输出格式并坚持使用它,而磁盘空间的使用

使用pd.read_csv将文件直接读入pandas数据框。 创build你的subset_df。 使用诸如skipfooter之类的方法跳过文件末尾的行,你知道你不需要。 还有更多的方法可以替代你正在使用的一些正则expression式循环函数,比如error_bad_lines和skip_blank_lines。

然后使用pandas提供的工具来清理不需要的数据。

这将允许您读取打开并只读取一次文件。

你的代码不会做你所描述的。

问题 :1.阅读每个文件,并检查标签是什么。 我需要的信息通常包含在前几行中。

但是你阅读整个文件,不仅仅是几行。 这导致两次读取文件!

问题 :2.再次读取文件,并填写数据框的值。

你一遍又一遍地在循环中覆盖df['a'|'b'|'c'|'d'] ,这是没用的
我相信这不是你想要的。
这适用于在问题中给出的数据,但不是如果你必须处理n值。


build议与不同的逻辑:

 data = {} for path in paths: with open(path, 'r') as file_obj: line = True while line: try: line = file_obj.readline() match = reg_ex.match(line) if match.group(1) not in data: data[ match.group(1) ] = [] data[match.group(1)].append( match.group(2) ) except AttributeError: pass print('data=%s' % data) df = pd.DataFrame.from_dict(data, orient='index').sort_index() df.rename(index=str, columns={0: "Number"}, inplace=True) 

输出

 data={'b': ['2'], 'a': ['1'], 'd': ['4'], 'c': ['3']} <class 'pandas.core.frame.DataFrame'> Index: 4 entries, a to d Data columns (total 1 columns): Number 4 non-null object dtypes: object(1) memory usage: 32.0+ bytes Number a 1 b 2 c 3 d 4 

时间表

  Code from Q: to_dict_from_dict 4 values 0:00:00.033071 0:00:00.022146 1000 values 0:00:08.267750 0:00:05.536500 10000 values 0:01:22.677500 0:00:55.365000 

用Pythontesting:3.4.2 – pandas:0.19.2 – re:2.2.1

事实certificate,首先创build一个空白的DataFrame,search索引find一行数据的正确位置,然后更新DataFrame的一行是一个愚蠢的时间昂贵的过程。

这样做的更快的方法是将input文件的内容读入基本的数据结构,例如列表列表或一系列的字典,然后将其转换为DataFrame。

当您读取的所有数据都在相同的列中时使用列表。 否则,使用字典来明确地说出每一位数据应该到哪一列。