# Spark: produce RDD[(X, X)] of all possible combinations from RDD[X]

Spark: produce RDD[(X, X)] of all possible combinations from RDD[X]

Asked on January 12, 2019 in

Here the Cartesian product and combinations are two seperate things, An RDD of size rdd.size() ^ 2 is created with the cartesian  and combinations will create an RDD of size rdd.size() choose 2

```val rdd = sc.parallelize(1 to 5)
val combinations = rdd.cartesian(rdd).filter{ case (a,b) => a < b }`.
combinations.collect()
```

Make sure that this will only work if an ordering is defined on the elements of the list, Here we use <. This one only works for choosing two but can easily be extended by making sure the relationship a < b for all a and b in the sequence.

In this we have seen cartesian will give n^2 elements of the cartesian product of the RDD with itself. This algorithm computes the combinations (n,2) of an RDD without having to compute the n^2 elements first: (The String is used as type, generalizing to a type T takes some plumbing with classtags that would obscure the purpose here)

Here more space efficient as it calculates only the C(n,2) = n!/(2*(n-2))! = (n*(n-1)/2) elements rather of the n^2 of the cartesian product.

```import org.apache.spark.rdd._

def combs(rdd:RDD[String]):RDD[(String,String)] = {
val count = rdd.count
if (rdd.count < 2) {
sc.makeRDD[(String,String)](Seq.empty)
} else if (rdd.count == 2) {
val values = rdd.collect
sc.makeRDD[(String,String)](Seq((values(0), values(1))))
} else {
val elem = rdd.take(1)
val elemRdd = sc.makeRDD(elem)
val subtracted = rdd.subtract(elemRdd)
val comb = subtracted.map(e => (elem(0),e))
comb.union(combs(subtracted))
}
}
```

By the cartesian transformation Spark RDD is gets supported.

For instance:

```val rdd = sc.parallelize(1 to 5)
val cartesian = rdd.cartesian(rdd)
cartesian.collect

Array[(Int, Int)] = Array((1,1), (1,2), (1,3), (1,4), (1,5),
(2,1), (2,2), (2,3), (2,4), (2,5),
(3,1), (3,2), (3,3), (3,4), (3,5),
(4,1), (4,2), (4,3), (4,4), (4,5),
(5,1), (5,2), (5,3), (5,4), (5,5))
```

This is supported natively by a Spark RDD with the `cartesian` transformation.

e.g.:

``````val rdd = sc.parallelize(1 to 5)
val cartesian = rdd.cartesian(rdd)
cartesian.collect

Array[(Int, Int)] = Array((1,1), (1,2), (1,3), (1,4), (1,5),
(2,1), (2,2), (2,3), (2,4), (2,5),
(3,1), (3,2), (3,3), (3,4), (3,5),
(4,1), (4,2), (4,3), (4,4), (4,5),
(5,1), (5,2), (5,3), (5,4), (5,5))``````

This creates all combinations (n, 2) and works for any RDD without requiring any ordering on the elements of RDD.

``````val rddWithIndex = rdd.zipWithIndex
rddWithIndex.cartesian(rddWithIndex).filter{case(a, b) => a._2 < b._2}.map{case(a, b) => (a._1, b._1)}``````

a._2 and b._2 are the indices, while a._1 and b._1 are the elements of the original RDD.

As discussed, `cartesian` will give you n^2 elements of the cartesian product of the RDD with itself.
This algorithm computes the combinations (n,2) of an RDD without having to compute the n^2 elements first: (used String as type, generalizing to a type T takes some plumbing with classtags that would obscure the purpose here)

This is probably less time efficient that cartesian + filtering due to the iterative `count` and `take` actions that forces the computation of the RDD, but more space efficient as it calculates only the `C(n,2) = n!/(2*(n-2))! = (n*(n-1)/2)` elements instead of the `n^2` of the cartesian product.

import org.apache.spark.rdd._

def combs(rdd:RDD[String]):RDD[(String,String)] = {
val count = rdd.count
if (rdd.count < 2) {
sc.makeRDD[(String,String)](Seq.empty)
} else if (rdd.count == 2) {
val values = rdd.collect
sc.makeRDD[(String,String)](Seq((values(0), values(1))))
} else {
val elem = rdd.take(1)
val elemRdd = sc.makeRDD(elem)
val subtracted = rdd.subtract(elemRdd)
val comb = subtracted.map(e => (elem(0),e))
comb.union(combs(subtracted))
}
}

This creates all combinations (n, 2) and works for any RDD without requiring any ordering on the elements of RDD.

This creates all combinations (n, 2) and works for any RDD without requiring any ordering on the elements of RDD.

``````val rddWithIndex = rdd.zipWithIndex
rddWithIndex.cartesian(rddWithIndex).filter{case(a, b) => a._2 < b._2}.map{case(a, b) => (a._1, b._1)}``````

a._2 and b._2 are the indices, while a._1 and b._1 are the elements of the original RDD.

Example:

Note that, no ordering is defined on the maps here.

``````val m1 = Map('a' -> 1, 'b' -> 2)
val m2 = Map('c' -> 3, 'a' -> 4)
val m3 = Map('e' -> 5, 'c' -> 6, 'b' -> 7)
val rdd = sc.makeRDD(Array(m1, m2, m3))
val rddWithIndex = rdd.zipWithIndex
rddWithIndex.cartesian(rddWithIndex).filter{case(a, b) => a._2 < b._2}.map{case(a, b) => (a._1, b._1)}.collect

OUTPUT:
``````
``Array((Map(a -> 1, b -> 2),Map(c -> 3, a -> 4)), (Map(a -> 1, b -> 2),Map(e -> 5, c -> 6, b -> 7)), (Map(c -> 3, a -> 4),Map(e -> 5, c -> 6, b -> 7)))``
`` ``