Apache Spark Foundation Course - Spark UDF in Scala and Python


Hello and welcome back to Learning Journal. In this video, we are going to talk about Spark User-defined functions.
What is a user-defined function? I guess you already know it. User-defined functions or UDFs are a crucial feature of an SQL based environment. They allow you to extend the systems’ built-in functionality. When you do not have an SQL function for doing something, you can create a UDF and use it in your SQL. In Apache Spark, you can achieve this functionality in two steps.

  1. Create a function
  2. Register the function in the Spark session as a UDF

The first thing is to create a function. These functions are not unusual, they are like standard functions. But once you register them as UDF, you can call them in your SQL expression. That’s the only difference in a standard function and a user-defined function. However, you must note one essential thing. When you are creating a function that you want to use in an SQL expression, you must create a pure function. You don’t want your UDF to have side effects. Right? If you do not understand pure functions and side effects, I recommend that you watch our Scala tutorial videos.
I have seen people trying different ways to create a Spark UDF. If you try Google on this topic, you will see a variety of code and will really get confused.
In this tutorial, I will show you the most simple and straightforward method to create and use Spark UDF. Then we will go to the next level, and I will show you the technique for creating your UDF library. You can create and package all your Spark UDFs in a separate Jar, and then, you will be able to use those UDFs in any Spark application by merely including the jar to your classpath. We will not stop there. We will go one step further, and I will show you a UDF that you can define in Scala and use it in your PySpark code. We already talked about PySpark performance limitations in the earlier video, and hence the ability to create your UDFs in Scala and use them in PySpark is critical for the UDF performance.
Let’s start with the most straightforward method for creating and using a Spark UDF.


What is a Spark UDF?

I already talked about it. Apache Spark UDF is nothing more than a pure Scala function value that you register in the Spark session. Once registered, you can use the UDF in your SQL statements in the given session. It is as simple as that. Let me demonstrate the idea with an easy example.

                                
    spark.udf.register("pgender", 
                        (s:String) => if(List("f","female","woman").contains(s.toLowerCase)) "Female"else"Male"
                      )  
                            

So, what are we doing in this simple example? We are creating a Scala function value and registering it as a UDF in a single step. The API spark.udf.register is the standard method for registering a Spark UDF. The first argument is the name for the UDF. I named it as pgender. The second argument is a Scala function value. My Scala function takes one argument. If the argument value is f, female or the woman, we return Female else we return Male. That’s it. It is a simple function. Right? Let’s execute this API, and it registers a Scala function as a UDF. Then, we can use the UDF in a SQL expression. Let’s try it.

                                
    spark.sql("select pgender('female')").show
    +-------------------+                                                            
    |UDF:pgender(female)|
    +-------------------+
    |            Female |
    +-------------------+                                    
                            

Let’s try one more.

                                
    spark.sql("select PGENDER('Woman')").show
    +------------------+
    |UDF:PGENDER(Woman)|
    +------------------+
    |            Female|
    +------------------+                                         
                            

Simple. Isn’t it. The function pgender becomes a UDF. Like any other SQL function, it is case insensitive. I hope you got the first impression of a Spark UDF. It is nothing but a Scala function value. All we need to do is to register it to the Spark session. Once registered, you can use it in the given session. The critical thing to remember is the session. We registered the UDF for a particular session, and you should be able to use it in the same session only.


How to use a Spark UDF in a Scala Application?

The earlier example creates a Scala UDF, and we used it in the Spark SQL statement. However, we did all that in a REPL. You might be wondering, how to use it in a Spark application? The method for creating and using a Spark UDF in an application is as simple as we did in the REPL. Let’s create a simple Spark application to show you the idea.
Create a project directory for your Spark application and then create a build.sbt file. My build file looks like below.

                                
    name := "learningjournal-examples"
    version := "1.0"
    organization := "guru.learningjournal"
    scalaVersion := "2.11.8"
    val sparkVersion = "2.3.0"
                                    
    libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion,
    "org.apache.spark" %% "spark-sql" % sparkVersion)                                       
                            

I am using Spark 2.3.0 and Scala 2.11.8 because I am going to execute this example on a Google Dataproc cluster that is built on Spark 2.3 and Scala 2.11.8. Great! So, we have a build file. The next step is to create a simple Spark application. Create a Scala source file and copy/paste the below code.

                                
    package guru.learningjournal.examples
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
                                    
    object SparkUDF{
            def main(args: Array[String]) = {
            //Create a Spark session
            val spark = SparkSession.builder()
                                    .appName("SparkLocalUDF")
                                    .enableHiveSupport()
                                    .getOrCreate()
            
            //Load data and register a view                                                         
            val df = spark.read
                          .format("csv")
                          .option("header","true")
                          .option("inferSchema","true") 
                          .option("nullValue","NA")
                          .option("timestampFormat","yyyy-MM-dd'T'HH:mm:ss")
                          .option("mode","failfast")
                          .load("gs://pkp-gcp-bucket/survey.csv")
            df.createTempView("surveys")
                                                     
            //Create a local function
            val parseGender = (s:String) => {
                    if(List("cis female","f","female","woman","femake","female ",
                            "cis-female/femme","female (cis)","femail").contains(s.toLowerCase)) 
                        "Female"
                    else if(List("male","m","male-ish","maile","mal","male (cis)",
                                 "make","male ","man","msle","mail","malr","cis man","cis male").contains(s.toLowerCase))
                        "Male"
                    else
                        "Transgender"
            }
                                    
            //Register the function as UDF
            spark.udf.register("PGENDER", parseGender)
                                                    
            //Apply the UDF
            spark.sql("select PGENDER(gender) as parsed_gender, * from surveys")
                 .write      
                 .mode("overwrite")
                 .saveAsTable("transformed_survey")

            spark.stop()
        }
    }                                       
                            

