Codementor Events

Common MapReduce Patterns

Published Apr 30, 2019

This article by Chanchal Singh and Manish Kumar will delve into some of the common MapReduce patterns that will help you work with Hadoop. Chanchal Singh has more than five years of experience in product development and architect design, and Manish Kumar is a technical architect with more than ten years of experience in data management, working as a data architect and product architect.

The design patterns are the solution templates for solving specific problems. Developers can reuse templates for similar problems across domains so that they save time in solving problems. If you are a programmer, you would have used the abstract factory pattern, builder pattern, observer pattern, and so on before. These patterns are discovered by people who have been solving similar problems for many years. The MapReduce framework has existed for almost a decade now. Let's look into a few of the commonly used MapReduce design patterns across industries.

Summarization patterns

Summarization problems use the pattern widely across domains. It's all about grouping similar data together and then performing an operation such as calculating a minimum, maximum, count, average, median-standard deviation, building an index, or just simply counting based on key. For example, we might want to calculate the total amount of money our website has made by country.

As another example, let's say you want to get the average number of times the users log on to our website. One more example can be finding the minimum and maximum number of users by state. MapReduce works with key-value pair. Thus, operations on keys are commonly used operations. The mapper emits the key-value pairs and the values of these keys are aggregated on the reducer. The following are a few commonly used examples of the summarization pattern.

Word count example

Many people who start to learn MapReduce development would have written Word count as their first program. Thus, it is sometimes known as the Hello World program of MapReduce. The basic idea of this program is to show how the MapReduce framework works. The pattern of Word count can be applied to use cases such as counting population by state, counting the total number of crime by state, finding total spending per person, and so on. Let's briefly discuss the word count program with a Mapper, Reducer, and combiner example.

Mapper

The job of a Mapper is to split the record, get each word from record, and emit a value of one with a word. The output key and the output value are of type Text and IntWritable, as shown in the following code:

import org.apache.Hadoop.io.IntWritable;
import org.apache.Hadoop.io.LongWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

public static final IntWritable ONE = new IntWritable(1);

@Override
protected void map(LongWritable offset, Text line, Context context)
throws IOException, InterruptedException {
        String[] result = line.toString().split("   ");

for (String word : result) {
            context.write(new Text(word), ONE);
        }
    }
}

Reducer

The MapReduce framework uses partitions to make sure that all the records with the same key always go to the same reducer. The reducer receives the list of values for the key and thus can easily perform aggregated operations such as count and sum, as follows:

import org.apache.Hadoop.io.IntWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {

int count = 0;
for (IntWritable current : values) {
            count += current.get();

        }
        context.write(key, new IntWritable(count));
    }

}

Combiner

The combiner would be the same as the Reducer in most of the cases and it can be added to the Driver class with the same class as that of the reducer. The advantage of the combiner is that it works as a mini reducer and runs on the same machine as the mapper, thus reducing the amount of data shuffling. The Driver class of the word count application is as follows:

import org.apache.Hadoop.conf.Configuration;
import org.apache.Hadoop.conf.Configured;
import org.apache.Hadoop.fs.Path;
import org.apache.Hadoop.io.IntWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Job;
import org.apache.Hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.Hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.Hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.Hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.Hadoop.util.Tool;
import org.apache.Hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {


public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), (Tool) new Driver(), args);
        System.exit(res);
    }

public int run(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "WordCount");

        job.setJarByClass(Driver.class);


if (args.length < 2) {
            System.out.println("Jar requires 2 paramaters :  \""
+ job.getJar()
                    + " input_path output_path");
return 1;
        }

        job.setMapperClass(WordcountMapper.class);

        job.setReducerClass(WordcountReducer.class);

        job.setCombinerClass(WordcountReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        Path filePath = new Path(args[0]);
        FileInputFormat.setInputPaths(job, filePath);

        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.waitForCompletion(true);
return 0;
    }
}

Minimum and maximum

The minimum and maximum calculation for a specific field is a commonly used use case in MapReduce. Once the mapper completes its operation, the reducer simply iterates through all the key values and finds out the minimum and maximum in the key grouping:

Writables: The idea behind writing custom writable was to save extra effort in splitting data at the reducer side and avoiding unnecessary problems that can occur from the delimiter. Most of the time, we choose the delimiter that is already present in the record and then it leads to the incorrect mapping of records with the field.

We will use the following import packages:

import org.apache.Hadoop.io.IntWritable;
import org.apache.Hadoop.io.LongWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

The custom Writable class encapsulates the details inside the Writable object, which can be used at the reducer side to fetch values for the records:

public class PlayerDetail implements Writable {
private Text playerName;
private IntWritable score;
private Text opposition;
private LongWritable timestamps;
private IntWritable ballsTaken;
private IntWritable fours;
private IntWritable six;


public void readFields(DataInput dataInput) throws IOException {
playerName.readFields(dataInput);
score.readFields(dataInput);
opposition.readFields(dataInput);
timestamps.readFields(dataInput);
ballsTaken.readFields(dataInput);
fours.readFields(dataInput);
six.readFields(dataInput);

    }

public void write(DataOutput dataOutput) throws IOException {
playerName.write(dataOutput);
score.write(dataOutput);
opposition.write(dataOutput);
timestamps.write(dataOutput);
ballsTaken.write(dataOutput);
fours.write(dataOutput);
playerName.write(dataOutput);

    }

public Text getPlayerName() {
return playerName;
    }

public void setPlayerName(Text playerName) {
this.playerName = playerName;
    }

public IntWritable getScore() {
return score;
    }

public void setScore(IntWritable score) {
this.score = score;
    }

public Text getOpposition() {
return opposition;
    }

public void setOpposition(Text opposition) {
this.opposition = opposition;
    }

public LongWritable getTimestamps() {
return timestamps;
    }

public void setTimestamps(LongWritable timestamps) {
this.timestamps = timestamps;
    }

public IntWritable getBallsTaken() {
return ballsTaken;
    }

public void setBallsTaken(IntWritable ballsTaken) {
this.ballsTaken = ballsTaken;
    }

public IntWritable getFours() {
return fours;
    }

public void setFours(IntWritable fours) {
this.fours = fours;
    }

public IntWritable getSix() {
return six;
    }

public void setSix(IntWritable six) {
this.six = six;
    }

@Override
public String toString() {
return playerName +
"\t" + score +
"\t" + opposition +
"\t" + timestamps +
"\t" + ballsTaken +
"\t" + fours +
"\t" + six;
    }

}

We will import the following packages and implement the custom Writable class:

import org.apache.Hadoop.io.IntWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PlayerReport implements Writable {

private Text playerName;
private IntWritable maxScore;
private Text maxScoreopposition;
private IntWritable minScore;
private Text minScoreopposition;


public void write(DataOutput dataOutput) throws IOException {
playerName.write(dataOutput);
maxScore.write(dataOutput);
maxScoreopposition.write(dataOutput);
minScore.write(dataOutput);
minScoreopposition.write(dataOutput);

    }

public void readFields(DataInput dataInput) throws IOException {
playerName.readFields(dataInput);
maxScore.readFields(dataInput);
maxScoreopposition.readFields(dataInput);
minScore.readFields(dataInput);
minScoreopposition.readFields(dataInput);

    }

public Text getPlayerName() {
return playerName;
    }

public void setPlayerName(Text playerName) {
this.playerName = playerName;
    }

public IntWritable getMaxScore() {
return maxScore;
    }

public void setMaxScore(IntWritable maxScore) {
this.maxScore = maxScore;
    }

public Text getMaxScoreopposition() {
return maxScoreopposition;
    }

public void setMaxScoreopposition(Text maxScoreopposition) {
this.maxScoreopposition = maxScoreopposition;
    }

public IntWritable getMinScore() {
return minScore;
    }

public void setMinScore(IntWritable minScore) {
this.minScore = minScore;
    }

public Text getMinScoreopposition() {
return minScoreopposition;
    }

public void setMinScoreopposition(Text minScoreopposition) {
this.minScoreopposition = minScoreopposition;
    }

@Override
public String toString() {
return playerName +
"\t" + maxScore +
"\t" + maxScoreopposition +
"\t" + minScore +
"\t" + minScoreopposition;
    }
}

Mapper class: The Mapper class in the MinMax algorithm maps the record with the custom writable object and emits the record for each player using the player name as key and PlayerDetail as value, as follows:

import org.apache.Hadoop.io.IntWritable;
import org.apache.Hadoop.io.LongWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Mapper;
import java.io.IOException;

public class MinMaxMapper extends
Mapper<LongWritable, Text, Text, PlayerDetail> {

private PlayerDetail playerDetail = new PlayerDetail();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String[] player = value.toString().split(",");

playerDetail.setPlayerName(new Text(player[0]));
playerDetail.setScore(new IntWritable(Integer.parseInt(player[1])));
playerDetail.setOpposition(new Text(player[2]));
playerDetail.setTimestamps(new LongWritable(Long.parseLong(player[3])));
playerDetail.setBallsTaken(new IntWritable(Integer.parseInt(player[4])));
playerDetail.setFours(new IntWritable(Integer.parseInt(player[5])));
playerDetail.setSix(new IntWritable(Integer.parseInt(player[6])));

        context.write(playerDetail.getPlayerName(), playerDetail);

    }
}

