Running Apache Hive 3, new features and tips and tricks
Jul 25, 2019
- Categories
- Big Data
- Business Intelligence
- DataWorks Summit 2019
- Tags
- JDBC
- LLAP
- Druid
- Hadoop
- Hive
- Kafka
- Release and features [more][less]
Never miss our publications about Open Source, big data and distributed systems, low frequency of one email every two months.
Apache Hive 3 brings a bunch of new and nice features to the data warehouse. Unfortunately, like many major FOSS releases, it comes with a few bugs and not much documentation. It is available since July 2018 as part of HDP3 (Hortonworks Data Platform version 3).
I will first review the new features available with Hive 3 and then give some tips and tricks learnt from running it in production for a few months. The first part of the article is based on the talk What’s new in Apache Hive given by Jason Dere at the DataWorks Summit 2019 Barcelona.
Hive 3 new features
A lot of features and improvements have been implemented in Hive 3, greatly enlarging the use cases that it can cover. Let’s recap those that are available in HDP3 (most of them).
Materialized views (MV)
Materialized views (MV) are finally available in HDP. More than traditional views, they bring:
- Storage for intermediate results: as their name suggests, materialized views results are stored, allowing mutualizing computing costs;
- Incremental rebuilding: updating an materialized view only computes the data that was inserted in its sources tables since the last update;
- Query rewriting: when appropriate, the cost-based optimizer uses existing materialized views to plan queries instead of the sources tables, without the user being aware of it!
For in-depth information about materialized views, check out the article of my colleague Paul-Adrien Cordonnier Accelerating query processing with materialized views in Apache Hive.
SQL features
Constraints and default values
It is now possible to use UNIQUE
and NOT NULL
SQL constraints, in addition to PRIMARY|FOREIGN KEY
(added in Hive 2.1). Moreover, you can specify a DEFAULT
value for each column and use this default value in INSERT
and UPDATE
statements.
CREATE TABLE users (
id INT NOT NULL,
firstname STRING,
lastname STRING,
join_date DATE DEFAULT CURRENT_DATE(),
PRIMARY KEY (id) DISABLE NOVALIDATE
);
INSERT INTO TABLE users VALUES (1, 'John', 'Doe', DEFAULT);
Tables information
Hive 3 allows easy exploration of the whole warehouse with information_schema
and sys
databases:
SELECT table_schema, table_name
FROM information_schema.columns
WHERE column_name LIKE '%ssn%';
JDBC, Druid and Kafka connectors
2 new connectors are available for external tables:
- The JDBC connector (JdbcStorageHandler), which is read-only for now. It allows you to easily read/import data from any of the supported databases: MySQL, PostgreSQL, Oracle, MSSQL and Apache Derby.
- The Kafka connector, which can be used to query real-time data from Apache Kafka, but also achieve exactly-once offloading of data to Hive and real-time data transformation! See Integrating Hive and Kafka for more details.
Related to the latter, exactly-once ingestion from Kafka to Druid can be done through Hive.
ACID v2
Hive ACID (Atomicity, Consistency, Isolation, Durability) brought a lot to Hive capabilities for bucketed Apache ORC managed tables:
- Streaming ingestion of data;
- Seamless
INSERT
/UPDATE
/DELETE
operations on existing tables.
The second version of ACID carries several improvements:
- Performance just as good as non-ACID;
- Transactional tables (tables supporting ACID) don’t have to be bucketed anymore;
- Non-ORC formats support for
INSERT
/SELECT
; - Compatibility with cloud storage providers, e.g. Amazon S3;
- New HiveStreaming API (v2).
Hortonworks decided to turn ACID on by default in HDP3. For example, managed ORC tables have to be transactional.
Unfortunately, most of the problem I faced with Hive 3 come from Hive ACID and the HiveStreaming API, as we will see in the second part of the article.
Hive Warehouse Connector for Apache Spark
Starting from HDP 3.0, all the interactions between Hive and Apache Spark have to go through the Hive Warehouse Connector. This connector takes advantage of Hive LLAP to allow streaming/ACID interaction between Hive and Spark. The Spark application will need to access a Hive Server Interactive (with LLAP activated) to read Hive managed tables, but it won’t need it to write to Hive managed tables or read/write Hive external tables. Don’t be surprised if the traditional way of accessing Hive tables from Spark doesn’t work anymore!
LLAP workload management
This great new feature improves your control over Hive LLAP (Live Long And Process) in multi-tenant environments with resource plans:
- Split your LLAP resources in pools, e.g.
bi
(for Business Analytics) andetl
(for Extract Transform and Load); - Automatically map applications to pools, e.g. Tableau to the
bi
pool; - Create triggers to dynamically move applications from one pool to another, e.g. move long running queries to the
etl
pool; - Activate/deactivate your resource plans based on your users’ needs (one active at a time).
CREATE RESOURCE PLAN my_plan;
CREATE POOL my_plan.bi
WITH ALLOC_FRACTION=70,QUERY_PARALLELISM=4;
CREATE POOL my_plan.etl
WITH ALLOC_FRACTION=30,QUERY_PARALLELISM=10;
CREATE TRIGGER my_plan.slow_query
WHEN execution_time_ms > 60000
DO MOVE TO etl;
ALTER PLAN my_plan SET DEFAULT POOL=bi;
ALTER PLAN my_plan ENABLE ACTIVATE;
Improved performance
Based on a recent TPC-DS benchmark by the MR3 team, Hive LLAP 3.1.0 is the fastest SQL-on-Hadoop system available in HDP 3.0.1. The benchmark compares all the SQL systems embedded with HDP3 as well as Hive on MR3 (a new execution engine for Hadoop and Kubernetes), by running a set of 99 SQL queries.
System | Total TCP-DS running time (seconds) |
---|---|
HDP 3.0.1 LLAP | 5516.89 |
Hive 3.1.0/MR3 | 6575.052 |
HDP 3.0.1 Tez | 12227.317 |
Presto 0.208e | 12948.224 |
Spark 2.3.1 | 26247.471 |
I invite you to read the very complete Performance Evaluation of SQL-on-Hadoop Systems using the TPC-DS Benchmark (edit: defunct url) and Jason Dere slides about performance for more details on what makes Hive 3 faster.
Hive 3 tips and tricks
I have been running and maintaining an instance of Hive 3 for more than 5 months now. During this time I have encountered some stability issues and implemented solutions that are sometimes tricky. More details about the Hive setup used are available at the end of the article.
If you plan on running Hive 3, this will probably help you to identify problems and to fix them, at least before they are fixed in a next official release.
Memory leaks in the HiveStreaming API
The new HiveStreaming API mentioned earlier has 2 known memory leaks (HIVE-20979). This impacts any client application using the API, that will eventually crash because of JVM (Java Virtual Machine) heap size issue.
In my case, the client application was Apache NiFi and its PutHive3Streaming processor. This is the official processor supplied with NiFi 1.7.0 to write to Hive 3.0+. As it leverages the HiveStreaming API (v2), my NiFi cluster was always crashing after running a few hours.
I spend some time troubleshooting NiFi and ended up rewriting the processor (NiFi Hive3Streaming Fixed). This reimplementation embeds the fixes provided by the Hive community (HIVE-20979). If you want to know more about this particular NiFi problem, check out the dedicated article Troubleshooting a NiFi-HiveStreaming pipeline (coming soon).
If you look closely at the HIVE-20979 JIRA issue, you will see that the fix is supposed to be in Hive 3.1.1 version. Nonetheless, it is not… But this is not that bad because even if it was fixed, you would have to wait for Hortonworks to shift it into Hive 3.1.1, and to update your whole cluster (in case you are running HDP).
So the good thing about this HiveStreaming issue is that you can deal with it. As I did to rewrite the processor, check out the fixes provided by the Hive community (HIVE-20979), and embed the impacted Java classes in your application. Feel free to reuse parts of my code published on GitHub if you need!
Hive ACID compactions issues
As said earlier, Hive tables are transactional by default in HDP 3 and managed ORC tables cannot be non-transactional. That means that they support ACID operations:
- Atomicity: an operation either succeeds or fails;
- Consistency: once an operation has been performed, every subsequent operation will see its results;
- Isolation: an incomplete operation does not affect other operations;
- Durability: once an operation is complete, it is preserved even against machine/system failure.
While this gives more functionalities to tables, this also brings new challenges as the internal mechanism of transactional tables is more complex. I have experienced several issues with the compaction system.
What are Hive compactions?
Before detailing the issues, you have to understand what are compactions and why they are needed in Hive ACID.
To allow ACID, every transaction (INSERT
, UPDATE
, DELETE
) on a bucket creates new files stored inside delta
directories. Each table partition contains one base directory and several delta directories:
hdfs dfs -ls -R /warehouse/tablespace/managed/hive/d.db/t
drwxr-xr-x - hive hive 0 2016-06-09 17:03 /warehouse/tablespace/managed/hive/d.db/t/base_0000022
-rw-r--r-- 1 hive hive 602 2016-06-09 17:03 /warehouse/tablespace/managed/hive/d.db/t/base_0000022/bucket_00000
drwxr-xr-x - hive hive 0 2016-06-09 17:06 /warehouse/tablespace/managed/hive/d.db/t/delta_0000023_0000023_0000
-rw-r--r-- 1 hive hive 611 2016-06-09 17:06 /warehouse/tablespace/managed/hive/d.db/t/delta_0000023_0000023_0000/bucket_00000
drwxr-xr-x - hive hive 0 2016-06-09 17:07 /warehouse/tablespace/managed/hive/d.db/t/delta_0000024_0000024_0000
-rw-r--r-- 1 hive hive 610 2016-06-09 17:07 /warehouse/tablespace/managed/hive/d.db/t/delta_0000024_0000024_0000/bucket_00000
As you can expect, there are as many delta directories in a partition as transactions made (actually transactions are grouped and committed in batches). This can rapidly become a problem in case of streaming ingestion as Hive will have to read more and more files on every request.
To save Hive performances, there are 2 types of compactions:
- Minor compactions to merge the delta files of one bucket into one delta file;
- Major compactions to merge all the delta files of one bucket to the existing base file of this bucket.
By default, Hive is supposed to automatically trigger compactions based on a set of properties (see the official documentation). Yet, in my experience, this does not really work as expected…
Compactions not triggering
After some time working with transactional tables as sinks of a NiFi dataflow, I noticed that the compactions were not working as they should. The behaviour I observed is the following:
- A new partition is created, thus has no base nor delta file;
- Hive checks for compactions on this partition during its checking loop. You can see what partitions are checked in the
hivemetastore.log
file:cat /var/log/hive/hivemetastore.log | grep -P 'Checking to see if we should compact'
- As there are no base file, the threshold to build the base file is reached on the first check (because delta files represent an infinite percentage of the partition size);
- The major compaction is triggered and the base file created;
- After that, no more checking is done on the partition…
Apparently, I am not the only one facing this issue [Stack Overflow question (edit: defunct url), though I am not sure that everyone will face is, and I didn’t find its root cause.
After trying to adjust the hive.compactor.*
parameters without success, I ended up writing an automated workflow with Apache Oozie to “manually” trigger major compactions. The workflow does the following at a database level every 10 minutes:
- Get a list of the delta and base directories with a bash script:
# To get the delta list hdfs dfs -ls -R "/warehouse/tablespace/managed/hive/$target_db.db" \ | grep 'drwx' | grep -P -o '[^ ]*(delta_\d{7}|base)_\d{7}_*\d{0,4}$' \
- Parse the list and generate a compaction query for each partition that needs it. I do this with a Python script.
- Submit the generated queries to Hive, what I do through Beeline that allows submitting a parameterised query file:
-- Major compaction query example ALTER TABLE target_db.tbl1 PARTITION (col1 = 'value1') COMPACT 'major';
This is how you can ensure the performance of Hive if you encounter the same troubles with compactions. If you do so pay attention to:
- The YARN queue where you publish your compactions queries. To avoid penalising your users and ensure that the compactions will not be delayed, I would suggest dedicating a queue.
- The
yarn.scheduler.capacity[.queue].maximum-am-resource-percent
property, which is 20% by default. Each compaction launches 1 application master and 1 map task, so you might want to increase themaximum-am-resource-percent
, especially if you dedicate a queue. In this case, this property also allows you to control how many compactions will run in parallel. - The memory used by the compactor. If you notice that some compaction jobs take too long, you can overwrite the
compactor.mapreduce.map.memory.mb
at table level, or when you query the compaction:ALTER TABLE target_db.tbl1 PARTITION (col1 = 'value1') COMPACT 'major' WITH OVERWRITE TBLPROPERTIES ('compactor.mapreduce.map.memory.mb' = '3072');
- The max number of delta files compacted at once with the
hive.compactor.max.num.delta
property. It helps make the time taken by compactions more constant and to avoid “OutOfMemory” exceptions:hive.compactor.max.num.delta = 100
- The table property
NO_AUTO_COMPACTION
, that isfalse
by default. If you don’t want Hive to interfere with your custom compaction scheduling, you can set it totrue
on specific tables:CREATE TABLE target_db.tbl1 ( ... ) TBLPROPERTIES ('NO_AUTO_COMPACTION' = 'true');
Cleaner impersonation failure
If you use Apache Ranger to manage Hive authorisations, the property hive.server2.enable.doAs
is set to false
so that any operation done through the Hive Server is executed as hive
. The hive
user is therefore usually the owner of all the files of managed tables.
However, when using HiveStreaming to insert data into tables, you don’t talk to the Hive Server, but directly to the Hive Metastore! That’s why you have to be careful of the user credentials used to call the API. If hive
is not the owner of the table partitions, you might encounter this error in the hivemetastore.log
file:
2019-03-03T00:00:00,799 ERROR [pool-8-thread-188]: server.TThreadPoolServer (TThreadPoolServer.java:run(297)) - Error occurred during processing of message.
java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: Peer indicated failure: GSS initiate failed
...
Caused by: org.apache.thrift.transport.TTransportException: Peer indicated failure: GSS initiate failed
If you turn on the debug mode, you will get the origin of the error:
2019-03-14T12:55:46,299 ERROR [Thread-19]: transport.TSaslTransport (TSaslTransport.java:open(315)) - SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed
...
at java.security.AccessController.doPrivileged(Native Method) [?:1.8.0_112]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_112]
...
at org.apache.hadoop.hive.ql.txn.compactor.Cleaner.removeFiles(Cleaner.java:352) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.hadoop.hive.ql.txn.compactor.Cleaner.access$000(Cleaner.java:64) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.hadoop.hive.ql.txn.compactor.Cleaner$1.run(Cleaner.java:304) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at java.security.AccessController.doPrivileged(Native Method) [?:1.8.0_112]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_112]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730) [hadoop-common-3.1.1.3.0.1.0-187.jar:?]
at org.apache.hadoop.hive.ql.txn.compactor.Cleaner.clean(Cleaner.java:301) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.hadoop.hive.ql.txn.compactor.Cleaner.run(Cleaner.java:169) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
Caused by: org.ietf.jgss.GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
This error means that the Cleaner, responsible for removing obsolete delta files, couldn’t impersonate the user that owns the partition.
The easiest way to solve this issue is to make the user hive
owner of the partitions. In a kerberized environment, you could use the hive
keytab when querying the HiveStreaming API.
Querying data during compactions
While ACID is guaranteed during INSERT
, UPDATE
and DELETE
operations, what I have seen is that launching a table-wide operation when compactions are running can lead to errors, due to delta files being deleted. To avoid this concurrency troubles, I separate the inserting from the querying with the following architecture:
- A table is used for ACID operations, e.g. real-time ingestion from NiFi through HiveStreaming;
- A materialized view is built on top of the table and is used for querying.
To avoid the same issue as with querying, compactions have to be paused during the rebuilding of the materialized view, which hopefully should be quick thanks to incremental rebuilding.
Conclusion
In this article, we have reviewed all the new features brought by Hive 3. There are a lot of them and they are very appealing. Yet, you have seen that it’s not that easy to maintain a Hive 3 instance. This should give you some pros and cons to decide whether or not you should update to Hive 3.
Bonus tips
Here are some tips that might make you win some time.
Warehouse location
In HDP 3, the location of the Hive warehouse has changed. You can find managed tables in HDFS at the path:
/warehouse/tablespace/managed/hive/
Errors due to materialized views
During maintenance of the warehouse, you might encounter this troubling mysterious issue:
DROP TABLE my_table;
...
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:Couldn't acquire the DB log notification lock because we reached the maximum # of retries: 5 retries. If this happens too often, then is recommended to increase the maximum number of retries on the hive.notification.sequence.lock.max.retries configuration :: Error executing SQL query "select "NEXT_EVENT_ID" from "NOTIFICATION_SEQUENCE" for update".) (state=08S01,code=1)
Even if this error message gives you a recommendation, don’t follow it too quickly! First, go check the logs in hivemetastore.log
and you will certainly find this:
2019-04-12T11:17:36,250 INFO [pool-8-thread-174]: metastore.ObjectStore$RetryingExecutor (ObjectStore.java:run(9928)) - Attempting to acquire the DB log notification lock: 0 out of 5 retries
javax.jdo.JDODataStoreException: Error executing SQL query "select "NEXT_EVENT_ID" from "NOTIFICATION_SEQUENCE" for update".
...
Caused by: java.sql.BatchUpdateException: Batch entry 0 DELETE FROM "TBLS" WHERE "TBL_ID"=1852 was aborted: ERROR: update or delete on table "TBLS" violates foreign key constraint "MV_TABLES_USED_FK2" on table "MV_TABLES_USED"
Detail: Key (TBL_ID)=(1852) is still referenced from table "MV_TABLES_USED". Call getNextException to see other errors in the batch.
...
Caused by: org.postgresql.util.PSQLException: ERROR: update or delete on table "TBLS" violates foreign key constraint "MV_TABLES_USED_FK2" on table "MV_TABLES_USED"
Detail: Key (TBL_ID)=(1852) is still referenced from table "MV_TABLES_USED".
This basically means that a materialized view references the table you are trying to drop and that you don’t have the rights on that view. Set the table/MV permissions accordingly and you won’t see this weird message anymore.
Hive setup
The Hive setup of the cluster I run is the following:
- 2 Hive Metastores - 12 GB of RAM each
- 1 Hive Server 2 - 6 GB of RAM
- 1 Hive Server Interactive (LLAP) - 6 GB of RAM
- 3 nodes running LLAP daemons - 32 GB of RAM per daemon with 18 GB for caching
- The cluster is kerberized with Active Directory
Sources
- Apache Hive Wiki, https://cwiki.apache.org
- What’s new in Apache Hive presentation by Jason Dere, https://www.slideshare.net/Hadoop_Summit/whats-new-in-apache-hive-137569339
- Announcing HDP 3.0 – Faster, Smarter, Hybrid Data post on Hortonworks.com, http://web.archive.org/web/20190709182508/https://fr.hortonworks.com/blog/announcing-hdp-3-0-faster-smarter-hybrid-data/
- Cloudera documentation, https://docs.hortonworks.com/