How to define a custom partitioner for Spark RDDs so that every partition holds an equal number of elements?

Asked on January 11, 2019 in Apache-spark.


  • 2 Answer(s)

    Suppose the RDD is keyed by sequential Ints and the total number of elements is known. A custom Partitioner can then be written as follows:

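    import org.apache.spark.Partitioner

    class ExactPartitioner[V](
        partitions: Int,
        elements: Int)
      extends Partitioner {

      // Partitioner declares numPartitions as abstract, so it must be defined too.
      override def numPartitions: Int = partitions

      override def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[Int]
        // `k` is assumed to go continuously from 0 to elements-1.
        // Widen to Long so that k * partitions cannot overflow Int.
        (k.toLong * partitions / elements).toInt
      }
    }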
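
    To see how evenly this formula spreads the keys, the arithmetic can be checked without Spark. The sketch below is only an illustration: `ExactPartitionDemo`, `partitionOf` and `sizes` are hypothetical names that do not appear in the answer, and the keys are assumed to run from 0 to elements-1 as stated above.

```scala
object ExactPartitionDemo {
  // Same arithmetic as getPartition, widened to Long so k * partitions cannot overflow.
  def partitionOf(k: Int, partitions: Int, elements: Int): Int =
    (k.toLong * partitions / elements).toInt

  // Count how many keys in 0 until elements land in each partition.
  def sizes(partitions: Int, elements: Int): List[Int] = {
    val counts = Array.fill(partitions)(0)
    (0 until elements).foreach { k =>
      counts(partitionOf(k, partitions, elements)) += 1
    }
    counts.toList
  }
}
```

    For 10 elements and 3 partitions, `ExactPartitionDemo.sizes(3, 10)` gives partition sizes 4, 3 and 3, so the sizes differ by at most one when the total is not divisible by the partition count.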
    
    Answered on January 11, 2019.

    Here is a full implementation that uses the "pimp my library" pattern to add a grouped method to RDDs:

    import org.apache.spark.Partitioner
    import org.apache.spark.rdd.RDD
    import scala.util.Random

    import RDDConversions._
     
    trait RDDWrapper[T] {
      def rdd: RDD[T]
    }
     
    // TODO View bounds are deprecated, should use context bounds
    // Might need to change ClassManifest for ClassTag in spark 1.0.0
    case class RichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
      rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
      // Here we use a single Long to try to ensure the sort is balanced,
      // but for really large dataset, we may want to consider
      // using a tuple of many Longs or even a GUID
      def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
        rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
        .grouped(numPartitions).map(t => (t._1._1, t._2))
    }
     
    case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
      def grouped(size: Int): RDD[T] = {
        // TODO Version where withIndex is cached
        val withIndex = rdd.mapPartitions(_.zipWithIndex)
     
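        // Running totals of partition sizes: 0, c0, c0 + c1, ..., total.
        // (last local index + 1) is the size of each partition; empty partitions are not handled.
        val startValues =
          withIndex.mapPartitionsWithIndex((i, iter) =>
            Iterator((i, iter.toIterable.last))).collect().toList
          .sortBy(_._1).map(_._2._2.toLong + 1L).scan(0L)(_ + _)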
     
        withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
          case (value, index) => (startValues(i) + index.toLong, value)
        })
        .partitionBy(new Partitioner {
          def numPartitions: Int = size
          def getPartition(key: Any): Int =
            (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
        })
        .map(_._2)
      }
    }
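
    The idea behind grouped is to give every element a global index (the running offset of its partition plus its local index) and then scale that index down to the target number of partitions. The sketch below shows the same idea on plain Scala collections, without Spark. It is an illustration under assumptions: nested Seqs stand in for RDD partitions, and `GlobalIndexDemo`, `withGlobalIndex` and `regroup` are hypothetical names that are not part of the answer.

```scala
object GlobalIndexDemo {
  // Each inner Seq stands in for one RDD partition (an assumption for illustration).
  def withGlobalIndex[T](parts: Seq[Seq[T]]): Seq[(Long, T)] = {
    // Offsets are running totals of partition sizes: 0, c0, c0 + c1, ...
    val offsets = parts.map(_.size.toLong).scanLeft(0L)(_ + _)
    parts.zipWithIndex.flatMap { case (part, i) =>
      part.zipWithIndex.map { case (value, local) => (offsets(i) + local, value) }
    }
  }

  // Assign each globally indexed element to one of numPartitions buckets.
  def regroup[T](parts: Seq[Seq[T]], numPartitions: Int): Seq[Seq[T]] = {
    val indexed = withGlobalIndex(parts)
    val total = indexed.size.toLong
    val buckets = indexed.groupBy { case (idx, _) =>
      (idx * numPartitions / total).toInt
    }
    (0 until numPartitions).map(p => buckets.getOrElse(p, Seq.empty).map(_._2))
  }
}
```

    For example, regrouping the unevenly sized input `Seq(Seq("a"), Seq("b", "c", "d", "e"), Seq("f"))` into 2 buckets yields `("a", "b", "c")` and `("d", "e", "f")`.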
    

    The implicit conversions go in a separate file:

    // TODO modify above to be implicit class, rather than have implicit conversions
    object RDDConversions {
      implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] =
        new RichRDD[T](rdd)
      implicit def toRichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
        rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd)
      implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
    }
    

    Assuming the RDD is already sorted, usage looks like this:

    import RDDConversions._
     
    yourRdd.grouped(2)
    
    Answered on January 11, 2019.

