wake-up-neo.net

Python liest getrennt aus dem Unterprozess stdout und stderr, wobei die Reihenfolge erhalten bleibt

Ich habe einen Python-Subprozess, von dem ich versuche, Ausgabe- und Fehlerströme zu lesen. Momentan funktioniert es, aber ich kann erst von stderr lesen, nachdem ich von stdout gelesen habe. So sieht es aus:

process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout_iterator = iter(process.stdout.readline, b"")
stderr_iterator = iter(process.stderr.readline, b"")

for line in stdout_iterator:
    # Do stuff with line
    print line

for line in stderr_iterator:
    # Do stuff with line
    print line

Wie Sie sehen, kann die stderr for-Schleife erst starten, wenn die stdout-Schleife abgeschlossen ist. Wie kann ich das ändern, um in der richtigen Reihenfolge die Zeilen lesen zu können?

Zur Klarstellung: Ich muss noch erkennen können, ob eine Zeile von stdout oder stderr stammt, da sie in meinem Code unterschiedlich behandelt werden.

22
Luke Sapan

Der Code in Ihrer Frage kann zu einem Deadlock führen, wenn der untergeordnete Prozess auf stderr genügend Ausgabe erzeugt (~ 100 KB auf meinem Linux-Computer).

Es gibt eine communicate()-Methode, mit der getrennt von stdout und stderr gelesen werden kann:

from subprocess import Popen, PIPE

process = Popen(command, stdout=PIPE, stderr=PIPE)
output, err = process.communicate()

Wenn Sie die Streams lesen müssen, während der untergeordnete Prozess noch läuft, verwendet die tragbare Lösung Threads (nicht getestet):

from subprocess import Popen, PIPE
from threading import Thread
from Queue import Queue # Python 2

def reader(pipe, queue):
    try:
        with pipe:
            for line in iter(pipe.readline, b''):
                queue.put((pipe, line))
    finally:
        queue.put(None)

process = Popen(command, stdout=PIPE, stderr=PIPE, bufsize=1)
q = Queue()
Thread(target=reader, args=[process.stdout, q]).start()
Thread(target=reader, args=[process.stderr, q]).start()
for _ in range(2):
    for source, line in iter(q.get, None):
        print "%s: %s" % (source, line),

Sehen:

16
jfs

Die Reihenfolge, in der ein Prozess Daten in verschiedene Pipes schreibt, geht nach dem Schreiben verloren.

Sie können auf keinen Fall feststellen, ob stdout vor stderr geschrieben wurde.

Sie können versuchen, Daten gleichzeitig aus mehreren Dateideskriptoren auf nicht-blockierende Weise auszulesen Sobald Daten verfügbar sind, würde dies jedoch nur die Wahrscheinlichkeit verringern, dass die Reihenfolge falsch ist.

Dieses Programm sollte dies demonstrieren:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import select
import subprocess

testapps={
    'slow': '''
import os
import time
os.write(1, 'aaa')
time.sleep(0.01)
os.write(2, 'bbb')
time.sleep(0.01)
os.write(1, 'ccc')
''',
    'fast': '''
import os
os.write(1, 'aaa')
os.write(2, 'bbb')
os.write(1, 'ccc')
''',
    'fast2': '''
import os
os.write(1, 'aaa')
os.write(2, 'bbbbbbbbbbbbbbb')
os.write(1, 'ccc')
'''
}

def readfds(fds, maxread):
    while True:
        fdsin, _, _ = select.select(fds,[],[])
        for fd in fdsin:
            s = os.read(fd, maxread)
            if len(s) == 0:
                fds.remove(fd)
                continue
            yield fd, s
        if fds == []:
            break

def readfromapp(app, rounds=10, maxread=1024):
    f=open('testapp.py', 'w')
    f.write(testapps[app])
    f.close()

    results={}
    for i in range(0, rounds):
        p = subprocess.Popen(['python', 'testapp.py'], stdout=subprocess.PIPE
                                                     , stderr=subprocess.PIPE)
        data=''
        for (fd, s) in readfds([p.stdout.fileno(), p.stderr.fileno()], maxread):
            data = data + s
        results[data] = results[data] + 1 if data in results else 1

    print 'running %i rounds %s with maxread=%i' % (rounds, app, maxread)
    results = sorted(results.items(), key=lambda (k,v): k, reverse=False)
    for data, count in results:
        print '%03i x %s' % (count, data)


print
print "=> if output is produced slowly this should work as whished"
print "   and should return: aaabbbccc"
readfromapp('slow',  rounds=100, maxread=1024)

print
print "=> now mostly aaacccbbb is returnd, not as it should be"
readfromapp('fast',  rounds=100, maxread=1024)

print
print "=> you could try to read data one by one, and return"
print "   e.g. a whole line only when LF is read"
print "   (b's should be finished before c's)"
readfromapp('fast',  rounds=100, maxread=1)

print
print "=> but even this won't work ..."
readfromapp('fast2', rounds=100, maxread=1)

und gibt so etwas aus:

=> if output is produced slowly this should work as whished
   and should return: aaabbbccc
running 100 rounds slow with maxread=1024
100 x aaabbbccc

=> now mostly aaacccbbb is returnd, not as it should be
running 100 rounds fast with maxread=1024
006 x aaabbbccc
094 x aaacccbbb

