Monday, October 24, 2011

Enable Sorted Bucketing in Hive


From the Hive documentation we mostly get the impression that partitions are for grouping records and buckets are for sampling, i.e. for distributing records evenly across multiple files. But can we also group records based on certain columns/fields within buckets (the individual files inside a partition)?
Concepts get clearer when explained through examples, so I'm taking the same route here. On one Hadoop assignment we designed a hybrid solution where the final output landed in a partitioned Hive table. This output had to be consumed by an Oracle DWH for some legacy applications. The handshake between the Hadoop and Oracle teams was that they wanted 'n' files for each sub-partition/folder, with the data in those files grouped on a few columns of the table (country and continent). If the files are grouped this way, the Oracle load becomes much more efficient. How do we materialize such a solution?

1.       After the Hive operations, run a map reduce job over the final output folders to do the Group By
You do this by setting the number of reducers to 'n' (for n output files) while running against each sub-folder. It is really not a good solution, because you have to run the map reduce job for every sub-partition/folder, which is a definite performance hit.
2.       Bucketing in Hive
Use bucketing in Hive for the sub-partitions. It is not plain bucketing but sorted bucketing. Normally we enable bucketing in Hive during table creation as
CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  id INT, name STRING, country STRING, continent STRING
)
PARTITIONED BY (dt STRING, hour STRING)
CLUSTERED BY (country, continent) INTO n BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION '/home/test_dir';

When we go into sorted bucketing/grouped bucketing our DDL would look like

CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  id INT, name STRING, country STRING, continent STRING
)
PARTITIONED BY (dt STRING, hour STRING)
CLUSTERED BY (country, continent) SORTED BY (country, continent) INTO n BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION '/home/test_dir';

Now to enforce bucketing while loading data into the table, we need to enable a hive parameter as follows
set hive.enforce.bucketing = true;
                     
With this DDL our requirement is satisfied: each sub-partition holds n individual files, and the records are grouped into those n files based on country and continent, i.e. a particular combination of country and continent is present in only one file. If the question arises as to which combination lands in which file, that is decided by the hash partitioning function. If you want control over that, you need to write a custom hash partitioner and plug it into your Hive session.
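For completeness, here is a rough sketch of how such a table could be populated from a plain staging table (the staging table name stg_test_table is just an assumption for illustration; any table holding the same raw columns would do):

-- let Hive create exactly n files per partition by spawning one reducer per bucket
set hive.enforce.bucketing = true;
-- both partition columns are resolved dynamically from the data
set hive.exec.dynamic.partition = true;
set hive.exec.dynamic.partition.mode = nonstrict;

INSERT OVERWRITE TABLE test_table PARTITION (dt, hour)
SELECT id, name, country, continent, dt, hour
FROM stg_test_table;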

NOTE: When we use partitions, the data is stored under individual directories/sub-directories in HDFS. When we use buckets, the records are stored as files within those directories, numbered from 0 to n-1.

NOTE: In partitioned tables, when we issue a query only the required partitions are scanned; there is no need to specify any hint in your Hive query. For bucketed tables this is not the case: you need to hint your query if you want to scan only particular buckets, otherwise the whole set of files is scanned. We hint the buckets using the TABLESAMPLE clause. For example, if we want to choose only the data from bucket 2 in our table:
SELECT * FROM test_table TABLESAMPLE(BUCKET 2 OUT OF n) WHERE dt='2011-10-11' AND hour='13';

Include values during execution time in Hive QL / Dynamically substitute values in Hive


When you work with data warehousing it is very common to come across scenarios where you'd like to supply values at run time. In production environments, when we have to schedule a Hive job we usually write the series of Hive operations as HQL in a file and trigger it with the hive -f option, either from a shell script or from a workflow management system like Oozie. Let's limit this discussion to triggering the Hive job from the shell, as it is the basic case.
      Say I have a Hive job called hive_job.hql; normally from a shell I'd trigger it as
hive -f hive_job.hql

If I need to set some Hive config parameters, say to enable compression, I'd include the following arguments as well:
hive -f hive_job.hql  -hiveconf hive.exec.compress.output=true -hiveconf mapred.output.compress=true -hiveconf mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec

Now the final one: say my Hive QL performs some operations over a date range, and this range varies from run to run and has to be accepted from the CLI each time. We can achieve this with the following steps
1.       Pass the variables as config parameters
2.       Refer to these config parameters in your Hive query

                Let me make it more specific with a small example. I need to perform some operation on records in a table called 'test_table' which has a column/field named 'created_date' (i.e. I need to filter records based on created_date). The date range for the operation is supplied at run time. It is achieved as follows:
1.       Pass the variables as config parameters
                Here we need to pass two parameters, the start date and the end date, to get all the records within the date range:
hive -f hive_job.hql -hiveconf start_date=2011-01-01 -hiveconf end_date=2011-12-31
2.       Refer to these config parameters in your Hive query
                In our Hive QL the start date and end date are written as placeholders that get replaced with the actual values at execution time.
SELECT * FROM test_table t1 WHERE t1.created_date >= '${hiveconf:start_date}' AND t1.created_date <= '${hiveconf:end_date}';
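At execution time Hive performs a plain text substitution of these variables, so with the values passed above the statement that actually runs is equivalent to:

SELECT * FROM test_table t1 WHERE t1.created_date >= '2011-01-01' AND t1.created_date <= '2011-12-31';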


Let us look at one more example: decide the number of reducers at run time, along with the date range from the previous requirement (the compression flags from earlier can be appended in exactly the same way). I have 2 components here

·         Shell script that triggers the .hql
hive_trigger.sh
#!/bin/bash
# The script accepts 3 parameters in order: number of reducers, start date and end date.
NUM_REDUCERS=$1
BEGIN_DATE=$2
CLOSE_DATE=$3
hive -f hive_job.hql -hiveconf mapred.reduce.tasks=$NUM_REDUCERS -hiveconf start_date=$BEGIN_DATE -hiveconf end_date=$CLOSE_DATE

·         HQL that accepts parameters at run time
hive_job.hql
SELECT * FROM test_table t1 WHERE t1.created_date >= '${hiveconf:start_date}' AND t1.created_date <= '${hiveconf:end_date}';

The components are ready; you can now trigger your shell script, for example as
./hive_trigger.sh 50 2011-01-01 2011-12-31

How to efficiently store data in Hive / Store and retrieve compressed data in Hive


Hive is a data warehousing tool built on top of Hadoop. The data backing Hive tables is stored as delimited files in HDFS. Since Hive is used for data warehousing, the data in production Hive tables easily runs to hundreds of gigabytes, so the natural question is how to store it efficiently; it clearly has to be compressed. A few more questions follow: how can we store compressed data in Hive, and how can we process and retrieve that compressed data using Hive QL?
                Now let's look into these; it is fairly simple if you know Hive. Before you run your queries you need to enable a few parameters for dealing with compressed tables. They are the same compression enablers you use with plain map reduce, plus a Hive-specific one:

·         hive.exec.compress.output=true
·         mapred.output.compress=true
·         mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec

Here I have used LZO as my compression in HDFS, hence the LzopCodec. Beyond setting these you don't need to do anything else; use your Hive QL exactly as you do with uncompressed data. I have tried the same successfully with dynamic partitions, buckets etc., and it works like any normal Hive operation.
               In my case the input data from conventional sources was plain text, and this raw data was loaded into a staging table. From the staging table, some Hive QL loaded the cleansed data into the actual Hive tables. The staging table is flushed every time its data is loaded into the target Hive table.
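To make this concrete, here is a minimal sketch of such a session (staging_table and target_table are placeholder names for the tables described above, and the actual cleansing logic would go into the SELECT):

SET hive.exec.compress.output=true;
SET mapred.output.compress=true;
SET mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec;

-- the rows land in the target table as LZO compressed files,
-- while the staging table itself stays plain text
INSERT OVERWRITE TABLE target_table
SELECT * FROM staging_table;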

Monday, September 19, 2011

Joins with plain Map Reduce or MultipleInputs



As a map reduce developer I'd never recommend writing joins of data sets in custom map reduce code. Hadoop gives you very capable tools like Hive and Pig that can easily join huge data sets with your choice of join type (inner, outer, etc.). But if a scenario arises where you need to do the join in map reduce, you should be able to accomplish it with basic map reduce programming knowledge.
                Let us look into a mocked up example for the same.

Problem Statement: A retailer has a customer database and needs to run some promotions against it. He chooses bulk SMS as the promotion channel, which a third party handles for him, and once the SMS is pushed the provider returns the delivery status to the retailer. Now let us look into the details, which make things a little complicated.
We have 3 input files as follows
1.       UserDetails.txt
                                 i.            Every record is of the format ‘mobile number , consumer name’
2.       DeliveryDetails.txt
                                 i.            Every record is of the format ‘mobile number, delivery status code’
3.       DeliveryStatusCodes.txt
                                 i.            Every record is of the format ‘delivery status code, status message’
The retailer has a consumer database (UserDetails.txt) from which only the mobile numbers are handed to the bulk SMS provider; he can't reveal the customer names for security reasons. Once the messages are pushed, the SMS provider sends back a report of the mobile numbers with a status code (DeliveryDetails.txt) and also a lookup file that relates every status code to the corresponding status message (DeliveryStatusCodes.txt).
The requirement: for meaningful information we need the consumer name along with the corresponding status message, and we need to derive this from the 3 files.

Sample Inputs
File 1 – UserDetails.txt
123 456, Jim
456 123, Tom
789 123, Harry
789 456, Richa

File 2 – DeliveryDetails.txt
123 456, 001
456 123, 002
789 123, 003
789 456, 004

File 3 – DeliveryStatusCodes.txt
001, Delivered
002, Pending
003, Failed
004, Resend

Expected Output
Jim, Delivered
Tom, Pending
Harry, Failed
Richa, Resend


Solution : Using core MapReduce
1.       Use two different mapper classes to process the initial inputs from UserDetails.txt and DeliveryDetails.txt. The key value output from the mappers should be as follows
a)      UserDetails.txt
                                                         i.            Key(Text) – mobile number
                                                       ii.            Value(Text) – An identifier to indicate the source of input(using ‘CD’ for the customer details file) + Customer Name
b)      DeliveryDetails.txt
                                                         i.            Key(Text) – mobile number
                                                       ii.            Value(Text) – An identifier to indicate the source of input(using ‘DR’ for the delivery report file) + Status Code
So here the two files need to be parsed separately by two mappers. I'm using
UserFileMapper.java to process UserDetails.txt and
DeliveryFileMapper.java to process DeliveryDetails.txt
In the map reduce API, I'm using MultipleInputs to specify which input goes to which mapper. The output key value pairs from both mappers go to the same reducer, and for the reducer to identify the source of each value we prepend the tag 'CD' or 'DR'.

2.       On the reducer end, use the distributed cache to distribute DeliveryStatusCodes.txt. Parse the file and load its contents into a HashMap with the status code as key and the status message as value.

3.       In the reducer, every key has two values, one prefixed 'CD' and the other 'DR' (for simplicity let us assume only 2 values; in practice there can be more). Identify each record's source: from CD get the customer name corresponding to the cell number (the input key) and from DR get the status code. With the status code, look up the HashMap to get the status message. Finally, the output key values from the reducer are as follows
a)      Key : Customer Name
b)      Value : Status Message

Let’s just look at the source code

Mapper Class1: UserFileMapper.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class UserFileMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
{
    //variables to process Consumer Details
    private String cellNumber,customerName,fileTag="CD~";
   
    /* map method that processes UserDetails.txt and frames the initial key value pairs
       Key(Text) – mobile number
       Value(Text) – An identifier to indicate the source of input(using ‘CD’ for the customer details file) + Customer Name
     */
    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
       //taking one line/record at a time and parsing them into key value pairs
        String line = value.toString();
        String splitarray[] = line.split(",");
        cellNumber = splitarray[0].trim();
        customerName = splitarray[1].trim();
       
      //sending the key value pair out of mapper
        output.collect(new Text(cellNumber), new Text(fileTag+customerName));
     }
}

Mapper Class2: DeliveryFileMapper.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class DeliveryFileMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
{
    //variables to process delivery report
    private String cellNumber,deliveryCode,fileTag="DR~";
   
   /* map method that processes DeliveryDetails.txt and frames the initial key value pairs
    Key(Text) – mobile number
    Value(Text) – An identifier to indicate the source of input(using ‘DR’ for the delivery report file) + Status Code*/

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
       //taking one line/record at a time and parsing them into key value pairs
        String line = value.toString();
        String splitarray[] = line.split(",");
        cellNumber = splitarray[0].trim();
        deliveryCode = splitarray[1].trim();
       
        //sending the key value pair out of mapper
        output.collect(new Text(cellNumber), new Text(fileTag+deliveryCode));
     }
}

Reducer Class:SmsReducer.java

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SmsReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
      
       //Variables to aid the join process
       private String customerName,deliveryReport;
       /*Map to store Delivery Codes and Messages
       key being the status code and value being the status message*/
       private static Map<String,String> DeliveryCodesMap= new HashMap<String,String>();
      
       public void configure(JobConf job)
       {
              //To load the Delivery Codes and Messages into a hash map
              loadDeliveryStatusCodes();
             
       }


       public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
        //reset per-key state so values from a previous key don't carry over
        customerName = null;
        deliveryReport = null;

        while (values.hasNext())
        {
             String currValue = values.next().toString();
             String valueSplitted[] = currValue.split("~");
             /*identifying the record source that corresponds to a cell number
             and parses the values accordingly*/
             if(valueSplitted[0].equals("CD"))
             {
               customerName=valueSplitted[1].trim();
             }
             else if(valueSplitted[0].equals("DR"))
             {
              //getting the delivery code and using the same to obtain the Message
               deliveryReport = DeliveryCodesMap.get(valueSplitted[1].trim());
             }
        }
        
        //pump final output to file
        if(customerName!=null && deliveryReport!=null)
        {
               output.collect(new Text(customerName), new Text(deliveryReport));
        }
        else if(customerName==null)
               output.collect(new Text("customerName"), new Text(deliveryReport));
        else if(deliveryReport==null)
               output.collect(new Text(customerName), new Text("deliveryReport"));
        
    }
      
      
       //To load the Delivery Codes and Messages into a hash map
    private void loadDeliveryStatusCodes()
    {
       String strRead;
       try {
              //read file from Distributed Cache
                     BufferedReader reader = new BufferedReader(new FileReader("DeliveryStatusCodes.txt"));
                     while ((strRead=reader.readLine() ) != null)
                     {
                           String splitarray[] = strRead.split(",");
                           //parse record and load into the HashMap
                           DeliveryCodesMap.put(splitarray[0].trim(), splitarray[1].trim());
                          
                     }
              }
              catch (FileNotFoundException e) {
              e.printStackTrace();
              }catch( IOException e ) {
                       e.printStackTrace();
                }
             
       }
}

Driver Class: SmsDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

import org.apache.hadoop.mapred.lib.MultipleInputs;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class SmsDriver extends Configured implements Tool
{
       public int run(String[] args) throws Exception {

              //get the configuration parameters and assigns a job name
              JobConf conf = new JobConf(getConf(), SmsDriver.class);
              conf.setJobName("SMS Reports");

              //setting key value types for mapper and reducer outputs
              conf.setOutputKeyClass(Text.class);
              conf.setOutputValueClass(Text.class);

              //specifying the custom reducer class
              conf.setReducerClass(SmsReducer.class);

              //Specifying the input directories(@ runtime) and Mappers independently for inputs from multiple sources
              MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, UserFileMapper.class);
              MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, DeliveryFileMapper.class);
             
              //Specifying the output directory @ runtime
              FileOutputFormat.setOutputPath(conf, new Path(args[2]));

              JobClient.runJob(conf);
              return 0;
       }

       public static void main(String[] args) throws Exception {
              int res = ToolRunner.run(new Configuration(), new SmsDriver(),
                           args);
              System.exit(res);
       }
}


Let's do a small code walk-through. I'm not going through each and every line since the code is commented and fairly self-explanatory; a few points to keep in mind:

1.       The only difference in the driver is that we are using MultipleInputs instead of FileInputFormat. This is necessary as we use two mappers and need the output of both to be processed by a single reducer.
2.       When we normally execute a map reduce job with the hadoop jar command, the last two arguments on the command line represent the input and output directories in HDFS. Here, instead, we have three arguments: two input locations and one output location (DeliveryStatusCodes.txt is shipped separately via the -files option).
3.       The second key thing to note is that for the input locations you don't provide full paths with file names; provide the input directories instead. Place the two files in two separate directories and pass the corresponding paths for the mappers.
4.       Since the driver picks up its arguments from the command line, their order is critical. Make sure each input directory is matched with its corresponding mapper.

You can run the above example with the following command from the CLI:
hadoop jar /home/bejoys/samples/smsMarketing.jar com.bejoy.samples.smsmarketing.SmsDriver -files /home/bejoys/samples/DeliveryStatusCodes.txt /userdata/bejoys/samples/sms/consumerdata /userdata/bejoys/samples/sms/deliveryinformation /userdata/bejoys/samples/sms/output

Note:
                     i.            Since the join happens in the reduce phase, it is termed a reduce-side join.
                   ii.            This is a very basic approach to implementing joins in map reduce, aimed at those with a basic knowledge of map reduce programming. You can implement it in a more sophisticated manner in the map reduce framework using the DataJoin mappers and reducers with tagged map output types.

But if it is a join, I'd strongly recommend going with Pig or Hive, as both are highly optimized for implementing joins and eliminate the coding effort. It is no exaggeration to say I can implement the same thing in a single step using Hive. Let's check it out.

Using Hive
1.       Load the data into 3 hive tables
2.       Perform join using hive QL

Creating Hive tables to store the files
CREATE TABLE customer_details (cellNumber String,consumerName String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH '/home/bejoys/samples/ConsumerDetails.txt' INTO TABLE customer_details;

CREATE TABLE delivery_report (cellNumber String,statusCode int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH '/home/bejoys/samples/DeliveryReport.txt' INTO TABLE delivery_report;

CREATE TABLE status_codes (statusCode int,statusMessage String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH '/home/bejoys/samples/DeliveryStatusCodes.txt' INTO TABLE status_codes;

Hive Query to execute Join operation on data sets

SELECT cd.consumerName, sc.statusMessage FROM customer_details cd
JOIN delivery_report dr ON (cd.cellNumber = dr.cellNumber)
JOIN status_codes sc ON (dr.statusCode = sc.statusCode);

You can optimize the Hive query further for a performance boost; refer to Optimizing Joins in Hive.
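For instance, since status_codes is just a small lookup table, a map join hint (supported in the Hive versions of this era) lets Hive hold it in memory and join it on the map side instead of paying for an extra shuffle; a sketch:

-- hint Hive to load the small status_codes table into memory and join it map side
SELECT /*+ MAPJOIN(sc) */ cd.consumerName, sc.statusMessage
FROM customer_details cd
JOIN delivery_report dr ON (cd.cellNumber = dr.cellNumber)
JOIN status_codes sc ON (dr.statusCode = sc.statusCode);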