MapReduce Job Chaining with Bigram Count and Sorted Results
- anydataflow
- May 9, 2020
- 4 min read
Updated: Jul 20, 2021
This post walks through a bigram count program whose results are sorted with a custom comparator, with a detailed explanation of the code.
By now we have all seen plenty of WordCount MapReduce examples, which are typically used to explain how MapReduce works in Hadoop and how it uses the Hadoop Distributed File System (HDFS). Here we take the WordCount program one level further and count bigrams (combinations of two words).
What is a bigram? A bigram is a sequence of two consecutive words.
The MapReduce programs here are written in Java.
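Before diving into the Hadoop classes, here is a quick standalone sketch (plain Java, no Hadoop; the class name BigramDemo is just for illustration) of the "previous word + current word" idea that the mapper below relies on:
import java.util.StringTokenizer;
public class BigramDemo {
    public static void main(String[] args) {
        String line = "to be or not to be";
        String prev = null;
        // Walk the words left to right, pairing each word with the one before it.
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while (itr.hasMoreTokens()) {
            String str = itr.nextToken();
            if (prev != null) {
                System.out.println(prev + " " + str); // prints: to be, be or, or not, not to, to be
            }
            prev = str;
        }
    }
}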
Approach:
We need to write a total of five programs: two Mappers, two Reducers, and one Driver.
The 1st program is a Mapper, which uses a StringTokenizer to break each line into words and a loop to pair consecutive words into bigrams. It then writes (bigram, 1) as output.
The 2nd program is a Reducer, which iterates over the Mapper's output, sums the counts for each bigram, and saves the result to a "temp" location on HDFS as (bigram, sum).
The 3rd program is again a Mapper. It reads the output of the 2nd program, parses each line as a (bigram, count) pair, and swaps it to (count, bigram) so that the next stage can sort by count.
The 4th program is again a Reducer. Its input is the 3rd program's (count, bigram) output, and the framework sorts the count keys using a custom comparator. Since the comparator reverses the natural ordering and is registered in the Driver (the 5th program), the final result is written as (count, bigram) in descending order of count.
The 5th program is the Driver, which is responsible for starting the jobs, chaining them together, and registering the sort comparator. See the code:
The jar file WordCount-1.0-SNAPSHOT.jar consists of:
2 Mappers
2 Reducers
1 Driver
1 Comparator Class
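The jar name follows the standard Maven artifact naming, so assuming the usual Maven project layout (the pom.xml is not shown in this post), the jar can be built with:
mvn clean package
and picked up from the target/ directory.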
WordCountMapper.java
package com.anydataflow.bigram;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final IntWritable count = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String prev = null;
        // Split on whitespace, punctuation and digits.
        StringTokenizer itr = new StringTokenizer(line.toLowerCase(), " \t.!?:()[],'&-;|0123456789");
        while (itr.hasMoreTokens()) {
            String str = itr.nextToken();
            // Keep only purely alphabetic tokens.
            if (!str.isEmpty() && str.matches("^[a-zA-Z]+$")) {
                // Emit only if we have an actual bigram, i.e. a previous word exists.
                if (prev != null) {
                    word.set(prev + " " + str);
                    context.write(word, count);
                }
                prev = str;
            }
        }
    }
}
WordCountReducer.java
package com.anydataflow.bigram;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all the 1s emitted by the mapper for this bigram.
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
WordCountMapper2.java
package com.anydataflow.bigram;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper2 extends Mapper<Text, Text, IntWritable, Text> {

    private final IntWritable frequency = new IntWritable();

    // Job1's output lines look like "bigram<TAB>count". With KeyValueTextInputFormat
    // the bigram arrives as the key and the count as the value; here we swap them so
    // the count becomes the key and can be sorted.
    @Override
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        int newVal = Integer.parseInt(value.toString());
        frequency.set(newVal);
        context.write(frequency, key);
    }
}
WordCountReducer2.java
package com.anydataflow.bigram;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer2 extends Reducer<IntWritable, Text, IntWritable, Text> {

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The keys (counts) arrive already sorted in descending order by IntComparator;
        // simply write out every bigram that shares this count.
        for (Text value : values) {
            context.write(key, value);
        }
    }
}
WordCountDriver.java
package com.anydataflow.bigram;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
public class WordCountDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // JobControl lets us chain the two jobs and declare the dependency between them.
        JobControl jobControl = new JobControl("jobChain");

        // Job 1: count bigrams and write (bigram, count) to a temp directory.
        Configuration conf1 = getConf();
        Job job1 = Job.getInstance(conf1);
        job1.setJarByClass(WordCountDriver.class);
        job1.setJobName("bigram count");
        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1] + "/temp"));
        job1.setMapperClass(WordCountMapper.class);
        job1.setReducerClass(WordCountReducer.class);
        job1.setCombinerClass(WordCountReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);

        ControlledJob controlledJob1 = new ControlledJob(conf1);
        controlledJob1.setJob(job1);
        jobControl.addJob(controlledJob1);

        // Job 2: read job1's output, swap to (count, bigram) and sort counts in descending order.
        Configuration conf2 = getConf();
        Job job2 = Job.getInstance(conf2);
        job2.setJarByClass(WordCountDriver.class);
        job2.setJobName("bigram descending");
        FileInputFormat.setInputPaths(job2, new Path(args[1] + "/temp"));
        FileOutputFormat.setOutputPath(job2, new Path(args[1] + "/output"));
        job2.setMapperClass(WordCountMapper2.class);
        job2.setReducerClass(WordCountReducer2.class);
        job2.setCombinerClass(WordCountReducer2.class);
        job2.setOutputKeyClass(IntWritable.class);
        job2.setOutputValueClass(Text.class);
        // Job1's text output is "bigram<TAB>count", so read it back as (key, value) text pairs.
        job2.setInputFormatClass(KeyValueTextInputFormat.class);
        // Sort the IntWritable keys (the counts) in descending order.
        job2.setSortComparatorClass(IntComparator.class);

        ControlledJob controlledJob2 = new ControlledJob(conf2);
        controlledJob2.setJob(job2);
        // Make job2 dependent on job1.
        controlledJob2.addDependingJob(controlledJob1);
        // Add the job to the job control.
        jobControl.addJob(controlledJob2);

        // JobControl implements Runnable, so run it in its own thread and poll until both jobs finish.
        Thread jobControlThread = new Thread(jobControl);
        jobControlThread.start();
        while (!jobControl.allFinished()) {
            System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
            System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
            System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
            System.out.println("Jobs in success state: " + jobControl.getSuccessfulJobList().size());
            System.out.println("Jobs in failed state: " + jobControl.getFailedJobList().size());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // Stop the JobControl thread and report success only if no job failed.
        jobControl.stop();
        return jobControl.getFailedJobList().isEmpty() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCountDriver(), args);
        System.exit(exitCode);
    }
}
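One detail worth noting: the descending order is global only because job2 runs with a single reduce task (Hadoop's default). If the reducer count were raised, each output file would be sorted independently. To make that assumption explicit you could add a line like the following to the driver (not part of the original code):
job2.setNumReduceTasks(1); // keep one reducer so the final output is globally sorted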
IntComparator.java
package com.anydataflow.bigram;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;
public class IntComparator extends WritableComparator {

    public IntComparator() {
        super(IntWritable.class);
    }

    // Compare the raw serialized IntWritable keys (4-byte big-endian ints)
    // and invert the result so the counts sort in descending order.
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
        Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();
        return v1.compareTo(v2) * (-1);
    }
}
Run the jar on Hadoop:
hadoop jar WordCount-1.0-SNAPSHOT.jar com.anydataflow.bigram.WordCountDriver /input-data.txt /output
Output:
hadoop fs -cat /output/output/part-r-00000 | head -n10
461 of the
423 to be
378 in the
252 of her
246 to the
242 mr darcy
241 i am
235 it was
216 of his
204 she was
Hope this post is useful. In a follow-up post we will analyse these results and derive several metrics from them.
Custom comparators are also a popular Hadoop MapReduce interview topic.
Thanks for visiting.