How to write the resulting RDD to a csv file in Spark python

I have a resulting RDD labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions). This has output in this format:

    [(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....]

Asked on January 12, 2019 in Apache-spark.


  • 4 Answer(s)

    Map the lines of the RDD into comma-separated strings, then use rdd.saveAsTextFile():

    def toCSVLine(data):
      return ','.join(str(d) for d in data)
     
    lines = labelsAndPredictions.map(toCSVLine)
    lines.saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv')
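
    Note that saveAsTextFile() writes a directory of part files (part-00000, part-00001, ...) at the given path, not a single file. If one output file is needed, the RDD can be coalesced to a single partition first (a minimal sketch):

    lines.coalesce(1).saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv')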
    

     

    Answered on January 12, 2019.

    Here a two-column RDD is written to a single CSV file in PySpark 1.6.2.

    For the RDD:

    >>> rdd.take(5)
    [(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')]
    

    The code will be:

    # First I convert the RDD to a DataFrame
    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)  # sc is the SparkContext (predefined in the PySpark shell)
    df = sqlContext.createDataFrame(rdd, ['count', 'word'])
    

    For the DF:

    >>> df.show()
    +-----+-----------+
    |count|       word|
    +-----+-----------+
    |73342|      cells|
    |62861|       cell|
    |61714|    studies|
    |61377|        aim|
    |60168|   clinical|
    |59275|          2|
    |59221|          1|
    |58274|       data|
    |58087|development|
    |56579|     cancer|
    |50243|    disease|
    |49817|   provided|
    |49216|   specific|
    |48857|     health|
    |48536|      study|
    |47827|    project|
    |45573|description|
    |45455|  applicant|
    |44739|    program|
    |44522|   patients|
    +-----+-----------+
    only showing top 20 rows
    

    It is then written to CSV:

    # Write CSV (I have HDFS storage)
    df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out')
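
    In Spark 2.0 and later the CSV writer is built in, so the external com.databricks.spark.csv package is no longer needed. A minimal sketch of the equivalent call:

    # Spark 2.x+: built-in CSV writer
    df.coalesce(1).write.csv('file:///home/username/csv_out', header=True)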
    
    Answered on January 12, 2019.

    Joining with commas is not a good idea, because fields that contain commas will not be properly quoted. For instance, ','.join(['a', 'b', '1,2,3', 'c']) gives you a,b,1,2,3,c when you need a,b,"1,2,3",c. Instead, use Python's csv module to convert each list in the RDD to a properly-formatted csv string:

    # python 3
    import csv, io
     
    def list_to_csv_str(x):
        """Given a list of strings, returns a properly-csv-formatted string."""
        output = io.StringIO("")
        csv.writer(output).writerow(x)
        return output.getvalue().strip() # remove extra newline
     
    # ... do stuff with your rdd ...
    rdd = rdd.map(list_to_csv_str)
    rdd.saveAsTextFile("output_directory")
    

    The csv module only writes to file objects, so an empty "file" is first created with io.StringIO("") and csv.writer is told to write the csv-formatted string into it. Then output.getvalue() is used to get the string that was written to the "file". In Python 2, this code works by simply replacing io with the StringIO module.
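
    For example, a quick check of the quoting behavior described above:

    >>> list_to_csv_str(['a', 'b', '1,2,3', 'c'])
    'a,b,"1,2,3",c'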

    When the Spark DataFrames API is used, you can instead rely on the Databricks save function, which supports a csv format.
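
    A minimal sketch, assuming the spark-csv package is on the classpath (output_path is a placeholder):

    df.write.format('com.databricks.spark.csv').option('header', 'true').save('output_path')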

    Answered on January 12, 2019.

    The RDD:

    >>> rdd.take(5)
    [(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')]
    

    Now the code:

    # First I convert the RDD to a DataFrame
    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)  # sc is the SparkContext (predefined in the PySpark shell)
    df = sqlContext.createDataFrame(rdd, ['count', 'word'])
    

    The DF:

    >>> df.show()
    +-----+-----------+
    |count|       word|
    +-----+-----------+
    |73342|      cells|
    |62861|       cell|
    |61714|    studies|
    |61377|        aim|
    |60168|   clinical|
    |59275|          2|
    |59221|          1|
    |58274|       data|
    |58087|development|
    |56579|     cancer|
    |50243|    disease|
    |49817|   provided|
    |49216|   specific|
    |48857|     health|
    |48536|      study|
    |47827|    project|
    |45573|description|
    |45455|  applicant|
    |44739|    program|
    |44522|   patients|
    +-----+-----------+
    only showing top 20 rows
    

    Now write it to CSV:

    # Write CSV (I have HDFS storage)
    df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out')
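
    To sanity-check the result, the same spark-csv package can read the output back (a sketch, reusing the path above):

    # Read the CSV back and inspect a few rows
    df2 = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('file:///home/username/csv_out')
    df2.show(5)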
    

    P.S: I am just a beginner learning from posts here in Stackoverflow. So I don’t know whether this is the best way. But it worked for me and I hope it will help someone!

    Answered on January 13, 2019.

