Concatenate columns in Apache Spark DataFrame

Asked on November 19, 2018 in Apache-spark.


  • 3 Answers

    You can use the SQL CONCAT function:

    In Python:

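    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
    df.createOrReplaceTempView("df")  # replaces registerTempTable, deprecated since Spark 2.0
    spark.sql("SELECT CONCAT(k, ' ', v) FROM df").show()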
    

    In Scala:

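    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._  // needed for toDF on a local Seq
    val df = Seq(("foo", 1), ("bar", 2)).toDF("k", "v")
    df.createOrReplaceTempView("df")
    spark.sql("SELECT CONCAT(k, ' ', v) FROM df").show()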
    

    Since Spark 1.5.0, you can also use the concat function from the DataFrame API:

    In Python:

    from pyspark.sql.functions import concat, col, lit
     
    df.select(concat(col("k"), lit(" "), col("v")))
    

    In Scala:

    import org.apache.spark.sql.functions.{concat, lit}
    import spark.implicits._  // enables the $"colName" column syntax

    df.select(concat($"k", lit(" "), $"v"))
    

    There is also a concat_ws function, which takes a string separator as its first argument.

    Answered on November 19, 2018.

    To give the concatenated column a custom name, you can do this:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as sf

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([('row11', 'row12'), ('row21', 'row22')], ['colname1', 'colname2'])
    df.show()
    

    which gives:

    +--------+--------+
    |colname1|colname2|
    +--------+--------+
    |   row11|   row12|
    |   row21|   row22|
    +--------+--------+
    

    Then create a new column by concatenating them:

    df = df.withColumn('joined_column',
                       sf.concat(sf.col('colname1'), sf.lit('_'), sf.col('colname2')))
    df.show()

    which gives:

    +--------+--------+-------------+
    |colname1|colname2|joined_column|
    +--------+--------+-------------+
    |   row11|   row12|  row11_row12|
    |   row21|   row22|  row21_row22|
    +--------+--------+-------------+
    
    Answered on November 19, 2018.

    If you want to stay in the DataFrame API, you can also use a UDF to add a new column based on existing columns:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.udf

    // define the case class at the top level so Spark can derive an encoder for it
    case class MyDf(col1: String, col2: String)

    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    // here is our DataFrame
    val df = Seq(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F")).toDF()
    // define a UDF that concatenates the two passed-in strings with a space
    val getConcatenated = udf((first: String, second: String) => first + " " + second)
    // use withColumn to add a new column called newColName
    df.withColumn("newColName", getConcatenated($"col1", $"col2"))
      .select("newColName", "col1", "col2")
      .show()
    
    Answered on November 19, 2018.

