27 votes

blocs - envoyer une entrée au pipeline de sous-processus python

Je suis en essais sous-processus canalisations avec python. Je suis conscient que je peux faire ce que les programmes ci-dessous le faire en python directement, mais ce n'est pas le point. Je veux juste tester le pipeline donc je sais comment l'utiliser.

Mon système Linux Ubuntu 9.04 avec par défaut la version 2.6 de python.

J'ai commencé avec cette documentation exemple.

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

Ce qui fonctionne, mais depuis p1s' stdin n'est pas redirigé, j'ai à taper des trucs dans le terminal pour nourrir la pipe. Quand je tape ^D clôture stdin, je obtenir le résultat que je veux.

Cependant, je veux envoyer des données à la conduite à l'aide d'un python variable de chaîne. J'ai d'abord essayé d'écrire sur stdin:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

N'a pas fonctionné. J'ai essayé d'utiliser p2.stdout.read() plutôt sur la dernière ligne, mais il bloque également. J'ai ajouté p1.stdin.flush() et p1.stdin.close() , mais il ne fonctionne pas non plus. J'ai Ensuite j'ai déménagé à communiquer:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0]

Donc c'est toujours pas ça.

J'ai remarqué que l'exécution d'un processus unique (comme p1 - dessus, en supprimant p2) fonctionne parfaitement. Et en passant un descripteur de fichier pour p1 (stdin=open(...)) fonctionne aussi. Le problème est donc de:

Est-il possible de transmettre des données à un pipeline de 2 ou plusieurs sous-processus en python, sans blocage? Pourquoi pas?

Je suis conscient que je pourrais exécuter un shell et exécuter le pipeline dans la coque, mais ce n'est pas ce que je veux.


Mise à JOUR 1: à la Suite d'Aaron Digulla l'astuce ci-dessous je vais maintenant essayer d'utiliser des threads pour le faire fonctionner.

Tout d'abord j'ai essayé de courir p1.communiquer sur un fil.

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

Ok, n'a pas fonctionné. Essayé d'autres combinaisons, comme la modifier afin de l' .write() et également p2.read(). Rien. Maintenant, nous allons essayer de l'approche inverse:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()

le code se termine le blocage quelque part. Soit dans la donné naissance à fil, ou dans le thread principal, ou les deux. Si cela ne fonctionne pas. Si vous savez comment le faire fonctionner, il serait plus facile si vous pouvez fournir des code de travail. J'essaie ici.


Mise à JOUR 2

Paul Du Bois-réponse ci-dessous avec quelques informations, donc je n'ai plus de tests. J'ai lu toute la subprocess.py module et a obtenu de la façon dont il fonctionne. J'ai donc essayé d'appliquer exactement ce que fait le code.

Je suis sur linux, mais depuis que j'ai été le tester avec les fils, ma première démarche a été de reproduire l'exacte windows filetage code vu sur subprocess.pys' communicate() méthode, mais pour les deux processus au lieu d'un. Voici la liste complète de ce que j'ai essayé:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

Bien. Il n'a pas de travail. Même après l' p1.stdin.close() a été appelé, p2.stdout.read() encore des blocs.

Ensuite, j'ai essayé la posix code sur subprocess.py:

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

Bloque aussi sur select.select(). En diffusant prints autour, j'ai trouvé ceci:

  • La lecture est de travail. Code lit à de nombreuses reprises au cours de l'exécution.
  • L'écriture est également à l'œuvre. Les données sont écrites à l' p1.stdin.
  • À la fin de l' numwrites, p1.stdin.close() est appelé.
  • Lors de l' select() commence blocage, seulement to_read a quelque chose, p2.stdout. to_write est déjà vide.
  • os.read() appel renvoie toujours à quelque chose, alors p2.stdout.close() n'est jamais appelée.

Conclusion à partir de deux tests: la Fermeture de la stdin du premier processus sur le pipeline (grep dans l'exemple) est de ne pas en faire de vidage de sa mise en mémoire tampon de sortie pour le prochain et de mourir.

Aucun moyen de le faire fonctionner?

PS: je ne veux pas utiliser un fichier temporaire, je l'ai déjà testé avec des fichiers et je sais que ça fonctionne. Et je ne veux pas utiliser windows.

18voto

nosklo Points 75862

J'ai trouvé comment le faire.

Il n'est pas à propos des threads, et non pas sur select().

Lorsque je lance le premier processus (grep), il crée deux bas-niveau des descripteurs de fichiers, un pour chaque canal. Permet d'appeler ceux - a et b.

Quand je lance le deuxième processus, b est passée de cut sdtin. Mais il est en état de mort cérébrale par défaut sur Popen - close_fds=False.

La conséquence, c'est qu' cut hérite également a. Donc, grep ne peut pas mourir, même si je les ferme, a, parce que stdin est toujours ouverte en cuts'processus (cut l'ignore).

