31 votes

Comment exécuter deux fonctions "agrégées" (comme sum) simultanément, en les alimentant à partir du même itérateur ?

Imaginons que nous ayons un itérateur, disons iter(range(1, 1000)) . Et nous avons deux fonctions, chacune acceptant un itérateur comme seul paramètre, disons sum() et max() . Dans le monde SQL, nous les appellerions des fonctions agrégées.

Existe-t-il un moyen d'obtenir les résultats des deux sans mettre en mémoire tampon la sortie de l'itérateur ?

Pour ce faire, nous devrions mettre en pause et reprendre l'exécution de la fonction agrégée, afin de les alimenter toutes les deux avec les mêmes valeurs sans les stocker. Peut-être y a-t-il un moyen de l'exprimer en utilisant des choses asynchrones sans sleeps ?

0 votes

sum(a,b,c) est la même chose que sum(a,sum(b,c)) de même pour max . Peut-on supposer que c'est toujours le cas ? Il suffit alors d'appliquer les fonctions d'agrégation pour chaque élément de l'itérateur.

1 votes

@tobias_k Belle prise ! Je ne peux pas parler pour le PO, mais supposer que cela semble un peu exagéré parce qu'alors vous travaillez vraiment avec des fonctions binaires ( + et le binaire max ), et non avec des fonctions agrégées. La question fait référence aux fonctions d'agrégation en général, en les décrivant comme "acceptant un itérateur comme seul paramètre", en utilisant uniquement la fonction sum et max à titre d'exemple. Dans ce contexte, je dirais qu'une réponse doit fonctionner pour les agrégats qui ne peuvent être réduits à une série apatride d'applications d'une fonction binaire (par exemple, un agrégat qui renvoie la médiane de la séquence).

0 votes

@user4815162342 Je pensais que ce serait un moyen simple et agréable qui fonctionne avec O(1) mémoire et sans threads, mais vous avez raison ; average serait un autre exemple. (Aussi, c'est assez lent.)

47voto

user4815162342 Points 27348

Voyons comment appliquer deux fonctions d'agrégation au même itérateur, que nous ne pouvons épuiser qu'une seule fois. La première tentative (qui code en dur sum et max pour des raisons de brièveté, mais qui est trivialement généralisable à un nombre arbitraire de fonctions agrégées) pourrait ressembler à ceci :

def max_and_sum_buffer(it):
    content = list(it)
    p = sum(content)
    m = max(content)
    return p, m

Cette mise en œuvre présente l'inconvénient de stocker tous les éléments générés en mémoire en une seule fois, alors que les deux fonctions sont parfaitement capables de traiter les flux. La question anticipe cette dérobade et demande explicitement que le résultat soit produit sans mettre en mémoire tampon la sortie de l'itérateur. Est-il possible de le faire ?

Exécution en série : itertools.tee

Il est certain que semble possible. Après tout, les itérateurs Python sont externe Ainsi, chaque itérateur est déjà capable de se suspendre. Comment pourrait-il être difficile de fournir un adaptateur qui divise un itérateur en deux nouveaux itérateurs qui fournissent le même contenu ? En fait, c'est exactement la description de itertools.tee qui semble parfaitement adapté à l'itération parallèle :

def max_and_sum_tee(it):
    it1, it2 = itertools.tee(it)
    p = sum(it1)  # XXX
    m = max(it2)
    return p, m

La méthode ci-dessus donne le bon résultat, mais ne fonctionne pas comme nous le voudrions. Le problème est que nous n'itérons pas en parallèle. Les fonctions d'agrégation comme sum et max ne sont jamais suspendues - chacune insiste pour consommer tout le contenu de l'itérateur avant de produire le résultat. Ainsi, sum va épuiser it1 avant max n'a pas eu l'occasion de courir du tout. Les éléments épuisants de it1 tout en laissant it2 à lui seul, ces éléments seront accumulés dans une FIFO interne partagée entre les deux itérateurs. C'est inévitable ici - puisque max(it2) doivent voir les mêmes éléments, tee n'a pas d'autre choix que de les accumuler. (Pour plus de détails intéressants sur tee , se référer à ce poste. )

En d'autres termes, il n'y a aucune différence entre cette implémentation et la première, si ce n'est que la première rend au moins la mise en mémoire tampon explicite. Pour éliminer la mise en mémoire tampon, sum et max doivent fonctionner en parallèle, et non l'un après l'autre.

Fils de discussion : concurrent.futures

Voyons ce qu'il se passe si nous exécutons les fonctions d'agrégation dans des threads séparés, en utilisant toujours tee pour dupliquer l'itérateur original :

def max_and_sum_threads_simple(it):
    it1, it2 = itertools.tee(it)

    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(it1))
        max_future = executor.submit(lambda: max(it2))

    return sum_future.result(), max_future.result()

Maintenant sum et max fonctionnent en fait en parallèle (jusqu'à le GIL ), les fils étant gérés par l'excellente équipe de la concurrent.futures module. Il présente toutefois un défaut fatal : pour les modules tee de ne pas mettre les données en mémoire tampon, sum et max doivent traiter leurs articles exactement à la même vitesse. Si l'un est même un peu plus rapide que l'autre, ils s'éloigneront l'un de l'autre, et tee mettra en mémoire tampon tous les éléments intermédiaires. Comme il n'y a aucun moyen de prédire la vitesse d'exécution de chaque élément, la quantité de mise en mémoire tampon est à la fois imprévisible et présente le pire des cas, celui de tout mettre en mémoire tampon.

Pour s'assurer qu'aucune mise en mémoire tampon ne se produit, tee doit être remplacé par un générateur personnalisé qui ne met rien en mémoire tampon et bloque jusqu'à ce que tous les consommateurs aient observé la valeur précédente avant de passer à la suivante. Comme précédemment, chaque consommateur s'exécute dans son propre thread, mais maintenant le thread appelant est occupé à exécuter un producteur, une boucle qui itère réellement sur l'itérateur source et signale qu'une nouvelle valeur est disponible. Voici une implémentation :

def max_and_sum_threads(it):
    STOP = object()
    next_val = None
    consumed = threading.Barrier(2 + 1)  # 2 consumers + 1 producer
    val_id = 0
    got_val = threading.Condition()

    def send(val):
        nonlocal next_val, val_id
        consumed.wait()
        with got_val:
            next_val = val
            val_id += 1
            got_val.notify_all()

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        last_val_id = -1
        while True:
            consumed.wait()
            with got_val:
                got_val.wait_for(lambda: val_id != last_val_id)
            if next_val is STOP:
                return
            yield next_val
            last_val_id = val_id

    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(consume()))
        max_future = executor.submit(lambda: max(consume()))
        produce()

    return sum_future.result(), max_future.result()

C'est une quantité considérable de code pour quelque chose d'aussi simple conceptuellement, mais c'est nécessaire pour un fonctionnement correct.

produce() Boucle sur l'itérateur extérieur et envoie les éléments aux consommateurs, une valeur à la fois. Il utilise un barrière une primitive de synchronisation pratique ajoutée en Python 3.2, pour attendre que tous les consommateurs aient fini d'utiliser l'ancienne valeur avant de l'écraser avec la nouvelle dans le fichier next_val . Une fois que la nouvelle valeur est effectivement prête, un condition est diffusé. consume() est un générateur qui transmet les valeurs produites au fur et à mesure qu'elles arrivent, jusqu'à ce qu'il détecte STOP . Le code peut être généralisé pour exécuter un nombre quelconque de fonctions agrégées en parallèle en créant des consommateurs dans une boucle, et en ajustant leur nombre lors de la création de la barrière.

L'inconvénient de cette implémentation est qu'elle nécessite la création de threads (qui peut être allégée en rendant le pool de threads global) et beaucoup de synchronisation très prudente à chaque passage d'itération. Cette synchronisation détruit les performances - cette version est presque 2000 fois plus lente que l'implémentation monofilaire tee et 475 fois plus lent que la version threadée simple mais non déterministe.

Pourtant, tant que des threads sont utilisés, il est impossible d'éviter la synchronisation sous une forme ou une autre. Pour éliminer complètement la synchronisation, nous devons abandonner les threads et passer au multi-tâches coopératif. La question est de savoir s'il est possible de suspendre l'exécution de fonctions synchrones ordinaires telles que sum et max afin de passer de l'un à l'autre ?

Fibres : greenlet

Il s'avère que le greenlet Ce module d'extension tiers permet exactement cela. Les Greenlets sont une implémentation de fibres des micro-threads légers qui passent de l'un à l'autre de manière explicite. Cela ressemble en quelque sorte aux générateurs Python, qui utilisent yield à suspendre, sauf que les greenlets offrent un mécanisme de suspension beaucoup plus souple, permettant de choisir qui suspendre à .

Cela rend assez facile le portage de la version threadée de max_and_sum à des verdures :

def max_and_sum_greenlet(it):
    STOP = object()
    consumers = None

    def send(val):
        for g in consumers:
            g.switch(val)

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        g_produce = greenlet.getcurrent().parent
        while True:
            val = g_produce.switch()
            if val is STOP:
                return
            yield val

    sum_result = []
    max_result = []
    gsum = greenlet.greenlet(lambda: sum_result.append(sum(consume())))
    gsum.switch()
    gmax = greenlet.greenlet(lambda: max_result.append(max(consume())))
    gmax.switch()
    consumers = (gsum, gmax)
    produce()

    return sum_result[0], max_result[0]

La logique est la même, mais avec moins de code. Comme avant, produce produit des valeurs récupérées à partir de l'itérateur source, mais sa fonction send ne s'embarrasse pas de synchronisation, car elle n'en a pas besoin lorsque tout est monofilaire. Au lieu de cela, il bascule explicitement vers chaque consommateur à tour de rôle pour qu'il fasse son travail, et le consommateur bascule consciencieusement en retour. Après être passé par tous les consommateurs, le producteur est prêt pour le passage de l'itération suivante.

Les résultats sont récupérés à l'aide d'une liste intermédiaire à un seul élément, car greenlet ne donne pas accès à la valeur de retour de la fonction cible (pas plus que la fonction threading.Thread C'est pourquoi nous avons opté pour concurrent.futures ci-dessus).

Il y a cependant des inconvénients à utiliser les greenlets. Tout d'abord, elles ne sont pas fournies avec la bibliothèque standard, vous devez installer l'extension greenlet. Ensuite, greenlet est intrinsèquement non portable car le code de commutation de pile est non supporté par le système d'exploitation et le compilateur et peut être considéré comme une sorte de hack (bien qu'un extrêmement malin un). Un Python qui cible WebAssembly ou JVM ou GraalVM serait très peu susceptible de soutenir le greenlet. Ce n'est pas un problème urgent, mais c'est certainement quelque chose à garder à l'esprit pour le long terme.

Coroutines : asyncio

Depuis Python 3.5, Python fournit des coroutines natives. Contrairement aux greenlets, et comme les générateurs, les coroutines sont distinctes des fonctions régulières et doivent être définies à l'aide de la commande async def . Les coroutines ne peuvent pas être facilement exécutées à partir d'un code synchrone, elles doivent être traitées par un planificateur qui les mène à terme. L'ordonnanceur est également connu sous le nom de boucle d'événement car son autre rôle est de recevoir les événements IO et de les transmettre aux callbacks et coroutines appropriés. Dans la bibliothèque standard, c'est le rôle de la fonction asyncio module.

Avant de mettre en œuvre un système de gestion de l'information basé sur l'asynchronisme max_and_sum nous devons d'abord résoudre un problème. Contrairement à greenlet, asyncio n'est capable de suspendre l'exécution que des coroutines, et non des fonctions arbitraires. Nous devons donc remplacer sum et max avec des coroutines qui font essentiellement la même chose. Il suffit de les implémenter de la manière évidente, en remplaçant seulement for avec async for permettant à l itérateur asynchrone pour suspendre la coroutine en attendant l'arrivée de la prochaine valeur :

async def asum(it):
    s = 0
    async for elem in it:
        s += elem
    return s

async def amax(it):
    NONE_YET = object()
    largest = NONE_YET
    async for elem in it:
        if largest is NONE_YET or elem > largest:
            largest = elem
    if largest is NONE_YET:
        raise ValueError("amax() arg is an empty sequence")
    return largest

# or, using https://github.com/vxgmichel/aiostream
#
#from aiostream.stream import accumulate
#def asum(it):
#    return accumulate(it, initializer=0)
#def amax(it):
#    return accumulate(it, max)

On peut raisonnablement se demander si le fait de fournir une nouvelle paire de fonctions agrégées constitue une tricherie ; après tout, les solutions précédentes ont pris soin d'utiliser les fonctions existantes. sum et max des encastrements. La réponse dépendra de l'interprétation exacte de la question, mais je dirais que les nouvelles fonctions sont autorisées car elles ne sont en aucun cas spécifiques à la tâche à accomplir. Elles font exactement la même chose que les built-ins, mais en consommant des itérateurs asynchrones. Je soupçonne que la seule raison pour laquelle ces fonctions n'existent pas déjà quelque part dans la bibliothèque standard est que les coroutines et les itérateurs asynchrones sont une fonctionnalité relativement nouvelle.

Cela étant dit, nous pouvons passer à l'écriture max_and_sum comme une coroutine :

async def max_and_sum_asyncio(it):
    loop = asyncio.get_event_loop()
    STOP = object()

    next_val = loop.create_future()
    consumed = loop.create_future()
    used_cnt = 2  # number of consumers

    async def produce():
        for elem in it:
            next_val.set_result(elem)
            await consumed
        next_val.set_result(STOP)

    async def consume():
        nonlocal next_val, consumed, used_cnt
        while True:
            val = await next_val
            if val is STOP:
                return
            yield val
            used_cnt -= 1
            if not used_cnt:
                consumed.set_result(None)
                consumed = loop.create_future()
                next_val = loop.create_future()
                used_cnt = 2
            else:
                await consumed

    s, m, _ = await asyncio.gather(asum(consume()), amax(consume()),
                                   produce())
    return s, m

Bien que cette version soit basée sur le passage d'une coroutine à l'autre à l'intérieur d'un seul thread, tout comme celle utilisant greenlet, elle semble différente. asyncio ne fournit pas de passage explicite des coroutines, il base le passage d'une tâche à l'autre sur l'attribut await suspension/reprise primitive. La cible de await peut être une autre coroutine, mais aussi un "futur" abstrait, un espace réservé à une valeur qui sera remplie plus tard par une autre coroutine. Une fois que la valeur attendue est disponible, la boucle d'événements reprend automatiquement l'exécution de la coroutine, avec la balise await évaluant la valeur fournie. Ainsi, au lieu de produce En passant aux consommateurs, elle se suspend en attendant un futur qui arrivera une fois que tous les consommateurs auront observé la valeur produite.

consume() est un générateur asynchrone qui est comme un générateur ordinaire, sauf qu'il crée un itérateur asynchrone, que nos coroutines agrégées sont déjà prêtes à accepter grâce à l'utilisation de la fonction async for . L'équivalent d'un itérateur asynchrone de __next__ s'appelle __anext__ et est une coroutine, ce qui permet à la coroutine qui épuise l'itérateur asynchrone de se suspendre en attendant l'arrivée de la nouvelle valeur. Lorsqu'un générateur asynchrone en cours d'exécution se suspend sur une valeur de await qui est observé par async for comme une suspension de l'implicite __anext__ l'invocation. consume() fait exactement cela quand il attend les valeurs fournies par produce et, au fur et à mesure qu'elles deviennent disponibles, les transmet aux coroutines agrégées comme asum et amax . L'attente est réalisée à l'aide du next_val futur, qui transporte l'élément suivant de it . En attendant ce futur à l'intérieur consume() suspend le générateur asynchrone, et avec lui la coroutine agrégée.

L'avantage de cette approche par rapport à la commutation explicite des greenlets est qu'elle permet de combiner beaucoup plus facilement des coroutines qui ne se connaissent pas dans la même boucle d'événements. Par exemple, on peut avoir deux instances de max_and_sum s'exécutant en parallèle (dans le même thread), ou d'exécuter une fonction d'agrégation plus complexe qui invoque d'autres codes asynchrones pour effectuer des calculs.

La fonction pratique suivante montre comment exécuter l'opération ci-dessus à partir d'un code non asynchrone :

def max_and_sum_asyncio_sync(it):
    # trivially instantiate the coroutine and execute it in the
    # default event loop
    coro = max_and_sum_asyncio(it)
    return asyncio.get_event_loop().run_until_complete(coro)

Performance

