6 votes

L'envoi d'un grand dictionnaire via un appel API casse le serveur de développement

Je suis en train d'exécuter une application django avec une base de données postgreSQL et j'essaie d'envoyer un très grand dictionnaire (constitué de données de séries temporelles) à la base de données.

Mon objectif est d'écrire mes données dans la base de données le plus rapidement possible. J'utilise la bibliothèque requests pour envoyer les données via un appel d'API (construit avec django REST) :

Ma vue API est simple :

@api_view(["POST"])
def CreateDummy(request):

    for elem, ts in request.data['time_series'] :
        TimeSeries.objects.create(data_json=ts)

    msg = {"detail": "Created successfully"}
    return Response(msg, status=status.HTTP_201_CREATED)

request.data['time_series'] est un énorme dictionnaire structuré de la sorte :

{Bâtiment1: {1:123, 2: 345, 4:567 .... 31536000: 2345}, .... Bâtiment30: {..... }}

Cela signifie que j'ai 30 clés avec 30 valeurs, alors que les valeurs sont chacune un dictionnaire avec 31536000 éléments.

Ma requête API ressemble à ceci (où les données sont mon dictionnaire décrit ci-dessus) :

 payload = {
            "time_series": data,
           } 

 requests.request(
        "post", url=endpoint, json=payload
    )

Le code enregistre les données de séries temporelles dans un champ jsonb dans le backend. Maintenant, cela fonctionne si je ne boucle que sur les 4 premiers éléments du dictionnaire. Je peux obtenir ces données en environ 1 minute. Mais lorsque je boucle sur l'ensemble du dictionnaire, mon serveur de développement s'arrête. Je suppose que c'est parce que la mémoire est insuffisante. Je reçois une requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response')). Est-ce que le dictionnaire entier est sauvegardé en mémoire avant de commencer l'itération ? Je doute car j'ai lu qu'en python3, boucler avec .items() renvoie un itérateur et c'est la manière recommandée de le faire.

Y a-t-il une meilleure façon de gérer des dictionnaires massifs en django/python ? Devrais-je boucler sur la moitié puis sur l'autre moitié ? Ou y a-t-il une façon plus rapide ? Peut-être en utilisant pandas ? Ou peut-être en envoyant les données différemment ? Je suppose que je recherche la manière la plus performante de le faire.

Je suis prêt à fournir plus de code si nécessaire.

Toute aide, suggestions ou guides sont grandement appréciés ! Merci d'avance

EDIT2: Je ne pense pas que ce soit l'utilisation de ma RAM ou la taille du dictionnaire. Il me reste encore 5 GiB de RAM lorsque le serveur s'arrête. Et la taille du dictionnaire est de 1176 octets Le dictionnaire est beaucoup plus grand, voir les commentaires

EDIT3: Je ne peux même pas imprimer le gros dictionnaire. Le serveur s'arrête également à ce moment-là

EDIT4: Lorsque je divise les données et les envoie pas toutes en une seule fois, le serveur peut les gérer. Mais lorsque j'essaie de les interroger à nouveau, le serveur s'arrête à nouveau. Il s'arrête sur mon serveur de production (configuration AWS RDS nginx) et il s'arrête sur mon serveur de développement local. Je suis presque certain que c'est parce que django ne peut pas gérer des requêtes aussi volumineuses avec ma configuration actuelle. Mais comment pourrais-je résoudre cela ?

EDIT5: Donc ce que je recherche est une solution en deux parties. Une pour la création des données et une pour l'interrogation des données. La création des données que j'ai décrite ci-dessus. Mais même si j'obtiens toutes ces données dans la base de données, j'aurai néanmoins des problèmes pour les récupérer.

J'ai essayé en créant les données pas toutes en même temps mais chaque série temporelle individuellement. Donc supposons que j'ai ces énormes données dans ma base de données et que j'essaie de les interroger à nouveau. Tous les objets de séries temporelles appartiennent à un réseau, donc j'ai essayé ceci comme tel :

class TimeSeriesByTypeAndCreationMethod(ListAPIView):
    """Interroger des séries temporelles dans un réseau spécifique."""

    serializer_class = TimeSeriesSerializer

    def get_queryset(self):
        """Interroger les séries temporelles

        Interroger par le nom du réseau, le type de données, la méthode de création et
        la source.
        """

        network = self.kwargs["name_network"]

        if TimeSeries.objects.filter(
            network_element__network__name=network,
        ).exists():
            time_series = TimeSeries.objects.filter(
                network_element__network__name=network,
            )
            return time_series
        else:
            raise NotFound()

