Spark join operator
Java:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * The join operator associates two RDDs by their keys, somewhat like Scala's zip operation;
 * each element of the result pairs one value from each RDD under the same key.
 * @author Tele
 */
public class JoinDemo {
	private static SparkConf conf = new SparkConf().setMaster("local").setAppName("joindemo");
	private static JavaSparkContext jsc = new JavaSparkContext(conf);

	public static void main(String[] args) {
		// Assume each student has only one score
		List<Tuple2<Integer, String>> studentList = Arrays.asList(
				new Tuple2<Integer, String>(1, "tele"),
				new Tuple2<Integer, String>(2, "yeye"),
				new Tuple2<Integer, String>(3, "wyc")
		);

		List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
				new Tuple2<Integer, Integer>(1, 100),
				new Tuple2<Integer, Integer>(1, 1100),
				new Tuple2<Integer, Integer>(2, 90),
				new Tuple2<Integer, Integer>(3, 70)
		);

		JavaPairRDD<Integer, String> studentRDD = jsc.parallelizePairs(studentList);
		JavaPairRDD<Integer, Integer> scoreRDD = jsc.parallelizePairs(scoreList);

		// Note the type parameters of the joined RDD: the first is the key type;
		// the inner Tuple2<String, Integer> carries the value types of the two source RDDs
		JavaPairRDD<Integer, Tuple2<String, Integer>> result = studentRDD.join(scoreRDD);

		result.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
			private static final long serialVersionUID = 1L;

			@Override
			public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
				System.out.println("Student ID: " + t._1);
				System.out.println("Name: " + t._2._1);
				System.out.println("Score: " + t._2._2);
				System.out.println("=================");
			}
		});

		jsc.close();
	}
}
```
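Because the key 1 appears twice in scoreRDD, join emits one output pair per matching key combination, so student 1 shows up twice in the result. With the lists above, the foreach output in a local run should look roughly like the sketch below (ordering is not guaranteed; this is not captured from an actual run):

```text
Student ID: 1
Name: tele
Score: 100
=================
Student ID: 1
Name: tele
Score: 1100
=================
Student ID: 2
Name: yeye
Score: 90
=================
Student ID: 3
Name: wyc
Score: 70
=================
```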
Scala:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object JoinDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("joindemo")
    val sc = new SparkContext(conf)

    val studentArr = Array((1, "tele"), (2, "yeye"), (3, "wyc"))
    val scoreArr = Array((1, 100), (2, 80), (3, 100))

    val studentRDD = sc.parallelize(studentArr, 1)
    val scoreRDD = sc.parallelize(scoreArr, 1)

    // join by key; the result is an RDD[(Int, (String, Int))]
    val result = studentRDD.join(scoreRDD)

    result.foreach(t => {
      println("Student ID: " + t._1)
      println("Name: " + t._2._1)
      println("Score: " + t._2._2)
      println("============")
    })
  }
}
```
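Since foreach runs on the executors, another way to inspect the joined pairs is to bring them back to the driver. The sketch below assumes it is placed inside main right after the join above; collect() is only reasonable here because the data set is tiny.

```scala
// Minimal sketch (continuation of main above): materialize the joined pairs on the driver.
// collect() pulls the whole RDD into driver memory, so use it only for small data sets.
val joined: Array[(Int, (String, Int))] = result.collect()
joined.foreach(println)
// Expected contents with the arrays above (order not guaranteed):
// (1,(tele,100))
// (2,(yeye,80))
// (3,(wyc,100))

sc.stop()
```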