MapReduce 分区

  • MapReduce 分区程序

    分区器的工作原理类似于处理输入数据集的条件。分区阶段发生在Map阶段之后,Reduce阶段之前。
    分区器的数量等于减速器(Reducer)的数量。这意味着分区器将根据缩减器的数量对数据进行拆分。因此,从单个分区程序传递的数据由单个Reducer处理。
  • 分区器

    分区程序对中间Map输出的键值对进行分区。它使用用户定义的条件对数据进行分区,该条件类似于哈希函数。分区总数与作业的Reducer任务数相同。让我们举一个例子来了解分区器是如何工作的。
  • MapReduce分区程序实现

    为了方便起见,让我们假设我们有一个名为Employee的小表,其中包含以下数据。我们将使用此样本数据作为输入数据集来演示分区程序的工作方式。
    Id Name Age Gender Salary
    1201 gopal 45 Male 50,000
    1202 manisha 40 Female 50,000
    1203 khalil 34 Male 30,000
    1204 prasanth 30 Male 30,000
    1205 kiran 20 Male 40,000
    1206 laxmi 25 Female 35,000
    1207 bhavya 20 Female 15,000
    1208 reshma 19 Female 15,000
    1209 kranthi 22 Male 22,000
    1210 Satish 24 Male 25,000
    1211 Krishna 25 Male 25,000
    1212 Arshad 28 Male 20,000
    1213 lavanya 18 Female 8,000
    我们必须编写一个应用程序来处理输入数据集,以找到不同年龄段(例如,低于20岁,21至30岁,高于30岁)中按性别划分的薪水最高的员工。
    输入数据
    以上数据另存为input.txt在“/home/hadoop/hadoopPartitioner”目录中,并作为输入提供。
    
    1201  gopal 45 Male  50000
    1202  manisha  40 Female   51000
    1203  khaleel  34 Male  30000
    1204  prasanth 30 Male  31000
    1205  kiran 20 Male  40000
    1206  laxmi 25 Female   35000
    1207  bhavya   20 Female   15000
    1208  reshma   19 Female   14000
    1209  kranthi  22 Male  22000
    1210  Satish   24 Male  25000
    1211  Krishna  25 Male  26000
    1212  Arshad   28 Male  20000
    1213  lavanya  18 Female   8000
    
    根据给定的输入,以下是该程序的算法说明。
    Map任务
    当我们将文本数据保存在文本文件中时,map任务接受键值对作为输入。此地图任务的输入如下-
    • 输入- 键将是一种模式,例如“任何特殊键+文件名+行号”(示例:key = @input1),值将是该行中的数据(示例:value = 1201\tgopal\t45\t男性\t50000)。
    • 方法- 此映射任务的操作如下-
      • 从字符串中的参数列表中读取作为输入值的值(记录数据)。
      • 使用split函数,将性别分开并存储在字符串变量中。
      • 
        String[] str = value.toString().split("\t", -3);
        String gender=str[3];
        
      • 将性别信息和记录数据值作为输出键值对从映射任务发送到分区任务。
      • 
        context.write(new Text(gender), new Text(value));
        
      • 对文本文件中的所有记录重复上述所有步骤。
    • 输出-您将获得性别数据和记录数据值作为键值对。
    分区任务
    分区器任务接受来自映射(Map)任务的键-值对作为其输入。分区意味着将数据划分为多个段。根据给定的分区条件条件,可以将输入的键值对数据根据年龄条件分为三部分。
    • 输入 - 键值对集合中的整个数据。键 = 记录中的性别字段值。值= 该性别的整个记录​​数据值。
    • 方法-分区逻辑的过程如下运行。
      • 从输入键值对中读取年龄字段值。
      • 
        String[] str = value.toString().split("\t");
        int age = Integer.parseInt(str[2]);
        
        在以下条件下检查年龄值。
        • 年龄小于或等于20
        • 年龄大于20岁且小于或等于30岁。
        • 年龄大于30岁。
        
        if(age<=20)
        {
           return 0;
        }
        else if(age>20 && age<=30)
        {
           return 1 % numReduceTasks;
        }
        else
        {
           return 2 % numReduceTasks;
        }
        
      • 输出-键值对的整个数据被分为三个键值对集合。Reducer在每个集合上单独工作。
    Reducer任务
    分区程序任务的数量等于简化(Reducer)程序任务的数量。在这里,我们有三个分区程序任务,因此有三个要执行的Reducer任务。
    • 输入-Reducer将使用不同的键值对集合执行三次。=记录中的性别字段值。=该性别的整个记录​​数据。
    • 方法-以下逻辑将应用于每个集合。
      • 读取每个记录的薪金字段值。
      • 
        String [] str = val.toString().split("\t", -3);
        Note: str[4] have the salary field value.
        
      • 用max变量检查薪水。如果str[4]是最高工资,则将str [4]分配给max,否则跳过该步骤。
      • 
        if(Integer.parseInt(str[4])>max)
        {
           max=Integer.parseInt(str[4]);
        }
        
      • 对每个key集合重复步骤1和2(Male 和 Female是key集合)。执行完这三个步骤后,您将从“Female”钥匙集合中找到一个最高薪水,从“Male”钥匙集合中找到一个最高薪水。
      • 
        context.write(new Text(key), new IntWritable(max));
        
    • 输出-最后,您将在三个不同年龄组的集合中获得一组键值对数据。它分别包含每个年龄组中男性收入的最高薪水和女性收入的最高薪水。
    执行Map,Partitioner和Reduce任务后,键-值对数据的三个集合存储在三个不同的文件中作为输出。
    所有这三个任务都被视为MapReduce作业。这些作业的以下要求和规格应在“配置”中指定-
    • 工作名称
    • 键和值的输入和输出格式
    • Map,Reduce和Partitioner任务的各个类
    
    Configuration conf = getConf();
    
    //Create Job
    Job job = new Job(conf, "topsal");
    job.setJarByClass(PartitionerExample.class);
    
    // File Input and Output paths
    FileInputFormat.setInputPaths(job, new Path(arg[0]));
    FileOutputFormat.setOutputPath(job,new Path(arg[1]));
    
    //Set Mapper class and Output format for key-value pair.
    job.setMapperClass(MapClass.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    
    //set partitioner statement
    job.setPartitionerClass(CaderPartitioner.class);
    
    //Set Reducer class and Input/Output format for key-value pair.
    job.setReducerClass(ReduceClass.class);
    
    //Number of Reducer tasks.
    job.setNumReduceTasks(3);
    
    //Input and Output format for data
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    
    范例程序
    以下程序显示了如何在MapReduce程序中为给定条件实现分区程序。
    
    package partitionerexample;
    
    import java.io.*;
    
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.fs.*;
    
    import org.apache.hadoop.mapreduce.lib.input.*;
    import org.apache.hadoop.mapreduce.lib.output.*;
    
    import org.apache.hadoop.util.*;
    
    public class PartitionerExample extends Configured implements Tool
    {
       //Map class
       
       public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
       {
          public void map(LongWritable key, Text value, Context context)
          {
             try{
                String[] str = value.toString().split("\t", -3);
                String gender=str[3];
                context.write(new Text(gender), new Text(value));
             }
             catch(Exception e)
             {
                System.out.println(e.getMessage());
             }
          }
       }
       
       //Reducer class
       
       public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
       {
          public int max = -1;
          public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
          {
             max = -1;
             
             for (Text val : values)
             {
                String [] str = val.toString().split("\t", -3);
                if(Integer.parseInt(str[4])>max)
                max=Integer.parseInt(str[4]);
             }
             
             context.write(new Text(key), new IntWritable(max));
          }
       }
       
       //Partitioner class
       
       public static class CaderPartitioner extends
       Partitioner < Text, Text >
       {
          @Override
          public int getPartition(Text key, Text value, int numReduceTasks)
          {
             String[] str = value.toString().split("\t");
             int age = Integer.parseInt(str[2]);
             
             if(numReduceTasks == 0)
             {
                return 0;
             }
             
             if(age<=20)
             {
                return 0;
             }
             else if(age>20 && age<=30)
             {
                return 1 % numReduceTasks;
             }
             else
             {
                return 2 % numReduceTasks;
             }
          }
       }
       
       @Override
       public int run(String[] arg) throws Exception
       {
          Configuration conf = getConf();
          
          Job job = new Job(conf, "topsal");
          job.setJarByClass(PartitionerExample.class);
          
          FileInputFormat.setInputPaths(job, new Path(arg[0]));
          FileOutputFormat.setOutputPath(job,new Path(arg[1]));
          
          job.setMapperClass(MapClass.class);
          
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(Text.class);
          
          //set partitioner statement
          
          job.setPartitionerClass(CaderPartitioner.class);
          job.setReducerClass(ReduceClass.class);
          job.setNumReduceTasks(3);
          job.setInputFormatClass(TextInputFormat.class);
          
          job.setOutputFormatClass(TextOutputFormat.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(Text.class);
          
          System.exit(job.waitForCompletion(true)? 0 : 1);
          return 0;
       }
       
       public static void main(String ar[]) throws Exception
       {
          int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
          System.exit(0);
       }
    }
    
    将以上代码另存为“ /home/hadoop/hadoopPartitioner”中的PartitionerExample.java。该程序的编译和执行如下。
    编译与执行
    让我们假设我们位于Hadoop用户的主目录中(例如/home/hadoop)。
    请按照下面给出的步骤来编译和执行上述程序。
    步骤1-下载用于编译和执行MapReduce程序的Hadoop-core-1.2.1.jar。您可以从mvnrepository.com下载jar 。
    让我们假设下载的文件夹是“/home/hadoop/hadoopPartitioner”
    步骤2-以下命令用于编译程序PartitionerExample.java并为该程序创建一个jar。
    
    $ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
    $ jar -cvf PartitionerExample.jar -C .
    
    步骤3 - 使用以下命令在HDFS中创建输入目录。
    
    $HADOOP_HOME/bin/hadoop fs -mkdir input_dir
    
    步骤4-使用以下命令将名为input.txt的输入文件复制到HDFS的输入目录中。
    
    $HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
    
    步骤5-使用以下命令来验证输入目录中的文件。
    
    $HADOOP_HOME/bin/hadoop fs -ls input_dir/
    
    步骤6-使用以下命令通过从输入目录获取输入文件来运行最高薪水应用程序。
    
    $HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
    
    等待一段时间,直到文件被执行。执行后,输出包含许多输入拆分,Mapper任务和Reducer任务。
    
    15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
    15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
    
    File System Counters
    
       FILE: Number of bytes read=467
       FILE: Number of bytes written=426777
       FILE: Number of read operations=0
       FILE: Number of large read operations=0
       FILE: Number of write operations=0
       
       HDFS: Number of bytes read=480
       HDFS: Number of bytes written=72
       HDFS: Number of read operations=12
       HDFS: Number of large read operations=0
       HDFS: Number of write operations=6
       
    Job Counters
    
       Launched map tasks=1
       Launched reduce tasks=3
       
       Data-local map tasks=1
       
       Total time spent by all maps in occupied slots (ms)=8212
       Total time spent by all reduces in occupied slots (ms)=59858
       Total time spent by all map tasks (ms)=8212
       Total time spent by all reduce tasks (ms)=59858
       
       Total vcore-seconds taken by all map tasks=8212
       Total vcore-seconds taken by all reduce tasks=59858
       
       Total megabyte-seconds taken by all map tasks=8409088
       Total megabyte-seconds taken by all reduce tasks=61294592
       
    Map-Reduce Framework
    
       Map input records=13
       Map output records=13
       Map output bytes=423
       Map output materialized bytes=467
       
       Input split bytes=119
       
       Combine input records=0
       Combine output records=0
       
       Reduce input groups=6
       Reduce shuffle bytes=467
       Reduce input records=13
       Reduce output records=6
       
       Spilled Records=26
       Shuffled Maps =3
       Failed Shuffles=0
       Merged Map outputs=3
       GC time elapsed (ms)=224
       CPU time spent (ms)=3690
       
       Physical memory (bytes) snapshot=553816064
       Virtual memory (bytes) snapshot=3441266688
       
       Total committed heap usage (bytes)=334102528
       
    Shuffle Errors
    
       BAD_ID=0
       CONNECTION=0
       IO_ERROR=0
       
       WRONG_LENGTH=0
       WRONG_MAP=0
       WRONG_REDUCE=0
       
    File Input Format Counters
    
       Bytes Read=361
       
    File Output Format Counters
    
       Bytes Written=72
    
    步骤7-使用以下命令来验证输出文件夹中的结果文件。
    
    $HADOOP_HOME/bin/hadoop fs -ls output_dir/
    
    您将在三个文件中找到输出,因为您在程序中使用了三个分区程序和三个Reducer。
    步骤8-使用以下命令查看Part-00000文件中的输出。该文件由HDFS生成。
    
    $HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
    
    part-00000 输出
    
    Female   15000
    Male     40000
    
    使用以下命令查看Part-00001文件中的输出。
    
    $HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
    
    在part-00001中输出
    
    Female   35000
    Male    31000
    
    使用以下命令查看Part-00002文件中的输出。
    
    $HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
    
    在part-00002中输出
    
    Female  51000
    Male   50000