Reducer class: The Reducer is responsible for calculating the minimum and maximum scores of each individual by iterating through the list of records of players and emit the record using the PlayerReport writable object, as follows:

import org.apache.Hadoop.io.IntWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Reducer;
import java.io.IOException;

public class MinMaxReducer extends Reducer<Text, PlayerDetail, Text, PlayerReport> {

    PlayerReport playerReport = new PlayerReport();

@Override
protected void reduce(Text key, Iterable<PlayerDetail> values, Context context) throws IOException, InterruptedException {
playerReport.setPlayerName(key);
playerReport.setMaxScore(new IntWritable(0));
playerReport.setMinScore(new IntWritable(0));
for (PlayerDetail playerDetail : values) {
int score = playerDetail.getScore().get();
if (score > playerReport.getMaxScore().get()) {
playerReport.setMaxScore(new IntWritable(score));
playerReport.setMaxScoreopposition(playerDetail.getOpposition());
            }
if (score < playerReport.getMaxScore().get()) {
playerReport.setMinScore(new IntWritable(score));
playerReport.setMinScoreopposition(playerDetail.getOpposition());
            }
            context.write(key, playerReport);
        }
    }
}

Driver class: The Driver class provides the basic configuration to run MapReduce applications and defines the protocol that cannot be violated by the MapReduce framework. For example, the Driver class mentions the output key class as IntWritable and the value as text, but the reducer tries to emit the key as text and the value as IntWritable. Due to this, the job will fail and an error will be thrown, as follows:

import org.apache.Hadoop.conf.Configuration;
import org.apache.Hadoop.fs.Path;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Job;
import org.apache.Hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.Hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.Hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.Hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.Hadoop.util.Tool;
import org.apache.Hadoop.util.ToolRunner;

public class MinMaxDriver {


public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), (Tool) new MinMaxDriver(), args);
        System.exit(res);
    }

public int run(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "MinMax");

        job.setJarByClass(MinMaxDriver.class);


if (args.length < 2) {
            System.out.println("Jar requires 2 paramaters :  \""
+ job.getJar()
                    + " input_path output_path");
return 1;
        }

        job.setMapperClass(MinMaxMapper.class);

        job.setReducerClass(MinMaxReducer.class);

        job.setCombinerClass(MinMaxReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PlayerReport.class);
        job.setInputFormatClass(TextInputFormat.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        Path filePath = new Path(args[0]);
        FileInputFormat.setInputPaths(job, filePath);

        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.waitForCompletion(true);
return 0;
    }
}

Filtering patterns
The filtering pattern is simply filtering out records based on a particular condition. Data cleansing is one of the commonly used examples of a filtering pattern. The raw data may have records in which a few fields are not present or it's just junk that we cannot use in further analysis.

Filtering logic can be used to validate each record and remove any junk records. The other example could be web article filtering based on particular word/regex matches. These web articles can be further used in classification, tagging, or machine learning use cases. The other use case could be filtering out all the customers who do not buy anything that is more than 500 dollars in value and then process it further for any other analysis. Let's look at the following regex filtering example:

import org.apache.Hadoop.io.NullWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Mapper;

import java.io.IOException;

public class RegexFilteringMapper extends Mapper<Object, Text, NullWritable, Text> {

private String regexPattern = "/* REGEX PATTERN HERE */";

@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

if (value.toString().matches(regexPattern)) {
            context.write(NullWritable.get(), value);
        }
    }
}

The other example could be random sampling of data, which is required in many use cases such as data for testing applications, training machine learning models, and so on. The other common use case is to find out top-k records based on a specific condition. In most organizations, it is important to find out the outliers/customers who are genuinely loyal to the merchant and offer them good rewards or to find out about customers who have not used the application for a long time and offer them a good discount to get them to re-engage.

There are many more design patterns available in MapReduce which you can explore. Hope you found this article interesting; you can refer to Mastering Hadoop 3 as a comprehensive guide to mastering the most advanced Hadoop 3 concepts. Mastering Hadoop 3 will help you learn how Hadoop works internally, study advanced concepts of different ecosystem tools, discover solutions to real-world use cases, and understand how to secure your cluster.

Discover and read more posts from PACKT
get started
post commentsBe the first to share your opinion
Show more replies