wake-up-neo.net

Wie soll ich mich bei der Verwendung von Multiprocessing in Python anmelden?

Im Moment habe ich ein zentrales Modul in einem Framework, das mit dem Modul Python 2.6 multiprocessing mehrere Prozesse erzeugt. Da multiprocessing verwendet wird, gibt es ein multiprozessorfähiges Protokoll auf Modulebene, LOG = multiprocessing.get_logger(). Per the docs verfügt dieser Logger über gemeinsam genutzte Prozesssperren, damit Sie keine Dinge in sys.stderr (oder einem anderen Dateihandle) verschleiern, indem Sie mehrere Prozesse gleichzeitig darauf schreiben.

Das Problem, das ich jetzt habe, ist, dass die anderen Module im Framework nicht Multiprocessing-fähig sind. So wie ich es sehe, muss ich alle Abhängigkeiten von diesem zentralen Modul mit einer multiprozessorgesteuerten Protokollierung machen. Das nervt in des Frameworks, geschweige denn für alle Kunden des Frameworks. Gibt es Alternativen, an die ich nicht denke?

189
cdleary

Die einzige Möglichkeit, dies nicht aufdringlich zu behandeln, besteht darin,

  1. Erstellen Sie jeden Arbeitsprozess so, dass sein Protokoll an einen anderen Dateideskriptor (auf Platte oder in Pipe) geht. Idealerweise sollten alle Protokolleinträge mit einem Zeitstempel versehen sein. 
  2. Ihr Controller-Prozess kann dann Eins der folgenden Aktionen ausführen:
    • Bei Verwendung von Festplattendateien: Kombiniert die Protokolldateien am Ende des Laufs, sortiert nach Zeitstempel
    • Bei Verwendung von Pipes (empfohlen): Kombinieren Sie Protokolleinträge von allen Pipes in eine zentrale Protokolldatei. (Z. B. periodisch select von den Dateideskriptoren der Pipes, führen Sie die Zusammenführungssortierung der verfügbaren Protokolleinträge durch und leeren Sie das zentralisierte Protokoll.
56
vladr

Ich habe gerade einen eigenen Log-Handler geschrieben, der alles über eine Pipe dem übergeordneten Prozess zuführt. Ich habe es erst zehn Minuten getestet, aber es scheint ziemlich gut zu funktionieren. 

(Hinweis: Dies ist fest in RotatingFileHandler codiert, was mein eigener Anwendungsfall ist.)


Update: @javier behält diesen Ansatz nun als Paket bei Pypi - siehe Multiprocessing-Logging bei Pypi, github unter https://github.com/jruere/multiprocessing-logging


Update: Implementierung!

Dies verwendet jetzt eine Warteschlange für die korrekte Handhabung der Parallelität und behebt auch Fehler ordnungsgemäß. Ich verwende das jetzt seit mehreren Monaten in der Produktion, und die aktuelle Version funktioniert ohne Probleme.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
102
zzzeek

Eine weitere Alternative könnten die verschiedenen nicht dateibasierten Protokollierungshandler im logging-Paket sein: 

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(und andere)

Auf diese Weise könnten Sie leicht einen Protokollierungs-Daemon an einem Ort haben, an den Sie sicher schreiben könnten und die Ergebnisse korrekt verarbeiten würden. (Zum Beispiel ein einfacher Socket-Server, der die Nachricht einfach aufhebt und an einen eigenen rotierenden Dateibehandler ausgibt.)

Die SyslogHandler würde das auch für Sie erledigen. Natürlich können Sie Ihre eigene Instanz von syslog verwenden, nicht die Systeminstanz.

19
Ali Afshar

Das Python-Logging-Kochbuch enthält zwei vollständige Beispiele: https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes

Es verwendet QueueHandler, das in Python 3.2 neu ist, aber leicht in Ihren eigenen Code kopiert werden kann (wie ich es in Python 2.7 gemacht habe) von: https://Gist.github.com/vsajip/591589

Jeder Prozess stellt seine Protokollierung auf die Variable Queue, und ein listener-Thread oder -Prozess (für jeden wird ein Beispiel bereitgestellt) nimmt diese auf und schreibt sie in eine Datei. Es besteht keine Gefahr von Korruption oder Verwirrung.

15
fantabolous

Eine Variante der anderen, die den Protokollierungs- und Warteschlangenthread getrennt hält.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __== '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)
13
ironhacker

Nachfolgend finden Sie eine weitere Lösung mit dem Fokus auf Einfachheit für alle anderen (wie ich), die von Google hierher kommen. Die Protokollierung sollte einfach sein! Nur für 3.2 oder höher.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __== '__main__':
    main()
12
user2133814

Alle aktuellen Lösungen sind auch über einen Handler an die Protokollierungskonfiguration gekoppelt. Meine Lösung verfügt über folgende Architektur und Funktionen:

  • Sie können any Protokollierungskonfiguration verwenden, die Sie möchten 
  • Die Protokollierung erfolgt in einem Daemon-Thread
  • Sicheres Herunterfahren des Daemons mithilfe eines Kontextmanagers
  • Die Kommunikation zum Logging-Thread erfolgt über multiprocessing.Queue
  • In Unterprozessen werden logging.Logger (und bereits definierte Instanzen) mit einem Patch versehen, um alle -Datensätze an die Warteschlange zu senden
  • New: Traceback und Nachricht vor dem Senden in die Warteschlange formatieren, um Beizfehler zu vermeiden

Code mit Verwendungsbeispiel und Ausgabe finden Sie unter folgendem Link: https://Gist.github.com/schlamar/7003737

10
schlamar

Da wir so viele Publisher und einen Abonnenten (Listener) für die Protokollierung mehrerer Prozesse repräsentieren können, ist die Verwendung von ZeroMQ zur Implementierung von PUB-SUB-Messaging tatsächlich eine Option. 

Außerdem implementiert das Modul PyZMQ , die Python-Bindungen für ZMQ, PUBHandler , ein Objekt zum Veröffentlichen von Protokollnachrichten über einen Socket von zmq.PUB.

Es gibt eine -Lösung im Web für die zentrale Protokollierung von verteilten Anwendungen mit PyZMQ und PUBHandler, die problemlos für das lokale Arbeiten mit mehreren Veröffentlichungsprozessen verwendet werden kann.

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, Host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(Host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir,                 "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

  @property
  def sub(self):
      return self._sub

  @property
  def logger(self):
      return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

     def run(self):
         self._logger.info("{0} - Sending message".format(proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting subscriber process so we won't loose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1'), stop_event,))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()
6
Samuel

Ich mag auch die Antwort von zzzeek, ​​aber Andre hat recht, dass eine Warteschlange erforderlich ist, um ein Zerreißen zu verhindern. Ich hatte etwas Glück mit der Pfeife, sah aber ein Plätschern, was etwas erwartet wird. Die Implementierung erwies sich als schwieriger als ich dachte, insbesondere aufgrund der Ausführung unter Windows, wo es einige zusätzliche Einschränkungen für globale Variablen und Sachen gibt (siehe: Wie ist Python Multiprocessing unter Windows implementiert? )

Aber ich habe es endlich geschafft. Dieses Beispiel ist wahrscheinlich nicht perfekt, daher sind Kommentare und Vorschläge willkommen. Es unterstützt auch nicht das Festlegen des Formatierers oder etwas anderes als den Root-Logger. Grundsätzlich müssen Sie den Logger in jedem Poolprozess erneut mit der Warteschlange verbinden und die anderen Attribute im Logger einrichten.

Auch hier sind alle Vorschläge zur Verbesserung des Codes willkommen. Ich kenne sicher noch nicht alle Python-Tricks :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __== '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()
6
Mike Miller

veröffentlichen Sie einfach irgendwo Ihre Instanz des Loggers. Auf diese Weise können die anderen Module und Clients Ihre API verwenden, um den Logger abzurufen, ohne dass import multiprocessing erforderlich ist.

3
Javier

Die Antwort von zzzeek hat mir gefallen. Ich würde nur die Pipe durch eine Warteschlange ersetzen, da, wenn mehrere Threads/Prozesse dasselbe Pipe-Ende verwenden, um Protokollnachrichten zu erzeugen, diese verstümmelt werden.

3
André Cruz

Wie wäre es, wenn Sie die gesamte Protokollierung an einen anderen Prozess delegieren, der alle Protokolleinträge aus einer Warteschlange liest?

LOG_QUEUE = multiprocessing.JoinableQueue()

class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logger.getLogger('some_config')
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)

