Overwrite specific partitions in spark dataframe write method

Asked on December 24, 2018 in Apache-spark.


  • 3 Answer(s)

    This is quite a common problem. With Spark prior to 2.0, the workaround is to write directly into the partition directory, which solves the issue,

    for instance:

    df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")
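
    The same idea can be written in PySpark. Below is a minimal sketch, assuming a DataFrame df with a partition column named partition_col (both names, and the value "value", are placeholders); the partition column is dropped before writing because its value is already encoded in the directory name:

    # Minimal PySpark sketch; df, partition_col and "value" are placeholders.
    single_partition = df.filter(df.partition_col == "value").drop("partition_col")

    # Overwrite only this partition's directory; other partitions are left untouched.
    single_partition.write.mode("overwrite").parquet("/root/path/to/data/partition_col=value")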
    

    When using Spark prior to 2.0, you will also need to stop Spark from emitting metadata files (because they will break automatic partition discovery)

    By using:

    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
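
    If you are working from PySpark, the same Hadoop property can be set through the SparkContext's underlying JVM handle (a commonly used, though technically private, accessor), or passed at submit time via --conf spark.hadoop.parquet.enable.summary-metadata=false. A rough sketch:

    # PySpark sketch; sc._jsc is a private accessor to the Java SparkContext,
    # so treat this as a pragmatic workaround rather than a stable API.
    sc._jsc.hadoopConfiguration().set("parquet.enable.summary-metadata", "false")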
    

    When using Spark prior to 1.6.2, you will also need to delete the _SUCCESS file in /root/path/to/data/partition_col=value, or its presence will break automatic partition discovery. (I strongly recommend using 1.6.2 or later.)

    Answered on December 24, 2018.

    The link below describes a new feature added in Spark 2.3.0:

    https://issues.apache.org/jira/browse/SPARK-20236

    To use it, set spark.sql.sources.partitionOverwriteMode to dynamic; the dataset needs to be partitioned, and the write mode must be overwrite. For instance:

    spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
    data.write.mode("overwrite").insertInto("partitioned_table")
    

    It is also best to repartition the DataFrame on the partition column before writing, so you do not end up with 400 files per folder; see the sketch below.
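
    Putting the pieces together, a rough PySpark sketch might look like the following; the table name partitioned_table comes from the snippet above, while initial_df, update_df and the partition column part_col are illustrative placeholders:

    # Spark 2.3+ sketch; names are placeholders, not a fixed recipe.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    # One-time setup: create the partitioned table from an initial load.
    initial_df.write.mode("overwrite").partitionBy("part_col").saveAsTable("partitioned_table")

    # Later: overwrite only the partitions present in update_df.
    # insertInto resolves columns by position, so the partition column must come last.
    update_df.repartition("part_col") \
        .write.mode("overwrite") \
        .insertInto("partitioned_table")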

    Answered on December 24, 2018.

    When using Spark version 1.6…

    Using a HiveContext can simplify this process greatly. The key is that you must first create the table in Hive using a CREATE EXTERNAL TABLE statement with partitioning defined. For instance:

    # Hive SQL
    CREATE EXTERNAL TABLE test
    (name STRING)
    PARTITIONED BY
    (age INT)
    STORED AS PARQUET
    LOCATION 'hdfs:///tmp/tables/test'
    

    From here, suppose we have a DataFrame with new records in it for a specific partition (or multiple partitions). A HiveContext SQL statement can be used to perform an INSERT OVERWRITE using this DataFrame, which will overwrite the table only for the partitions contained in the DataFrame:

    # PySpark
    from pyspark.sql import HiveContext

    hiveContext = HiveContext(sc)

    update_dataframe.registerTempTable('update_dataframe')
    hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
    SELECT name, age
    FROM update_dataframe""")
    

    Make sure that the update_dataframe in this instance has a schema that matches that of the target test table.
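
    For example, here is a small sketch of lining things up before the insert; the casts and the Hive settings below are assumptions about a typical setup, not part of the original answer:

    # Match the target table's column order and types; Hive resolves the
    # INSERT by position, so the partition column age must come last.
    from pyspark.sql.functions import col

    update_dataframe = update_dataframe.select(
        col("name").cast("string"),
        col("age").cast("int"),
    )

    # Dynamic-partition inserts commonly also need these Hive settings
    # (assumption -- depends on your Hive configuration):
    hiveContext.setConf("hive.exec.dynamic.partition", "true")
    hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")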

    An easy mistake to make with this approach is to skip the CREATE EXTERNAL TABLE step in Hive and simply create the table using the DataFrame API’s write methods. For Parquet-based tables in particular, the table will not be defined appropriately to support Hive’s INSERT OVERWRITE… PARTITION function.

    Hope this will be helpful.

    Answered on December 24, 2018.

