Pyspark: Pass multiple columns in UDF

Asked on January 21, 2019 in Apache-spark.
  • 8 Answer(s)

    When all the columns you need to pass to the UDF share the same data type, you can collect them into an array and pass that as a single input parameter,

    for instance:

    >>> from pyspark.sql.types import IntegerType
    >>> from pyspark.sql.functions import udf, array
    >>> sum_cols = udf(lambda arr: sum(arr), IntegerType())
    >>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']) \
    ...     .withColumn('Result', sum_cols(array('A', 'B'))).show()
    +---+---+---+------+
    | ID|  A|  B|Result|
    +---+---+---+------+
    |101|  1| 16|    17|
    +---+---+---+------+
     
    >>> spark.createDataFrame([(101, 1, 16, 8)], ['ID', 'A', 'B', 'C']) \
    ...     .withColumn('Result', sum_cols(array('A', 'B', 'C'))).show()
    +---+---+---+---+------+
    | ID|  A|  B|  C|Result|
    +---+---+---+---+------+
    |101|  1| 16|  8|    25|
    +---+---+---+---+------+
    
    Answered on January 21, 2019.

    Alternatively, a struct can be used here instead of an array:

    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import udf, struct
    sum_cols = udf(lambda x: x[0]+x[1], IntegerType())
    a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
    a.show()
    a.withColumn('Result', sum_cols(struct('A', 'B'))).show()
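
    A small variant, not part of the original answer: once the columns are packed into a struct, the fields can also be read by name inside the function, which scales better than positional indexing. The `add_fields` name is illustrative:

```python
# Hypothetical variant of the lambda above: index the struct row by
# field name instead of by position. A PySpark Row supports row['A'],
# so the same logic also works on a plain dict for quick testing.
def add_fields(row):
    return row['A'] + row['B']

# Wrapped and invoked exactly like the positional version:
# from pyspark.sql.types import IntegerType
# from pyspark.sql.functions import udf, struct
# sum_cols = udf(add_fields, IntegerType())
# a.withColumn('Result', sum_cols(struct('A', 'B'))).show()
```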
    
    Answered on January 21, 2019.

    The problem can also be solved without using array or struct, since a Python UDF can take several columns directly as separate arguments.

    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import udf

    def add(x, y):
        return x + y

    sum_cols = udf(add, IntegerType())

    a = spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
    a.show()
    a.withColumn('Result', sum_cols('A', 'B')).show()
    
    Answered on January 21, 2019.

    from pyspark.sql.types import StringType
    from pyspark.sql.functions import udf

    def return_string(a, b, c):
        if a == 's' and b == 'S' and c == 's':
            return 'S'
        if a == 's' and b == 'NS' and c == 's':
            return 'S'
        if a == 's' and b == 'S' and c == 'NS':
            return 'NS'

    func = udf(return_string, StringType())

    df = sc.parallelize([('s', 'S', 'NS'), ('?', '?', '?')]).toDF(['a', 'b', 'c'])

    df.withColumn('result', func('a', 'b', 'c')).show()
    ## +---+---+---+------+
    ## |  a|  b|  c|result|
    ## +---+---+---+------+
    ## |  s|  S| NS|    NS|
    ## |  ?|  ?|  ?|  null|
    ## +---+---+---+------+

    All arguments should be listed (unless you pass data as a struct).
    You should use and, not & (you are evaluating plain logical expressions, not SQL expressions).
    Conditions should be expressions, not lists (non-empty lists are always truthy).
    Personally I'd skip all the ifs and use a simple dict:
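
    The dict version itself is missing from the answer; a minimal sketch of what it might look like, with the mapping reproducing the if-branches above (combinations not listed come back as None, i.e. null):

```python
# Hypothetical dict-based rewrite of return_string: the keys and values
# mirror the if-branches, so any unlisted combination yields None.
mapping = {
    ('s', 'S', 's'): 'S',
    ('s', 'NS', 's'): 'S',
    ('s', 'S', 'NS'): 'NS',
}

def return_string(a, b, c):
    return mapping.get((a, b, c))

# Registered and invoked the same way as before:
# from pyspark.sql.types import StringType
# from pyspark.sql.functions import udf
# func = udf(return_string, StringType())
# df.withColumn('result', func('a', 'b', 'c')).show()
```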

    Answered on January 27, 2019.
    from pyspark.sql.functions import col, when, lit
    
    df.withColumn('d', when(
         ((col('A') == 'S') & (col('B') == 'S') & (col('C')=='S'))
       | ((col('A') == 'S') & (col('B') == 'NS') & (col('C')=='S'))
     , lit('S')
     ).otherwise(lit('NS'))
    ).show()
    Answered on February 3, 2019.

    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import udf, struct

    sum_cols = udf(lambda x: x[0]+x[1], IntegerType())
    a = spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
    a.show()
    a.withColumn('Result', sum_cols(struct('A', 'B'))).show()

    Answered on February 5, 2019.

    Use struct instead of array

    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import udf, struct
    sum_cols = udf(lambda x: x[0]+x[1], IntegerType())
    a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
    a.show()
    a.withColumn('Result', sum_cols(struct('A', 'B'))).show()
    Answered on February 21, 2019.

    1. Pass a single column and return a single value in a UDF

    a. Define the UDF
    b. Invoke the UDF

    from pyspark.sql.functions import col, count, when, lit

    def count_not_null(c):
        """Use the conversion between boolean and integer:
        - False -> 0
        - True -> 1
        """
        return count(when(col(c) == "NULL", 1).when(col(c) == "", 1)).alias('null_ct')

    How to invoke the UDF:

    exprs = [count_not_null(c) for c in df.columns]
    df1 = df.agg(*exprs).withColumn('table_name',lit('dim_acc.select('table_name').withColumn('null_ct',lit(-1)).limit(1).withColumn('col_name',lit('dummy')).limit(1)
    df3 = df2.select('null_ct','table_name','col_name')
    for c in df.columns:
     exprs = [count_not_null(c)]
     df4 = df.agg(*exprs).withColumn('table_name',lit('dim_acc')).withC
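
    Note that count_not_null above actually builds an aggregate column expression rather than a Python UDF. For contrast, a minimal genuine single-column UDF might look like this; the `normalize` name and its cleaning rules are illustrative, not from the answer:

```python
# Sketch of a true single-column UDF. The cleaning logic is plain
# Python, so it can be exercised without a Spark session.
def normalize(value):
    # Treat the literal string "NULL" and the empty string as missing.
    if value in ("NULL", ""):
        return None
    return value.strip().lower()

# Registered and invoked with PySpark like this (names are illustrative):
# from pyspark.sql.functions import udf
# from pyspark.sql.types import StringType
# normalize_udf = udf(normalize, StringType())
# df.withColumn('clean', normalize_udf('raw')).show()
```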
    Answered on February 23, 2019.

