Two Hive UDAFs to convert an aggregation to a map

By David WORMS

Mar 6, 2012

Categories: Data Engineering
Tags: Java, HBase, Hive, File Format

I am publishing two new Hive UDAFs to help with maps in Apache Hive. The source code is available on GitHub in two Java classes, “UDAFToMap” and “UDAFToOrderedMap”, or you can download the jar file. The first function converts an aggregation into a map and internally uses a Java HashMap. The second function extends the first: it converts an aggregation into an ordered map and internally uses a Java TreeMap.

API

As a reminder, UDF stands for User Defined Function and UDAF stands for User Defined Aggregate Function. While a UDF is a custom scalar function applied to one or more columns of a single row (for example the CONCAT function in SQL), a UDAF works on an aggregation of one or multiple columns (for example the MAX function in SQL).

MAP to_map(primitive, primitive_or_complex)
MAP to_ordered_map(primitive, primitive_or_complex)

Both functions share the same API. They accept two arguments: the first is the key and the second is the value. Keys may be of any primitive type. Values may be of a primitive or a complex type. In Hive, primitive types are integer, boolean, floating point number, string, date and binary, while complex types are structure, map and array.
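
To make the distinction concrete, here is a small sketch on a hypothetical events table; the table and its columns are invented for the illustration:

-- A UDF produces one value per input row
SELECT CONCAT(category, '-', label) FROM events;
-- A UDAF produces one value per group of rows
SELECT category, MAX(score) FROM events GROUP BY category;
-- to_map is a UDAF: it collapses each group into a map with a primitive key
-- (label) and, in this case, a complex value (an array of two columns)
SELECT category, to_map(label, array(score, state)) FROM events GROUP BY category;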

Motivation

While working on time series in Hive, I created those functions out of the following needs:

  • Since UDFs are much simpler to implement than UDAFs, it is convenient to write and use a UDF which takes the result of a generic UDAF as its input, for example:
    select MY_UDF( MY_UDAF(column_1, column_2) ) from my_table group by column_3
  • The HBase structure we are modeling uses keys to store customer identifiers and columns to store dates. Importing data from Hive into HBase could be done in two ways: with a select statement or with a more involved low-level bulk loading strategy. With a select statement, which makes use of a map, we couldn’t find any way to convert the result of an aggregation into a map, so we had to insert one row per metering value, which we suspect to be less efficient than inserting one row holding all the metering values of a single customer (see the sketch after this list). With a bulk loading strategy, we also faced the need to group our dataset into an ordered map to reflect the final layout of the HBase storage file format.
  • We are experimenting with a custom HDFS file format integrated with Hive which internally needs to group the metering data of a customer to optimize their storage. We first wrote an implementation which takes an ordered result set as its input. This works but is a much more complicated implementation. Also, because of the nature of HDFS and MapReduce, we had no guarantee that all the data of a single customer would be stored in the same file, which prevented a complete optimization. Once again, structuring the data as a map was a requirement.
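
The first point can be sketched with the Hive HBase storage handler, which allows a map column to be mapped onto a whole column family: each map key becomes a column qualifier, so a customer ends up as a single HBase row. The table name, column family and column types below are only assumptions to illustrate the idea:

-- Hypothetical HBase-backed table: the row key holds the customer identifier
-- and the 'd' column family receives one column per map key (one per date)
CREATE TABLE customer_series (customer INT, readings MAP<STRING, STRING>)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,d:");

-- One row per customer instead of one row per metering value
INSERT OVERWRITE TABLE customer_series
SELECT customer, to_map(from_unixtime(emission), CAST(value AS STRING))
FROM source
GROUP BY customer;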

Usage

Consider a source data set as follows, made of 4 columns (customer id, timestamp, meter value, meter state):

195100,1199145600,607527807,B
185100,1199145600,775031942,A
195100,1199156400,607532682,B
185100,1199156400,775032584,A
195100,1199167200,607535485,B
185100,1199167200,775033200,A
195100,1199178000,582924326,C
185100,1199178000,775034241,A
195100,1199188800,582927007,C
185100,1199188800,775035698,A
195100,1199199600,582929212,C
185100,1199199600,775036891,A
195100,1199210400,582932070,C
185100,1199210400,775038268,A
195100,1199221200,582935353,B
185100,1199221200,775039703,A

The CSV source is imported into Hive with the following statements:

DROP TABLE source;
CREATE TABLE source (
    customer INT,
    emission INT,
    value INT,
    state STRING
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
LOAD DATA LOCAL INPATH './sample/data.csv' OVERWRITE INTO TABLE source;

Now we can declare our two UDAF:

ADD JAR ./target/adaltas-hive-udf-0.0.1-SNAPSHOT.jar;
CREATE TEMPORARY FUNCTION to_map as 'com.adaltas.UDAFToMap';
CREATE TEMPORARY FUNCTION to_ordered_map as 'com.adaltas.UDAFToOrderedMap';

And we can finally use them with the following queries:

-- HashMap implementation
SELECT
    `customer`, to_map(from_unixtime(emission), array(value,state))
FROM `source`
GROUP BY `customer`;
-- TreeMap (ordered map) implementation
SELECT
    `customer`, to_ordered_map(from_unixtime(emission), array(value,state))
FROM `source`
GROUP BY `customer`;

The output of the last select statement looks like:

185100  {
    "2008-01-01 01:00:00":[fusion_builder_container hundred_percent="yes" overflow="visible"][fusion_builder_row][fusion_builder_column type="1_1" background_position="left top" background_color="" border_size="" border_color="" border_style="solid" spacing="yes" background_image="" background_repeat="no-repeat" padding="" margin_top="0px" margin_bottom="0px" class="" id="" animation_type="" animation_speed="0.3" animation_direction="left" hide_on_mobile="no" center_content="no" min_height="none"]["775031942","A"],
    "2008-01-01 04:00:00":["775032584","A"],
    "2008-01-01 07:00:00":["775033200","A"],
    "2008-01-01 10:00:00":["775034241","A"],
    "2008-01-01 13:00:00":["775035698","A"],
    "2008-01-01 16:00:00":["775036891","A"],
    "2008-01-01 19:00:00":["775038268","A"],
    "2008-01-01 22:00:00":["775039703","A"] }
195100  {
    "2008-01-01 01:00:00":["607527807","B"],
    "2008-01-01 04:00:00":["607532682","B"],
    "2008-01-01 07:00:00":["607535485","B"],
    "2008-01-01 10:00:00":["582924326","C"],
    "2008-01-01 13:00:00":["582927007","C"],
    "2008-01-01 16:00:00":["582929212","C"],
    "2008-01-01 19:00:00":["582932070","C"],
    "2008-01-01 22:00:00":["582935353","B"]}

If you have cloned our GitHub repository, you can run the above sample with the command mvn install && hive -f sample/to_map.hive.

While writing this article, I also published a JIRA enhancement issue (HIVE-2843) proposing the integration of the source code into Hive. I’ll prepare the patch for its inclusion into Hive if the community expresses its interest.
