42 votes

Comment générer des processus enfants parallèles sur un système multiprocesseur?

J'ai un script Python que je veux l'utiliser comme un contrôleur à un autre script Python. J'ai un serveur avec 64 processeurs, si voulez lancer jusqu'à 64 processus enfants de ce second script Python. L'enfant script est appelé:

$ python create_graphs.py --name=NAME

où NOM est quelque chose comme XYZ, ABC, NYU etc.

Dans mon contrôleur de parent script que j'ai récupérer le nom de la variable à partir d'une liste:

my_list = [ 'XYZ', 'ABC', 'NYU' ]

Donc ma question est, quelle est la meilleure façon de spawn hors de ces processus comme des enfants? Je veux limiter le nombre d'enfants à 64 ans, à une époque, donc besoin de suivre l'état (si l'enfant est terminée ou pas) pour que je puisse garder efficacement l'ensemble de la génération en cours d'exécution.

J'ai regardé dans l'aide de la sous-processus, mais l'a rejeté parce qu'il ne génère un seul enfant à la fois. J'ai enfin trouvé le multiprocesseur paquet, mais j'avoue être submergé par l'ensemble de threads contre les sous-processus de documentation.

Pour l'instant, mon script utilise subprocess.call seulement frayer un enfant à un moment et ressemble à ceci:

#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process

my_list = [ 'XYZ', 'ABC', 'NYU' ]

if __name__ == '__main__':
    processors = multiprocessing.cpu_count()

    for i in range(len(my_list)):
        if( i < processors ):
             cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
             child = subprocess.call( cmd, shell=False )

Je le veux vraiment, pour frayer, en hausse de 64 enfants à la fois. En d'autres stackoverflow questions j'ai vu des gens à l'aide de la File d'attente, mais il semble que ce crée un gain de performance?

61voto

Nadia Alramli Points 40381

Ce que vous recherchez, c'est le processus de la piscine de la classe dans le multitraitement.

import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print pool.map(work, ['ls'] * count)

Et voici un exemple de calcul pour le rendre plus facile à comprendre. Le suivant va diviser 10000 tâches sur les processus N où N est le cpu le comte. Notez que je suis de passage d'Aucun comme le nombre de processus. Ce sera la cause de la Piscine de la classe à utiliser cpu_count pour le nombre de processus (de référence)

import multiprocessing
import subprocess

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)
    tasks = range(10000)
    results = []
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait() # Wait on the results
    print results

2voto

tatlar Points 633

Voici la solution que j'ai développée, basée sur Nadia et Jim commentaires. Je ne suis pas sûr si c'est le meilleur moyen, mais il fonctionne. L'original de l'enfant script appelé doit être un script shell parce que j'ai besoin d'utiliser certaines applications 3ème partie, y compris Matlab. J'ai donc eu à le sortir de Python et le code qu'il en bash.

import sys
import os
import multiprocessing
import subprocess

def work(staname):
    print 'Processing station:',staname
    print 'Parent process:', os.getppid()
    print 'Process id:', os.getpid()
    cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':

    my_list = [ 'XYZ', 'ABC', 'NYU' ]

    my_list.sort()

    print my_list

    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()

    threads = []

    len_stas = len(my_list)

    print "+++ Number of stations to process: %s" % (len_stas)

    # run until all the threads are done, and there is no data left

    for list_item in my_list:

        # if we aren't using all the processors AND there is still data left to
        # compute, then spawn another thread

        if( len(threads) < num_processes ):

            p = multiprocessing.Process(target=work,args=[list_item])

            p.start()

            print p, p.is_alive()

            threads.append(p)

        else:

            for thread in threads:

                if not thread.is_alive():

                    threads.remove(thread)

Cela vous semble comme une solution raisonnable? J'ai essayé d'utiliser Jim boucle while format, mais mon script juste n'a rien retourné. Je ne suis pas sûr pourquoi ce serait. Voici le résultat lorsque je lance le script avec Jim", alors que "la boucle de remplacer le "pour" de la boucle:

hostname{me}2% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%

Quand je le lance avec le "pour" de la boucle, j'obtiens quelque chose de plus significatif:

hostname{me}6% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%

Si cela fonctionne, et je suis heureux. Cependant, je ne comprends toujours pas pourquoi je ne peux pas utiliser de Jim", alors que "le style de la boucle au lieu de le "pour" de la boucle que j'utilise. Merci pour toute l'aide - je suis impressionnée par l'étendue des connaissances @ stackoverflow.

1voto

Jiaaro Points 14379

Je ne pense pas que vous avez besoin de la file d'attente, sauf si vous avez l'intention d'obtenir des données sur l'ensemble des demandes (Qui, si vous ne souhaitez que des données, je pense qu'il peut être plus facile pour l'ajouter à une base de données de toute façon)

mais essayez ceci pour la taille:

mettre le contenu de votre create_graphs.py script tous dans une fonction appelée "create_graphs"

import threading
from create_graphs import create_graphs

num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]

threads = []

# run until all the threads are done, and there is no data left
while threads or my_list:

    # if we aren't using all the processors AND there is still data left to
    # compute, then spawn another thread
    if (len(threads) < num_processes) and my_list:
        t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
        t.setDaemon(True)
        t.start()
        threads.append(t)

    # in the case that we have the maximum number of threads check if any of them
    # are done. (also do this when we run out of data, until all the threads are done)
    else:
        for thread in threads:
            if not thread.isAlive():
                threads.remove(thread)

Je sais que le résultat sera 1 de moins de threads que de processeurs, ce qui est probablement la bonne, il laisse un processeur pour gérer les threads, les i/o disque, et d'autres choses qui se passe sur l'ordinateur. Si vous décidez que vous voulez utiliser la dernière de base suffit d'en ajouter un pour elle

edit: je pense que j'ai peut-être mal interprété le but de my_list. Vous n'avez pas besoin d' my_list pour garder une trace du fils (comme ils sont tous référencés par les éléments de la threads de la liste). Mais c'est une bonne façon de nourrir le processus d'entrée - ou encore mieux: utiliser un générateur de fonction ;)

Le but de l' my_list et threads

my_list contient les données que vous avez besoin de processus dans votre fonction
threads est juste une liste des threads en cours d'exécution

la boucle while fait deux choses, créer de nouveaux threads pour traiter les données, et de vérifier si tous les threads sont faites en cours d'exécution.

Donc, tant que vous ont (a) plus de données à traiter, ou (b) de threads qui ne sont pas fini de courir.... vous voulez le programme de continuer à fonctionner. Une fois que les deux listes sont vides ils évalueront d' False et la boucle while va sortir

1voto

Aaron Maenpaa Points 39173

J'utiliserais certainement le multitraitement plutôt que de lancer ma propre solution en utilisant un sous-processus.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X