博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hadoop join 之map side join
阅读量:6516 次
发布时间:2019-06-24

本文共 4606 字,大约阅读时间需要 15 分钟。

hot3.png

大表关联小表时可以使用hadoop的DistributedCache把小标缓存到内存中,由hadoop分发这些内存到每台需要map操作的服务器上进行数据的清洗,关联。

例如有这样一份数据用户登陆信息login:

1,0,20121213

2,0,20121213
3,1,20121213
4,1,20121213
1,0,20121114

第一列是用户id,二列是性别,第三列是登陆时间 。

需要将表中的用户id,替换成用户的名字,性别替换成汉字,然后统计他的登陆次数。

其中users表为:

1,张三,hubei

3,王五,tianjin
4,赵六,guangzhou
2,李四,beijing

sex表为:

0,男

1,女       

map函数中进行维表的关联,输出为姓名,性别为key,登陆1次为value。

public class Mapclass extends Mapper
 {    private Map
 userMap = new HashMap
();    private Map
 sexMap = new HashMap
();    private Text oKey = new Text();    private Text oValue = new Text();    private String[] kv;    @Override    protected void setup(Context context) {        BufferedReader in = null;        // 从当前作业中获取要缓存的文件        try {            Path[] paths = DistributedCache.getLocalCacheFiles(context                    .getConfiguration());            String uidNameAddr = null;            String sidSex = null;            for (Path path : paths) {                if (path.toString().contains("users")) {                    in = new BufferedReader(new FileReader(path.toString()));                    while (null != (uidNameAddr = in.readLine())) {                        userMap.put(uidNameAddr.split(",", -1)[0],                                uidNameAddr.split(",", -1)[1]);                    }                } else if (path.toString().contains("sex")) {                    in = new BufferedReader(new FileReader(path.toString()));                    while (null != (sidSex = in.readLine())) {                        sexMap.put(sidSex.split(",", -1)[0],                                sidSex.split(",", -1)[1]);                    }                }            }        } catch (IOException e) {            e.printStackTrace();        } finally {            try {                if (in != null) {                    in.close();                }            } catch (IOException e) {                e.printStackTrace();            }        }    }    @Override    protected void map(LongWritable key, Text value, Context context)            throws IOException, InterruptedException {        kv = value.toString().split(",");        // map join: 在map阶段过滤掉不需要的数据        if (userMap.containsKey(kv[0]) && sexMap.containsKey(kv[1])) {            oKey.set(userMap.get(kv[0]) + "," + sexMap.get(kv[1]));            oValue.set("1");            context.write(oKey, oValue);        }    }}

reduce函数:

public class Reduce extends Reducer
 {    private Text oValue = new Text();    @Override    protected void reduce(Text key, Iterable
 values, Context context)            throws IOException, InterruptedException {        int sumCount = 0;        for (Text val : values) {            sumCount += Integer.parseInt(val.toString());        }        oValue.set(String.valueOf(sumCount));        context.write(key, oValue);    }}

main函数为:

public class MultiTableJoin extends Configured implements Tool {    @Override    public int run(String[] args) throws Exception {        Job job = new Job(getConf(), "MultiTableJoin");        job.setJobName("MultiTableJoin");        job.setJarByClass(MultiTableJoin.class);        job.setMapperClass(Mapclass.class);        job.setReducerClass(Reduce.class);        job.setInputFormatClass(TextInputFormat.class);        job.setOutputFormatClass(TextOutputFormat.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(Text.class);        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),                args).getRemainingArgs();        // 我们把第1、2个参数的地址作为要缓存的文件路径        DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(),                job.getConfiguration());        DistributedCache.addCacheFile(new Path(otherArgs[1]).toUri(),                job.getConfiguration());        FileInputFormat.addInputPath(job, new Path(otherArgs[2]));        FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));        return job.waitForCompletion(true) ? 0 : 1;    }    public static void main(String[] arg0) throws Exception {        String[] args = new String[4];        args[0] = "hdfs://172.16.0.87:9000/user/jeff/decli/sex";        args[1] = "hdfs://172.16.0.87:9000/user/jeff/decli/users";        args[2] = "hdfs://172.16.0.87:9000/user/jeff/decli/login";        args[3] = "hdfs://172.16.0.87:9000/user/jeff/decli/out";        int res = ToolRunner.run(new Configuration(), new MultiTableJoin(),                args);        System.exit(res);    }}

计算的输出为:

张三,男    2

李四,男    1
王五,女    1
赵六,女    1

转载于:https://my.oschina.net/hanjiafu/blog/291870

你可能感兴趣的文章
FreeBinary 格式说明
查看>>
使用Spring Cloud和Docker构建微服务
查看>>
九州云实战人员为您揭秘成功部署OpenStack几大要点
查看>>
CloudCC:智能CRM究竟能否成为下一个行业风口?
查看>>
追求绿色数据中心
查看>>
Web开发初学指南
查看>>
探寻光存储没落的真正原因
查看>>
高通64位ARMv8系列服务器芯片商标命名:Centriq
查看>>
构建智能的新一代网络——专访Mellanox市场部副总裁 Gilad Shainer
查看>>
《数字视频和高清:算法和接口》一导读
查看>>
《中国人工智能学会通讯》——6.6 实体消歧技术研究
查看>>
如何在Windows查看端口占用情况及查杀进程
查看>>
洗茶,你误会了多少年?
查看>>
spring-aop
查看>>
android RecycleView Adapter简单封装
查看>>
Dart的数据库操作
查看>>
iOS打电话,发短信,发邮件,打开网址
查看>>
canvas学习笔记
查看>>
《Spring1之第十次站立会议》
查看>>
Unity Shader 噪声消融特效 - 剑灵死亡特效
查看>>