Timeseries storage in Hadoop and Hive
By David WORMS
Jan 10, 2012
- Categories
- Data Engineering
- Tags
- CRM
- timeseries
- Tuning
- Hadoop
- HDFS
- Hive
- File Format
In the next few weeks, we will be exploring the storage and analysis of a large generated dataset. This dataset is composed of CRM tables associated with one timeseries table of about 7,000 billion rows.
Before importing the dataset into Hive, we will be exploring different optimization options expected to impact speed and storage size.
Note that the advantages and disadvantages listed below for each optimization reflect our current understanding of Hadoop and Hive.
Hive partitioning
Distribute the data files of a table in HDFS under hierarchical directories, one directory per partition value.
Advantage:
- With partition pruning, some queries may only need to read a subset of the files
Disadvantage:
- Multiply the number of files managed by the namenode
- A query may work against the partitioning and thus have to traverse more files. For example, a query which aggregates the data of a single customer over a year would be less efficient on a table partitioned by day.
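As a minimal sketch of both cases, assuming the `metering` table defined in the data schema at the end of this article, and assuming partition values formatted as `yyyy-MM-dd`. The client id and the dates are made-up values.

```sql
-- Partition pruning: the filter is on the partition column, so only the
-- files stored under the date_emission_day=2012-01-10 directory are read.
SELECT fk_client, SUM(diff) AS total_diff
FROM metering
WHERE date_emission_day = '2012-01-10'
GROUP BY fk_client;

-- Working against the partitioning: one client over one year has to
-- visit every daily partition of that year.
SELECT SUM(diff) AS total_diff
FROM metering
WHERE fk_client = 42
  AND date_emission_day >= '2011-01-01'
  AND date_emission_day <= '2011-12-31';
```

Running `EXPLAIN EXTENDED` on each query should list the partition directories Hive plans to read, which is a simple way to check whether pruning applies.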
Block size
Each file in HDFS is split into chunks of fixed size which are distributed across the Hadoop cluster. This block size is set to 64 MB by default but could be increased to 128 MB or more.
Advantage:
- Fewer blocks per file, so fewer map tasks to schedule and less work between M/R tasks
Disadvantage:
- Reduce the opportunity to spread the load across the cluster when files are small (think Hive partitioning)
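In Hadoop 0.20, the block size of newly written files is driven by the `dfs.block.size` property, expressed in bytes. The following is a hedged sketch of how it could be raised for the files written by a single Hive session. The `metering_128m` table name is hypothetical, and files already present in HDFS keep the block size they were written with.

```sql
-- 128 MB = 128 * 1024 * 1024 bytes
SET dfs.block.size=134217728;

-- Files written from now on in this session should use the new block size.
CREATE TABLE metering_128m
STORED AS SEQUENCEFILE
AS SELECT fk_client, date_emission, diff FROM metering;
```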
Hive index
Maintain an index against a Hive table
Advantage:
- Work on any column without impacting the HDFS layout
Disadvantage:
- Still slower than a storage layout which naturally matches the query (think sorted Clustering)
- Duplication of index for each Hive partition (might not be a problem)
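A sketch of what this could look like on the `metering` table of the data schema, assuming a Hive version which ships the index feature (0.7 or later). The index name and the partition value are made up for the example.

```sql
-- Compact index on the client column; nothing is built until REBUILD.
CREATE INDEX metering_client_idx
ON TABLE metering (fk_client)
AS 'COMPACT'
WITH DEFERRED REBUILD;

-- Build (or refresh) the index for a single partition.
ALTER INDEX metering_client_idx ON metering
PARTITION (date_emission_day = '2012-01-10') REBUILD;
```

The index data is rebuilt partition by partition, which illustrates the duplication mentioned above.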
Clustering
Dispatch data into a defined number of buckets (HDFS files) using a hash of one or more columns, and order the data inside each bucket.
Advantage:
- Sampling
Advantages not yet implemented:
- Bucket pruning when filtering on the bucket column
- Take advantage of the sorted schema
- Join optimization between two tables using the same column hash
Disadvantage:
- Multiply the number of files managed by the namenode.
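The sampling advantage can be sketched as follows, assuming the `metering` table of the data schema, which is clustered on `fk_client` into 32 buckets. The partition value is a made-up example.

```sql
-- When inserting into a bucketed table, ask Hive to honour the bucket
-- count and the sort order declared in the table definition.
SET hive.enforce.bucketing=true;
SET hive.enforce.sorting=true;

-- Sampling: read one bucket out of the 32 declared on fk_client.
SELECT m.fk_client, COUNT(*) AS nb_measures
FROM metering TABLESAMPLE(BUCKET 1 OUT OF 32 ON fk_client) m
WHERE m.date_emission_day = '2012-01-10'
GROUP BY m.fk_client;
```

Because the sampling column matches the clustering column, Hive only needs to read the file of the requested bucket instead of scanning the whole partition.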
Native file formats
Text versus binary and row versus column layout. Hive offers TEXTFILE, SEQUENCEFILE and RCFILE.
Advantage:
- RCFILE may yield smaller files and increase speed on joins
- SEQUENCEFILE should better handle map phases referencing multiple columns
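Switching format is a matter of changing the `STORED AS` clause. The sketch below copies a few columns of the `metering` table into a columnar table; the `metering_rc` name is hypothetical and the table is left unpartitioned to keep the example short.

```sql
-- Same columns as the source, stored in the columnar RCFILE format.
CREATE TABLE metering_rc
(
  fk_client BIGINT,
  date_emission INT,
  diff INT
)
STORED AS RCFILE;

INSERT OVERWRITE TABLE metering_rc
SELECT fk_client, date_emission, diff FROM metering;

-- This query references only two of the three columns.
SELECT fk_client, SUM(diff) AS total_diff
FROM metering_rc
GROUP BY fk_client;
```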
Hive format
Make the best use of Hive data types and experiment with Hive data structures in the context of timeseries. For example, instead of storing one measure per row, we could store all the measures of a single customer for a given day in one Hive structure of 144 elements (one measure every 10 minutes).
Advantage:
- Drastically reduce the storage size (while increasing speed)
Disadvantage:
- Increase query complexity
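One possible reading of this idea, sketched with an `ARRAY<INT>` column. The `metering_daily` table, the `diffs` column and the assumption that the array is ordered by time of day are all hypothetical and not part of the original schema.

```sql
-- One row per client and per day; diffs holds the 144 measures of the
-- day (24 hours * 6 measures per hour).
CREATE TABLE metering_daily
(
  fk_client BIGINT,
  diffs ARRAY<INT>
)
PARTITIONED BY (date_emission_day STRING)
STORED AS SEQUENCEFILE;

-- Direct access to one element, here the first measure of the day.
SELECT fk_client, diffs[0]
FROM metering_daily
WHERE date_emission_day = '2012-01-10';

-- Daily total per client: the array is exploded back into rows.
SELECT fk_client, SUM(d) AS total_diff
FROM metering_daily LATERAL VIEW explode(diffs) t AS d
WHERE date_emission_day = '2012-01-10'
GROUP BY fk_client;
```

The second query shows the added complexity: any aggregation over the measures needs a `LATERAL VIEW` where a plain `SUM(diff)` was enough with one measure per row.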
Custom file formats
Leverage Hive SerDe to implement a custom layout. This is similar to the solution described above but pushed one step further.
Compression
Find the right balance between I/O and CPU usage by testing the various codecs.
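Codecs can be swapped per session with a few properties. The sketch below uses the property names of Hadoop 0.20 and two codecs bundled with Hadoop; the choice of codecs is only an example and not a recommendation.

```sql
-- Compress the final output written by Hive queries.
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
-- For SequenceFile output, compress blocks of records rather than
-- each record individually.
SET mapred.output.compression.type=BLOCK;

-- Compress the map output sent to the reducers.
SET mapred.compress.map.output=true;
SET mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec;
```

Running the same load and the same queries with each candidate codec gives the storage size and the elapsed time to compare.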
MR framework
Test various map and reduce tuning properties.
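The kind of properties we have in mind can be set per Hive session. The names below are those of Hadoop 0.20 and Hive; every value is a placeholder to be adjusted during the tests, not a recommendation.

```sql
-- Target volume of input data handled by each reducer, in bytes.
SET hive.exec.reducers.bytes.per.reducer=1000000000;
-- Alternatively, force the number of reducers of each job.
SET mapred.reduce.tasks=64;

-- Heap of the task JVMs and size of the map-side sort buffer, in MB.
-- The sort buffer has to fit inside the task heap.
SET mapred.child.java.opts=-Xmx1024m;
SET io.sort.mb=200;

-- Reuse the same JVM for an unlimited number of tasks of a job.
SET mapred.job.reuse.jvm.num.tasks=-1;
```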
Preparation
Before we start we need:
- A sizable cluster (around 20 nodes)
- A simplified data schema
- A medium-sized dataset (around 1 TB)
- A few representative queries
Cluster
- Number of servers: 19 (waiting for 2 more)
- Number of CPUs: 320 logical threads
- Number of disks: 124 * 1 GB SATA 7200 rpm
- Number of racks: 2
- Hadoop version: Cloudera 0.20.2-cdh3u2
Data schema
The original Hive database schema is:
CREATE DATABASE timeseries;
USE timeseries;

-- Cities table
CREATE TABLE cities
(
  id_city BIGINT,
  fk_region INT,
  zipcode INT,
  name STRING
)
STORED AS SEQUENCEFILE;

-- Client table
CREATE TABLE client
(
  id_client BIGINT,
  fk_city INT,
  lastname STRING
)
STORED AS SEQUENCEFILE;

-- Metering table
CREATE TABLE metering
(
  fk_client BIGINT,
  date_emission INT,
  date_reception INT,
  type STRING,
  index INT,
  diff INT
)
PARTITIONED BY(date_emission_day STRING)
CLUSTERED BY(fk_client) SORTED BY(date_emission) INTO 32 BUCKETS
STORED AS SEQUENCEFILE;
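As an illustration of the kind of representative query this schema allows, here is a sketch which totals the measures per city for one day. It assumes that `diff` is the quantity to aggregate and that partition values are formatted as `yyyy-MM-dd`; the date is a made-up value.

```sql
-- Total of the measures per city for a single day.
SELECT ci.name, SUM(m.diff) AS total_diff
FROM metering m
JOIN client cl ON (m.fk_client = cl.id_client)
JOIN cities ci ON (cl.fk_city = ci.id_city)
WHERE m.date_emission_day = '2012-01-10'
GROUP BY ci.name;
```

The filter on `date_emission_day` benefits from the daily partitioning, while the join on `fk_client` uses the column on which the table is clustered.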