如何查看文件以进行更改?

我有一个日志文件正在写另一个进程,我想看着变化。 每次发生更改时,我都希望读取新数据以对其进行一些处理。

什么是最好的方法来做到这一点? 我希望有PyWin32库的某种钩子。 我发现win32file.FindNextChangeNotification函数,但不知道如何要求它观看一个特定的文件。

如果任何人做了这样的事情,我会非常感激听到如何…

[编辑]我应该提到,我是在不需要轮询的解决scheme之后。

诅咒! 看来这不适用于映射的networking驱动器。 我猜Windows不会像在本地磁盘上那样“听到”文件的任何更新。

您是否已经查看了http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html上的文档?; 如果你只是需要它在Windows下工作的第二个例子似乎正是你想要的(如果你换目录的path与你要观看的文件之一)。

否则,轮询可能是唯一真正与平台无关的选项。

注意:我还没有尝试过这些解决scheme。

你有没有尝试使用看门狗 ?

Python API库和shell实用程序来监视文件系统事件。

目录监控很容易

  • 一个跨平台的API。
  • 用于响应目录更改而运行命令的shell工具。

使用快速入门中的简单示例快速入门 …

如果轮询对你来说足够好,我只是看“修改时间”文件属性是否改变。 阅读它:

 os.stat(filename).st_mtime 