Le code suivant fonctionne maintenant parfaitement.

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds=True DEVRAIT ÊTRE par DÉFAUT sur les systèmes unix. Sur windows, il ferme toutes les fds, de sorte qu'il empêche la tuyauterie.

EDIT:

PS: Pour les personnes avec un problème similaire, la lecture de cette réponse: Comme pooryorick dit dans un commentaire, qui pourrait également bloquer si les données écrites p1.stdin est plus grand que les tampons. Dans ce cas, vous devez segmenter les données en petits morceaux, et de les utiliser select.select() savoir lors de la lecture/écriture. Le code en question devrait donner une indication sur la façon de l'appliquer.

EDIT2: Trouvé une autre solution, avec plus d'aide de pooryorick - au lieu d'utiliser close_fds=True et de fermer TOUTES les fds, on pourrait fermer l' fds, qui appartient à la première, lorsque l'exécution de la deuxième, et il va fonctionner. La clôture doit être fait à l'enfant afin de l' preexec_fn fonction de Popen vient très pratique pour faire juste cela. Sur l'exécution de p2 que vous pouvez faire:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)

5voto

Jed Points 900

Travailler avec des fichiers volumineux

Deux principes doivent être appliqués de manière uniforme lorsque l'on travaille avec de gros fichiers en Python.

  1. Depuis toute IO routine peut bloquer, nous devons garder à chaque étape du pipeline dans un autre thread ou processus. Nous utilisons des fils dans cet exemple, mais les sous-processus permet d'éviter le GIL.
  2. Nous devons utiliser différentiels lit et écrit afin de ne pas attendre pour EOF avant de commencer à faire des progrès.

Une alternative est d'utiliser non bloquantes IO, même si c'est gênant dans le standard de Python. Voir gevent pour un léger filetage de la bibliothèque qui implémente l'synchrone IO API non bloquant à l'aide de primitives.

Exemple de code

Nous allons construire un idiot pipeline qui est à peu près

{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}

chaque étape dans des accolades {} est implémenté en Python, tandis que les autres utilisent la norme de programmes externes. TL;DR: Voir ce gist.

Nous commençons avec les importations.

#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading

Python étapes du pipeline

Tous, mais le dernier Python mises en scène de la canalisation doit aller dans un fil de sorte qu'il est IO ne pas bloquer les autres. Il pourrait plutôt s'exécuter en Python sous-processus si vous voulait en fait s'exécuter en parallèle (éviter le GIL).

def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()
def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()
def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()

Chacun de ces besoins pour être mis dans son propre thread, que nous allons faire à l'aide de cette fonction de commodité.

def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t

Création du pipeline

Créer externes étapes à l'aide de Popen et le Python étapes à l'aide de spawn. L'argument bufsize=-1 dit d'utiliser le système de mise en mémoire tampon par défaut (généralement 4 kiB). C'est généralement plus rapide que celle par défaut (sans tampon) ou de la ligne de mise en mémoire tampon, mais vous aurez envie de la ligne de tampon si vous souhaitez surveiller visuellement la sortie sans gal.

grepv   = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut     = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)

twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

Lecteur le pipeline

Assemblé comme ci-dessus, de tous les tampons dans le pipeline va se remplir, mais comme personne n'est la lecture à partir de la fin (grepz.stdout), ils vont tout bloquer. Nous avons pu le lire en entier dans un appel à l' grepz.stdout.read(), mais qui utilisent beaucoup de mémoire pour les gros fichiers. Au lieu de cela, nous avons lu de manière incrémentielle.

for line in grepz.stdout:
    sys.stdout.write(line.lower())

Les threads et les processus de nettoyage une fois qu'ils atteignent EOF. Nous pouvons explicitement nettoyer à l'aide de

for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()

Python-2.6 et plus tôt

En interne, subprocess.Popen des appels fork, configure le tuyau de descripteurs de fichiers, et des appels exec. Le processus de l'enfant à partir de fork a des copies de tous les descripteurs de fichier dans le processus parent, et les deux copies doivent être fermés avant le correspondant lecteur obtiendra EOF. Ceci peut être résolu manuellement en fermant les tuyaux (en close_fds=True ou un adapté preexec_fn argument subprocess.Popen) ou par la définition de l' FD_CLOEXEC drapeau ont exec fermer automatiquement le descripteur de fichier. Cet indicateur est défini automatiquement dans Python 2.7 et plus tard, voir issue12786. On peut obtenir le Python 2.7 comportement dans les versions antérieures de Python en appelant

