Quelles nouveautés pour Apache Spark 2.3 ?

Quelles nouveautés pour Apache Spark 2.3 ?

Vous appréciez notre travail......nous recrutons !

Ne ratez pas nos articles sur l'open source, le big data et les systèmes distribués, fréquence faible d’un email tous les deux mois.

Plongeons nous dans les nouveautés proposées par la nouvelle distribution 2.3 d’Apache Spark.

Cette article est composé de recherches et d’informations issues des présentations suivantes du DataWorks Summit 2018 :

  • Apache Spark 2.3 boosts advanced analytics & deep learning par Yanbo Liang, Staff Software Engineer @ Hortonworks
  • ORC Improvement in Apache Spark 2.3 par Dongjoon Hyun, Principal Software Engineer @ Hortonworks Data Science Team

Sommaire

Spark 2.0 a introduit énormément de changements majeurs qui ont améliorés de plus de dix fois les performances.

Spark 2.3 est la dernière release en date de la branche 2.X, apportant les fonctionnalités suivantes :

  • Support des UDFs Pandas / Vectorizés dans PySpark
  • Support de représentation et lecture d’images dans les APIs DataFrame et Dataset
  • Optimisation de modèles Machine Learning en parallèle
  • Nouveau mode de streaming “Continuous processing” (expérimental)
  • Ajout des jointures stream-stream
  • Nouvelle API Datasource V2 (beta)
  • Support ORC natif
  • Spark dans Kubernetes (expérimental)

UDFs d’analyse avancés

La communauté Spark a énormément travaillé sur les UDFs Python.

Depuis Spark 0.7, les UDFs Python appliquaient les transformations de données ligne par ligne, ce qui entrainait beaucoup de sérialisation et de l’overhead d’invocation. Bien sûr les jobs PySpark en souffraient en terme de performance et impactait les méthodes de développement de job des Data Scientists (utiliser Scala et apprendre un nouveau langage ou se faire accompagner par un développeur, ou encore accepter de mauvaises performances).

Spark 2.3 permet maintenant d’encapsuler et manipuler un DataFrame avec la librairie Pandas dans un UDF PySpark afin de profiter de vraies performances. Ces UDFs sont appelés Vectorized UDFs (UDFs Vectorisés, ou UDFs Pandas). Ils sont le résultat de deux efforts complémentaires :

Les DataFrames sont maintenant sérialisés avec Apache Arrow qui permet une bien meilleure conversion entre les formats Spark & Pandas (entre 10 et plusieurs centaines de fois plus rapide).

Les UDFs Pandas résultants sont réparties en deux catégories :

  • scalaires : applique une transformation à toutes les valeurs de la colonne, ex : v => v + 1.
  • map groupée : applique une logique “split - apply - combine” sur un DataFrame précédemment groupé, ex : v => v - mean(v) (pour chaque valeur de chaque groupe, soustraire la moyenne des valeurs du groupe).

Pour activer les UDFs Pandas, il faut indiquer la propriété Spark suivante :

spark.sql.execution.arrow.enabled = true

Plus d’informations ici.

Apprentissage

Le Deep Learning peut être défini comme un “ensemble de techniques de Machine Learning capable d’apprendre des représentations utiles de fonctionnalités à partir d’images, de texte et de son”

MLlib est une API simple et concise permettant de créer des pipelines de Machine Learning exploitant la capacité de scaling et de traitement massif de données de Spark.

Support de représentation et lecture d’images dans les APIs DataFrame et Dataset

MLlib est une librairie de Machine Learning, et en version 2.3 elle peut également servir à traiter des graph TensorFlow ou des modèles Keras comme Transformer ou même comme fonction SparkSQL, afin d’être utilisée pour du Deep Learning.

Databricks a ouvert en Open Source un paquet Spark (spark-deep-learning) permettant justement de le faire et a récemment publié la version 0.3.0.

Cependant, pour pouvoir traiter des images en Deep Learning il faut être capable de les lire pour les interpréter, ce qui est exactement le sujet de SPARK-21866. En version 2.3 Spark est capable de lire une image à partir d’un chemin et de la parser en DataFrame avec le schéma suivant :

root 
├── image: struct (nullable = true)
│   ├── origin: string (nullable = true)
│   ├── height: integer (nullable = false)
│   ├── width: integer (nullable = false)
│   ├── nChannels: integer (nullable = false)
│   ├── mode: string (nullable = false)
│   └── data: binary (nullable = false)

