Spark SQL Cumulative Sum Function and Examples

Spark SQL supports analytic or window functions. You can use Spark SQL to calculate results over a range of values. Most databases such as Netezza, Teradata, Oracle, and even the latest versions of Apache Hive support analytic or window functions. In this article, we will check the Spark SQL cumulative sum function and how to use it with an example.

Spark SQL Cumulative Sum Function

Before going deep into calculating the cumulative sum, first, let us check what a running total or cumulative sum is.

“A running total or cumulative sum refers to the sum of the values in all cells of a column up to and including the current cell in that particular column.” For example, for the values 10, 20, and 30, the running totals are 10, 30, and 60.

There are two methods to calculate a cumulative sum in Spark:

  • Spark SQL query to calculate cumulative sum
  • SparkContext or HiveContext to calculate cumulative sum

Now let us check these two methods in detail.

Spark SQL query to Calculate Cumulative Sum

Just like in Apache Hive, you can write a Spark SQL query to calculate the cumulative sum. The syntax is similar to other analytic functions; the only difference is that you have to include the ‘unbounded preceding’ keyword in the window spec.

Below is the syntax of Spark SQL cumulative sum function:

SUM([DISTINCT | ALL] expression) [OVER (analytic_clause)];

And below is the complete example to calculate cumulative sum of insurance amount:

SELECT pat_id,
       ins_amt,
       SUM(ins_amt) OVER (PARTITION BY dept_id ORDER BY pat_id ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumsum
FROM patient
ORDER BY cumsum;

Output:

pat_id  ins_amt  cumsum
8       10000    10000
1       100000   100000
7       110000   110000
3       150000   150000
2       150000   250000
5       50000    300000
6       90000    390000
4       250000   400000
5       890000   1290000
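
If you are on Spark 2.x or later, you can also run the same query from PySpark by registering a DataFrame as a temporary view and passing the query to spark.sql(). The snippet below is a minimal sketch, assuming a SparkSession is available and reusing the sample rows from the DataFrame example later in this article (the contents of the patient table are not shown for the query above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cumulative-sum-sql").getOrCreate()

# Sample rows borrowed from the DataFrame example below
patient_df = spark.createDataFrame(
    [(1, 111, 100000), (2, 111, 150000), (3, 222, 150000), (4, 222, 250000),
     (5, 222, 890000), (6, 111, 90000), (7, 333, 110000), (8, 444, 10000)],
    ["pat_id", "dept_id", "ins_amt"])

# Expose the DataFrame as a temporary view so the SQL query can reference it
patient_df.createOrReplaceTempView("patient")

spark.sql("""
    SELECT pat_id,
           ins_amt,
           SUM(ins_amt) OVER (PARTITION BY dept_id
                              ORDER BY pat_id
                              ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumsum
    FROM patient
    ORDER BY cumsum
""").show()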

SparkContext or HiveContext to Calculate Cumulative Sum

You can calculate the cumulative sum without writing a Spark SQL query. You can use Spark DataFrames to define a window spec and calculate the running total.

Steps to calculate running total or cumulative sum using SparkContext or HiveContext:

  • Import necessary modules and create a DataFrame to work with:
import pyspark
import sys
from pyspark.sql import HiveContext
from pyspark.sql.window import Window
import pyspark.sql.functions as sf
sqlcontext = HiveContext(sc)
# Create sample data for calculation
pat_data = sqlcontext.createDataFrame([(1,111,100000),
(2,111,150000),
(3,222,150000),
(4,222,250000),
(5,222,890000),
(6,111,90000),
(7,333,110000),
(8,444,10000)],
["pat_id", "dept_id", "ins_amt"])
pat_data.show()

Note that you should use HiveContext, otherwise you may end up with the error ‘org.apache.spark.sql.AnalysisException: Could not resolve window function 'sum'. Note that, using window functions currently requires a HiveContext;’.
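
Also note that this error applies to older Spark 1.x releases. On Spark 2.x and later, a plain SparkSession supports window functions, so you do not need HiveContext at all. A minimal sketch, assuming Spark 2.x or later:

from pyspark.sql import SparkSession

# Spark 2.x+: a plain SparkSession is enough for window functions,
# no HiveContext required
spark = SparkSession.builder.appName("cumulative-sum").getOrCreate()

# The rest of the example works unchanged, for example:
# pat_data = spark.createDataFrame(..., ["pat_id", "dept_id", "ins_amt"])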

  • Define window specs.
win_spec = Window.partitionBy('dept_id').orderBy('pat_id').rowsBetween(-sys.maxsize, 0)

Note that in some versions of PySpark, the Window.unboundedPreceding keyword is used instead.

For example,

win_spec = Window.partitionBy('dept_id').orderBy('pat_id').rowsBetween(Window.unboundedPreceding, 0)

but in my case it did not work, so it is better to use the earlier -sys.maxsize version of the window spec shown above.
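
For reference, on recent PySpark releases (2.1 and later) the Window.unboundedPreceding and Window.currentRow constants are the documented frame boundaries. A minimal sketch, assuming a newer PySpark version:

from pyspark.sql.window import Window

# Frame boundary constants available in PySpark 2.1+
win_spec = Window.partitionBy('dept_id') \
                 .orderBy('pat_id') \
                 .rowsBetween(Window.unboundedPreceding, Window.currentRow)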

  • Calculate cumulative sum or running total.
cum_sum = pat_data.withColumn('cumsum', sf.sum(pat_data.ins_amt).over(win_spec))

Here is the complete PySpark example of the running total or cumulative sum:

import pyspark
import sys
from pyspark.sql import HiveContext
from pyspark.sql.window import Window
import pyspark.sql.functions as sf
sqlcontext = HiveContext(sc)
# Create Sample Data for calculation
pat_data = sqlcontext.createDataFrame([(1,111,100000),
(2,111,150000),
(3,222,150000),
(4,222,250000),
(5,222,890000),
(6,111,90000),
(7,333,110000),
(8,444,10000)],
["pat_id", "dept_id", "ins_amt"])
pat_data.show()
+------+-------+-------+
|pat_id|dept_id|ins_amt|
+------+-------+-------+
| 8| 444| 10000|
| 1| 111| 100000|
| 2| 111| 150000|
| 6| 111| 90000|
| 7| 333| 110000|
| 3| 222| 150000|
| 4| 222| 250000|
| 5| 222| 890000|
+------+-------+-------+
win_spec = Window.partitionBy('dept_id').orderBy('pat_id').rowsBetween(-sys.maxsize, 0)

cum_sum = pat_data.withColumn('cumsum', sf.sum(pat_data.ins_amt).over(win_spec))


cum_sum.show()

+------+-------+-------+-------+
|pat_id|dept_id|ins_amt| cumsum|
+------+-------+-------+-------+
| 8| 444| 10000| 10000|
| 1| 111| 100000| 100000|
| 2| 111| 150000| 250000|
| 6| 111| 90000| 340000|
| 7| 333| 110000| 110000|
| 3| 222| 150000| 150000|
| 4| 222| 250000| 400000|
| 5| 222| 890000|1290000|
+------+-------+-------+-------+

Hope this helps 🙂