Ich verwende das Subprozessmodul , um einen Subprozess zu starten und eine Verbindung zu seinem Ausgabestream (stdout) herzustellen. Ich möchte in der Lage sein, nicht blockierende Lesevorgänge auf seinem stdout durchzuführen. Gibt es eine Möglichkeit, .readline nicht zu blockieren oder zu überprüfen, ob Daten im Stream vorhanden sind, bevor ich .readline
Aufrufe? Ich möchte, dass dies portabel ist oder zumindest unter Windows und Linux funktioniert.
so mache ich das jetzt (es blockiert den .readline
, wenn keine Daten verfügbar sind):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
fcntl
, select
, asyncproc
hilft in diesem Fall nicht.
Ein zuverlässiger Weg, einen Stream zu lesen, ohne ihn unabhängig vom Betriebssystem zu blockieren, ist die Verwendung von Queue.get_nowait()
:
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
Ich hatte oft ein ähnliches Problem. Python Programme, die ich häufig schreibe, müssen in der Lage sein, einige primäre Funktionen auszuführen und gleichzeitig Benutzereingaben über die Befehlszeile (stdin) zu akzeptieren. Das einfache Einfügen der Benutzereingabebehandlungsfunktion in einen anderen Thread reicht nicht aus Lösen Sie das Problem, weil readline()
blockiert und keine Zeitüberschreitung aufweist.Wenn die primäre Funktionalität abgeschlossen ist und Sie nicht mehr auf weitere Benutzereingaben warten müssen, möchte ich normalerweise, dass mein Programm beendet wird readline()
blockiert immer noch im anderen Thread und wartet auf eine Zeile. Eine Lösung für dieses Problem besteht darin, stdin mit dem fcntl-Modul zu einer nicht blockierenden Datei zu machen:
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
Meiner Meinung nach ist dies ein bisschen sauberer als die Verwendung der Select- oder Signal-Module, um dieses Problem zu lösen, aber andererseits funktioniert es nur unter UNIX ...
Python 3.4 führt die neue vorläufige API für asynchrone IO - asyncio
module ein.
Die Vorgehensweise ist ähnlich wie bei twisted
- Antwort von @Bryan Ward - Definieren Sie ein Protokoll und dessen Methoden werden aufgerufen, sobald die Daten bereit sind:
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
Siehe "Unterprozess" in den Dokumenten .
Es gibt eine übergeordnete Schnittstelle asyncio.create_subprocess_exec()
, die Process
objects zurückgibt und das asynchrone Lesen einer Zeile mit StreamReader.readline()
coroutine ermöglicht (mit async
/await
Python 3.5+ Syntax ):
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
readline_and_kill()
führt die folgenden Aufgaben aus:
Jeder Schritt kann bei Bedarf durch eine Zeitüberschreitung von Sekunden begrenzt werden.
Probieren Sie das Modul asyncproc aus. Zum Beispiel:
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
Das Modul übernimmt das gesamte Threading, wie von S.Lott vorgeschlagen.
Sie können dies sehr einfach in Twisted tun. Abhängig von Ihrer vorhandenen Codebasis ist dies möglicherweise nicht so einfach zu bedienen. Wenn Sie jedoch eine verdrehte Anwendung erstellen, werden solche Dinge fast trivial. Sie erstellen eine ProcessProtocol
-Klasse und überschreiben die outReceived()
-Methode. Twisted (abhängig vom verwendeten Reaktor) ist normalerweise nur eine große select()
-Schleife mit installierten Callbacks, um Daten aus verschiedenen Dateideskriptoren (häufig Netzwerk-Sockets) zu verarbeiten. Die Methode outReceived()
installiert also einfach einen Rückruf für die Verarbeitung von Daten, die von STDOUT
stammen. Ein einfaches Beispiel, das dieses Verhalten demonstriert, lautet wie folgt:
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
Die Twisted documentation hat einige gute Informationen dazu.
Wenn Sie Ihre gesamte Anwendung auf Twisted aufbauen, wird die asynchrone Kommunikation mit anderen lokalen oder Remote-Prozessen auf diese Weise sehr elegant. Wenn Ihr Programm jedoch nicht auf Twisted aufbaut, ist dies nicht wirklich hilfreich. Hoffentlich kann dies für andere Leser hilfreich sein, auch wenn es für Ihre spezielle Anwendung nicht zutreffend ist.
Verwenden Sie select & read (1).
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
Für readline () - wie:
lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
Eine Lösung besteht darin, einen anderen Prozess zu erstellen, um das Lesen des Prozesses durchzuführen, oder einen Thread des Prozesses mit einer Zeitüberschreitung zu erstellen.
Hier ist die Thread-Version einer Timeout-Funktion:
http://code.activestate.com/recipes/473878/
Müssen Sie jedoch die Standardausgabe lesen, wenn sie eingeht? Eine andere Lösung kann darin bestehen, die Ausgabe in eine Datei zu sichern und zu warten, bis der Vorgang mit p.wait () abgeschlossen ist.
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
Bestehende Lösungen haben bei mir nicht funktioniert (Details unten). Was schließlich funktionierte, war die Implementierung von readline mit read (1) (basierend auf diese Antwort ). Letzteres blockiert nicht:
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
Warum existierende Lösungen nicht funktionierten:
Haftungsausschluss: Dies funktioniert nur für Tornados
Sie können dies tun, indem Sie den fd nicht blockieren und dann mit ioloop Rückrufe registrieren. Ich habe dies in ein Ei mit dem Namen tornado_subprocess gepackt und Sie können es über PyPI installieren:
easy_install tornado_subprocess
jetzt kannst du so etwas machen:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
sie können es auch mit einem RequestHandler verwenden
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
Diese nicht blockierende Version von read does erfordert spezielle Module und funktioniert auf den meisten Linux-Distributionen sofort.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def Shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, Shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __== '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = Shell(cmd.split())
Ich füge dieses Problem hinzu, um einen Teilprozess zu lesen. Öffnen Sie stdout. Hier ist meine nicht blockierende Leselösung:
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", Shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Hier ist mein Code, der verwendet wird, um alle Ausgaben des Subprozesses, einschließlich Teilzeilen, so schnell wie möglich zu erfassen. Es pumpt gleichzeitig und stdout und stderr in fast korrekter Reihenfolge.
Getestet und korrekt gearbeitet Python 2.7 Linux & Windows.
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
Shell=True, # Try to run via Shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
Elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __== '__main__':
__main__()
Hinzufügen dieser Antwort hier, da es die Möglichkeit bietet, nicht blockierende Pipes unter Windows und Unix festzulegen.
Alle Details zu ctypes
sind @ techtoniks Antwort zu verdanken.
Es gibt eine leicht modifizierte Version, die sowohl auf Unix- als auch auf Windows-Systemen verwendet werden kann.
Auf diese Weise können Sie dieselbe Funktion und Ausnahme für Unix- und Windows-Code verwenden.
# pipe_non_blocking.py (module)
"""
Example use:
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)
pipe_non_blocking_set(p.stdout.fileno())
try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""
__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)
import os
if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://stackoverflow.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232
return (GetLastError() == ERROR_NO_DATA)
PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True
PortableBlockingIOError = BlockingIOError
Um zu vermeiden, dass unvollständige Daten gelesen werden, habe ich meinen eigenen Readline-Generator geschrieben (der die Byte-Zeichenfolge für jede Zeile zurückgibt).
Es ist ein Generator, damit Sie zum Beispiel ...
def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'' when nothings left
or when new data is not yet available.
stdout_iter = iter(non_blocking_readlines(process.stdout))
line = next(stdout_iter) # will be a line or b''.
"""
import os
from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)
fd = f.fileno()
pipe_non_blocking_set(fd)
blocks = []
while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
yield b''
continue
while True:
n = data.find(b'\n')
if n == -1:
break
yield b''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)
Ich habe das Problem des ursprünglichen Fragestellers, wollte aber keine Threads aufrufen. Ich mischte Jesses Lösung mit einem direkten read () aus der Pipe und meinem eigenen Buffer-Handler für Zeilenlesevorgänge (mein Unterprozess - ping - schrieb jedoch immer vollständige Zeilen <einer Systemseitengröße). Ich vermeide es, zu lange zu warten, indem ich nur eine von einem Objekt registrierte Uhr einlese. Heutzutage führe ich normalerweise Code in einem Gobject MainLoop aus, um Threads zu vermeiden.
def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
Der Beobachter ist
def watch(f, *other):
print 'reading',f.read()
return True
Das Hauptprogramm richtet einen Ping ein und ruft dann die Gobject-Mail-Schleife auf.
def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()
Alle anderen Arbeiten sind mit Rückrufen in gobject verknüpft.
In meinem Fall brauchte ich ein Protokollierungsmodul, das die Ausgabe der Hintergrundanwendungen auffängt und erweitert (Hinzufügen von Zeitstempeln, Farben usw.).
Ich endete mit einem Hintergrund-Thread, der die eigentliche E/A erledigt. Der folgende Code gilt nur für POSIX-Plattformen. Ich habe nicht wesentliche Teile entfernt.
Wenn jemand dieses Biest für lange Läufe verwenden wird, sollten Sie offene Deskriptoren verwalten. In meinem Fall war es kein großes Problem.
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)
date = log('date')
date.chat('go')
system("date", output=date)
warum stören Thread & Queue? Im Gegensatz zu readline () blockiert BufferedReader.read1 () das Warten auf\r\n nicht und gibt ASAP zurück, wenn eine Ausgabe eingeht.
#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io
def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,
if __== '__main__':
__main__()
Das Auswahl Modul hilft Ihnen festzustellen, wo der nächste nützliche Eingang ist.
Mit separaten Threads sind Sie jedoch fast immer zufriedener. Der eine blockiert das Lesen der Standardeingabe, der andere tut es, wo immer Sie es nicht wollen.
Aus der Antwort von J. F. Sebastian und mehreren anderen Quellen habe ich einen einfachen Subprozessmanager zusammengestellt. Es ermöglicht das nicht blockierende Lesen der Anforderung sowie das parallele Ausführen mehrerer Prozesse. Es verwendet keine betriebssystemspezifischen Aufrufe (die mir bekannt sind) und sollte daher überall funktionieren.
Es ist von pypi erhältlich, also nur pip install shelljob
. Beispiele und vollständige Dokumente finden Sie auf der Projektseite .
EDIT: Diese Implementierung blockiert noch. Verwenden Sie stattdessen die Antwort von J.F. Sebastian .
Ich habe die Top-Antwort ausprobiert, aber das zusätzliche Risiko und die Pflege des Thread-Codes war besorgniserregend.
Als ich das io-Modul durchgesehen habe (und auf 2.6 beschränkt war), habe ich BufferedReader gefunden. Dies ist meine gewindefreie, nicht blockierende Lösung.
import io
from subprocess import PIPE, Popen
p = Popen(['myprogram.exe'], stdout=PIPE)
SLEEP_DELAY = 0.001
# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line
# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line
Im modernen Python sieht es viel besser aus.
Hier ist ein einfaches Unterprogramm, "hello.py":
#!/usr/bin/env python3
while True:
i = input()
if i == "quit":
break
print(f"hello {i}")
Und ein Programm, um damit zu interagieren:
import asyncio
async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()
asyncio.run(main())
Das druckt aus:
b'hello bob\n'
b'hello alice\n'
Beachten Sie, dass das eigentliche Muster, das auch von fast allen vorherigen Antworten hier und in verwandten Fragen stammt, darin besteht, den stdout-Dateideskriptor des Kindes auf nicht blockierend zu setzen und ihn dann in einer Art Auswahlschleife abzufragen. In diesen Tagen wird diese Schleife natürlich von asyncio bereitgestellt.
Dies ist ein Beispiel für die Ausführung eines interaktiven Befehls im Unterprozess, und die Standardausgabe ist mithilfe eines Pseudoterminals interaktiv. Sie können sich beziehen auf: https://stackoverflow.com/a/43012138/3555925
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
Elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Mein Problem ist ein bisschen anders, da ich sowohl stdout als auch stderr aus einem laufenden Prozess sammeln wollte, aber letztendlich das gleiche, da ich die Ausgabe in einem Widget als generiert rendern wollte.
Ich wollte nicht auf viele der vorgeschlagenen Problemumgehungen mit Warteschlangen oder zusätzlichen Threads zurückgreifen, da diese nicht erforderlich sein sollten, um eine so häufige Aufgabe wie das Ausführen eines anderen Skripts und das Sammeln der Ausgabe auszuführen.
Nachdem ich die vorgeschlagenen Lösungen und python docs gelesen hatte, löste ich mein Problem mit der folgenden Implementierung. Ja, es funktioniert nur für POSIX, da ich den Funktionsaufruf select
verwende.
Ich bin damit einverstanden, dass die Dokumente verwirrend sind und die Implementierung für solch eine gemeinsame Skriptaufgabe umständlich ist. Ich glaube, dass ältere Versionen von python unterschiedliche Standardeinstellungen für Popen
und unterschiedliche Erklärungen haben, was viel Verwirrung stiftete. Dies scheint für beide gut zu funktionieren Python 2.7.12 und 3.5.2.
Der Schlüssel war, bufsize=1
Für die Zeilenpufferung zu setzen und dann universal_newlines=True
Als Textdatei anstelle einer Binärdatei zu verarbeiten, was bei der Einstellung von bufsize=1
Die Standardeinstellung zu sein scheint.
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
ERROR, DEBUG und VERBOSE sind einfach Makros, die die Ausgabe auf dem Terminal ausgeben.
Diese Lösung ist meiner Meinung nach zu 99,99% effektiv, da sie weiterhin die blockierende Funktion readline
verwendet. Wir gehen daher davon aus, dass der Unterprozess Nice ist und vollständige Zeilen ausgibt.
Ich freue mich über Feedback, um die Lösung zu verbessern, da ich noch neu in Python bin.
Ich habe eine Bibliothek erstellt, die auf J. F. Sebastians Lösung basiert. Du kannst es benutzen.
Diese Lösung verwendet das Modul select
, um "alle verfügbaren Daten zu lesen" aus einem Stream IO. Diese Funktion blockiert zunächst, bis Daten verfügbar sind, liest dann jedoch nur die verfügbaren Daten und blockiert nicht weiter.
Da das select
-Modul verwendet wird, funktioniert dies nur unter Unix.
Der Code ist vollständig PEP8-konform.
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
Ich stellte mich auch dem von Jesse beschriebenen Problem und löste es mit "select" wie Bradley , Andy und andere taten dies aber in einem Sperrmodus zu Vermeiden Sie eine belegte Schleife. Es benutzt eine Dummy-Pipe als Fake-Standard. Die Auswahl blockiert und wartet, bis entweder stdin oder die Pipe bereit ist. Wenn eine Taste gedrückt wird, wird die Sperre aufgehoben, und der Schlüsselwert kann mit read (1) abgerufen werden. Wenn ein anderer Thread in die Pipe schreibt, gibt die Pipe die Auswahl frei, und dies kann als Hinweis darauf angesehen werden, dass der Bedarf an stdin abgelaufen ist. Hier ist ein Referenzcode:
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
Ich bin vor kurzem auf das gleiche Problem gestoßen, das ich zum Lesen einer Zeile aus dem Stream (Tail Run im Subprozess) im blockierungsfreien Modus benötige. Ich wollte die nächsten Probleme vermeiden: CPU nicht brennen, Stream nicht byteweise lesen ( wie readline) usw
Hier ist meine Implementierung https://Gist.github.com/grubberr/5501e1a9760c3eab5e0a Windows wird nicht unterstützt (Umfrage), EOF wird nicht unterstützt, aber es funktioniert gut für mich