Mais la requête fait crasher le serveur comme la création des données précédente. Je pense aussi que c'est une charge de données trop importante. J'ai pensé que je pourrais utiliser du sql brut pour éviter de faire planter le serveur... Ou existe-t-il aussi une meilleure façon ?

EDIT6: Modèles pertinents :

class TimeSeries(models.Model):

    CHOIX_TYPE_DE_DONNEES = [
        ....beaucoup de choix...
    ]

    CHOIX_METHODE_DE_CREATION = [
        ....beaucoup de choix...
    ]

    description = models.CharField(
        max_length=120,
        null=True,
        blank=True,
    )

    network_element = models.ForeignKey(
        Building,
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )
    type_data = models.CharField(
        null=True,
        blank=True,
        max_length=30,
        choices=CHOIX_TYPE_DE_DONNEES,
    )

    creation_method = models.CharField(
        null=True,
        blank=True,
        max_length=30,
        choices=CHOIX_METHODE_DE_CREATION,
    )

    source = models.CharField(
        null=True,
        blank=True,
        max_length=300
    )

    data_json = JSONField(
        help_text="Données des séries temporelles au format JSON. JSON valide attendu."
    )

    creation_date = models.DateTimeField(auto_now=True, null=True, blank=True)

    def __str__(self):
        return f"{self.creation_method}:{self.type_data}"

class Building(models.Model):

    CHOIX_UTILISATION = [
        ...
    ]

    nom = models.CharField(
        max_length=120,
        null=True,
        blank=True,
    )
    rue = models.CharField(
        max_length=120,
        null=True,
        blank=True,
    )
    numéro_de_maison = models.CharField(
        max_length=20,
        null=True,
        blank=True,
    )
    code_postal = models.CharField(
        max_length=5,
        null=True,
        blank=True,
    )
    ville = models.CharField(
        max_length=120,
        null=True,
        blank=True,
    )
    utilisation = models.CharField(
        max_length=120,
        choices=CHOIX_UTILISATION,
        null=True,
        blank=True,
    )
    .....beaucoup plus de champs....

2voto

sonus21 Points 920

Vous pouvez résoudre vos problèmes en utilisant deux techniques.

Création de données

Utilisez bulk_create pour insérer un grand nombre d'enregistrements, si une erreur SQL se produit en raison de la taille de la requête, etc., alors fournissez le batch_size dans bulk_create.

records = []
for elem, ts in request.data['time_series'] :
    records.append(
         TimeSeries(data_json=ts)
    )

# définir une taille de lot à 1000

TimeSeries.objects.bulk_create(records, batch_size=1000)

Il y a quelques inconvénients avec bulk_create comme il ne générera pas de signaux et d'autres, voir plus dans Doc

Récupération de données

Configurez le framework rest pour utiliser la pagination configuration par défaut

REST_FRAMEWORK = {
    'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.LimitOffsetPagination',
    'PAGE_SIZE': 100
}

Pour une configuration personnalisée, utilisez

class TimeSeriesResultsSetPagination(PageNumberPagination):
    page_size = 50
    page_size_query_param = 'page_size'
    max_page_size = 10000

class BillingRecordsView(generics.ListAPIView):
   serializer_class = TimeSeriesSerializer
   pagination_class = TimeSeriesResultsSetPagination

   def get_queryset(self):
    """Query time-series

    Query by name of network, type of data, creation method and
    source.
    """

    network = self.kwargs["name_network"]

    if TimeSeries.objects.filter(
        network_element__network__name=network,
    ).exists():
        time_series = TimeSeries.objects.filter(
            network_element__network__name=network,
        )
        return time_series
    else:
        raise NotFound()

Voir d'autres techniques de pagination à https://www.django-rest-framework.org/api-guide/pagination/

1voto

Ahmed Shehab Points 796

@micromegas lorsque votre solution est correcte théoriquement, cependant appeler create() plusieurs fois dans une boucle, je crois que cela provoque l'exception ConnectionError.

essayez de refactoriser quelque chose comme:

big_data_holder = []
for elem, ts in request.data['time_series'] :
    big_data_holder.append(
         TimeSeries(data_json=ts)
    )

# examiner la structure 
print(big_data_holder) 

TimeSeries.objects.bulk_create(big_data_holder)

veuillez vérifier les inconvénients de cette méthode Django Docs bulk_create

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X