Spark: produce RDD[(X, X)] of all possible combinations from RDD[X]

Asked on January 12, 2019 in Apache-spark.



    The Cartesian product and combinations are two separate things: cartesian creates an RDD of size rdd.size() ^ 2, while combinations creates an RDD of size rdd.size() choose 2.

    val rdd = sc.parallelize(1 to 5)
    val combinations = rdd.cartesian(rdd).filter { case (a, b) => a < b }
    combinations.collect()
    

    Note that this only works if an ordering is defined on the elements of the RDD (here we use <). This version only chooses pairs, but it can be extended to larger combinations by requiring a < b for every consecutive a and b in the tuple.
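    As an illustrative aside (not part of the original answer), the cartesian-plus-filter idea can be checked locally with plain Scala collections, no SparkContext needed, and the result size matches C(n, 2):

    ```scala
    // Local analogue of rdd.cartesian(rdd).filter { case (a, b) => a < b }.
    val xs = 1 to 5
    val combinations = for {
      a <- xs
      b <- xs
      if a < b // keep each unordered pair exactly once
    } yield (a, b)

    // C(5, 2) = 5 * 4 / 2 = 10 pairs.
    assert(combinations.size == xs.size * (xs.size - 1) / 2)
    ```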

    Answered on January 12, 2019.

    As seen above, cartesian gives the n^2 elements of the cartesian product of the RDD with itself. The following algorithm computes the combinations C(n, 2) of an RDD without computing the n^2 elements first. (String is used as the element type; generalizing to a type T takes some plumbing with class tags that would obscure the purpose here.)

    This is probably less time efficient than cartesian plus filtering, because the repeated count and take actions force computation of the RDD, but it is more space efficient: it materializes only the C(n,2) = n!/(2!(n-2)!) = n(n-1)/2 elements rather than the n^2 of the cartesian product.

    import org.apache.spark.rdd._
     
    // Recursively builds all C(n, 2) combinations of the RDD's elements.
    // sc is the SparkContext; rdd.count is cached in `count` so the RDD
    // is not recounted on every branch.
    def combs(rdd: RDD[String]): RDD[(String, String)] = {
        val count = rdd.count
        if (count < 2) {
            sc.makeRDD[(String, String)](Seq.empty)
        } else if (count == 2) {
            val values = rdd.collect
            sc.makeRDD[(String, String)](Seq((values(0), values(1))))
        } else {
            // Pair the first element with every other element,
            // then recurse on the remainder.
            val elem = rdd.take(1)
            val elemRdd = sc.makeRDD(elem)
            val subtracted = rdd.subtract(elemRdd)
            val comb = subtracted.map(e => (elem(0), e))
            comb.union(combs(subtracted))
        }
    }
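    As a sanity check (not from the original answer), the same recursion can be sketched on a plain Scala List, without Spark, to show it yields exactly n(n-1)/2 pairs:

    ```scala
    // Local analogue of the RDD recursion above: pair the head with every
    // remaining element, then recurse on the tail.
    def combsLocal[T](xs: List[T]): List[(T, T)] = xs match {
      case Nil => Nil
      case _ :: Nil => Nil
      case head :: tail => tail.map(e => (head, e)) ++ combsLocal(tail)
    }

    val pairs = combsLocal(List("a", "b", "c", "d"))
    // 4 elements yield C(4, 2) = 6 pairs.
    assert(pairs.size == 6)
    ```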
    
    Answered on January 12, 2019.

    This is supported natively by Spark RDDs through the cartesian transformation.

    For instance:

    val rdd = sc.parallelize(1 to 5)
    val cartesian = rdd.cartesian(rdd)
    cartesian.collect
     
    Array[(Int, Int)] = Array((1,1), (1,2), (1,3), (1,4), (1,5),
    (2,1), (2,2), (2,3), (2,4), (2,5),
    (3,1), (3,2), (3,3), (3,4), (3,5),
    (4,1), (4,2), (4,3), (4,4), (4,5),
    (5,1), (5,2), (5,3), (5,4), (5,5))
    
    Answered on January 12, 2019.




    This creates all combinations (n, 2) and works for any RDD without requiring any ordering on the elements of the RDD.

    val rddWithIndex = rdd.zipWithIndex
    rddWithIndex.cartesian(rddWithIndex).filter{case(a, b) => a._2 < b._2}.map{case(a, b) => (a._1, b._1)}

    a._2 and b._2 are the indices, while a._1 and b._1 are the elements of the original RDD.

    Example:

    Note that no ordering is defined on the maps here.

    val m1 = Map('a' -> 1, 'b' -> 2)
    val m2 = Map('c' -> 3, 'a' -> 4)
    val m3 = Map('e' -> 5, 'c' -> 6, 'b' -> 7)
    val rdd = sc.makeRDD(Array(m1, m2, m3))
    val rddWithIndex = rdd.zipWithIndex
    rddWithIndex.cartesian(rddWithIndex).filter{case(a, b) => a._2 < b._2}.map{case(a, b) => (a._1, b._1)}.collect
    
    OUTPUT:
    
    Array((Map(a -> 1, b -> 2),Map(c -> 3, a -> 4)), (Map(a -> 1, b -> 2),Map(e -> 5, c -> 6, b -> 7)), (Map(c -> 3, a -> 4),Map(e -> 5, c -> 6, b -> 7)))
     
    Answered on January 13, 2019.

