8 votes

Mettre à jour un DataFrame dans différents processus python en temps réel

Disons que vous avez un processus Python qui collecte des données en temps réel à raison d'environ 500 lignes par seconde (ce qui peut être parallélisé pour réduire à environ 50 lignes par seconde) à partir d'un système de mise en file d'attente et qui les ajoute à un fichier de type DataFrame :

rq = MyRedisQueue(..)
df = pd.DataFrame()
while 1:
    recv = rq.get(block=True)
    # some converting
    df.append(recv, ignore_index = True)

Maintenant la question est : Comment utiliser les processeurs en fonction de ces données ? Je suis donc tout à fait conscient des limites de la GIL et a regardé dans Gestionnaire de multitraitement espace de noms , ici aussi mais il semble qu'il y ait quelques inconvénients en ce qui concerne la latence sur la trame de données à maintien central . Avant de creuser, j'ai aussi essayé pool.map que je que reconnu pour appliquer pickle entre les processus, ce qui est bien trop lent et entraîne trop de frais généraux.

Après tout cela, je me demande finalement comment (si) une insertion de 500 lignes par seconde (ou même 50 lignes par seconde) peut être transférée à différents processus avec un peu de temps CPU restant pour appliquer des statistiques et des heuristiques sur les données dans les processus enfants ?

Peut-être serait-il préférable d'implémenter un socket tcp personnalisé ou un système de file d'attente entre les deux processus ? Ou existe-t-il des implémentations dans pandas ou d'autres librairies pour permettre un accès rapide à l'unique grand cadre de données du processus parent. ? J'adore les pandas !

4voto

AmirHmZ Points 436

Avant de commencer, je dois dire que vous ne nous avez pas dit grand-chose sur votre code mais que vous avez en tête de ne transférer que les 50/500 nouvelles lignes par seconde au processus enfant et d'essayer de créer ce grand DataFrame dans le processus de l'enfant.

Je travaille sur un projet exactement comme le vôtre. Python possède de nombreuses implémentations IPC telles que Pipe y Queue comme vous le savez. Shared Memory Cette solution peut s'avérer problématique dans de nombreux cas, AFAIK documentation officielle python a prévenu sur l'utilisation des souvenirs partagés.

D'après mon expérience, le meilleur moyen de transformer les données entre uniquement deux processus est Pipe Vous pouvez ainsi récupérer le DataFrame et l'envoyer à l'autre point de connexion. Je vous conseille vivement d'éviter TCP les prises ( AF_INET ) dans votre cas.

Pandas DataFrame ne peuvent pas être transformées en un autre processus sans être décapées et dépiquées. Je vous recommande donc également de transférer les données brutes sous forme de types intégrés tels que dict au lieu de DataFrame. Cela permet d'accélérer l'extraction et le désempilage et de réduire l'empreinte mémoire.

1voto

jorijnsmit Points 155

Parallélisme dans pandas est probablement mieux gérée par un tout autre moteur.

Jetez un coup d'œil à la Projet Koalas par Databricks o Le DataFrame de Dask .

0voto

arshit arora Points 33

Une solution simple consisterait à séparer le processus en deux étapes différentes. Utilisez Asyncio pour recevoir les données de manière non bloquante, et effectuez vos transformations dans ce cadre. La deuxième étape consomme une file d'attente Asyncio pour construire le DataFrame. Cela suppose que vous n'avez pas besoin que le DataFrame soit disponible pour un autre processus pendant que vous recevez des données de la file d'attente Redis.

Voici un exemple de construction d'un modèle producteur/consommateur avec Asyncio

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X