Publier Spark SQL Dataframe et RDD avec Spark Thrift Server
25 mars 2019
Ne ratez pas nos articles sur l'open source, le big data et les systèmes distribués, fréquence faible d’un email tous les deux mois.
La nature distribuée et en-mémoire du moteur de traitement Spark en fait un excellant candidat pour exposer des données à des clients qui souhaitent des latences faibles. Les dashboards, les notebooks, les studios de BI, les outils de rapport basés sur les indicateurs de performance clés (KPIs) parlent souvent les protocoles JDBC/ODBC et sont de bons exemples.
Le Spark Thrift Server peut être utilisé de différentes manières : il peut s’exécuter en tant qu’application Spark de façon indépendante ou il peut être embarqué dans une application Spark déjà existante. Il peut charger dans la mémoire vive les données stockées dans un Hive Data Warehouse ou exposer un DataFrame/RDD défini par utilisateur d’une tâche Spark.
Spark Thrift Server
Le Spark Thrift Server est un serveur JDBC/ODBC basé sur le HiveServer2. Avec Spark Thrift Server, les données sont exposées à tout client JDBC tel que le shell Hive appelé beeline, ou à toute application supportant le protocole JDBC/ODBC. Les données stockées dans Hive Data Warehouse sont exposées aux applications qui feront usage du moteur Spark via le langage SQL. Les requêtes SQL gérées par le Spark Thrift Server sont exécutées avec le module Spark SQL. Cette passerelle tire profit des capacités de calcul distribués en mémoire de Spark sans avoir besoin d’écrire de code Python ou Scala.
La principale différence entre Spark Thrift Server et HiveServer2 est que le premier impose l’utilisation de Spark en exécutant des requêtes SQL avec le moteur Spark SQL, alors que le second peut exploiter plusieurs moteurs tels que MapReduce, Tez ou Spark. Basé sur HiveServer2, le Spark Thrift Server utilise Hive Data Warehouse pour stocker des données persistantes et Hive Metastore pour décrire les données (noms de tableaux, noms de colonne dans chaque tableaux, un schéma, un format de données, le partitionnement et ainsi de suite).
Spark est téléchargable depuis son site officiel. Les versions stables de Spark déjà compilées (2.4.x, 2.3.x, 2.2.x) supportent Hive 1.2.1 par default et le Spark Thrift Server prêt à l’usage. Sinon, lors de la compilation de Spark veillez à ajouter les profils -Phive
et -Phive-thriftserver
. Voici comment créer un petit projet et installer la dernière version de Spark :
mkdir spark_thrift_project
cd $_
curl http://mirrors.standaloneinstaller.com/apache/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz -o spark-2.4.0-bin-hadoop2.7.tgz
tar xzf spark-2.4.0-bin-hadoop2.7.tgz
ln -sf spark-2.4.0-bin-hadoop2.7 spark
Dans le contexte de cet article, Spark sera exécuté sans Hadoop YARN ou Mesos, mais avec son propre gestionnaire de cluster (Spark Standalone). Pour lancer un cluster en mode local (standalone), 3 composants seront démarrés :
- Spark Master avec le processus Driver : le pilote et coordinateur central qui repart les tâches à exécuter entre les Workers
- Spark Worker avec des processus Executor : la réalisation du travail qui leur est assigné par le Master
- Spark Thrift Server : un serveur JDBC/ODBC permettant à Spark SQL d’agir comme un moteur de requête distribué
./spark/sbin/start-master.sh \
--host 0.0.0.0
./spark/sbin/start-slave.sh \
spark://localhost:7077
./spark/sbin/start-thriftserver.sh \
--total-executor-cores 2 \
--master spark://localhost:7077
Désormais, le Master Spark accepte la connexion sur le port 7077
à l’adresse spark://localhost:7077
. Son interface WebUI (http://localhost:8080/
) est également accessible sur le port 8080. Notez que, par défaut, le Master Spark n’écoute pas localhost sur le port 7077
, seule son interface WebUI. En passant —host avec la valeur 0.0.0.0
, toute connexion sur les deux ports sont autorisées. En navigant la WebUI, vous verrez que le service Spark Slave est enregistré en tant que Worker. Le Spark Thrift Server est également démarré et marqué comme une application en cours d’exécution.
Par défaut, le dossier metastore_db est créé dans “./”, mais cela peut être remplacé par le chemin absolu en définissant la propriété javax.jdo.option.ConnectionURL
. Si les données sont conservées ultérieurement, le dossier spark-warehouse sera créé dans “./”, ce qui peut être modifié avec la propriété spark.sql.warehouse.dir
.
Les logs disponibles sous “./spark/logs” fournissent des informations additionnelles sur chacun des 3 composant.
Dans cet article, le jeu de données partagé par New York City Taxi & Limousine Commission est utilisé.
curl https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2017-02.csv -o input.csv
head -1 input.csv > header.csv
tail -n +3 input.csv > data.csv
La commande head
sépare les noms des colonnes présents sur la première ligne en fichier header.csv. La commande tail
extrait les données et les enregistre dans le fichier data.csv.
Exposer les tableaux Hive dans RAM
Étant basé sur Hive, le Spark Thrift Server facilite la manipulation et l’exposition des tableaux Hive via l’interface JDBC sans avoir à définir un DataFrame. Les requêtes SQL envoyées à Spark Thrift Server sont interprétées avec Spark SQL et traitées en-mémoire avec le moteur Spark. En interne, Spark Thrift Server se connecte à Hive Metastore pour extraire les informations de métadonnées (Metadata) et extraire les données de Hive Data Warehouse.
Une fois démarré, le Spark Thrift Server dispose son interface WebUI sur le port 4040 (http://localhost:4040
). Dans l’affichage il n’y a encore ni Job en cours, ni Job terminé.
L’outil beeline
de Hive est un shell interactif pour écrire et envoyer des requêtes SQL. Tout d’abord, une connexion doit être établie avec Spark Thrift Server :
./spark/bin/beeline \
-u jdbc:hive2://localhost:10000 \
-n usr \
-p pass
Une fois que beeline est connectée, il est possible d’exécuter des requêtes SQL. Le code suivant définit un tableau Hive dans Data Warehouse de Hive et charge les données à partir du fichier CSV créé ci-dessus.
CREATE TABLE taxi_rides (
VendorID STRING, tpep_pickup_datetime STRING,
tpep_dropoff_datetime STRING, passenger_count STRING,
trip_distance STRING, RatecodeID STRING,
store_and_fwd_flag STRING, PULocationID STRING,
DOLocationID STRING, payment_type STRING,
fare_amount STRING, extra STRING, mta_tax STRING,
tip_amount STRING, tolls_amount STRING,
improvement_surcharge STRING, total_amount STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ",";
SHOW TABLES;
LOAD DATA LOCAL INPATH 'data.csv' OVERWRITE INTO TABLE taxi_rides;
À ce stade, des données sont stockées et gérées par le Spark Thrift Server. Aucun Spark Job n’est marqué comme en cours d’exécution ou terminé, car aucune des actions précédentes n’a déclenché l’utilisation de Spark.
Spark SQL facilite l’expression d’opérations simples sur RDD. Par exemple, compter le nombre de lignes en SQL est aussi simple que :
SELECT count(*) FROM taxi_rides;
En retournant à l’interface Web, un Spark Job est maintenant terminé avec la description SELECT count(*) FROM taxi_rides run at AccessController.java:0
.
La requête count montre comment lire les données stockées dans Hive Data Warehouse et les traiter avec le moteur Spark. Il ne charge pas les données en mémoire pour réduire la latence des requêtes subséquentes. Il est toutefois possible de mettre en cache un DataFrame qui permettra une utilisation transparente du traitement avec SparkSQL sans avoir à accéder au disque pour les requêtes suivantes.
Il est possible de voir l’utilisation du stockage en-mémoire par un exécuteur sur sa page de Executors dans webUI (http://localhost:4040/executors/
). La colonne “Storage Memory” indique qu’aucune mémoire n’est associée aux requêtes précédentes. De plus, la page de stockage RDD (http://localhost:4040/storage/
) ne contient aucune information à ce moment. La requête SQL pour charger des données taxi_rides en mémoire et de les exposer en tant que tableau temporaire taxi_rides_cache est la suivante :
CACHE TABLE taxi_rides_cache AS SELECT * FROM taxi_rides;
Notez que, au redémarrage de le Spark Thrift Server tous les tableaux seront effacés.
Sue le page Executors de WebUI (http://localhost:4040/executors/
), l’utilisation du stockage en-mémoire dans la colonne “Storage Memory” affiche maintenant plusieurs Mo. La page de stockage RDD (http://localhost:4040/storage/
) montre actuellement des informations à propos des données stockées.
Comparons la performance de traitement sur des données non-cachées et cachées de deux tableaux taxi_rides et taxi_rides_cache. La requête ci-dessous exécute une requête GROUP BY
sur payment_type :
SELECT payment_type, count(*)
FROM taxi_rides
GROUP BY payment_type;
Exécuter cette requête plusieurs fois prend en moyenne 4 secondes.
La même requête, mais en accédant le tableau taxi_rides_cache est :
SELECT payment_type, count(*)
FROM taxi_rides_cache
GROUP BY payment_type;
Exécuter cette requête prend entre 1 et 2 secondes. Le temps d’exécution est bas grâce au stockage des données en-mémoire du cluster évitant des opérations I/O sur le disque. Une performance qui satisfait donc les cas d’utilisation tels que l’analyse interactive.
Ces moyennes deviendront plus pertinentes avec des jeux de données plus volumineux, ainsi qu’avec des requêtes plus complexes.
La declaration UNCACHE TABLE
libère le tableau taxi_rides de mémoire :
UNCACHE TABLE taxi_rides_cache;
En regardant “Storage Memory”, il montre maintenant “0 MB / 384.1 MB” puisqu’il ne reste plus de données persistantes dans la mémoire de Workers.
Exposer les DataFrames dans la mémoire vive
Il est également possible d’exposer un DataFrame créé par une application Spark personnalisée. Le Spark Thrift Server peut être embarqué dans une application Spark, exposant les DataFrames via JDBC/ODBC.
En codant le job en Python, la méthode startWithContext()
d’objet scala org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
démarre le serveur automatiquement depuis le code d’autre application. Pour l’importer en Python, la passerelle Java py4j est utilisée. La boucle « while » infinie à la fin du code garde le Spark Thrift Server en marche. Sinon, le Job serait terminé et la passerelle JDBC ne serait plus disponible.
Créons un script ./thriftserver-in-context.py
:
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import
import time
spark = SparkSession.builder \
.appName("Embedding Spark Thrift Server") \
.config("spark.sql.hive.thriftServer.singleSession", "True") \
.config("hive.server2.thrift.port", "10001") \
.config("javax.jdo.option.ConnectionURL", \
"jdbc:derby:;databaseName=metastore_db2;create=true") \
.enableHiveSupport() \
.getOrCreate()
df = spark.read.option("header","true").csv("input.csv").cache()
df.createOrReplaceTempView("taxi_rides")
sc = spark.sparkContext
java_import(sc._gateway.jvm, "")
#Start Spark Thrift Server using the jvm and passing the SparkSession
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver \
.HiveThriftServer2.startWithContext(spark._jwrapped)
while True:
time.sleep(5)
Notez que Spark Thrift Server écoute sur le port 10001
afin d’éviter des conflits potentiels avec un autre serveur s’exécutant sur le port 10000
, ce qui est le cas par défaut.
Déployer le code sur le cluster local avec spark-submit
.
./spark/bin/spark-submit \
--master spark://localhost:7077 \
--total-executor-cores 2 \
./thriftserver-in-context.py
Les logs du serveur Spark Thrift sont maintenant imprimés dans stdout de Spark Job.
Ni le dossier metastore_db ni spark-warehouse n’a été créé dans le dossier de travail actuel actuel. Une fois embarqué, le Spark Thrift Server ne crée plus les tableaux temporaires Spark pour Hive Metastore et Hive Data Warehouse. Pour confirmer que le tableau en mémoire est exposé aux clients JDBC, lancez beeline
et exécutez SHOW TABLES
.
./spark/bin/beeline \
-u jdbc:hive2://localhost:10001 \
-n usr -p pass \
-q 'SHOW TABLES;'
Lancer la même requête comme avant :
SELECT payment_type, count(*)
FROM taxi_rides
GROUP BY payment_type;
L’exécution de la requête prend entre 2 et 3 secondes. Le Job terminé est affiché sur WebUI (http://localhost:4040/
).
Après la fermeture de l’application le tableau ne sera plus disponible en raison de la mémoire volatile. Si le stockage sous-jacent de l’application Spark est persistant, il est possible que le DataFrame soit persisté dans le tableau Hive dans Spark Data Warehouse en appelant également saveAsTable()
.
df.write.saveAsTable("taxi_rides")
Dans un environnement distribué, l’exécution du Spark Thrift Server dans le contexte d’application Spark présente des inconvénients. Dans un tel environnement, plusieurs applications coexistent sur le même host et, à moins d’affecter une adresse IP dédiée au Job Spark, comme dans Kubernetes, il existe un risque de collision de ports. De plus, des orchestrateurs tels que Kubernetes, Hadoop YARN et Mesos provisionneront dynamiquement le Spark Driver dans l’un de ses nœuds Worker/Slave gérés. Nous ne connaissons donc pas à l’avance l’adresse du Spark Thrift Server. Il doit être publié dans un emplacement, tel que Zookeeper ou ETCD, et récupéré par le client. Kubernetes résout élégamment ce problème grâce à une définition de service et à l’utilisation des routes d’Ingress. Enfin, la plupart des environnements distribués sont sécurisés par un firewall et avec une isolation réseau. Les applications Spark exécutées à partir du cluster ne sont pas accessibles de l’extérieur et il sera impossible pour un client externe de se connecter au Spark Thrift Server sans la présence d’un proxy qui déroute les demandes.
Accès aux données dans un cluster Hadoop
L’intégration de Spark avec Hadoop Hive pose des nouveaux défis pour l’exposition des données. Auparavant, Hive Metastore utilisait une base de données Derby locale créée automatiquement par Spark Thrift Server. Dans l’environnement Hadoop, il existe déjà Hive Warehouse avec une base de données contenant Metastore. Cela implique que Spark Thrift Server doit être configuré pour utiliser la base de données existante afin d’accéder aux données des tables Hive du cluster.
Sur un cluster Hadoop « bare », une connexion à Hive Metastore sur une base de données distante peut être configurée manuellement et un Warehouse commun pour Spark et Hive peut être spécifié. C’est possible puisque Spark fournit une compatibilité de base avec Hive. Cette approche permettrait des fonctionnalités de base telles que l’exposition du tableau Hive dans Spark ou l’accès à la table Hive existante à partir de Spark. Alors, Spark Thrift Server pourrait exposer les tables Hive dans Hive Warehouse et les DataFrames de la mémoire Spark aux clients JDBC. Cependant, à partir de Spark 2.x, cette solution ne permet pas les autres fonctionnalités de Hive en raison d’une compatibilité limitée. Un exemple de telle fonctionnalité moderne est le tableau ACID dans Apache Hive.
En abordant le manque de compatibilité de Spark avec Hive, Hortonworks a développé un Hive Warehouse Connector pour la plateforme Hortonworks Data Platform (HDP). Hive Warehouse Connector est une nouvelle génération pour lire et écrire des données entre Apache Spark et Apache Hive. À partir de HDP 3.0, l’utilisation de Hive Warehouse Connector est le seul moyen d’intégrer Hive et Spark. Dans ce contexte, Spark et Hive ont leurs propres catalogues indépendants et toutes les opérations sont reliées à “HiveWarehouseConnector”. Un avantage de cette approche est la possibilité de tirer parti des fonctionnalités plus récentes de Hive.
Conslusion
Sans la nécessité de coder une application Spark, les tableaux Hive peuvent être exposées en mémoire (RAM), à l’aide de requêtes SQL via des clients JDBC et en exploitant le moteur Spark. Pour un meilleur contrôle, les DataFrames personnalisés manipulables sont accessibles via JDBC/ODBC avec des requêtes SQL lorsque le Spark Thrift Server est embarqué dans l’application Spark. Les deux opérations sont assez faciles à effectuer, mais l’intégration de Spark Thrift Server avec Hive dans un cluster peut être complexe.