Let me quickly walk you through the code. The example code is straightforward. As usual, we create a Spark session. Then we load data from a CSV file source and create a temporary view. You have already seen all this in the earlier videos.
Now let’s come to the UDF thing. We define a local function literal. It takes a string as an argument and returns the gender. Right? Then we register the function literal as a Spark UDF. That’s it. Once we register the UDF in the current Spark session, we should be able to use it in any subsequent Spark SQL statements.
We apply the UDF to create a new data frame, then we write it to Spark SQL database as a new table. Finally, we stop the Spark session.
Once you compile and execute this application, you should have the transformed_survey table created in the default database. The converted survey table is a permanent table, and hence it should be available to you even after the Spark application stops.
Let’s package this application. All you need to do is to execute sbt package command from your project directory. The SBT will create the JAR file in the target/scala-2.11/ directory. Once your JAR file is ready, you can submit your application to the Spark cluster for execution. Use the following command to submit your application.

                                
    spark-submit --master yarn --class guru.learningjournal.examples.SparkUDF target/scala-2.11/learningjournal-examples_2.11-1.0.jar 
                            

Wait for few seconds to get the application complete the execution. Now, you can go to Spark SQL and check for your transformed survey table. You can use a simple SQL statement.

                                
    select gender, parsed_gender from transformed_survey limit 20;                                  
                            

Amazing, isn’t it?


How to create Spark UDF Library?

The earlier example is the most simple and straightforward method for creating Spark UDF in a Scala application. The example works best when you need to create a UDF and use it in the same program. However, you might have a requirement to reuse a UDF in various Spark applications. In that case, you don't want to copy and paste the UDF code in each Spark job. In fact, I want to define all my UDFs at one place and package them in a jar file. That gives me two benefits.

  1. I will have a single copy of the source code for the UDF, and it will be easy to maintain.
  2. I can create a separate jar for all my UDFs and reuse it in various Spark applications.

Let’s create an example to demonstrate the idea. In the earlier example, we created the parse gender UDF. But now I want to take the parse gender UDF and package it in a UDF library. Once I create a JAR for the UDF Library, I should be able to use the parse gender UDF at the following places.

  1. Scala REPL
  2. PySpark REPL
  3. Spark application written in Scala Language
  4. PySpark application written in Python Language

The idea is to define the UDF once in the Scala language and use it wherever you need it. You can use it in your Scala code as well as in your Python code.
Let’s create a new project directory. I call it a UDF Lib project. You will need a build file for this project. Copy and paste the build file content, and you are ready to write your UDF library.

                                
    name := "spark-udf-lib"
    version := "0.1"
    organization := "guru.learningjournal"
    scalaVersion := "2.11.8"
    val sparkVersion = "2.3.0"
                                    
    libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion,
    "org.apache.spark" %% "spark-sql" % sparkVersion)                                   
                            

Create a Scala source file and copy/paste the code.


                                
    package guru.learningjournal.SparkUDFLib
    import org.apache.spark.sql.api.java.UDF1
                                    
    class ParseGender extends UDF1[String, String]{
        def call(s:String):String =  {
            if(List("cis female","f","female","woman","femake","female ",
                    "cis-female/femme","female (cis)","femail").contains(s.toLowerCase)) 
                "Female"
            else if(List("male","m","male-ish","maile","mal","male (cis)",
                         "make","male ","man","msle","mail","malr","cis man","cis male").contains(s.toLowerCase))
                "Male"
            else
                "Transgender"
        }
    }                                       
                            

