注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

Code@Pig Home

喜欢背着一袋Code傻笑的Pig .. 忧美.欢笑.记忆.忘却 .之. 角落

 
 
 

日志

 
 

[hadoop] hello, hadoop!  

2010-12-31 19:17:18|  分类: lang_java |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
MapReduce,说了很久的东东了,自己也来体验一把。
例子来自于《Hadoop: The Definitive Guide (2nd)》


安装 hadoop
下载 hadoop-0.21.0.tgz,解压。然后修改 .profile:
export JAVA_HOME=/usr/local/jdk1.6.0
export HADOOP_INSTALL=/home/kasicass/sandbox/hadoop/hadoop-0.21.0
export PATH=$PATH:$HADOOP_INSTALL/bin


测试目标
'数据源'一行,是'year temperature'这样的数据。我们要找出某 year 对应的 temperature 的最大值。
----------------------------- 数据源 -----------------------------
$ cat input/sample.txt 
2010 10
2009 88
2009 30
2010 87
2011 8
2010 37
2011 100
---------------------------- 处理结果 ----------------------------
$ cat output/part-00000 
2009    88
2010    87
2011    100
---------------------------------------------------------------------


测试代码
对于 map(),输入数据是 key/value = (offset, "line"),比如:(0, "2010 10")、(8, "2009 88")、(16, "2009 30")。其中 offset 是行首相对于文件头的偏移。然后输出的数据是 key/value = (key, array of values),比如这里的 (year, [t1, t2, ...])。
对于 reduce(),输入数据是 map() 的输出,key/value = (key, one of value in array)。对于每次调用 reduce(),value 只是 array 中的一个元素。reduce() 的输出也是个 key/value。
代码中,Mapper/Reducer 的四个模板参数,分别对应 input key、input value、output key、output value。
-------------------- MaxTemperatureMapper.java ----------------
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {

        String line  = value.toString();
        String tmp[] = line.split(" ");

        String year  = tmp[0];
        int temperature = Integer.parseInt(tmp[1]);

        output.collect(new Text(year), new IntWritable(temperature));
    }
}
-------------------- MaxTemperatureReducer.java ----------------
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {

        int maxValue = Integer.MIN_VALUE;
        while (values.hasNext()) {
            maxValue = Math.max(maxValue, values.next().get());
        }

        output.collect(key, new IntWritable(maxValue));
    }
}
-------------------- MaxTemperature.java ----------------
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MaxTemperature {
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("Max Temperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setReducerClass(MaxTemperatureReducer.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);
    }
}
---------------------------------------------------------------------
% export HADOOP_CLASSPATH=build/classes
% hadoop MaxTemperature input/sample.txt output
... blab. blab ...
then output/* are generated.
---------------------------------------------------------------------


总结
我这个测试只是在 single node 上跑,而 hadoop 可以帮你管理一堆的 node(machine),然后并发跑你的业务。
MapReduce 很适合处理海量数据、且数据间没啥关联性的业务,比如:把几个TB的文件全部转换成pdf格式。
  评论这张
 
阅读(1053)| 评论(0)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017