Le DataFrame résultant contient une Structure avec les métadonnées de l’image et les données binaires, le tout basé sur les conventions OpenCV, sur lesquels on peut appliquer des modèles entrainés.

Optimisation de modèles Machine Learning en parallèle

Optimiser un modèle de Machine Learning consiste avant la 2.3 à entrainer séquentiellement le même modèle plusieurs fois avec différents paramètres d’entrée en utilisant les algorithmes CrossValidator ou TrainValidationSplit pour déterminer les paramètres donnant les meilleures performances. Chaque exécution profite du parallélisme de Spark mais les jobs en eux mêmes sont séquentiels. SPARK-19357 introduit le paramètre parallelism aux algorithmes qui permet de définir le nombre de jobs à exécuter en parallèle à un instant donné et donc de mieux exploiter les ressources du cluster Spark.

Plus d’informations sur ce sujet ici et ici.

Streaming

Spark 2.3 introduit plusieurs nouveautés côté Streaming qui améliorent de beaucoup ses capacités et permet de le rapprocher un peu plus de ce que propose Apache Flink.

Continuous processing (expérimental)

Jusqu’à maintenant Spark Streaming consistait en une série de micro-batch à faible latence (~100ms) avec des garanties de résistance à la panne et de réception “exactly-once”. Le nouveau mode de déclenchement expérimental introduit en version 2.3, Continuous Processing (ou Traitement Continu), permet d’avoir des temps de latences encore plus faibles (~1ms) avec des garanties de résistance à la panne et de réception “at-least-once”.

Étant donné que le Continuous Processing est un mode de déclenchement du moteur actuel et non un nouveau moteur à part entière, ses checkpoints sont compatibles avec le mode micro-batch.

Un stream Continuous Processing supporte toute fonction SparkSQL (hors agrégation) mais seulement les opérations “map” (select, where, map, filter, …).

Quelques points à noter :

  • Une requête Continuous Processing lancera en parallèle autant de tâches Spark qu’il y a de nombre de partitions à lire, il faut donc fournir suffisamment de coeurs sur le cluster (e.g 10 partitions Kafka = 10 coeurs).
  • Arrêter un stream en Continuous Processing produira peut être des warnings de fin de tâche à ignorer.
  • Il n’y a pas de ré-essai automatique des tâches échouées, ce qui résulte en une requête échouée qui doit être relancée manuellement depuis un checkpoint.

Jointures Stream-stream

Spark 2.0 a introduit la notion de jointure Stream-static, permettant de joindre un stream avec un DataFrame/DataSet statique (table de référence par ex.). Spark 2.3 permet maintenant de joindre deux streams entre eux.

Pour s’assurer que la jointure se fait bien à n’importe quel moment, les états passés de chaque stream sont bufferisés. Ainsi n’importe quelle ligne peut être matchée avec les lignes futures de l’autre stream. Un watermark de délai doit être défini sur les deux entrées afin que le moteur sache combien de temps un input peut être délayé et une contrainte de temps doit être indiquée lors de la requête de jointure (une condition de fenêtrage temporel ou un lien de fenêtre du stream). Ainsi la mémoire buffer de Spark ne risque pas d’être saturée.

La contrainte de temps et le délai sont optionnels pour les requêtes INNER JOIN mais obligatoires pour les LEFT et RIGHT OUTER JOIN. Les requêtes de type FULL OUTER JOIN ne sont pas supportées.

Le point faible des jointures externes est que les lignes non associées (résultat NULL) seront traitées à expiration du délai watermark et, si le mode micro-batch est utilisé, les données du batch courant dont le délai a expiré seront uniquement traitées au batch suivant (e.g. quand de nouvelles données arrivent). Ainsi une donnée avec un délai expiré peut être en attente plus longtemps que la durée du délai.

Plus d’informations sur le Continuous Processing et les jointures Stream-stream dans le guide de programmation Spark Structured Streaming.

API Datasource V2 (beta)

L’API Datasource de Spark lui permet de s’intégrer avec de nombreuses sources de données (Hive, Avro, CSV, Parquet, …) avec des interactions plus fines qu’un simple processus de lecture et chargement en mémoire.

