In this post we'll see what a combiner is in Hadoop and how it helps speed up the shuffle and sort phase in Hadoop MapReduce.
What is a combiner in Hadoop
Generally in a MapReduce job, data is collated in the Map phase and later aggregated in the reduce phase. By specifying a combiner function in MapReduce you can perform partial aggregation in the Map phase as well.
You can specify a combiner in your MapReduce driver using the following statement -
job.setCombinerClass(COMBINER_CLASS.class);
Note that specifying a combiner in your MapReduce job is optional.
How combiner helps in improving MapReduce performance
Once the Map tasks start producing output, that data has to be stored in memory, partitioned as per the number of reducers, sorted by key and then spilled to disk.
Once a Map task is done, its partitions have to be transferred to the reducers (on different nodes) working on those specific partitions. As you can see, this whole shuffle and sort process consumes memory, disk I/O and network bandwidth.
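Incidentally, the size of that in-memory buffer and the fill level at which it spills to disk are configurable. Here is a minimal sketch, assuming Hadoop 2.x property names (the values shown are examples only, not recommendations):

import org.apache.hadoop.conf.Configuration;

public class SpillTuning {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Size (in MB) of the in-memory buffer that collects map output
        conf.setInt("mapreduce.task.io.sort.mb", 256);
        // Fill level of the buffer that triggers a spill to disk
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);
        System.out.println("Sort buffer: " + conf.get("mapreduce.task.io.sort.mb") + " MB");
    }
}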
If you specify a combiner function, it is run on the map output before the in-memory data is written to disk, so there is less data to write to disk (reducing I/O), which in turn means less data is transferred to the reducer nodes (reducing network bandwidth). Keep in mind that Hadoop may apply the combiner zero, one or several times to a given map's output, so the combiner function must not change the final result.
For example, suppose you have sales data for several items and you are trying to find the maximum sales number per item. Say the following (key, value) pairs for Item1 are the output of Map-1 and Map-2.
Map-1
(Item1, 300) (Item1, 250) (Item1, 340) (Item1, 450)
Map-2
(Item1, 120) (Item1, 540) (Item1, 290) (Item1, 300)
Then the reduce function, which gets the data for this key (Item1), will receive all these (key, value) pairs as input after the shuffle phase.
[Item1,(300,250,340,450,120,540,290,300)]
Resulting in final output - (Item1, 540)
If you use a combiner in the MapReduce job, with the reducer class itself serving as the combiner class, then the combiner is run on each map's output.
Map-1 Combiner output
(Item1, 450)
Map-2 Combiner output
(Item1, 540)
Input to Reducer - [Item1, (450, 540)]
Resulting in final output - (Item1, 540)
So you can see that by using a combiner the map output is reduced, which means less data is written to disk and less data is transferred to the reducer nodes.
How to write a Combiner function
To write a combiner class you need to extend Reducer and implement the reduce method, just as you do when writing a reducer. In fact, in many cases the reducer itself can be used as the combiner.
The output key and value types of the combiner must be the same as the output key and value types of the mapper, because the combiner's output replaces the mapper's output in the shuffle.
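As an illustration, here is a minimal standalone combiner sketch for the max-sales example developed later in this post (the class name MaxSalesCombiner is hypothetical). Since the mapper emits (Text, IntWritable) pairs, the combiner's input and output types must both be (Text, IntWritable) as well:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Input types match the mapper's output types, and output types must
// match them too, since combiner output feeds the same shuffle stream.
public class MaxSalesCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable val : values) {
            max = Math.max(max, val.get());
        }
        // Emit the partial maximum with the same (Text, IntWritable) types
        context.write(key, new IntWritable(max));
    }
}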
It is not always possible to use the reducer as the combiner class, though. The classic example of this constraint is the calculation of an average.
For example, if two maps produce the following values for the same key:
Map-1 (1, 4, 7) and Map-2 (8, 9)
Then the reduce function will calculate the average as (1+4+7+8+9)/5 = 29/5 = 5.8
Whereas with the reducer used as a combiner, an average is first calculated per map output:
Map-1 – (1+4+7)/3 = 12/3 = 4
Map-2 – (8+9)/2 = 17/2 = 8.5
So the average calculated on the reduce side will be (4+8.5)/2 = 12.5/2 = 6.25, which is not the correct answer.
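The usual fix is to make the intermediate data combinable: carry partial (sum, count) pairs through the shuffle and divide only once, in the reducer. Here is a minimal sketch assuming the values are encoded as "sum,count" strings to keep it short (a custom Writable would be the idiomatic choice); the class name is hypothetical:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Combiner for averages: aggregates partial sums and counts per map output
// instead of averaging, so no information is lost before the reduce phase.
public class AvgCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (Text val : values) {
            // Each value is a "sum,count" pair; single values arrive as "v,1"
            String[] parts = val.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        // Emit the partial sum and count, NOT the average
        context.write(key, new Text(sum + "," + count));
    }
}

With this scheme the mapper emits each value as "v,1" and the reducer adds up the partial sums and counts before dividing once at the end, so for the numbers above it computes (12+17)/(3+2) = 29/5 = 5.8, matching the no-combiner result.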
Combiner with MapReduce example
Here is an example where a combiner is specified while calculating the maximum sales figure per item.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxSales extends Configured implements Tool {
    // Mapper
    public static class MaxSalesMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text item = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Splitting the line on tab
            String[] stringArr = value.toString().split("\t");
            item.set(stringArr[0]);
            Integer sales = Integer.parseInt(stringArr[1]);
            context.write(item, new IntWritable(sales));
        }
    }

    // Reducer
    public static class MaxSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int maxSalesValue = Integer.MIN_VALUE;
            for (IntWritable val : values) {
                maxSalesValue = Math.max(maxSalesValue, val.get());
            }
            result.set(maxSalesValue);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        int exitFlag = ToolRunner.run(new MaxSales(), args);
        System.exit(exitFlag);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "MaxSales");
        job.setJarByClass(getClass());
        job.setMapperClass(MaxSalesMapper.class);
        // Specifying combiner class
        job.setCombinerClass(MaxSalesReducer.class);
        job.setReducerClass(MaxSalesReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
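The mapper in this job expects tab-separated input lines of the form item<TAB>sales. A few made-up sample rows, just to show the shape of the input:

Item1	345
Item2	23
Item3	654
Item1	90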
In the counters displayed for the MapReduce job you can see the reduction in the number of records passed to the reducer.
Map input records=21
Map output records=21
Map output bytes=225
Map output materialized bytes=57
Input split bytes=103
Combine input records=21
Combine output records=4
Reduce input groups=4
Reduce shuffle bytes=57
Reduce input records=4
Reduce output records=4
For comparison, here are the counters when the same MapReduce job is run without a combiner class.
Map input records=21
Map output records=21
Map output bytes=225
Map output materialized bytes=273
Input split bytes=103
Combine input records=0
Combine output records=0
Reduce input groups=4
Reduce shuffle bytes=273
Reduce input records=21
Reduce output records=4
Spilled Records=42
That's all for this topic Using Combiner in Hadoop MapReduce to Improve Performance. If you have any doubts or suggestions, please drop a comment. Thanks!