5 votes

Envoi d'un événement lors de la création d'un nouveau fichier dans azure data lake gen 1

Je veux envoyer un événement ou une notification à un flux NIFI externe lorsqu'un nouveau fichier a été ajouté à Azure Data Lake Gen 1.

Quelqu'un a travaillé ou a des informations sur ce cas d'utilisation ?

5voto

Adam Marczak Points 1893

C'est une très bonne question et, malheureusement, il n'y a pas actuellement d'événement prêt à l'emploi que l'on puisse connecter, mais j'ai trouvé un moyen de le faire en utilisant la fonction journaux de diagnostic .

L'idée est donc simple

  1. Mettre en place journaux de diagnostic sur ADLSv1 pour transmettre les journaux à centre d'événements
  2. Déclencheur application logique pour les nouvelles entrées sur centre d'événements

enter image description here


Donc, pour configurer les journaux de diagnostic sur ADLSv1

  1. Allez dans les paramètres de diagnostic sur ADLSv1

enter image description here

  1. Ajouter des paramètres de diagnostic

enter image description here

  1. Créez un lien vers votre centre d'événements

enter image description here


Maintenant, configurez l'application logique

  1. Concepteur d'applications en logique ouverte

enter image description here

  1. Ajouter un déclencheur de hub d'événement (ceci suppose que le hub nommé stackdemohub existe)

enter image description here

  1. Ajouter l'action du tableau de filtres

enter image description here

3.1. Définir De à l'expression

triggerBody()?['ContentData']['records']

3.2. Définir la condition de droite comme "créer" et le champ de condition de gauche comme expression.

item()['operationName']
  1. Ajouter une boucle for-each et passer le corps de l'étape du tableau de filtre

enter image description here

  1. Sauvegarder et exécuter

5.1. Vous verrez que lorsque l'application logique est exécutée avec succès

enter image description here

vous trouverez de nouveaux fichiers dans la liste

enter image description here

Comme vous pouvez le voir, l'un des fichiers que j'ai téléchargé s'appelle MarketplaceCharges.json et se trouve dans le dossier demo.

Chaque événement ressemble à ceci

{
    "time": "2019-09-18T07:48:20.342Z",
    "resourceId": "/SUBSCRIPTIONS/2BCB9F3D-3F6B-4345-A49E-86D3141C7F73/RESOURCEGROUPS/STACKDEMO/PROVIDERS/MICROSOFT.DATALAKESTORE/ACCOUNTS/STACKDEMO",
    "category": "Requests",
    "operationName": "create",
    "resultType": "201",
    "callerIpAddress": "::ffff:111.222.333.444",
    "correlationId": "93faafd5-dfa2-4432-91f8-c7f360d80655",
    "identity": "adam@marczak.io",
    "properties": {
      "HttpMethod": "PUT",
      "Path": "/webhdfs/v1/demo/MarketplaceCharges.json",
      "RequestContentLength": 0,
      "ClientRequestId": "288c654f-0948-4468-8e92-b158cc265c54",
      "StartTime": "2019-09-18T07:48:20.264Z",
      "EndTime": "2019-09-18T07:48:20.334Z",
      "UserId": "8162E212-E32B-443C-8F13-1CDA7B264DDB"
    }
}

et vous obtenez une valeur avec le chemin du fichier /webhdfs/v1/demo/MarketplaceCharges.json

J'ai créé 3 fichiers sur ADLSv1 et j'ai obtenu 3 éléments dans la boucle comme prévu.

enter image description here

Vous pouvez maintenant faire ce que vous voulez de ces informations sur les événements et les envoyer où vous voulez.


Pour conclure, vous pouvez remplacer les applications logiques par des applications fonctionnelles en cas de volume important de demandes, car les applications logiques ne sont pas bon marché à grande échelle.

[FunctionName("EventHubTriggerCSharp")]
public static void Run([EventHubTrigger("stackdemo", Connection = "EventHubConnectionAppSetting")] EventData[] eventHubMessages, ILogger log)
{
    foreach (var message in eventHubMessages)
    {
        // parse items and do something
    }
}

Aussi un note importante pour les fichiers plus volumineux, assurez-vous d'ajouter un certain délai au processus, car le type de requête créé est celui où le fichier apparaît sur le stockage, même s'il est toujours en cours de copie. Dans ce cas, d'autres événements "append" sont en cours.

0voto

Raunak Jhawar Points 1253

Ce problème peut également être résolu en utilisant l'approche suivante :

  • Créez une nouvelle application de fonction et associez une fonction de déclenchement de blob (associée à l'événement - Microsoft.Storage.BlobCreated )
  • Le gestionnaire d'événement (c'est-à-dire le corps de la fonction de l'application) déclenchera votre code pour écrire un sujet sur EventHub
  • Créez un nouveau flux dans NiFi et utilisez le GetAzureEventHub pour recevoir les messages de l'espace de noms cible.

-1voto

DixitArora-MSFT Points 1533

Bien qu'Azure Data Lake Storage (ADLS) Gen2 soit basé sur Azure Blob Storage, il existe quelques différences entre les deux systèmes. problèmes et différences connus qui sont documentés.

En raison de ces différences, je pense que nous ne pouvons pas utiliser les liaisons existantes disponibles pour le stockage Blob ou Event Grid.

Mais vous pouvez toujours avoir une fonction, déclenchée par un timer, par exemple, et utiliser la fonction ADLS v2 REST API pour lire/mettre à jour les fichiers.

-1voto

Alber Tadrous Points 59

J'ai trouvé une solution à ce problème en utilisant Azure Data Lake Gen 2, nous pouvons déclencher différents types d'événements comme "Azure Function", et faire en sorte que cette fonction envoie une notification avec le chemin vers le fichier qui vient d'être ajouté au flux NIFI, qui commence avec l'un des processeurs listeners comme "ListenHttp ou HandleHttpRequest". Après cela, nous pouvons utiliser n'importe quel processeur pour récupérer ce fichier depuis votre stockage.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X