4 votes

Redémarrer périodiquement le pool de multiprocesseurs Python

J'ai un pool multiprocesseur Python qui effectue un travail très long et qui, même après un débogage approfondi, n'est pas assez robuste pour ne pas tomber en panne toutes les 24 heures environ, car il dépend de nombreux outils tiers non Python aux interactions complexes. En outre, la machine sous-jacente présente certains problèmes que je ne peux pas contrôler. Notez que par défaillance, je ne veux pas dire que le programme entier se plante, mais que certains ou la plupart des processus deviennent inactifs à cause de certaines erreurs, et que l'application elle-même se suspend ou continue le travail avec les processus qui n'ont pas échoué.

Pour l'instant, ma solution consiste à arrêter périodiquement le travail, manuellement, puis à le redémarrer à partir de là où il était.

Même si ce n'est pas idéal, ce que je veux faire maintenant est le suivant : redémarrer le pool de multiprocesseurs périodiquement, par programme, à partir du code Python lui-même. Je ne me soucie pas vraiment si cela implique de tuer les travailleurs du pool au milieu de leur travail. Quelle serait la meilleure façon de le faire ?

Mon code ressemble à ça :

with Pool() as p:
    for _ in p.imap_unordered(function, data):
        save_checkpoint()
        log()

Ce que j'ai en tête serait quelque chose comme :

start = 0
end = 1000  # magic number
while start + 1 < len(data):
    current_data = data[start:end]
    with Pool() as p:
        for _ in p.imap_unordered(function, current_data):
            save_checkpoint()
            log()
            start += 1
            end += 1

Ou :

start = 0
end = 1000  # magic number
while start + 1 < len(data):
    current_data = data[start:end]
    start_timeout(time=TIMEOUT) # which would be the best way to to do that without breaking multiprocessing?
    try:
        with Pool() as p:
            for _ in p.imap_unordered(function, current_data):
                save_checkpoint()
                log()
                start += 1
                end += 1
    except Timeout:
        pass

Ou toute autre suggestion que vous pensez être meilleure. Toute aide serait très appréciée, merci !

1voto

2e0byo Points 1069

Le problème avec votre code actuel est qu'il itère les résultats multitraités directement, et cet appel va bloquer. Heureusement, il existe une solution simple : utilisez la fonction apply_async exactement comme suggéré dans les docs . Mais en raison de la façon dont vous décrivez le cas d'utilisation ici et l'échec, je l'ai adapté quelque peu. Tout d'abord, une tâche fictive :

from multiprocessing import Pool, TimeoutError, cpu_count
from time import sleep
from random import randint

def log():
    print("logging is a dangerous activity: wear a hard hat.")

def work(d):
    sleep(randint(1, 100) / 100)
    print("finished working")
    if randint(1, 10) == 1:
        print("blocking...")
        while True:
            sleep(0.1)

    return d

Cette fonction de travail échouera avec une probabilité de 0.1 , bloquant indéfiniment. Nous créons les tâches :

data = list(range(100))
nproc = cpu_count()

Et ensuite générer des futurs pour chacun d'entre eux :

while data:
    print(f"== Processing {len(data)} items. ==")
    with Pool(nproc) as p:
        tasks = [p.apply_async(work, (d,)) for d in data]

Ensuite, nous pouvons essayer d'accomplir les tâches manuellement :

        for task in tasks:
            try:
                res = task.get(timeout=1)
                data.remove(res)
                log()
            except TimeoutError:
                failed.append(task)
                if len(failed) < nproc:
                    print(
                        f"{len(failed)} processes are blocked,"
                        f" but {nproc - len(failed)} remain."
                    )
                else:
                    break

Le délai de contrôle ici est le délai pour .get . Il doit être aussi long que le processus le plus long auquel vous vous attendez. Notez que nous détectons lorsque l'ensemble du pool est immobilisé et que nous abandonnons.

Mais comme dans le scénario que vous décrivez, certains fils vont prendre plus de temps que d'autres, nous pouvons donner aux processus "défaillants" un certain temps pour se rétablir. Ainsi, chaque fois qu'une tâche échoue, nous vérifions rapidement si les autres ont en fait réussi :

            for task in failed:
                try:
                    res = task.get(timeout=0.01)
                    data.remove(res)
                    failed.remove(task)
                    log()
                except TimeoutError:
                    continue

La question de savoir si cet ajout est judicieux dans votre cas dépend de la question de savoir si vos tâches sont vraiment aussi aléatoires que je le suppose.

La sortie du gestionnaire de contexte pour le pool mettra fin au pool, donc nous n'avons même pas besoin de gérer cela nous-mêmes. Si vous avez une variation significative, vous pouvez augmenter la taille du pool (augmentant ainsi le nombre de tâches qui sont autorisées à caler) ou accorder une période de grâce aux tâches avant de les considérer comme ayant échoué.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X