GVKun编程网logo

一起学Hadoop——使用IDEA编写第一个MapReduce程序(Java和Python)(idea开发hadoop)

79

在本文中,您将会了解到关于一起学Hadoop——使用IDEA编写第一个MapReduce程序(Java和Python)的新资讯,同时我们还将为您解释idea开发hadoop的相关在本文中,我们将带你探

在本文中,您将会了解到关于一起学Hadoop——使用IDEA编写第一个MapReduce程序(Java和Python)的新资讯,同时我们还将为您解释idea开发hadoop的相关在本文中,我们将带你探索一起学Hadoop——使用IDEA编写第一个MapReduce程序(Java和Python)的奥秘,分析idea开发hadoop的特点,并给出一些关于bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar、Eclipse下Hadoop的MapReduce开发之MapReduce编写、Eclipse中使用Hadoop插件编写Map/Reduce程序、Hadoop 6、第一个 mapreduce 程序 WordCount的实用技巧。

本文目录一览:

一起学Hadoop——使用IDEA编写第一个MapReduce程序(Java和Python)(idea开发hadoop)

一起学Hadoop——使用IDEA编写第一个MapReduce程序(Java和Python)(idea开发hadoop)

上一篇我们学习了MapReduce的原理,今天我们使用代码来加深对MapReduce原理的理解。

wordcount是Hadoop入门的经典例子,我们也不能免俗,也使用这个例子作为学习Hadoop的第一个程序。本文将介绍使用java和python编写第一个MapReduce程序。

本文使用Idea2018开发工具开发第一个Hadoop程序。使用的编程语言是Java。

打开idea,新建一个工程,如下图所示:

在弹出新建工程的界面选择Java,接着选择SDK,一般默认即可,点击“Next”按钮,如下图:

 

在弹出的选择创建项目的模板页面,不做任何操作,直接点击“Next”按钮。

输入项目名称,点击Finish,就完成了创建新项目的工作,我们的项目名称为:WordCount。如下图所示:

 

添加依赖jar包,和Eclipse一样,要给项目添加相关依赖包,否则会出错。

点击Idea的File菜单,然后点击“Project Structure”菜单,如下图所示:

 

依次点击Modules和Dependencies,然后选择“+”的符号,如下图所示:

 

 

选择hadoop的包,我用得是hadoop2.6.1。把下面的依赖包都加入到工程中,否则会出现某个类找不到的错误。

(1)”/usr/local/hadoop/share/hadoop/common”目录下的hadoop-common-2.6.1.jar和haoop-nfs-2.6.1.jar;

(2)/usr/local/hadoop/share/hadoop/common/lib”目录下的所有JAR包;

(3)“/usr/local/hadoop/share/hadoop/hdfs”目录下的haoop-hdfs-2.6.1.jar和haoop-hdfs-nfs-2.7.1.jar;

(4)“/usr/local/hadoop/share/hadoop/hdfs/lib”目录下的所有JAR包。

 

工程已经创建好,我们开始编写Map类、Reduce类和运行MapReduce的入口类:

 JAVA编写MarReduce代码

Map类如下:

 1 import org.apache.hadoop.io.IntWritable;
 2 
 3 import org.apache.hadoop.io.LongWritable;
 4 
 5 import org.apache.hadoop.io.Text;
 6 
 7 import org.apache.hadoop.mapreduce.Mapper;
 8 
 9 import java.io.IOException;
10 
11 
12 public class WordcountMap extends Mapper<LongWritable,Text,Text,IntWritable> {
13         public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
14 
15         String line = value.toString();//读取一行数据
16 
17         String str[] = line.split("");//因为英文字母是以“ ”为间隔的,因此使用“ ”分隔符将一行数据切成多个单词并存在数组中
18 
19         for(String s :str){//循环迭代字符串,将一个单词变成<key,value>形式,及<"hello",1>
20             context.write(new Text(s),new IntWritable(1));
21         }
22     }
23 }

 

