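When joining a large table with a small table, Hadoop's DistributedCache can be used to cache the small table. Hadoop distributes the cached files to every node that runs map tasks; each mapper loads them into memory and performs the data cleansing and the join during the map phase.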
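For example, take the following user login data, login:
1,0,20121213
2,0,20121213
3,1,20121213
4,1,20121213
1,0,20121114
The first column is the user id, the second is the sex code, and the third is the login date.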
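The goal is to replace the user id with the user's name and the sex code with its Chinese label, then count how many times each user logged in.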
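The users table is:
1,张三,hubei
3,王五,tianjin
4,赵六,guangzhou
2,李四,beijing
The sex table is:
0,男
1,女
The map function joins each login record against these two dimension tables, emitting "name,sex" as the key and "1" (one login) as the value.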
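import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapclass extends Mapper<LongWritable, Text, Text, Text> {
    // In-memory lookup tables built from the cached files
    private final Map<String, String> userMap = new HashMap<>(); // user id -> name
    private final Map<String, String> sexMap = new HashMap<>();  // sex code -> label
    private final Text oKey = new Text();
    private final Text oValue = new Text();

    @Override
    protected void setup(Context context) throws IOException {
        // Get the local copies of the files cached for the current job
        Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (paths == null) {
            return;
        }
        for (Path path : paths) {
            // Match on the file name only, so parent directories cannot cause a false match
            if (path.getName().contains("users")) {
                loadTable(path, userMap);
            } else if (path.getName().contains("sex")) {
                loadTable(path, sexMap);
            }
        }
    }

    // Read a cached "id,value,..." file into the given lookup map.
    // An IOException is propagated so the task fails instead of silently joining against an empty table.
    private static void loadTable(Path path, Map<String, String> table) throws IOException {
        try (BufferedReader in = new BufferedReader(new FileReader(path.toString()))) {
            String line;
            while ((line = in.readLine()) != null) {
                String[] fields = line.split(",", -1);
                if (fields.length >= 2) {
                    table.put(fields[0], fields[1]);
                }
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] kv = value.toString().split(",");
        // Map-side join: drop records that are malformed or have no match in either table
        if (kv.length >= 2 && userMap.containsKey(kv[0]) && sexMap.containsKey(kv[1])) {
            oKey.set(userMap.get(kv[0]) + "," + sexMap.get(kv[1]));
            oValue.set("1");
            context.write(oKey, oValue);
        }
    }
}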
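The reduce function:
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce extends Reducer<Text, Text, Text, Text> {
    private final Text oValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Sum the "1" values emitted by the mapper for this "name,sex" key
        int sumCount = 0;
        for (Text val : values) {
            sumCount += Integer.parseInt(val.toString());
        }
        oValue.set(String.valueOf(sumCount));
        context.write(key, oValue);
    }
}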
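The main function:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultiTableJoin extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "MultiTableJoin");
        job.setJarByClass(MultiTableJoin.class);
        job.setMapperClass(Mapclass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        String[] otherArgs =
                new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        // The first and second arguments are the paths of the files to cache
        DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        DistributedCache.addCacheFile(new Path(otherArgs[1]).toUri(), job.getConfiguration());
        // The third argument is the login input, the fourth is the output directory
        FileInputFormat.addInputPath(job, new Path(otherArgs[2]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] arg0) throws Exception {
        // Paths are hard-coded here: sex table, users table, login input, output directory
        String[] args = new String[4];
        args[0] = "hdfs://172.16.0.87:9000/user/jeff/decli/sex";
        args[1] = "hdfs://172.16.0.87:9000/user/jeff/decli/users";
        args[2] = "hdfs://172.16.0.87:9000/user/jeff/decli/login";
        args[3] = "hdfs://172.16.0.87:9000/user/jeff/decli/out";
        int res = ToolRunner.run(new Configuration(), new MultiTableJoin(), args);
        System.exit(res);
    }
}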
The job output is:
张三,男 2
李四,男 1
王五,女 1
赵六,女 1
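The join and count logic can also be checked locally without a cluster. The following is a minimal plain-Java sketch, not part of the job above: the class name LocalJoinSketch and its helper methods loadLookup and joinAndCount are hypothetical names introduced only for illustration. It builds the two lookup maps the way the mapper's setup step does, joins each login record against them, and sums the matches per "name,sex" key as the reducer would. A TreeMap stands in for the sorted key order that MapReduce produces.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local sketch of the map-side join; it does not use any Hadoop classes.
public class LocalJoinSketch {

    // Builds a lookup map from "id,value,..." lines: first column -> second column.
    static Map<String, String> loadLookup(List<String> lines) {
        Map<String, String> lookup = new HashMap<>();
        for (String line : lines) {
            String[] cols = line.split(",", -1);
            if (cols.length >= 2) {
                lookup.put(cols[0], cols[1]);
            }
        }
        return lookup;
    }

    // Joins each login record against both lookups and counts logins per "name,sex" key.
    // Records without a match in either table are dropped, as in the mapper.
    static Map<String, Integer> joinAndCount(List<String> logins,
                                             Map<String, String> users,
                                             Map<String, String> sexes) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys, like reducer input
        for (String line : logins) {
            String[] kv = line.split(",");
            if (kv.length < 2) {
                continue;
            }
            String name = users.get(kv[0]);
            String sex = sexes.get(kv[1]);
            if (name == null || sex == null) {
                continue;
            }
            counts.merge(name + "," + sex, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, String> users = loadLookup(Arrays.asList(
                "1,张三,hubei", "3,王五,tianjin", "4,赵六,guangzhou", "2,李四,beijing"));
        Map<String, String> sexes = loadLookup(Arrays.asList("0,男", "1,女"));
        List<String> logins = Arrays.asList(
                "1,0,20121213", "2,0,20121213", "3,1,20121213",
                "4,1,20121213", "1,0,20121114");

        // Prints one "name,sex<TAB>count" row per key
        joinAndCount(logins, users, sexes)
                .forEach((key, count) -> System.out.println(key + "\t" + count));
    }
}
```

Running main on the sample data gives four keys, with a count of 2 for 张三,男 and 1 for each of the others, which agrees with the job output. The sketch holds everything in one process; in the real job the lookup maps are rebuilt in every map task, which is why this approach only suits dimension tables small enough to fit in memory.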