2 votes

Amazon EMR lors de la soumission d'un travail pour Apache-Flink obtient une erreur avec Hadoop récupérable

Added Depedency Pom Details :

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.7.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.7.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop</artifactId>
            <version>1.7.1</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>1.11.529</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-connectors</artifactId>
            <version>1.1.5</version>
            <type>pom</type>
        </dependency>
    </dependencies>

java.lang.UnsupportedOperationException : Les écrivains récupérables sur Hadoop ne sont pris en charge que pour HDFS et pour Hadoop version 2.7 ou plus récente au niveau de org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(HadoopRecoverableWriter.java:57) à l'adresse org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202) à l'adresse org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) à l'adresse org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112) à l'adresse org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242) à l'adresse org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327) à l'adresse org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) à l'adresse org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) à l'adresse org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) à l'adresse org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) à l'adresse org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) à l'adresse org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:748)

7voto

aaronlangford31 Points 94

Flink utilise ce qu'on appelle un ServiceLoader pour charger les composants nécessaires à interface avec les systèmes de fichiers enfichables . Si vous voulez voir où Flink fait cela dans le code, allez à org.apache.flink.core.fs.FileSystem . Prenez note de la initialize qui fait appel à la fonction RAW_FACTORIES variable. RAW_FACTORIES est créé par la fonction loadFileSystems qui, comme vous pouvez le constater, utilise la méthode Java ServiceLoader .

Les composants du système de fichiers doivent être configurés avant le démarrage de votre application sur Flink. Cela implique que votre application Flink n'a pas besoin de regrouper ces composants, ils doivent être fournis pour votre application.

EMR ne fournit pas les composants du système de fichiers S3 dont Flink a besoin pour utiliser S3 comme un puits de fichiers en continu. Cette exception est levée non pas parce que la version n'est pas assez élevée, mais parce que Flink a chargé le système de fichiers HadoopFileSystem en l'absence d'un système de fichiers correspondant à l'attribut s3 ( voir le code ici ).

Vous pouvez voir si vos systèmes de fichiers se chargent en activant le niveau de journalisation DEBUG pour mon application Flink, ce que EMR vous permet de faire dans les configurations :

{
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.rootLogger": "DEBUG,file"
    }
  },{
    "Classification": "flink-log4j-yarn-session",
    "Properties": {
      "log4j.rootLogger": "DEBUG,stdout"
    }
  }

Les journaux pertinents sont disponibles dans le gestionnaire de ressources YARN, en regardant les journaux d'un nœud individuel. Recherche de la chaîne "Added file system" devrait vous aider à localiser tous les systèmes de fichiers chargés avec succès.

Il était également pratique, dans cette enquête, de se connecter en SSH au nœud maître et d'utiliser la fonction flink-scala REPL où je peux voir quel système de fichiers Flink a décidé de charger en fonction de l'URI d'un fichier.

La solution consiste à déposer le JAR de l'implémentation du système de fichiers S3 dans le dossier suivant /usr/lib/flink/lib/ avant de lancer votre application Flink. Cela peut être fait avec une action d'amorçage qui saisit le fichier flink-s3-fs-hadoop ou flink-s3-fs-presto (selon l'implémentation que vous utilisez). Mon action bootstrap script ressemble à quelque chose comme ça :

sudo mkdir -p /usr/lib/flink/lib
cd /usr/lib/flink/lib

sudo curl -O https://search.maven.org/remotecontent?filepath=org/apache/flink/flink-s3-fs-hadoop/1.8.1/flink-s3-fs-hadoop-1.8.1.jar

0voto

Till Rohrmann Points 8466

Afin d'utiliser la méthode de Flink StreamingFileSink avec exactement une seule garantie, vous devez utiliser Hadoop >= 2.7 . Les versions ci-dessous 2.7 ne sont pas prises en charge. Par conséquent, veuillez vous assurer que vous exécutez une version Hadoop à jour sur EMR.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X