Deux Hive UDAF pour convertir une aggregation vers une map
By WORMS David
6 mars 2012
- Catégories
- Data Engineering
- Tags
- Java
- HBase
- Hive
- Format de fichier [plus][moins]
Ne ratez pas nos articles sur l'open source, le big data et les systèmes distribués, fréquence faible d’un email tous les deux mois.
Je publie deux nouvelles fonctions UDAF pour Hive pour aider avec les map dans Apache Hive. Le code source est disponible sur GitHub dans deux classes Java : “UDAFToMap” et “UDAFToOrderedMap” ou vous pouvez télécharger le fichier jar. La première fonction convertit une agrégation en une map et utilise en interne un HashMap
Java. La deuxième fonction étend la première. Elle convertit une agrégation en une sorted map et utilise en interne un TreeMap
Java.
API
Pour rappel, un UDF représente une fonction définie par l’utilisateur et un UDAF correspond à la fonction d’agrégation définie par l’utilisateur. Tandis qu’une UDF est une fonction scalaire sur une ou plusieurs colonnes d’une même ligne (par exemple, la fonction CONCAT
en SQL), une UDAF fonctionne sur une agrégation d’une ou de plusieurs colonnes (par exemple, la fonction MAX
dans SQL ) sur plusieurs lignes.
MAP to_map(primitive, primitive_or_complex)
MAP to_ordered_map(primitive, primitive_or_complex)
Les deux fonctions partagent la même API. Ils acceptent deux arguments, le premier est la clé et le second est la valeur. Les clés peuvent être n’importe quel type primitif. Les valeurs peuvent être un type primitif ou complexe. Dans Hive, les types primitifs sont les nombres entier, booleen, float, string, date et binaire, tandis que les types complexes sont structure, map et array.
Motivation
En travaillant sur des séries temporelles dans Hive, j’ai créé ces fonctions pour répondre aux besoins suivants :
- Étant donné que les fichiers UDF sont beaucoup plus simples à implémenter qu’une UDAF, il est pratique d’écrire et d’utiliser un fichier UDF prenant le résultat d’un fichier générique UDAF en entrée, par exemple :
sélectionnez MY_UDF (MY_UDAF (colonne_1, colonne_2)) dans le groupe my_table par la colonne_3
. - La base de donnée HBase que nous modélisons utilise des clés pour stocker les identifiants client et les colonnes pour stocker les dates. Importer des données de Hive vers HBase peut être effectué de deux manières : avec une instruction
select
ou avec une stratégie compliquée de chargement en bloc plutôt bas niveau. L’approche parselect
utilise une map et nous n’avons trouvé aucun moyen de convertir le résultat d’une agrégation en une map. Nous avons dû insérer les données ligne par ligne qui, à notre avis, étaient moins efficaces. En utilisant une stratégie de chargement en masse, nous avons également pû regrouper notre ensemble de données dans une sorted map afin de refléter la mise en page finale du format de fichier de stockage HBase. - Nous expérimentons un format de fichier HDFS personnalisé intégré à Hive, qui doit en interne regrouper les données de mesure d’un client pour optimiser son stockage. Nous avons d’abord écrit une implémentation qui prend en entrée un ensemble de résultats ordonné. Cela fonctionne mais l’implémentation est plus compliquée. De plus, en raison de la nature de HDFS et de MapReduce, nous n’avions aucune garantie que toutes les données d’un seul client seraient stockées dans un même fichier, ce qui nous empêcherait une optimisation complète. À nouveau, il était indispensable de structurer les données sous forme de map.
Utilisation
Considérant un ensemble de données sources composé de 4 colonnes comme suit (ID client, horodatage, valeur du compteur, état du compteur) :
195100,1199145600,607527807,B
185100,1199145600,775031942,A
195100,1199156400,607532682,B
185100,1199156400,775032584,A
195100,1199167200,607535485,B
185100,1199167200,775033200,A
195100,1199178000,582924326,C
185100,1199178000,775034241,A
195100,1199188800,582927007,C
185100,1199188800,775035698,A
195100,1199199600,582929212,C
185100,1199199600,775036891,A
195100,1199210400,582932070,C
185100,1199210400,775038268,A
195100,1199221200,582935353,B
185100,1199221200,775039703,A
La source CSV est importée dans Hive avec les déclarations suivantes :
DROP TABLE source;
CREATE TABLE source (
customer INT,
emission INT,
value INT,
state STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
LOAD DATA LOCAL INPATH './sample/data.csv' OVERWRITE INTO TABLE source;
Nous pouvons maintenant déclarer nos deux UDAF :
ADD JAR ./target/adaltas-hive-udf-0.0.1-SNAPSHOT.jar;
CREATE TEMPORARY FUNCTION to_map as 'com.adaltas.UDAFToMap';
CREATE TEMPORARY FUNCTION to_ordered_map as 'com.adaltas.UDAFToOrderedMap';
Et nous pouvons enfin les utiliser avec les requêtes suivantes :
# HashMap implementation
SELECT
`customer`, to_map(from_unixtime(emission), array(value,state))
FROM `source`
GROUP BY `customer`;
# Ordered HashMap implementation
SELECT
`customer`, to_ordered_map(from_unixtime(emission), array(value,state))
FROM `source`
GROUP BY `customer`;
La sortie de la dernière instruction select
ressemble à ceci :
185100 {
"2008-01-01 01:00:00":[fusion_builder_container hundred_percent="yes" overflow="visible"][fusion_builder_row][fusion_builder_column type="1_1" background_position="left top" background_color="" border_size="" border_color="" border_style="solid" spacing="yes" background_image="" background_repeat="no-repeat" padding="" margin_top="0px" margin_bottom="0px" class="" id="" animation_type="" animation_speed="0.3" animation_direction="left" hide_on_mobile="no" center_content="no" min_height="none"]["775031942","A"],
"2008-01-01 04:00:00":["775032584","A"],
"2008-01-01 07:00:00":["775033200","A"],
"2008-01-01 10:00:00":["775034241","A"],
"2008-01-01 13:00:00":["775035698","A"],
"2008-01-01 16:00:00":["775036891","A"],
"2008-01-01 19:00:00":["775038268","A"],
"2008-01-01 22:00:00":["775039703","A"] }
195100 {
"2008-01-01 01:00:00":["607527807","B"],
"2008-01-01 04:00:00":["607532682","B"],
"2008-01-01 07:00:00":["607535485","B"],
"2008-01-01 10:00:00":["582924326","C"],
"2008-01-01 13:00:00":["582927007","C"],
"2008-01-01 16:00:00":["582929212","C"],
"2008-01-01 19:00:00":["582932070","C"],
"2008-01-01 22:00:00":["582935353","B"]}
Si vous avez cloné notre repository hive-udf sur GitHub, vous pouvez exécuter l’exemple ci-dessus avec la commande mvn install && hive -f sample/to_map.hive
.
En écrivant cet article, j’ai également publié le ticket JIRA HIVE-2843 proposant l’intégration du code source dans Hive.