p._set_cloexec_flags(p.stdin)

avant de passer p.stdin comme argument à une ultérieure subprocess.Popen.

3voto

Poor Yorick Points 9

Il existe trois principales astuces pour faire des pipes fonctionner comme prévu

  1. Assurez-vous que chaque extrémité de la pipe est utilisé dans un autre thread/processus (quelques exemples près du sommet, souffrent de ce problème).

  2. fermer explicitement le solde non utilisé à la fin de la pipe dans chaque processus

  3. traiter avec mise en mémoire tampon par le désactivant (Python option-u), à l'aide de pty, ou tout simplement de remplissage de la mémoire tampon avec quelque chose qui n'affectera pas la de données, ( peut-être '\n', mais ce qui répond).

Les exemples en Python "pipeline" module (je suis l'auteur) s'adapter à votre scénario exactement, et de rendre le faible niveau des étapes assez clair.

http://pypi.python.org/pypi/pipeline/

Plus récemment, j'ai utilisé le processus secondaire, un module d' producteur-transformateur-consommateur-modèle de contrôleur:

http://www.darkarchive.org/w/Pub/PythonInteract

Cet exemple traite de tampon stdin sans recourir à l'aide d'un pty, et aussi illustre les extrémités des tubes doivent être fermés le cas. Je préfère les processus de les threads, mais le principe est le même. En outre, il illustre la synchronisation des Files d'attente qui nourrissent le producteur et recueillir de sortie de la part du consommateur, et comment faire pour les arrêter proprement (regarder dehors pour les sentinelles inséré dans la les files d'attente). Ce modèle permet à la nouvelle entrée doit être généré sur la base des récentes sortie, permettant récursive de la découverte et de la transformation.

2voto

Aaron Digulla Points 143830

Vous devez le faire en plusieurs threads. Sinon, vous vous retrouverez dans une situation où vous ne pouvez pas envoyer des données de l'enfant: p1 ne pas lire vos commentaires depuis la p2 n'a pas lu p1 est sortie parce que vous n'avez pas lu p2 de sortie.

Si vous avez besoin d'un thread d'arrière-plan qui lit ce p2 écrit. Qui permettra p2 pour continuer après l'écriture de données dans le canal, de sorte qu'il peut lire la ligne suivante de l'entrée de p1 qui permet p1 à traiter les données que vous envoyez.

Alternativement, vous pouvez envoyer les données à p1 avec un thread d'arrière-plan et de lire la sortie de p2 dans le thread principal. Mais de chaque côté doit être un fil.

2voto

Poor Yorick Points 9

Nosklo de la solution proposée va rapidement se briser si trop de données est écrit à la fin de réception de la pipe:


from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n' * 20000)
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

Si ce script n'a pas l'accrocher sur votre machine, il suffit d'augmenter "20000" à quelque chose qui dépasse la taille de votre système d'exploitation tuyau de tampons.

C'est parce que le système d'exploitation est mise en mémoire tampon de l'entrée de "grep", mais une fois que la mémoire est pleine, l' p1.stdin.write appel se bloque jusqu'à ce que quelque chose de lit à partir de p2.stdout. Dans le jouet des scénarios, vous pouvez obtenir de la manière avec l'écriture de/la lecture à partir d'un tuyau dans le même processus, mais en utilisation normale, il est nécessaire d'écrire à partir d'un thread/processus et de les lire à partir d'un autre thread/processus. Cela est vrai pour les sous-processus.popen, os.pipe, os.popen*, etc.

L'autre hic, c'est que, parfois, vous voulez garder l'alimentation de la conduite avec les éléments générés à partir plus tôt en sortie de la même conduite. La solution est de faire de la conduite d'engraissement et de la pipe lecteur asynchrone de l'homme du programme, et de mettre en œuvre deux files d'attente: entre le programme principal et le tuyau chargeur et un entre le programme principal et le tuyau lecteur. PythonInteract en est un exemple.

Sous-processus est une belle commodité modèle, mais parce qu'il cache les détails de l'os.popen et les os.fourche appels c'est le cas sous le capot, il peut parfois être plus difficile à traiter que le niveau inférieur des appels qu'il utilise. Pour cette raison, les sous-processus n'est pas une bonne façon d'apprendre sur la façon inter-processus de tuyaux fonctionnent vraiment.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X