Let me quickly walk you through the code. The first line is the package definition. Giving a proper package structure to your UDF library is essential. We will use this package name to import the UDFs when we want to use the UDFs from this library. We are going to use Spark Java interface to define our UDF, and that’s necessary to be able to use these UDFs from Scala as well as from Python. The Scala’s approach to creating a UDF is straightforward. Create a function and register it as a UDF. However, the Java APIs follow a traditions approach. You must create a class by extending one of the UDF interfaces.
Spark Java API offers a bunch of UDF interfaces starting from UDF0 to UDF22. I am going to create a parse gender UDF. The parse gender UDF should take a single argument as input. So, I need to extend the UDF1 interface for the parse gender class. If you are creating a UDF that should take 5 input parameters, you should extend the UDF5 interface. As you can see in the documentation, you can have up to twenty-two arguments for your UDF.
Great! Let’s come back to the example. The second line of the code imports the UDF1 interface. Then we create a ParseGender class that extends UDF1. Then we define the types. The first one is the input type and the second one is the output type. For our example, input and output are both strings. The body of the class is straightforward. We define a method, and we name it as the call. You might be wondering to see that code is written in Scala. We are extending the Java Interface. However, there is nothing that restricts us to create the UDF class in Scala language, and that’s what the example is doing. The entire code for the UDF is written in Scala. Amazing! Isn’t it.
Great! Let’s package the UDF in a JAR file. Just execute sbt package. That’s it. The SBT will create the JAR file in the target directory. Now it’s time to use the Spark UDF library in Scala as well as Python. Let’s do that.


How to use Spark UDF Library in Scala?

I am going to demonstrate the method using Spark Shell. However, the process is the same for your Spark application a well. To use a UDF from a JAR file, you need to take care of two things.

  1. Make sure the JAR is available to all your worker nodes.
  2. Import and register the UDF in your Spark session.

So, how do you make a JAR available to your Spark worker nodes? You already know it. Right? Supply the jar using --jars option. I am going to use the Spark shell. But if you have a Spark application and you are using Spark submit, you can supply your UDF library using --jars option for the Spark submit.

                                
    spark-shell --jars ./target/scala-2.11/spark-udf-lib_2.11-0.1.jar   
                            

Now let’s come to the next step. Import your UDF class. If you remember the UDF code, you can recognize the package name. So we import all the classes in that UDF library package. Then we instantiate an object for the parse gender class function. I hope you understand the underscore at the end of the line. That’s a Scala technique known as partially applied function. If you don’t understand that underscore and what it does, I recommend you to check out our Scala tutorials.
Great! Once we instantiate the function, we are ready to register for the current Spark session. Registering a UDF is as simple as making a call to the spark.udf.register API. That’s it. You are now ready to use this UDF in your Spark SQL. Rest of the code is straightforward, and I already explained it in the earlier example.

                                
    import guru.learningjournal.SparkUDFLib._
    val pgender_f = new ParseGender().call _
    spark.udf.register("PGENDER", pgender_f)
                                            
    val df = spark.read
                  .format("csv")
                  .option("header","true")
                  .option("inferSchema","true") 
                  .option("nullValue","NA")
                  .option("timestampFormat","yyyy-MM-dd'T'HH:mm:ss")
                  .option("mode","failfast")
                  .load("gs://pkp-gcp-bucket/survey.csv")
    df.createTempView("surveys")    
    spark.sql("select pgender(gender) as parsed_gender, age from surveys limit 10").show 
                            

How to use Spark Scala UDF Library in Python?

Now it’s time to USE the same UDF in your Python application. The process for using the UDF from your JAR file is same as we did it in Scala. There are two steps.

  1. Make sure the JAR is available to all your worker nodes.
  2. Register the UDF in your Spark session.

To make your UDF jar available to the workers, we supply it using the --jars option. This is precisely the same as we did with Scala shell.

                                
    pyspark --jars ./target/scala-2.11/spark-udf-lib_2.11-0.1.jar                                  
                            

The next step is to register the function in the current Spark session. In a Python code, unlike Scala, you do not need to instantiate the function object and then register the UDF using the object. You can directly register it using the spark.udf.registerJavaFunction API. The first parameter is the UDF name and the second parameter is the UDF class name.

                                
    spark.udf.registerJavaFunction("PGENDER","guru.learningjournal.SparkUDFLib.ParseGender")
    df = spark.read.format("csv").option("header","true").option("inferSchema","true").option("nullValue","NA").option("timestampFormat","yyyy-MM-dd'T'HH:mm:ss").option("mode","failfast").load("gs://pkp-gcp-bucket/survey.csv")
    df.createTempView("surveys")
    spark.sql("select PGENDER(gender) as parsed_gender, age from surveys limit 10").show()  
                            

The spark.udf.registerJavaFunction API is available in Spark 2.3.0 and above. If you are using an older version, you might find this API in the SQL Context. The next line shows the code for Spark 2.2.

                                
    sqlContext.registerJavaFunction("PGENDER","guru.learningjournal.SparkUDF.ParseGender")                                  
                            

Once you register the UDF, rest is simple. Load the data, create a temporary view and start using your UDF in the Spark SQL code.
Great! Hope you enjoyed this video. Thank you for watching Learning. Keep learning and Keep growing.



You will also like:


Kafka Enterprise Architecture

How do you deploy Apache Kafka in a large Enterprise?

Learning Journal

What is Bastion Host?

What is Bastion Host? Learn Why and How of Bastion Server.

Learning Journal

Scala Function Values

What are Scala Function Values? How to use Scala Function Values?

Learning Journal

Referential Transparency

Referential Transparency is an easy method to verify the purity of a function.

Learning Journal

Pure Function benefits

Pure Functions are used heavily in functional programming. Learn Why?

Learning Journal