Primary keys with Apache Spark

Asked on January 11, 2019 in Apache-spark.

  • 3 Answer(s)

    For Scala:

    Since we need unique numbers, zipWithUniqueId can be used and the DataFrame recreated. First the imports and some dummy data:

    import sqlContext.implicits._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType, StructField, LongType}
     
    val df = sc.parallelize(Seq(
        ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
    

    For further use, extract the schema:

    val schema = df.schema
    

    Then add the id field:

    val rows = df.rdd.zipWithUniqueId.map{
        case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
    

    And create the DataFrame with the new schema:

    val dfWithPK = sqlContext.createDataFrame(
      rows, StructType(StructField("id", LongType, false) +: schema.fields))
    

    The same approach in Python:

    from pyspark.sql import Row
    from pyspark.sql.types import StructField, StructType, LongType
     
    row = Row("foo", "bar")
    df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
     
    # Row factory with the new id column prepended to the existing columns
    row_with_index = Row(*["id"] + df.columns)
     
    # builds a mapper that turns a (row, uid) pair into a Row with the id prepended
    def make_row(columns):
        def _make_row(row, uid):
            row_dict = row.asDict()
            return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
        return _make_row
     
    f = make_row(df.columns)
     
    df_with_pk = (df.rdd
        .zipWithUniqueId()
        .map(lambda x: f(*x))
        .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
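
    A quick check of the result (a sketch; the exact id values produced by zipWithUniqueId depend on partitioning, so they are unique but not necessarily consecutive):

    df_with_pk.printSchema()   # id comes first, followed by the original columns
    df_with_pk.show()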
    

    If consecutive numbers are required, replace zipWithUniqueId with zipWithIndex, as in the sketch below.
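
    For example, a minimal sketch of the consecutive-id variant in PySpark, reusing df, f and the schema imports from the Python snippet above (the name df_with_consecutive_pk is just chosen here for illustration):

    # same pipeline as above, but zipWithIndex assigns consecutive ids 0, 1, 2, ...
    # at the cost of an extra Spark job when the RDD has more than one partition
    df_with_consecutive_pk = (df.rdd
        .zipWithIndex()
        .map(lambda x: f(*x))
        .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))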

    Directly with the DataFrame API:

    If consecutive numbers are not needed, the monotonicallyIncreasingId function can be used; the generated ids are unique and increasing but not consecutive, since the partition ID is encoded in the upper bits:

    import org.apache.spark.sql.functions.monotonicallyIncreasingId
     
    df.withColumn("id", monotonicallyIncreasingId).show()
    // +---+----+-----------+
    // |foo| bar|         id|
    // +---+----+-----------+
    // |  a|-1.0|17179869184|
    // |  b|-2.0|42949672960|
    // |  c|-3.0|60129542144|
    // +---+----+-----------+
    

    Note that the row_number window function can also be used, although it is not scalable, since the unpartitioned window moves every row to a single partition:

    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number
     
    # row_number requires an ordered window; order by an existing column here
    w = Window().orderBy("foo")
    df.withColumn("id", row_number().over(w)).show()
    
    Answered on January 11, 2019.

    Alternatively, the code below can be used.

    from pyspark.sql.functions import monotonically_increasing_id
    df.withColumn("id", monotonically_increasing_id()).show()
    

    Make sure that the second argument of df.withColumn is monotonically_increasing_id(), not monotonically_increasing_id.
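
    As a minimal illustration of that distinction (assuming the same df as above):

    from pyspark.sql.functions import monotonically_increasing_id
     
    # correct: the call returns a Column, which is what withColumn expects
    df.withColumn("id", monotonically_increasing_id()).show()
    # wrong: the bare function object is not a Column, so withColumn rejects it
    # df.withColumn("id", monotonically_increasing_id)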

    Answered on January 11, 2019.

    If zipWithIndex() is the desired behavior, the answer is straightforward. In PySpark:

    from pyspark.sql.types import StructType, StructField, IntegerType
     
    # read the initial dataframe without an index
    dfNoIndex = sqlContext.read.parquet(dataframePath)
     
    # create a new schema with a uuid field prepended
    newSchema = StructType([StructField("uuid", IntegerType(), False)]
                           + dfNoIndex.schema.fields)
    # zip with the index, map each (row, id) pair to a dict that includes
    # the new field, then convert back to a DataFrame with the new schema
    df = dfNoIndex.rdd.zipWithIndex()\
                      .map(lambda pair: dict(list(pair[0].asDict().items())
                                             + [("uuid", pair[1])]))\
                      .toDF(newSchema)
    
    Answered on January 11, 2019.

