A First Look at MapReduce

This semester I took Prof. Shimin Chen's course, and all of the programming assignments were written in Java. As someone who once knew only Java (and has since forgotten most of it), I was forced to pick it up again and found it quite pleasant. This post records the assignments and MapReduce itself.

Reading this note again a year later, I am filled with nostalgia~

Introduction

MapReduce is the classic big-data computation system, while HDFS is the big-data storage system.

The overall idea is that the programmer writes sequential code and the system carries out the parallel, distributed execution:

  • The programmer guarantees the correctness of the sequential program
  • The system is responsible for the correctness and efficiency of the parallel, distributed execution

The trade-off is this: writing the parallel, distributed program directly allows all kinds of rich functionality, but it places very high demands on the programmer, whereas a programming model restricts what can be expressed. The chosen model therefore has to be representative of common computations and still extensible.

I once saw a joke about this on Zhihu that I found quite amusing.

Map-Shuffle-Reduce Programming

Map

The input is a single key-value record: <ik, iv>

The output is zero or more key-value records: <mk, mv>

ik and mk can be completely different; here i stands for input and m for intermediate.

The map function is invoked once for every input record.

Combiner

A combiner can be roughly understood as a partial reducer: on each map node it first runs a local reduce over that node's map output, which lowers the load on the reducers (and the amount of data shuffled). Its code is therefore usually very close to, and often identical to, the reduce code.

The combiner's input format is the same as the reducer's input format.

The combiner's output format is the same as the mapper's output format.

Note that the combiner is only an optimization: the framework does not guarantee it will run (for example, it may be skipped when the cluster is heavily loaded), so the job must still produce correct results without it.
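In the driver this is wired up with a single call to job.setCombinerClass(...); in the WordCount code below, the combiner IntSumCombiner is itself just an ordinary Reducer subclass.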


A simplified Mapper class has three main methods: setup() for one-time initialization at the start, cleanup() for any final work at the end, and map() for the main per-record logic.
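As a rough sketch of that skeleton (the class name SkeletonMapper and the <LongWritable, Text> input types are assumptions here, matching what the default TextInputFormat provides: byte-offset keys and line values):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Skeleton of a Mapper subclass: setup()/cleanup() bracket the per-record map() calls.
public class SkeletonMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final IntWritable one = new IntWritable(1);
    private final Text outKey = new Text();

    @Override
    protected void setup(Context context) {
        // one-time initialization, e.g. read settings from context.getConfiguration()
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // called once per input record; emit zero or more <mk, mv> pairs
        // (placeholder logic: emit the whole line with a count of 1)
        outKey.set(value.toString());
        context.write(outKey, one);
    }

    @Override
    protected void cleanup(Context context) {
        // final work, e.g. emitting buffered state or releasing resources
    }
}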

Partition

During the shuffle, the intermediate results are split by key into R partitions, with one reducer responsible for each partition. The partitioner is configured with job.setPartitionerClass(); the default is the HashPartitioner class, and a custom partitioner implements the getPartition() function.

Partitioning can be regarded as part of the shuffle.
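A minimal sketch of a custom partitioner (the class name FirstLetterPartitioner and its route-by-first-character policy are made up for illustration; the default HashPartitioner simply computes (key.hashCode() & Integer.MAX_VALUE) % numPartitions):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Decides which of the R reducers receives each intermediate key.
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.getLength() == 0) {
            return 0;
        }
        // illustrative policy: group keys by their first character
        return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be registered in the driver with job.setPartitionerClass(FirstLetterPartitioner.class).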

Shuffle (implemented by the system)

Shuffle can be roughly thought of as a group-by on mk.

The map output is a set of key-value pairs; the shuffle collects all values sharing the same key into an Iterable and passes it to reduce.
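In word count, for example, if the mappers emit <"a", 1>, <"b", 1>, <"a", 1>, the shuffle delivers <"a", [1, 1]> and <"b", [1]> to the reducers.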

Reduce

The input is one mk together with all of the mv values associated with it.

The output is zero or more key-value records: <ok, ov> (o for output).


A simplified Reducer class has the same overall structure as the Mapper, with reduce() taking the place of map() as the main method.
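A matching sketch (again, the class name SkeletonReducer and the <Text, IntWritable> types are assumptions, mirroring the WordCount example below):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Skeleton of a Reducer subclass: reduce() is called once per distinct key,
// receiving all values grouped under that key by the shuffle.
public class SkeletonReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void setup(Context context) {
        // one-time initialization
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result); // emit zero or more <ok, ov> pairs
    }

    @Override
    protected void cleanup(Context context) {
        // final work
    }
}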

Overall Flow

(Figure: the end-to-end MapReduce pipeline, from input splits through map, combine, partition/shuffle/sort, and reduce to the output files.)

WordCount Code

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Modified by Shimin Chen to demonstrate functionality for Homework 2
// April-May 2015

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  // This is the Mapper class
  // reference: http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/mapreduce/Mapper.html
  //
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumCombiner
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  // This is the Reducer class
  // reference http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/mapreduce/Reducer.html
  //
  // We want to control the output format to look at the following:
  //
  //   count of word = count
  //
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,Text> {

    private Text result_key = new Text();
    private Text result_value = new Text();
    private byte[] prefix;
    private byte[] suffix;

    protected void setup(Context context) {
      try {
        prefix = Text.encode("count of ").array();
        suffix = Text.encode(" =").array();
      } catch (Exception e) {
        prefix = suffix = new byte[0];
      }
    }

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }

      // generate result key
      result_key.set(prefix);
      result_key.append(key.getBytes(), 0, key.getLength());
      result_key.append(suffix, 0, suffix.length);

      // generate result value
      result_value.set(Integer.toString(sum));

      context.write(result_key, result_value);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }

    Job job = Job.getInstance(conf, "word count");

    job.setJarByClass(WordCount.class);

    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumCombiner.class);
    job.setReducerClass(IntSumReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // add the input paths as given by command line
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }

    // add the output path as given by the command line
    FileOutputFormat.setOutputPath(job,
                                   new Path(otherArgs[otherArgs.length - 1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Types such as Text and IntWritable are Hadoop's wrappers around Java's String and int; they act as serialization buffers so that values can be written to and read from the framework's byte streams.
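A tiny standalone illustration of these wrappers (WritableDemo is just a made-up name for this sketch):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        // Writable wrappers are mutable, reusable boxes around plain Java values
        IntWritable count = new IntWritable();
        count.set(42);             // wrap an int
        int n = count.get();       // unwrap it back to a Java int

        Text word = new Text();
        word.set("hello");         // wrap a String (stored as UTF-8 bytes)
        String s = word.toString();

        System.out.println(s + " " + n);
    }
}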

Note that here the combiner and the reducer are not identical: the reducer additionally rewrites the output into the required "count of word = count" format.

An Assignment Based on WordCount

Task: each input line is a record <source> <destination> <time>; for every (source, destination) pair, output the number of records and the average time, i.e. <source> <destination> <count> <average time>.
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
* @author hezhengjie
* @version 1.0
* */
public class Hw2Part1 {

  /**
   * Check whether a string is a number (used for the time field of each record).
   * @param str the candidate numeric string
   */
  public static boolean isNumericzidai(String str) {
    Pattern pattern = Pattern.compile("-?[0-9]+(\\.[0-9]+)?");
    Matcher isNum = pattern.matcher(str);
    return isNum.matches();
  }

  /**
   * Mapper: for every valid record "<source> <destination> <time>",
   * emit key "<source> <destination>" and value "1 <time>".
   */
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, Text> { // input key, input value, output key, output value

    private Text aimKey = new Text();
    private Text aimValue = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString(), "\n");

      while (itr.hasMoreTokens()) {
        // skip malformed lines: must be exactly 3 fields with a numeric time
        String[] dataList = itr.nextToken().trim().split("\\s+");
        if (dataList.length != 3 || !isNumericzidai(dataList[2])) {
          continue;
        }

        aimKey.set(dataList[0] + " " + dataList[1]);
        aimValue.set("1" + " " + dataList[2]); // count is 1, followed by the time
        context.write(aimKey, aimValue);
      } // while
    } // map
  } // TokenizerMapper

  /**
   * Combiner: pre-aggregates "<count> <average time>" values on the map side.
   * Because the framework may apply the combiner zero, one, or several times,
   * it accumulates weighted sums instead of assuming each value is a single record.
   */
  public static class TimeAvgCombiner
       extends Reducer<Text,Text,Text,Text> {
    private Text result = new Text();

    public void reduce(Text key, Iterable<Text> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int count = 0;
      double timeSum = 0.0;
      for (Text val : values) {
        StringTokenizer each = new StringTokenizer(val.toString());
        int nowCount = Integer.parseInt(each.nextToken());
        double nowAvg = Double.parseDouble(each.nextToken());

        timeSum += nowAvg * nowCount;
        count += nowCount;
      }
      double avg = timeSum / count;

      result.set(Integer.toString(count) + " " + Double.toString(avg));
      context.write(key, result);
    }
  }

  /**
   * Reducer: output is "<source> <destination>" -> "<count> <average time>".
   */
  public static class TimeAvgReducer
       extends Reducer<Text,Text,Text,Text> {
    private Text result_key = new Text();
    private Text result_value = new Text();

    public void reduce(Text key, Iterable<Text> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int count = 0;
      double timeSum = 0.0;
      for (Text val : values) {
        StringTokenizer each = new StringTokenizer(val.toString());
        int nowCount = Integer.parseInt(each.nextToken());
        double nowAvg = Double.parseDouble(each.nextToken());

        timeSum += nowAvg * nowCount;
        count += nowCount;
      }
      double avg = timeSum / count;
      String avg_str = String.format("%.3f", avg);

      result_key.set(key);
      result_value.set(count + " " + avg_str);
      context.write(result_key, result_value);
    }
  }

  /**
   * main method
   * input:  <source> <destination> <time>
   * output: <source> <destination> <count> <average time>
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: hadoop jar ./Hw2Part1.jar <input file> <output directory>");
      System.exit(2);
    }

    Job job = Job.getInstance(conf, "source-destination count");

    job.setJarByClass(Hw2Part1.class);

    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(TimeAvgCombiner.class);
    job.setReducerClass(TimeAvgReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // add the input paths as given by command line
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }

    // add the output path as given by the command line
    FileOutputFormat.setOutputPath(job,
                                   new Path(otherArgs[otherArgs.length - 1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
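Assuming the class has been compiled and packaged into Hw2Part1.jar, the job is launched exactly as the usage message says: hadoop jar ./Hw2Part1.jar <input file> <output directory>. For a hypothetical input line such as "a b 1.5", the output would then contain "a b" and "1 1.500", separated by a tab, which is the default key-value separator of TextOutputFormat.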