This project uses Pentaho Data Integration and PostgreSQL to build a data warehouse (DWH). It includes the sales fact table and two dimension tables (dim_product and dim_payment). The ETL process uses incremental (delta) loading to optimize the extraction and loading of new data.
- Setup
- Table Structure
- ETL Job & Incremental Load: Process
- ETL Job & Incremental Load: Final Output
- Software: Pentaho Data Integration v10.2
- RDBMS: PostgreSQL
- pgAdmin4: open-source Management Tool for PostgreSQL
Below is the DDL of the sales fact table, which stores the sales transactions and is linked to the dim_product and dim_payment dimension tables in a star schema.
-- Table: public.sales
-- DROP TABLE IF EXISTS public.sales;
CREATE TABLE IF NOT EXISTS public.sales
(
transaction_id integer NOT NULL,
transactional_date timestamp without time zone,
product_id character varying COLLATE pg_catalog."default",
customer_id integer,
payment character varying COLLATE pg_catalog."default",
credit_card bigint,
loyalty_card character varying COLLATE pg_catalog."default",
cost character varying COLLATE pg_catalog."default",
quantity integer,
price numeric,
CONSTRAINT sales_pkey PRIMARY KEY (transaction_id)
)
TABLESPACE pg_default;
ALTER TABLE IF EXISTS public.sales
OWNER to postgres;
-- Table: Staging.sales
-- DROP TABLE IF EXISTS "Staging".sales;
CREATE TABLE IF NOT EXISTS "Staging".sales
(
transaction_id integer NOT NULL,
transactional_date timestamp without time zone,
product_id character varying COLLATE pg_catalog."default",
customer_id integer,
payment character varying COLLATE pg_catalog."default",
credit_card bigint,
loyalty_card character varying COLLATE pg_catalog."default",
cost numeric,
quantity integer,
price numeric,
CONSTRAINT sales_pkey PRIMARY KEY (transaction_id)
)
TABLESPACE pg_default;
ALTER TABLE IF EXISTS "Staging".sales
OWNER to postgres;
-- Table: core.sales
-- DROP TABLE IF EXISTS core.sales;
CREATE TABLE IF NOT EXISTS core.sales
(
transaction_id integer NOT NULL,
transactional_date timestamp without time zone,
transactional_date_fk bigint,
product_id character varying COLLATE pg_catalog."default",
product_fk integer,
customer_id integer,
payment_fk integer,
credit_card bigint,
cost numeric,
quantity integer,
price numeric,
total_cost numeric,
total_price numeric,
profit numeric,
CONSTRAINT sales_pkey PRIMARY KEY (transaction_id)
)
TABLESPACE pg_default;
ALTER TABLE IF EXISTS core.sales
OWNER to postgres;
- transaction_id: this column is a natural key, but since it is already an integer we can use it directly as the surrogate key, so there is no need to add another one;
- transactional_date: since this column has a timestamp (date/time) data type, we can use it as the delta column on which the incremental load is defined;
- product_id: this column represents the natural key of the dim_product dimension table;
- customer_id: this column represents an FK in this fact table, but a plausible PK in a dimension table representing customers;
- payment, loyalty_card: these are low-cardinality flags, so we group them into a junk dimension;
- credit_card: a degenerate dimension of type bigint; it is not linked to any dimension table, so it stays on the fact table;
- cost, quantity, price: non-aggregate measures;
- total_price, total_cost: two fields calculated in Pentaho Data Integration with the Calculator step, given by price * quantity and cost * quantity respectively;
- profit: a field calculated in Pentaho Data Integration with the Calculator step, given by the difference between total_price and total_cost (see the SQL sketch after this list);
- product_fk: the foreign key referencing the primary key (product_pk) of the dim_product dimension table;
- payment_fk: the foreign key referencing the primary key (payment_pk) of the dim_payment dimension table.
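The Calculator formulas above are equivalent to the following SQL (a sketch for illustration only; in the project these fields are computed in Pentaho, not in the database):
SELECT transaction_id,
       price * quantity AS total_price,
       cost * quantity AS total_cost,
       (price * quantity) - (cost * quantity) AS profit
FROM "Staging".sales;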
-- Table: public.products
-- DROP TABLE IF EXISTS public.products;
CREATE TABLE IF NOT EXISTS public.products
(
product_id character varying(5) COLLATE pg_catalog."default",
product_name character varying(100) COLLATE pg_catalog."default",
category character varying(50) COLLATE pg_catalog."default",
subcategory character varying(50) COLLATE pg_catalog."default"
)
TABLESPACE pg_default;
ALTER TABLE IF EXISTS public.products
OWNER to postgres;
-- Table: Staging.dim_product
-- DROP TABLE IF EXISTS "Staging".dim_product;
CREATE TABLE IF NOT EXISTS "Staging".dim_product
(
"Product_PK" integer NOT NULL GENERATED BY DEFAULT AS IDENTITY ( INCREMENT 1 START 1 MINVALUE 1 MAXVALUE 2147483647 CACHE 1 ),
product_id character varying COLLATE pg_catalog."default",
product_name character varying COLLATE pg_catalog."default",
category character varying COLLATE pg_catalog."default",
subcategory character varying COLLATE pg_catalog."default",
CONSTRAINT dim_product_pkey PRIMARY KEY ("Product_PK")
)
TABLESPACE pg_default;
ALTER TABLE IF EXISTS "Staging".dim_product
OWNER to postgres;
-- Table: core.dim_product
-- DROP TABLE IF EXISTS core.dim_product;
CREATE TABLE IF NOT EXISTS core.dim_product
(
product_pk integer,
product_id character varying(5) COLLATE pg_catalog."default",
product_name character varying(100) COLLATE pg_catalog."default",
category character varying(50) COLLATE pg_catalog."default",
subcategory character varying(50) COLLATE pg_catalog."default",
brand character varying(50) COLLATE pg_catalog."default"
)
TABLESPACE pg_default;
ALTER TABLE IF EXISTS core.dim_product
OWNER to postgres;
- product_pk: product_pk is the surrogate key;
- product_id: product_id is the natural key coming from the source products table;
- product_name, category, subcategory, brand: these columns are the descriptive dimension attributes.
-- Table: core.dim_payment
-- DROP TABLE IF EXISTS core.dim_payment;
CREATE TABLE IF NOT EXISTS core.dim_payment
(
payment_pk integer NOT NULL GENERATED BY DEFAULT AS IDENTITY ( INCREMENT 1 START 1 MINVALUE 1 MAXVALUE 2147483647 CACHE 1 ),
payment character varying COLLATE pg_catalog."default",
loyalty_card character varying COLLATE pg_catalog."default",
CONSTRAINT dim_payment_pkey PRIMARY KEY (payment_pk)
)
TABLESPACE pg_default;
ALTER TABLE IF EXISTS core.dim_payment
OWNER to postgres;
- payment_pk: this column is the surrogate key, generated automatically as an identity column;
- payment, loyalty_card: these columns are the dimension attributes; as seen on the fact table they are flags, so they are grouped together in this junk dimension.
This parent job consists of two child jobs, called Staging and Transform&Load respectively.
Staging Job consists of four transformations:
- Set Variable;
- Get Data and output data;
- Set variable for Sales;
- Get variable for Sales and Load.
Starting from the first transformation, Set Variable, we can see that:
This transformation finds the last loaded product_id and contains the following steps:
- Table input: runs a query on the core.dim_product table to determine the last loaded product_id
SELECT MAX(product_id) FROM core.dim_product
- Set variable: sets an environment variable (referenced later as ${LastLoad}) with the maximum value returned by the query, used for the incremental (delta) load.
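Note that on the very first run core.dim_product is empty and MAX(product_id) returns NULL. A possible way to make the same query safe for both the initial and the delta runs (an assumption, not necessarily how the project handles the first load) is:
SELECT COALESCE(MAX(product_id), '') AS LastLoad
FROM core.dim_product;
With an empty string as fallback, the filter product_id > '${LastLoad}' matches every non-empty product_id on the first run.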
The second transformation is as follows:
This transformation identifies the records added since the last load and contains the following steps:
- Get variables: retrieves the environment variable set in the first transformation, Set Variable
- Table input: runs an SQL query that filters the source data by comparing product_id with the last loaded value
SELECT * FROM "public".products WHERE product_id > '${LastLoad}'
- Table output: writes these new records to the dim_product table in the data warehouse (specifically, in the staging tier), following the incremental load model.
The third transformation is as follows:
This transformation finds the last upload date and contains the following steps:
- Table input: runs a query on the core.sales table to determine the last load date, where:
- for the Full load, since there is no value at the beginning, a dummy value (a very old date) is entered for the first load cycle only; later runs use the Delta query below:
SELECT '1970-01-01 00:00:00' AS LastLoadDate
- for the Delta load, the timestamp delta column on the transactions lets each run pick up only the new rows:
SELECT MAX(transactional_date) AS LastLoadDate FROM core.sales
- Set variable: sets an environment variable (referenced later as ${LastLoadDate}) with the value returned by the query, used for the incremental (delta) load.
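The two cases above can also be collapsed into a single query with COALESCE, so that the same transformation works on both the first (full) load and the later delta loads (a possible variant, not necessarily what the project's .ktr does):
SELECT COALESCE(MAX(transactional_date),
                TIMESTAMP '1970-01-01 00:00:00') AS LastLoadDate
FROM core.sales;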
The fourth transformation is as follows:
This transformation contains the following steps:
- Get variables: retrieves the environment variable set in the third transformation, Set variable for Sales
- Table input: from public.sales we select the rows for which the delta column satisfies transactional_date > '${LastLoadDate}':
SELECT * FROM public.sales WHERE transactional_date > '${LastLoadDate}'
- Select values: since the cost column has different data types in the source table (character varying) and in the staging and core layers (numeric), we apply a data type conversion from character varying to numeric through the "Select values" step (a SQL-equivalent cast is sketched after this list)
- Table output: writes the rows returned by the query to the "Staging".sales table.
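For reference, the conversion performed by the "Select values" step corresponds to an explicit cast in SQL (a sketch only; the project does this in Pentaho):
SELECT transaction_id,
       cost::numeric AS cost -- character varying -> numeric
FROM public.sales;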
Transform & Load Job consists of two transformations:
- dim_payment;
- fact_sales.
From here we structure our core layer, starting with the dim_payment dimension table. First, we inspect the data in PostgreSQL via pgAdmin; in particular, we look at the distinct values of the payment and loyalty_card columns to check all possible combinations:
SELECT DISTINCT payment, loyalty_card
FROM "Staging".sales;
The output contains 4 distinct values for payment and 2 distinct values for loyalty_card. Before loading we therefore need to:
- replace the null values with a dummy value, because null represents customers who paid cash;
- remove whitespace with a trim.
On Postgres, this can be done through the COALESCE function, where null values for payments will be replaced with the string cash:
SELECT DISTINCT COALESCE(payment, 'cash') as payment, loyalty_card
FROM "Staging".sales;
We then do the same in Pentaho, through the Table input step:
Next comes the Insert/Update step, where we load the data into the core.dim_payment dimension table, checking whether the combination already exists: if it does, it is optionally updated; if not, a new combination is inserted (a SQL sketch of this logic follows):
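A rough SQL equivalent of this Insert/Update logic, inserting only the combinations that are not yet present (a sketch; the project performs this step in Pentaho, not in SQL):
INSERT INTO core.dim_payment (payment, loyalty_card)
SELECT DISTINCT COALESCE(s.payment, 'cash') AS payment, s.loyalty_card
FROM "Staging".sales s
WHERE NOT EXISTS (
    SELECT 1
    FROM core.dim_payment d
    WHERE d.payment IS NOT DISTINCT FROM COALESCE(s.payment, 'cash')
      AND d.loyalty_card IS NOT DISTINCT FROM s.loyalty_card
);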
With this transformation we will read the data from the staging table, “Staging”.sales:
SELECT
    transaction_id,
    transactional_date,
    EXTRACT(year FROM transactional_date) * 10000
      + EXTRACT(month FROM transactional_date) * 100
      + EXTRACT(day FROM transactional_date) AS transactional_date_fk,
    f.product_id,
    p.product_pk AS product_fk,
    payment_pk AS payment_fk,
    customer_id,
    credit_card,
    cost,
    quantity,
    price
FROM "Staging".sales f
LEFT JOIN core.dim_payment d
    ON d.payment = COALESCE(f.payment, 'cash')
    AND d.loyalty_card = f.loyalty_card
LEFT JOIN core.dim_product p
    ON p.product_id = f.product_id
ORDER BY transaction_id
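As a worked example of the transactional_date_fk expression above, a transactional_date of 2023-05-17 (an illustrative value) yields 2023*10000 + 5*100 + 17 = 20230517:
SELECT EXTRACT(year FROM TIMESTAMP '2023-05-17') * 10000
     + EXTRACT(month FROM TIMESTAMP '2023-05-17') * 100
     + EXTRACT(day FROM TIMESTAMP '2023-05-17') AS transactional_date_fk; -- 20230517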
In Pentaho we add the Table input step, reading the data with the join query shown above (tested earlier in PostgreSQL):
Now we calculate the additional fields total_price, total_cost, and profit through the Calculator step:
Then we load the data into core.sales:
In PostgreSQL, with pgAdmin4, we check that the core layer contains data, both in the sales fact table and in the dim_payment dimension table:
Let's test the Delta Load by adding data that was not present at the start. First we add the data to our data source: go to "DB DataWarehouseX -> Public Schema -> Facts Sales Table", then "Import/Export Data...", and import the data from the Fact_Sales_2.csv file, with the expected output:
SELECT COUNT(*)
FROM public.sales;
SELECT COUNT(*)
FROM core.sales;
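As a quick sanity check before re-running the job (a sketch, assuming all newly imported rows have later transactional_date values), the difference between the two counts is roughly the number of rows the next delta run should pick up:
SELECT (SELECT COUNT(*) FROM public.sales)
     - (SELECT COUNT(*) FROM core.sales) AS rows_pending_delta;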
Now we test the incremental delta. Remember that we had set the dummy value '1970-01-01 00:00:00' in SetLastLoadSales.ktr; running the parent job CompleteETLprocess.kjb again, the output is: