Primary keys with Apache Spark

Asked on January 11, 2019 in Apache-spark.

  • 3 Answer(s)

    For Scala:

    If unique (but not necessarily consecutive) numbers are enough, zipWithUniqueId can be used and the DataFrame recreated from the result. First the imports and some dummy data:

    import sqlContext.implicits._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType, StructField, LongType}
    val df = sc.parallelize(Seq(
        ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

    For further use, extract the schema:

    val schema = df.schema

    Then prepend an id field to every row (zipWithUniqueId pairs each row with a unique Long):

    val rows = df.rdd.zipWithUniqueId.map {
        case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

    Finally, recreate the DataFrame with the extended schema:

    val dfWithPK = sqlContext.createDataFrame(
      rows, StructType(StructField("id", LongType, false) +: schema.fields))

    The Python equivalent:

    from pyspark.sql import Row
    from pyspark.sql.types import StructField, StructType, LongType

    row = Row("foo", "bar")
    df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
    row_with_index = Row(*["id"] + df.columns)

    def make_row(columns):
        def _make_row(row, uid):
            row_dict = row.asDict()
            return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
        return _make_row

    f = make_row(df.columns)

    df_with_pk = (df.rdd
        .zipWithUniqueId()
        .map(lambda x: f(*x))
        .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

    If consecutive numbers are required, replace zipWithUniqueId with zipWithIndex.
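The difference between the two can be illustrated without Spark. zipWithUniqueId gives the i-th item of partition k the id k + i*n (where n is the number of partitions), so ids are unique but may have gaps; zipWithIndex numbers all elements consecutively, which costs Spark an extra job. A plain-Python simulation of both schemes (not Spark code; the partition layout is a made-up example):

```python
def zip_with_unique_id(partitions):
    # Mirrors RDD.zipWithUniqueId: the i-th item of partition k
    # gets id k + i * n, where n is the number of partitions.
    n = len(partitions)
    return [[(x, k + i * n) for i, x in enumerate(part)]
            for k, part in enumerate(partitions)]

def zip_with_index(partitions):
    # Mirrors RDD.zipWithIndex: consecutive ids 0..N-1 across
    # partitions, which is why Spark needs an extra pass to count
    # the elements in earlier partitions first.
    flat = [x for part in partitions for x in part]
    return [(x, i) for i, x in enumerate(flat)]

parts = [["a", "b"], ["c"], ["d", "e"]]
print(zip_with_unique_id(parts))  # unique ids, but with gaps
print(zip_with_index(parts))      # consecutive ids 0..4
```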

    Directly with the DataFrame API:

    If consecutive numbers are not needed, the monotonicallyIncreasingId function can be used:

    import org.apache.spark.sql.functions.monotonicallyIncreasingId
    df.withColumn("id", monotonicallyIncreasingId).show()
    // +---+----+-----------+
    // |foo| bar| id        |
    // +---+----+-----------+
    // | a |-1.0|17179869184|
    // | b |-2.0|42949672960|
    // | c |-3.0|60129542144|
    // +---+----+-----------+
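The large, gapped ids in the output above come from how monotonicallyIncreasingId builds its value: the partition id goes into the upper 31 bits and the per-partition record number into the lower 33 bits. A plain-Python sketch of that layout (the sample ids are simply partition ids 2, 5 and 7 shifted left by 33 bits, assuming each row landed first in its partition):

```python
def monotonic_id(partition_id, record_number):
    # Mirrors the documented layout of monotonicallyIncreasingId:
    # partition id in the upper 31 bits, per-partition record
    # number in the lower 33 bits.
    return (partition_id << 33) + record_number

# The ids shown above correspond to record 0 of partitions 2, 5, 7:
print(monotonic_id(2, 0))  # 17179869184
print(monotonic_id(5, 0))  # 42949672960
print(monotonic_id(7, 0))  # 60129542144
```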

    Note that the rowNumber window function can also be used (a window with no partitioning moves all data to a single partition):

    from pyspark.sql.window import Window
    from pyspark.sql.functions import rowNumber
    w = Window().orderBy()
    df.withColumn("id", rowNumber().over(w)).show()
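Semantically, rowNumber just numbers the rows of each window partition 1, 2, 3, ... in the window's order; with an empty window spec it is, conceptually, enumerate starting at 1. A plain-Python sketch of that behavior (not Spark code):

```python
def row_number(rows):
    # Mirrors the row_number() window function over a single,
    # unpartitioned window: consecutive ids starting at 1.
    return [(i, r) for i, r in enumerate(rows, start=1)]

print(row_number([("a", -1.0), ("b", -2.0), ("c", -3.0)]))
# [(1, ('a', -1.0)), (2, ('b', -2.0)), (3, ('c', -3.0))]
```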
    Answered on January 11, 2019.

    Alternatively, the code below can be used.

    from pyspark.sql.functions import monotonically_increasing_id
    df.withColumn("id", monotonically_increasing_id()).show()

    Make sure that the second argument of df.withColumn is monotonically_increasing_id(), not monotonically_increasing_id.

    Answered on January 11, 2019.

    For the case where zipWithIndex() is the desired behavior, i.e. consecutive integers are needed, the following works in PySpark.

    from pyspark.sql.types import StructType, StructField, IntegerType

    # read the initial dataframe without index (source elided in the original)
    dfNoIndex = ...
    # Need to zip together with a unique integer
    # First create a new schema with the uuid field appended
    newSchema = StructType([StructField("uuid", IntegerType(), False)]
                      + dfNoIndex.schema.fields)
    # zip with the index, map each (row, id) pair to a dictionary
    # that includes the new field (no tuple-unpacking lambdas in Python 3)
    df = dfNoIndex.rdd.zipWithIndex()\
                      .map(lambda pair: dict(list(pair[0].asDict().items())
                                             + [("uuid", pair[1])]))\
                      .toDF(newSchema)
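The per-row work in that map step is just a dictionary merge: take the row's fields and add the new uuid key. A plain-Python sketch of that single step, using a made-up row dict:

```python
def add_uuid(row_dict, uid):
    # Merge the row's existing fields with the new "uuid" field,
    # as the zipWithIndex map step does for each (row, id) pair.
    merged = dict(row_dict)
    merged["uuid"] = uid
    return merged

print(add_uuid({"foo": "a", "bar": -1.0}, 0))
# {'foo': 'a', 'bar': -1.0, 'uuid': 0}
```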
    Answered on January 11, 2019.
