Voyons comment appliquer deux fonctions d'agrégation au même itérateur, que nous ne pouvons épuiser qu'une seule fois. La première tentative (qui code en dur sum
et max
pour des raisons de brièveté, mais qui est trivialement généralisable à un nombre arbitraire de fonctions agrégées) pourrait ressembler à ceci :
def max_and_sum_buffer(it):
content = list(it)
p = sum(content)
m = max(content)
return p, m
Cette mise en œuvre présente l'inconvénient de stocker tous les éléments générés en mémoire en une seule fois, alors que les deux fonctions sont parfaitement capables de traiter les flux. La question anticipe cette dérobade et demande explicitement que le résultat soit produit sans mettre en mémoire tampon la sortie de l'itérateur. Est-il possible de le faire ?
Exécution en série : itertools.tee
Il est certain que semble possible. Après tout, les itérateurs Python sont externe Ainsi, chaque itérateur est déjà capable de se suspendre. Comment pourrait-il être difficile de fournir un adaptateur qui divise un itérateur en deux nouveaux itérateurs qui fournissent le même contenu ? En fait, c'est exactement la description de itertools.tee
qui semble parfaitement adapté à l'itération parallèle :
def max_and_sum_tee(it):
it1, it2 = itertools.tee(it)
p = sum(it1) # XXX
m = max(it2)
return p, m
La méthode ci-dessus donne le bon résultat, mais ne fonctionne pas comme nous le voudrions. Le problème est que nous n'itérons pas en parallèle. Les fonctions d'agrégation comme sum
et max
ne sont jamais suspendues - chacune insiste pour consommer tout le contenu de l'itérateur avant de produire le résultat. Ainsi, sum
va épuiser it1
avant max
n'a pas eu l'occasion de courir du tout. Les éléments épuisants de it1
tout en laissant it2
à lui seul, ces éléments seront accumulés dans une FIFO interne partagée entre les deux itérateurs. C'est inévitable ici - puisque max(it2)
doivent voir les mêmes éléments, tee
n'a pas d'autre choix que de les accumuler. (Pour plus de détails intéressants sur tee
, se référer à ce poste. )
En d'autres termes, il n'y a aucune différence entre cette implémentation et la première, si ce n'est que la première rend au moins la mise en mémoire tampon explicite. Pour éliminer la mise en mémoire tampon, sum
et max
doivent fonctionner en parallèle, et non l'un après l'autre.
Fils de discussion : concurrent.futures
Voyons ce qu'il se passe si nous exécutons les fonctions d'agrégation dans des threads séparés, en utilisant toujours tee
pour dupliquer l'itérateur original :
def max_and_sum_threads_simple(it):
it1, it2 = itertools.tee(it)
with concurrent.futures.ThreadPoolExecutor(2) as executor:
sum_future = executor.submit(lambda: sum(it1))
max_future = executor.submit(lambda: max(it2))
return sum_future.result(), max_future.result()
Maintenant sum
et max
fonctionnent en fait en parallèle (jusqu'à le GIL ), les fils étant gérés par l'excellente équipe de la concurrent.futures
module. Il présente toutefois un défaut fatal : pour les modules tee
de ne pas mettre les données en mémoire tampon, sum
et max
doivent traiter leurs articles exactement à la même vitesse. Si l'un est même un peu plus rapide que l'autre, ils s'éloigneront l'un de l'autre, et tee
mettra en mémoire tampon tous les éléments intermédiaires. Comme il n'y a aucun moyen de prédire la vitesse d'exécution de chaque élément, la quantité de mise en mémoire tampon est à la fois imprévisible et présente le pire des cas, celui de tout mettre en mémoire tampon.
Pour s'assurer qu'aucune mise en mémoire tampon ne se produit, tee
doit être remplacé par un générateur personnalisé qui ne met rien en mémoire tampon et bloque jusqu'à ce que tous les consommateurs aient observé la valeur précédente avant de passer à la suivante. Comme précédemment, chaque consommateur s'exécute dans son propre thread, mais maintenant le thread appelant est occupé à exécuter un producteur, une boucle qui itère réellement sur l'itérateur source et signale qu'une nouvelle valeur est disponible. Voici une implémentation :
def max_and_sum_threads(it):
STOP = object()
next_val = None
consumed = threading.Barrier(2 + 1) # 2 consumers + 1 producer
val_id = 0
got_val = threading.Condition()
def send(val):
nonlocal next_val, val_id
consumed.wait()
with got_val:
next_val = val
val_id += 1
got_val.notify_all()
def produce():
for elem in it:
send(elem)
send(STOP)
def consume():
last_val_id = -1
while True:
consumed.wait()
with got_val:
got_val.wait_for(lambda: val_id != last_val_id)
if next_val is STOP:
return
yield next_val
last_val_id = val_id
with concurrent.futures.ThreadPoolExecutor(2) as executor:
sum_future = executor.submit(lambda: sum(consume()))
max_future = executor.submit(lambda: max(consume()))
produce()
return sum_future.result(), max_future.result()
C'est une quantité considérable de code pour quelque chose d'aussi simple conceptuellement, mais c'est nécessaire pour un fonctionnement correct.
produce()
Boucle sur l'itérateur extérieur et envoie les éléments aux consommateurs, une valeur à la fois. Il utilise un barrière une primitive de synchronisation pratique ajoutée en Python 3.2, pour attendre que tous les consommateurs aient fini d'utiliser l'ancienne valeur avant de l'écraser avec la nouvelle dans le fichier next_val
. Une fois que la nouvelle valeur est effectivement prête, un condition est diffusé. consume()
est un générateur qui transmet les valeurs produites au fur et à mesure qu'elles arrivent, jusqu'à ce qu'il détecte STOP
. Le code peut être généralisé pour exécuter un nombre quelconque de fonctions agrégées en parallèle en créant des consommateurs dans une boucle, et en ajustant leur nombre lors de la création de la barrière.
L'inconvénient de cette implémentation est qu'elle nécessite la création de threads (qui peut être allégée en rendant le pool de threads global) et beaucoup de synchronisation très prudente à chaque passage d'itération. Cette synchronisation détruit les performances - cette version est presque 2000 fois plus lente que l'implémentation monofilaire tee
et 475 fois plus lent que la version threadée simple mais non déterministe.
Pourtant, tant que des threads sont utilisés, il est impossible d'éviter la synchronisation sous une forme ou une autre. Pour éliminer complètement la synchronisation, nous devons abandonner les threads et passer au multi-tâches coopératif. La question est de savoir s'il est possible de suspendre l'exécution de fonctions synchrones ordinaires telles que sum
et max
afin de passer de l'un à l'autre ?
Fibres : greenlet
Il s'avère que le greenlet
Ce module d'extension tiers permet exactement cela. Les Greenlets sont une implémentation de fibres des micro-threads légers qui passent de l'un à l'autre de manière explicite. Cela ressemble en quelque sorte aux générateurs Python, qui utilisent yield
à suspendre, sauf que les greenlets offrent un mécanisme de suspension beaucoup plus souple, permettant de choisir qui suspendre à .
Cela rend assez facile le portage de la version threadée de max_and_sum
à des verdures :
def max_and_sum_greenlet(it):
STOP = object()
consumers = None
def send(val):
for g in consumers:
g.switch(val)
def produce():
for elem in it:
send(elem)
send(STOP)
def consume():
g_produce = greenlet.getcurrent().parent
while True:
val = g_produce.switch()
if val is STOP:
return
yield val
sum_result = []
max_result = []
gsum = greenlet.greenlet(lambda: sum_result.append(sum(consume())))
gsum.switch()
gmax = greenlet.greenlet(lambda: max_result.append(max(consume())))
gmax.switch()
consumers = (gsum, gmax)
produce()
return sum_result[0], max_result[0]
La logique est la même, mais avec moins de code. Comme avant, produce
produit des valeurs récupérées à partir de l'itérateur source, mais sa fonction send
ne s'embarrasse pas de synchronisation, car elle n'en a pas besoin lorsque tout est monofilaire. Au lieu de cela, il bascule explicitement vers chaque consommateur à tour de rôle pour qu'il fasse son travail, et le consommateur bascule consciencieusement en retour. Après être passé par tous les consommateurs, le producteur est prêt pour le passage de l'itération suivante.
Les résultats sont récupérés à l'aide d'une liste intermédiaire à un seul élément, car greenlet ne donne pas accès à la valeur de retour de la fonction cible (pas plus que la fonction threading.Thread
C'est pourquoi nous avons opté pour concurrent.futures
ci-dessus).
Il y a cependant des inconvénients à utiliser les greenlets. Tout d'abord, elles ne sont pas fournies avec la bibliothèque standard, vous devez installer l'extension greenlet. Ensuite, greenlet est intrinsèquement non portable car le code de commutation de pile est non supporté par le système d'exploitation et le compilateur et peut être considéré comme une sorte de hack (bien qu'un extrêmement malin un). Un Python qui cible WebAssembly ou JVM ou GraalVM serait très peu susceptible de soutenir le greenlet. Ce n'est pas un problème urgent, mais c'est certainement quelque chose à garder à l'esprit pour le long terme.
Coroutines : asyncio
Depuis Python 3.5, Python fournit des coroutines natives. Contrairement aux greenlets, et comme les générateurs, les coroutines sont distinctes des fonctions régulières et doivent être définies à l'aide de la commande async def
. Les coroutines ne peuvent pas être facilement exécutées à partir d'un code synchrone, elles doivent être traitées par un planificateur qui les mène à terme. L'ordonnanceur est également connu sous le nom de boucle d'événement car son autre rôle est de recevoir les événements IO et de les transmettre aux callbacks et coroutines appropriés. Dans la bibliothèque standard, c'est le rôle de la fonction asyncio
module.
Avant de mettre en œuvre un système de gestion de l'information basé sur l'asynchronisme max_and_sum
nous devons d'abord résoudre un problème. Contrairement à greenlet, asyncio n'est capable de suspendre l'exécution que des coroutines, et non des fonctions arbitraires. Nous devons donc remplacer sum
et max
avec des coroutines qui font essentiellement la même chose. Il suffit de les implémenter de la manière évidente, en remplaçant seulement for
avec async for
permettant à l itérateur asynchrone pour suspendre la coroutine en attendant l'arrivée de la prochaine valeur :
async def asum(it):
s = 0
async for elem in it:
s += elem
return s
async def amax(it):
NONE_YET = object()
largest = NONE_YET
async for elem in it:
if largest is NONE_YET or elem > largest:
largest = elem
if largest is NONE_YET:
raise ValueError("amax() arg is an empty sequence")
return largest
# or, using https://github.com/vxgmichel/aiostream
#
#from aiostream.stream import accumulate
#def asum(it):
# return accumulate(it, initializer=0)
#def amax(it):
# return accumulate(it, max)
On peut raisonnablement se demander si le fait de fournir une nouvelle paire de fonctions agrégées constitue une tricherie ; après tout, les solutions précédentes ont pris soin d'utiliser les fonctions existantes. sum
et max
des encastrements. La réponse dépendra de l'interprétation exacte de la question, mais je dirais que les nouvelles fonctions sont autorisées car elles ne sont en aucun cas spécifiques à la tâche à accomplir. Elles font exactement la même chose que les built-ins, mais en consommant des itérateurs asynchrones. Je soupçonne que la seule raison pour laquelle ces fonctions n'existent pas déjà quelque part dans la bibliothèque standard est que les coroutines et les itérateurs asynchrones sont une fonctionnalité relativement nouvelle.
Cela étant dit, nous pouvons passer à l'écriture max_and_sum
comme une coroutine :
async def max_and_sum_asyncio(it):
loop = asyncio.get_event_loop()
STOP = object()
next_val = loop.create_future()
consumed = loop.create_future()
used_cnt = 2 # number of consumers
async def produce():
for elem in it:
next_val.set_result(elem)
await consumed
next_val.set_result(STOP)
async def consume():
nonlocal next_val, consumed, used_cnt
while True:
val = await next_val
if val is STOP:
return
yield val
used_cnt -= 1
if not used_cnt:
consumed.set_result(None)
consumed = loop.create_future()
next_val = loop.create_future()
used_cnt = 2
else:
await consumed
s, m, _ = await asyncio.gather(asum(consume()), amax(consume()),
produce())
return s, m
Bien que cette version soit basée sur le passage d'une coroutine à l'autre à l'intérieur d'un seul thread, tout comme celle utilisant greenlet, elle semble différente. asyncio ne fournit pas de passage explicite des coroutines, il base le passage d'une tâche à l'autre sur l'attribut await
suspension/reprise primitive. La cible de await
peut être une autre coroutine, mais aussi un "futur" abstrait, un espace réservé à une valeur qui sera remplie plus tard par une autre coroutine. Une fois que la valeur attendue est disponible, la boucle d'événements reprend automatiquement l'exécution de la coroutine, avec la balise await
évaluant la valeur fournie. Ainsi, au lieu de produce
En passant aux consommateurs, elle se suspend en attendant un futur qui arrivera une fois que tous les consommateurs auront observé la valeur produite.
consume()
est un générateur asynchrone qui est comme un générateur ordinaire, sauf qu'il crée un itérateur asynchrone, que nos coroutines agrégées sont déjà prêtes à accepter grâce à l'utilisation de la fonction async for
. L'équivalent d'un itérateur asynchrone de __next__
s'appelle __anext__
et est une coroutine, ce qui permet à la coroutine qui épuise l'itérateur asynchrone de se suspendre en attendant l'arrivée de la nouvelle valeur. Lorsqu'un générateur asynchrone en cours d'exécution se suspend sur une valeur de await
qui est observé par async for
comme une suspension de l'implicite __anext__
l'invocation. consume()
fait exactement cela quand il attend les valeurs fournies par produce
et, au fur et à mesure qu'elles deviennent disponibles, les transmet aux coroutines agrégées comme asum
et amax
. L'attente est réalisée à l'aide du next_val
futur, qui transporte l'élément suivant de it
. En attendant ce futur à l'intérieur consume()
suspend le générateur asynchrone, et avec lui la coroutine agrégée.
L'avantage de cette approche par rapport à la commutation explicite des greenlets est qu'elle permet de combiner beaucoup plus facilement des coroutines qui ne se connaissent pas dans la même boucle d'événements. Par exemple, on peut avoir deux instances de max_and_sum
s'exécutant en parallèle (dans le même thread), ou d'exécuter une fonction d'agrégation plus complexe qui invoque d'autres codes asynchrones pour effectuer des calculs.
La fonction pratique suivante montre comment exécuter l'opération ci-dessus à partir d'un code non asynchrone :
def max_and_sum_asyncio_sync(it):
# trivially instantiate the coroutine and execute it in the
# default event loop
coro = max_and_sum_asyncio(it)
return asyncio.get_event_loop().run_until_complete(coro)
Performance
La mesure et la comparaison des performances de ces approches de l'exécution parallèle peuvent être trompeuses car sum
et max
ne font presque aucun traitement, ce qui surcharge les frais généraux de la parallélisation. Traitez-les comme vous le feriez pour n'importe quel microbenchmark, avec un gros grain de sel. Cela dit, regardons quand même les chiffres !
Les mesures ont été produites à l'aide de Python 3.6 Les fonctions n'ont été exécutées qu'une seule fois et ont été données range(10000)
leur temps est mesuré en soustrayant time.time()
avant et après l'exécution. Voici les résultats :
-
max_and_sum_buffer
et max_and_sum_tee
: 0.66 ms - presque exactement le même temps pour les deux, avec la tee
version étant un peu plus rapide.
-
max_and_sum_threads_simple
: 2,7 ms. Ce temps ne signifie pas grand-chose à cause de la mise en mémoire tampon non déterministe, donc il s'agit peut-être de mesurer le temps de démarrage de deux threads et la synchronisation effectuée en interne par Python.
-
max_and_sum_threads
: 1.29 secondes L'option la plus lente est de loin l'option la plus lente, ~2000 fois plus lente que l'option la plus rapide. Ce résultat horrible est probablement dû à une combinaison des multiples synchronisations effectuées à chaque étape de l'itération et de leur interaction avec le GIL.
-
max_and_sum_greenlet
: 25.5 ms, lent par rapport à la version initiale, mais beaucoup plus rapide que la version threadée. Avec une fonction agrégée suffisamment complexe, on peut imaginer utiliser cette version en production.
-
max_and_sum_asyncio
: 351 ms, soit presque 14 fois plus lent que la version greenlet. C'est un résultat décevant car les coroutines asyncio sont plus légères que les greenlets, et le passage de l'une à l'autre devrait être beaucoup plus rapide. plus rapide que de passer d'une fibre à l'autre. Il est probable que le surcoût lié à l'exécution du planificateur de coroutine et de la boucle d'événement (qui dans ce cas est surdimensionné étant donné que le code n'effectue aucune entrée/sortie) détruise les performances de ce micro-benchmark.
-
max_and_sum_asyncio
en utilisant uvloop
: 125 ms. C'est plus du double de la vitesse de l'asyncio ordinaire, mais toujours presque 5 fois plus lent que greenlet.
L'exécution des exemples sous PyPy n'apporte pas de gain de vitesse significatif, en fait la plupart des exemples s'exécutent légèrement plus lentement, même après les avoir exécutés plusieurs fois pour assurer le réchauffement du JIT. La fonction asyncio nécessite un réécriture de ne pas utiliser de générateurs asynchrones (puisque PyPy, à ce jour, implémente Python 3.5), et s'exécute en un peu moins de 100 ms. C'est comparable aux performances de CPython+uvloop, c'est-à-dire meilleur, mais pas dramatique comparé à greenlet.
0 votes
sum(a,b,c)
est la même chose quesum(a,sum(b,c))
de même pourmax
. Peut-on supposer que c'est toujours le cas ? Il suffit alors d'appliquer les fonctions d'agrégation pour chaque élément de l'itérateur.1 votes
@tobias_k Belle prise ! Je ne peux pas parler pour le PO, mais supposer que cela semble un peu exagéré parce qu'alors vous travaillez vraiment avec des fonctions binaires (
+
et le binairemax
), et non avec des fonctions agrégées. La question fait référence aux fonctions d'agrégation en général, en les décrivant comme "acceptant un itérateur comme seul paramètre", en utilisant uniquement la fonctionsum
etmax
à titre d'exemple. Dans ce contexte, je dirais qu'une réponse doit fonctionner pour les agrégats qui ne peuvent être réduits à une série apatride d'applications d'une fonction binaire (par exemple, un agrégat qui renvoie la médiane de la séquence).0 votes
@user4815162342 Je pensais que ce serait un moyen simple et agréable qui fonctionne avec O(1) mémoire et sans threads, mais vous avez raison ;
average
serait un autre exemple. (Aussi, c'est assez lent.)0 votes
@tobias_k Aussi, c'est assez lent. J'ai essayé par curiosité, et pour
sum
etmax
etrange(10000)
il pointe à 4,9 ms sur ma machine, bien plus rapide que les solutions de ma réponse (sauf les initiales qui mettent tout en mémoire tampon).0 votes
@user4815162342 Je l'ai juste comparé à la mise en mémoire tampon de l'itérateur entier avec
list
outee
. Quoi qu'il en soit, je l'ai posté comme une réponse, peut-être que c'est utile dans certains cas. Au moins, c'est plus simple que les approches basées sur les fils.0 votes
@tobias_k Au moins, c'est plus simple que les approches basées sur les fils. Pour mémoire, les approches de ma réponse ne sont pas basées sur le thread, du moins pas toutes. Elles sont néanmoins complexes, mais c'est probablement inévitable sans perte de généralité.