Reudce类:

 1 import org.apache.hadoop.io.IntWritable;
 2 import org.apache.hadoop.mapreduce.Reducer;
 3 import org.apache.hadoop.io.Text;
 4 import java.io.IOException;
 5 
 6 public class WordcountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
 7         public void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
 8         int count = 0;
 9         for(IntWritable value: values) {
10             count++;
11         }
12         context.write(key,new IntWritable(count));
13         }
14 }

 

 入口类 :

 1 import org.apache.hadoop.conf.Configuration;
 2 import org.apache.hadoop.fs.Path;
 3 import org.apache.hadoop.mapreduce.Job;
 4 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 5 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 6 import org.apache.hadoop.util.GenericOptionsParser;
 7 import org.apache.hadoop.io.IntWritable;
 8 import org.apache.hadoop.io.Text;
 9 
10 public class WordCount {
11 
12     public static void main(String[] args)throws Exception{
13         Configuration conf = new Configuration();
14         //获取运行时输入的参数,一般是通过shell脚本文件传进来。
15         String [] otherArgs = new         GenericOptionsParser(conf,args).getRemainingArgs();
16         if(otherArgs.length < 2){
17             System.err.println("必须输入读取文件路径和输出路径");
18             System.exit(2);
19         }
20         Job job = new Job();
21         job.setJarByClass(WordCount.class);
22         job.setJobName("wordcount app");
23     
24         //设置读取文件的路径,都是从HDFS中读取。读取文件路径从脚本文件中传进来
25         FileInputFormat.addInputPath(job,new Path(args[0]));
26         //设置mapreduce程序的输出路径,MapReduce的结果都是输入到文件中
27         FileOutputFormat.setOutputPath(job,new Path(args[1]));
28 
29          //设置实现了map函数的类
30         job.setMapperClass(WordcountMap.class);
31         //设置实现了reduce函数的类
32         job.setReducerClass(WordcountReduce.class);
33 
34          //设置reduce函数的key值
35         job.setOutputKeyClass(Text.class);
36         //设置reduce函数的value值
37         job.setOutputValueClass(IntWritable.class);
38         System.exit(job.waitForCompletion(true) ? 0 :1);
39     }
40 }

 

代码写好之后,开始jar包,按照下图打包。点击“File”,然后点击“Project Structure”,弹出如下的界面,

依次点击"Artifacts" -> "+" -> "JAR" -> "From modules with dependencies",然后弹出一个选择入口类的界面,选择刚刚写好的WordCount类,如下图:

 

按照上面设置好之后,就开始打jar包,如下图:

点击上图的“Build”之后就会生成一个jar包。jar的位置看下图,依次点击File->Project Structure->Artifacts就会看到如下的界面:

将打好包的wordcount.jar文件上传到装有hadoop集群的机器中,然后创建shell文件,shell文件内容如下,/usr/local/src/hadoop-2.6.1是hadoop集群中hadoop的安装位置,

1 /usr/local/src/hadoop-2.6.1/bin/hadoop jar wordcount.jar \ #执行jar文件的命令以及jar文件名,
2 
3 hdfs://hadoop-master:8020/data/english.txt \ #输入路径
4 
5 hdfs://hadoop-master:8020/wordcount_output #输出路径

 

执行shell文件之后,会看到如下的信息,

 

上图中数字1表示输入分片split的数量,数字2表示map和reduce的进度,数字3表示mapreduce执行成功,数字4表示启动多少个map任务,数字5表示启动多少个reduce任务。

自行成功后在hadoop集群中的hdfs文件系统中会看到一个wordcount_output的文件夹。使用“hadoop fs -ls /”命令查看:

 

在wordcount_output文件夹中有两个文件,分别是_SUCCESS和part-r-00000,part-r-00000记录着mapreduce的执行结果,使用hadoop fs -cat /wordcount_output/part-r-00000查看part-r-00000的内容:

 

可以每个英文单词出现的次数。

至此,借助idea 2018工具开发第一个使用java语言编写的mapreduce程序已经成功执行。下面介绍使用python语言编写的第一个mapreduce程序,相对于java,python编写mapreduce会简单很多,因为hadoop提供streaming,streaming是使用Unix标准流作为Hadoop和应用程序之间的接口,所以可以使用任何语言通过标准输入输出来写MapReduce程序。

Python编写MapReduce程序

看代码:

