How to query a JSON data column using Spark DataFrames?

Asked on January 11, 2019 in Apache-spark.


  • 3 Answer(s)

    For the version Spark >= 2.4

    Here the schema_of_json function is used to determine the schema:

    import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
     
    // schema_of_json expects a foldable string argument, so the schema is
    // inferred from the first row's value wrapped in lit
    val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
    df.withColumn("jsonData", from_json($"jsonData", schema))
    
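    Once parsed, the nested fields can be addressed with dot syntax. A minimal sketch, reusing the sample df defined in the Spark <= 1.5 answer further down:

    // Hypothetical follow-up: project fields of the parsed struct
    val parsed = df.withColumn("jsonData", from_json($"jsonData", schema))
    parsed.select($"jsonData.k", $"jsonData.v").show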

    For the version Spark >= 2.1

    Here the from_json function can be used with an explicit schema:

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._
     
    val schema = StructType(Seq(
      StructField("k", StringType, true), StructField("v", DoubleType, true)
    ))
     
    df.withColumn("jsonData", from_json($"jsonData", schema))
    
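    Once jsonData is a struct column, it can be filtered and projected like any other nested data. A minimal sketch, again assuming the sample df from the Spark <= 1.5 answer below:

    // Hypothetical usage: filter on a field of the parsed struct
    df.withColumn("jsonData", from_json($"jsonData", schema))
      .where($"jsonData.v" > 1.0)
      .select($"key", $"jsonData.k")
      .show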

    For the version Spark >= 1.6

    Here get_json_object can be used, which takes a column and a JSON path:

    import org.apache.spark.sql.functions.get_json_object
     
    val exprs = Seq("k", "v").map(
      c => get_json_object($"jsonData", s"$$.$c").alias(c))
     
    df.select($"*" +: exprs: _*)
    
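    Note that get_json_object always returns string columns, so numeric fields may need an explicit cast. A small sketch of that variant:

    // get_json_object extracts strings; cast "v" back to double explicitly
    df.select(
      $"*",
      get_json_object($"jsonData", "$.k").alias("k"),
      get_json_object($"jsonData", "$.v").cast("double").alias("v")
    )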

    For the version Spark <= 1.5:

    Try something like the example below:

    val df = sc.parallelize(Seq(
      ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
      ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
    )).toDF("key", "jsonData", "blobData")
    

    Let's assume that the blob field cannot be represented in JSON; otherwise the splitting and joining can be omitted:

    import org.apache.spark.sql.Row
    val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
    val jsons = sqlContext.read.json(df.drop("blobData").map{
      case Row(key: String, json: String) =>
        s"""{"key": "$key", "jsonData": $json}"""
    })
     
    val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
    parsed.printSchema
     
    // root
    //  |-- jsonData: struct (nullable = true)
    //  |    |-- k: string (nullable = true)
    //  |    |-- v: double (nullable = true)
    //  |-- key: string (nullable = true)
    //  |-- blobData: string (nullable = true)
    

    Alternatively, a UDF can be used to parse the JSON and output a struct or map column.

    For instance, with lift-json:

    import org.apache.spark.sql.functions.udf
    import net.liftweb.json.parse
     
    case class KV(k: String, v: Int)
     
    val parseJson = udf((s: String) => {
      implicit val formats = net.liftweb.json.DefaultFormats
      parse(s).extract[KV]
    })
     
    val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
    parsed.show
     
    // +---+--------------------+------------------+----------+
    // |key|            jsonData|          blobData|parsedJSON|
    // +---+--------------------+------------------+----------+
    // |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
    // |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
    // +---+--------------------+------------------+----------+
     
    parsed.printSchema
     
    // root
    //  |-- key: string (nullable = true)
    //  |-- jsonData: string (nullable = true)
    //  |-- blobData: string (nullable = true)
    //  |-- parsedJSON: struct (nullable = true)
    //  |    |-- k: string (nullable = true)
    //  |    |-- v: integer (nullable = false)
    
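    As with from_json, the struct produced by the UDF can then be queried with dot syntax. A short sketch:

    // Hypothetical follow-up: flatten the UDF output into top-level columns
    parsed.select($"key", $"parsedJSON.k", $"parsedJSON.v")
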
    Answered on January 11, 2019.

    Another method is to use the from_json function, for example on data loaded from Cassandra:

    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types._
     
    val df = sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
      .load()
     
    // Define whatever struct type matches the shape of your JSON
    val schema = StructType(Seq(
      StructField("key", StringType, true),
      StructField("value", DoubleType, true)
    ))
     
    df.withColumn("jsonData", from_json(col("jsonData"), schema))
    
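    A follow-up sketch, assuming the JSON column in that table really contains the "key" and "value" fields declared in the schema above:

    // Hypothetical usage: pull the parsed fields out of the struct
    df.withColumn("jsonData", from_json(col("jsonData"), schema))
      .select(col("jsonData.key"), col("jsonData.value"))
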
    Answered on January 11, 2019.

    Suppose the JSON string is the one shown below:

    "{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}";
    

    Here the JSON is read from this RDD, the required fields are selected, and the result is written to Cassandra:

    import org.apache.spark.sql.SaveMode
     
    // Select the required fields by their names in the JSON
    sqlContext.read.json(rdd).select("column_name1", "column_name2", "column_name3")
              .write.format("org.apache.spark.sql.cassandra")
              .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name"))
              .mode(SaveMode.Append)
              .save()
    
    Answered on January 11, 2019.

