Hadoop MapReduce实现单词计数(Word Count)

1.Map与Reduce过程

1.1 Map过程

首先,Hadoop会把输入数据划分成等长的输入分片(input split) 或分片发送到MapReduce。Hadoop为每个分片创建一个map任务,由它来运行用户自定义的map函数以分析每个分片中的记录。在我们的单词计数例子中,输入是多个文件,一般一个文件对应一个分片,如果文件太大则会划分为多个分片。map函数的输入以<key, value>形式做为输入,value为文件的每一行,key为该行在文件中的偏移量(一般我们会忽视)。这里map函数起到的作用为将每一行进行分词为多个word,并在context中写入<word, 1>以代表该单词出现一次。

map过程的示意图如下:

-1

mapper代码编写如下:

  1. public static class TokenizerMapper
  2.          extends Mapper<Object, Text, Text, IntWritable> {
  3.      private final static IntWritable one = new IntWritable(1);
  4.      private Text word = new Text();
  5.      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  6.          //每次处理一行,一个mapper里的value为一行,key为该行在文件中的偏移量
  7.          StringTokenizer iter = new StringTokenizer(value.toString());
  8.          while (iter.hasMoreTokens()) {
  9.              word.set(iter.nextToken());
  10.              // 向context中写入<word, 1>
  11.              context.write(word, one);
  12.              System.out.println(word);
  13.          }
  14.      }
  15. }

如果我们能够并行处理分片(不一定是完全并行),且分片是小块的数据,那么处理过程将会有一个好的负载平衡。但是如果分片太小,那么管理分片与map任务创建将会耗费太多时间。对于大多数作业,理想分片大小为一个HDFS块的大小,默认是64MB。

map任务的执行节点和输入数据的存储节点相同时,Hadoop的性能能达到最佳,这就是计算机系统中所谓的data locality optimization(数据局部性优化)。而最佳分片大小与块大小相同的原因就在于,它能够保证一个分片存储在单个节点上,再大就不能了。

1.2 Reduce过程

接下来我们看reducer的编写。reduce任务的多少并不是由输入大小来决定,而是需要人工单独指定的(默认为1个)。和上面map不同的是,reduce任务不再具有本地读取的优势————一个reduce任务的输入往往来自于所有mapper的输出,因此map和reduce之间的数据流被称为 shuffle(洗牌) 。Hadoop会先按照key-value对进行排序,然后将排序好的map的输出通过网络传输到reduce任务运行的节点,并在那里进行合并,然后传递到用户定义的reduce函数中。

reduce 函数示意图如下:

-2

reducer代码编写如下:

  1.      public static class IntSumReducer
  2.              extends Reducer<Text, IntWritable, Text, IntWritable>{
  3.          private IntWritable result = new IntWritable();
  4.          public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
  5.              int sum = 0;
  6.              for (IntWritable val : values) {
  7.                  sum += val.get();
  8.              }
  9.              result.set(sum);
  10.              context.write(key, result);
  11.          }
  12.      }

2.完整代码

2.1 项目架构

关于VSCode+Java+Maven+Hadoop开发环境搭建,可以参见我的博客《VSCode+Maven+Hadoop开发环境搭建》,此处不再赘述。这里展示我们的项目架构如下:

Word-Count-Hadoop
├─ input
│  ├─ file1
│  ├─ file2
│  └─ file3
├─ output
├─ pom.XML
├─ src
│  └─ main
│     └─ java
│        └─ WordCount.java
└─ target

WordCount.java代码如下:

  1. import java.io.IOException;
  2. import java.util.StringTokenizer;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. public class WordCount{
  14.      public static class TokenizerMapper
  15.              extends Mapper<Object, Text, Text, IntWritable> {
  16.          private final static IntWritable one = new IntWritable(1);
  17.          private Text word = new Text();
  18.  
  19.          public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  20.          //每次处理一行,一个mapper里的value为一行,key为该行在文件中的偏移量
  21.              StringTokenizer iter = new StringTokenizer(value.toString());
  22.              while (iter.hasMoreTokens()) {
  23.                  word.set(iter.nextToken());
  24.                  // 向context中写入<word, 1>
  25.                  context.write(word, one);
  26.              }
  27.          }
  28.      }
  29.  
  30.      public static class IntSumReducer
  31.              extends Reducer<Text, IntWritable, Text, IntWritable>{
  32.          private IntWritable result = new IntWritable();
  33.          public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
  34.              int sum = 0;
  35.              for (IntWritable val : values) {
  36.                  sum += val.get();
  37.              }
  38.              result.set(sum);
  39.              context.write(key, result);
  40.          }
  41.      }
  42.  
  43.      public static void main(String[] args) throws Exception{
  44.          Configuration conf = new Configuration();
  45.          Job job = Job.getInstance(conf, “word_count”);
  46.  
  47.          job.setJarByClass(WordCount.class);
  48.  
  49.          job.setMapperClass(TokenizerMapper.class);
  50.          //此处的Combine操作意为即第每个mapper工作完了先局部reduce一下,最后再全局reduce
  51.          job.setCombinerClass(IntSumReducer.class);
  52.          job.setReducerClass(IntSumReducer.class);
  53.  
  54.          job.setOutputKeyClass(Text.class);
  55.          job.setOutputValueClass(IntWritable.class);
  56.  
  57.          //第0个参数是输入目录,第1个参数是输出目录
  58.          //先判断output path是否存在,如果存在则删除
  59.          Path path = new Path(args[1]);//
  60.          FileSystem fileSystem = path.getFileSystem(conf);
  61.          if (fileSystem.exists(path)) {
  62.              fileSystem.delete(path, true);
  63.          }
  64.  
  65.          //设置输入目录和输出目录
  66.          FileInputFormat.addInputPath(job, new Path(args[0]));
  67.          FileOutputFormat.setOutputPath(job, new Path(args[1]));
  68.          System.exit(job.waitForCompletion(true)?0:1);
  69.      }
  70. }

pom.xml中记得配置Hadoop的依赖环境:

  1.      
  2.      <!– 集中定义版本号 –>
  3.      <properties>
  4.      <project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
  5.      <maven.compiler.source>17</maven.compiler.source>
  6.      <maven.compiler.target>17</maven.compiler.target>
  7.      <hadoop.version>3.3.1</hadoop.version>
  8.      </properties>
  9.      <dependencies>
  10. <dependency>
  11.          <groupId>junit</groupId>
  12.          <artifactId>junit</artifactId>
  13.          <version>4.11</version>
  14.          <scope>test</scope>
  15.      </dependency>
  16.      <!– 导入hadoop依赖环境 –>
  17.      <dependency>
  18.          <groupId>org.apache.hadoop</groupId>
  19.          <artifactId>hadoopcommon</artifactId>
  20.          <version>${hadoop.version}</version>
  21.      </dependency>
  22.      <dependency>
  23.          <groupId>org.apache.hadoop</groupId>
  24.          <artifactId>hadoophdfs</artifactId>
  25.          <version>${hadoop.version}</version>
  26.      </dependency>
  27.      <dependency>
  28.          <groupId>org.apache.hadoop</groupId>
  29.          <artifactId>hadoopmapreduceclientcore</artifactId>
  30.          <version>${hadoop.version}</version>
  31.      </dependency>
  32.      <dependency>
  33.          <groupId>org.apache.hadoop</groupId>
  34.          <artifactId>hadoopclient</artifactId>
  35.          <version>${hadoop.version}</version>
  36.      </dependency>
  37.      <dependency>
  38.          <groupId>org.apache.hadoop</groupId>
  39.          <artifactId>hadoopyarnapi</artifactId>
  40.          <version>${hadoop.version}</version>
  41.      </dependency>
  42.      </dependencies>
  43.      
  44. </project>

此外,因为我们的程序自带输入参数,我们还需要在VSCode的launch.json中配置输入参数intput(代表输入目录)和output(代表输出目录):

  1. “args”: [
  2.      “input”,
  3.      “output”
  4. ],

编译运行完毕后,可以查看output文件夹下的part-r-00000文件:

David    1
Goodbye    1
Hello    3
Tom    1
World    2

可见我们的程序正确地完成了单词计数的功能。

以上就是Hadoop MapReduce实现单词计数(Word Count)的详细内容,更多关于Hadoop MapReduce的资料请关注我们其它相关文章!

标签

发表评论