实现了map函数的python程序,命名为map.py:

 1 #!/usr/local/bin/python
 2 
 3 import sys #导入sys包
 4 
 5 for line in sys.stdin: #从标准输入中读取数据
 6     ss = line.strip().split('' '')#读取每一行数据,strip()函数过滤掉空格换行的字符,split('' '')分隔出每个额单词并存放在数组ss中
 7 
 8     for s in ss: #读取数组ss中的每个单词
 9         if s.strip() != "":
10             print "%s\t%s" % (s, 1)#构造以单词为key,1为value的键值对,并写入到标准输出中。

 

 实现了reduce函数的python程序,命名为reduce.py:

 

 1 import sys
 2 cur_word = None
 3 sum = 0
 4 for line in sys.stdin:
 5         ss = line.strip().split(''\t'')#从标准输入中读取数据。
 6         if len(ss) != 2:
 7                 continue
 8         word,cnt = ss
 9         if cur_word == None:
10                 cur_word = word
11         #因为从map流转到reduce的数据时按照key排好序的,cur_word记录的是上一个单词,word记    #录的是当前读取的单词,如果两个单词一致,则将sum+1,否则将word和sum值组成一个键值对,##写入到标准输出,同时sum赋值为0,并且将word赋值给cur_word变量。
12         if cur_word != word:
13                 print ''\t''.join([cur_word,str(sum)])
14                 cur_word = word
15                 sum = 0
16         sum += int(cnt)
17 print ''\t''.join([cur_word,str(sum)])

 

map和reduce程序已经编写完毕,下面编写shell脚本文件:

 1 HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
 2 STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar "
 3 
 4 INPUT_FILE_PATH_1="/data/english.txt"#输入路径
 5 OUTPUT_PATH="/wordcount_output"#输出路径
 6 $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH#每次执行时都删除输出路径,否则会出错
 7 
 8 $HADOOP_CMD jar $STREAM_JAR_PATH \
 9                 -input $INPUT_FILE_PATH_1 \#指定输入路径
10                 -output $OUTPUT_PATH \#指定输出路径
11                 -mapper "python map.py" \#指定要执行的map程序
12                 -reducer "python reduce.py" \#指定要执行reduce程序
13                 -file ./map.py \#指定map程序所在的位置
14                 -file ./reduce.py#指定reduce程序所在的位置

到此Java和Python编写第一个MapReduce程序已经完成。

bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar

bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar

[root@master hadoop-3.1.1]# bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar
An example program must be given as the first argument.
Valid program names are:
aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.
dbcount: An example job that count the pageview counts from a database.
distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.
grep: A map/reduce program that counts the matches of a regex in the input.
join: A job that effects a join over sorted, equally partitioned datasets
multifilewc: A job that counts words from several files.
pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method.
randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
randomwriter: A map/reduce program that writes 10GB of random data per node.
secondarysort: An example defining a secondary sort to the reduce.
sort: A map/reduce program that sorts the data written by the random writer.
sudoku: A sudoku solver.
teragen: Generate data for the terasort
terasort: Run the terasort
teravalidate: Checking results of terasort
wordcount: A map/reduce program that counts the words in the input files.
wordmean: A map/reduce program that counts the average length of the words in the input files.
wordmedian: A map/reduce program that counts the median length of the words in the input files.
wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files.

Eclipse下Hadoop的MapReduce开发之MapReduce编写

Eclipse下Hadoop的MapReduce开发之MapReduce编写

hadoop安装部署及Eclipse安装集成,这里不赘述了。

    先说下业务需求吧,有个系统日志文件,记录系统的运行信息,其中包含DEBUG、INFO、WARN、ERROR四个级别的日志,现在想要看到所有级别各有多少条记录。

    创建一个map/reduce项目,项目名为mapreducetest。在src下建立一个名为mapreducetest的包,然后建一个类名叫MapReduceTest,下面是代码。

