Get the last n lines of a file with Python, similar to tail

I am writing a log file viewer for a web application, and for that I want to paginate through the lines of the log file. The items in the file are line based, with the newest item at the bottom.

So I need a tail() method that can read n lines from the bottom and support an offset. What I came up with looks like this:

    def tail(f, n, offset=0):
        """Reads n lines from f with an offset of offset lines."""
        avg_line_length = 74
        to_read = n + offset
        while 1:
            try:
                f.seek(-(avg_line_length * to_read), 2)
            except IOError:
                # woops.  apparently file is smaller than what we want
                # to step back, go to the beginning instead
                f.seek(0)
            pos = f.tell()
            lines = f.read().splitlines()
            if len(lines) >= to_read or pos == 0:
                return lines[-to_read:offset and -offset or None]
            avg_line_length *= 1.3

Is this a reasonable approach? What is the recommended way to tail log files with offsets?

This may be quicker than yours. It makes no assumptions about line length. It backs through the file one block at a time until it has found the right number of '\n' characters.

    def tail(f, lines=20):
        total_lines_wanted = lines

        BLOCK_SIZE = 1024
        f.seek(0, 2)
        block_end_byte = f.tell()
        lines_to_go = total_lines_wanted
        block_number = -1
        blocks = []  # blocks of size BLOCK_SIZE, in reverse order starting
                     # from the end of the file
        while lines_to_go > 0 and block_end_byte > 0:
            if (block_end_byte - BLOCK_SIZE > 0):
                # read the last block we haven't yet read
                f.seek(block_number * BLOCK_SIZE, 2)
                blocks.append(f.read(BLOCK_SIZE))
            else:
                # file too small, start from beginning
                f.seek(0, 0)
                # only read what was not read
                blocks.append(f.read(block_end_byte))
            lines_found = blocks[-1].count('\n')
            lines_to_go -= lines_found
            block_end_byte -= BLOCK_SIZE
            block_number -= 1
        all_read_text = ''.join(reversed(blocks))
        return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])

I don't like the tricky assumptions about line length; as a practical matter, you can never know things like that.

Generally, this will locate the last 20 lines on the first or second pass through the loop. If your 74-character estimate is actually accurate, you could make the block size 2048 and you'd tail 20 lines almost immediately.

Also, I didn't burn a lot of brain calories trying to align with physical OS blocks. Using these high-level I/O packages, I doubt you'd see any performance consequence from trying to align on OS block boundaries. If you use lower-level I/O, then you might see a speedup.
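For illustration, here is a minimal sketch of what a block-aligned read through the low-level os interface might look like; the read_last_block helper name and the 4096 block size are illustrative assumptions, not part of the answer above:

    import os

    def read_last_block(path, block_size=4096):
        # Hypothetical helper: read the file's final block using low-level,
        # block-aligned I/O (block_size is an assumed typical OS block size).
        fd = os.open(path, os.O_RDONLY)
        try:
            size = os.fstat(fd).st_size
            # Round the offset down to a block_size boundary.
            offset = (size // block_size) * block_size
            if offset == size and size > 0:
                # File size is an exact multiple: back up one full block.
                offset -= block_size
            os.lseek(fd, offset, os.SEEK_SET)
            return os.read(fd, block_size)
        finally:
            os.close(fd)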

Assuming a unix-like system:

    import os

    def tail(f, n, offset=0):
        stdin, stdout = os.popen2("tail -n " + str(n + offset) + " " + f)
        stdin.close()
        lines = stdout.readlines()
        stdout.close()
        # drop the trailing offset lines, keeping only the n wanted lines
        return lines[:-offset] if offset else lines

If reading the whole file is acceptable, then use a deque.

    from collections import deque
    deque(f, maxlen=n)
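As a sketch of how that one-liner might be wrapped up (assuming reading the whole file line by line is acceptable, per the above; the file name in the comment is a placeholder):

    from collections import deque

    def tail(f, n):
        # Reads the whole file, but only ever keeps the last n lines in memory.
        return list(deque(f, maxlen=n))

    # e.g. tail(open('log.txt'), 20)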

Prior to 2.6, deques didn't have a maxlen option, but it's easy enough to implement.

    import itertools
    from collections import deque

    def maxque(items, size):
        items = iter(items)
        q = deque(itertools.islice(items, size))
        for item in items:
            del q[0]
            q.append(item)
        return q
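Usage is the same idea; for example (the path is a placeholder):

    # Last 10 lines of a file on Python < 2.6
    f = open('log.txt')  # placeholder path
    try:
        last_ten = list(maxque(f, 10))
    finally:
        f.close()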

If it's a requirement to read the file from the end, then use a galloping (a.k.a. exponential) search.

    def tail(f, n):
        assert n >= 0
        pos, lines = n+1, []
        while len(lines) <= n:
            try:
                f.seek(-pos, 2)
            except IOError:
                f.seek(0)
                break
            finally:
                lines = list(f)
            pos *= 2
        return lines[-n:]

S.Lott's answer above almost works for me, but it ended up giving me partial lines. It turns out that it corrupts data on block boundaries, because the read blocks are held in reversed order. When ''.join(data) is called, the blocks are in the wrong order. This fixes that.

    def tail(f, window=20):
        """
        Returns the last `window` lines of file `f` as a list.
        """
        if window == 0:
            return []
        BUFSIZ = 1024
        f.seek(0, 2)
        bytes = f.tell()
        size = window + 1
        block = -1
        data = []
        while size > 0 and bytes > 0:
            if bytes - BUFSIZ > 0:
                # Seek back one whole BUFSIZ
                f.seek(block * BUFSIZ, 2)
                # read BUFFER
                data.insert(0, f.read(BUFSIZ))
            else:
                # file too small, start from beginning
                f.seek(0, 0)
                # only read what was not read
                data.insert(0, f.read(bytes))
            linesFound = data[0].count('\n')
            size -= linesFound
            bytes -= BUFSIZ
            block -= 1
        return ''.join(data).splitlines()[-window:]

The code I ended up using. I think this is the best so far:

    def tail(f, n, offset=None):
        """Reads n lines from f with an offset of offset lines.  The return
        value is a tuple in the form ``(lines, has_more)`` where `has_more` is
        an indicator that is `True` if there are more lines in the file.
        """
        avg_line_length = 74
        to_read = n + (offset or 0)

        while 1:
            try:
                f.seek(-(avg_line_length * to_read), 2)
            except IOError:
                # woops.  apparently file is smaller than what we want
                # to step back, go to the beginning instead
                f.seek(0)
            pos = f.tell()
            lines = f.read().splitlines()
            if len(lines) >= to_read or pos == 0:
                return lines[-to_read:offset and -offset or None], \
                       len(lines) > to_read or pos > 0
            avg_line_length *= 1.3
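As a hypothetical sketch of how the (lines, has_more) tuple could drive the pagination described in the question (get_page is an invented name, not part of the answer):

    def get_page(f, page, per_page=20):
        # Page 0 is the newest per_page lines; page 1 the per_page before those, etc.
        lines, has_more = tail(f, per_page, offset=page * per_page)
        return lines, has_more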

Here is my answer. Pure python. Using timeit, it seems pretty fast. Tailing 100 lines of a log file that has 100,000 lines:

    >>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10)
    0.0014600753784179688
    >>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100)
    0.00899195671081543
    >>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=1000)
    0.05842900276184082
    >>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10000)
    0.5394978523254395
    >>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100000)
    5.377126932144165

Here is the code:

    import os

    def tail(f, lines=1, _buffer=4098):
        """Tail a file and get X lines from the end"""
        # place holder for the lines found
        lines_found = []

        # block counter will be multiplied by buffer
        # to get the block size from the end
        block_counter = -1

        # loop until we find X lines
        while len(lines_found) < lines:
            try:
                f.seek(block_counter * _buffer, os.SEEK_END)
            except IOError:
                # either file is too small, or too many lines requested
                f.seek(0)
                lines_found = f.readlines()
                break

            lines_found = f.readlines()

            # we found enough lines, get out
            # Removed this line because it was redundant, the while will catch
            # it, I left it for history
            # if len(lines_found) > lines:
            #     break

            # decrement the block counter to get the
            # next X bytes
            block_counter -= 1

        return lines_found[-lines:]

A simple and fast solution with mmap:

    import mmap
    import os

    def tail(filename, n):
        """Returns last n lines from the filename. No exception handling"""
        size = os.path.getsize(filename)
        with open(filename, "rb") as f:
            # for Windows the mmap parameters are different
            fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
            try:
                for i in xrange(size - 1, -1, -1):
                    if fm[i] == '\n':
                        n -= 1
                        if n == -1:
                            break
                return fm[i + 1 if i else 0:].splitlines()
            finally:
                fm.close()

I found the Popen approach above to be the best solution. It's quick and dirty and it works. For Python 2.6 on a Unix machine, I used the following:

    import subprocess

    def GetLastNLines(n, fileName):
        """
        Name:           GetLastNLines
        Description:    Gets last n lines using Unix tail
        Output:         returns last n lines of a file
        Keyword arguments:
        n        -- number of last lines to return
        fileName -- name of the file you need to tail into
        """
        p = subprocess.Popen(['tail', '-n', str(n), fileName],
                             stdout=subprocess.PIPE)
        soutput, sinput = p.communicate()
        return soutput

soutput will contain the last n lines of the file. To iterate through it line by line:

    for line in GetLastNLines(50, 'myfile.log').split('\n'):
        print line

Based on S.Lott's top-voted answer (Sep 25 '08 at 21:43), but fixed for small files.

    def tail(the_file, lines_2find=20):
        the_file.seek(0, 2)  # go to end of file
        bytes_in_file = the_file.tell()
        lines_found, total_bytes_scanned = 0, 0
        while lines_2find+1 > lines_found and bytes_in_file > total_bytes_scanned:
            byte_block = min(1024, bytes_in_file - total_bytes_scanned)
            the_file.seek(-(byte_block + total_bytes_scanned), 2)
            total_bytes_scanned += byte_block
            lines_found += the_file.read(byte_block).count('\n')
        the_file.seek(-total_bytes_scanned, 2)
        line_list = list(the_file.readlines())
        return line_list[-lines_2find:]
        # we read at least 21 line breaks from the bottom, block by block for speed
        # 21 to ensure we don't get a half line

Hope this is useful.

Posting an answer at the behest of commenters on my answer to a similar question, where the same technique was used to mutate the last line of a file, not just get it.

For files of significant size, mmap is the best way to do this. To improve on the existing mmap answer, this version is portable between Windows and Linux, and should run faster (though it won't work on 32-bit Python with files in the GB range without some modifications; see the other answer for hints on handling that, and on modifying it to work on Python 2).

    import io  # Gets consistent version of open for both Py2.7 and Py3.x
    import itertools
    import mmap

    def skip_back_lines(mm, numlines, startidx):
        '''Factored out to simplify handling of n and offset'''
        for _ in itertools.repeat(None, numlines):
            startidx = mm.rfind(b'\n', 0, startidx)
            if startidx < 0:
                break
        return startidx

    def tail(f, n, offset=0):
        # Reopen file in binary mode
        with io.open(f.name, 'rb') as binf, mmap.mmap(binf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            # len(mm) - 1 handles files ending w/newline by getting the prior line
            startofline = skip_back_lines(mm, offset, len(mm) - 1)
            if startofline < 0:
                return []  # Offset lines consumed whole file, nothing to return
                # If using a generator function (yield-ing, see below),
                # this should be a plain return, no empty list

            endoflines = startofline + 1  # Slice end to omit offset lines

            # Find start of lines to capture (add 1 to move from newline to beginning of following line)
            startofline = skip_back_lines(mm, n, startofline) + 1

            # Passing True to splitlines makes it return the list of lines without
            # removing the trailing newline (if any), so list mimics f.readlines()
            return mm[startofline:endoflines].splitlines(True)
            # If Windows style \r\n newlines need to be normalized to \n, and input
            # is ASCII compatible, can normalize newlines with:
            # return mm[startofline:endoflines].replace(os.linesep.encode('ascii'), b'\n').splitlines(True)

This assumes the number of lines tailed is small enough that you can safely read them all into memory at once; you could also make this a generator function and manually read a line at a time by replacing the final line with:

    mm.seek(startofline)
    # Call mm.readline n times, or until EOF, whichever comes first
    for line in itertools.islice(iter(mm.readline, b''), n):
        yield line

Lastly, this reads in binary mode (necessary to use mmap), so it gives str lines (Py2) and bytes lines (Py3); if you want unicode (Py2) or str (Py3), the iterative approach could be tweaked to decode for you and/or fix newlines:

    lines = itertools.islice(iter(mm.readline, b''), n)
    if f.encooding:  # Decode if the passed file was opened with a specific encoding
        lines = (line.decode(f.encoding) for line in lines)
    if 'b' not in f.mode:  # Fix line breaks if passed file opened in text mode
        lines = (line.replace(os.linesep, '\n') for line in lines)
    for line in lines:
        yield line

Note: I typed this all up on a machine where I don't have access to Python to test it. Please let me know if I typo'd anything. This is similar enough to my other answer that I think it should work, but the tweaks (e.g. handling offset) could lead to subtle errors. Please let me know in the comments if there are any mistakes.

For efficiency with very large files (common in the log file situations where you may want to use tail), you generally want to avoid reading the whole file (even if you do it without reading the whole file into memory at once). However, you do need to somehow work out the offset in lines rather than in characters. One possibility is reading backwards with seek() one character at a time, but this is very slow. Instead, it's better to process in larger blocks.

I have a utility function I wrote a while ago for reading files backwards that can be used here.

    import os, itertools

    def rblocks(f, blocksize=4096):
        """Read file as series of blocks from end of file to start.

        The data itself is in normal order, only the order of the blocks is reversed.
        ie. "hello world" -> ["ld","wor", "lo ", "hel"]
        Note that the file must be opened in binary mode.
        """
        if 'b' not in f.mode.lower():
            raise Exception("File must be opened using binary mode.")
        size = os.stat(f.name).st_size
        fullblocks, lastblock = divmod(size, blocksize)

        # The first (end of file) block will be short, since this leaves
        # the rest aligned on a blocksize boundary.  This may be more
        # efficient than having the last (first in file) block be short
        f.seek(-lastblock, 2)
        yield f.read(lastblock)

        for i in range(fullblocks-1, -1, -1):
            f.seek(i * blocksize)
            yield f.read(blocksize)

    def tail(f, nlines):
        buf = ''
        result = []
        for block in rblocks(f):
            buf = block + buf
            lines = buf.splitlines()

            # Return all lines except the first (since may be partial)
            if lines:
                # prepend: this block's lines come before the ones found so far
                result = lines[1:] + result  # First line may not be complete
                if (len(result) >= nlines):
                    return result[-nlines:]
                buf = lines[0]

        return ([buf] + result)[-nlines:]

    f = open('file_to_tail.txt', 'rb')
    for line in tail(f, 20):
        print line

[Edit] Added a more specific version (avoids the need to reverse twice).

You can go to the end of your file with f.seek(0, 2) and then read lines off one by one with the following replacement for readline():

    def readline_backwards(f):
        backline = ''
        last = ''
        while not last == '\n':
            backline = last + backline
            if f.tell() <= 0:
                return backline
            f.seek(-1, 1)
            last = f.read(1)
            f.seek(-1, 1)
        backline = last
        last = ''
        while not last == '\n':
            backline = last + backline
            if f.tell() <= 0:
                return backline
            f.seek(-1, 1)
            last = f.read(1)
            f.seek(-1, 1)
        f.seek(1, 1)
        return backline
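A sketch of a tail built on top of it (a hypothetical wrapper, assuming the file ends with a newline and is opened so that relative seeks work, i.e. binary mode on Python 3):

    def tail(f, n):
        # Walk backwards from EOF, one line at a time.
        f.seek(0, 2)  # jump to the end of the file
        lines = []
        for _ in range(n):
            lines.append(readline_backwards(f))
            if f.tell() <= 0:  # reached the start of the file
                break
        lines.reverse()  # collected newest-first; restore file order
        return lines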

Based on Eyecue's answer (Jun 10 '10 at 21:28): this class adds head() and tail() methods to a file object.

    class File(file):
        def head(self, lines_2find=1):
            self.seek(0)  # Rewind file
            return [self.next() for x in xrange(lines_2find)]

        def tail(self, lines_2find=1):
            self.seek(0, 2)  # go to end of file
            bytes_in_file = self.tell()
            lines_found, total_bytes_scanned = 0, 0
            while (lines_2find+1 > lines_found and
                   bytes_in_file > total_bytes_scanned):
                byte_block = min(1024, bytes_in_file - total_bytes_scanned)
                self.seek(-(byte_block + total_bytes_scanned), 2)
                total_bytes_scanned += byte_block
                lines_found += self.read(byte_block).count('\n')
            self.seek(-total_bytes_scanned, 2)
            line_list = list(self.readlines())
            return line_list[-lines_2find:]

Usage:

    f = File('path/to/file', 'r')
    f.head(3)
    f.tail(3)

Several of these solutions have problems if the file doesn't end in \n, or with ensuring that the complete first line is read.

    def tail(file, n=1, bs=1024):
        f = open(file)
        f.seek(-1, 2)
        l = 1 - f.read(1).count('\n')  # If file doesn't end in \n, count it anyway.
        B = f.tell()
        while n >= l and B > 0:
            block = min(bs, B)
            B -= block
            f.seek(B, 0)
            l += f.read(block).count('\n')
        f.seek(B, 0)
        l = min(l, n)  # discard first (incomplete) line if l > n
        lines = f.readlines()[-l:]
        f.close()
        return lines

I had to read a specific value from the last line of a file, and stumbled upon this thread. Rather than reinventing the wheel in Python, I ended up with a tiny shell script, saved as /usr/local/bin/get_last_netp:

    #!/bin/bash
    tail -n1 /home/leif/projects/transfer/export.log | awk '{print $14}'

And in the Python program:

    from subprocess import check_output

    last_netp = int(check_output("/usr/local/bin/get_last_netp"))

There are some existing implementations on PyPI that you can install with pip:

  • mtFileUtil
  • multitail
  • log4tailer

Depending on your situation, there may be advantages to using one of these existing tools.

Here is a pretty simple implementation:

    with open('/etc/passwd', 'r') as f:
        try:
            f.seek(0, 2)
            s = ''
            while s.count('\n') < 11:
                cur = f.tell()
                f.seek((cur - 10))
                s = f.read(10) + s
                f.seek((cur - 10))
            print s
        except Exception as e:
            f.readlines()

An update of @papercrane's solution to python3. Open the file with open(filename, 'rb') and:

    def tail(f, window=20):
        """Returns the last `window` lines of file `f` as a list.
        """
        if window == 0:
            return []

        BUFSIZ = 1024
        f.seek(0, 2)
        remaining_bytes = f.tell()
        size = window + 1
        block = -1
        data = []

        while size > 0 and remaining_bytes > 0:
            if remaining_bytes - BUFSIZ > 0:
                # Seek back one whole BUFSIZ
                f.seek(block * BUFSIZ, 2)
                # read BUFFER
                bunch = f.read(BUFSIZ)
            else:
                # file too small, start from beginning
                f.seek(0, 0)
                # only read what was not read
                bunch = f.read(remaining_bytes)

            bunch = bunch.decode('utf-8')
            data.insert(0, bunch)
            size -= bunch.count('\n')
            remaining_bytes -= BUFSIZ
            block -= 1

        return ''.join(data).splitlines()[-window:]
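A usage sketch (the path is a placeholder):

    with open('log.txt', 'rb') as f:  # placeholder path; binary mode as noted above
        last_lines = tail(f, 20)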

Not the first example using a deque, but a simpler one. This one is general: it works on any iterable object, not just a file.

    #!/usr/bin/env python
    import sys
    import collections

    def tail(iterable, N):
        deq = collections.deque()
        for thing in iterable:
            if len(deq) >= N:
                deq.popleft()
            deq.append(thing)
        for thing in deq:
            yield thing

    if __name__ == '__main__':
        for line in tail(sys.stdin, 10):
            sys.stdout.write(line)
This is my version of tailf:

    import sys, time, os

    filename = 'path to file'

    try:
        with open(filename) as f:
            size = os.path.getsize(filename)
            if size < 1024:
                s = size
            else:
                s = 999
            f.seek(-s, 2)
            l = f.read()
            print l
            while True:
                line = f.readline()
                if not line:
                    time.sleep(1)
                    continue
                print line
    except IOError:
        pass
    import time

    attempts = 600
    wait_sec = 5
    fname = "YOUR_PATH"

    # Follow the file like tail -f, polling for new lines
    with open(fname, "r") as f:
        for i in range(attempts):
            where = f.tell()
            line = f.readline()
            if not line:
                time.sleep(wait_sec)
                f.seek(where)
            else:
                print line,  # already has newline

On second thought, this is probably just as fast as anything here.

    def tail(f, window=20):
        lines = [''] * window
        count = 0
        for l in f:
            lines[count % window] = l
            count += 1
        print lines[count % window:], lines[:count % window]

It's a lot simpler. And it seems to rip along at a good pace.
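If you'd rather return the lines than print the two slices, the same ring buffer can be stitched back into file order (a small sketch along the same lines; tail_lines is an invented name):

    def tail_lines(f, window=20):
        # Same ring buffer idea, but returns the lines in file order.
        lines = [''] * window
        count = 0
        for l in f:
            lines[count % window] = l
            count += 1
        if count < window:
            return lines[:count]
        return lines[count % window:] + lines[:count % window]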

While it isn't efficient for large files, this code is very straightforward:

  1. It reads the file object, f.
  2. It splits the returned string using the newline character, \n.
  3. It takes the last n items of the resulting list, using the negative sign to index from the end and : to get a subarray.

    def tail(f, n):
        return "\n".join(f.read().split("\n")[-n:])