How do I add a new column to a Spark DataFrame (using PySpark) ?

Asked on November 16, 2018 in Apache-spark.


  • 3 Answer(s)

    An arbitrary column cannot simply be attached to a DataFrame in Spark; a new column has to be built from an expression. The simplest case is a literal value:

    from pyspark.sql.functions import lit
     
    df = sqlContext.createDataFrame(
        [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
     
    # add a constant column x4 holding the literal value 0
    df_with_x4 = df.withColumn("x4", lit(0))
    df_with_x4.show()
     
    ## +---+---+-----+---+
    ## | x1| x2|   x3| x4|
    ## +---+---+-----+---+
    ## |  1|  a| 23.0|  0|
    ## |  3|  B|-23.0|  0|
    ## +---+---+-----+---+
    

    A new column can also be created by transforming an existing column:

    from pyspark.sql.functions import exp
    df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
    df_with_x5.show()
    ## +---+---+-----+---+--------------------+
    ## | x1| x2|   x3| x4|                  x5|
    ## +---+---+-----+---+--------------------+
    ## |  1|  a| 23.0|  0| 9.744803446248903E9|
    ## |  3|  B|-23.0|  0|1.026187963170189...|
    ## +---+---+-----+---+--------------------+
    

    A column from another DataFrame can be brought in using a join:

    from pyspark.sql.functions import col
     
    lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
    df_with_x6 = (df_with_x5
        .join(lookup, col("x1") == col("k"), "leftouter")
        .drop("k")
        .withColumnRenamed("v", "x6"))
    df_with_x6.show()
    ## +---+---+-----+---+--------------------+----+
    ## | x1| x2|   x3| x4|                  x5|  x6|
    ## +---+---+-----+---+--------------------+----+
    ## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
    ## |  3|  B|-23.0|  0|1.026187963170189...|null|
    ## +---+---+-----+---+--------------------+----+
    

    or generated with a function / UDF:

    from pyspark.sql.functions import rand
    df_with_x7 = df_with_x6.withColumn("x7", rand())
    df_with_x7.show()
    ## +---+---+-----+---+--------------------+----+-------------------+
    ## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
    ## +---+---+-----+---+--------------------+----+-------------------+
    ## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
    ## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
    ## +---+---+-----+---+--------------------+----+-------------------+
    

    Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expressions, are usually preferred over Python user-defined functions.
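
    For illustration, here is a minimal sketch comparing the two approaches on the df above; the column name x2_upper and the lambda UDF are made up for this example:

    from pyspark.sql.functions import upper, udf
    from pyspark.sql.types import StringType
     
    # built-in function: compiles to a Catalyst expression, no Python round-trip
    df.withColumn("x2_upper", upper(df["x2"])).show()
     
    # equivalent Python UDF: works, but each row is serialized to/from Python
    upper_udf = udf(lambda s: s.upper(), StringType())
    df.withColumn("x2_upper", upper_udf(df["x2"])).show()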

    If there is a need to add the content of an arbitrary RDD as a column, the general approach is (a minimal sketch follows the list):

    • Add row numbers to the existing data frame
    • Call zipWithIndex on the RDD and convert it to a data frame
    • Join both using the index as the join key
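
    A minimal sketch of this approach, assuming the df and sqlContext / sc from above; extra_rdd, the column name x8, and the index column idx are made up for this example:

    from pyspark.sql import Row
     
    # 1. add row numbers to the existing data frame
    df_indexed = (df.rdd.zipWithIndex()
        .map(lambda pair: Row(idx=pair[1], **pair[0].asDict()))
        .toDF())
     
    # 2. call zipWithIndex on the arbitrary RDD and convert it to a data frame
    extra_rdd = sc.parallelize(["foo", "bar"])
    extra_df = (extra_rdd.zipWithIndex()
        .map(lambda pair: Row(idx=pair[1], x8=pair[0]))
        .toDF())
     
    # 3. join both using the index as the join key, then drop it
    df_with_x8 = df_indexed.join(extra_df, "idx").drop("idx")
    df_with_x8.show()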

     

    Answered on November 16, 2018.

    A column can also be added by using a UDF:

    df = sqlContext.createDataFrame(
        [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
     
    def valueToCategory(value):
        if value == 1: return 'cat1'
        elif value == 2: return 'cat2'
        # ... (more categories as needed)
        else: return 'n/a'
     
    # NOTE: it seems that calls to udf() must be after SparkContext() is called
    udfValueToCategory = udf(valueToCategory, StringType())
    df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
    df_with_cat.show()
    ## +---+---+-----+--------+
    ## | x1| x2|   x3|category|
    ## +---+---+-----+--------+
    ## |  1|  a| 23.0|    cat1|
    ## |  3|  B|-23.0|     n/a|
    ## +---+---+-----+--------+
    
    Answered on November 16, 2018.

    This solution can be used for Spark 2.0:

    # assumes schema has 'age' column
    df.select('*', (df.age + 10).alias('agePlusTen'))
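
    Applied to the df from the earlier answers (columns x1, x2, x3), the same pattern would look like the sketch below; the column name x3PlusTen is made up:

    df = sqlContext.createDataFrame(
        [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
     
    # select all existing columns plus a derived, aliased column
    df.select('*', (df.x3 + 10).alias('x3PlusTen')).show()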
    
    Answered on November 16, 2018.

