Spark SQL: how to explode without losing null values


Asked on January 11, 2019 in Apache-spark.


  • 2 Answers

    For Spark 2.2+

    The explode_outer function can be used (df is the DataFrame defined in the snippet further below):

    import org.apache.spark.sql.functions.explode_outer
    df.withColumn("likes", explode_outer($"likes")).show
    // +---+----+--------+
    // | id|name|   likes|
    // +---+----+--------+
    // |  1|Luke|baseball|
    // |  1|Luke|  soccer|
    // |  2|Lucy|    null|
    // +---+----+--------+
    

    For Spark <= 2.1

    The same behavior can be reproduced manually. (The Java equivalent is similar; there, import static is used to import individual functions.)

    import org.apache.spark.sql.functions.{array, col, explode, lit, when}
    import spark.implicits._ // assuming a SparkSession named spark is in scope

    val df = Seq(
      (1, "Luke", Some(Array("baseball", "soccer"))),
      (2, "Lucy", None)
    ).toDF("id", "name", "likes")

    df.withColumn("likes", explode(
      when(col("likes").isNotNull, col("likes"))
        // If null, explode an array<string> holding a single null
        .otherwise(array(lit(null).cast("string")))))
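
    The Java equivalent mentioned above is not shown in the answer; a minimal sketch, assuming a Dataset<Row> named df with the same likes column:

    ```java
    import static org.apache.spark.sql.functions.*;

    // Replace a null array with array(null) so explode keeps the row
    df = df.withColumn("likes", explode(
            when(col("likes").isNotNull(), col("likes"))
                .otherwise(array(lit(null).cast("string")))));
    ```

    The static import brings col, explode, when, array, and lit into scope individually, as the answer notes.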
    

    The same trick works for replacing NULL with an array(NULL) of a desired type. For complex types (a.k.a. structs), the full schema has to be provided:

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")

    val st = StructType(Seq(
      StructField("_1", IntegerType, false),
      StructField("_2", StringType, true)
    ))

    dfStruct.withColumn("y", explode(
      when(col("y").isNotNull, col("y"))
        .otherwise(array(lit(null).cast(st)))))
    

    or, using a DDL-style type string:

    dfStruct.withColumn("y", explode(
      when(col("y").isNotNull, col("y"))
        .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))
    

    Note: if the array column has been created with containsNull set to false, this needs to be changed first:

    df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))
    
    Answered on January 11, 2019.

    When the array elements are of a complex type, it can be hard to define the type by hand (for instance, with large structs). A generic helper can read the element type from the schema instead:

    import org.apache.spark.sql.{Dataset, Row}
    import org.apache.spark.sql.functions.{array, col, explode, lit, size, when}
    import org.apache.spark.sql.types.ArrayType

    def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]) = {
      // Map each array column name to its ArrayType, read from the schema
      val arrayFields = df.schema.fields
        .map(field => field.name -> field.dataType)
        .collect { case (name, arrayType: ArrayType) => (name, arrayType) }
        .toMap

      columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) =>
        // Empty or null arrays become array(null) of the right element type,
        // so explode keeps the row instead of dropping it
        dataFrame.withColumn(arrayCol, explode(
          when(size(col(arrayCol)) =!= 0, col(arrayCol))
            .otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))
      }
    }
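
    A hypothetical usage sketch (the variable names are assumed, not from the original answer), reusing the df with the likes array column from the first answer:

    ```scala
    // Explode the "likes" array while keeping rows whose array is null
    // or empty (they come back as a single row with a null value)
    val exploded = explodeOuter(df, List("likes"))
    exploded.show
    ```

    Rows with a null or empty likes array should survive with a single null entry, matching the explode_outer output shown above.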
    
    Answered on January 11, 2019.