package mapreducetest;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceTest extends Configuration implements Tool{
    /**
     * 配置
     */
    private Configuration configuration;
    /**
     * 获取配置
     */
    @Override
    public Configuration getConf() {
        return this.configuration;
    }
    /**
     * 设置配置
     */
    @Override
    public void setConf(Configuration arg0) {
        this.configuration=arg0;
    }
    /**
     * 
     * @ClassName: Counter 
     * @Description: TODO(计数器) 
     * @author scc
     * @date 2015年5月27日 下午2:54:39 
     *
     */
    enum Counter{
        TIMER
    }
    /**
     * 
     * @ClassName: Map 
     * @Description: map实现,所有的map业务都在这里进行Mapper后的四个参数分别为,输入key类型,输入value类型,输出key类型,输出value类型
     * @author scc
     * @date 2015年5月27日 下午2:30:06 
     * @
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text>{
        /**
         * key:输入key
         * value:输入value
         * context:map上下文对象
         * 说明,hdfs生成的所有键值对都会调用此方法
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            try{
                //得到日志每一行数据
                String mapvalue=value.toString();
                //日志具有固定格式,通过空格切分可以获得固定打下的string数组
                String[] infos=mapvalue.split(" ");
                //时间在数组的第一列,日志级别在数据的第九列,
                String info=infos[10];
                //调整数据格式(第一个参数为key,第二个参数为value),这里key和value都设置为日志级别
                context.write(new Text(info), new Text(info));
            }catch(Exception e){
                //遇到错误是记录错误
                context.getCounter(Counter.TIMER).increment(1);
                return;
            }
        }
        
    }
    /**
     * 
     * @ClassName: Reduce 
     * @Description: reduce处理类 ,Reducer四个参数,前两个是输入key和value的类型,必须和map一样,后两个是输出的key和value的类型
     * @author scc
     * @date 2015年5月27日 下午3:33:06 
     *
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text>{
        /**
         * 第一个参数输入的value,第二个参数是该key对应的所有的value集合,第三个是reducer的上下文
         * 说明:与map不同的这里是对map处理后的数据进行的调用,当map处理后的key有重复时,这里传进来的key会是去重后的key,比方说在map里放进10个键值对,
         * 其中有五个key是key1,有五个是key2,那么在reduce的时候只会调用两次reduce,分别是key1和key2
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values,Context arg2) throws IOException, InterruptedException {
            //获取当前遍历的key
            String info=key.toString();
            //计数器
            int count=0;
            //当值和key相同时计数器加1
            for (Text text : values) {
                if(info.equals(text.toString()))
                    count=count+1;
            }
            //将级别和对应的数据写出去
            arg2.write(key, new Text(String.valueOf(count)));
        }
    }
    /**
     * run方法是一个入口
     */
    @Override
    public int run(String[] arg0) throws Exception {
        //建立一个job,并指定job的名称
        Job job=Job.getInstance(getConf(), "maptest");
        //指定job的类
        job.setJarByClass(MapReduceTest.class);
        //设置日志文件路径(hdfs路径)
        FileInputFormat.setInputPaths(job,  new Path(arg0[0]));
        //设置结果输出路径(hdfs路径)
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        //设置map处理类的class
        job.setMapperClass(Map.class);
        //设置reduce的class
        job.setReducerClass(Reduce.class);
        //设置输出格式化的类的class
        job.setOutputFormatClass(TextOutputFormat.class);
        //设置输出key的类型
        job.setOutputKeyClass(Text.class);
        //设置输出value的类型
        job.setOutputValueClass(Text.class);        
        //设置等待job完成
        job.waitForCompletion(true);
        return job.isSuccessful()?0:1;
    }
    public static void main(String[] args) throws Exception {
            String[] args2=new String[2];
            args2[0]="hdfs://192.168.1.55:9000/test2-in/singlemaptest.log";
            args2[1]="hdfs://192.168.1.55:9000/test2-out";
            int res=ToolRunner.run(new Configuration(), new MapReduceTest(), args2);
            System.exit(res);
    }
}

下面是生成的结果:

INFO	3800
WARN	55

有兴趣的大佬大神可以关注下小弟的微信公共号,一起学习交流,扫描以下二维码关注即可。

Eclipse中使用Hadoop插件编写Map/Reduce程序

Eclipse中使用Hadoop插件编写Map/Reduce程序

纯转自 使用Eclipse搭建Hadoop编程环境

在eclipse中使用hadoop插件编程,在网上找了不少帖子,最终该贴解决了在eclipse中直接run on hadoop,而不用打jar,上传到hadoop主机,然后在hadoop jar xxx.jar。

在帖子中间补充了两点:

1.如果hadoop是搭建在虚拟机中,如何连接并运行程序

