Environment: Manjaro Linux, Hadoop 2.7.1, JDK 8.
After completing small-scale tests locally, the job can be deployed to a cluster.
First, a brief note on map and reduce.
A MapReduce job is processed in two phases: the map phase and the reduce phase. Each phase takes key-value pairs as input and produces key-value pairs as output, with the types chosen by the programmer, who also writes the map function and the reduce function.
The data we use is as follows:
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
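Each record is a fixed-width line, so before writing the Mapper it helps to verify the character offsets by hand: the year sits at characters 15-18, the signed air temperature (in tenths of a degree Celsius) at 87-91, and the quality code at 92. A minimal plain-Java sketch of that parsing, with no Hadoop dependencies (the class and method names here are mine, not part of the example code):

```java
public class NcdcRecord {
    // The year occupies characters 15-18 (0-indexed, end-exclusive substring).
    static String parseYear(String line) {
        return line.substring(15, 19);
    }

    // The air temperature starts at character 87 with a sign. A leading '+'
    // is skipped before parsing; a '-' is left in place so parseInt reads
    // the value as negative.
    static int parseTemp(String line) {
        if (line.charAt(87) == '+') {
            return Integer.parseInt(line.substring(88, 92));
        }
        return Integer.parseInt(line.substring(87, 92));
    }

    // The quality code is the single character at position 92.
    static String parseQuality(String line) {
        return line.substring(92, 93);
    }

    public static void main(String[] args) {
        String line = "0067011990999991950051507004+68750+023550FM-12"
            + "+038299999V0203301N00671220001CN9999999N9+00001+99999999999";
        System.out.println(parseYear(line) + " " + parseTemp(line)
            + " quality=" + parseQuality(line));
        // prints: 1950 0 quality=1
    }
}
```

Running this against the first sample record confirms the offsets before committing them to the Mapper.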
The logical data flow of MapReduce: map extracts a (year, temperature) pair from each record, the framework groups the pairs by year, and reduce picks the maximum temperature for each year.

Here is the implementation:
MaxTemperatureMapper.java

// cc MaxTemperatureMapper Mapper for maximum temperature example
// vv MaxTemperatureMapper
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
  extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999; // missing temperature readings are encoded as 9999
  
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    
    String line = value.toString(); // convert the input Text value into a Java String
    String year = line.substring(15, 19);
    // characters 15-18 (0-indexed) hold the year shown in the sample records
    int airTemperature;
    if (line.charAt(87) == '+') {
      // a '+' at position 87 marks a positive reading: skip the sign before parsing;
      // a negative reading keeps its '-' so parseInt reads it as part of the number
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      // String.matches() returns true when the string matches the given regular expression;
      // [01459] matches '0', '1', '4', '5', or '9', the quality codes marking a valid reading
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
// ^^ MaxTemperatureMapper

The Mapper class is a generic type with four formal type parameters that specify the input key, input value, output key, and output value types of the map function. For this example, the input key is a long integer offset, the input value is a line of text, the output key is a year, and the output value is an air temperature (an integer). Rather than using built-in Java types, Hadoop provides its own set of basic types optimized for network serialization: LongWritable (corresponding to Java's long), Text (Java's String), and IntWritable (Java's int). The Context instance is used to write the output.
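Between the map and reduce phases, the framework sorts the map output and groups the values by key, so the reducer sees each year exactly once together with all of its temperatures. That shuffle step can be sketched in plain Java (a simulation for illustration only; `ShuffleSketch` and `group` are my names, not Hadoop API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSketch {
    // Group (year, temperature) pairs by year with sorted keys --
    // the same view the reducer receives from the framework.
    static Map<String, List<Integer>> group(String[][] pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String[] pair : pairs) {
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>())
                   .add(Integer.parseInt(pair[1]));
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Simulated map output for the five sample records above.
        String[][] mapOutput = {
            {"1950", "0"}, {"1950", "22"}, {"1950", "-11"},
            {"1949", "111"}, {"1949", "78"}
        };
        System.out.println(group(mapOutput));
        // prints: {1949=[111, 78], 1950=[0, 22, -11]}
    }
}
```

In real Hadoop this grouping happens across the cluster during the shuffle; the sketch only shows the logical result the reducer iterates over.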

MaxTemperatureReducer.java

// cc MaxTemperatureReducer Reducer for maximum temperature example
// vv MaxTemperatureReducer
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
  extends Reducer<Text, IntWritable, Text, IntWritable> {
  
  @Override
  public void reduce(Text key, Iterable<IntWritable> values,
      Context context)
      throws IOException, InterruptedException {
    
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
// ^^ MaxTemperatureReducer

The reduce function likewise has four formal type parameters; its input types must match the map function's output types, Text and IntWritable. The loop over the values finds the highest temperature for each year.
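Applied to the grouped sample data, the reduce step is just a maximum over each year's list of temperatures. A plain-Java sketch of that step (again a simulation under my own names, not Hadoop code):

```java
import java.util.Arrays;
import java.util.List;

public class ReduceSketch {
    // Mirrors the reduce function: given one key's iterable of values,
    // return the maximum.
    static int maxTemperature(Iterable<Integer> values) {
        int maxValue = Integer.MIN_VALUE;
        for (int value : values) {
            maxValue = Math.max(maxValue, value);
        }
        return maxValue;
    }

    public static void main(String[] args) {
        // Grouped values for the sample years, as delivered by the shuffle.
        List<Integer> y1949 = Arrays.asList(111, 78);
        List<Integer> y1950 = Arrays.asList(0, 22, -11);
        System.out.println("1949\t" + maxTemperature(y1949)); // 1949  111
        System.out.println("1950\t" + maxTemperature(y1950)); // 1950  22
    }
}
```

Starting from Integer.MIN_VALUE means the maximum is correct even when every reading in a group is negative.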

MaxTemperature.java: this class runs the MapReduce job.

// cc MaxTemperature Application to find the maximum temperature in the weather dataset
// vv MaxTemperature
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    // check that both the input and output paths were supplied on the command line
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }
    // a Job object forms the specification of the job and controls how it is run
    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");
    // the two path arguments come from the command line, e.g.
    //   hadoop jar ./MaxTemperature.jar ./input ./output
    // i.e., paths in HDFS: args[0] is ./input and args[1] is ./output
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
// ^^ MaxTemperature

The result is one line per year giving the year and its maximum temperature in tenths of a degree Celsius: for the sample records above, 111 for 1949 and 22 for 1950.
Reference: Hadoop: The Definitive Guide
