Impala or Hive Slowly Changing Dimension – SCD Type 2 Implementation

  • Post last modified:August 28, 2018

Slowly changing dimensions (SCD) in a data warehouse capture data that changes slowly but unpredictably, rather than on a regular basis. SCD Type 2 is the most popular method used in dimensional modelling to preserve historical data. Since Cloudera Impala and Hadoop Hive do not support UPDATE statements on ordinary HDFS-backed tables, you have to implement the update using intermediate tables. In this article, we will walk through the steps for implementing SCD Type 2 in Cloudera Impala or Hive, with an example.

For demonstration purposes, let's take the example of a patient dimension. The patient dimension contains information about patients. This information does not change on a day-to-day basis; it changes once in a while (that is, slowly).

Initial Load

You have to transfer all the data to the Hadoop environment, either by importing it into HDFS with Apache Sqoop or by creating the destination table on top of an existing HDFS file. After that, you have to handle the slowly changing data, which you may receive in the form of flat files.

I cover importing data with Apache Sqoop in a separate post.

Impala or Hive Slowly Changing Dimension – SCD Type 2 Implementation

We will use the following tables for the Impala SCD Type 2 implementation:

LOAD (PAT_LOAD): Holds incremental records, that is, updated or new records. This is an external table.

TARGET (PAT_DTLS): Holds the complete patient data.

INT (PAT_INT): Intermediate table used for the SCD Type 2 implementation.

Below are the contents of the LOAD and TARGET tables at the time of building the Slowly Changing Dimension Type 2.

PAT_LOAD:

SK   PHONE_NO    NAME
111  987654321   Jhoney
222  876543210   Stuart
555  345678901   Moxie
666  765432101   Jeff

PAT_DTLS:

SK   PHONE_NO    NAME    EFF_FR_DT   EFF_TO_DT  FLAG
111  1234567890  Jhoney  2016-06-01  NULL       1
222  2345678901  Stuart  2016-06-01  NULL       1
333  3456789012  Jhoney  2016-06-01  NULL       1
444  4567890123  Max     2016-06-01  NULL       1

Now we are all set with tables and data.

Let’s get started with detailed steps…

Step 1: Create the INT table with the same structure as the target and copy expired records

Copy all expired records into the INT table. This step simply copies all records with flag = 0 and a non-null EFF_TO_DT.

Query:

CREATE TABLE PAT_INT AS
SELECT * FROM PAT_DTLS
WHERE FLAG = 0 and EFF_TO_DT is not null;

Step 2: Copy all records that are about to expire

These are the records that have been updated and received in the LOAD table. Since these records are now expired, you need to set EFF_TO_DT to 'current_date - 1', meaning the record was active until yesterday, and set the flag to '0'. The updated versions of these records will be active from the moment we load them, so their EFF_FR_DT will be the current date (they are inserted in Step 4).

Note that older Hive versions do not support OR in the JOIN ON clause; there, move the inequality checks from the ON clause into the WHERE clause.

Query:

INSERT INTO PAT_INT
SELECT TGT.SK,
       TGT.PHONE_NO,
       TGT.NAME,
       TGT.EFF_FR_DT,
       to_date(adddate(now(), -1)) AS EFF_TO_DT,  -- active until yesterday
       0 AS FLAG                                   -- mark as expired
FROM PAT_DTLS TGT
JOIN PAT_LOAD SRC
  ON TGT.SK = SRC.SK
 AND (TGT.PHONE_NO <> SRC.PHONE_NO
      OR TGT.NAME <> SRC.NAME)
WHERE TGT.FLAG = 1;
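For engines that reject OR inside the ON clause, the change check can be moved into WHERE. The following is a minimal sketch, assuming an in-memory SQLite database as a stand-in for Impala or Hive (so date functions and INSERT are left out), that compares the two forms on the sample data. The variable names are illustrative only.

```python
import sqlite3

# In-memory SQLite stands in for Impala/Hive here (an assumption for illustration).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE PAT_DTLS (SK INT, PHONE_NO TEXT, NAME TEXT,
                       EFF_FR_DT TEXT, EFF_TO_DT TEXT, FLAG INT);
CREATE TABLE PAT_LOAD (SK INT, PHONE_NO TEXT, NAME TEXT);
INSERT INTO PAT_DTLS VALUES
  (111, '1234567890', 'Jhoney', '2016-06-01', NULL, 1),
  (222, '2345678901', 'Stuart', '2016-06-01', NULL, 1),
  (333, '3456789012', 'Jhoney', '2016-06-01', NULL, 1),
  (444, '4567890123', 'Max',    '2016-06-01', NULL, 1);
INSERT INTO PAT_LOAD VALUES
  (111, '987654321', 'Jhoney'),
  (222, '876543210', 'Stuart'),
  (555, '345678901', 'Moxie'),
  (666, '765432101', 'Jeff');
""")

# Original form: change detection inside the ON clause.
on_form = """
SELECT TGT.SK FROM PAT_DTLS TGT
JOIN PAT_LOAD SRC
  ON TGT.SK = SRC.SK
 AND (TGT.PHONE_NO <> SRC.PHONE_NO OR TGT.NAME <> SRC.NAME)
WHERE TGT.FLAG = 1
ORDER BY TGT.SK
"""

# Rewritten form: equality-only ON clause, OR moved into WHERE.
where_form = """
SELECT TGT.SK FROM PAT_DTLS TGT
JOIN PAT_LOAD SRC
  ON TGT.SK = SRC.SK
WHERE TGT.FLAG = 1
  AND (TGT.PHONE_NO <> SRC.PHONE_NO OR TGT.NAME <> SRC.NAME)
ORDER BY TGT.SK
"""

on_rows = conn.execute(on_form).fetchall()
where_rows = conn.execute(where_form).fetchall()
print(on_rows)     # keys of the rows to expire, ON-clause form
print(where_rows)  # same keys, WHERE-clause form
```

Because this is an inner join, filtering in ON or in WHERE selects the same rows; that would not hold for an outer join.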

Step 3: Copy active records from the TGT table to the INT table

These are the records that are currently active and have not been updated. Since the target table is going to be overwritten with INSERT OVERWRITE, you have to copy all active records to the INT table as well.

Query:

INSERT INTO PAT_INT
SELECT TGT.SK ,
TGT.PHONE_NO,
TGT.NAME,
EFF_FR_DT,
EFF_TO_DT,
FLAG
FROM PAT_DTLS TGT
WHERE FLAG = 1
AND NOT EXISTS (SELECT 1 FROM
 PAT_LOAD SRC
 WHERE TGT.SK = SRC.SK )

union all -- include unchanged records

SELECT TGT.SK ,
TGT.PHONE_NO,
TGT.NAME,
EFF_FR_DT,
EFF_TO_DT,
FLAG
FROM PAT_DTLS TGT
WHERE TGT.FLAG = 1
AND EXISTS 
(SELECT 1 
FROM PAT_LOAD SRC
WHERE TGT.SK = SRC.SK
AND (
 TGT.PHONE_NO = SRC.PHONE_NO
 AND TGT.NAME = SRC.NAME )
 ) ;
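The two UNION ALL branches above select active rows that either have no LOAD record or have an identical one. As a sketch of an alternative, assuming PHONE_NO and NAME are never NULL and SK is unique in PAT_LOAD, the same rows can be selected with a single NOT EXISTS on a changed LOAD record. SQLite is used here only as a stand-in engine, and the extra LOAD row for SK 333 is hypothetical, added so that the "unchanged" branch has something to match.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # SQLite stand-in, not Impala/Hive
conn.executescript("""
CREATE TABLE PAT_DTLS (SK INT, PHONE_NO TEXT, NAME TEXT,
                       EFF_FR_DT TEXT, EFF_TO_DT TEXT, FLAG INT);
CREATE TABLE PAT_LOAD (SK INT, PHONE_NO TEXT, NAME TEXT);
INSERT INTO PAT_DTLS VALUES
  (111, '1234567890', 'Jhoney', '2016-06-01', NULL, 1),
  (222, '2345678901', 'Stuart', '2016-06-01', NULL, 1),
  (333, '3456789012', 'Jhoney', '2016-06-01', NULL, 1),
  (444, '4567890123', 'Max',    '2016-06-01', NULL, 1);
INSERT INTO PAT_LOAD VALUES
  (111, '987654321',  'Jhoney'),
  (222, '876543210',  'Stuart'),
  (333, '3456789012', 'Jhoney'),  -- hypothetical: re-sent but unchanged
  (555, '345678901',  'Moxie'),
  (666, '765432101',  'Jeff');
""")

# Form used in the article: not in LOAD at all, or in LOAD and identical.
union_form = """
SELECT TGT.SK FROM PAT_DTLS TGT
WHERE TGT.FLAG = 1
  AND NOT EXISTS (SELECT 1 FROM PAT_LOAD SRC WHERE TGT.SK = SRC.SK)
UNION ALL
SELECT TGT.SK FROM PAT_DTLS TGT
WHERE TGT.FLAG = 1
  AND EXISTS (SELECT 1 FROM PAT_LOAD SRC
              WHERE TGT.SK = SRC.SK
                AND TGT.PHONE_NO = SRC.PHONE_NO
                AND TGT.NAME = SRC.NAME)
"""

# Alternative: keep every active row that has no *changed* LOAD record.
single_form = """
SELECT TGT.SK FROM PAT_DTLS TGT
WHERE TGT.FLAG = 1
  AND NOT EXISTS (SELECT 1 FROM PAT_LOAD SRC
                  WHERE TGT.SK = SRC.SK
                    AND (TGT.PHONE_NO <> SRC.PHONE_NO
                         OR TGT.NAME <> SRC.NAME))
"""

union_rows = sorted(conn.execute(union_form).fetchall())
single_rows = sorted(conn.execute(single_form).fetchall())
print(union_rows, single_rows)
```

The two forms diverge when a compared column is NULL, since both `=` and `<>` then evaluate to NULL, so the non-NULL assumption matters.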

Step 4: Copy only updated records from LOAD table

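These are the records that were updated in this load cycle. The query compares the LOAD table with the INT table on the SK (patient ID) column and copies each LOAD record whose SK has an expired row (flag = 0) but no active row (flag = 1) in INT, which is the case for the records expired in Step 2.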

Query:

INSERT INTO PAT_INT
SELECT SRC.SK,
       SRC.PHONE_NO,
       SRC.NAME,
       to_date(now()),  -- EFF_FR_DT: active from today
       NULL,            -- EFF_TO_DT: still open
       1                -- FLAG: active
FROM PAT_LOAD SRC
WHERE EXISTS (SELECT 1
              FROM PAT_INT INT1
              WHERE SRC.SK = INT1.SK
                AND INT1.FLAG = 0
                AND NOT EXISTS (SELECT 1
                                FROM PAT_INT INT2
                                WHERE INT1.SK = INT2.SK
                                  AND INT2.FLAG = 1));

Step 5: Copy new records from LOAD to INT

In any given load cycle, you may receive both updated and brand-new records. In this step you load the new records. This is a simple step: compare the LOAD and INT tables on the SK (patient ID) column and copy each LOAD record whose SK does not appear in INT yet.

Query:

INSERT INTO PAT_INT
SELECT SRC.SK ,
SRC.PHONE_NO,
SRC.NAME,
to_date(NOW()),
NULL,
1
FROM PAT_LOAD SRC
WHERE NOT EXISTS (SELECT 1
 FROM PAT_INT INT1
 WHERE SRC.SK = INT1.SK);
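Taken together, Steps 2, 4 and 5 amount to sorting each LOAD record into one of three buckets by comparing it with the active target row for the same SK. The following is a rough sketch of that decision in plain Python; the `classify` function and the dictionary layout are illustrative assumptions, not part of the Impala or Hive implementation.

```python
def classify(load_rows, active_rows):
    """Sort LOAD records into changed / new / unchanged buckets by SK.

    load_rows:   {sk: (phone_no, name)} taken from PAT_LOAD
    active_rows: {sk: (phone_no, name)} for FLAG = 1 rows in PAT_DTLS
    """
    buckets = {"changed": [], "new": [], "unchanged": []}
    for sk, attrs in sorted(load_rows.items()):
        if sk not in active_rows:
            buckets["new"].append(sk)        # Step 5: insert as active
        elif active_rows[sk] != attrs:
            buckets["changed"].append(sk)    # Steps 2 and 4: expire, then insert
        else:
            buckets["unchanged"].append(sk)  # Step 3 keeps the existing row
    return buckets


# Sample data from the PAT_LOAD and PAT_DTLS tables shown earlier.
load = {
    111: ("987654321", "Jhoney"),
    222: ("876543210", "Stuart"),
    555: ("345678901", "Moxie"),
    666: ("765432101", "Jeff"),
}
active = {
    111: ("1234567890", "Jhoney"),
    222: ("2345678901", "Stuart"),
    333: ("3456789012", "Jhoney"),
    444: ("4567890123", "Max"),
}

buckets = classify(load, active)
print(buckets)
```

Target rows whose SK never shows up in LOAD (333 and 444 here) are not classified at all; Step 3 carries them over untouched.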

Step 6: Perform INSERT OVERWRITE on the TGT table

You have to perform an INSERT OVERWRITE on the TGT table, selecting the records from the intermediate table. Alternatively, you can drop the TGT table and rename INT to TGT. Renaming is faster than INSERT.

Query:

INSERT OVERWRITE TABLE PAT_DTLS
SELECT * FROM PAT_INT;

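Below is the output of the destination patient table. Note that the queries never modify EFF_FR_DT on rows carried over from PAT_DTLS, so with the sample data shown earlier those rows would keep 2016-06-01 rather than the 2017-09-30 that appears in this captured output.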

SK   PHONE_NO    NAME    EFF_FR_DT            EFF_TO_DT            FLAG
111  1234567890  Jhoney  2017-09-30 00:00:00  2017-09-29 00:00:00  0
111  987654321   Jhoney  2017-09-30 00:00:00  NULL                 1
222  2345678901  Stuart  2017-09-30 00:00:00  2017-09-29 00:00:00  0
222  876543210   Stuart  2017-09-30 00:00:00  NULL                 1
333  3456789012  Jhoney  2017-09-30 00:00:00  NULL                 1
444  4567890123  Max     2017-09-30 00:00:00  NULL                 1
555  345678901   Moxie   2017-09-30 00:00:00  NULL                 1
666  765432101   Jeff    2017-09-30 00:00:00  NULL                 1
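To try the six steps end to end without a cluster, the following sketch replays them against the sample data. It assumes an in-memory SQLite database as a stand-in for Impala or Hive, so the date functions are replaced by fixed strings (a run date of 2017-09-30 is assumed) and INSERT OVERWRITE is emulated with DELETE followed by INSERT. Dates are stored as plain text, and rows carried over from PAT_DTLS keep their 2016-06-01 EFF_FR_DT.

```python
import sqlite3

RUN_DATE = "2017-09-30"   # assumed load date, fixed so the run is repeatable
PREV_DATE = "2017-09-29"  # the day before RUN_DATE

conn = sqlite3.connect(":memory:")  # SQLite stand-in, not Impala/Hive
conn.executescript("""
CREATE TABLE PAT_DTLS (SK INT, PHONE_NO TEXT, NAME TEXT,
                       EFF_FR_DT TEXT, EFF_TO_DT TEXT, FLAG INT);
CREATE TABLE PAT_LOAD (SK INT, PHONE_NO TEXT, NAME TEXT);
INSERT INTO PAT_DTLS VALUES
  (111, '1234567890', 'Jhoney', '2016-06-01', NULL, 1),
  (222, '2345678901', 'Stuart', '2016-06-01', NULL, 1),
  (333, '3456789012', 'Jhoney', '2016-06-01', NULL, 1),
  (444, '4567890123', 'Max',    '2016-06-01', NULL, 1);
INSERT INTO PAT_LOAD VALUES
  (111, '987654321', 'Jhoney'), (222, '876543210', 'Stuart'),
  (555, '345678901', 'Moxie'),  (666, '765432101', 'Jeff');
""")

steps = [
    # Step 1: create INT and copy rows that are already expired.
    ("""CREATE TABLE PAT_INT AS
        SELECT * FROM PAT_DTLS
        WHERE FLAG = 0 AND EFF_TO_DT IS NOT NULL""", ()),
    # Step 2: expire active rows that have a changed LOAD record.
    ("""INSERT INTO PAT_INT
        SELECT TGT.SK, TGT.PHONE_NO, TGT.NAME, TGT.EFF_FR_DT, ?, 0
        FROM PAT_DTLS TGT
        JOIN PAT_LOAD SRC ON TGT.SK = SRC.SK
        WHERE TGT.FLAG = 1
          AND (TGT.PHONE_NO <> SRC.PHONE_NO OR TGT.NAME <> SRC.NAME)""",
     (PREV_DATE,)),
    # Step 3: carry over active rows that are absent from LOAD or unchanged.
    ("""INSERT INTO PAT_INT
        SELECT TGT.* FROM PAT_DTLS TGT
        WHERE TGT.FLAG = 1
          AND NOT EXISTS (SELECT 1 FROM PAT_LOAD SRC WHERE TGT.SK = SRC.SK)
        UNION ALL
        SELECT TGT.* FROM PAT_DTLS TGT
        WHERE TGT.FLAG = 1
          AND EXISTS (SELECT 1 FROM PAT_LOAD SRC
                      WHERE TGT.SK = SRC.SK
                        AND TGT.PHONE_NO = SRC.PHONE_NO
                        AND TGT.NAME = SRC.NAME)""", ()),
    # Step 4: insert new versions of the rows expired in Step 2.
    ("""INSERT INTO PAT_INT
        SELECT SRC.SK, SRC.PHONE_NO, SRC.NAME, ?, NULL, 1
        FROM PAT_LOAD SRC
        WHERE EXISTS (SELECT 1 FROM PAT_INT INT1
                      WHERE SRC.SK = INT1.SK AND INT1.FLAG = 0
                        AND NOT EXISTS (SELECT 1 FROM PAT_INT INT2
                                        WHERE INT1.SK = INT2.SK
                                          AND INT2.FLAG = 1))""",
     (RUN_DATE,)),
    # Step 5: insert LOAD records whose SK is not in INT yet.
    ("""INSERT INTO PAT_INT
        SELECT SRC.SK, SRC.PHONE_NO, SRC.NAME, ?, NULL, 1
        FROM PAT_LOAD SRC
        WHERE NOT EXISTS (SELECT 1 FROM PAT_INT INT1
                          WHERE SRC.SK = INT1.SK)""", (RUN_DATE,)),
    # Step 6: emulate INSERT OVERWRITE with DELETE + INSERT.
    ("DELETE FROM PAT_DTLS", ()),
    ("INSERT INTO PAT_DTLS SELECT * FROM PAT_INT", ()),
]
for sql, params in steps:
    conn.execute(sql, params)

rows = conn.execute("""
    SELECT SK, PHONE_NO, EFF_FR_DT, EFF_TO_DT, FLAG
    FROM PAT_DTLS ORDER BY SK, FLAG
""").fetchall()
for row in rows:
    print(row)
```

Running the statements in this order matters: Steps 4 and 5 read PAT_INT, so they depend on the rows that Steps 2 and 3 have already written there.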

Step 7: Drop Intermediate and External Tables

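Once the destination table is populated, you should drop the intermediate and external tables. Because PAT_LOAD is an external table, dropping it removes only the table definition; the underlying files stay in HDFS.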

drop table PAT_INT;
drop table PAT_LOAD;
