博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用MapReduce实现二度人脉搜索算法
阅读量:7047 次
发布时间:2019-06-28

本文共 4627 字,大约阅读时间需要 15 分钟。

一,背景介绍

      在新浪微博、人人网等社交网站上,为了使用户在网络上认识更多的朋友,社交网站往往提供类似“你可能感兴趣的人”、“间接关注推荐”等好友推荐的功能,其中就包含了二度人脉算法。

二,算法实现

原始数据集测试:

a    bb    ca    cb    dc    ee    ce    f

数据集说明:为关注关系,即a关注b,b关注c和d,所以a的二度人脉应该是d和c,而c已经被a关注,所以应该舍去,自己不能二度人脉是自己,如c关注e,而e又关注c

代码实现,代码用了两个Job实现的

难点:两个job如何先后执行

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;import java.util.HashSet;import java.util.Random;import java.util.Set;public class De2Friends {    public static class De2Mapper1 extends Mapper
{ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line =value.toString(); String[] strArr = line.split("\t"); if(strArr.length==2) { //关注的人 context.write(new Text(strArr[0]), new Text("1" + strArr[1])); //被关注的人 context.write(new Text(strArr[1]), new Text("0" + strArr[0])); } } } public static class De2Reducer1 extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { Set
follows= new HashSet
(); Set
fans=new HashSet
(); for(Text val :values ){ String friend =val.toString(); if(friend.startsWith("1")){ context.write(key,new Text(friend));//输出用户已经关注的人,一度人脉 follows.add(friend.substring(1)); } if(friend.startsWith("0")){ fans.add(friend.substring(1)); } } for(String fan : fans) for(String follow:follows) { if (!fan.equals(follow)) { context.write(new Text(fan),new Text("2"+follow)); } } } } public static class De2Mapper2 extends Mapper
{ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line =value.toString(); String[] strArr=line.split("\t"); if(strArr.length==2) { context.write(new Text(strArr[0]), new Text(strArr[1]));//输出用户的一度好友和二度好友 } } } public static class De2Reducer2 extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { Set
firstFriend = new HashSet
(); Set
secondFriend =new HashSet
(); for(Text val:values){ String friend =val.toString(); if(friend.contains("1")){ firstFriend.add(friend.substring(1)); } if(friend.contains("2")){ secondFriend.add(friend.substring(1)); } } for(String second:secondFriend) { if(!(firstFriend.contains(second))) context.write(key,new Text(second)); //输出好友的二度人脉 } } } public static void main(String[] args) throws Exception{ System.setProperty("hadoop.home.dir","E:\\softs\\majorSoft\\hadoop-2.7.5"); Configuration conf =new Configuration(); conf.set("mapreduce.app-submission.cross-platform", "true"); Path fileInput = new Path("hdfs://mycluster/testFile/qq.txt"); Path tempDir = new Path("hdfs://mycluster/output/deg2friend-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); Path fileOutput = new Path("hdfs://mycluster/output/qq"); Job job = Job.getInstance(conf,"de2Firend"); job.setJar("E:\\bigData\\hadoopDemo\\out\\artifacts\\wordCount_jar\\hadoopDemo.jar"); job.setJarByClass(De2Friends.class); job.setMapperClass(De2Mapper1.class); job.setReducerClass(De2Reducer1.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setNumReduceTasks(1); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job,fileInput); FileOutputFormat.setOutputPath(job,tempDir); job.waitForCompletion(true);//必须有,感觉是等job执行完才让job2执行的效果,即阻塞吧 Job job2 = Job.getInstance(conf,"de2Firend"); job2.setJar("E:\\bigData\\hadoopDemo\\out\\artifacts\\wordCount_jar\\hadoopDemo.jar"); job2.setJarByClass(De2Friends.class); job2.setMapperClass(De2Mapper2.class); job2.setReducerClass(De2Reducer2.class); job2.setMapOutputKeyClass(Text.class); job2.setMapOutputValueClass(Text.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job2,tempDir); FileOutputFormat.setOutputPath(job2,fileOutput); System.exit(job2.waitForCompletion(true)?0:1); }}

结果如下:

a    db    eb    fc    f

 

转载于:https://www.cnblogs.com/ksWorld/p/8670353.html

你可能感兴趣的文章
大数据多维分析平台的实践
查看>>
Python常用函数
查看>>
二分法习题HDU2199
查看>>
strcpy,sprintf,memcpy的区别
查看>>
web框架
查看>>
线程互斥锁
查看>>
spring colud 博客
查看>>
Redis安装
查看>>
JavaScript 自学过程
查看>>
GDAL源码剖析(三)之Swig编译和帮助文档生成
查看>>
Android学习笔记:NDK入门一些总结
查看>>
Project Euler Problem 3: Largest prime factor
查看>>
颜色区分
查看>>
微信认证结果拆分为资质审核和名称审核
查看>>
Sass和Compass入门
查看>>
重装系统后删除Cygwin文件夹
查看>>
享元模式
查看>>
M4修改外部晶振8M和25M晶振的方法
查看>>
六、python小功能记录——递归删除bin和obj内文件
查看>>
EF架构~数据分批批量提交
查看>>