Stack Labs Blog moves to Dev.to | Le Blog Stack Labs déménage sur Dev.to 🚀

15 juin 2018 | BigData | Chabane Refes

Les 10 clés pour optimiser vos jobs Spark

Temps de lecture estimé : 5 minutes

Beaucoup de développeurs passent le plus clair de leur temps à essayer d’optimiser leurs jobs Spark. En effet, on pense parfois qu’augmenter les ressources mémoires/CPU, le nombre de partitions ou mettre en cache les données permet d’améliorer les performances. Si vous faites cela et que ça ne marche pas toujours ou si ça marche mais que vous ne savez pas pourquoi, voici dix clés incontournables pour optimiser vos jobs Spark et comprendre ce qui se passe sous le capot.

Choisir le bon format de données

Le format le plus adapté aujourd’hui pour les gros volumes de données est le Parquet (pour le format colonne) et Avro (pour le format ligne). En savoir plus
Un fichier parquet peut être découpé en plusieurs sous-fichiers et être traité en parallèle par votre job Spark. Sur un cluster Yarn avec des blocs HDFS sur 128Mb, le mieux est de découper le fichier Parquet en plusieurs fichiers de 128Mo.

L’allocation dynamique des ressources

Si les ressources du cluster sont limitées, l’allocation dynamique (dynamic allocation) peut être utilisée ; on laisse alors Spark définir lui-même les ressources à allouer dynamiquement à chaque job. En savoir plus

Nettoyer les données en amont

Il est important d’avoir des fichiers Parquet les plus propre possible avant de les ingérer dans le cluster Spark. Le cas échéant, supprimer les colonnes inutilisées via drop et les lignes dupliquées via dropDuplicates dès le début du job.

Bien filter dès le départ

Les opérations telles que groupByKey, reduceByKey, cogroup et join nécessitent le shuffle et sont les plus coûteuses en consommation mémoire, I/O disque, sérialisation des données et I/O réseau car elles nécessitent de transférer les données entre les différents exécuteurs et machines.

S’il n’est pas possible de les éviter, essayer de les placer au début du job.

Se libérer du shuffle quand c’est possible

Si les données en entrée du Job sont conséquentes, trop solliciter le shuffle pourrait causer un out of memory. Le partitionnement avec coalesce peut s’avérer très pratique. Contrairement à l’opération repartition qui recrée des partitions et déclenche un shuffle, coalesce utilise les partitions existantes, ce qui limite le transfert de données entre les exécuteurs.

Définir le bon nombre de partitions

Le nombre de partitions utilisé peut se révéler crucial. Le choix doit se porter sur plusieurs critères, d’abord sur les ressources : Spark recommande d’utiliser 2 à 3 tâches par CPU. Le nombre de partitions à définir est de nombre de CPU x 3.
Ceci diminue le risque pour certains exécuteurs de se retrouver en attente de la terminaison d’autres exécuteurs et ainsi de prendre les tâches suivantes. Toutefois ce risque peut être évité si les exécuteurs traitent des blocs de données de même taille et disposent des mêmes ressources disque, mémoire et réseau.
Enfin la position du repartitionement doit être bien réfléchie, il faut toujours placer l’opération de repartitionnement après l’opération la moins coûteuse en shuffle, par exemple entre un filter et un flatmap.

Une partition bien hashée

Lorsque repartition ou coalesce est utilisé, chaque donnée se verra attribuer une clé générée aléatoirement (via HashPartitioner) puis incrémentée pour chaque valeur successive de la partition. Ces clés seront utilisées plus tard pour simplifier un éventuel repartitionnement et obtenir une répartition plus équitable. Dans certains cas, cette génération par défaut de clé ne suffit pas, comme lorsqu’on souhaite co-localiser toutes les données possédant la même clé. L’opération partitionBy nous permet de spécifier notre propre hash. Il est également possible de regrouper par range de clé via RangePartitioner. On peut aussi créer notre propre partitionneur en héritant de Partitioner.
Le partitionBy peut également être utilisé pour partitionner dynamiquement les données par colonne. Ce mécanisme est à utiliser lorsqu’on souhaite effectuer des opérations sur des catégories de données de manière isolée et ainsi éviter de solliciter le shuffle plus loin.

Faites bouger la ruche

Dans certains cas, il peut être très pratique de partionner dynamiquement ses données. Le Hive partitionning permet de partitionner ses données par colonnes et organiser les données de colonne par sous-répertoires. Ce mécanisme facilite le filtre pour Spark et diminue la lecture de fichiers non utiles pour le job. En savoir plus.

Faciliter les jointures

Par défaut, lorsque les données en entrée n’excèdent pas 10Mb (spark.sql.autoBroadcastJoinThreshold=10485760), Spark dupliquera toutes les données sur les exécuteurs pour faciliter les opérations de join ou d’agg. Il est possible de le faire manuellement en utilisant la fonction de Broadcast.

Une histoire de cache

Il peut arriver que les mêmes opérations soient réexécutées par Spark, impactant ainsi les performances. Il est possible de mettre en cache ce calcul avec la fonction cache. Le cache peut être conservé en mémoire et/ou sur le disque. À utiliser avec précaution avec les gros volumes de données. En savoir plus
Par défaut, dans le cas d’une allocation dynamique, si un exécuteur a des données en cache, il restera actif indéfiniment. Ce qui aura pour effet de ne pas libérer l’exécuteur jusqu’à la fin du job Spark. Il est possible de limiter cette attente en configurant spark.dynamicAllocation.cachedExecutorIdleTimeout=60s.

Bonus : les pièges à éviter

Pour terminer, voici quelques recommendations et pièges à éviter :

  • préfèrer le langage Scala
  • utiliser l’API dataset, au lieu de RDD ou dataframe
  • si le job est écrit en Python :
    • éviter d’utiliser les UDF (User-defined function) qui nécessitent une sérialisation/désérialisation entre l’interpréteur Python et la JVM pour chaque opération
    • éviter d’utiliser pandas qui n’est pas optimisé pour Spark. À la place Window functions et Apache Arrow peuvent s’avérer très pratique
  • les actions Spark telles que collect forEach sont à utiliser avec précaution car elles nécessitent d’exécuter toutes les opérations de transformation. En cas de volumétrie importante de données renvoyée par les exécuteurs, un out of memory pourrait se produire sur le driver.
  • par défaut, si Spark reçoit un seul fichier en entrée, il créera 200 partitions (spark.sql.shuffle.partitions=200) pour effectuer les opérations de join et agg, sinon il créera une partition pour chaque fichier en entrée.

Références :