Quelles nouveautés pour Apache Spark 2.3 ?
23 mai 2018
Ne ratez pas nos articles sur l'open source, le big data et les systèmes distribués, fréquence faible d’un email tous les deux mois.
Plongeons nous dans les nouveautés proposées par la nouvelle distribution 2.3 d’Apache Spark.
Cette article est composé de recherches et d’informations issues des présentations suivantes du DataWorks Summit 2018 :
- Apache Spark 2.3 boosts advanced analytics & deep learning par Yanbo Liang, Staff Software Engineer @ Hortonworks
- ORC Improvement in Apache Spark 2.3 par Dongjoon Hyun, Principal Software Engineer @ Hortonworks Data Science Team
Sommaire
Spark 2.0 a introduit énormément de changements majeurs qui ont améliorés de plus de dix fois les performances.
Spark 2.3 est la dernière release en date de la branche 2.X, apportant les fonctionnalités suivantes :
- Support des UDFs Pandas / Vectorizés dans PySpark
- Support de représentation et lecture d’images dans les APIs DataFrame et Dataset
- Optimisation de modèles Machine Learning en parallèle
- Nouveau mode de streaming “Continuous processing” (expérimental)
- Ajout des jointures stream-stream
- Nouvelle API Datasource V2 (beta)
- Support ORC natif
- Spark dans Kubernetes (expérimental)
UDFs d’analyse avancés
La communauté Spark a énormément travaillé sur les UDFs Python.
Depuis Spark 0.7, les UDFs Python appliquaient les transformations de données ligne par ligne, ce qui entrainait beaucoup de sérialisation et de l’overhead d’invocation. Bien sûr les jobs PySpark en souffraient en terme de performance et impactait les méthodes de développement de job des Data Scientists (utiliser Scala et apprendre un nouveau langage ou se faire accompagner par un développeur, ou encore accepter de mauvaises performances).
Spark 2.3 permet maintenant d’encapsuler et manipuler un DataFrame avec la librairie Pandas dans un UDF PySpark afin de profiter de vraies performances. Ces UDFs sont appelés Vectorized UDFs (UDFs Vectorisés, ou UDFs Pandas). Ils sont le résultat de deux efforts complémentaires :
- Spark-21187 : Utiliser Apache Arrow pour la conversion de type.
- Spark-22216 : Améliorer l’interopérabilité PySpark/Pandas.
Les DataFrames sont maintenant sérialisés avec Apache Arrow qui permet une bien meilleure conversion entre les formats Spark & Pandas (entre 10 et plusieurs centaines de fois plus rapide).
Les UDFs Pandas résultants sont réparties en deux catégories :
- scalaires : applique une transformation à toutes les valeurs de la colonne, ex :
v => v + 1
. - map groupée : applique une logique “split - apply - combine” sur un DataFrame précédemment groupé, ex :
v => v - mean(v)
(pour chaque valeur de chaque groupe, soustraire la moyenne des valeurs du groupe).
Pour activer les UDFs Pandas, il faut indiquer la propriété Spark suivante :
spark.sql.execution.arrow.enabled = true
Plus d’informations ici.
Apprentissage
Le Deep Learning peut être défini comme un “ensemble de techniques de Machine Learning capable d’apprendre des représentations utiles de fonctionnalités à partir d’images, de texte et de son”
MLlib est une API simple et concise permettant de créer des pipelines de Machine Learning exploitant la capacité de scaling et de traitement massif de données de Spark.
Support de représentation et lecture d’images dans les APIs DataFrame et Dataset
MLlib est une librairie de Machine Learning, et en version 2.3 elle peut également servir à traiter des graph TensorFlow ou des modèles Keras comme Transformer ou même comme fonction SparkSQL, afin d’être utilisée pour du Deep Learning.
Databricks a ouvert en Open Source un paquet Spark (spark-deep-learning) permettant justement de le faire et a récemment publié la version 0.3.0.
Cependant, pour pouvoir traiter des images en Deep Learning il faut être capable de les lire pour les interpréter, ce qui est exactement le sujet de SPARK-21866. En version 2.3 Spark est capable de lire une image à partir d’un chemin et de la parser en DataFrame avec le schéma suivant :
root
├── image: struct (nullable = true)
│ ├── origin: string (nullable = true)
│ ├── height: integer (nullable = false)
│ ├── width: integer (nullable = false)
│ ├── nChannels: integer (nullable = false)
│ ├── mode: string (nullable = false)
│ └── data: binary (nullable = false)
Le DataFrame résultant contient une Structure avec les métadonnées de l’image et les données binaires, le tout basé sur les conventions OpenCV, sur lesquels on peut appliquer des modèles entrainés.
Optimisation de modèles Machine Learning en parallèle
Optimiser un modèle de Machine Learning consiste avant la 2.3 à entrainer séquentiellement le même modèle plusieurs fois avec différents paramètres d’entrée en utilisant les algorithmes CrossValidator
ou TrainValidationSplit
pour déterminer les paramètres donnant les meilleures performances. Chaque exécution profite du parallélisme de Spark mais les jobs en eux mêmes sont séquentiels. SPARK-19357 introduit le paramètre parallelism
aux algorithmes qui permet de définir le nombre de jobs à exécuter en parallèle à un instant donné et donc de mieux exploiter les ressources du cluster Spark.
Plus d’informations sur ce sujet ici et ici.
Streaming
Spark 2.3 introduit plusieurs nouveautés côté Streaming qui améliorent de beaucoup ses capacités et permet de le rapprocher un peu plus de ce que propose Apache Flink.
Continuous processing (expérimental)
Jusqu’à maintenant Spark Streaming consistait en une série de micro-batch à faible latence (~100ms) avec des garanties de résistance à la panne et de réception “exactly-once”. Le nouveau mode de déclenchement expérimental introduit en version 2.3, Continuous Processing (ou Traitement Continu), permet d’avoir des temps de latences encore plus faibles (~1ms) avec des garanties de résistance à la panne et de réception “at-least-once”.
Étant donné que le Continuous Processing est un mode de déclenchement du moteur actuel et non un nouveau moteur à part entière, ses checkpoints sont compatibles avec le mode micro-batch.
Un stream Continuous Processing supporte toute fonction SparkSQL (hors agrégation) mais seulement les opérations “map” (select, where, map, filter, …).
Quelques points à noter :
- Une requête Continuous Processing lancera en parallèle autant de tâches Spark qu’il y a de nombre de partitions à lire, il faut donc fournir suffisamment de coeurs sur le cluster (e.g 10 partitions Kafka = 10 coeurs).
- Arrêter un stream en Continuous Processing produira peut être des warnings de fin de tâche à ignorer.
- Il n’y a pas de ré-essai automatique des tâches échouées, ce qui résulte en une requête échouée qui doit être relancée manuellement depuis un checkpoint.
Jointures Stream-stream
Spark 2.0 a introduit la notion de jointure Stream-static, permettant de joindre un stream avec un DataFrame/DataSet statique (table de référence par ex.). Spark 2.3 permet maintenant de joindre deux streams entre eux.
Pour s’assurer que la jointure se fait bien à n’importe quel moment, les états passés de chaque stream sont bufferisés. Ainsi n’importe quelle ligne peut être matchée avec les lignes futures de l’autre stream. Un watermark de délai doit être défini sur les deux entrées afin que le moteur sache combien de temps un input peut être délayé et une contrainte de temps doit être indiquée lors de la requête de jointure (une condition de fenêtrage temporel ou un lien de fenêtre du stream). Ainsi la mémoire buffer de Spark ne risque pas d’être saturée.
La contrainte de temps et le délai sont optionnels pour les requêtes INNER JOIN
mais obligatoires pour les LEFT
et RIGHT OUTER JOIN
. Les requêtes de type FULL OUTER JOIN
ne sont pas supportées.
Le point faible des jointures externes est que les lignes non associées (résultat NULL
) seront traitées à expiration du délai watermark et, si le mode micro-batch est utilisé, les données du batch courant dont le délai a expiré seront uniquement traitées au batch suivant (e.g. quand de nouvelles données arrivent). Ainsi une donnée avec un délai expiré peut être en attente plus longtemps que la durée du délai.
Plus d’informations sur le Continuous Processing et les jointures Stream-stream dans le guide de programmation Spark Structured Streaming.
API Datasource V2 (beta)
L’API Datasource de Spark lui permet de s’intégrer avec de nombreuses sources de données (Hive, Avro, CSV, Parquet, …) avec des interactions plus fines qu’un simple processus de lecture et chargement en mémoire.
Cependant la V1 a quelques faiblesses : l’API a quelques dépendances dans les APIs haut niveau (DataFrame et SQLContext), il n’y a aucune garantie transactionnelle à l’écriture, et elle est compliquée à étendre s’il faut des optimisations. Ces différents points ont motivé l’équipe de Databricks à designer une API V2 avec les objectifs suivants en tête :
- Écrite en Java.
- Aucune dépendance sur les APIs haut-niveau (DataFrame, RDD, …).
- Facile à implémenter, permet d’ajouter de nouvelles optimisations tout en gardant la rétro-compatibilité.
- Permet de remonter des informations physiques (taille, partitionnement, …).
- Support des sources / puits streaming.
- API d’écriture puissante, transactionnelle et flexible.
- Aucun changement pour l’utilisateur final.
Pour la sortie de Spark 2.3 les fonctionnalités suivantes sont disponibles et testables (l’API est toujours en développement et plus de fonctionnalités seront disponibles en 2.4) :
- Support de scan colonnaire et par ligne.
- “Column pruning and filter push-down”.
- Permet de remonter des statistiques basiques et du partitionnement de données.
- API d’écriture transactionnelle.
- Support des sources / puit streaming pour les modes micro-batch et continu.
Une analyse plus profonde de l’API Datasource V2 est disponible ici.
Support ORC natif
Avant que soit lancé le projet Apache ORC, Spark (depuis sa version 1.4) utilisait du legacy code basé sur Hive 1.2.1 pour lire et écrire les fichiers ORC. Lorsque le projet est arrivé le module a été amélioré mais gardait toujours une dépendance avec Hive et des erreurs différenciant l’ORC produit par Spark et les spécifications (caractères unicodes / points dans les noms de colonnes, évolutions de schéma, fichiers ORC vides lorsque la partition du DataFrame est vide, …).
Spark 2.3 corrige ces points en ajoutant un nouveau reader ORC vectorisé dit native
(ou natif) dans le module sql/core qui est entièrement indépendant de Hive. Le nouveau OrcFileFormat (voir SPARK-20682) améliore les performances par un facteur 2 à 5.
Pour pouvoir l’utiliser, il faut soit indiquer le paramètre spark.sql.orc.impl=native
et spécifier le format source / cible comme orc
, ou bien indiquer le format org.apache.spark.sql.execution.datasources.orc
(voir SPARK-20728). En attendant Spark 2.4, l’implémentation ORC par défaut reste Hive pour maintenir la compatibilité avec les anciennes données.
Spark est également capable de convertir des sources de données en utilisant le reader vectorisé, il suffit d’indiquer le paramètre spark.sql.hive.convertMetastoreOrc=true
(défaut = false
, nécessite le reader natif).
Enfin le module supporte l’évolution de schéma. Plutôt que de supprimer et recréer une table comme jusqu’à maintenant, on peut ajouter une nouvelle colonne à la fin, cacher une colonne ou en changer le type et la position.
Également, le module ne supporte pas de transaction ACID (pas d’activité ou de besoin exprimé à ce sujet au sein de la communauté Spark) ou le bucketting (Spark a divergé de l’implémentation de Hive et il y a des difficultés pour merger les deux, peu d’évolutions sur le sujet).
Spark dans Kubernetes (expérimental)
Il y a un nouveau scheduler expérimental permettant de soumettre des jobs Spark à un cluster managé par Kubernetes au lieu de YARN, Mesos ou Spark Standalone. Le driver et les exécuteurs sont déployés dans des Pods avec une image Docker spécifiée via le paramètre de configuration spark.kubernetes.container.image=/path/to/img
(un Dockerfile exemple est fourni avec la release de Spark 2.3). Les dépendances applicatives doivent être ajoutées à une image Docker personnalisée ou être référencées vers l’extérieur.
Plus d’informations dans SPARK-18278 et la documentation.
Lectures complémentaires
- Apache Arrow
- Spark JIRA 21187
- Spark JIRA 22216
- Article Databrick sur les UDFs Pandas pour PySpark
- Package Spark pour le Deep Learning de Databrick
- Spark JIRA 18278
- Spark JIRA 19357
- Spark JIRA 20682
- Spark JIRA 20728
- Spark JIRA 21866
- Spark JIRA 23355
- Article de Bryan Cutler sur l’entrainement de modèles en parallèle
- Guide de programmation Spark Structured Streaming
- Slides de présentation Databrick sur les features de Spark 2.3
- IBM’s introduction to Datasource API V2
- Déploiement de Spark sur Kubernetes