从数据存储中查询大量ndb实体的最佳实践

我已经遇到了App Engine数据存储的一个有趣的限制。 我正在创build一个处理程序来帮助我们分析一个生产服务器上的一些使用情况数据。 为了执行分析,我需要查询并总结从数据存储中拉出的10,000多个实体。 计算并不难,只是通过使用示例的特定filter的项目的直方图。 我遇到的问题是,我无法从数据存储中快速获取数据,以便在查询截止date之前进行任何处理。

我已经尝试了所有我能想到的将查询分块为并行RPC调用以提高性能,但是根据appstats,我似乎无法获得实际并行执行的查询。 无论我尝试什么方法(见下文),似乎总是会回到顺序的下一个查询的瀑布。

注意:查询和分析代码确实工作,只是运行缓慢,因为我无法从数据存储快速获取数据。

背景

我没有可以分享的现场版本,但是这里是我所说的系统部分的基本模型:

class Session(ndb.Model): """ A tracked user session. (customer account (company), version, OS, etc) """ data = ndb.JsonProperty(required = False, indexed = False) class Sample(ndb.Model): name = ndb.StringProperty (required = True, indexed = True) session = ndb.KeyProperty (required = True, kind = Session) timestamp = ndb.DateTimeProperty(required = True, indexed = True) tags = ndb.StringProperty (repeated = True, indexed = True) 

您可以将样本视为用户使用某个特定名称的function的时间。 (例如:'systemA.feature_x')。 标签基于客户的详细信息,系统信息和function。 例如:['winxp','2.5.1','systemA','feature_x','premium_account'])。 所以这些标签形成一个非正规化的令牌集合,可以用来find感兴趣的样本。

我试图做的分析包括采取一个date范围,并询问每个客户帐户(公司,而不是每个用户)每天(或每小时)使用的一组function(可能是所有function)的function的次数。

所以处理程序的input是这样的:

  • 开始date
  • 结束date
  • 标签(S)

输出将是:

 [{ 'company_account': <string>, 'counts': [ {'timeperiod': <iso8601 date>, 'count': <int>}, ... ] }, ... ] 

查询的通用代码

以下是所有查询的一些共同代码。 处理程序的一般结构是一个使用webapp2的简单获取处理程序,它设置查询参数,运行查询,处理结果,创build要返回的数据。

 # -- Build Query Object --- # query_opts = {} query_opts['batch_size'] = 500 # Bring in large groups of entities q = Sample.query() q = q.order(Sample.timestamp) # Tags tag_args = [(Sample.tags == t) for t in tags] q = q.filter(ndb.query.AND(*tag_args)) def handle_sample(sample): session_obj = sample.session.get() # Usually found in local or memcache thanks to ndb count_key = session_obj.data['customer'] addCountForPeriod(count_key, sample.timestamp) 

方法尝试

我尝试了多种方法来尝试尽可能快速并行地从数据存储中获取数据。 我迄今为止所尝试的方法包括:

A.单次迭代

这是一个更简单的基本情况下比较其他方法。 我只是build立查询,并遍历所有的项目,让ndb做它一个接一个地拉他们。

 q = q.filter(Sample.timestamp >= start_time) q = q.filter(Sample.timestamp <= end_time) q_iter = q.iter(**query_opts) for sample in q_iter: handle_sample(sample) 

B.大取

这里的想法是看我是否可以做一个非常大的获取。

 q = q.filter(Sample.timestamp >= start_time) q = q.filter(Sample.timestamp <= end_time) samples = q.fetch(20000, **query_opts) for sample in samples: handle_sample(sample) 

C.asynchronous提取时间范围

这里的想法是要认识到,样本间的时间间隔相当好,所以我可以创build一组独立的查询,将整个时间区域分成块,并尝试使用asynchronous并行运行每个块:

 # split up timestamp space into 20 equal parts and async query each of them ts_delta = (end_time - start_time) / 20 cur_start_time = start_time q_futures = [] for x in range(ts_intervals): cur_end_time = (cur_start_time + ts_delta) if x == (ts_intervals-1): # Last one has to cover full range cur_end_time = end_time f = q.filter(Sample.timestamp >= cur_start_time, Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts) q_futures.append(f) cur_start_time = cur_end_time # Now loop through and collect results for f in q_futures: samples = f.get_result() for sample in samples: handle_sample(sample) 

D.asynchronous映射

