Derive multiple columns from a single column in a Spark DataFrame

Asked on November 21, 2018 in Apache-spark.


  • 3 Answers

    A UDF can only return a single column at a time. There are two different ways to overcome this limitation:

    Return a column of a complex type. The most general solution is a StructType, but you can consider ArrayType or MapType as well (a short ArrayType sketch follows the example below).

    import org.apache.spark.sql.functions.udf
    // toDF and the $ column syntax assume the SQL implicits are in scope,
    // e.g. import sqlContext.implicits._ (or spark.implicits._ in Spark 2.x)
     
    val df = Seq(
        (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")
    ).toDF("x", "y", "z")
     
    // a case class gives the struct column named fields
    case class Foobar(foo: Double, bar: Double)
     
    // the UDF packs both derived values into a single struct column
    val foobarUdf = udf((x: Long, y: Double, z: String) =>
        Foobar(x * y, z.head.toInt * y))
     
    val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z"))
    df1.show
    // +---+----+---+------------+
    // |  x|   y|  z|      foobar|
    // +---+----+---+------------+
    // |  1| 3.0|  a| [3.0,291.0]|
    // |  2|-1.0|  b|[-2.0,-98.0]|
    // |  3| 0.0|  c|   [0.0,0.0]|
    // +---+----+---+------------+
     
    df1.printSchema
    // root
    //  |-- x: long (nullable = false)
    //  |-- y: double (nullable = false)
    //  |-- z: string (nullable = true)
    //  |-- foobar: struct (nullable = true)
    //  |    |-- foo: double (nullable = false)
    //  |    |-- bar: double (nullable = false)
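
    For comparison, here is a minimal sketch of the ArrayType variant mentioned above (the arrayUdf and pair names are made up for illustration): a UDF that returns a Seq produces a single array column instead of a struct, and the elements can be pulled out by index.

    // Hypothetical: a UDF returning Seq[Double] yields an array<double> column
    val arrayUdf = udf((x: Long, y: Double) => Seq(x * y, x + y))
    val dfArr = df.withColumn("pair", arrayUdf($"x", $"y"))
    // extract the array elements by position
    dfArr.select($"x", $"pair"(0).as("foo"), $"pair"(1).as("bar")).show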
    

    With this approach the derived values travel together in a single struct column, and the nested fields can be flattened later with a plain select, so no extra machinery is needed.
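
    A minimal sketch of that flattening, reusing df1 from the example above:

    // pull the nested struct fields out into top-level columns
    df1.select($"x", $"y", $"z", $"foobar.foo", $"foobar.bar").show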

     

    Switch to the RDD API, reshape, and rebuild the DataFrame:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
     
    // compute the extra values for one row
    def foobarFunc(x: Long, y: Double, z: String): Seq[Any] =
        Seq(x * y, z.head.toInt * y)
     
    // extend the original schema with the two new fields
    val schema = StructType(df.schema.fields ++
        Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))
     
    // append the computed values to each row
    val rows = df.rdd.map(r => Row.fromSeq(
        r.toSeq ++
        foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z"))))
     
    // in Spark 2.x use spark.createDataFrame(rows, schema) instead
    val df2 = sqlContext.createDataFrame(rows, schema)
     
    df2.show
    // +---+----+---+----+-----+
    // |  x|   y|  z| foo|  bar|
    // +---+----+---+----+-----+
    // |  1| 3.0|  a| 3.0|291.0|
    // |  2|-1.0|  b|-2.0|-98.0|
    // |  3| 0.0|  c| 0.0|  0.0|
    // +---+----+---+----+-----+
    

     

    Answered on November 21, 2018.

    Consider the case where a single column combines several values, for example a comma-separated string. You can split it and derive multiple columns from it.

    For instance:

    val df = sc.parallelize(List(("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27))).toDF("infoComb", "age")
    df.show
    +-----------------+---+
    |         infoComb|age|
    +-----------------+---+
    |Mike,1986,Toronto| 30|
    |Andre,1980,Ottawa| 36|
    | jill,1989,London| 27|
    +-----------------+---+
    

    Now that everything lives in the single infoComb column, we can split the string and derive more columns:

    import org.apache.spark.sql.functions.expr
     
    df.select(
        expr("(split(infoComb, ','))[0]").cast("string").as("name"),
        expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"),
        expr("(split(infoComb, ','))[2]").cast("string").as("city"),
        $"age"
    ).show
    +-----+----------+-------+---+
    | name|yearOfBorn|   city|age|
    +-----+----------+-------+---+
    | Mike|      1986|Toronto| 30|
    |Andre|      1980| Ottawa| 36|
    | jill|      1989| London| 27|
    +-----+----------+-------+---+
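
    If you prefer the column DSL over SQL expression strings, an equivalent sketch using the split function from org.apache.spark.sql.functions (splitting once and reusing the result):

    import org.apache.spark.sql.functions.split
     
    val parts = split($"infoComb", ",")
    df.select(
        parts.getItem(0).as("name"),
        parts.getItem(1).cast("integer").as("yearOfBorn"),
        parts.getItem(2).as("city"),
        $"age"
    ).show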
    

    This could be helpful.

     

     

    Answered on November 21, 2018.

    If the resulting columns will be of the same length as the original one, you can create brand-new columns with the withColumn function by applying a UDF, and then drop the original column.

    For instance:

    val newDf = myDf
        .withColumn("newCol1", myFun(myDf("originalColumn")))
        .withColumn("newCol2", myFun2(myDf("originalColumn")))
        .drop("originalColumn")
    

    Here myFun is a UDF defined as:

    import org.apache.spark.sql.functions.udf
     
    def myFun = udf(
      (originalColumnContent : String) => {
        // do something with the original column content and return a new value;
        // for illustration, take the part before the first comma
        originalColumnContent.split(",")(0)
      }
    )
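
    To ground the pattern, here is a minimal sketch of what the input and the second UDF might look like (the data and UDF body are made up for illustration); plugging these into the newDf snippet above yields newCol1 and newCol2 with originalColumn dropped:

    // hypothetical input: one column packing two values into a string
    val myDf = Seq("a,1", "b,2").toDF("originalColumn")
     
    // second derived column: the part after the comma, cast to Int
    def myFun2 = udf((s: String) => s.split(",")(1).toInt)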
    
    Answered on November 21, 2018.

