Stack Labs Blog moves to Dev.to | Le Blog Stack Labs déménage sur Dev.to 🚀
Les 10 clés pour optimiser vos jobs Spark
Temps de lecture estimé : 5 minutes
Beaucoup de développeurs passent le plus clair de leur temps à essayer d’optimiser leurs jobs Spark. En effet, on pense parfois qu’augmenter les ressources mémoires/CPU, le nombre de partitions ou mettre en cache les données permet d’améliorer les performances. Si vous faites cela et que ça ne marche pas toujours ou si ça marche mais que vous ne savez pas pourquoi, voici dix clés incontournables pour optimiser vos jobs Spark et comprendre ce qui se passe sous le capot.
Choisir le bon format de données
Le format le plus adapté aujourd’hui pour les gros volumes de données est le Parquet
(pour le format colonne) et Avro
(pour le format ligne). En savoir plus
Un fichier parquet peut être découpé en plusieurs sous-fichiers et être traité en parallèle par votre job Spark.
Sur un cluster Yarn avec des blocs HDFS sur 128Mb, le mieux est de découper le fichier Parquet
en plusieurs fichiers de 128Mo.
L’allocation dynamique des ressources
Si les ressources du cluster sont limitées, l’allocation dynamique (dynamic allocation
) peut être utilisée ; on laisse alors Spark définir lui-même les ressources à allouer dynamiquement à chaque job. En savoir plus
Nettoyer les données en amont
Il est important d’avoir des fichiers Parquet
les plus propre possible avant de les ingérer dans le cluster Spark. Le cas échéant, supprimer les colonnes inutilisées via drop
et les lignes dupliquées via dropDuplicates
dès le début du job.
Bien filter dès le départ
Les opérations telles que groupByKey
, reduceByKey
, cogroup
et join
nécessitent le shuffle et sont les plus coûteuses en consommation mémoire, I/O disque, sérialisation des données et I/O réseau car elles nécessitent de transférer les données entre les différents exécuteurs et machines.
S’il n’est pas possible de les éviter, essayer de les placer au début du job.
Se libérer du shuffle quand c’est possible
Si les données en entrée du Job sont conséquentes, trop solliciter le shuffle pourrait causer un out of memory
. Le partitionnement avec coalesce
peut s’avérer très pratique. Contrairement à l’opération repartition
qui recrée des partitions et déclenche un shuffle, coalesce
utilise les partitions existantes, ce qui limite le transfert de données entre les exécuteurs.
Définir le bon nombre de partitions
Le nombre de partitions utilisé peut se révéler crucial. Le choix doit se porter sur plusieurs critères, d’abord sur les ressources : Spark recommande d’utiliser 2 à 3 tâches par CPU. Le nombre de partitions à définir est de nombre de CPU x 3
.
Ceci diminue le risque pour certains exécuteurs de se retrouver en attente de la terminaison d’autres exécuteurs et ainsi de prendre les tâches suivantes. Toutefois ce risque peut être évité si les exécuteurs traitent des blocs de données de même taille et disposent des mêmes ressources disque, mémoire et réseau.
Enfin la position du repartitionement doit être bien réfléchie, il faut toujours placer l’opération de repartitionnement après l’opération la moins coûteuse en shuffle, par exemple entre un filter
et un flatmap
.
Une partition bien hashée
Lorsque repartition
ou coalesce
est utilisé, chaque donnée se verra attribuer une clé générée aléatoirement (via HashPartitioner
) puis incrémentée pour chaque valeur successive de la partition. Ces clés seront utilisées plus tard pour simplifier un éventuel repartitionnement et obtenir une répartition plus équitable. Dans certains cas, cette génération par défaut de clé ne suffit pas, comme lorsqu’on souhaite co-localiser toutes les données possédant la même clé. L’opération partitionBy
nous permet de spécifier notre propre hash. Il est également possible de regrouper par range de clé via RangePartitioner
.
On peut aussi créer notre propre partitionneur en héritant de Partitioner
.
Le partitionBy
peut également être utilisé pour partitionner dynamiquement les données par colonne. Ce mécanisme est à utiliser lorsqu’on souhaite effectuer des opérations sur des catégories de données de manière isolée et ainsi éviter de solliciter le shuffle plus loin.
Faites bouger la ruche
Dans certains cas, il peut être très pratique de partionner dynamiquement ses données. Le Hive partitionning
permet de partitionner ses données par colonnes et organiser les données de colonne par sous-répertoires. Ce mécanisme facilite le filtre pour Spark et diminue la lecture de fichiers non utiles pour le job. En savoir plus.
Faciliter les jointures
Par défaut, lorsque les données en entrée n’excèdent pas 10Mb (spark.sql.autoBroadcastJoinThreshold=10485760
), Spark dupliquera toutes les données sur les exécuteurs pour faciliter les opérations de join
ou d’agg
. Il est possible de le faire manuellement en utilisant la fonction de Broadcast
.
Une histoire de cache
Il peut arriver que les mêmes opérations soient réexécutées par Spark, impactant ainsi les performances. Il est possible de mettre en cache ce calcul avec la fonction cache
. Le cache peut être conservé en mémoire et/ou sur le disque. À utiliser avec précaution avec les gros volumes de données. En savoir plus
Par défaut, dans le cas d’une allocation dynamique, si un exécuteur a des données en cache, il restera actif indéfiniment. Ce qui aura pour effet de ne pas libérer l’exécuteur jusqu’à la fin du job Spark. Il est possible de limiter cette attente en configurant spark.dynamicAllocation.cachedExecutorIdleTimeout=60s
.
Bonus : les pièges à éviter
Pour terminer, voici quelques recommendations et pièges à éviter :
- préfèrer le langage Scala
- utiliser l’API dataset, au lieu de RDD ou dataframe
- si le job est écrit en Python :
- éviter d’utiliser les
UDF (User-defined function)
qui nécessitent une sérialisation/désérialisation entre l’interpréteur Python et la JVM pour chaque opération - éviter d’utiliser
pandas
qui n’est pas optimisé pour Spark. À la placeWindow functions
etApache Arrow
peuvent s’avérer très pratique
- éviter d’utiliser les
- les actions Spark telles que
collect
forEach
sont à utiliser avec précaution car elles nécessitent d’exécuter toutes les opérations de transformation. En cas de volumétrie importante de données renvoyée par les exécuteurs, unout of memory
pourrait se produire sur ledriver
. - par défaut, si Spark reçoit un seul fichier en entrée, il créera 200 partitions (
spark.sql.shuffle.partitions=200
) pour effectuer les opérations dejoin
etagg
, sinon il créera une partition pour chaque fichier en entrée.
Références :