2 votes

Spark java.io.IOException due à une raison inconnue

J'obtiens l'exception suivante lors de l'exécution d'une tâche Spark. Le travail reste bloqué à la même étape à chaque fois. L'étape est une requête SQL. Je ne vois aucune autre exception dans les journaux du pilote ou de l'exécuteur.

java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:748)

Cette exception est insérée entre ces erreurs :

ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from hostname.domain.com/ip is closed

La seule chose que j'ai pu trouver dans les journaux de l'exécuteur était :

INFO memory.TaskMemoryManager: Memory used in task 12302
INFO memory.TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@462e08e3: 32.0 MB
INFO memory.TaskMemoryManager: Acquired by org.apache.spark.unsafe.map.BytesToBytesMap@41bed570: 2.4 GB
INFO memory.TaskMemoryManager: 0 bytes of memory were used by task 12302 but are not associated with specific consumers
INFO memory.TaskMemoryManager: 2634274570 bytes of memory are used for execution and 1826540 bytes of memory are used for storage
INFO sort.UnsafeExternalSorter: Thread 197 spilling sort data of 512.0 MB to disk (0  time so far)

Mais je ne pense pas que ce soit un problème dû à la mémoire. Le travail s'achève avec succès dans un autre environnement avec la même quantité de données.

Voici mon spark-submit :

spark-submit --master yarn-cluster\
--conf spark.speculation=true \
--conf spark.default.parallelism=200 \
--conf spark.executor.memory=16G \
--conf spark.memory.storageFraction=$0.3 \
--conf spark.executor.cores=5 \
--conf spark.driver.memory=2G \
--conf spark.driver.cores=4 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.initialExecutors=10 \
--conf spark.yarn.executor.memoryOverhead=1638 \
--conf spark.driver.maxResultSize=1G \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--class com.test.TestClass Test.jar

J'ai lu quelques articles ici et là concernant une exception similaire, qui préconisent d'augmenter l'intervalle de battement de cœur et le délai d'attente du réseau. Mais je n'ai pas trouvé de réponse définitive.

Comment puis-je exécuter cette tâche avec succès ?

0voto

philantrovert Points 6487

Cela était dû à un problème avec les données.

La table de conduite pour toutes les jointures à gauche, avait une chaîne vide. '' comme données dans une des colonnes qui était utilisée pour joindre une autre table. De même, l'autre table contenait également beaucoup de chaînes vides pour cette colonne particulière.

Cela conduisait à une jonction croisée et, comme le nombre de lignes était trop élevé, le travail était suspendu indéfiniment.

L'ajout d'un filtre à la table de droite a permis de résoudre le problème :

SELECT 
  *
FROM
  LEFT_TABLE LT
LEFT JOIN
  ( SELECT
      *
    FROM
      RIGHT_TABLE
    WHERE LENGTH(TRIM(PROBLEMATIC_COLUMN)) <> 0 ) RT
ON
  LT.PROBLEMATIC_COLUMN = RT.PROBLEMATIC_COLUMN

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X