In this post we’ll see how to use Avro files with Hadoop MapReduce.
Avro MapReduce jar
You will need to download the following jar and put it in your project’s classpath.
avro-mapred-1.8.2.jar (if you are running Hadoop 2.x, use the hadoop2 build of this jar, avro-mapred-1.8.2-hadoop2.jar)
Avro MapReduce example
In this MapReduce program we compute the total sales per item, and the output of the job is an Avro file. Input records are in the following tab-separated format (item, sales, zone).
Item1	345	zone-1
Item1	234	zone-2
Item3	654	zone-2
Item2	231	zone-3
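Before running the job it helps to know which totals to expect. The plain-Java sketch below needs neither Hadoop nor Avro. It mimics what the mapper and reducer do for the four sample records: split each line on tabs, use the first column as the key and sum the second column per key. The class name SalesTotalsSketch and the method totalSales are hypothetical, and the TreeMap only loosely stands in for the shuffle and sort phase.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SalesTotalsSketch {

    // Hypothetical in-memory stand-in for ItemMapper + SalesReducer:
    // column 0 is the item (the key), column 1 is the sales figure to sum.
    // TreeMap keeps keys sorted, loosely like the MapReduce sort phase.
    static Map<String, Integer> totalSales(List<String> lines) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            String[] salesArr = line.split("\t");
            totals.merge(salesArr[0], Integer.parseInt(salesArr[1]), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // The four sample records from the input file, tab separated
        List<String> input = Arrays.asList(
                "Item1\t345\tzone-1",
                "Item1\t234\tzone-2",
                "Item3\t654\tzone-2",
                "Item2\t231\tzone-3");
        // Prints {Item1=579, Item2=231, Item3=654}
        System.out.println(totalSales(input));
    }
}
```

Item1 appears twice (345 + 234 = 579), while Item2 and Item3 appear once each, so the Avro output should contain three records.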
The Avro schema (SALES_SCHEMA) used in the program is inlined within the MapReduce code. When creating an output record, this schema is referenced as given below-
GenericRecord record = new GenericData.Record(SALES_SCHEMA);
- Refer to ToolRunner and GenericOptionsParser in Hadoop for an example of passing the Avro schema as a command line argument instead of inlining it.
package org.netjs;

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroMR extends Configured implements Tool {

    // Avro schema for the output record: item name and its total sales
    private static final Schema SALES_SCHEMA = new Schema.Parser().parse(
        "{\n"
        + "  \"type\": \"record\",\n"
        + "  \"name\": \"SalesRecord\",\n"
        + "  \"doc\": \"Sales Records\",\n"
        + "  \"fields\":\n"
        + "  [\n"
        + "    {\"name\": \"item\", \"type\": \"string\"},\n"
        + "    {\"name\": \"totalsales\", \"type\": \"int\"}\n"
        + "  ]\n"
        + "}\n");

    // Mapper: emits (item, SalesRecord) for every input line
    public static class ItemMapper
            extends Mapper<LongWritable, Text, AvroKey<CharSequence>, AvroValue<GenericRecord>> {
        // Reusing one record is safe because context.write() serializes it immediately
        private GenericRecord record = new GenericData.Record(SALES_SCHEMA);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input line: item <TAB> sales <TAB> zone
            String[] salesArr = value.toString().split("\t");
            record.put("item", salesArr[0]);
            record.put("totalsales", Integer.parseInt(salesArr[1]));
            // Key is a plain string so it matches the STRING map output key schema
            context.write(new AvroKey<CharSequence>(salesArr[0]),
                new AvroValue<GenericRecord>(record));
        }
    }

    // Reducer: sums the sales for each item and writes one Avro record per item
    public static class SalesReducer
            extends Reducer<AvroKey<CharSequence>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
        @Override
        public void reduce(AvroKey<CharSequence> key, Iterable<AvroValue<GenericRecord>> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (AvroValue<GenericRecord> value : values) {
                GenericRecord record = value.datum();
                sum += (Integer) record.get("totalsales");
            }
            GenericRecord record = new GenericData.Record(SALES_SCHEMA);
            record.put("item", key.datum());
            record.put("totalsales", sum);
            context.write(new AvroKey<GenericRecord>(record), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        int exitFlag = ToolRunner.run(new AvroMR(), args);
        System.exit(exitFlag);
    }

    @Override
    public int run(String[] args) throws Exception {
        // Use the Configuration populated by ToolRunner so generic options (-D, -libjars) apply
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "AvroMR");
        job.setJarByClass(getClass());
        job.setMapperClass(ItemMapper.class);
        job.setReducerClass(SalesReducer.class);

        // Map output: string key, SalesRecord value
        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setMapOutputValueSchema(job, SALES_SCHEMA);
        // Final output: SalesRecord written as the key of an Avro container file
        AvroJob.setOutputKeySchema(job, SALES_SCHEMA);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Run the MapReduce job using the following command.
hadoop jar /home/netjs/netjshadoop.jar org.netjs.AvroMR /test/input/sales.txt /test/out/sales
The job writes an Avro file as output. To see the content of the output file, you can use the following command.
hadoop jar /PATH_TO_JAR/avro-tools-1.8.2.jar tojson /test/out/sales/part-r-00000.avro
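For the sample input, the output file should hold three SalesRecord entries, one per item. avro-tools tojson prints each record as one compact JSON object per line. The sketch below only reproduces that expected shape in plain Java so you have something to compare against; it does not read the Avro file. The class ToJsonShapeSketch and the helper toJsonLine are hypothetical, and the totals are the ones worked out by hand from the four sample records.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ToJsonShapeSketch {

    // Hypothetical helper: renders one SalesRecord as compact JSON with the
    // fields in schema order (item, totalsales). Assumes item names need no
    // JSON escaping, which holds for the sample data.
    static String toJsonLine(String item, int totalSales) {
        return "{\"item\":\"" + item + "\",\"totalsales\":" + totalSales + "}";
    }

    public static void main(String[] args) {
        // Assumed totals for the sample input, in sorted key order
        Map<String, Integer> totals = new LinkedHashMap<>();
        totals.put("Item1", 579);
        totals.put("Item2", 231);
        totals.put("Item3", 654);
        totals.forEach((item, total) -> System.out.println(toJsonLine(item, total)));
    }
}
```

Run as-is it prints three lines, the first being {"item":"Item1","totalsales":579}. The real tojson output should look similar, with records in key order because the job runs with a single reducer by default.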
That's all for this topic Using Avro File With Hadoop MapReduce. If you have any doubts or suggestions, please drop a comment. Thanks!