How to define partitioning of a DataFrame?



  • 3 Answer(s)

    Spark >= 2.3.0

    SPARK-22614 exposes range partitioning.

    val partitionedByRange = df.repartitionByRange(42, $"k")
    partitionedByRange.explain
    // == Parsed Logical Plan ==
    // 'RepartitionByExpression ['k ASC NULLS FIRST], 42
    // +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
    //
    // == Analyzed Logical Plan ==
    // k: string, v: int
    // RepartitionByExpression [k#5 ASC NULLS FIRST], 42
    // +- Project [_1#2 AS k#5, _2#3 AS v#6]
    // +- LocalRelation [_1#2, _2#3]
    //
    // == Optimized Logical Plan ==
    // RepartitionByExpression [k#5 ASC NULLS FIRST], 42
    // +- LocalRelation [k#5, v#6]
    //
    // == Physical Plan ==
    // Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
    // +- LocalTableScan [k#5, v#6]
    
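    As a quick sanity check (not part of the original answer), you can ask the resulting DataFrame how many partitions it actually has. Note that with range partitioning on a tiny dataset the RangePartitioner may create fewer partitions than the 42 requested, because it samples the data to pick its bounds.

    // Number of partitions actually produced by repartitionByRange(42, $"k")
    println(partitionedByRange.rdd.getNumPartitions)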

    SPARK-22389 exposes external format partitioning in the Data Source API v2.

     

    Spark >= 1.6.0

    In Spark >= 1.6 it is possible to use partitioning by column for queries and caching. See SPARK-11410 and SPARK-4849 using the repartition method:

    val df = Seq(
    ("A", 1), ("B", 2), ("A", 3), ("C", 1)
    ).toDF("k", "v")
    val partitioned = df.repartition($"k")
    partitioned.explain
    // scala> df.repartition($"k").explain(true)
    // == Parsed Logical Plan ==
    // 'RepartitionByExpression ['k], None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
    //
    // == Analyzed Logical Plan ==
    // k: string, v: int
    // RepartitionByExpression [k#7], None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
    //
    // == Optimized Logical Plan ==
    // RepartitionByExpression [k#7], None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
    //
    // == Physical Plan ==
    // TungstenExchange hashpartitioning(k#7,200), None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    // +- Scan PhysicalRDD[_1#5,_2#6]
    
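    To see where individual rows land (a small sketch, not part of the original answer), you can tag each row with spark_partition_id(); rows that share the same k should report the same partition id:

    import org.apache.spark.sql.functions.spark_partition_id

    // Attach the id of the partition each row ended up in after repartition($"k")
    partitioned
      .withColumn("pid", spark_partition_id())
      .orderBy($"k")
      .show()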

    Unlike RDDs, a Spark Dataset (including Dataset[Row], a.k.a. DataFrame) cannot use a custom partitioner for now. You can typically address that by creating an artificial partitioning column, but it will not give you the same flexibility.
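    A minimal sketch of that workaround (not from the original answer), assuming Spark >= 2.0 for the hash function; the bucket column name and the bucket count of 8 are arbitrary:

    import org.apache.spark.sql.functions.{hash, lit, pmod}

    // Derive an artificial bucket column from the key and repartition on it.
    // Rows with the same k land in the same partition, but unlike a custom
    // partitioner you do not control which bucket a given key maps to.
    val numBuckets = 8
    val bucketed = df
      .withColumn("bucket", pmod(hash($"k"), lit(numBuckets)))
      .repartition(numBuckets, $"bucket")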

    Spark < 1.6.0:

    You can pre-partition the input data before you create a DataFrame:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import org.apache.spark.HashPartitioner
     
    val schema = StructType(Seq(
        StructField("x", StringType, false),
        StructField("y", LongType, false),
        StructField("z", DoubleType, false)
    ))
     
    val rdd = sc.parallelize(Seq(
    Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
    Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
    ))
     
    val partitioner = new HashPartitioner(5)
     
    val partitioned = rdd.map(r => (r.getString(0), r))
        .partitionBy(partitioner)
        .values
     
    val df = sqlContext.createDataFrame(partitioned, schema)
    

    Since DataFrame creation from an RDD requires only a simple map phase, the existing partition layout should be preserved*:

    // check that the number of partitions carried over from the pre-partitioned RDD
    assert(df.rdd.partitions.size == partitioned.partitions.size)
    
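    To eyeball the layout itself (a sketch, not part of the original answer), you can list which partition index each key ended up in on the pre-partitioned RDD; with HashPartitioner(5) every occurrence of a given key maps to a single index:

    // (key, partition index) pairs, deduplicated
    partitioned
      .mapPartitionsWithIndex((idx, rows) => rows.map(r => (r.getString(0), idx)))
      .distinct()
      .collect()
      .foreach(println)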

    You can repartition an existing DataFrame in the same way:

    sqlContext.createDataFrame(
        df.rdd.map(r => (r.getLong(1), r)).partitionBy(partitioner).values,
        df.schema
    )
    

    So it looks like it is not impossible. The question remains whether it makes sense at all. I will argue that most of the time it doesn't:

    Repartitioning is an expensive process. In a typical scenario most of the data has to be serialized, shuffled and deserialized. On the other hand, the number of operations that can benefit from pre-partitioned data is relatively small, and it is further limited if the internal API is not designed to leverage this property:

    • joins in some scenarios, but it would require internal support,
    • window function calls with a matching partitioner. Same as above, limited to a single window definition. It is already partitioned internally though, so pre-partitioning may be redundant,
    • simple aggregations with GROUP BY – it is possible to reduce the memory footprint of the temporary buffers**, but the overall cost is much higher. More or less equivalent to groupByKey.mapValues(_.reduce) (current behavior) vs reduceByKey (pre-partitioning); see the small RDD-level sketch after this list. Unlikely to be useful in practice.
    • data compression with SQLContext.cacheTable. Since it looks like it is using run-length encoding, applying OrderedRDDFunctions.repartitionAndSortWithinPartitions could improve the compression ratio.
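    For reference, here is a small RDD-level sketch of the GROUP BY comparison above. The pair RDD kv is a hypothetical stand-in built from the same toy data; only groupByKey, mapValues and reduceByKey from the standard RDD API are used.

    val kv = sc.parallelize(Seq(("A", 1), ("B", 2), ("A", 3), ("C", 1)))

    // Roughly the current behavior: collect all values per key, then reduce them.
    val viaGroupByKey = kv.groupByKey().mapValues(_.reduce(_ + _))

    // What pre-partitioning would buy you: map-side combine before the shuffle.
    val viaReduceByKey = kv.reduceByKey(_ + _)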

    Performance is highly dependent on the distribution of the keys. If it is skewed, it will result in suboptimal resource utilization. In the worst case scenario it will be impossible to finish the job at all.

     

    Answered on November 15, 2018.

    In Spark < 1.6, if you create a HiveContext (not the plain old SqlContext) you can use the HiveQL DISTRIBUTE BY colX... (which ensures each of N reducers gets non-overlapping ranges of x) and CLUSTER BY colX... (a shortcut for DISTRIBUTE BY and SORT BY).

    For instance:

    df.registerTempTable("partitionMe")
    hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")
    
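    Since CLUSTER BY is just a shortcut for DISTRIBUTE BY plus SORT BY on the same columns, the distribution part of the example above could also be written as follows (same hypothetical table and accountId column; note this sorts by accountId only, not by date):

    hiveCtx.sql("SELECT * FROM partitionMe CLUSTER BY accountId")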

    It is not clear how this fits in with the Spark DataFrame API, though. These keywords are not supported in the normal SqlContext.

    (Note: you do not need a Hive metastore to use the HiveContext.)

    Answered on November 15, 2018.

    Use the DataFrame returned by:

    yourDF.orderBy($"account")
    
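    To confirm what the sort adds to the plan (a sketch, reusing the hypothetical yourDF and account column from this discussion), you can inspect the query plan the same way as in the answers above:

    // The physical plan should show a range-partitioning Exchange plus a Sort.
    yourDF.orderBy($"account").explain(true)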

    There is no explicit way to use partitionBy on a DataFrame, only on a PairRDD, but when you sort a DataFrame, the sort will be reflected in its LogicalPlan, and that will help when you need to make calculations on each account.

    I have faced the exact same issue, with a DataFrame that I want to partition by account. I assume that when you say "want to have the data partitioned so that all of the transactions for an account are in the same Spark partition", you want it for scale and performance, but your code doesn't depend on it (like using mapPartitions() etc.), right?

    Answered on November 15, 2018.