=> you could try to read data one by one, and return
   e.g. a whole line only when LF is read
   (b's should be finished before c's)
running 100 rounds fast with maxread=1
003 x aaabbbccc
003 x aababcbcc
094 x abababccc

=> but even this won't work ...
running 100 rounds fast2 with maxread=1
003 x aaabbbbbbbbbbbbbbbccc
001 x aaacbcbcbbbbbbbbbbbbb
008 x aababcbcbcbbbbbbbbbbb
088 x abababcbcbcbbbbbbbbbb
4
Jörg Schulz

Hier ist eine Lösung, die auf selectors basiert, die jedoch die Reihenfolge beibehält und Zeichen variabler Länge (auch einzelne Zeichen) überträgt.

Der Trick besteht darin, read1() anstelle von read() zu verwenden.

import selectors
import subprocess
import sys

p = subprocess.Popen(
    "python random_out.py", Shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

sel = selectors.DefaultSelector()
sel.register(p.stdout, selectors.EVENT_READ)
sel.register(p.stderr, selectors.EVENT_READ)

while True:
    for key, _ in sel.select():
        data = key.fileobj.read1().decode()
        if not data:
            exit()
        if key.fileobj is p.stdout:
            print(data, end="")
        else:
            print(data, end="", file=sys.stderr)

Wenn Sie ein Testprogramm wünschen, verwenden Sie dieses.

import sys
from time import sleep


for i in range(10):
    print(f" x{i} ", file=sys.stderr, end="")
    sleep(0.1)
    print(f" y{i} ", end="")
    sleep(0.1)
1
Dev Aggarwal

Ich habe etwas geschrieben, um dies vor einer vor langer Zeit zu tun. Ich habe es noch nicht auf Python 3 portiert, aber es sollte nicht zu schwierig sein (Patches akzeptiert!)

Wenn Sie es eigenständig ausführen, werden viele verschiedene Optionen angezeigt. In jedem Fall können Sie stdout von stderr unterscheiden.

0
Patrick Maupin

Ich weiß, dass diese Frage sehr alt ist, aber diese Antwort kann anderen helfen, die auf dieser Seite bei der Suche nach einer Lösung für eine ähnliche Situation stolpern, deshalb poste ich sie trotzdem.

Ich habe ein einfaches Python-Snippet erstellt, das eine beliebige Anzahl von Pipes zu einem einzigen zusammenfügt. Wie oben erwähnt, kann die Reihenfolge natürlich nicht garantiert werden, aber dies ist so nahe wie ich denke, Sie können in Python bekommen. 

Es erzeugt einen Thread für jede der Pipes, liest sie Zeile für Zeile und stellt sie in eine Warteschlange (FIFO). Der Haupt-Thread durchläuft die Warteschlange und liefert jede Zeile.

import threading, queue
def merge_pipes(**named_pipes):
    r'''
    Merges multiple pipes from subprocess.Popen (maybe other sources as well).
    The keyword argument keys will be used in the output to identify the source
    of the line.

    Example:
    p = subprocess.Popen(['some', 'call'],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    outputs = {'out': log.info, 'err': log.warn}
    for name, line in merge_pipes(out=p.stdout, err=p.stderr):
        outputs[name](line)

    This will output stdout to the info logger, and stderr to the warning logger
    '''

    # Constants. Could also be placed outside of the method. I just put them here
    # so the method is fully self-contained
    PIPE_OPENED=1
    PIPE_OUTPUT=2
    PIPE_CLOSED=3

    # Create a queue where the pipes will be read into
    output = queue.Queue()

    # This method is the run body for the threads that are instatiated below
    # This could be easily rewritten to be outside of the merge_pipes method,
    # but to make it fully self-contained I put it here
    def pipe_reader(name, pipe):
        r"""
        reads a single pipe into the queue
        """
        output.put( ( PIPE_OPENED, name, ) )
        try:
            for line in iter(pipe.readline,''):
                output.put( ( PIPE_OUTPUT, name, line.rstrip(), ) )
        finally:
            output.put( ( PIPE_CLOSED, name, ) )

    # Start a reader for each pipe
    for name, pipe in named_pipes.items():
        t=threading.Thread(target=pipe_reader, args=(name, pipe, ))
        t.daemon = True
        t.start()

    # Use a counter to determine how many pipes are left open.
    # If all are closed, we can return
    pipe_count = 0

    # Read the queue in order, blocking if there's no data
    for data in iter(output.get,''):
        code=data[0]
        if code == PIPE_OPENED:
            pipe_count += 1
        Elif code == PIPE_CLOSED:
            pipe_count -= 1
        Elif code == PIPE_OUTPUT:
            yield data[1:]
        if pipe_count == 0:
            return
0
Marten Jacobs

Dies funktioniert für mich (unter Windows): https://github.com/waszil/subpiper

from subpiper import subpiper

def my_stdout_callback(line: str):
    print(f'STDOUT: {line}')

def my_stderr_callback(line: str):
    print(f'STDERR: {line}')

my_additional_path_list = [r'c:\important_location']

retcode = subpiper(cmd='echo magic',
                   stdout_callback=my_stdout_callback,
                   stderr_callback=my_stderr_callback,
                   add_path_list=my_additional_path_list)
0
waszil