为什么要通过一个耗费大量内存的大型Django QuerySet进行迭代?

问题表中包含大约一千万行。

for event in Event.objects.all(): print event 

这导致内存使用量稳步增加到4 GB左右,在这一点上行快速打印。 第一行打印之前的漫长的延迟令我感到惊讶 – 我期望它几乎立即打印。

我也尝试了Event.objects.iterator() ,其行为方式相同。

我不明白什么Django加载到内存或为什么这样做。 我期望Django在数据库级别迭代结果,这意味着结果将以大致恒定的速率打印(而不是在长时间的等待之后立即打印)。

我误解了什么?

(我不知道它是否相关,但是我正在使用PostgreSQL。)

内特C接近,但并不完全。

从文档 :

您可以通过以下方式评估QuerySet:

  • 迭代。 QuerySet是可迭代的,并且在您第一次迭代时执行它的数据库查询。 例如,这将打印数据库中所有条目的标题:

     for e in Entry.objects.all(): print e.headline 

因此,当您首次进入该循环并获取查询集的迭代forms时,您的一千万行将被一次性检索到。 您遇到的等待是Django加载数据库行并为每个行创build对象,然后返回实际上可以迭代的内容。 然后你记忆中的一切,结果溢出。

从我读的文档中, iterator()只是绕过QuerySet的内部caching机制。 我认为这样做可能是有道理的,但是反过来,这需要在您的数据库上进行一千万次单击。 也许不是所有的理想。

有效地迭代大数据集是我们仍然没有得到完全正确的,但是有一些代码片段可能会对您的目的有用:

  • 内存高效的Django QuerySet迭代器
  • 批量查询集
  • QuerySet Foreach

为什么不使用django核心的Paginator和Page对象logging在这里,可能不是更快或最有效的,而是现成的解决scheme:

https://docs.djangoproject.com/en/dev/topics/pagination/

像这样的东西:

 from django.core.paginator import Paginator from djangoapp.models import model paginator = Paginator(model.objects.all(), 1000) # chunks of 1000, you can # change this to desired chunk size for page in range(1, paginator.num_pages + 1): for row in paginator.page(page).object_list: # here you can do whatever you want with the row print "done processing page %s" % page 

Django的默认行为是在评估查询时cachingQuerySet的整个结果。 您可以使用QuerySet的迭代器方法来避免此caching:

 for event in Event.objects.all().iterator(): print event 

https://docs.djangoproject.com/en/dev/ref/models/querysets/#iterator

iterator()方法评估查询集,然后直接读取结果而不在QuerySet级别进行caching。 在遍历大量只需要访问一次的对象时,此方法可以获得更好的性能和显着的内存减less。 请注意,caching仍然在数据库级别完成。

使用迭代器()减less了我的内存使用量,但仍然比我预期的要高。 使用mpafbuild议的paginator方法使用更less的内存,但是对于我的testing用例来说,速度要慢2-3倍。

 from django.core.paginator import Paginator def chunked_iterator(queryset, chunk_size=10000): paginator = Paginator(queryset, chunk_size) for page in range(1, paginator.num_pages + 1): for obj in paginator.page(page).object_list: yield obj for event in chunked_iterator(Event.objects.all()): print event 

这是从文档: http : //docs.djangoproject.com/en/dev/ref/models/querysets/

实际上没有数据库活动发生,直到你做一些事情来评估查询集。

因此,当print event运行时,查询会触发(根据您的命令进行全表扫描)并加载结果。 你要求所有的对象,没有办法得到第一个对象,没有得到所有的对象。

但是,如果你做了这样的事情:

 Event.objects.all()[300:900] 

http://docs.djangoproject.com/en/dev/topics/db/queries/#limiting-querysets

然后它会在内部添加偏移量和限制。

对于大量的logging, 数据库游标执行得更好。 你确实需要Django中的原始SQL,Django-cursor与SQL cursur不同。

Nate Cbuild议的LIMIT – OFFSET方法可能适合您的情况。 对于大量的数据,它比游标要慢,因为它必须一遍又一遍地运行相同的查询,并且必须跳过越来越多的结果。

Django没有很好的从数据库中获取大项目的解决scheme。

 import gc # Get the events in reverse order eids = Event.objects.order_by("-id").values_list("id", flat=True) for index, eid in enumerate(eids): event = Event.object.get(id=eid) # do necessary work with event if index % 100 == 0: gc.collect() print("completed 100 items") 

values_list可以用来获取数据库中的所有id,然后分别获取每个对象。 在一段时间内,大的对象将在内存中创build,不会被垃圾收集,直到退出循环。 上面的代码在每消耗一百个项目之后手动垃圾收集。

因为这样整个查询集的对象会一次加载到内存中。 你需要把你的查询集分成较小的易消化位。 这样做的模式被称为spoonfeeding。 这是一个简短的实现。

 def spoonfeed(qs, func, chunk=1000, start=0): ''' Chunk up a large queryset and run func on each item. Works with automatic primary key fields. chunk -- how many objects to take on at once start -- PK to start from >>> spoonfeed(Spam.objects.all(), nom_nom) ''' while start < qs.order_by('pk').last().pk: for o in qs.filter(pk__gt=start, pk__lte=start+chunk): func(o) start += chunk 

这可以通过多处理来进一步改进,以并行地在多个对象上执行func

这里有一个解决scheme,包括len和count:

 class GeneratorWithLen(object): """ Generator that includes len and count for given queryset """ def __init__(self, generator, length): self.generator = generator self.length = length def __len__(self): return self.length def __iter__(self): return self.generator def __getitem__(self, item): return self.generator.__getitem__(item) def next(self): return next(self.generator) def count(self): return self.__len__() def batch(queryset, batch_size=1024): """ returns a generator that does not cache results on the QuerySet Aimed to use with expected HUGE/ENORMOUS data sets, no caching, no memory used more than batch_size :param batch_size: Size for the maximum chunk of data in memory :return: generator """ total = queryset.count() def batch_qs(_qs, _batch_size=batch_size): """ Returns a (start, end, total, queryset) tuple for each batch in the given queryset. """ for start in range(0, total, _batch_size): end = min(start + _batch_size, total) yield (start, end, total, _qs[start:end]) def generate_items(): queryset.order_by() # Clearing... ordering by id if PK autoincremental for start, end, total, qs in batch_qs(queryset): for item in qs: yield item return GeneratorWithLen(generate_items(), total) 

用法:

 events = batch(Event.objects.all()) len(events) == events.count() for event in events: # Do something with the Event 

我通常使用原始MySQL原始查询而不是Django ORM来处理这种任务。

MySQL支持stream模式,所以我们可以安全快速地循环所有logging,而不会出现内存不足的错误。

 import MySQLdb db_config = {} # config your db here connection = MySQLdb.connect( host=db_config['HOST'], user=db_config['USER'], port=int(db_config['PORT']), passwd=db_config['PASSWORD'], db=db_config['NAME']) cursor = MySQLdb.cursors.SSCursor(connection) # SSCursor for streaming mode cursor.execute("SELECT * FROM event") while True: record = cursor.fetchone() if record is None: break # Do something with record here cursor.close() connection.close() 

参考:

  1. 从MySQL中检索数百万行
  2. MySQL结果集stream如何执行与一次获取整个JDBC ResultSet