La mesure et la comparaison des performances de ces approches de l'exécution parallèle peuvent être trompeuses car sum et max ne font presque aucun traitement, ce qui surcharge les frais généraux de la parallélisation. Traitez-les comme vous le feriez pour n'importe quel microbenchmark, avec un gros grain de sel. Cela dit, regardons quand même les chiffres !

Les mesures ont été produites à l'aide de Python 3.6 Les fonctions n'ont été exécutées qu'une seule fois et ont été données range(10000) leur temps est mesuré en soustrayant time.time() avant et après l'exécution. Voici les résultats :

  • max_and_sum_buffer et max_and_sum_tee : 0.66 ms - presque exactement le même temps pour les deux, avec la tee version étant un peu plus rapide.

  • max_and_sum_threads_simple : 2,7 ms. Ce temps ne signifie pas grand-chose à cause de la mise en mémoire tampon non déterministe, donc il s'agit peut-être de mesurer le temps de démarrage de deux threads et la synchronisation effectuée en interne par Python.

  • max_and_sum_threads : 1.29 secondes L'option la plus lente est de loin l'option la plus lente, ~2000 fois plus lente que l'option la plus rapide. Ce résultat horrible est probablement dû à une combinaison des multiples synchronisations effectuées à chaque étape de l'itération et de leur interaction avec le GIL.

  • max_and_sum_greenlet : 25.5 ms, lent par rapport à la version initiale, mais beaucoup plus rapide que la version threadée. Avec une fonction agrégée suffisamment complexe, on peut imaginer utiliser cette version en production.

  • max_and_sum_asyncio : 351 ms, soit presque 14 fois plus lent que la version greenlet. C'est un résultat décevant car les coroutines asyncio sont plus légères que les greenlets, et le passage de l'une à l'autre devrait être beaucoup plus rapide. plus rapide que de passer d'une fibre à l'autre. Il est probable que le surcoût lié à l'exécution du planificateur de coroutine et de la boucle d'événement (qui dans ce cas est surdimensionné étant donné que le code n'effectue aucune entrée/sortie) détruise les performances de ce micro-benchmark.

  • max_and_sum_asyncio en utilisant uvloop : 125 ms. C'est plus du double de la vitesse de l'asyncio ordinaire, mais toujours presque 5 fois plus lent que greenlet.

L'exécution des exemples sous PyPy n'apporte pas de gain de vitesse significatif, en fait la plupart des exemples s'exécutent légèrement plus lentement, même après les avoir exécutés plusieurs fois pour assurer le réchauffement du JIT. La fonction asyncio nécessite un réécriture de ne pas utiliser de générateurs asynchrones (puisque PyPy, à ce jour, implémente Python 3.5), et s'exécute en un peu moins de 100 ms. C'est comparable aux performances de CPython+uvloop, c'est-à-dire meilleur, mais pas dramatique comparé à greenlet.

5voto

tobias_k Points 13121

S'il est vrai que pour vos fonctions agrégées f(a,b,c,...) == f(a, f(b, f(c, ...))) Ensuite, vous pouvez simplement parcourir vos fonctions et les alimenter un élément à la fois, en les combinant à chaque fois avec le résultat de l'application précédente, comme suit reduce ferait, par exemple, comme ceci :

def aggregate(iterator, *functions):
    first = next(iterator)
    result = [first] * len(functions)
    for item in iterator:
        for i, f in enumerate(functions):
            result[i] = f((result[i], item))
    return result

Cette méthode est considérablement plus lente (environ 10 à 20 fois) que la simple matérialisation de l'itérateur dans une liste et l'application de la fonction d'agrégation sur l'ensemble de la liste, ou que l'utilisation de l'option itertools.tee (qui fait fondamentalement la même chose, en interne), mais il a l'avantage de ne pas utiliser de mémoire supplémentaire.

Notez cependant que si cela fonctionne bien pour des fonctions telles que sum , min ou max mais elle ne fonctionne pas pour d'autres fonctions d'agrégation, par exemple pour trouver l'élément moyen ou médian d'un itérateur. mean(a, b, c) != mean(a, mean(b, c)) . (Pour mean vous pouvez bien sûr vous contenter d'obtenir le sum et le diviser par le nombre d'éléments, mais calculer par exemple la médiane en prenant un seul élément à la fois sera plus difficile).

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X