This example is based on Hadoop 2.7.3, running on a pseudo-distributed cluster.
I. Create a MapReduce application
The structure of the MapReduce application is shown in the figure:
1. Add the Maven dependencies
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hadoop</groupId>
    <artifactId>beginner</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>beginner</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>au.com.bytecode</groupId>
            <artifactId>opencsv</artifactId>
            <version>2.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.hadoop.FlightsByCarrier</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
2. MapReduce driver code
The driver is the client through which the user interacts with the Hadoop cluster; the MapReduce job is configured here.
```java
package com.hadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FlightsByCarrier {
    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(FlightsByCarrier.class);
        job.setJobName("FlightsByCarrier");

        // Input: the raw flight-data CSV passed as the first argument
        TextInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(FlightsByCarrierMapper.class);
        job.setReducerClass(FlightsByCarrierReducer.class);

        // Output: carrier code (Text) -> flight count (IntWritable)
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.waitForCompletion(true);
    }
}
```
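The job log further below warns that "Hadoop command-line option parsing not performed" and suggests implementing the Tool interface. A sketch of the same driver rewritten that way, assuming the rest of the job setup is unchanged, lets ToolRunner parse Hadoop's generic options (`-D`, `-files`, and so on) before the job runs:

```java
package com.hadoop;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical Tool-based variant of the driver above: ToolRunner strips
// Hadoop's generic options from args before run() sees them.
public class FlightsByCarrierTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "FlightsByCarrier");
        job.setJarByClass(FlightsByCarrierTool.class);

        TextInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(FlightsByCarrierMapper.class);
        job.setReducerClass(FlightsByCarrierReducer.class);

        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FlightsByCarrierTool(), args));
    }
}
```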
3. MapReduce mapper code
```java
package com.hadoop;

import au.com.bytecode.opencsv.CSVParser;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlightsByCarrierMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Skip the header row, which starts at byte offset 0
        if (key.get() > 0) {
            String[] lines = new CSVParser().parseLine(value.toString());
            // Column 8 of 2008.csv is the carrier code; emit (carrier, 1)
            context.write(new Text(lines[8]), new IntWritable(1));
        }
    }
}
```
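The mapper's per-record logic can be tried outside the Hadoop runtime. This sketch uses `String.split` as a stand-in for opencsv's `CSVParser` (the sample record is hypothetical and shortened; real 2008.csv rows may contain quoted fields that need a proper CSV parser):

```java
// Minimal sketch of the mapper's per-record logic, without the Hadoop runtime.
// Assumes simple comma-separated fields with no quoting.
public class CarrierFieldDemo {
    // Index 8 is the carrier-code column in the 2008.csv flight data.
    static final int CARRIER_COLUMN = 8;

    static String extractCarrier(String csvLine) {
        String[] fields = csvLine.split(",");
        return fields[CARRIER_COLUMN];
    }

    public static void main(String[] args) {
        // A shortened, hypothetical record: fields 0-7 followed by the carrier code.
        String record = "2008,1,3,4,2003,1955,2211,2225,WN,335";
        System.out.println(extractCarrier(record)); // prints WN
    }
}
```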
4. MapReduce reducer code
```java
package com.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlightsByCarrierReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text token, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        // Sum the 1s emitted by the mapper for this carrier
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        context.write(token, new IntWritable(sum));
    }
}
```
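Conceptually, the framework groups the mapper's (carrier, 1) pairs by key and hands each group to the reducer, which sums the ones. A framework-free simulation of that group-and-sum step, with hypothetical mapper output:

```java
import java.util.HashMap;
import java.util.Map;

// Framework-free sketch of the shuffle + reduce phases: group (carrier, 1)
// pairs by carrier and sum the counts, as the reducer above does.
public class ReducePhaseDemo {
    static Map<String, Integer> countByCarrier(String[] carriers) {
        Map<String, Integer> counts = new HashMap<>();
        for (String carrier : carriers) {
            counts.merge(carrier, 1, Integer::sum); // equivalent of sum += count.get()
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] emitted = {"WN", "AA", "WN", "DL", "WN"}; // hypothetical mapper output
        System.out.println(countByCarrier(emitted)); // e.g. {AA=1, DL=1, WN=3}; HashMap order may vary
    }
}
```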
5. Build the JAR with Maven in IDEA
The JAR is named beginner-1.0-SNAPSHOT.jar.
6. Upload to the Linux virtual machine
The code was written in IDEA on Windows, so it needs to be uploaded to the Linux virtual machine.
7. Run the MapReduce driver to process the flight data

```shell
hadoop jar beginner-1.0-SNAPSHOT.jar /user/root/2008.csv /user/root/output/flightsCount
```

The job runs as follows:
```
18/01/09 02:29:52 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/09 02:29:52 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/01/09 02:29:53 INFO input.FileInputFormat: Total input paths to process : 1
18/01/09 02:29:54 INFO mapreduce.JobSubmitter: number of splits:6
18/01/09 02:29:54 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1515491426576_0002
18/01/09 02:29:54 INFO impl.YarnClientImpl: Submitted application application_1515491426576_0002
18/01/09 02:29:55 INFO mapreduce.Job: The url to track the job: http://slave1:8088/proxy/application_1515491426576_0002/
18/01/09 02:29:55 INFO mapreduce.Job: Running job: job_1515491426576_0002
18/01/09 02:30:01 INFO mapreduce.Job: Job job_1515491426576_0002 running in uber mode : false
18/01/09 02:30:01 INFO mapreduce.Job:  map 0% reduce 0%
18/01/09 02:30:17 INFO mapreduce.Job:  map 39% reduce 0%
18/01/09 02:30:19 INFO mapreduce.Job:  map 52% reduce 0%
18/01/09 02:30:21 INFO mapreduce.Job:  map 86% reduce 0%
18/01/09 02:30:22 INFO mapreduce.Job:  map 100% reduce 0%
18/01/09 02:30:31 INFO mapreduce.Job:  map 100% reduce 100%
18/01/09 02:30:32 INFO mapreduce.Job: Job job_1515491426576_0002 completed successfully
18/01/09 02:30:32 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=63087558
		FILE: Number of bytes written=127016400
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=689434454
		HDFS: Number of bytes written=197
		HDFS: Number of read operations=21
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=6
		Launched reduce tasks=1
		Data-local map tasks=6
		Total time spent by all maps in occupied slots (ms)=110470
		Total time spent by all reduces in occupied slots (ms)=7315
		Total time spent by all map tasks (ms)=110470
		Total time spent by all reduce tasks (ms)=7315
		Total vcore-milliseconds taken by all map tasks=110470
		Total vcore-milliseconds taken by all reduce tasks=7315
		Total megabyte-milliseconds taken by all map tasks=113121280
		Total megabyte-milliseconds taken by all reduce tasks=7490560
	Map-Reduce Framework
		Map input records=7009729
		Map output records=7009728
		Map output bytes=49068096
		Map output materialized bytes=63087588
		Input split bytes=630
		Combine input records=0
		Combine output records=0
		Reduce input groups=20
		Reduce shuffle bytes=63087588
		Reduce input records=7009728
		Reduce output records=20
		Spilled Records=14019456
		Shuffled Maps =6
		Failed Shuffles=0
		Merged Map outputs=6
		GC time elapsed (ms)=6818
		CPU time spent (ms)=38010
		Physical memory (bytes) snapshot=1807056896
		Virtual memory (bytes) snapshot=13627478016
		Total committed heap usage (bytes)=1370488832
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=689433824
	File Output Format Counters
		Bytes Written=197
```
8. View the flight counts

```shell
hadoop fs -cat /user/root/output/flightsCount/part-r-00000
```

The results are as follows:
```
9E	262208
AA	604885
AQ	7800
AS	151102
B6	196091
CO	298455
DL	451931
EV	280575
F9	95762
FL	261684
HA	61826
MQ	490693
NW	347652
OH	197607
OO	567159
UA	449515
US	453589
WN	1201754
XE	374510
YV	254930
```
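As a sanity check, the twenty per-carrier counts above should add up to the "Map output records=7009728" counter in the job log (7,009,729 input records minus the header row the mapper skips). A quick verification:

```java
// Sum the twenty per-carrier counts from part-r-00000 and compare the total
// against the job's Map output records counter.
public class CountSanityCheck {
    static final long[] COUNTS = {
        262208, 604885, 7800, 151102, 196091, 298455, 451931,
        280575, 95762, 261684, 61826, 490693, 347652, 197607,
        567159, 449515, 453589, 1201754, 374510, 254930
    };

    static long total() {
        long t = 0;
        for (long c : COUNTS) t += c;
        return t;
    }

    public static void main(String[] args) {
        System.out.println(total()); // prints 7009728, matching the counter
    }
}
```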
References:
1. *Hadoop For Dummies*