2.增加了log4j.properties配置

注:使用eclipse开发时,将workspace的默认字符集设置为utf-8

在前人的基础上,进行总结学习,发现bug,修改bug。

系统平台:Ubuntu14.04TLS(64位)

Hadoop环境:Hadoop2.7.1

Eclipse:Neon.2 Release(4.6.2)

Eclipse插件:hadoop-eclipse-plugin-2.7.1.jar

 

一.编译环境搭建

1.在eclipse上安装Hadoop插件

把下载好的hadoop-eclipse-plugin-2.7.1.jar文件拷贝到eclipse安装目录中的plugins文件夹内。如下图:

 

 

2.继续配置hadoop编译环境(方便配置,确保已经开启了 Hadoop

启动 Eclipse 后就可以在左侧的Project Explorer中看到DFS Locations

 

3.插件的配置

第一步:选择 Window 菜单下的 Preference。

此时会弹出一个窗体,点击选择 Hadoop Map/Reduce 选项,选择 Hadoop 的安装目录(例如:/home/hadoop/hadoop)。

第二步:切换 Map/Reduce 开发视图

选择Window 菜单下选择 Open Perspective -> Other,弹出一个窗体,从中选择Map/Reduce 选项即可进行切换。

第三步:建立与 Hadoop 集群的连接

点击Eclipse软件右下角的Map/Reduce Locations 面板,在面板中单击右键,选择New Hadoop Location

在弹出来的General选项面板中,General 的设置要与 Hadoop 的配置一致。一般两个 Host值是一样的。

如果是在本机搭建hadoop伪分布式,填写 localhost 即可,这里使用的是Hadoop伪分布,设置fs.defaultFShdfs://localhost:9000,则 DFS Master Port 改为 9000Map/Reduce(V2) Master Port 用默认的即可,Location Name 随意填写。

如果你是在虚拟机中搭建的hadoop伪分布模式,那么图中的Host应该填ip地址或者hostname,建议在这里填写hostname,然后去windows的hosts文件中添加一行

192.168.245.141 hadoop01(hostname可以自定义)

Advanced parameters 选项面板是对 Hadoop 参数进行配置,就是填写 Hadoop 的配置项(/home/hadoop/hadoop/etc/hadoop中的配置文件),如果配置了 hadoop.tmp.dir,就要进行相应的修改。但修改起来会比较繁琐,可以通过复制配置文件的方式解决。只要配置General 就行了,点击 finishMap/Reduce Location 就创建完成。

二.在 Eclipse 中操作 HDFS 中的文件

配置好后,点击左侧Project Explorer 中的 MapReduce Location 就能直接查看 HDFS 中的文件列表了(HDFS 中要有文件,如下图是 WordCount的输出结果),双击可以查看内容,右键点击可以上传、下载、删除 HDFS 中的文件

如果无法查看,可右键点击Location 尝试 Reconnect 或重启 EclipseHDFS 中的内容更新后Eclipse 不会同步刷新,需要右键点击 Project Explorer中的MapReduce Location,选择Refresh,(文件比较大的情况下eclipse可能死掉,重启eclipse就行)才能看到更新后的文件。

三.在 Eclipse 中创建 MapReduce 项目

点击 File 菜单,选择 New -> Project...:

选择Map/Reduce Project,点击Next

填写 Projectname WordCount 即可,点击Finish就创建好了项目。

然后在左侧的Project Explorer 可以看到刚才建立的项目。

接着右键点击刚创建的WordCount 项目,选择 New -> Class

需要填写两个地方:在Package 处填写 org.apache.hadoop.examples;在Name 处填写WordCount。(包名、类名可以随意起)

 

创建 Class 完成后,在 Projectsrc中可以看到 WordCount.java 这个文件。将如下 WordCount的代码拷贝到该文件中。

package com.jv.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	Logger logger = Logger.getLogger(WordCountMapper.class);
	
	Text k = new Text();
	IntWritable v = new IntWritable(1);
	/*
	 * 知识点
	 * 1.map方法的key是输入key,是每行的行首偏移量
	 * 2.map方法的value是输入value,是每行内容
	 * 3.Writable机制是Hadoop自身的序列化机制,
	 * 比如:LongWritable IntWritable NullWritable Text(String)等
	 * 4.context是MR的上下文件对象,此对象的作用:①可以输出结果 ②获取其他环境对象,比如文件的切片信息等
	 * 5.继承Mapper时,需要制定4个泛型类型
	 * ①泛型类型对应的是:Mapper输入key的类型
	 * ②泛型类型对应的是:Mapper输入value的类型
	 * 初学时,记住:① ②的类型是固定的
	 * ③泛型类型对应的是:Mapper的输出key类型
	 * ④泛型类型对应的是:Mapper的输出value类型
	 * 注:输出key和输出value是程序员自己决定.所以③ ④类型不固定,取决业务
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		logger.info("-----------" + key.get());
		// 1 获取一行
		String line = value.toString();
		// 2 切割
		String[] words = line.split(" ");
		// 3 输出
		for (String word : words) {
			k.set(word);
			context.write(k, v);
		}
	}
}
package com.jv.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	int sum;
	IntWritable v = new IntWritable();
	/*
	 * 知识点
	 * 1.reducer组件会接受Mapper组件的输出结果,所以reducer组件不能单独存在,但是Mapper组件可以
	 * 2.reducer会把Mapper的输出key,按相同的key做聚合(合并)
	 * 3.合并的形式:key it(迭代器) 通过reduce方法传给开发者.
	 * 其中 迭代器存储的是 key对应的value的集合
	 * 4.继承Reducer组件时的四个泛型
	 * ①泛型对应的Mapper输出key
	 * ②泛型对应的Mapper输出value
	 * ③泛型对应的Reducer的输出key
	 * ④泛型对应的Reducer的输出value
	 * 5.如果引入reducer组件后,最后的输出结果文件的结果是Reducer的输出key 和输出value
	 * 6.reduce方法会被调用多次,取决于有多少个不同的key
	 */
	@Override
	protected void reduce(Text key, Iterable<IntWritable> value, Context context)
			throws IOException, InterruptedException {
		// 1 累加求和
		sum = 0;
		for (IntWritable count : value) {
			sum += count.get();
		}
		// 2 输出
		v.set(sum);
		context.write(key, v);
	}
}
package com.jv.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// 1 获取配置信息以及封装任务
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);
		// 2 设置 jar 加载路径
		job.setJarByClass(WordcountDriver.class);
		// 3 设置 map 和 reduce 类
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		// 4 设置 map 输出
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// 5 设置 Reduce 输出
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// 6 设置输入和输出路径
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.245.141:9000/park03"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.245.141:9000/park03/result"));
		// 7 提交
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

 

 

 

四.通过eclipse运行mapreduce

运行 MapReduce 程序前,务必将 /home/hadoop/hadoop/etc/hadoop 中将有修改过的配置文件(如伪分布式需要core-site.xmlhdfs-site.xml),以及log4j.properties复制到WordCount 项目下的 src文件夹(~/workspace/WordCount/src)中。

原因:

在使用 Eclipse 运行 MapReduce 程序时,会读取 Hadoop-Eclipse-PluginAdvanced parameters作为 Hadoop 运行参数,如果未进行修改,则默认的参数其实就是单机(非分布式)参数,因此程序运行时是读取本地目录而不是HDFS 目录,就会提示Input 路径不存在。

Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/home/hadoop/workspace/WordCountProject/input

所以需要将配置文件拷贝到项目中的 src 目录,来覆盖这些参数。让程序能够正确运行。log4j用于记录程序的输出日记,需要log4j.properties 这个配置文件,如果没有拷贝该文件到项目中,运行程序后在Console 面板中会出现警告提示

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

这种情况不影响程序的正确运行的,但程序运行时无法看到任何提示消息,只能看到出错信息。

或者将如下内容复制到log4j.properties中

log4j.rootLogger=INFO,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

拷贝完成后,右键点击 WordCount 选择 refresh 进行刷新(不会自动刷新,需要手动刷新),可以看到文件结构如下所示:

将Hadoop安装目录\bin\hadoop.dll复制到c:\windows\system32目录下

在windows环境变量中增加HADOOP_HOME=d:\hadoop-2.7.1

 

