如何处理传入的实时数据与pythonpandas

哪一个是用pandas处理现场传入数据的最推荐/ pythonic方式?

每隔几秒钟,我以下面的格式收到一个数据点:

{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0} 

我想将其附加到现有的DataFrame,然后运行一些分析。

问题是,使用DataFrame.append追加行可能会导致所有复制的性能问题。

我试过的东西:

有些人build议预先分配一个大的DataFrame,并在数据进入时更新它:

 In [1]: index = pd.DatetimeIndex(start='2013-01-01 00:00:00', freq='S', periods=5) In [2]: columns = ['high', 'low', 'open', 'close'] In [3]: df = pd.DataFrame(index=t, columns=columns) In [4]: df Out[4]: high low open close 2013-01-01 00:00:00 NaN NaN NaN NaN 2013-01-01 00:00:01 NaN NaN NaN NaN 2013-01-01 00:00:02 NaN NaN NaN NaN 2013-01-01 00:00:03 NaN NaN NaN NaN 2013-01-01 00:00:04 NaN NaN NaN NaN In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0} In [6]: data_ = pd.Series(data) In [7]: df.loc[data['time']] = data_ In [8]: df Out[8]: high low open close 2013-01-01 00:00:00 NaN NaN NaN NaN 2013-01-01 00:00:01 NaN NaN NaN NaN 2013-01-01 00:00:02 4 3 2 1 2013-01-01 00:00:03 NaN NaN NaN NaN 2013-01-01 00:00:04 NaN NaN NaN NaN 

另一个select是build立一个列表的字典。 只需将传入的数据附加到列表中,然后将其分割成更小的dataframe即可完成工作。

 In [9]: ls = [] In [10]: for n in range(5): .....: # Naive stuff ahead =) .....: time = '2013-01-01 00:00:0' + str(n) .....: d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10} .....: ls.append(d) In [11]: df = pd.DataFrame(ls[1:3]).set_index('time') In [12]: df Out[12]: close high low open stock time 2013-01-01 00:00:01 3.270078 1.008289 7.486118 2.180683 BLAH 2013-01-01 00:00:02 3.883586 2.215645 0.051799 2.310823 BLAH 

或类似的东西,也许处理input一点点。

实际上,您正试图解决两个问题:捕获实时数据并分析数据。 第一个问题可以用专门为此devise的Python日志来解决。 然后通过读取相同的日志文件可以解决另一个问题。

我将使用HDF5 / pytables如下:

  1. 将数据保持为“尽可能长”的python列表。
  2. 将您的结果附加到该列表。
  3. 当它变得“大”时:
    • 使用pandasio(和可附表)推送到HDF5商店。
    • 清除列表。
  4. 重复。

事实上,我定义的函数为每个“key”使用一个列表,以便您可以在同一个进程中将多个DataFrame存储到HDF5 Store中。


我们定义一个你每行调用一个函数d

 CACHE = {} STORE = 'store.h5' # Note: another option is to keep the actual file open def process_row(d, key, max_len=5000, _cache=CACHE): """ Append row d to the store 'key'. When the number of items in the key's cache reaches max_len, append the list of rows to the HDF5 store and clear the list. """ # keep the rows for each key separate. lst = _cache.setdefault(key, []) if len(lst) >= max_len: store_and_clear(lst, key) lst.append(d) def store_and_clear(lst, key): """ Convert key's cache list to a DataFrame and append that to HDF5. """ df = pd.DataFrame(lst) with pd.HDFStore(STORE) as store: store.append(key, df) lst.clear() 

注意:我们使用with语句在每次写入后自动closures存储。 保持开放可能会更快,但如果是这样,build议您定期冲洗(closures冲洗) 。 另外请注意,使用集合deque而不是列表可能更具可读性,但列表的性能在这里稍微好一些。

要使用这个你打电话为:

 process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}, key="df") 

注意:“df”是在pytables商店中使用的存储密钥 。

一旦工作完成,确保您store_and_clear剩余的caching:

 for k, lst in CACHE.items(): # you can instead use .iteritems() in python 2 store_and_clear(lst, k) 

现在您可以通过以下方式获得完整的DataFrame:

 with pd.HDFStore(STORE) as store: df = store["df"] # other keys will be store[key] 

一些评论:

  • 5000可以调整,尝试一些较小/较大的数字,以满足您的需求。
  • 列表追加是O(1) ,DataFrame追加是O( len(df) )。
  • 在你做数据统计或数据统计之前,你不需要pandas,使用最快的数据。
  • 此代码适用于多个密钥(数据点)进来。
  • 这是非常小的代码,我们留在香草python列表,然后pandas数据框…

此外,为了获得最新的读取,您可以定义一个get方法, 读取之前存储和清除。 这样你就可以得到最新的数据:

 def get_latest(key, _cache=CACHE): store_and_clear(_cache[key], key) with pd.HDFStore(STORE) as store: return store[key] 

现在,当你访问:

 df = get_latest("df") 

你会得到最新的“DF”。


另一个选项稍微涉及:在香草pytables中定义一个自定义表,请参阅教程 。

注意:您需要知道字段名称以创build列描述符 。