Role of Hadoop Combiner in MapReduce API

In my previous blog, I discussed Hadoop Counters. In this post, I would like to focus on the Hadoop Combiner, a highly useful feature offered by Hadoop. As in my previous post, I will demonstrate the functionality of the Hadoop Combiner using an example, and I will be using the same dataset (Customer Complaints) as before, which should help readers correlate the two posts easily. Let's get started.

Hadoop Combiner

When a MapReduce job runs on a large dataset, the Hadoop Mapper generates large chunks of intermediate data that are passed on to the Hadoop Reducer for further processing, which can lead to massive network congestion. So how do we go about reducing this congestion? Is there a feature in Hadoop to address this issue? The MapReduce framework offers a function known as the 'Combiner' that can play a crucial role here; in fact, the Combiner is often termed a 'mini-reducer'. The primary job of a Hadoop Combiner is to process the output of the Hadoop Mapper locally, on the map side, before it is passed to a Hadoop Reducer. Technically speaking, the Combiner and the Reducer frequently share the same code.
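To build intuition for how this local processing shrinks the shuffle, here is a small standalone Java sketch (plain Java, not Hadoop code; the class and method names are illustrative) that simulates a combiner merging one mapper's (key, 1) records into (key, partial-sum) records before they cross the network:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombinerSketch {

    // Simulates what a combiner does: merge a single mapper's
    // (key, 1) records into (key, partialSum) records locally,
    // so far fewer records travel over the network to the reducer.
    public static Map<String, Integer> localCombine(List<String> mapperKeys) {
        Map<String, Integer> combined = new LinkedHashMap<>();
        for (String key : mapperKeys) {
            combined.merge(key, 1, Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        // One mapper emits six (state, 1) records...
        List<String> emitted = List.of("TX", "CA", "TX", "NY", "CA", "TX");
        Map<String, Integer> combined = localCombine(emitted);
        // ...but after local combining only three records are shuffled.
        System.out.println(combined); // {TX=3, CA=2, NY=1}
    }
}
```

The reducer then only has to sum a handful of partial counts per key instead of a long run of 1s.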

Hadoop Combiner Example

In order to understand the concept of the Hadoop Combiner effectively, let's consider the following use case, which counts the number of complaints reported in each state. Below is the job code:

package com.evoke.bigdata.mr.complaint;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ComplaintCombiner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        String input, output;
        if (args.length == 2) {
            input = args[0];
            output = args[1];
        } else {
            input = "your-input-dir";
            output = "your-output-dir";
        }

        JobConf conf = new JobConf(getConf(), ComplaintCombiner.class);
        conf.setJobName(this.getClass().getName());

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        conf.setMapperClass(StateMapper.class);
        conf.setReducerClass(SumCombiner.class);

        //conf.setCombinerClass(SumCombiner.class);

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ComplaintCombiner(), args);
        System.exit(exitCode);
    }
}

Below is the mapper code, which emits a count of 1 against the state field of each complaint record:

package com.evoke.bigdata.mr.complaint;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class StateMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String[] fields = value.toString().split(",");
        // The state is the sixth field; skip short or malformed rows
        if (fields.length > 5) {
            output.collect(new Text(fields[5]), new IntWritable(1));
        }
    }
}

And lastly, here’s the Reducer/Combiner code that provides the total number of complaints received in each state:

package com.evoke.bigdata.mr.complaint;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SumCombiner extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int stateCount = 0;
        while (values.hasNext()) {
            stateCount += values.next().get();
        }
        output.collect(key, new IntWritable(stateCount));
    }
}

Here is the output in HDFS when the above job code is executed. As can be seen, the output shows the total number of complaints for each state.

Hadoop Distributed File System Output Screen

Correspondingly, here is the view of the intermediate counters in the Job Tracker UI when the Hadoop Reducer is used without a Combiner (refer to the highlighted area).

Reducer without Combiner Output Screen

In like manner, if the same job is executed using a Hadoop Combiner, by replacing the setReducerClass call with a setCombinerClass call in the job code, the number of bytes transmitted over the network drops drastically. We can clearly observe in the Job Tracker UI that very little data is transferred to the reduce phase (refer to the illustration below): the aggregation has already been performed by the Hadoop Combiner, without shipping large amounts of data to the reduce phase (refer to the highlighted area).
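Concretely, the change amounts to uncommenting the combiner line in the run() method shown earlier. A common variant (an assumption on my part, not the exact run shown in the screenshots) is to register the same class in both roles, so the partial sums produced on the map side are merged again on the reduce side:

```java
conf.setMapperClass(StateMapper.class);
conf.setCombinerClass(SumCombiner.class);  // local, map-side aggregation
conf.setReducerClass(SumCombiner.class);   // same class merges the partial sums
```

Because summation is associative, summing the combiner's partial sums in the reducer yields the same totals as summing the raw 1s.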

Combiner without Reducer - Output Screen

Conclusion

All things considered, a Hadoop Combiner should be used only when the function being applied is both commutative and associative. We can configure both a Hadoop Combiner and a Hadoop Reducer in the same job code and use them to perform different operations. In the scenario above, we performed only a single operation, summation, so I preferred to use the Hadoop Combiner to carry out the aggregation, which significantly reduced network congestion and optimized network bandwidth. It also helped improve the data-processing speed. In my next blog, I will be talking about Hadoop Partitioners, so stay tuned.
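The commutative-and-associative requirement is worth seeing in numbers. The following standalone Java sketch (illustrative names, not Hadoop code) shows why a sum survives being combined in partial chunks while a naive average does not:

```java
import java.util.List;

public class AssociativityCheck {

    // Summation is combiner-safe: summing partial sums gives
    // the same answer as summing all values at once.
    static int sum(List<Integer> xs) {
        return xs.stream().mapToInt(Integer::intValue).sum();
    }

    // Averaging is NOT combiner-safe: averaging partial averages
    // is wrong when the partitions have different sizes.
    static double avg(List<Integer> xs) {
        return xs.stream().mapToInt(Integer::intValue).average().orElse(0);
    }

    public static void main(String[] args) {
        List<Integer> partA = List.of(1, 2, 3);  // one mapper's values
        List<Integer> partB = List.of(10);       // another mapper's values

        // Sum: combining the partials matches the global result.
        int global = sum(List.of(1, 2, 3, 10));      // 16
        int combined = sum(partA) + sum(partB);      // 6 + 10 = 16
        System.out.println(global == combined);      // true

        // Average: combining the partials does not match.
        double globalAvg = avg(List.of(1, 2, 3, 10));       // 4.0
        double combinedAvg = (avg(partA) + avg(partB)) / 2; // (2.0 + 10.0) / 2 = 6.0
        System.out.println(globalAvg == combinedAvg);       // false
    }
}
```

This is why a word-count or complaint-count combiner can simply reuse the reducer class, whereas an averaging job must make its combiner emit (sum, count) pairs instead.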

Tej Jawahar Kummari

Tej Jawahar was a Senior Analyst at Evoke Technologies. He was part of the Data Science & Big Data Analytics CoE and is highly proficient in Java technologies. He is a keen observer of trends in Big Data and its associated technologies, with hands-on experience in Hadoop, HBase, Hive, Pig, Spark, and related tools.

6 Comments

  1. How does the combiner combine the data into a (key, list) form, given that the combiner is applied on individual mappers?
    For example, in the classic word-count job, one of my mappers emits output like this:
    Hi 1
    Hello 1
    How 1
    Hi 1

    If we use the same reducer logic for the combiner, there is no list of values in the mapper output, so how does the combiner combine the data and optimize the network?

  2. I have a question. Combiners are built from the same class as the reducer, with mostly the same code. But when exactly is the combiner called: before sort-and-shuffle, or just before reduce? If it runs before sort and shuffle, i.e., just after the mapper, how does it get its input as [key, list], since that grouping is produced by sort and shuffle? And if it runs after sort and shuffle, i.e., just before the reducer, then the combiner outputs [key, value] pairs like a reducer does, so how would the reducer get its input as [key, list]? A prompt answer would be highly appreciated.

    1. A Mapper or Combiner/Reducer is an individual task instance (a JVM process) launched by the task tracker and running on a data node. If the data node goes down, the task instance fails, which notifies the task tracker, which in turn reports the failure to the Job Tracker.

      The Job Tracker may then resubmit that task on another data node (as this is a clustered environment, data is replicated three times by default). The Job Tracker checks with the Name Node to find out which data nodes hold a replica of the same data, then sends the required resources to such a data node so the task can be run by the task tracker there.