central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

Teilen Sie einfach LOG_QUEUE über einen der Multiprozessmechanismen oder sogar die Vererbung auf, und alles funktioniert gut!

2
Sawan

Hier ist mein einfacher Hack/Workaround ... nicht der umfassendste, aber leicht zu ändern und einfacher zu lesen und zu verstehen, denke ich, als alle anderen Antworten, die ich vor dem Schreiben fand:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name get's discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __== '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)
1
nmz787

Nachfolgend finden Sie eine Klasse, die in Windows-Umgebungen verwendet werden kann, erfordert ActivePython . Sie können auch andere Protokollierungshandler erben (StreamHandler usw.)

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self , *args , **kwargs):

        self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
        return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)

    def emit(self, *args , **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
            ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

Und hier ist ein Beispiel, das die Verwendung demonstriert:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    '''
    initilize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __== '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f , range(30))
    for i , _ in enumerate(imap_result):
        pass
1
user6336812

Ich habe eine Lösung, die der von Ironhacker ähnelt, mit der Ausnahme, dass ich in einigen Code Code logging.Exception verwende und festgestellt habe, dass ich die Ausnahme formatieren musste, bevor sie über die Warteschlange zurückgegeben wurde, da Tracebacks nicht pickle'able sind:

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s
1
Richard Jones

Wenn in der Kombination aus Sperren, Threads und Gabeln im Modul logging Deadlocks auftreten, wird dies im Fehlerbericht 6721 gemeldet (siehe auch related SO question ).

Es gibt eine kleine Fixup-Lösung hier .

Dadurch werden jedoch nur mögliche potenzielle Deadlocks in logging behoben. Das wird nicht beheben, dass die Dinge möglicherweise verstümmelt sind. Sehen Sie sich die anderen Antworten an.

0
Albert

Eine der Alternativen besteht darin, die Protokollierung der Mehrfachverarbeitung in eine bekannte Datei zu schreiben und einen atexit -Handler zu registrieren, der diesen Prozessen beitritt. Sie erhalten jedoch auf diese Weise keinen Echtzeitfluss zu den Ausgabemeldungen von stderr.

0
cdleary

Es gibt dieses tolle Paket

Paket: https://pypi.python.org/pypi/multiprocessing-logging/

code: https://github.com/jruere/multiprocessing-logging

Installieren:

pip install multiprocessing-logging

Dann füge hinzu:

import multiprocessing_logging

# This enables logs inside process
multiprocessing_logging.install_mp_handler()
0
juan Isaza