Spark SQL Cumulative Average Function and Examples

  • Post last modified: November 29, 2022
  • Post category: Apache Spark

Spark SQL supports analytic, or window, functions. You can use them to calculate results over a range of rows rather than a single row; when a result depends on the values of previous rows, cumulative sum and cumulative average functions are the tools to use. Databases such as Netezza, Teradata, and Oracle, as well as recent versions of Apache Hive, support analytic or window functions. In this article, we will check the Spark SQL cumulative average function and how to use it with an example.

Spark SQL Cumulative Average Function

There are two methods to calculate a cumulative average in Spark:

  • A Spark SQL query to calculate the cumulative average
  • SparkContext or HiveContext with DataFrames to calculate the cumulative average

Now let us check these two methods in detail.

Spark SQL query to Calculate Cumulative Average

Just like in Apache Hive, you can write a Spark SQL query to calculate the cumulative average. The syntax is the same as for other Spark analytic functions; the only difference is that you include the 'unbounded preceding' (or 'unbounded following') keyword in the window specification.

Below is the syntax of Spark SQL cumulative average function:

SELECT pat_id,
       ins_amt,
       AVG(ins_amt) OVER (PARTITION BY dept_id ORDER BY pat_id
                          ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumavg
FROM patient
ORDER BY cumavg;

Output (columns: pat_id, ins_amt, cumavg):

8 10000 10000.0
6 90000 97500.0
1 100000 100000.0
5 50000 100000.0
7 110000 110000.0
2 150000 125000.0
3 150000 150000.0
4 250000 200000.0
5 890000 430000.0
Time taken: 1.728 seconds, Fetched 9 row(s)
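To make the window frame concrete, the 'ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW' semantics can be sketched in plain Python. This is illustrative code only, not a Spark API; the function name `cumulative_avg` is ours:

```python
def cumulative_avg(values):
    """For each position i, return the average of values[0..i],
    i.e. the frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW."""
    total = 0.0
    result = []
    for i, v in enumerate(values, start=1):
        total += v
        result.append(total / i)
    return result

# ins_amt values for one partition (dept 222), ordered by pat_id
print(cumulative_avg([150000, 250000, 890000]))
# [150000.0, 200000.0, 430000.0]
```

These are exactly the cumavg values the query produces for pat_ids 3, 4, and 5 in the output above.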

SparkContext or HiveContext to Calculate Cumulative Average

You can also calculate the cumulative average without writing a Spark SQL query, by using Spark DataFrames to define the window specification.

Steps to calculate cumulative average using SparkContext or HiveContext:

  • Import necessary modules and create DataFrame to work with
import pyspark
import sys
from pyspark.sql import HiveContext
from pyspark.sql.window import Window
import pyspark.sql.functions as sf

sqlcontext = HiveContext(sc)

# Create sample data for the calculation
pat_data = sqlcontext.createDataFrame([(1, 111, 100000),
                                       (2, 111, 150000),
                                       (3, 222, 150000),
                                       (4, 222, 250000),
                                       (5, 222, 890000),
                                       (6, 111, 90000),
                                       (7, 333, 110000),
                                       (8, 444, 10000)],
                                      ["pat_id", "dept_id", "ins_amt"])

Note that you should use HiveContext; otherwise you may end up with the error 'org.apache.spark.sql.AnalysisException: Could not resolve window function 'avg'. Note that, using window functions currently requires a HiveContext;'

  • Define window specs.
win_spec = Window.partitionBy('dept_id').orderBy(sf.col('pat_id').asc()).rowsBetween(-sys.maxsize, 0)

Note that in some versions of pyspark, the Window.unboundedPreceding keyword is used instead.

For example,

win_spec = Window.partitionBy('dept_id').orderBy(sf.col('pat_id').asc()).rowsBetween(Window.unboundedPreceding, 0)

but this will not work in some versions of Spark, so it is safer to use the -sys.maxsize form shown earlier.

  • Calculate cumulative average.
cum_avg = pat_data.withColumn('cumavg',
sf.avg(pat_data.ins_amt).over(win_spec))
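The partitionBy / orderBy / rowsBetween combination above can be sketched in plain Python over the same sample rows. This is an illustrative equivalent only, not Spark code; names such as `parts` and `cumavg` are ours:

```python
from collections import defaultdict

rows = [(1, 111, 100000), (2, 111, 150000), (3, 222, 150000),
        (4, 222, 250000), (5, 222, 890000), (6, 111, 90000),
        (7, 333, 110000), (8, 444, 10000)]

# Partition by dept_id; sorting by pat_id first mirrors orderBy('pat_id')
parts = defaultdict(list)
for pat_id, dept_id, ins_amt in sorted(rows):
    parts[dept_id].append((pat_id, ins_amt))

# Running average from the start of each partition to the current row
cumavg = {}
for dept_id, members in parts.items():
    total = 0.0
    for i, (pat_id, ins_amt) in enumerate(members, start=1):
        total += ins_amt
        cumavg[pat_id] = total / i

print(cumavg[5])  # 430000.0: pat_id 5 is the third row of dept 222
```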

Here is the complete pyspark cumulative average (running average) example:

import pyspark
import sys
from pyspark.sql import HiveContext
from pyspark.sql.window import Window
import pyspark.sql.functions as sf

sqlcontext = HiveContext(sc)

# Create sample data for the calculation
pat_data = sqlcontext.createDataFrame([(1, 111, 100000),
                                       (2, 111, 150000),
                                       (3, 222, 150000),
                                       (4, 222, 250000),
                                       (5, 222, 890000),
                                       (6, 111, 90000),
                                       (7, 333, 110000),
                                       (8, 444, 10000)],
                                      ["pat_id", "dept_id", "ins_amt"])

pat_data.show()

+------+-------+-------+
|pat_id|dept_id|ins_amt|
+------+-------+-------+
| 1| 111| 100000|
| 2| 111| 150000|
| 3| 222| 150000|
| 4| 222| 250000|
| 5| 222| 890000|
| 6| 111| 90000|
| 7| 333| 110000|
| 8| 444| 10000|
+------+-------+-------+

win_spec = Window.partitionBy('dept_id').orderBy(sf.col('pat_id').asc()).rowsBetween(-sys.maxsize, 0)

cum_avg = pat_data.withColumn('cumavg', sf.avg(pat_data.ins_amt).over(win_spec))

cum_avg.show()

+------+-------+-------+------------------+
|pat_id|dept_id|ins_amt| cumavg|
+------+-------+-------+------------------+
| 8| 444| 10000| 10000.0|
| 1| 111| 100000| 100000.0|
| 2| 111| 150000| 125000.0|
| 6| 111| 90000|113333.33333333333|
| 7| 333| 110000| 110000.0|
| 3| 222| 150000| 150000.0|
| 4| 222| 250000| 200000.0|
| 5| 222| 890000| 430000.0|
+------+-------+-------+------------------+


Hope this helps 🙂