Publish Spark SQL DataFrame and RDD with Spark Thrift Server

By Oskar RYNKIEWICZ

Mar 25, 2019

Categories
Data Engineering
Tags
Thrift
JDBC
Hadoop
Hive
Spark
SQL

The distributed and in-memory nature of the Spark engine makes it an excellent candidate to expose data to clients which expect low latencies. Dashboards, notebooks, BI studios and KPI-based reporting tools are examples of such clients, and they commonly speak the JDBC/ODBC protocols.

Spark Thrift Server can be used in several ways. It can run independently as a standalone Spark application or be embedded in an existing Spark driver. It can load into RAM the data stored inside the Hive Data Warehouse or expose a user-defined DataFrame/RDD of a Spark job.

Spark Thrift Server

Spark Thrift Server is a JDBC/ODBC server built on top of Hive’s HiveServer2. With Spark Thrift Server, data is exposed to any JDBC client, such as beeline (Hive’s shell), or to any application supporting the JDBC/ODBC protocol. Datasets stored inside Hive’s Data Warehouse are exposed to applications which leverage the Spark engine through the SQL language. The SQL queries handled by Spark Thrift Server are executed with Spark’s SQL module. This gateway lets clients take advantage of Spark’s distributed in-memory computing capability without writing any Python or Scala code.

The main difference between Spark’s Thrift Server and Hive’s HiveServer2 is that the former enforces the usage of Spark by executing SQL queries with Spark’s SQL engine, while the latter may leverage several engines such as MapReduce, Tez or even Spark. Being based on HiveServer2, Spark Thrift Server uses Hive’s Data Warehouse to store persistent data and Hive’s Metastore to describe the data (table names, column names in each table, schema, storage formats, partitions and so forth).

Spark can be downloaded from its official website. The pre-built Spark releases (2.4.x, 2.3.x, 2.2.x) are available with Hive 1.2.1 support enabled and Spark Thrift Server ready to run. When building Spark from source code, make sure to add the -Phive and -Phive-thriftserver profiles to the build options. Here’s how to create a small project and install Spark 2.4.0:

mkdir spark_thrift_project 
cd $_ 
curl http://mirrors.standaloneinstaller.com/apache/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz -o spark-2.4.0-bin-hadoop2.7.tgz 
tar xzf spark-2.4.0-bin-hadoop2.7.tgz 
ln -sf spark-2.4.0-bin-hadoop2.7 spark

In the context of this article, Spark will be executed without Hadoop or YARN/Mesos/Kubernetes, but with its own native cluster manager. To run a standalone local cluster, 3 components will be started:

  • Spark Master with the Driver program process: central coordinator distributing the tasks to execute among the workers
  • Spark Worker node with executor processes: carrying out the tasks assigned to them by the master and reporting back
  • Spark Thrift Server: JDBC/ODBC server allowing Spark SQL to act as a distributed query engine

./spark/sbin/start-master.sh \
  --host 0.0.0.0 
./spark/sbin/start-slave.sh \
  spark://localhost:7077
./spark/sbin/start-thriftserver.sh \
  --total-executor-cores 2 \
  --master spark://localhost:7077

The Spark master is now accepting connections on port 7077 at the address spark://localhost:7077. Its WebUI is available on port 8080 (http://localhost:8080/). Note that, by default, Spark’s Master will not listen on localhost on port 7077, only its WebUI will. Passing --host with the value 0.0.0.0 allows connections on both ports. In the WebUI, you should see the Spark Slave service registered as a worker. Spark Thrift Server is also started and listed as a running application.

By default, metastore_db directory is created in “./”, but that can be changed to an absolute path by setting the javax.jdo.option.ConnectionURL property. Also, upon persisting the data later on, spark-warehouse directory will be created in “./” which can be changed by setting the spark.sql.warehouse.dir property.
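As a minimal sketch of how these two properties could be set at launch time, the helper below assembles the argument list for start-thriftserver.sh. It assumes the metastore URL is passed with --hiveconf and the warehouse directory with --conf; another common option is a hive-site.xml file. The function name and the example paths are hypothetical.

```python
from typing import List, Optional


def thriftserver_args(
    master: str,
    metastore_dir: Optional[str] = None,
    warehouse_dir: Optional[str] = None,
    script: str = "./spark/sbin/start-thriftserver.sh",
) -> List[str]:
    """Build the command line used to start Spark Thrift Server.

    metastore_dir and warehouse_dir are expected to be absolute paths;
    when omitted, Spark falls back to its defaults in the working directory.
    """
    args = [script, "--master", master]
    if metastore_dir is not None:
        # Embedded Derby URL, same format as the one used later in this article.
        url = "jdbc:derby:;databaseName={};create=true".format(metastore_dir)
        args += ["--hiveconf", "javax.jdo.option.ConnectionURL=" + url]
    if warehouse_dir is not None:
        args += ["--conf", "spark.sql.warehouse.dir=" + warehouse_dir]
    return args
```

The resulting list can be handed to subprocess.run(), for example thriftserver_args("spark://localhost:7077", "/tmp/metastore_db", "/tmp/spark-warehouse").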

Logs available in “./spark/logs” provide additional information about each of the 3 components.

For the purpose of this article, the dataset shared by the New York City Taxi & Limousine Commission is used.

curl https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2017-02.csv -o input.csv
head -1 input.csv > header.csv
tail -n +3 input.csv > data.csv

The head command isolates the column names present in the first line into the header.csv file. The tail command skips the first two lines of input.csv and saves the remaining rows into the data.csv file.
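The same split can be sketched in Python for environments where head and tail are not available. The function name and the data_start_line parameter are hypothetical; the default of 3 mirrors the tail -n +3 call above.

```python
import itertools


def split_csv(input_path, header_path, data_path, data_start_line=3):
    """Write line 1 of input_path to header_path and every line from
    data_start_line (1-based) onwards to data_path.

    Returns the number of lines written to data_path.
    """
    written = 0
    # newline="" keeps the original line endings untouched.
    with open(input_path, "r", newline="") as src:
        lines = iter(src)
        header = next(lines, "")
        with open(header_path, "w", newline="") as out:
            out.write(header)
        # The header is already consumed, so skip the lines that sit
        # between it and the first data line.
        remaining = itertools.islice(lines, data_start_line - 2, None)
        with open(data_path, "w", newline="") as out:
            for line in remaining:
                out.write(line)
                written += 1
    return written
```

Calling split_csv("input.csv", "header.csv", "data.csv") produces the same two files as the shell commands.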

Exposing Hive tables in RAM

Being built on Hive, Spark Thrift Server makes it easy to manipulate and expose Hive tables through the JDBC interface without having to define a DataFrame. The SQL queries sent to Spark Thrift Server are interpreted with Spark SQL and processed with the Spark in-memory engine. Internally, Spark’s Thrift Server connects to Hive’s Metastore to retrieve metadata and fetches the data from Hive’s Data Warehouse.

Once started, Spark Thrift Server exposes a WebUI (http://localhost:4040) available on port 4040. No running or completed jobs are displayed yet.

Hive’s beeline tool is an interactive shell to write and send SQL queries. First, a connection must be established with Spark Thrift Server:

./spark/bin/beeline \
  -u jdbc:hive2://localhost:10000 \
  -n usr \
  -p pass

Once beeline is connected, it is possible to execute SQL queries. The following code defines a Hive table in Hive’s Data Warehouse and loads data from the CSV file created above.

CREATE TABLE taxi_rides ( 
  VendorID STRING, tpep_pickup_datetime STRING, 
  tpep_dropoff_datetime STRING, passenger_count STRING, 
  trip_distance STRING, RatecodeID STRING, 
  store_and_fwd_flag STRING, PULocationID STRING, 
  DOLocationID STRING, payment_type STRING, 
  fare_amount STRING, extra STRING, mta_tax STRING, 
  tip_amount STRING, tolls_amount STRING, 
  improvement_surcharge STRING, total_amount STRING 
) 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ","; 
SHOW TABLES; 
LOAD DATA LOCAL INPATH 'data.csv' OVERWRITE INTO TABLE taxi_rides;
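The column list above is typed by hand. As a sketch, the same statement could be generated from the first line of header.csv. The helper below is hypothetical, declares every column as STRING like the statement above, and does not validate or quote the column names.

```python
import csv


def create_table_ddl(table, header_line, column_type="STRING"):
    """Build a CREATE TABLE statement from a comma-separated header line."""
    # csv.reader handles quoted column names; only the first row is needed.
    columns = next(csv.reader([header_line.strip()]))
    column_defs = ",\n".join(
        "  {} {}".format(name.strip(), column_type)
        for name in columns
        if name.strip()
    )
    return (
        "CREATE TABLE {} (\n{}\n)\n"
        "ROW FORMAT DELIMITED\n"
        'FIELDS TERMINATED BY ",";'
    ).format(table, column_defs)
```

The returned string can be pasted into beeline or passed to it with the -e option.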

At this stage, the dataset is stored and managed by Spark Thrift Server. No Spark job is marked as running or completed because none of the previous actions triggered the usage of Spark.

SparkSQL eases the expression of simple RDD operations. For example, counting the number of rows in SQL is as easy as:

SELECT count(*) FROM taxi_rides;

Going back to the WebUI (http://localhost:4040/jobs/), a Spark job has now completed with the description SELECT count(*) FROM taxi_rides run at AccessController.java:0.

The count query illustrates how to read data stored inside the Hive Data Warehouse and process it with Spark’s engine. It does not keep the data in memory, so subsequent queries do not benefit from a lower latency. It is, however, possible to cache a table so that subsequent Spark SQL queries are processed transparently without accessing the disk.

The CACHE TABLE SQL instruction loads a table into memory. The storage memory used by an executor is visible in the WebUI’s Executors page (http://localhost:4040/executors/). The “Storage Memory” column indicates that no memory is associated with the previous queries. Also, the RDD Storage page (http://localhost:4040/storage/) contains no information at the moment. The SQL query to load the taxi_rides dataset in memory and to expose it as a taxi_rides_cache temporary table is:

CACHE TABLE taxi_rides_cache AS SELECT * FROM taxi_rides;

Note that the table will not persist after Spark Thrift Server restarts.

In the WebUI’s Executors page (http://localhost:4040/executors/), used memory from the “Storage Memory” column now displays several MB. The RDD Storage page (http://localhost:4040/storage/) now shows information about stored data.

Let’s compare the performance of processing uncached and cached data from the two tables, taxi_rides and taxi_rides_cache. The query below executes a GROUP BY on payment_type:

SELECT payment_type, count(*) 
FROM taxi_rides 
GROUP BY payment_type;

Running the query several times takes on average 4 seconds.

The same query using the taxi_rides_cache table is:

SELECT payment_type, count(*) 
FROM taxi_rides_cache 
GROUP BY payment_type;

Running the query takes between 1 and 2 seconds. Execution time is lower because the data is stored in cluster-wide memory and disk I/O is avoided. This level of performance suits use cases such as interactive analysis.

These measurements become more meaningful with larger datasets as well as with more complex queries.
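One way to collect such averages is sketched below. The average_latency and beeline_query helpers are hypothetical; the beeline path, URL and credentials are the ones used in this article. Each run spawns a new beeline process, so the figures include JVM startup and connection time and are best read as relative comparisons.

```python
import subprocess
import time
from statistics import mean


def average_latency(run_query, repeats=5):
    """Call run_query() `repeats` times and return the mean duration in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        run_query()
        durations.append(time.perf_counter() - start)
    return mean(durations)


def beeline_query(sql, url="jdbc:hive2://localhost:10000"):
    """Return a callable that runs `sql` through the beeline CLI."""
    def run():
        subprocess.run(
            ["./spark/bin/beeline", "-u", url, "-n", "usr", "-p", "pass", "-e", sql],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    return run
```

For example, average_latency(beeline_query("SELECT payment_type, count(*) FROM taxi_rides_cache GROUP BY payment_type;")) times the cached query, and the same call with taxi_rides times the uncached one.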

The UNCACHE TABLE statement releases the taxi_rides_cache table from memory:

UNCACHE TABLE taxi_rides_cache;

The “Storage Memory” column now shows “0 MB / 384.1 MB” since no data remains in the workers’ RAM.

Exposing user-defined DataFrames in RAM

It is also possible to expose a DataFrame created by a custom Spark application. Spark Thrift Server can be embedded inside a Spark application, exposing DataFrames through JDBC/ODBC.

When writing the job in Python, the startWithContext() method of the org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 Scala object starts the server programmatically. To import it in Python, the py4j Java gateway is used. The infinite while loop at the end keeps Spark Thrift Server running. Otherwise, the job would complete and the JDBC gateway would no longer be available.

Let’s create a “./thriftserver-in-context.py” script:

from pyspark.sql import SparkSession
from py4j.java_gateway import java_import
import time
 
spark = SparkSession.builder \
  .appName("Embedding Spark Thrift Server") \
  .config("spark.sql.hive.thriftServer.singleSession", "True") \
  .config("hive.server2.thrift.port", "10001") \
  .config("javax.jdo.option.ConnectionURL", \
  "jdbc:derby:;databaseName=metastore_db2;create=true") \
  .enableHiveSupport() \
  .getOrCreate()
df = spark.read.option("header","true").csv("input.csv").cache()
df.createOrReplaceTempView("taxi_rides")
sc = spark.sparkContext
java_import(sc._gateway.jvm, "")
#Start Spark Thrift Server using the jvm and passing the SparkSession
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver \
  .HiveThriftServer2.startWithContext(spark._jwrapped)
while True:
  time.sleep(5)

Note, the Spark Thrift Server is listening on port 10001 to avoid potential conflicts with another server running on the default 10000 port.

Deploy the code to the local cluster with spark-submit.

./spark/bin/spark-submit \
  --master spark://localhost:7077 \
  --total-executor-cores 2 \
  ./thriftserver-in-context.py

The logs from Thrift Server are now printed to the Spark job stdout.
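Before pointing a JDBC client at the embedded server, it can help to wait until its port accepts connections, since the job needs some time to start. The helper below is a sketch with a hypothetical name; it only checks that a TCP connection can be opened and does not verify that the Thrift service behind the port is fully initialized.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to (host, port) succeeds,
    or False if none succeeds before `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # The connection is closed right away; only reachability matters.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

With the script above, wait_for_port("localhost", 10001) returns True once the server is listening.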

Neither the metastore_db nor spark-warehouse directories were created in the current working directory. When embedded, Spark Thrift Server no longer creates Hive’s Metastore and Data Warehouse for Spark temporary tables. To confirm the in-memory table is exposed to JDBC clients, launch beeline and execute SHOW TABLES.

./spark/bin/beeline \
  -u jdbc:hive2://localhost:10001 \
  -n usr -p pass \
  -e 'SHOW TABLES;'

Run the same SQL query as before:

SELECT payment_type, count(*) 
FROM taxi_rides 
GROUP BY payment_type;

Running the query takes between 2 and 3 seconds. The completed job is displayed in the WebUI (http://localhost:4040/).

After closing the application, the table is no longer available because its storage is volatile. If the underlying storage of the Spark application is persistent, the DataFrame can also be persisted as a Hive table in the Data Warehouse by calling saveAsTable().

df.write.saveAsTable("taxi_rides")

In a distributed environment, launching Spark Thrift Server in the Spark Application context has drawbacks. In such environments, multiple applications coexist on the same host and, unless the Spark job is assigned a dedicated IP like in Kubernetes, there is a risk of port collision. Also, orchestrators such as Kubernetes, Hadoop YARN and Mesos, will dynamically provision the Spark driver inside one of its managed worker/slave nodes. Thus, we do not know in advance the address of Spark Thrift Server. It must be published in some location, such as Zookeeper or ETCD, and fetched by the client. This issue is elegantly solved by Kubernetes with a service definition and the usage of Ingress routes. Finally, most of the distributed environments are secured by firewall and with network isolation. Spark applications executed from within the cluster are not accessible from the outside and it will be impossible for an external client to connect to the Spark Thrift Server without the presence of a proxy routing the requests.
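To illustrate the port collision concern, the sketch below asks the operating system for a currently unused port, which the job could then pass as hive.server2.thrift.port instead of a hard-coded value. The function name is hypothetical. Another process may grab the port between this call and the server start, and the chosen port still has to be published to clients as described above.

```python
import socket


def find_free_port(host="0.0.0.0"):
    """Return a TCP port number that was free on `host` at the time of the call."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Binding to port 0 lets the operating system pick an unused port.
        sock.bind((host, 0))
        return sock.getsockname()[1]
```

In the embedded script, the value would be converted to a string before being passed to .config("hive.server2.thrift.port", ...).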

Data access on a Hadoop cluster

Integrating Spark with Hive on Hadoop creates new challenges in exposing data. Previously, the Hive Metastore used a local Derby database created automatically by Spark Thrift Server. In a Hadoop environment, there is already a Hive Warehouse with a remote database holding the Metastore. This implies that Spark Thrift Server needs to be configured to use the existing database in order to access data in Hive tables on the cluster.

On a basic Hadoop cluster, a connection to the Hive Metastore on a remote database can be configured manually and a common Warehouse for Spark and Hive can be specified. This is possible since Spark provides basic compatibility with Hive. This approach allows basic functionalities such as exposing a Hive table in Spark or accessing an existing Hive table from Spark. Thus, Spark Thrift Server can expose both Hive tables in the Hive Warehouse and DataFrames in Spark memory to JDBC clients. However, as of Spark 2.x, this solution doesn’t support modern Hive features because of limited compatibility. An example of such a modern feature is the support for ACID tables in Apache Hive.

To address Spark’s limited Hive compatibility, Hortonworks developed the Hive Warehouse Connector for the Hortonworks Data Platform (HDP). Hive Warehouse Connector is a newer-generation library for reading and writing data between Apache Spark and Apache Hive. From HDP 3.0, the use of Hive Warehouse Connector is the only way to integrate Hive and Spark. In this setting, Spark and Hive have their own independent catalogs and all operations are bridged with HiveWarehouseConnector. An advantage of this approach is the capability to leverage more Hive features.

Summary

Without the necessity to program a Spark application, Hive tables can easily be exposed in RAM, using SQL queries via JDBC clients and leveraging the Spark engine. For greater control, custom manipulated DataFrames are accessible through JDBC/ODBC with SQL queries when Spark Thrift Server is embedded inside the Spark application.
