Slowly changing dimensions (SCD) in a data warehouse capture data that changes slowly but unpredictably, rather than on a regular basis. Slowly changing dimension Type 2 is the most popular method used in dimensional modelling to preserve historical data. Since Cloudera Impala and Hadoop Hive do not support UPDATE statements on ordinary HDFS-backed tables, you have to implement the update using intermediate tables. In this article, we will check the Cloudera Impala or Hive Slowly Changing Dimension (SCD) Type 2 implementation steps with an example.
For demonstration purposes, let’s take the example of a patient dimension. The patient dimension contains information about patients. Patient information does not change from day to day; it changes only once in a while (slowly).
Initial Load
You have to transfer all the data to the Hadoop environment, either by using Apache Sqoop to import the data into HDFS or by creating the destination table on top of an HDFS file. Later, you have to handle the slowly changing data, which you may receive in the form of a flat file.
Impala or Hive Slowly Changing Dimension – SCD Type 2 Implementation
We will be using the following tables in the Impala SCD Type 2 implementation:
LOAD (PAT_LOAD): Holds incremental records, either updated or new. This is an external table.
TARGET (PAT_DTLS): Holds all patient records.
INT (PAT_INT): Intermediate table for the SCD Type 2 implementation.
Below are the contents of the LOAD and TARGET tables at the time of building the Slowly Changing Dimension Type 2.
PAT_LOAD:
SK | PHONE_NO | NAME |
111 | 987654321 | Jhoney |
222 | 876543210 | Stuart |
555 | 345678901 | Moxie |
666 | 765432101 | Jeff |
PAT_DTLS:
SK | PHONE_NO | NAME | EFF_FR_DT | EFF_TO_DT | FLAG |
111 | 1234567890 | Jhoney | 2016-06-01 | NULL | 1 |
222 | 2345678901 | Stuart | 2016-06-01 | NULL | 1 |
333 | 3456789012 | Jhoney | 2016-06-01 | NULL | 1 |
444 | 4567890123 | Max | 2016-06-01 | NULL | 1 |
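Before going through the steps, it may help to see how each incoming row is going to be treated. The sketch below is plain Python rather than Impala SQL, and the `classify` helper is a hypothetical name used only for illustration. It sorts the sample PAT_LOAD rows into changed, new and unchanged by comparing them with the active PAT_DTLS rows.

```python
# Illustrative only: the sample rows from the two tables above as Python dicts.
PAT_LOAD = [
    {"SK": 111, "PHONE_NO": "987654321", "NAME": "Jhoney"},
    {"SK": 222, "PHONE_NO": "876543210", "NAME": "Stuart"},
    {"SK": 555, "PHONE_NO": "345678901", "NAME": "Moxie"},
    {"SK": 666, "PHONE_NO": "765432101", "NAME": "Jeff"},
]
PAT_DTLS = [
    {"SK": 111, "PHONE_NO": "1234567890", "NAME": "Jhoney", "FLAG": 1},
    {"SK": 222, "PHONE_NO": "2345678901", "NAME": "Stuart", "FLAG": 1},
    {"SK": 333, "PHONE_NO": "3456789012", "NAME": "Jhoney", "FLAG": 1},
    {"SK": 444, "PHONE_NO": "4567890123", "NAME": "Max", "FLAG": 1},
]


def classify(load_rows, target_rows):
    """Group the SKs in the load by how they relate to the active target rows."""
    active = {row["SK"]: row for row in target_rows if row["FLAG"] == 1}
    groups = {"changed": [], "new": [], "unchanged": []}
    for row in load_rows:
        current = active.get(row["SK"])
        if current is None:
            groups["new"].append(row["SK"])
        elif (current["PHONE_NO"], current["NAME"]) != (row["PHONE_NO"], row["NAME"]):
            groups["changed"].append(row["SK"])
        else:
            groups["unchanged"].append(row["SK"])
    return groups


groups = classify(PAT_LOAD, PAT_DTLS)
print(groups)
```

With this sample data, SK 111 and 222 come out as changed and SK 555 and 666 as new. SK 333 and 444 are not in the load at all, so they are simply carried over.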
Now we are all set with tables and data.
Let’s get started with detailed steps…
Step 1: Create INT table same as Target and copy expired records.
Copy all expired records to the INT table. This step simply copies every record with FLAG = 0 and a non-null EFF_TO_DT.
Query:
CREATE TABLE PAT_INT AS SELECT * FROM PAT_DTLS WHERE FLAG = 0 and EFF_TO_DT is not null;
Step 2: Copy all records that are about to expire
These are the active records for which an updated version has been received in the LOAD table. Since these records are now expired, you also need to set EFF_TO_DT to ‘current_date-1’, meaning the record was active until yesterday, and set the flag to ‘0’. The updated versions will be active from the moment we load them, so their EFF_FR_DT will be the current date.
Note that older Hive versions do not support OR in the JOIN ON clause; there, keep only the SK equality in ON and move the comparison of the other columns to the WHERE clause.
Query:
INSERT INTO PAT_INT
SELECT TGT.SK, TGT.PHONE_NO, TGT.NAME, TGT.EFF_FR_DT,
       to_date(adddate(now(), -1)) AS EFF_TO_DT, 0 AS FLAG
FROM PAT_DTLS TGT
JOIN PAT_LOAD SRC
  ON (TGT.SK = SRC.SK)
 AND (TGT.PHONE_NO <> SRC.PHONE_NO OR TGT.NAME <> SRC.NAME)
WHERE TGT.FLAG = 1;
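To make the expiry rule concrete, here is a minimal Python sketch of the same logic. It is not the Impala query itself: `expire_changed_rows` and its `load_date` parameter are hypothetical names, and passing the date in explicitly (instead of calling now()) is an assumption made so the example is repeatable.

```python
from datetime import date, timedelta


def expire_changed_rows(target_rows, load_rows, load_date):
    """Return closed-out copies of active target rows whose attributes changed."""
    incoming = {row["SK"]: row for row in load_rows}
    expired = []
    for tgt in target_rows:
        src = incoming.get(tgt["SK"])
        if tgt["FLAG"] != 1 or src is None:
            continue  # only active rows that also appear in the load can expire
        if tgt["PHONE_NO"] != src["PHONE_NO"] or tgt["NAME"] != src["NAME"]:
            closed = dict(tgt)  # copy, so the input row is left untouched
            closed["EFF_TO_DT"] = load_date - timedelta(days=1)  # active until yesterday
            closed["FLAG"] = 0
            expired.append(closed)
    return expired


target = [
    {"SK": 111, "PHONE_NO": "1234567890", "NAME": "Jhoney",
     "EFF_FR_DT": date(2016, 6, 1), "EFF_TO_DT": None, "FLAG": 1},
    {"SK": 333, "PHONE_NO": "3456789012", "NAME": "Jhoney",
     "EFF_FR_DT": date(2016, 6, 1), "EFF_TO_DT": None, "FLAG": 1},
]
load = [{"SK": 111, "PHONE_NO": "987654321", "NAME": "Jhoney"}]

expired = expire_changed_rows(target, load, load_date=date(2017, 9, 30))
print(expired)
```

One difference to keep in mind: in SQL, `<>` evaluates to NULL when either side is NULL, so a row whose PHONE_NO or NAME is NULL may not be picked up by the query above, whereas Python's `!=` treats `None` as an ordinary value.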
Step 3: Copy active records from TGT to INT table
These are the records that are currently active and have not been updated. Since the target table is going to be rewritten with INSERT OVERWRITE, you have to copy all active records to the INT table.
Query:
INSERT INTO PAT_INT
-- active records with no matching row in LOAD
SELECT TGT.SK, TGT.PHONE_NO, TGT.NAME, TGT.EFF_FR_DT, TGT.EFF_TO_DT, TGT.FLAG
FROM PAT_DTLS TGT
WHERE TGT.FLAG = 1
  AND NOT EXISTS (SELECT 1 FROM PAT_LOAD SRC WHERE TGT.SK = SRC.SK)
UNION ALL
-- include unchanged records
SELECT TGT.SK, TGT.PHONE_NO, TGT.NAME, TGT.EFF_FR_DT, TGT.EFF_TO_DT, TGT.FLAG
FROM PAT_DTLS TGT
WHERE TGT.FLAG = 1
  AND EXISTS (SELECT 1 FROM PAT_LOAD SRC
              WHERE TGT.SK = SRC.SK
                AND (TGT.PHONE_NO = SRC.PHONE_NO AND TGT.NAME = SRC.NAME));
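The two halves of the UNION ALL can be read as one condition: keep an active row if its SK is missing from the load, or if the load carries the same values. A small Python sketch of that rule follows. The `carry_over_active` name is hypothetical, the resent row for SK 333 is made up for illustration, and the sketch assumes each SK appears at most once in the load.

```python
def carry_over_active(target_rows, load_rows):
    """Return the active target rows that this load cycle leaves unchanged."""
    # Assumes one row per SK in the load; a later duplicate would win here.
    incoming = {row["SK"]: (row["PHONE_NO"], row["NAME"]) for row in load_rows}
    kept = []
    for tgt in target_rows:
        if tgt["FLAG"] != 1:
            continue  # expired history was already copied in step 1
        not_in_load = tgt["SK"] not in incoming                               # first SELECT
        same_in_load = incoming.get(tgt["SK"]) == (tgt["PHONE_NO"], tgt["NAME"])  # second SELECT
        if not_in_load or same_in_load:
            kept.append(tgt)
    return kept


target = [
    {"SK": 111, "PHONE_NO": "1234567890", "NAME": "Jhoney", "FLAG": 1},
    {"SK": 333, "PHONE_NO": "3456789012", "NAME": "Jhoney", "FLAG": 1},
    {"SK": 444, "PHONE_NO": "4567890123", "NAME": "Max", "FLAG": 1},
]
load = [
    {"SK": 111, "PHONE_NO": "987654321", "NAME": "Jhoney"},   # changed phone number
    {"SK": 333, "PHONE_NO": "3456789012", "NAME": "Jhoney"},  # hypothetical resend, unchanged
]

kept = carry_over_active(target, load)
print([row["SK"] for row in kept])
```

SK 111 is left out here because it is handled by step 2 (the old version) and step 4 (the new version).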
Step 4: Copy only updated records from LOAD table
These are the records that are updated in this load cycle. The query checks the INT table on the patient ID (SK) column: a LOAD record whose SK has an expired row (flag = 0) in INT but no active row (flag = 1) was updated, so copy it as the new active version.
Query:
INSERT INTO PAT_INT
SELECT SRC.SK, SRC.PHONE_NO, SRC.NAME, to_date(now()), NULL, 1
FROM PAT_LOAD SRC
WHERE EXISTS (SELECT 1 FROM PAT_INT INT1
              WHERE SRC.SK = INT1.SK
                AND INT1.FLAG = 0
                AND NOT EXISTS (SELECT 1 FROM PAT_INT INT2
                                WHERE INT1.SK = INT2.SK
                                  AND INT2.FLAG = 1));
Step 5: Copy new records from LOAD to INT
In any given load cycle, you may receive both updated and new records. In this step you load the new records. By this point INT holds a row for every SK that exists in the target, so this is a simple step: compare the LOAD and INT tables on the patient ID (SK) and copy every LOAD record whose SK is not yet present in INT.
Query:
INSERT INTO PAT_INT
SELECT SRC.SK, SRC.PHONE_NO, SRC.NAME, to_date(now()), NULL, 1
FROM PAT_LOAD SRC
WHERE NOT EXISTS (SELECT 1 FROM PAT_INT INT1 WHERE SRC.SK = INT1.SK);
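Steps 4 and 5 both decide what to insert by looking at what is already in the INT table, so they can be sketched together. The Python below is only an illustration of the two conditions: `rows_to_insert` is a hypothetical helper, the INT rows are trimmed to the two columns the conditions use, and the unchanged resend of SK 333 is made up.

```python
from datetime import date


def rows_to_insert(load_rows, int_rows, load_date):
    """Build the new active versions that steps 4 and 5 add to the INT table."""
    expired_sks = {row["SK"] for row in int_rows if row["FLAG"] == 0}
    active_sks = {row["SK"] for row in int_rows if row["FLAG"] == 1}
    all_sks = expired_sks | active_sks
    inserts = []
    for src in load_rows:
        sk = src["SK"]
        updated = sk in expired_sks and sk not in active_sks  # step 4 condition
        brand_new = sk not in all_sks                          # step 5 condition
        if updated or brand_new:
            inserts.append({**src, "EFF_FR_DT": load_date, "EFF_TO_DT": None, "FLAG": 1})
    return inserts


int_rows = [
    {"SK": 111, "FLAG": 0},  # expired in step 2
    {"SK": 333, "FLAG": 1},  # carried over in step 3
]
load_rows = [
    {"SK": 111, "PHONE_NO": "987654321", "NAME": "Jhoney"},   # updated
    {"SK": 333, "PHONE_NO": "3456789012", "NAME": "Jhoney"},  # hypothetical resend, unchanged
    {"SK": 555, "PHONE_NO": "345678901", "NAME": "Moxie"},    # new
]

inserts = rows_to_insert(load_rows, int_rows, load_date=date(2017, 9, 30))
print([row["SK"] for row in inserts])
```

Evaluating both conditions against the same snapshot of INT gives the same rows as running the two queries one after the other, because an updated SK already has an expired row in INT and therefore can never qualify as new.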
Step 6: Perform INSERT OVERWRITE on TGT table.
You have to perform INSERT OVERWRITE on the TGT table, selecting the records from the intermediate table. Alternatively, you can drop the TGT table and rename INT to TGT; renaming is fast compared to INSERT.
Query:
INSERT OVERWRITE TABLE PAT_DTLS SELECT * FROM PAT_INT;
Below is the output of the destination patient table:
SK | PHONE_NO | NAME | EFF_FR_DT | EFF_TO_DT | FLAG |
111 | 1234567890 | Jhoney | 2016-06-01 00:00:00 | 2017-09-29 00:00:00 | 0 |
111 | 987654321 | Jhoney | 2017-09-30 00:00:00 | NULL | 1 |
222 | 2345678901 | Stuart | 2016-06-01 00:00:00 | 2017-09-29 00:00:00 | 0 |
222 | 876543210 | Stuart | 2017-09-30 00:00:00 | NULL | 1 |
333 | 3456789012 | Jhoney | 2016-06-01 00:00:00 | NULL | 1 |
444 | 4567890123 | Max | 2016-06-01 00:00:00 | NULL | 1 |
555 | 345678901 | Moxie | 2017-09-30 00:00:00 | NULL | 1 |
666 | 765432101 | Jeff | 2017-09-30 00:00:00 | NULL | 1 |
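After the overwrite it can be worth sanity-checking the result. The sketch below lists a few rules that this implementation is expected to satisfy: exactly one active row per SK, no end date on active rows, and a valid date range on expired rows. These rules are my reading of the steps above rather than something Impala enforces, and `scd2_problems` is a hypothetical helper name.

```python
from collections import Counter
from datetime import date


def scd2_problems(rows):
    """Return human-readable problems found in an SCD Type 2 table."""
    problems = []
    active_per_sk = Counter(row["SK"] for row in rows if row["FLAG"] == 1)
    for sk in sorted({row["SK"] for row in rows}):
        if active_per_sk[sk] != 1:
            problems.append(f"SK {sk}: expected 1 active row, found {active_per_sk[sk]}")
    for row in rows:
        if row["FLAG"] == 1 and row["EFF_TO_DT"] is not None:
            problems.append(f"SK {row['SK']}: active row has an end date")
        if row["FLAG"] == 0 and (row["EFF_TO_DT"] is None
                                 or row["EFF_TO_DT"] < row["EFF_FR_DT"]):
            problems.append(f"SK {row['SK']}: expired row has a missing or inverted date range")
    return problems


good = [
    {"SK": 111, "EFF_FR_DT": date(2016, 6, 1), "EFF_TO_DT": date(2017, 9, 29), "FLAG": 0},
    {"SK": 111, "EFF_FR_DT": date(2017, 9, 30), "EFF_TO_DT": None, "FLAG": 1},
    {"SK": 555, "EFF_FR_DT": date(2017, 9, 30), "EFF_TO_DT": None, "FLAG": 1},
]
# Simulate loading SK 555 twice, which leaves two active rows for it.
bad = good + [{"SK": 555, "EFF_FR_DT": date(2017, 9, 30), "EFF_TO_DT": None, "FLAG": 1}]

print(scd2_problems(good))
print(scd2_problems(bad))
```

A check like this is cheap to run on a sample of the table and catches the most common mistake, which is running the load twice for the same file.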
Step 7: Drop Intermediate and External Tables
Once the destination table is populated you should drop the intermediate and external table:
DROP TABLE PAT_INT;
DROP TABLE PAT_LOAD;