In this post we’ll see how you can read and write a SequenceFile using the Java API in Hadoop. We’ll also see how to read and write a SequenceFile using MapReduce.
Java program to write a sequence file
Using the createWriter() method of the SequenceFile class you can get a writer, which can then be used to write a SequenceFile. In this Java program a file from the local file system is written as a SequenceFile into HDFS.
package org.netjs;

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;

public class FileWriter {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    int i = 0;
    try {
      FileSystem fs = FileSystem.get(conf);
      // Input file - in the local file system
      File file = new File("netjs/Hadoop/Data/log.txt");
      // Path for the output SequenceFile in HDFS
      Path outFile = new Path(args[0]);
      IntWritable key = new IntWritable();
      Text value = new Text();
      SequenceFile.Writer writer = null;
      try {
        // Create a writer with block compression using the Gzip codec
        writer = SequenceFile.createWriter(conf, Writer.file(outFile),
            Writer.keyClass(key.getClass()),
            Writer.valueClass(value.getClass()),
            Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()));
        // Write each line of the input file as a (line number, line) record
        for (String line : FileUtils.readLines(file)) {
          key.set(i++);
          value.set(line);
          writer.append(key, value);
        }
      } finally {
        if (writer != null) {
          writer.close();
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

To run this Java program in the Hadoop environment, export the classpath where the .class file for the Java program resides.
export HADOOP_CLASSPATH=/home/netjs/eclipse-workspace/bin

Then you can run the Java program using the following command.
hadoop org.netjs.FileWriter /user/out/data.seq
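You can check the contents of the written file with the -text option of the Hadoop filesystem shell, which recognizes SequenceFiles and decompresses them before printing the records.

hadoop fs -text /user/out/data.seq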
Java program to read a sequence file
To read a SequenceFile using the Java API in Hadoop, create an instance of SequenceFile.Reader. Using that reader instance you can iterate over the (key, value) pairs in the SequenceFile using the next() method.
package org.netjs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;

public class FileReader {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    try {
      Path inFile = new Path(args[0]);
      SequenceFile.Reader reader = null;
      try {
        IntWritable key = new IntWritable();
        Text value = new Text();
        reader = new SequenceFile.Reader(conf, Reader.file(inFile), Reader.bufferSize(4096));
        // next() returns false once the end of the file is reached
        while (reader.next(key, value)) {
          System.out.println("Key " + key + " Value " + value);
        }
      } finally {
        if (reader != null) {
          reader.close();
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

Then you can read the previously written SequenceFile using the following command.
hadoop org.netjs.FileReader /user/out/data.seq
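SequenceFile.Reader also supports random access, which is useful when you don't want to scan a file from the beginning. Here is a minimal sketch (the class name FileSeekReader is just for illustration) that saves a record boundary with getPosition() and jumps back to it with seek(); the related sync() method instead moves the reader to the next sync marker after a given position.

package org.netjs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;

public class FileSeekReader {

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Path of a SequenceFile written earlier, passed as argument
    Path inFile = new Path(args[0]);
    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(inFile))) {
      IntWritable key = new IntWritable();
      Text value = new Text();
      reader.next(key, value);          // read the first record
      long pos = reader.getPosition();  // byte position of the next record
      reader.next(key, value);          // read the second record
      // seek() works only with a position previously returned by getPosition()
      reader.seek(pos);
      reader.next(key, value);          // re-reads the second record
      System.out.println("Key " + key + " Value " + value);
      // reader.sync(pos) would instead position at the next sync marker after pos
    }
  }
}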
MapReduce job to write a SequenceFile
If you have a very big file and you want to take advantage of parallel processing, then you can also use MapReduce to write a SequenceFile. The only change required is to set the output format class to SequenceFileOutputFormat. Also set the number of reducers to 0, since a mapper-only job is needed.
package org.netjs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FileWriter extends Configured implements Tool {

  // Identity mapper - passes each (offset, line) pair straight through
  public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    int flag = ToolRunner.run(new FileWriter(), args);
    System.exit(flag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "seqfilewrite");
    job.setJarByClass(FileWriter.class);
    job.setMapperClass(SFMapper.class);
    // Mapper-only job
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // If you want to compress the output
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    return job.waitForCompletion(true) ? 0 : 1;
  }
}
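With HADOOP_CLASSPATH exported as before, this job can be run the same way as the earlier programs. The input and output paths below are placeholders; note that the output directory must not already exist.

hadoop org.netjs.FileWriter /user/input/logs /user/out/seq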
MapReduce job to read a SequenceFile
In this case, set the input format class to SequenceFileInputFormat.
package org.netjs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FileReader extends Configured implements Tool {

  // Identity mapper - the (key, value) pairs read from the SequenceFile
  // are written out unchanged
  public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    int flag = ToolRunner.run(new FileReader(), args);
    System.exit(flag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "seqfileread");
    job.setJarByClass(FileReader.class);
    job.setMapperClass(SFMapper.class);
    // Mapper-only job
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}
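This job can be run the same way; as input give it the SequenceFile output of the previous job (again, placeholder paths), since the mapper's input key type (LongWritable) matches the keys that job wrote.

hadoop org.netjs.FileReader /user/out/seq /user/out/text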
That's all for this topic How to Read And Write SequenceFile in Hadoop. If you have any doubts or any suggestions to make, please drop a comment. Thanks!