我试过这个方法,因为文档使得它听起来像ndb可能会自动利用一些并行使用Query.map_async方法。

 q = q.filter(Sample.timestamp >= start_time) q = q.filter(Sample.timestamp <= end_time) @ndb.tasklet def process_sample(sample): period_ts = getPeriodTimestamp(sample.timestamp) session_obj = yield sample.session.get_async() # Lookup the session object from cache count_key = session_obj.data['customer'] addCountForPeriod(count_key, sample.timestamp) raise ndb.Return(None) q_future = q.map_async(process_sample, **query_opts) res = q_future.get_result() 

结果

我testing了一个示例查询来收集总体响应时间和appstats跟踪。 结果是:

A.单次迭代

真实的:15.645s

这个顺序依次通过获取批次,然后从memcache中检索每个会话。

方法A appstats

B.大取

实际:12.12s

与选项A有效相同,但由于某种原因,速度稍快。

方法B appstats

C.asynchronous提取时间范围

真实的:15.251s

似乎在开始时提供了更多的并行性,但似乎在迭代结果期间被下一个调用序列放缓。 也似乎不能将会话memcache查找与挂起的查询重叠。

方法C appstats

D.asynchronous映射

实际:13.752s

这对我来说是最难理解的。 看起来它有很多重叠,但是一切似乎都是以瀑布而不是平行的方式展开的。

方法D appstats

build议

基于这一切,我错过了什么? 我只是在App Engine上设置了一个限制,还是有更好的方法来并行拖动大量的实体?

接下来要尝试什么,我感到不知所措。 我想重写客户端并行发送多个请求到应用程序引擎,但这似乎是非常蛮力。 我真的希望,应用程序引擎应该能够处理这个用例,所以我猜测有我缺less的东西。

更新

最后我发现C选项对我来说是最好的select。 我能够优化它在6.1秒内完成。 仍然不完美,但好多了。

在得到几个人的build议后,我发现以下几点是理解和记住的关键:

  • 多个查询可以并行运行
  • 只有10个RPC可以同时在飞行
  • 尝试去规范化到没有辅助查询的地步
  • 这种types的任务是更好地留下映射减less和任务队列,而不是实时查询

所以我做了什么来加快速度:

  • 我根据时间从一开始就分割查询空间。 (注意:返回实体的分区越平等越好)
  • 我进一步对数据进行了非规范化处理,以消除次级会话查询的需要
  • 我使用ndbasynchronous操作和wait_any()来重叠查询与处理

我还没有得到我期望或喜欢的表演,但现在是可行的。 我只是希望他们是一个更好的方式来把大量的连续的实体快速地放入处理器中。

像这样的大处理不应该在60秒的时间限制的用户请求中完成。 相反,它应该在支持长时间运行的请求的上下文中完成。 任务队列支持长达10分钟的请求,(我相信)正常的内存限制(F1实例,默认情况下,有128MB的内存 )。 对于更高的限制(无需请求超时,1GB内存),请使用后端 。

这里有一些可以尝试的方法:设置一个URL,在访问时触发一个任务队列任务。 如果任务队列任务已完成,它将返回一个轮询每个〜5s到另一个URL的网页,该url以true / false作为响应。 任务队列处理数据,这可能需要几十秒钟的时间,并将结果保存到数据存储区,或者作为计算的数据或渲染的网页。 一旦初始页面检测到它已经完成,用户被redirect到页面,该页面从数据存储区获取现在计算的结果。

新的实验数据处理function(AppEngine API for MapReduce)看起来非常适合解决这个问题。 它执行自动分片来执行多个并行工作进程。

App Engine上的大数据操作最好使用某种mapreduce操作来实现。

这里有一个video描述的过程,但包括BigQuery的https://developers.google.com/events/io/sessions/gooio2012/307/

这听起来不像你需要BigQuery,但你可能想使用pipe道的地图和减less部分。

你正在做的事情和mapreduce情况之间的主要区别是,你正在启动一个实例并遍历查询,在mapreduce上,每个查询都会有一个单独的实例并行运行。 你将需要一个reduce操作来“总结”所有的数据,然后把结果写在某个地方。

你有的另一个问题是你应该使用游标迭代。 https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors

如果迭代器使用的是查询偏移量,则效率会很低,因为偏移量会发出相同的查询,跳过许多结果,并给出下一个集合,而光标会直接跳转到下一个集合。

Interesting Posts