(另请注意,Windows本机更改事件解决scheme在任何情况下都不起作用,例如在networking驱动器上。

 import os class Monkey(object): def __init__(self): self._cached_stamp = 0 self.filename = '/path/to/file' def ook(self): stamp = os.stat(self.filename).st_mtime if stamp != self._cached_stamp: self._cached_stamp = stamp # File has changed, so do something... 

如果你想要一个多平台的解决scheme,然后检查QFileSystemWatcher 。 这里有一个示例代码(未经过消毒):

 from PyQt4 import QtCore @QtCore.pyqtSlot(str) def directory_changed(path): print('Directory Changed!!!') @QtCore.pyqtSlot(str) def file_changed(path): print('File Changed!!!') fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3']) fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed) fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed) 

它不应该在Windows上工作(也许与cygwin?),但对于Unix用户,你应该使用“fcntl”系统调用。 这里是Python中的一个例子。 如果你需要用C编写(相同的函数名)

 import time import fcntl import os import signal FNAME = "/HOME/TOTO/FILETOWATCH" def handler(signum, frame): print "File %s modified" % (FNAME,) signal.signal(signal.SIGIO, handler) fd = os.open(FNAME, os.O_RDONLY) fcntl.fcntl(fd, fcntl.F_SETSIG, 0) fcntl.fcntl(fd, fcntl.F_NOTIFY, fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT) while True: time.sleep(10000) 

检查pyinotify 。

inotify在更新的linuxes中replacednotify(来自较早的答案),并且允许文件级别而不是目录级别的监视。

在Tim Golden的剧本被黑客攻击之后,我有以下几个似乎很好的工作:

 import os import win32file import win32con path_to_watch = "." # look at the current directory file_to_watch = "test.txt" # look for changes to a file called test.txt def ProcessNewData( newData ): print "Text added: %s"%newData # Set up the bits we'll need for output ACTIONS = { 1 : "Created", 2 : "Deleted", 3 : "Updated", 4 : "Renamed from something", 5 : "Renamed to something" } FILE_LIST_DIRECTORY = 0x0001 hDir = win32file.CreateFile ( path_to_watch, FILE_LIST_DIRECTORY, win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE, None, win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS, None ) # Open the file we're interested in a = open(file_to_watch, "r") # Throw away any exising log data a.read() # Wait for new data and call ProcessNewData for each new chunk that's written while 1: # Wait for a change to occur results = win32file.ReadDirectoryChangesW ( hDir, 1024, False, win32con.FILE_NOTIFY_CHANGE_LAST_WRITE, None, None ) # For each change, check to see if it's updating the file we're interested in for action, file in results: full_filename = os.path.join (path_to_watch, file) #print file, ACTIONS.get (action, "Unknown") if file == file_to_watch: newText = a.read() if newText != "": ProcessNewData( newText ) 

它可能会做一个加载更多的错误检查,但只是看一个日志文件,并做了一些处理,然后再吐出到屏幕上,这个效果很好。

谢谢大家的意见 – 好东西!

检查我的答案 类似的问题 。 你可以在Python中尝试相同的循环。 本页build议:

 import time while 1: where = file.tell() line = file.readline() if not line: time.sleep(1) file.seek(where) else: print line, # already has newline 

另请参阅问题tail()Python文件 。

那么,因为你正在使用Python,你可以打开一个文件,并保持从它读取的线。

 f = open('file.log') 

如果读取的行不是空的 ,则处理它。

 line = f.readline() if line: // Do what you want with the line 

您可能会错过在EOF中继续调用readline是可以的。 在这种情况下,它只会返回一个空string。 当某些事情被追加到日志文件中时,读数会从您停止的地方继续。

如果您正在寻找使用事件或特定图书馆的解决scheme,请在您的问题中指定。 否则,我认为这个解决scheme就好了。

最简单的解决scheme是使用看门狗的工具watchmedo

https://pypi.python.org/pypi/watchdog我现在有一个进程查找目录中的sql文件,并在必要时执行它们。;

 watchmedo shell-command \ --patterns="*.sql" \ --recursive \ --command='~/Desktop/load_files_into_mysql_database.sh' \ . 

下面是一个Kender代码的简化版本,它看起来可以做同样的技巧,不会导入整个文件:

 # Check file for new data. import time f = open(r'c:\temp\test.txt', 'r') while True: line = f.readline() if not line: time.sleep(1) print 'Nothing New' else: print 'Call Function: ', line 

这是对Tim Goldan脚本的另一个修改,它在linux上运行,并通过使用字典(file => time)为文件修改添加了一个简单的监视器。

用法:whateverName.py path_to_dir_to_watch

 #!/usr/bin/env python import os, sys, time def files_to_timestamp(path): files = [os.path.join(path, f) for f in os.listdir(path)] return dict ([(f, os.path.getmtime(f)) for f in files]) if __name__ == "__main__": path_to_watch = sys.argv[1] print "Watching ", path_to_watch before = files_to_timestamp(path_to_watch) while 1: time.sleep (2) after = files_to_timestamp(path_to_watch) added = [f for f in after.keys() if not f in before.keys()] removed = [f for f in before.keys() if not f in after.keys()] modified = [] for f in before.keys(): if not f in removed: if os.path.getmtime(f) != before.get(f): modified.append(f) if added: print "Added: ", ", ".join(added) if removed: print "Removed: ", ", ".join(removed) if modified: print "Modified ", ", ".join(modified) before = after 

正如你可以在Tim Golden的文章中看到的 ,由Horst Gutmann指出,WIN32相对来说比较复杂,并且只能看目录,而不是单个文件。

我想build议你看看IronPython ,这是一个.NET Python的实现。 借助IronPython,您可以使用所有的.NETfunction – 包括

 System.IO.FileSystemWatcher 

它用一个简单的Event界面处理单个文件。

这是检查文件更改的示例。 一个可能不是最好的做法,但它肯定是一个很短的路。

方便的工具,用于在对源进行更改时重新启动应用程序。 我在玩pygame的时候做了这个,所以我可以在文件保存后立即看到效果。

当在pygame中使用时,确保'while'循环中的东西放置在游戏循环中,也就是更新或其他任何东西。 否则你的应用程序将陷入无限循环,你不会看到你的游戏更新。

 file_size_stored = os.stat('neuron.py').st_size while True: try: file_size_current = os.stat('neuron.py').st_size if file_size_stored != file_size_current: restart_program() except: pass 

万一你想要我在网上find的重启代码。 这里是。 (与问题无关,虽然可以派上用场)

 def restart_program(): #restart application python = sys.executable os.execl(python, python, * sys.argv) 

玩得开心,让电子做你想做的事情。

 ACTIONS = { 1 : "Created", 2 : "Deleted", 3 : "Updated", 4 : "Renamed from something", 5 : "Renamed to something" } FILE_LIST_DIRECTORY = 0x0001 class myThread (threading.Thread): def __init__(self, threadID, fileName, directory, origin): threading.Thread.__init__(self) self.threadID = threadID self.fileName = fileName self.daemon = True self.dir = directory self.originalFile = origin def run(self): startMonitor(self.fileName, self.dir, self.originalFile) def startMonitor(fileMonitoring,dirPath,originalFile): hDir = win32file.CreateFile ( dirPath, FILE_LIST_DIRECTORY, win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE, None, win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS, None ) # Wait for new data and call ProcessNewData for each new chunk that's # written while 1: # Wait for a change to occur results = win32file.ReadDirectoryChangesW ( hDir, 1024, False, win32con.FILE_NOTIFY_CHANGE_LAST_WRITE, None, None ) # For each change, check to see if it's updating the file we're # interested in for action, file_M in results: full_filename = os.path.join (dirPath, file_M) #print file, ACTIONS.get (action, "Unknown") if len(full_filename) == len(fileMonitoring) and action == 3: #copy to main file ... 

下面是一个例子,可以看到input文件每秒钟不超过一行,但通常less得多。 目标是将最后一行(最新写入)追加到指定的输出文件。 我从我的一个项目中复制了这个,并删除了所有不相关的行。 您必须填写或更改缺less的符号。

 from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread from ui_main_window import Ui_MainWindow # Qt Creator gen'd class MainWindow(QMainWindow, Ui_MainWindow): def __init__(self, parent=None): QMainWindow.__init__(self, parent) Ui_MainWindow.__init__(self) self._fileWatcher = QFileSystemWatcher() self._fileWatcher.fileChanged.connect(self.fileChanged) def fileChanged(self, filepath): QThread.msleep(300) # Reqd on some machines, give chance for write to complete # ^^ About to test this, may need more sophisticated solution with open(filepath) as file: lastLine = list(file)[-1] destPath = self._filemap[filepath]['dest file'] with open(destPath, 'a') as out_file: # a= append out_file.writelines([lastLine]) 

当然,包含QMainWindow类并不是严格要求的, 你可以单独使用QFileSystemWatcher。

我不知道任何Windows特定的function。 您可以尝试每秒/分钟/小时获取文件的MD5散列(取决于您需要多快),并将其与最后一个散列进行比较。 当它不同时,你知道文件已经改变,你读出最新的行。

我会尝试这样的事情。

  try: f = open(filePath) except IOError: print "No such file: %s" % filePath raw_input("Press Enter to close window") try: lines = f.readlines() while True: line = f.readline() try: if not line: time.sleep(1) else: functionThatAnalisesTheLine(line) except Exception, e: # handle the exception somehow (for example, log the trace) and raise the same exception again raw_input("Press Enter to close window") raise e finally: f.close() 

循环检查自上次读取文件以来是否有新行(如果存在)读取并传递给functionThatAnalisesTheLine函数。 如果不是,则脚本等待1秒钟,然后重试该过程。