Join two ordinary RDDs with/without Spark SQL


Asked on January 7, 2019 in Apache-spark.


  • 3 Answer(s)

    When the RDDs are grouped by key before joining, the values in the joined RDD are Iterables, so the output will not be identical to an ordinary table join.

    Another possibility is to key each RDD by companyId and join the pair RDDs directly:

    // Key each item by its companyId
    val mappedItems = items.map(item => (item.companyId, item))
    // Key each company by its companyId
    val mappedComp = companies.map(comp => (comp.companyId, comp))
    // Inner join on companyId and print the first ten results
    mappedItems.join(mappedComp).take(10).foreach(println)
    

    The output will be:

    (c1,(Item(1,first,2,c1),Company(c1,company-1,city-1)))
    (c1,(Item(2,second,2,c1),Company(c1,company-1,city-1)))
    (c2,(Item(3,third,2,c2),Company(c2,company-2,city-2)))
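
    If the companyId key is not needed in the result, the pair RDD's values method drops it. A small sketch building on the join above:

    // Keep only the matched (Item, Company) pairs, dropping the companyId key
    mappedItems.join(mappedComp).values.take(10).foreach(println)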
    
    Answered on January 7, 2019.

    Using Scala, let's consider two RDDs:

    emp: (empid, ename, dept)

    dept: (dname, dept)

    Here is another method:

    // Variant 1: reshape each tuple so the join key (the dept id) comes first
    val emp = sc.parallelize(Seq(("jordan",10), ("ricky",20), ("matt",30), ("mince",35), ("rhonda",30)))
    val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))
    val shifted_fields_emp = emp.map(t => (t._2, t._1))
    val shifted_fields_dept = dept.map(t => (t._2, t._1))
    shifted_fields_emp.join(shifted_fields_dept)

    // Variant 2: use keyBy to choose the join key without reshaping the tuples
    // Create the emp RDD: (empid, ename, dept)
    val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)))
    // Create the dept RDD: (dname, dept)
    val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))
    // Use the third field (the dept id) as the key for the emp RDD
    val manipulated_emp = emp.keyBy(t => t._3)
    // Use the second field (the dept id) as the key for the dept RDD
    val manipulated_dept = dept.keyBy(t => t._2)
    // Inner join
    val join_data = manipulated_emp.join(manipulated_dept)
    // Left outer join
    val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept)
    // Right outer join
    val right_outer_join_data = manipulated_emp.rightOuterJoin(manipulated_dept)
    // Full outer join
    val full_outer_join_data = manipulated_emp.fullOuterJoin(manipulated_dept)
    // Flatten the joined data into (empid, ename, dept, dname) for readability
    val cleaned_joined_data = join_data.map(t => (t._2._1._1, t._2._1._2, t._1, t._2._2._1))
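    // The outer joins wrap the optional side in Option. As a hedged sketch
    // (the "no-dept" default is illustrative, not part of the original answer),
    // the left outer join can be flattened the same way:
    val cleaned_left_outer = left_outer_join_data.map(t =>
      (t._2._1._1, t._2._1._2, t._1, t._2._2.map(_._1).getOrElse("no-dept")))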
    

    The output will be:

    // Print cleaned_joined_data on the console
    scala> cleaned_joined_data.collect()
    res13: Array[(Int, String, Int, String)] = Array((3,matt,30,hive), (5,rhonda,30,hive), (2,ricky,20,spark), (1,jordan,10,hadoop))
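
    Since the question also asks about doing this with Spark SQL: as a minimal sketch (assuming Spark 2.x with a SparkSession named spark; the column names simply label the tuple fields above), the same RDDs can be converted to DataFrames and joined:

    import spark.implicits._
    // Convert the tuple RDDs to DataFrames with named columns
    val empDF = emp.toDF("empid", "ename", "dept")
    val deptDF = dept.toDF("dname", "dept")
    // Inner join on the shared dept column
    empDF.join(deptDF, "dept").show()
    // Or register temporary views and express the join in SQL
    empDF.createOrReplaceTempView("emp")
    deptDF.createOrReplaceTempView("dept")
    spark.sql("SELECT e.empid, e.ename, e.dept, d.dname FROM emp e JOIN dept d ON e.dept = d.dept").show()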
    
    Answered on January 7, 2019.

    Alternatively, the code below can be used to solve the issue by grouping each RDD by companyId before joining:

    scala> case class Item(id:String, name:String, unit:Int, companyId:String)
     
    scala> case class Company(companyId:String, name:String, city:String)
     
    scala> val i1 = Item("1", "first", 2, "c1")
     
    scala> val i2 = i1.copy(id="2", name="second")
     
    scala> val i3 = i1.copy(id="3", name="third", companyId="c2")
     
    scala> val items = sc.parallelize(List(i1,i2,i3))
    items: org.apache.spark.rdd.RDD[Item] = ParallelCollectionRDD[14] at parallelize at <console>:20
     
    scala> val c1 = Company("c1", "company-1", "city-1")
     
    scala> val c2 = Company("c2", "company-2", "city-2")
     
    scala> val companies = sc.parallelize(List(c1,c2))
     
    scala> val groupedItems = items.groupBy( x => x.companyId)
    groupedItems: org.apache.spark.rdd.RDD[(String, Iterable[Item])] = ShuffledRDD[16] at groupBy at <console>:22
     
    scala> val groupedComp = companies.groupBy(x => x.companyId)
    groupedComp: org.apache.spark.rdd.RDD[(String, Iterable[Company])] = ShuffledRDD[18] at groupBy at <console>:20
     
    scala> groupedItems.join(groupedComp).take(10).foreach(println)
     
    14/12/12 00:52:32 INFO DAGScheduler: Job 5 finished: take at <console>:35, took 0.021870 s
    (c1,(CompactBuffer(Item(1,first,2,c1), Item(2,second,2,c1)),CompactBuffer(Company(c1,company-1,city-1))))
    (c2,(CompactBuffer(Item(3,third,2,c2)),CompactBuffer(Company(c2,company-2,city-2))))
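
    Note that the grouped join pairs whole Iterables (the CompactBuffers above) rather than individual rows. If flat, table-like rows are wanted, here is a sketch of flattening the result, building on groupedItems and groupedComp:

    // Expand each (companyId, (Iterable[Item], Iterable[Company])) entry into
    // one row per (item, company) combination, like an ordinary inner join
    val flatJoined = groupedItems.join(groupedComp).flatMap {
      case (companyId, (its, comps)) =>
        for (i <- its; c <- comps) yield (companyId, (i, c))
    }
    flatJoined.take(10).foreach(println)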
    
    Answered on January 7, 2019.

