Similar to UDFs in Hive, you can add custom UDFs to the PySpark Spark context. We have discussed “Register Hive UDF jar into pyspark” in another post, where we covered how to add a UDF packaged in a jar to the Spark executors and register it with Spark SQL using the CREATE FUNCTION command. In this article, we will check how to register a Python function into PySpark with an example.
Register Python Function into PySpark
Python is one of the most widely used programming languages, and most organizations use PySpark to perform Spark-related tasks. It is very easy to create functions or methods in Python. Instead of writing complicated Java or Scala methods, you can create Python functions and register them into the Spark context.
In PySpark, the Python function registration process takes a slightly different approach compared to adding a jar file and registering it through Spark SQL.
Related Articles
- Register Hive UDF jar into pyspark – Steps and Examples
- Hive UDF using Python-Use Python Script into Hive-Example
Below are the steps you can follow to register a Python function into Spark. We will be using PySpark to demonstrate the UDF registration process.
Step 1: Create Python Function
The first step is to create the Python function or method that you want to register in PySpark.
Below is a sample square function:
def square(a):
    return a**2
Step 2: Register Python Function into Spark Context
The next step is to register the Python function created in the previous step into the Spark context so that it is visible to Spark SQL during execution.
You can use the sqlContext.udf.register method available on the Spark SQL context to do this.
sqlContext.udf.register('udf_square', square)
To explain the above syntax: we are registering the ‘square’ function as ‘udf_square’ in the Spark context. The udf_square name can then be used in subsequent Spark SQL statements.
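Note that sqlContext.udf.register also accepts an optional return type as a third argument; the default is string, so if you want the result back as a number, you can register the function with an explicit type. For example:
from pyspark.sql.types import LongType

# Register with an explicit return type; without it the result comes back as a string
sqlContext.udf.register('udf_square', square, LongType())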
Step 3: Use UDF in Spark SQL
Now that the Python function is registered with the Spark context, you can use it directly in Spark SQL statements.
For example:
sqlContext.sql("select udf_square(2)")
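The registered UDF works on table columns as well. As a quick sketch, assuming a hypothetical temporary table named numbers built from a small range DataFrame:
# Hypothetical: build a small DataFrame and expose it to Spark SQL as a temp table
df = sqlContext.range(1, 5)
df.registerTempTable("numbers")

# Apply the registered UDF to the id column
sqlContext.sql("select id, udf_square(id) as square_col from numbers").show()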
Below is the complete program that can be used to register a Python function into Spark.
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

def square(a):
    return a**2

conf = SparkConf().setAppName("Sample_program")
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
sqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "false")

# Register the Python function as a Spark SQL UDF and use it in a query
sqlContext.udf.register('udf_square', square)
sqlContext.sql("select udf_square(2)").show()
Using UDF with PySpark DataFrame
You can also use a UDF on a DataFrame. The pyspark.sql.functions.udf function wraps a Python function as a UDF that can be used with any DataFrame.
The following example registers the square Python user defined function created above.
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
# Wrap the Python function as a DataFrame UDF with an explicit return type
square_udf = udf(square, LongType())
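The df used below is assumed to be any DataFrame with a numeric id column. For instance, a minimal test DataFrame could be created as follows:
# Hypothetical sample DataFrame with ids 1 through 4
df = sqlContext.range(1, 5)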
>>> df.select('id', square_udf('id').alias('square_col')).show()
+---+----------+
| id|square_col|
+---+----------+
| 1| 1|
| 2| 4|
| 3| 9|
| 4| 16|
+---+----------+
Alternatively, you can declare the same UDF using Python decorator syntax:
from pyspark.sql.functions import udf

@udf("long")
def square2_udf(s):
    return s * s
Following is an example:
>>> df.select('id', square2_udf('id').alias('square_col')).show()
+---+----------+
| id|square_col|
+---+----------+
| 1| 1|
| 2| 4|
| 3| 9|
| 4| 16|
+---+----------+
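One caveat worth noting: Python UDFs receive None for SQL NULL values, and s * s raises a TypeError on None. A minimal null-safe sketch of the same UDF (the name square_null_safe_udf is just for illustration):
from pyspark.sql.functions import udf

@udf("long")
def square_null_safe_udf(s):
    # Return None for SQL NULLs instead of raising a TypeError
    return s * s if s is not None else None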
Related Articles
- Pass Functions to pyspark – Run Python Functions on Cluster
- How to Create Spark SQL User Defined Functions? Example
- Create Spark SQL isdate Function – Date Validation
Hope this helps 🙂