This post shows how to convert existing data to the Parquet file format using MapReduce in Hadoop. In the example given here a text file is converted to a Parquet file.
You will need to put the following jars in the classpath in order to read and write Parquet files in Hadoop. A sketch of one way to supply them at run time is given after the list.
- parquet-hadoop-bundle-1.10.0.jar
- parquet-avro-1.10.0.jar
- jackson-mapper-asl-1.9.13.jar
- jackson-core-asl-1.9.13.jar
- avro-1.8.2.jar
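Since the driver classes below use ToolRunner, one way to make these jars available both on the client classpath and to the map tasks is to export HADOOP_CLASSPATH and pass -libjars when submitting the job. This is only a sketch; the /path/to/libs locations are placeholders, and only two of the jars are shown for brevity, include all of the jars listed above.

export HADOOP_CLASSPATH=/path/to/libs/parquet-avro-1.10.0.jar:/path/to/libs/avro-1.8.2.jar
hadoop jar /PATH_TO_JAR org.netjs.ParquetConvert \
    -libjars /path/to/libs/parquet-avro-1.10.0.jar,/path/to/libs/avro-1.8.2.jar \
    /test/input /test/output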
Using Avro to define schema
Rather than creating the Parquet schema directly, the Avro framework is used to define the schema, as it is more convenient. You can then use the Avro-based API classes to write and read the files. The mapping between the Avro and Parquet schemas, and the conversion of Avro records to Parquet records, is taken care of by these classes.
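As an illustration of that mapping, outside of MapReduce the same kind of Avro-defined schema can be handed to AvroParquetWriter, which converts Avro records to Parquet on write. This is only a minimal sketch; the class name, schema and output path here are illustrative and not part of the MapReduce example that follows.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

public class AvroToParquetWriteSketch {
    public static void main(String[] args) throws Exception {
        // Single-field Avro schema, same shape as the one used in the MapReduce example below
        Schema schema = new Schema.Parser().parse(
            "{\"type\": \"record\", \"name\": \"TextFile\","
            + " \"fields\": [{\"name\": \"line\", \"type\": \"string\"}]}");

        // AvroParquetWriter maps the Avro schema and records to Parquet on write
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(new Path("/test/standalone/sample.parquet")) // illustrative path
                .withSchema(schema)
                .build()) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("line", "A line of text stored as one Parquet record.");
            writer.write(record);
        }
    }
}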
MapReduce code to convert file to Parquet format file
In the code the Avro schema is defined inline. The program uses the Avro generic API to create a generic record. It is also a map-only job, since only conversion is required and records are not aggregated.
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.parquet.example.data.Group;

public class ParquetConvert extends Configured implements Tool {

    // Avro schema defined inline
    private static final Schema MAPPING_SCHEMA = new Schema.Parser().parse(
        "{\n" +
        " \"type\": \"record\",\n" +
        " \"name\": \"TextFile\",\n" +
        " \"doc\": \"Text File\",\n" +
        " \"fields\":\n" +
        " [\n" +
        "  {\"name\": \"line\", \"type\": \"string\"}\n" +
        " ]\n" +
        "}\n");

    // Map function
    public static class ParquetConvertMapper extends Mapper<LongWritable, Text, Void, GenericRecord> {

        private GenericRecord record = new GenericData.Record(MAPPING_SCHEMA);

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            record.put("line", value.toString());
            context.write(null, record);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "ParquetConvert");
        job.setJarByClass(getClass());
        job.setMapperClass(ParquetConvertMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Void.class);
        job.setOutputValueClass(Group.class);
        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        // setting schema
        AvroParquetOutputFormat.setSchema(job, MAPPING_SCHEMA);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitFlag = ToolRunner.run(new ParquetConvert(), args);
        System.exit(exitFlag);
    }
}
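As an optional tweak, AvroParquetOutputFormat inherits the compression setter from ParquetOutputFormat, so compressed Parquet output can be requested in the run() method above; Snappy is used here only as an example codec.

// Requires: import org.apache.parquet.hadoop.metadata.CompressionCodecName;
// Optional: write Snappy-compressed Parquet output
AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);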
On running the MapReduce code using the following command

hadoop jar /PATH_TO_JAR org.netjs.ParquetConvert /test/input /test/output

you can see that the Parquet file is written at the output location.
hdfs dfs -ls /test/output
Found 4 items
-rw-r--r--   1 netjs supergroup          0 2018-07-06 09:54 /test/output/_SUCCESS
-rw-r--r--   1 netjs supergroup        276 2018-07-06 09:54 /test/output/_common_metadata
-rw-r--r--   1 netjs supergroup        429 2018-07-06 09:54 /test/output/_metadata
-rw-r--r--   1 netjs supergroup        646 2018-07-06 09:54 /test/output/part-m-00000.parquet
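If the parquet-tools utility jar is available, you can also inspect the contents and schema of the generated file directly. The jar name and version below are an assumption; adjust them to whatever build you actually have.

hadoop jar parquet-tools-1.10.0.jar cat /test/output/part-m-00000.parquet
hadoop jar parquet-tools-1.10.0.jar schema /test/output/part-m-00000.parquet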
Reading Parquet file using MapReduce
The following MapReduce program takes a Parquet file as input and outputs a text file. In the Parquet file the records are in the following format, so you need to write appropriate logic to extract the relevant part.
{"line": "Hello wordcount MapReduce Hadoop program."} {"line": "This is my first MapReduce program."} {"line": "This file will be converted to Parquet using MR."}
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.ExampleInputFormat;

public class ParquetRead extends Configured implements Tool {

    // Map function
    public static class ParquetMapper extends Mapper<NullWritable, Group, NullWritable, Text> {

        public void map(NullWritable key, Group value, Context context)
                throws IOException, InterruptedException {
            NullWritable outKey = NullWritable.get();
            String line = value.toString();
            // Split on the field separator and keep only the value part
            String[] fields = line.split(": ");
            context.write(outKey, new Text(fields[1]));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "ParquetRead");
        job.setJarByClass(getClass());
        job.setMapperClass(ParquetMapper.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(ExampleInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitFlag = ToolRunner.run(new ParquetRead(), args);
        System.exit(exitFlag);
    }
}
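The map() method above extracts the value by splitting the string form of the record. As an alternative sketch, the Group API can return the field directly, which avoids depending on that string representation; the field name "line" and index 0 correspond to the schema used while writing.

public void map(NullWritable key, Group value, Context context)
        throws IOException, InterruptedException {
    // Read the first (and only) value of the "line" field straight from the Group
    String line = value.getString("line", 0);
    context.write(NullWritable.get(), new Text(line));
}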
If you want to read back the data written by the Parquet-writing MapReduce program above, you can use the following command.

hadoop jar /PATH_TO_JAR org.netjs.ParquetRead /test/output/part-m-00000.parquet /test/out
That's all for this topic, Converting Text File to Parquet File Using Hadoop MapReduce. If you have any doubts or any suggestions to make, please drop a comment. Thanks!