Cependant la V1 a quelques faiblesses : l’API a quelques dépendances dans les APIs haut niveau (DataFrame et SQLContext), il n’y a aucune garantie transactionnelle à l’écriture, et elle est compliquée à étendre s’il faut des optimisations. Ces différents points ont motivé l’équipe de Databricks à designer une API V2 avec les objectifs suivants en tête :

  • Écrite en Java.
  • Aucune dépendance sur les APIs haut-niveau (DataFrame, RDD, …).
  • Facile à implémenter, permet d’ajouter de nouvelles optimisations tout en gardant la rétro-compatibilité.
  • Permet de remonter des informations physiques (taille, partitionnement, …).
  • Support des sources / puits streaming.
  • API d’écriture puissante, transactionnelle et flexible.
  • Aucun changement pour l’utilisateur final.

Pour la sortie de Spark 2.3 les fonctionnalités suivantes sont disponibles et testables (l’API est toujours en développement et plus de fonctionnalités seront disponibles en 2.4) :

  • Support de scan colonnaire et par ligne.
  • “Column pruning and filter push-down”.
  • Permet de remonter des statistiques basiques et du partitionnement de données.
  • API d’écriture transactionnelle.
  • Support des sources / puit streaming pour les modes micro-batch et continu.

Une analyse plus profonde de l’API Datasource V2 est disponible ici.

Support ORC natif

Avant que soit lancé le projet Apache ORC, Spark (depuis sa version 1.4) utilisait du legacy code basé sur Hive 1.2.1 pour lire et écrire les fichiers ORC. Lorsque le projet est arrivé le module a été amélioré mais gardait toujours une dépendance avec Hive et des erreurs différenciant l’ORC produit par Spark et les spécifications (caractères unicodes / points dans les noms de colonnes, évolutions de schéma, fichiers ORC vides lorsque la partition du DataFrame est vide, …).

Spark 2.3 corrige ces points en ajoutant un nouveau reader ORC vectorisé dit native (ou natif) dans le module sql/core qui est entièrement indépendant de Hive. Le nouveau OrcFileFormat (voir SPARK-20682) améliore les performances par un facteur 2 à 5.

Pour pouvoir l’utiliser, il faut soit indiquer le paramètre spark.sql.orc.impl=native et spécifier le format source / cible comme orc, ou bien indiquer le format org.apache.spark.sql.execution.datasources.orc (voir SPARK-20728). En attendant Spark 2.4, l’implémentation ORC par défaut reste Hive pour maintenir la compatibilité avec les anciennes données.

Spark est également capable de convertir des sources de données en utilisant le reader vectorisé, il suffit d’indiquer le paramètre spark.sql.hive.convertMetastoreOrc=true (défaut = false, nécessite le reader natif).

Enfin le module supporte l’évolution de schéma. Plutôt que de supprimer et recréer une table comme jusqu’à maintenant, on peut ajouter une nouvelle colonne à la fin, cacher une colonne ou en changer le type et la position.

Également, le module ne supporte pas de transaction ACID (pas d’activité ou de besoin exprimé à ce sujet au sein de la communauté Spark) ou le bucketting (Spark a divergé de l’implémentation de Hive et il y a des difficultés pour merger les deux, peu d’évolutions sur le sujet).

Spark dans Kubernetes (expérimental)

Il y a un nouveau scheduler expérimental permettant de soumettre des jobs Spark à un cluster managé par Kubernetes au lieu de YARN, Mesos ou Spark Standalone. Le driver et les exécuteurs sont déployés dans des Pods avec une image Docker spécifiée via le paramètre de configuration spark.kubernetes.container.image=/path/to/img (un Dockerfile exemple est fourni avec la release de Spark 2.3). Les dépendances applicatives doivent être ajoutées à une image Docker personnalisée ou être référencées vers l’extérieur.

Plus d’informations dans SPARK-18278 et la documentation.

Lectures complémentaires

Partagez cet article

Canada - Maroc - France

Nous sommes une équipe passionnée par l'Open Source, le Big Data et les technologies associées telles que le Cloud, le Data Engineering, la Data Science le DevOps…

Nous fournissons à nos clients un savoir faire reconnu sur la manière d'utiliser les technologies pour convertir leurs cas d'usage en projets exploités en production, sur la façon de réduire les coûts et d'accélérer les livraisons de nouvelles fonctionnalités.

Si vous appréciez la qualité de nos publications, nous vous invitons à nous contacter en vue de coopérer ensemble.

Support Ukrain