How do I get a SQL row_number equivalent for a Spark RDD ?

Asked on January 8, 2019 in Apache-spark.


  • 2 Answer(s)

    The row_number() over (partition by ... order by ...) window functionality was added in Spark 1.4.

    This can be done with PySpark DataFrames.

    First, create a test DataFrame:

    from pyspark.sql import Row, functions as F
     
    testDF = sc.parallelize(
        (Row(k="key1", v=(1,2,3)),
        Row(k="key1", v=(1,4,7)),
        Row(k="key1", v=(2,2,3)),
        Row(k="key2", v=(5,5,5)),
        Row(k="key2", v=(5,5,9)),
        Row(k="key2", v=(7,5,5))
        )
    ).toDF()
    

    Then add the per-partition row number:

    from pyspark.sql.window import Window
     
    (testDF
      .select("k", "v",
            F.row_number()   # rowNumber() in Spark 1.4/1.5; renamed row_number() in 1.6+
            .over(Window
                .partitionBy("k")
                .orderBy("k")   # ordering by the partition key leaves row order within a partition arbitrary; order by a value column for deterministic numbering
              )
            .alias("rowNum")
            )
      .show()
    )
    +----+-------+------+
    | k  | v     |rowNum|
    +----+-------+------+
    |key1|[1,2,3]| 1    |
    |key1|[1,4,7]| 2    |
    |key1|[2,2,3]| 3    |
    |key2|[5,5,5]| 1    |
    |key2|[5,5,9]| 2    |
    |key2|[7,5,5]| 3    |
    +----+-------+------+
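    The window semantics above can be sketched in plain Python without Spark: partition the rows by k, sort within each partition, and number from 1. The helper name row_number_over below is illustrative, not part of any Spark API.

```python
from itertools import groupby

rows = [
    ("key1", (1, 2, 3)), ("key1", (1, 4, 7)), ("key1", (2, 2, 3)),
    ("key2", (5, 5, 5)), ("key2", (5, 5, 9)), ("key2", (7, 5, 5)),
]

def row_number_over(rows):
    """Mimic row_number() over (partition by k order by v): 1-based rank per key."""
    out = []
    # sorted() groups equal keys together and orders values within each key
    for k, group in groupby(sorted(rows), key=lambda r: r[0]):
        for n, (_, v) in enumerate(group, start=1):
            out.append((k, v, n))
    return out

for rec in row_number_over(rows):
    print(rec)
```

    This reproduces the rowNum column of the table above.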
    
    Answered on January 8, 2019.

    This alternative works at the RDD level. The solution is given in Python, but it translates readily to Scala.

    Assuming temp1 is an RDD whose rows each hold a composite key in the first field followed by three values, first reshape the data into key-value pairs:

    temp2 = temp1.map(lambda x: (x[0], (x[1], x[2], x[3])))
    

    temp2 is now a “real” key-value pair RDD. It will look as follows:

    [
    ((3, 4), (5, 5, 5)),
    ((3, 4), (5, 5, 9)),
    ((3, 4), (7, 5, 5)),
    ((1, 2), (1, 2, 3)),
    ((1, 2), (1, 4, 7)),
    ((1, 2), (2, 2, 3))
    ]
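    As a sanity check, the same reshaping can be run in plain Python on a local stand-in for temp1, assuming the row shape described above (sample values taken from the output shown):

```python
# Local stand-in for temp1: composite key in the first field, then three values
temp1 = [
    ((3, 4), 5, 5, 5),
    ((3, 4), 5, 5, 9),
    ((3, 4), 7, 5, 5),
    ((1, 2), 1, 2, 3),
    ((1, 2), 1, 4, 7),
    ((1, 2), 2, 2, 3),
]

# Same transformation as the RDD map() above
temp2 = [(x[0], (x[1], x[2], x[3])) for x in temp1]
print(temp2)
```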
    

    To reproduce the effect of PARTITION BY, use groupByKey:

    temp3 = temp2.groupByKey()
    

    temp3 is now an RDD with two rows, one per key:

    [((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),
    ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]
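    Locally, groupByKey can be mimicked with a dict of lists; Spark hands back a lazy ResultIterable per key rather than a list, but the grouping is the same:

```python
from collections import defaultdict

temp2 = [
    ((3, 4), (5, 5, 5)), ((3, 4), (5, 5, 9)), ((3, 4), (7, 5, 5)),
    ((1, 2), (1, 2, 3)), ((1, 2), (1, 4, 7)), ((1, 2), (2, 2, 3)),
]

# Collect all values sharing a key, like groupByKey()
grouped = defaultdict(list)
for k, v in temp2:
    grouped[k].append(v)

temp3 = list(grouped.items())  # one row per key
print(temp3)
```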
    

    Next, apply a rank to each value of the RDD. Python's built-in sorted function does the ordering, and enumerate creates the row_number column:

    temp4 = temp3.flatMap(
        lambda x: [(x[0], (v, i)) for i, v in enumerate(sorted(x[1]))]
    ).take(10)
    

    To sort on specific columns (for example col1 ascending, col2 descending, col3 ascending), pass a key function to sorted:

    sorted(x[1], key=lambda t: (t[0], -t[1], t[2]))
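    With that key function, sorted orders the first column ascending, the second descending, and the third ascending. A quick local check:

```python
vals = [(1, 2, 3), (1, 4, 7), (2, 2, 3)]

# col1 ascending, col2 descending, col3 ascending
ordered = sorted(vals, key=lambda t: (t[0], -t[1], t[2]))
print(ordered)  # [(1, 4, 7), (1, 2, 3), (2, 2, 3)]
```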
    

    Without the key argument, the end result looks like this:

    [
    ((1, 2), ((1, 2, 3), 0)), 
    ((1, 2), ((1, 4, 7), 1)), 
    ((1, 2), ((2, 2, 3), 2)), 
    ((3, 4), ((5, 5, 5), 0)), 
    ((3, 4), ((5, 5, 9), 1)), 
    ((3, 4), ((7, 5, 5), 2))
    ]
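    The whole RDD pipeline can be reproduced end to end in plain Python, yielding exactly the list above (the Spark version may return the keys in a different order, since groupByKey makes no ordering guarantee):

```python
from collections import defaultdict

temp2 = [
    ((3, 4), (5, 5, 5)), ((3, 4), (5, 5, 9)), ((3, 4), (7, 5, 5)),
    ((1, 2), (1, 2, 3)), ((1, 2), (1, 4, 7)), ((1, 2), (2, 2, 3)),
]

# groupByKey: collect values per key
groups = defaultdict(list)
for k, v in temp2:
    groups[k].append(v)

# flatMap: sort each group's values, enumerate to get the row number
temp4 = [
    (k, (v, i))
    for k, vs in sorted(groups.items())  # sorted keys, for stable display only
    for i, v in enumerate(sorted(vs))
]
print(temp4)
```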
    

    And this works.

    Answered on January 8, 2019.