
MapReduce Job Chaining with Bigram Count and Sorting of Data Elements

  • Writer: anydataflow
  • May 9, 2020
  • 4 min read

Updated: Jul 20, 2021

This post walks through a bigram-count MapReduce program whose results are sorted with a custom comparator, with a detailed explanation of the code.


We have all seen plenty of WordCount MapReduce examples, which are typically used to explain how MapReduce works in Hadoop and how it uses the Hadoop Distributed File System (HDFS). Here we take the WordCount program one level further and count "BIGRAMs" (combinations of two words).


What is a BIGRAM: a bigram is a sequence of two consecutive words. For example, the sentence "to be or not to be" yields the bigrams "to be", "be or", "or not", "not to", and "to be".

Here we implement the MapReduce program in Java.
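
Before bringing Hadoop into the picture, here is a minimal standalone sketch of the bigram-extraction idea (plain Java, no Hadoop dependencies; the class name BigramDemo is purely illustrative):

import java.util.StringTokenizer;

public class BigramDemo {
    public static void main(String[] args) {
        String line = "to be or not to be";
        String prev = null;
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while (itr.hasMoreTokens()) {
            String str = itr.nextToken();
            // Each previous/current pair of consecutive words is one bigram.
            if (prev != null) {
                System.out.println(prev + " " + str);
            }
            prev = str;
        }
    }
}

Running it prints the five bigrams listed above.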


Approach:


  1. We need to write five programs in total: 2 Mapper programs, 2 Reducer programs, and 1 Driver program (plus a small Comparator class for the sort).

  2. The 1st program is a Mapper, which uses a StringTokenizer to break each line into words and a loop to form each bigram from the previous and current word. It then writes (bigram, 1) as output.

  3. The 2nd program is a Reducer, which iterates over the Mapper's output, sums the counts for each bigram, and saves the result as (bigram, sum) in a "temp" location on HDFS.

  4. The 3rd program is again a Mapper. It reads the output of the first job, parses each line as a (bigram, count) pair, and swaps it so the output is (count, bigram), allowing the next stage to sort by count and produce sorted bigram-count results.

  5. The 4th program is again a Reducer, which takes the 3rd program's (count, bigram) output as input. During the shuffle/sort phase the framework sorts its keys using our custom comparator, which implements descending order and is registered in the driver (5th program), so the final (count, bigram) results come out in descending order of count.

  6. The 5th program is the Driver, which is responsible for starting the jobs and chaining them, and for registering the sort comparator. See the data-flow sketch below, followed by the code.
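
As a quick illustration of the chained data flow, a hypothetical one-line input "to be or not to be" moves through the stages like this (ties in the final sort appear in arbitrary order):

Input line:           to be or not to be
Job 1 map output:     (to be, 1), (be or, 1), (or not, 1), (not to, 1), (to be, 1)
Job 1 reduce output:  (be or, 1), (not to, 1), (or not, 1), (to be, 2)
Job 2 map output:     (1, be or), (1, not to), (1, or not), (2, to be)
Job 2 reduce output:  (2, to be), (1, be or), (1, not to), (1, or not)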


The jar file WordCount-1.0-SNAPSHOT.jar consists of:

  • 2 Mappers

  • 2 Reducers

  • 1 Driver

  • 1 Comparator Class
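
The SNAPSHOT naming suggests a standard Maven project; assuming that layout, the jar can be built with mvn clean package, which places WordCount-1.0-SNAPSHOT.jar under target/.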


WordCountMapper.java

package com.anydataflow.bigram;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    IntWritable count = new IntWritable(1);
    Text word = new Text();


    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        String prev = null;
        // Split on whitespace, punctuation, and digits so that only alphabetic tokens remain.
        StringTokenizer itr = new StringTokenizer(line.toLowerCase(), " \t.!?:()[],'&-;|0123456789");

        while (itr.hasMoreTokens()) {

            String str = itr.nextToken();

            // Keep only non-empty, purely alphabetic tokens.
            if (str != null && !str.equals("") && str.matches("^[a-zA-Z]+$")) {

                // Emit only once we have a previous word, i.e. an actual bigram.
                if (prev != null) {
                    word.set(prev + " " + str);
                    context.write(word, count);
                }
                prev = str;
            }
        }
    }
}

WordCountReducer.java

package com.anydataflow.bigram;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {


    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all the 1s (and any combiner partial sums) for this bigram.
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}




WordCountMapper2.java

package com.anydataflow.bigram;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper2 extends Mapper<Text, Text, IntWritable, Text> {

    IntWritable frequency = new IntWritable();

    @Override
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {

        // With KeyValueTextInputFormat, key = bigram and value = its count (split on tab).
        // Swap them so the shuffle phase sorts by count instead of by bigram.
        int newVal = Integer.parseInt(value.toString());
        frequency.set(newVal);
        context.write(frequency, key);
    }
}


WordCountReducer2.java

package com.anydataflow.bigram;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer2 extends Reducer<IntWritable, Text, IntWritable, Text> {

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // Keys arrive sorted in descending order by IntComparator; several
        // bigrams can share the same count, so write each one out.
        for (Text value : values) {
            context.write(key, value);
        }
    }
}

WordCountDriver.java

package com.anydataflow.bigram;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class WordCountDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception
    {
        JobControl jobControl = new JobControl("jobChain");
        Configuration conf1 = getConf();

        Job job1 = Job.getInstance(conf1);
        job1.setJarByClass(WordCountDriver.class);
        job1.setJobName("bigram count");

        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        // Job 1 writes its intermediate (bigram, count) output to a temp dir under the output path.
        FileOutputFormat.setOutputPath(job1, new Path(args[1] + "/temp"));

        job1.setMapperClass(WordCountMapper.class);
        job1.setReducerClass(WordCountReducer.class);
        job1.setCombinerClass(WordCountReducer.class);

        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);

        ControlledJob controlledJob1 = new ControlledJob(conf1);
        controlledJob1.setJob(job1);

        jobControl.addJob(controlledJob1);
        Configuration conf2 = getConf();

        Job job2 = Job.getInstance(conf2);
        job2.setJarByClass(WordCountDriver.class);
        job2.setJobName("bigram descending");

        FileInputFormat.setInputPaths(job2, new Path(args[1] + "/temp"));
        FileOutputFormat.setOutputPath(job2, new Path(args[1] + "/output"));

        job2.setMapperClass(WordCountMapper2.class);
        job2.setReducerClass(WordCountReducer2.class);
        // WordCountReducer2 simply passes pairs through, so using it as a combiner is harmless here.
        job2.setCombinerClass(WordCountReducer2.class);

        job2.setOutputKeyClass(IntWritable.class);
        job2.setOutputValueClass(Text.class);
        // Read job 1's text output as (key, value) pairs split on the tab character.
        job2.setInputFormatClass(KeyValueTextInputFormat.class);

        // Sort the IntWritable keys in descending order during the shuffle.
        job2.setSortComparatorClass(IntComparator.class);
        ControlledJob controlledJob2 = new ControlledJob(conf2);
        controlledJob2.setJob(job2);

        // make job2 dependent on job1
        controlledJob2.addDependingJob(controlledJob1);
        // add the job to the job control
        jobControl.addJob(controlledJob2);
        Thread jobControlThread = new Thread(jobControl);
        jobControlThread.start();

        // Poll until both jobs have finished, printing progress every 5 seconds.
        while (!jobControl.allFinished()) {
            System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
            System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
            System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
            System.out.println("Jobs in success state: " + jobControl.getSuccessfulJobList().size());
            System.out.println("Jobs in failed state: " + jobControl.getFailedJobList().size());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        jobControl.stop();
        // Return a non-zero exit code if either job failed.
        return jobControl.getFailedJobList().isEmpty() ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCountDriver(), args);
        System.exit(exitCode);
    }

}

IntComparator.java

package com.anydataflow.bigram;

import java.nio.ByteBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;

public class IntComparator extends WritableComparator {

    public IntComparator() {
        super(IntWritable.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2,
                       int s2, int l2) {
        // IntWritable serializes as a 4-byte big-endian int, so the keys can be
        // deserialized straight from the raw bytes.
        Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
        Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();
        // Negate the natural order to sort counts in descending order.
        return v1.compareTo(v2) * (-1);
    }
}
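
Equivalently, the object-level compare can be overridden instead of the raw-byte one. The sketch below is not from the original post; note that it must pass true to the superclass constructor so that WritableComparator creates IntWritable instances to deserialize into:

package com.anydataflow.bigram;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class IntComparatorAlt extends WritableComparator {

    public IntComparatorAlt() {
        // 'true' makes WritableComparator create IntWritable instances, which the
        // default raw-byte compare deserializes into before delegating here.
        super(IntWritable.class, true);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        int x = ((IntWritable) a).get();
        int y = ((IntWritable) b).get();
        // Compare y to x (reversed) for a descending sort.
        return Integer.compare(y, x);
    }
}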

Run the jar on Hadoop:

hadoop jar WordCount-1.0-SNAPSHOT.jar com.anydataflow.bigram.WordCountDriver /input-data.txt /output
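
The driver expects the input file to already be on HDFS; if it is not, something like the following would stage it first (the local file name here is just an assumption):

hadoop fs -put input-data.txt /input-data.txt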

Output (the driver appends "/output" to the second argument, hence the doubled path below):


hadoop fs -cat /output/output/part-r-00000 |head -n10
461     of the
423     to be
378     in the
252     of her
246     to the
242     mr darcy
241     i am
235     it was
216     of his
204     she was

Hope this post is useful. Next, we will post about analysing these results and deriving several metrics from them.

The custom comparator is also a common Hadoop MapReduce interview question.


Thanks for visiting.