Reshaping/Pivoting data in Spark RDD and/or Spark DataFrames

Asked on January 8, 2019 in Apache-spark.


  • 3 Answer(s)

    Since Spark 1.6, a pivot function is available on GroupedData: it takes the pivot column (and, optionally, the list of levels) and is followed by an aggregate expression.

    pivoted = (df
        .groupBy("ID", "Age")
        .pivot("Country", ['US', 'UK', 'CA'])  # optional list of levels
        .sum("Score"))  # alternatively you can use .agg(expr)
    pivoted.show()
    ## +---+---+---+---+---+
    ## | ID|Age| US| UK| CA|
    ## +---+---+---+---+---+
    ## |X01| 41|  3|  1|  2|
    ## |X02| 72|  4|  6|  7|
    ## +---+---+---+---+---+
    

    Providing the list of levels both boosts performance (otherwise Spark must first compute the distinct values of the pivot column) and acts as a filter: levels not in the list, such as XX, are simply dropped.

    Answered on January 8, 2019.

    This answer takes a native Spark approach that doesn’t hardwire the column names. It is based on aggregateByKey, using a dictionary to collect the columns that appear for each key; the collected column names are then unioned to build the final DataFrame. (An earlier version emitted a dictionary for each record and used jsonRDD; this is more efficient.) It is easy to modify this to restrict the output to a specific list of columns, or to exclude ones like XX.

    A variation of the same code counts how many times each of a variable number of events occurs per ID, generating one column per event type. The code is basically the same, except that the seqFn uses a collections.Counter instead of a dict to count the occurrences.
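    A minimal sketch of that Counter variation (only the sequence and combine functions change; the helper names seqCount/cmbCount below are mine, not from the original answer):

    ```python
    from collections import Counter

    def seqCount(u, v):
        # Start a fresh Counter for the first row of each key
        # (the aggregateByKey zero value is None)
        if u is None:
            u = Counter()
        u[v.Country] += 1  # count occurrences instead of storing Score
        return u

    def cmbCount(u1, u2):
        # Counters add element-wise, merging the per-partition counts
        return u1 + u2

    # Used exactly like seqPivot/cmbPivot in the code below:
    # counts = df.rdd.keyBy(lambda row: row.ID).aggregateByKey(None, seqCount, cmbCount)
    ```
    
    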

    from pyspark.sql.types import *
     
    rdd = sc.parallelize([('X01',41,'US',3),
                   ('X01',41,'UK',1),
                   ('X01',41,'CA',2),
                   ('X02',72,'US',4),
                   ('X02',72,'UK',6),
                   ('X02',72,'CA',7),
                   ('X02',72,'XX',8)])
     
    schema = StructType([StructField('ID', StringType(), True),
                  StructField('Age', IntegerType(), True),
                  StructField('Country', StringType(), True),
                  StructField('Score', IntegerType(), True)])
     
    df = sqlCtx.createDataFrame(rdd, schema)
     
    def seqPivot(u, v):
        # Start a fresh dict for the first row of each key
        # (the aggregateByKey zero value is None)
        if u is None:
            u = {}
        u[v.Country] = v.Score
        return u
     
    def cmbPivot(u1, u2):
        # Merge the per-partition dicts for the same key
        u1.update(u2)
        return u1
     
    pivot = (
        df
        .rdd
        .keyBy(lambda row: row.ID)
        .aggregateByKey(None, seqPivot, cmbPivot)
    )
    # Union the country columns seen across all keys; sort them so the
    # output column order is deterministic.
    columns = sorted(
        pivot
        .values()
        .map(lambda u: set(u.keys()))
        .reduce(lambda s, t: s.union(t))
    )
    result = sqlCtx.createDataFrame(
        # Python 3 removed tuple-parameter lambdas, so unpack by index
        pivot
        .map(lambda kv: [kv[0]] + [kv[1].get(c) for c in columns]),
        schema=StructType(
            [StructField('ID', StringType())] +
            [StructField(c, IntegerType()) for c in columns]
        )
    )
    result.show()
    

    Which gives:

    ID    CA   UK   US    XX
    X02    7    6    4     8
    X01    2    1    3  null
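    As noted above, restricting or excluding columns is a one-line change at the point where `columns` is computed. A sketch in plain Python (the hard-coded set stands in for the set that the reduce over the RDD produces):

    ```python
    # Suppose the union of per-key column names came out as:
    all_columns = {'US', 'UK', 'CA', 'XX'}

    # Exclude unwanted levels such as XX...
    columns = sorted(c for c in all_columns if c != 'XX')

    # ...or restrict to an explicit whitelist, preserving its order
    wanted = ['US', 'UK', 'CA']
    columns_whitelist = [c for c in wanted if c in all_columns]

    # Countries missing for a key then fall out as None when rows are built:
    row = {'US': 3, 'UK': 1, 'CA': 2}
    values = [row.get(c) for c in columns]
    ```
    
    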
    
    Answered on January 8, 2019.

    This answer first makes a small correction to the RDD:

    rdd = sc.parallelize([('X01',41,'US',3),
                       ('X01',41,'UK',1),
                       ('X01',41,'CA',2),
                       ('X02',72,'US',4),
                       ('X02',72,'UK',6),
                       ('X02',72,'CA',7),
                       ('X02',72,'XX',8)])
    

    With the RDD corrected, the pivot can be expressed as a series of joins (Scala), one per country:

    df.select($"ID", $"Age").groupBy($"ID").agg($"ID", first($"Age") as "Age")
      .join(
        df.select($"ID" as "usID", $"Country" as "C1", $"Score" as "US"),
        $"ID" === $"usID" and $"C1" === "US"
      )
      .join(
        df.select($"ID" as "ukID", $"Country" as "C2", $"Score" as "UK"),
        $"ID" === $"ukID" and $"C2" === "UK"
      )
      .join(
        df.select($"ID" as "caID", $"Country" as "C3", $"Score" as "CA"),
        $"ID" === $"caID" and $"C3" === "CA"
      )
      .select($"ID", $"Age", $"US", $"UK", $"CA")
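    For readers following along in Python, here is a rough PySpark equivalent of the join chain above, assuming Spark 2.x+. The loop and the temporary jID alias are my own restructuring, not a literal translation; like the Scala version, the inner joins drop any ID that is missing one of the listed countries:

    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.master("local[1]").appName("pivot-joins").getOrCreate()

    df = spark.createDataFrame(
        [('X01', 41, 'US', 3), ('X01', 41, 'UK', 1), ('X01', 41, 'CA', 2),
         ('X02', 72, 'US', 4), ('X02', 72, 'UK', 6), ('X02', 72, 'CA', 7),
         ('X02', 72, 'XX', 8)],
        ['ID', 'Age', 'Country', 'Score'])

    # One row per ID with its Age (Age is constant per ID here)
    result = df.groupBy('ID').agg(F.first('Age').alias('Age'))

    # One inner join per country, each contributing one score column
    for country in ['US', 'UK', 'CA']:
        scores = (df.filter(F.col('Country') == country)
                    .select(F.col('ID').alias('jID'), F.col('Score').alias(country)))
        result = result.join(scores, result.ID == scores.jID).drop('jID')

    result = result.select('ID', 'Age', 'US', 'UK', 'CA')
    result.show()
    ```
    
    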
    
    Answered on January 8, 2019.

