Aggregating multiple columns with a custom function in Spark

Asked on January 4, 2019 in Apache-spark.

  • 3 Answers

    The simplest way is to first collect the two columns into lists with collect_list, and then use a UDF to zip the two lists together, as follows:

    import org.apache.spark.sql.functions.{col, collect_list, udf}
    import sqlContext.implicits._
     
    // UDF that zips the two collected lists into a single list of (food, price) pairs
    val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_))
     
    val df = Seq(
      ("john", "tomato", 1.99),
      ("john", "carrot", 0.45),
      ("bill", "apple", 0.99),
      ("john", "banana", 1.29),
      ("bill", "taco", 2.59)
    ).toDF("name", "food", "price")
     
    val df2 = df.groupBy("name").agg(
      collect_list(col("food")) as "food",
      collect_list(col("price")) as "price"
    ).withColumn("food", zipper(col("food"), col("price"))).drop("price")
     
    df2.show(false)
    // +----+---------------------------------------------+
    // |name|food                                         |
    // +----+---------------------------------------------+
    // |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
    // |bill|[[apple,0.99], [taco,2.59]]                  |
    // +----+---------------------------------------------+
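
    On Spark 2.4 or later, the built-in arrays_zip can replace the hand-rolled UDF. A minimal sketch, assuming the same df as above (the result name zipped is just illustrative):

    import org.apache.spark.sql.functions.{arrays_zip, col, collect_list}
     
    // zip the two collected arrays with the built-in arrays_zip (Spark 2.4+)
    // instead of a custom UDF
    val zipped = df.groupBy("name").agg(
      collect_list(col("food")) as "food",
      collect_list(col("price")) as "price"
    ).withColumn("food", arrays_zip(col("food"), col("price"))).drop("price")
     
    zipped.show(false)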
    
    Answered on January 4, 2019.

    Here, before collecting as a list, use the struct function to group the two columns together:

    import org.apache.spark.sql.functions.{collect_list, struct}
    import sqlContext.implicits._
     
    val df = Seq(
      ("john", "tomato", 1.99),
      ("john", "carrot", 0.45),
      ("bill", "apple", 0.99),
      ("john", "banana", 1.29),
      ("bill", "taco", 2.59)
    ).toDF("name", "food", "price")
     
    // collect one (food, price) struct per row, grouped by name
    df.groupBy($"name")
      .agg(collect_list(struct($"food", $"price")).as("foods"))
      .show(false)
    

    The output will be:

    +----+---------------------------------------------+
    |name|foods                                        |
    +----+---------------------------------------------+
    |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
    |bill|[[apple,0.99], [taco,2.59]]                  |
    +----+---------------------------------------------+
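
    If the grouped rows later need to be flattened back out, explode can pull the struct array apart again. A minimal sketch, assuming the grouped result above is assigned to a value named grouped (an illustrative name):

    import org.apache.spark.sql.functions.explode
     
    val grouped = df.groupBy($"name")
      .agg(collect_list(struct($"food", $"price")).as("foods"))
     
    // explode turns each element of the foods array into its own row;
    // the struct fields are then selectable by name
    grouped.select($"name", explode($"foods").as("item"))
      .select($"name", $"item.food", $"item.price")
      .show(false)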
    
    Answered on January 4, 2019.

    This approach converts the DataFrame to an RDD of key-value pairs and then calls groupByKey on it. The output is a list of key-value pairs in which each value is a list of (food, price) tuples.

    df.show
    +----+------+----+
    |  _1|    _2|  _3|
    +----+------+----+
    |john|tomato|1.99|
    |john|carrot|0.45|
    |bill| apple|0.99|
    |john|banana|1.29|
    |bill|  taco|2.59|
    +----+------+----+
    // key each row by name, with (food, price) as the value
    // (on Spark 2.x and later, use df.rdd.map instead of df.map)
    val tuples = df.map(row => row(0) -> (row(1), row(2)))
    tuples: org.apache.spark.rdd.RDD[(Any, (Any, Any))] = MapPartitionsRDD[102] at map at <console>:43
     
    tuples.groupByKey().map{ case(x, y) => (x, y.toList) }.collect
    res76: Array[(Any, List[(Any, Any)])] = Array((bill,List((apple,0.99), (taco,2.59))), (john,List((tomato,1.99), (carrot,0.45), (banana,1.29))))
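
    On Spark 2.x and later the same idea goes through df.rdd, and typed getters avoid the Any types. A minimal sketch, assuming the columns hold a String name, a String food and a Double price as in the earlier answers:

    // Spark 2.x+ variant: take the underlying RDD and read fields with typed getters
    val byName = df.rdd
      .map(row => row.getString(0) -> (row.getString(1), row.getDouble(2)))
      .groupByKey()
      .mapValues(_.toList)
     
    byName.collect().foreach(println)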
    
    Answered on January 4, 2019.

