How should I log while using multiprocessing in Python?

I currently have a central module in a framework that spawns multiple processes using the Python 2.6 multiprocessing module. Because it uses multiprocessing, there is a module-level, multiprocessing-aware logger, LOG = multiprocessing.get_logger(). Per the documentation, this logger has process-shared locks so that you don't garble things up in sys.stderr (or whatever file handle) by having multiple processes write to it simultaneously.

The issue I have now is that the other modules in the framework are not multiprocessing-aware. The way I see it, I need to make all dependencies of this central module use multiprocessing-aware logging. That's annoying within the framework, to say nothing of all the clients of the framework. Are there alternatives I'm not thinking of?

The only way to deal with this non-intrusively is to:

  1. Spawn each worker process such that its log goes to a different file descriptor (to disk or to a pipe). Ideally, all log entries should be timestamped.
  2. Your controller process can then do one of the following:
    • If using disk files: merge the log files at the end of the run, sorted by timestamp (a minimal merge sketch follows this list)
    • If using pipes (recommended): merge log entries on-the-fly from all pipes into a central log file (e.g., periodically select from the pipes' file descriptors, perform a merge-sort on the available log entries, and flush to the centralized log)
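
For the disk-file variant, a minimal merge sketch could look like the following; the worker-*.log naming and the assumption that every line starts with a sortable timestamp are illustrative, not part of the scheme above:

    import glob
    import heapq

    def merged_lines(pattern="worker-*.log"):
        """Merge already-sorted per-worker log files into one sorted stream."""
        files = [open(name) for name in sorted(glob.glob(pattern))]
        try:
            # heapq.merge does a streaming merge-sort; it relies on each file
            # already being in timestamp order, which holds if every worker
            # prefixes its lines with a sortable timestamp (e.g. ISO 8601).
            for line in heapq.merge(*files):
                yield line
        finally:
            for f in files:
                f.close()

    if __name__ == "__main__":
        with open("combined.log", "w") as out:
            out.writelines(merged_lines())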

I just wrote a log handler of my own that feeds everything to the parent process via a pipe. I've only been testing it for ten minutes, but it seems to work pretty well.

(Note: this is hardcoded to RotatingFileHandler, which is my own use case.)


Update: implementation!

This now uses a queue for correct handling of concurrency, and also recovers from errors correctly. I've been using this in production for several months now, and the version below works without issue.

    from logging.handlers import RotatingFileHandler
    import multiprocessing, threading, logging, sys, traceback

    class MultiProcessingLog(logging.Handler):
        def __init__(self, name, mode, maxsize, rotate):
            logging.Handler.__init__(self)

            self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
            self.queue = multiprocessing.Queue(-1)

            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

        def setFormatter(self, fmt):
            logging.Handler.setFormatter(self, fmt)
            self._handler.setFormatter(fmt)

        def receive(self):
            while True:
                try:
                    record = self.queue.get()
                    self._handler.emit(record)
                except (KeyboardInterrupt, SystemExit):
                    raise
                except EOFError:
                    break
                except:
                    traceback.print_exc(file=sys.stderr)

        def send(self, s):
            self.queue.put_nowait(s)

        def _format_record(self, record):
            # ensure that exc_info and args
            # have been stringified. Removes any chance of
            # unpickleable things inside and possibly reduces
            # message size sent over the pipe
            if record.args:
                record.msg = record.msg % record.args
                record.args = None
            if record.exc_info:
                dummy = self.format(record)
                record.exc_info = None

            return record

        def emit(self, record):
            try:
                s = self._format_record(record)
                self.send(s)
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                self.handleError(record)

        def close(self):
            self._handler.close()
            logging.Handler.close(self)

Yet another alternative might be the various non-file-based logging handlers in the logging package:

  • SocketHandler
  • DatagramHandler
  • SysLogHandler

(and others)

This way, you could easily have a logging daemon somewhere that you can write to safely and that handles the results correctly. (E.g., a simple socket server that just unpickles the messages and emits them to its own rotating file handler.)

SysLogHandler would take care of that for you, too. Of course, you could also use your own instance of syslog rather than the system one.
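
As a minimal, hedged sketch of the socket route, the client side of each process might be configured like this (host and port are placeholders; the receiving daemon could be the LogRecordSocketReceiver shown in the logging cookbook):

    import logging
    import logging.handlers

    def configure_socket_logging(host="localhost",
                                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT):
        root = logging.getLogger()
        root.setLevel(logging.DEBUG)
        # SocketHandler pickles each LogRecord and ships it over TCP, so no
        # Formatter is needed here; formatting happens in the receiving daemon.
        root.addHandler(logging.handlers.SocketHandler(host, port))

    # call this in every process (parent and workers)
    configure_socket_logging()
    logging.info("this record is shipped to the logging daemon")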

Yet another variant of the above, which keeps the logging thread and the queue separate.

 """sample code for logging in subprocesses using multiprocessing * Little handler magic - The main process uses loggers and handlers as normal. * Only a simple handler is needed in the subprocess that feeds the queue. * Original logger name from subprocess is preserved when logged in main process. * As in the other implementations, a thread reads the queue and calls the handlers. Except in this implementation, the thread is defined outside of a handler, which makes the logger definitions simpler. * Works with multiple handlers. If the logger in the main process defines multiple handlers, they will all be fed records generated by the subprocesses loggers. tested with Python 2.5 and 2.6 on Linux and Windows """ import os import sys import time import traceback import multiprocessing, threading, logging, sys DEFAULT_LEVEL = logging.DEBUG formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s") class SubProcessLogHandler(logging.Handler): """handler used by subprocesses It simply puts items on a Queue for the main process to log. """ def __init__(self, queue): logging.Handler.__init__(self) self.queue = queue def emit(self, record): self.queue.put(record) class LogQueueReader(threading.Thread): """thread to write subprocesses log records to main process log This thread reads the records written by subprocesses and writes them to the handlers defined in the main process's handlers. """ def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue self.daemon = True def run(self): """read from the queue and write to the log handlers The logging documentation says logging is thread safe, so there shouldn't be contention between normal logging (from the main process) and this thread. Note that we're using the name of the original logger. """ # Thanks Mike for the error checking code. while True: try: record = self.queue.get() # get the logger for this record logger = logging.getLogger(record.name) logger.callHandlers(record) except (KeyboardInterrupt, SystemExit): raise except EOFError: break except: traceback.print_exc(file=sys.stderr) class LoggingProcess(multiprocessing.Process): def __init__(self, queue): multiprocessing.Process.__init__(self) self.queue = queue def _setupLogger(self): # create the logger to use. logger = logging.getLogger('test.subprocess') # The only handler desired is the SubProcessLogHandler. If any others # exist, remove them. In this case, on Unix and Linux the StreamHandler # will be inherited. for handler in logger.handlers: # just a check for my sanity assert not isinstance(handler, SubProcessLogHandler) logger.removeHandler(handler) # add the handler handler = SubProcessLogHandler(self.queue) handler.setFormatter(formatter) logger.addHandler(handler) # On Windows, the level will not be inherited. Also, we could just # set the level to log everything here and filter it in the main # process handlers. For now, just set it from the global default. 
logger.setLevel(DEFAULT_LEVEL) self.logger = logger def run(self): self._setupLogger() logger = self.logger # and here goes the logging p = multiprocessing.current_process() logger.info('hello from process %s with pid %s' % (p.name, p.pid)) if __name__ == '__main__': # queue used by the subprocess loggers queue = multiprocessing.Queue() # Just a normal logger logger = logging.getLogger('test') handler = logging.StreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(DEFAULT_LEVEL) logger.info('hello from the main process') # This thread will read from the subprocesses and write to the main log's # handlers. log_queue_reader = LogQueueReader(queue) log_queue_reader.start() # create the processes. for i in range(10): p = LoggingProcess(queue) p.start() # The way I read the multiprocessing warning about Queue, joining a # process before it has finished feeding the Queue can cause a deadlock. # Also, Queue.empty() is not realiable, so just make sure all processes # are finished. # active_children joins subprocesses when they're finished. while multiprocessing.active_children(): time.sleep(.1) 

All the current solutions are too coupled to the logging configuration by using a handler. My solution has the following architecture and features:

  • You can use any logging configuration you want
  • Logging is done in a daemon thread
  • Safe shutdown of the daemon by using a context manager
  • Communication with the logging thread is done via multiprocessing.Queue
  • In subprocesses, logging.Logger (and already defined instances) are patched to send all records to the queue
  • New: format the traceback and message before sending them to the queue to prevent pickling errors

Code with a usage example and output can be found in the following Gist: https://gist.github.com/schlamar/7003737
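
The Gist is the authoritative implementation; purely as a rough sketch of the patching idea from the list above (and not the Gist code itself), the worker-side patch could look roughly like this:

    import logging

    def patch_logging_for_worker(queue):
        """Reroute every logger in this process to a multiprocessing.Queue."""
        def handle(self, record):
            # Pre-format the message and drop unpicklable bits so the record
            # survives the trip through the queue.
            record.msg = record.getMessage()
            record.args = None
            record.exc_info = None
            queue.put(record)
        # Patching the class affects already-created Logger instances as well.
        logging.Logger.handle = handle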

I also liked zzzeek's answer, but Andre is right that a queue is required to prevent garbling. I had some luck with the pipe, but did see some garbling, which is somewhat to be expected. Implementing it turned out to be harder than I thought, particularly because of running on Windows, where there are some additional restrictions on global variables and such (see: How's Python Multiprocessing Implemented on Windows?).

But I finally got it working. This example probably isn't perfect, so comments and suggestions are welcome. It also does not support setting a formatter or anything other than the root logger. Basically, you have to re-initialize the logger with the queue in each of the pool processes and set the other attributes on the logger.

Again, any suggestions on how to make the code better are welcome. I certainly don't know all the Python tricks yet :-)

    import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

    class MultiProcessingLogHandler(logging.Handler):
        def __init__(self, handler, queue, child=False):
            logging.Handler.__init__(self)
            self._handler = handler
            self.queue = queue
            # we only want one of the loggers to be pulling from the queue.
            # If there is a way to do this without needing to be passed this
            # information, that would be great!
            if child == False:
                self.shutdown = False
                self.polltime = 1
                t = threading.Thread(target=self.receive)
                t.daemon = True
                t.start()

        def setFormatter(self, fmt):
            logging.Handler.setFormatter(self, fmt)
            self._handler.setFormatter(fmt)

        def receive(self):
            #print "receive on"
            while (self.shutdown == False) or (self.queue.empty() == False):
                # so we block for a short period of time so that we can
                # check for the shutdown cases.
                try:
                    record = self.queue.get(True, self.polltime)
                    self._handler.emit(record)
                except Queue.Empty, e:
                    pass

        def send(self, s):
            # send just puts it in the queue for the server to retrieve
            self.queue.put(s)

        def _format_record(self, record):
            ei = record.exc_info
            if ei:
                dummy = self.format(record) # just to get traceback text into record.exc_text
                record.exc_info = None  # to avoid Unpickleable error

            return record

        def emit(self, record):
            try:
                s = self._format_record(record)
                self.send(s)
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                self.handleError(record)

        def close(self):
            time.sleep(self.polltime+1) # give some time for messages to enter the queue.
            self.shutdown = True
            time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

        def __del__(self):
            self.close() # hopefully this aids in orderly shutdown when things are going poorly.

    def f(x):
        # just a logging command...
        logging.critical('function number: ' + str(x))
        # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
        time.sleep(x % 3)

    def initPool(queue, level):
        """
        This causes the logging module to be initialized with the necessary info
        in pool threads to work correctly.
        """
        logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
        logging.getLogger('').setLevel(level)

    if __name__ == '__main__':
        stream = StringIO.StringIO()
        logQueue = multiprocessing.Queue(100)
        handler = MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
        logging.getLogger('').addHandler(handler)
        logging.getLogger('').setLevel(logging.DEBUG)

        logging.debug('starting main')

        # when building the pool on a Windows machine we also have to init the logger
        # in all the instances with the queue and the level of logging.
        pool = multiprocessing.Pool(processes=10, initializer=initPool,
                                    initargs=[logQueue, logging.getLogger('').getEffectiveLevel()])
        # start worker processes
        pool.map(f, range(0, 50))
        pool.close()

        logging.debug('done')
        logging.shutdown()
        print "stream output is:"
        print stream.getvalue()

The Python logging cookbook has two complete examples here: https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes

It uses QueueHandler, which is new in Python 3.2, but easy to copy into your own code (as I did myself in Python 2.7) from: https://gist.github.com/vsajip/591589

Each process puts its log records on the Queue, and then a listener thread or process (one example of each is provided) picks them up and writes them all to a file, with no risk of corruption or garbling.

Below is another solution, with a focus on simplicity, for anyone else (like me) who gets here from Google. Logging should be easy! Python 3.2 or higher only.

    import multiprocessing
    import logging
    from logging.handlers import QueueHandler, QueueListener
    import time
    import random

    def f(i):
        time.sleep(random.uniform(.01, .05))
        logging.info('function called with {} in worker thread.'.format(i))
        time.sleep(random.uniform(.01, .05))
        return i

    def worker_init(q):
        # all records from worker processes go to qh and then into q
        qh = QueueHandler(q)
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
        logger.addHandler(qh)

    def logger_init():
        q = multiprocessing.Queue()
        # this is the handler for all log records
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

        # ql gets records from the queue and sends them to the handler
        ql = QueueListener(q, handler)
        ql.start()

        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
        # add the handler to the logger so records from this process are handled
        logger.addHandler(handler)

        return ql, q

    def main():
        q_listener, q = logger_init()

        logging.info('hello from main thread')
        pool = multiprocessing.Pool(4, worker_init, [q])
        for result in pool.map(f, range(10)):
            pass
        pool.close()
        pool.join()
        q_listener.stop()

    if __name__ == '__main__':
        main()

Just publish your logger instance somewhere. That way, the other modules and clients can use your API to get a logger without having to import multiprocessing.
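
A minimal sketch of that idea follows; the module and function names are made up for illustration:

    # central.py -- the only module that knows about multiprocessing
    import multiprocessing

    _log = multiprocessing.get_logger()

    def get_logger():
        return _log

    # client_module.py -- no multiprocessing import needed
    from central import get_logger

    log = get_logger()
    log.warning("hello from a dependency module")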

I like zzzeek's answer. I would just substitute a Queue for the Pipe, since if multiple threads/processes use the same pipe end to produce log messages, they will get garbled.

Since we can represent multiprocess logging as many publishers and one subscriber (listener), using ZeroMQ to implement PUB-SUB messaging is indeed an option.

Moreover, the PyZMQ module, the Python bindings for ZMQ, implements PUBHandler, an object for publishing log messages over a zmq.PUB socket.

There is a solution on the web for centralized logging from distributed applications using PyZMQ and PUBHandler that can easily be adapted for working locally with multiple publishing processes.

    import logging
    import os
    import socket
    import zmq
    from logging import handlers
    from multiprocessing import Process, Event
    from zmq.log.handlers import PUBHandler

    import config  # your own settings module defining PUBSUB_LOGGER_PORT

    formatters = {
        logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
        logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
        logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
        logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
        logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
    }

    # This one will be used by publishing processes
    class PUBLogger:
        def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
            self._logger = logging.getLogger(__name__)
            self._logger.setLevel(logging.DEBUG)
            self.ctx = zmq.Context()
            self.pub = self.ctx.socket(zmq.PUB)
            self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
            self._handler = PUBHandler(self.pub)
            self._handler.formatters = formatters
            self._logger.addHandler(self._handler)

        @property
        def logger(self):
            return self._logger

    # This one will be used by the listener process
    class SUBLogger:
        def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
            self.output_dir = output_dir
            self._logger = logging.getLogger()
            self._logger.setLevel(logging.DEBUG)

            self.ctx = zmq.Context()
            self._sub = self.ctx.socket(zmq.SUB)
            self._sub.bind('tcp://*:{1}'.format(ip, port))
            self._sub.setsockopt(zmq.SUBSCRIBE, "")

            handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
            handler.setLevel(logging.DEBUG)
            formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
            handler.setFormatter(formatter)
            self._logger.addHandler(handler)

        @property
        def sub(self):
            return self._sub

        @property
        def logger(self):
            return self._logger

    # And that's the way we actually run things:

    # Listener process will forever listen on the SUB socket for incoming messages
    def run_sub_logger(ip, event):
        sub_logger = SUBLogger(ip)
        while not event.is_set():
            try:
                topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
                log_msg = getattr(logging, topic.lower())
                log_msg(message)
            except zmq.ZMQError as zmq_error:
                if zmq_error.errno == zmq.EAGAIN:
                    pass

    # Publisher processes loggers should be initialized as follows:
    class Publisher:
        def __init__(self, stop_event, proc_id):
            self.stop_event = stop_event
            self.proc_id = proc_id
            self._logger = PUBLogger('127.0.0.1').logger

        def run(self):
            self._logger.info("{0} - Sending message".format(self.proc_id))

    def run_worker(event, proc_id):
        worker = Publisher(event, proc_id)
        worker.run()

    stop_event = Event()
    processes = []

    # Starting the subscriber process so we won't lose the publishers' messages
    sub_logger_process = Process(target=run_sub_logger, args=('127.0.0.1', stop_event))
    sub_logger_process.start()

    # Starting publisher processes (MAX_WORKERS_PER_CLIENT comes from your own configuration)
    for i in range(MAX_WORKERS_PER_CLIENT):
        processes.append(Process(target=run_worker, args=(stop_event, i,)))
    for p in processes:
        p.start()

How about delegating all the logging to another process that reads all log entries from a Queue?

    import logging
    import multiprocessing

    LOG_QUEUE = multiprocessing.JoinableQueue()

    class CentralLogger(multiprocessing.Process):
        def __init__(self, queue):
            multiprocessing.Process.__init__(self)
            self.queue = queue
            self.log = logging.getLogger('some_config')
            self.log.info("Started Central Logging process")

        def run(self):
            while True:
                log_level, message = self.queue.get()
                if log_level is None:
                    self.log.info("Shutting down Central Logging process")
                    break
                else:
                    self.log.log(log_level, message)

    central_logger_process = CentralLogger(LOG_QUEUE)
    central_logger_process.start()

Simply share LOG_QUEUE via any of the multiprocess mechanisms, or even through inheritance, and it all works out fine!
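
Continuing the snippet above (LOG_QUEUE and central_logger_process come from it), the worker side might look like this: workers put (level, message) tuples on the queue, and a (None, None) pair tells CentralLogger to shut down. The worker function is made up for illustration:

    import logging
    import multiprocessing

    def worker(queue, n):
        queue.put((logging.INFO, "worker %d starting" % n))
        # ... the real work goes here ...
        queue.put((logging.INFO, "worker %d done" % n))

    if __name__ == '__main__':
        workers = [multiprocessing.Process(target=worker, args=(LOG_QUEUE, i))
                   for i in range(4)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        LOG_QUEUE.put((None, None))   # sentinel: shut down CentralLogger
        central_logger_process.join()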

I have a solution similar to ironhacker's, except that I use logging.exception in some of my code and found that I needed to format the exception before passing it back over the Queue, since tracebacks aren't pickleable:

    import logging
    import traceback
    import cStringIO

    class QueueHandler(logging.Handler):
        def __init__(self, queue):
            logging.Handler.__init__(self)
            self.queue = queue

        def emit(self, record):
            if record.exc_info:
                # can't pass exc_info across processes so just format now
                record.exc_text = self.formatException(record.exc_info)
                record.exc_info = None
            self.queue.put(record)

        def formatException(self, ei):
            sio = cStringIO.StringIO()
            traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
            s = sio.getvalue()
            sio.close()
            if s[-1] == "\n":
                s = s[:-1]
            return s

Below is a class that can be used in a Windows environment; it requires ActivePython. You can also inherit from other logging handlers (StreamHandler, etc.).

    class SyncronizedFileHandler(logging.FileHandler):
        MUTEX_NAME = 'logging_mutex'

        def __init__(self, *args, **kwargs):
            self.mutex = win32event.CreateMutex(None, False, self.MUTEX_NAME)
            return super(SyncronizedFileHandler, self).__init__(*args, **kwargs)

        def emit(self, *args, **kwargs):
            try:
                win32event.WaitForSingleObject(self.mutex, win32event.INFINITE)
                ret = super(SyncronizedFileHandler, self).emit(*args, **kwargs)
            finally:
                win32event.ReleaseMutex(self.mutex)
            return ret

Here is an example that demonstrates its usage:

    import logging
    import random, time, os, sys, datetime
    from string import letters
    import win32api, win32event
    from multiprocessing import Pool

    def f(i):
        time.sleep(random.randint(0, 10) * 0.1)
        ch = random.choice(letters)
        logging.info(ch * 30)

    def init_logging():
        ''' initialize the loggers '''
        formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)

        file_handler = SyncronizedFileHandler(sys.argv[1])
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    # must be called in the parent and in every worker process
    init_logging()

    if __name__ == '__main__':
        # multiprocessing stuff
        pool = Pool(processes=10)
        imap_result = pool.imap(f, range(30))
        for i, _ in enumerate(imap_result):
            pass

Here's my simple hack/workaround... not the most comprehensive, but easy to modify, and simpler to read and understand, I think, than any of the other answers I found before writing this:

    import logging
    import multiprocessing

    class FakeLogger(object):
        def __init__(self, q):
            self.q = q
        def info(self, item):
            self.q.put('INFO - {}'.format(item))
        def debug(self, item):
            self.q.put('DEBUG - {}'.format(item))
        def critical(self, item):
            self.q.put('CRITICAL - {}'.format(item))
        def warning(self, item):
            self.q.put('WARNING - {}'.format(item))

    def some_other_func_that_gets_logger_and_logs(num):
        # notice the name get's discarded
        # of course you can easily add this to your FakeLogger class
        local_logger = logging.getLogger('local')
        local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
        local_logger.debug('hmm, something may need debugging here')
        return num*2

    def func_to_parallelize(data_chunk):
        # unpack our args
        the_num, logger_q = data_chunk
        # since we're now in a new process, let's monkeypatch the logging module
        logging.getLogger = lambda name=None: FakeLogger(logger_q)
        # now do the actual work that happens to log stuff too
        new_num = some_other_func_that_gets_logger_and_logs(the_num)
        return (the_num, new_num)

    if __name__ == '__main__':
        multiprocessing.freeze_support()
        m = multiprocessing.Manager()
        logger_q = m.Queue()

        # we have to pass our data to be parallel-processed
        # we also need to pass the Queue object so we can retrieve the logs
        parallelable_data = [(1, logger_q), (2, logger_q)]

        # set up a pool of processes so we can take advantage of multiple CPU cores
        pool_size = multiprocessing.cpu_count() * 2
        pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
        worker_output = pool.map(func_to_parallelize, parallelable_data)

        pool.close()  # no more tasks
        pool.join()   # wrap up current tasks

        # get the contents of our FakeLogger object
        while not logger_q.empty():
            print logger_q.get()
        print 'worker output contained: {}'.format(worker_output)

One alternative is to write the multiprocessing logging to known files and register an atexit handler that joins those processes and reads the files back in on stderr; however, you won't get a real-time flow of output messages on stderr that way.
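
A rough sketch of that approach, with per-pid file names chosen purely for illustration:

    import atexit
    import glob
    import logging
    import os
    import sys

    LOG_DIR = "mp-logs"

    def setup_worker_logging():
        """Call in each worker: log to a file keyed by the worker's pid."""
        if not os.path.isdir(LOG_DIR):
            os.makedirs(LOG_DIR)
        handler = logging.FileHandler(
            os.path.join(LOG_DIR, "worker-%d.log" % os.getpid()))
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(process)d %(message)s"))
        logging.getLogger().addHandler(handler)

    def dump_worker_logs():
        """atexit hook for the parent: replay all worker logs onto stderr."""
        for name in sorted(glob.glob(os.path.join(LOG_DIR, "worker-*.log"))):
            with open(name) as f:
                sys.stderr.write(f.read())

    # register in the parent process, after the workers have been joined
    atexit.register(dump_worker_logs)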

If you have deadlocks occurring in a combination of locks, threads, and forks in the logging module, that is reported in bug report 6721 (see also the related SO question).

There is a small fixup solution posted here.

However, that will just fix any potential deadlocks in logging. It will not fix the fact that things may still get garbled. See the other answers presented here.

Not really a full answer, but to chip in my two cents: I think it would be very useful for people browsing this thread to know that QueueHandler and QueueListener are made available for Python 2.7 through the logutils module.
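
A minimal sketch on Python 2.7, assuming pip install logutils (the classes live in logutils.queue and mirror the stdlib API added in 3.2):

    import logging
    import multiprocessing
    from logutils.queue import QueueHandler, QueueListener

    if __name__ == '__main__':
        queue = multiprocessing.Queue()
        listener = QueueListener(queue, logging.StreamHandler())
        listener.start()

        # the handler you'd attach in each subprocess is just QueueHandler(queue);
        # it is shown in-process here only for brevity
        worker_logger = logging.getLogger()
        worker_logger.addHandler(QueueHandler(queue))
        worker_logger.warning("routed through the queue to the listener")

        listener.stop()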