Sometimes when you are running a MapReduce job your map and/or reduce tasks may require some extra data, such as a file, a jar or a zipped file, in order to do their processing. In such scenarios you can use the distributed cache in Hadoop MapReduce.
What is distributed cache
Distributed cache in Hadoop provides a mechanism to copy files, jars or archives to the nodes where the map and reduce tasks are running. The file to be distributed is first placed on HDFS; when a task is about to run, the NodeManager copies the cached file to the local disk of the node where the task is running (this step is known as localization).
How to use Distributed cache in Hadoop
Earlier, methods of the DistributedCache class were used to add files to the distributed cache, but that whole class is deprecated now.
Instead you need to use methods of the Job class to add files to the distributed cache; the methods that can be used are as follows.
- public void addCacheArchive(URI uri)- This method is used to add an archive to be localized. The archive is unpacked on the node where the task is running.
- public void addCacheFile(URI uri)- This method is used to add a file to be localized.
- public void setCacheArchives(URI[] archives)- Sets the given set of archives that need to be localized.
- public void setCacheFiles(URI[] files)- Sets the given set of files that need to be localized.
- public void addFileToClassPath(Path file)- This method adds a file path to the current set of classpath entries. It adds the file to the cache as well.
- public void addArchiveToClassPath(Path archive)- This method adds an archive path to the current set of classpath entries. It adds the archive to the cache as well.
If you are using GenericOptionsParser and ToolRunner in your MapReduce code then you can also pass the files to be added to the distributed cache through the command line, as shown below.
Another advantage of using GenericOptionsParser to add a file is that the file can come from the local file system. With the Job class methods the file has to already be in a shared file system, which is one of the differences between these two ways of adding files to the distributed cache in Hadoop.
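For example, with a driver that runs through ToolRunner you can ship a local file using the generic -files option (-libjars and -archives work the same way for jars and archives). Note that the generic options must come before the application arguments. The jar name, class name and paths below are just placeholders:

hadoop jar myjob.jar MyJobDriver -files /local/path/abc.txt#abc /test/input /test/output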
If you are using the Java API (Job class methods) to add a file to the distributed cache in Hadoop then you have to ensure that the file is first copied to HDFS. Then you can use the relevant method (based on whether it is a file, jar or archive).
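To copy the file from the local file system to HDFS in the first place you can use the hdfs dfs -put command; the paths here are just placeholders.

hdfs dfs -put /local/path/abc.txt /test/input/abc.txt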
For example - if you are adding a text file to the distributed cache then you can use the following method call
job.addCacheFile(new URI("/test/input/abc.txt#abc"));
Here #abc creates a symbolic link to the cached file, and using this name (abc in this case) you can access the file on the nodes where the tasks run.
If you are adding a jar to the class path then you can use the following method call
job.addFileToClassPath(new Path("/test/MyTestLib.jar"));
If you are adding a .zip archive file to the distributed cache then you can use the following method call.
job.addCacheArchive(new URI("/test/input/abc.zip"));
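As with files, you can append a fragment to the URI to give the link a name; the archive is then unpacked into a directory with that name in the task's working directory, so its contents can be read with relative paths like abczip/data.txt. A hypothetical variant of the call above -

job.addCacheArchive(new URI("/test/input/abc.zip#abczip"));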
Distributed cache with MapReduce example
Suppose you have data in the following format (fields are tab separated) -

Item1 345 DEL
Item1 205 KOL
Item3 654 BLR
Item3 453 KOL
Item2 350 MUM
Item1 122 DEL

What is needed is to find the total sales per city, but in the final output you want the full name of the city, not the three letter city code, in the following form.

Item1 Delhi 467

In this scenario you can add a properties file to the distributed cache which has the following form.

DEL=Delhi
KOL=Kolkata
In the reducer you can get this properties file from the distributed cache and replace the city code with the full name by looking it up in the properties file. Getting the file from the distributed cache and loading it into a Properties instance is done in the setup() method of the Reducer.
MapReduce code
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TotalSalesWithDC extends Configured implements Tool {

  // Mapper - emits (item + " " + cityCode, sales) pairs
  public static class TotalSalesMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Text item = new Text();

    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      item.set(stringArr[0] + " " + stringArr[2]);
      Integer sales = Integer.parseInt(stringArr[1]);
      context.write(item, new IntWritable(sales));
    }
  }

  // Reducer - sums the sales and replaces the city code with the full city name
  public static class TotalSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private Properties cityProp = new Properties();
    private Text cityKey = new Text();
    private IntWritable result = new IntWritable();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // That's where the file stored in the distributed cache is used;
      // "city" is the symbolic link created by the #city fragment
      InputStream iStream = new FileInputStream("./city");
      // Loading properties
      cityProp.load(iStream);
      iStream.close();
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      String[] stringArr = key.toString().split(" ");
      // Getting the city name from the properties file
      String city = cityProp.getProperty(stringArr[1].trim());
      cityKey.set(stringArr[0] + "\t" + city);
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(cityKey, result);
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new TotalSalesWithDC(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    // Use getConf() so that options passed via GenericOptionsParser are honored
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "TotalSales");
    job.setJarByClass(getClass());
    job.setMapperClass(TotalSalesMapper.class);
    job.setReducerClass(TotalSalesReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Adding file to distributed cache
    job.addCacheFile(new URI("/test/input/City.properties#city"));
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}
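Once the jar is built and City.properties has been copied to /test/input on HDFS, the job can be run as below. The jar name and data paths are just placeholders; the input directory should contain the tab-separated sales data (keep it separate from the properties file so the properties file is not read as job input), and the output directory must not already exist.

hadoop jar totalsales.jar TotalSalesWithDC /test/sales /test/output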
That's all for this topic Distributed Cache in Hadoop MapReduce. If you have any doubts or any suggestions to make please drop a comment. Thanks!