点击工具栏中的 Run 图标,或者右键点击 Project Explorer 中的 WordCount.java,选择 Run As -> Run on Hadoop,就可以运行 MapReduce 程序了。不过由于没有指定参数,运行时会提示 "Usage: wordcount ",需要通过Eclipse设定一下运行参数。

右键点击刚创建的 WordCount.java,选择 Run As -> Run Configurations,在此处可以设置运行时的相关参数(如果 Java Application 下面没有 WordCount,那么需要先双击 Java Application)。切换到 "Arguments"栏,在Program arguments 处填写参数 "/input  /output"

注意:在环境下测试,如果没有"/",会提示如下错误信息:

Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/input

 

也可以直接在代码中设置好输入参数。可将代码 main() 函数的 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 改为(没有测试):

// String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
String[] otherArgs=new String[]{"input","output"}; /* 直接设置输入参数 */  

 

设定参数后,运行程序,可以看到运行成功的提示,刷新DFS Location后也能看到输出的 output 文件夹。

 

 

 

五.在eclipse中导出可运行的jar包

1. 鼠标右键点击项目名称,并选择Export

2. 选择Java--> Runnable JAR file

3. 选择正确的项目配置信息,和输出路径以及重命名jar

4. 点击OK完成导出可运行jar(测试方法与运行Hadoop自带的Wordcount方法一样,不在赘述)

 

六、在hadoop的浏览器控制台查看日志

    有很多朋友喜欢用System.out.println输出日志,那么可以使用浏览器控制台查看日志

    前提:如果按照上面的步骤配置了log4j.properties,那这一节就忽略掉,因为日志不会输出到hadoop主机所在的日志文件中了。如果没有配置log4j.properties,则可以使用控制台查看日志输出

    进入自己网址:http://192.168.245.141:50070/logs/userlogs/

    下面列出的最后一条就是你最新运行mr程序的日志

    

    点开最后一条

    

      三个目录应该分别是mapper、reduce、driver的日志输出

Hadoop 6、第一个 mapreduce 程序 WordCount

Hadoop 6、第一个 mapreduce 程序 WordCount

1、程序代码

Map:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        String[] words = StringUtils.split(value.toString(), '' '');
        for(String word : words){
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

Reduce:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class wordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    protected void reduce(Text arg0, Iterable<IntWritable> arg1,Context arg2)
            throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable i : arg1){
            sum += i.get();
        }
        arg2.write(arg0, new IntWritable(sum));
    }
    
}

Main:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJobName("wordCount");
            job.setJarByClass(RunJob.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(wordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            
            FileInputFormat.addInputPath(job, new Path("/usr/input/"));
            Path outPath = new Path("/usr/output/wc/");
            if(fs.exists(outPath)){
                fs.delete(outPath, true);
            }
            FileOutputFormat.setOutputPath(job, outPath);
            Boolean result = job.waitForCompletion(true);
            if(result){
                System.out.println("Job is complete!");
            }else{
                System.out.println("Job is fail!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

2、打包程序

将 Java 程序打成 Jar 包,并上传到 Hadoop 服务器上(任何一台在启动的 NameNode 节点即可)

 

3、数据源

数据源是如下:

hadoop java text hdfs
tom jack java text
job hadoop abc lusi
hdfs tom text

将该内容放到 txt 文件中,并放到 HDFS 的 /usr/input (是 HDFS 下不是 Linux 下),可以使用 Eclipse 插件上传:

 

4、执行 Jar 包

# hadoop jar jar路径  类的全限定名(Hadoop需要配置环境变量)
$ hadoop jar wc.jar com.raphael.wc.RunJob

执行完成以后会在 HDFS 的 /usr 下新创建一个 output 目录:

查看执行结果:

abc	1
hadoop	2
hdfs	2
jack	1
java	2
job	1
lusi	1
text	3
tom	2

完成了单词个数的统计。

今天关于一起学Hadoop——使用IDEA编写第一个MapReduce程序(Java和Python)idea开发hadoop的介绍到此结束,谢谢您的阅读,有关bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar、Eclipse下Hadoop的MapReduce开发之MapReduce编写、Eclipse中使用Hadoop插件编写Map/Reduce程序、Hadoop 6、第一个 mapreduce 程序 WordCount等更多相关知识的信息可以在本站进行查询。

本文标签: