2023-04-17 data

The Warehouse Layer: Transforming data with dbt (Part 7)

(This post is part of a series on working with data from start to finish.)

Over the past decade, the advent of the distributed data warehouse dramatically simplified how data is stored, managed and analyzed. No longer did engineers need to manage physical servers or configure Hadoop clusters; instead, they could operate exclusively within the data warehouse using “just SQL.”

This SQL constituted the “code” behind the data warehouse and, like all code, it needed to be organized. In addition, the data itself needed to be organized. These were respectively called “software architecture” and “data architecture.”

It is worth clarifying why software and data are treated differently. Software lives in a codebase and is applied to data[0]. Data, on the other hand, lives in a database. Software consists of “stateless” operations (functions or algorithms) which, like moving pieces on a chessboard, transition data from one “state” to the next.

Because software concerns the logic by which data transitions between states, it is organized “imperatively” as a sequence of steps, or a pipeline. Conversely, data itself is laid out along a flat chessboard where everything can be viewed, related and operated upon from above.

For the purposes of this essay, we’ll review both the software and data architecture of a codebase using dbt, the foremost orchestration tool for SQL-based data pipelines.

Software and data architecture #

Software architecture refers to how code is organized within the codebase while data architecture refers to how data is organized within the database. While software architecture concerns the hierarchical and modular nature of code, data architecture concerns the relational model of data.

For example, defining the behavior of a payments system, such as making a transaction, performing a refund or querying an account balance, would constitute software architecture. Modeling each discrete component - a transaction, a refund, a balance - and their respective relationships would instead comprise data architecture. This is conventionally taught as the difference between “verbs” (actions) and “nouns” (things).

In a dbt-managed data pipeline, software architecture is the sequence of SQL code responsible for transitioning data from one state to the next. Data architecture, on the other hand, describes the structure of that data at any particular state in time.

As data moves through the pipeline, it is progressively transformed from normalized to denormalized, from system-designed to human-designed and, most importantly, from raw to business-ready.

Although dbt recommends three stages of the data pipeline (staging, intermediate, marts), we’ll explore a slightly different version in this essay:

  1. Sources
  2. Staging
  3. Core (facts and dimensions)
  4. Data marts

Sources #

Like a shipping port welcoming the arrival of new freight, the sources stage collects all data from across the firm into a single location for subsequent processing. In the vision of Bill Inmon, this represents the “enterprise data warehouse” (EDW), which retains the maximally normalized relational model of transactional databases.

In the sources stage, we do not perform any data processing. Data is transmitted as-is from data sources (e.g. Salesforce, Hubspot, Postgres, Mixpanel) to the enterprise data warehouse. Data integration tools, such as Airbyte or Fivetran, perform this row-for-row replication automatically.

Data integration tools will specify a “destination” in the data warehouse, typically an isolated database such as DW_PROD_SOURCES. Each data source is assigned its own schema: Salesforce data in SALESFORCE, Hubspot in HUBSPOT and so on.

In the dbt codebase, one could organize the code using folders for each source:

sources/
    hubspot/
        sources.yml
    salesforce/
        sources.yml
    mixpanel/
        sources.yml
    jira/
        sources.yml

Generally speaking, prefixing file names is equivalent to nesting files within folders (as in AWS S3). Therefore, the following structure would also work:

sources/
    hubspot_sources.yml
    salesforce_sources.yml
    mixpanel_sources.yml
    jira_sources.yml

For the intrepid, one could also consolidate all data source information into a single, warehouse-wide sources.yml file, although this is not recommended.

Staging #

After ingesting our sources, we perform initial processing of the data. Here, we use “staging tables” to:

Each staging table should correspond with an entity in the data source. For example, DW_PROD_SOURCES.SALESFORCE.CONTACT might have an associated staging table of DW_PROD.STAGING.STG_SALESFORCE_CONTACT to process raw Salesforce contacts.

Because staging tables correspond to source data and not to business-specific use cases, we should rarely if ever perform joins (JOIN), aggregations (GROUP BY) or filters (WHERE) in staging.
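
As a minimal sketch of what such a staging table might look like, the snippet below uses Python's built-in sqlite3 module as a stand-in for the warehouse. The raw column names (Id, FirstName, CreatedDate) and the choice of renaming and casting as the staging steps are assumptions for illustration, and because SQLite has no database.schema hierarchy the table names are flattened.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    -- Hypothetical raw table, replicated as-is by the integration tool.
    CREATE TABLE salesforce_contact (Id TEXT, FirstName TEXT, CreatedDate TEXT);
    INSERT INTO salesforce_contact VALUES
        ('003A', 'Ada',   '2023-01-05T10:00:00'),
        ('003B', 'Grace', '2023-02-11T08:30:00');

    -- Staging: one source table in, one staging view out.
    -- Only renames and casts; no JOIN, GROUP BY or WHERE.
    CREATE VIEW stg_salesforce_contact AS
    SELECT
        Id                AS contact_id,
        FirstName         AS first_name,
        DATE(CreatedDate) AS created_date
    FROM salesforce_contact;
""")

staged = con.execute(
    "SELECT contact_id, first_name, created_date "
    "FROM stg_salesforce_contact ORDER BY contact_id"
).fetchall()
print(staged)
```

Every raw row survives staging, which is what keeps the one-to-one correspondence with the source.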

As shown in the prior example, we typically use a single STAGING schema within a DW_PROD data warehouse (as separate from our sources database, DW_PROD_SOURCES) to consolidate all processed data. Staging tables follow the nomenclature of stg_<source>_<entity>, such as stg_salesforce_contact and stg_salesforce_account.

One might wonder why we do not use separate schemas for each source, as we had done in DW_PROD_SOURCES, especially since staging tables have one-to-one correspondence with source tables. In addition, why do we put staging tables in DW_PROD and not DW_PROD_SOURCES?

If we were to keep our staging tables adjacent to our source tables in DW_PROD_SOURCES, for example by placing STG_SALESFORCE_CONTACT next to CONTACT in the DW_PROD_SOURCES.SALESFORCE schema, then we have coupled our processing of data with our ingestion of it. Should a developer want to test both staging and “data mart” code in the same session, he or she must constantly switch between databases.

If, on the other hand, we created a schema for each source in DW_PROD, then we would have the undesirable layout of source schemas (SALESFORCE, MIXPANEL, JIRA) next to pipeline schemas (STAGING, INTERMEDIATE, CORE, MARTS).

As a result, we typically place all staging tables in the DW_PROD.STAGING schema. While this in theory runs the risk of having too many processed source tables in a single schema, in practice only a limited set of source tables are ever processed and table prefixing by source name (e.g. stg_salesforce_contact) prevents clutter.

In the dbt codebase, we place each staging table into folders segregated by data source:

staging/
	salesforce/
		stg_salesforce_contact.sql
		stg_salesforce_account.sql
	hubspot/
		stg_hubspot_firm.sql
		stg_hubspot_email.sql
		stg_hubspot_campaign.sql

Core and intermediate #

Even after the initial processing of data in our staging tables, our data retains its highly normalized structure. It is only in our (1) “intermediate” or (2) “core” tables that we begin to denormalize data by joining it together. Here, in the heart of the data warehouse, we apply core transformations which render raw data comprehensible to the business.

There are two approaches to structuring data in this stage. dbt recommends the use of intermediate tables: modular components which can be variously assembled to produce business-ready data marts. An alternative approach uses Kimball-style dimensional modeling to construct “fact” and “dimension” tables.

In practice, these two approaches are very similar. Both encode concepts, such as “the number of outreach activities per sales territory per day”, into tables at a given grain. Both produce modular components which can be joined to other components to form a more synoptic view of the business.

They only differ with respect to the recommended amount of denormalization: an intermediate approach will pre-join more data in advance (i.e. our fact and dimension tables), while a dimensional approach will leave fact and dimension tables separate until joined at query time.

As is typical in data warehousing, there is no one right answer, and multiple approaches can be used to achieve the same outcome. Here, we’ll explore the structure of a dimensionally modeled data warehouse using fact, dimension and even intermediate tables.

Fact tables #

Fact tables capture facts about the world, such as “the volume of transactions processed per month” or “the number of users logging in per day.” Generally speaking, they represent “events” (actions, verbs) which occurred over time, such as a history of user logins, clicks or transactions.

Facts exist at a particular grain, such as “per day” in the “number of users logging in per day.” The grain corresponds to a fact’s analytical resolution, meaning you can analyze everything above the “line of representation” but nothing below it. If you need more granular visibility into a given fact, you must choose a higher-resolution grain and create a new fact table.

The most granular fact is, in Kimball’s terminology, the “atomic grain”: the maximum amount of detail that a given business process captures. In the terminology of resolution, this corresponds to a business process’ instrument resolution, the maximum resolution at which we can record data.

Fact tables typically include at least one quantitative measure, such as the “number of distinct users” per month or the “maximum latency” per thousand network requests. These measures frequently correspond to SQL’s most widely used aggregate functions: COUNT(DISTINCT ...), SUM, MIN, MAX.

In SQL, a fact table (prefixed using fct_) may look like:

fct_user_logins_daily.sql

SELECT
	event_date,
	-- one row per day, counting each user at most once
	COUNT(DISTINCT user_id) AS num_users
FROM i_platform_activity
GROUP BY event_date
ORDER BY event_date DESC

event_date  num_users
2022-04-05  10
2022-04-04  14
2022-04-02  12
2022-04-01  16
2022-03-31  10

When possible, measures should be “additive”, meaning they can be summed over any dimension. For example, the SUM of sales over each month is additive because individual monthly totals can be added together to produce an aggregate total.

On the other hand, if each row in the fact table represented the AVG sales per month, then these averages could not be added together: the data would be “non-additive”. Finally, there is “semi-additive” data, such as bank balances, which can be summed across some dimensions (e.g. different bank accounts) but not across time (e.g. last month’s balance plus this month’s balance).
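
The difference between additive and non-additive measures can be checked with a few rows of made-up data. The sketch below again uses sqlite3 as a stand-in warehouse; the orders table and its values are hypothetical.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    -- Hypothetical order-level data: three small orders, then one large one.
    CREATE TABLE orders (order_month TEXT, amount REAL);
    INSERT INTO orders VALUES
        ('2023-01', 10), ('2023-01', 20), ('2023-01', 30),
        ('2023-02', 100);
""")

monthly = con.execute("""
    SELECT order_month, SUM(amount) AS total_sales, AVG(amount) AS avg_sale
    FROM orders
    GROUP BY order_month
    ORDER BY order_month
""").fetchall()

true_total, true_avg = con.execute(
    "SELECT SUM(amount), AVG(amount) FROM orders"
).fetchone()

# Additive: monthly sums roll up to the overall sum.
sum_of_sums = sum(row[1] for row in monthly)
# Non-additive: combining monthly averages ignores how many orders each month had.
avg_of_avgs = sum(row[2] for row in monthly) / len(monthly)

print(sum_of_sums, true_total)  # 160.0 160.0
print(avg_of_avgs, true_avg)    # 60.0 40.0
```

Storing the SUM and a COUNT in the fact table, rather than the AVG, lets downstream queries recompute a correct average at any coarser grain.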

Dimension tables #

If fact tables correspond to “verbs” which occur over time, then dimensions represent the “nouns” and “adjectives” which embroider those events with additional detail.

For example, we might know the “number of website visitors arriving per day from each marketing channel”, but we do not necessarily know which marketing channel is paid versus organic, in-person versus online, or web-based versus mobile. To determine these, we must join our fact table to a dimension table (prefixed using dim_) containing this information.

dim_marketing_channel.sql

marketing_channel_id  name               is_paid  is_in_person  is_mobile
1                     google ads         TRUE     FALSE         TRUE
2                     conference         TRUE     TRUE          FALSE
3                     hubspot_marketing  FALSE    FALSE         FALSE

Why don’t we include all this information in the fact table upfront, thereby obviating the need to perform any joins at all? Indeed, a maximally denormalized table could include all possible fields from related tables. However, such a wide table would routinely include hundreds or thousands of irrelevant fields for any particular business query and would therefore not constitute a reusable component.

The grain of a fact table is defined by its primary key, which itself is composed of a set of attributes (dimensions). Thus the “number of website visitors arriving per day from each marketing channel” would have a primary key of (date, marketing_channel), both of which could optionally be joined to dim_date and dim_marketing_channel should we need additional dimensional data.
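
To make the join concrete, here is a small sketch in sqlite3. The fact table name fct_website_visits_daily and its rows are hypothetical, and the measure counts visits rather than distinct visitors so that it stays additive across days; the dimension rows mirror the dim_marketing_channel example, with booleans stored as 0 and 1.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE dim_marketing_channel (
        marketing_channel_id INTEGER PRIMARY KEY,
        name TEXT, is_paid INTEGER, is_in_person INTEGER, is_mobile INTEGER
    );
    INSERT INTO dim_marketing_channel VALUES
        (1, 'google ads',        1, 0, 1),
        (2, 'conference',        1, 1, 0),
        (3, 'hubspot_marketing', 0, 0, 0);

    -- Hypothetical fact table; its grain is its primary key.
    CREATE TABLE fct_website_visits_daily (
        event_date TEXT,
        marketing_channel_id INTEGER,
        num_visits INTEGER,
        PRIMARY KEY (event_date, marketing_channel_id)
    );
    INSERT INTO fct_website_visits_daily VALUES
        ('2022-04-04', 1, 40), ('2022-04-04', 3, 25),
        ('2022-04-05', 1, 30), ('2022-04-05', 2, 5), ('2022-04-05', 3, 20);
""")

# The fact table alone cannot say which channels are paid; the dimension can.
paid_vs_organic = con.execute("""
    SELECT c.is_paid, SUM(f.num_visits) AS num_visits
    FROM fct_website_visits_daily AS f
    LEFT JOIN dim_marketing_channel AS c
        ON f.marketing_channel_id = c.marketing_channel_id
    GROUP BY c.is_paid
    ORDER BY c.is_paid
""").fetchall()
print(paid_vs_organic)  # [(0, 45), (1, 75)]
```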

To visualize which fact tables are associated with which dimension tables, a two-dimensional “bus matrix” (fact tables along one axis, dimension tables along the other) is typically used to map the relationships. This should be updated whenever new fact tables or dimension tables are added to the data warehouse.
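
A bus matrix needs no special tooling. As a rough sketch, it could be kept as a plain mapping and printed as a grid; the fact-to-dimension assignments below are assumptions made up for illustration, not a description of any real warehouse.

```python
# Hypothetical mapping of fact tables to the dimensions they can be joined to.
fact_dimensions = {
    "fct_user_logins_daily": {"dim_date", "dim_users"},
    "fct_orders": {"dim_date", "dim_products"},
    "fct_website_visits_daily": {"dim_date", "dim_marketing_channel"},
}

# Columns of the matrix: every dimension referenced by at least one fact.
dimensions = sorted(set().union(*fact_dimensions.values()))


def bus_matrix(facts):
    """Return one row per fact table with an 'x' under each related dimension."""
    return {
        fact: ["x" if dim in dims else "" for dim in dimensions]
        for fact, dims in facts.items()
    }


matrix = bus_matrix(fact_dimensions)
print("fact".ljust(26), *(d.ljust(22) for d in dimensions))
for fact, marks in matrix.items():
    print(fact.ljust(26), *(m.ljust(22) for m in marks))
```

A dimension marked under many facts, such as dim_date here, is one those facts share and can be compared along.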

Intermediate tables #

You’ll notice in the fct_user_logins_daily example above that we sourced data from a table called i_platform_activity. Here, i_ signifies “intermediate”.

Intermediate tables can be useful as intermediate stages of processing between source data and business data. Recall that in staging, our tables should have one-to-one correspondence with sources. But what if we need a data set that combines data from many sources?

For example, if we want to consolidate all user platform activity from our backend API logs, our frontend JavaScript logs and our mobile application logs (which all reside in different systems), then we must UNION ALL these data sets together to get a holistic view of our users.

This cannot be done in our staging tables, and if it is done in our fact tables, the logic must be repeated at each grain of fact: fct_user_logins_daily, fct_user_logins_weekly, fct_user_logins_monthly and so on. This would produce considerable redundant code.

Instead, we can build a reusable component, i_platform_activity, which is referenced in each downstream fact table. No longer must each fact table duplicate the UNION ALL logic. It can be stored in an upstream component, leaving the fact tables only to group by various temporal dimensions (day, week, month).
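
The following sketch shows the idea in sqlite3. The three staging table names and their rows are hypothetical, and SQLite's strftime stands in for a warehouse date-truncation function.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    -- Hypothetical staging tables, one per logging system.
    CREATE TABLE stg_api_logs      (user_id INTEGER, event_date TEXT);
    CREATE TABLE stg_frontend_logs (user_id INTEGER, event_date TEXT);
    CREATE TABLE stg_mobile_logs   (user_id INTEGER, event_date TEXT);
    INSERT INTO stg_api_logs      VALUES (1, '2022-04-04'), (2, '2022-04-05');
    INSERT INTO stg_frontend_logs VALUES (1, '2022-04-04'), (3, '2022-04-04');
    INSERT INTO stg_mobile_logs   VALUES (2, '2022-05-01');

    -- The UNION ALL logic lives in exactly one place.
    CREATE VIEW i_platform_activity AS
    SELECT user_id, event_date, 'api' AS platform FROM stg_api_logs
    UNION ALL
    SELECT user_id, event_date, 'frontend' FROM stg_frontend_logs
    UNION ALL
    SELECT user_id, event_date, 'mobile' FROM stg_mobile_logs;
""")

# Each downstream fact table only has to choose a grain.
daily = con.execute("""
    SELECT event_date, COUNT(DISTINCT user_id) AS num_users
    FROM i_platform_activity
    GROUP BY event_date
    ORDER BY event_date
""").fetchall()

monthly = con.execute("""
    SELECT strftime('%Y-%m', event_date) AS event_month,
           COUNT(DISTINCT user_id) AS num_users
    FROM i_platform_activity
    GROUP BY event_month
    ORDER BY event_month
""").fetchall()

print(daily)
print(monthly)
```

Note that the monthly figure is computed from the intermediate view rather than by summing the daily rows, since distinct counts are not additive.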

Generally speaking, you should not need intermediate tables until you identify redundant code in the fact or dimension tables. Intermediate tables should live in the INTERMEDIATE schema and be saved as views, as they should not be queried by end users directly.

Late-arriving facts (LAF) and slowly changing dimensions (SCD) #

Sometimes, event data is written to the database “late”, meaning old event data is added to the data warehouse even after newer data has already arrived. For example, imagine a daily ETL job which failed to ingest data yesterday, succeeded today, and then, upon someone noticing the error, was manually rerun today to fetch yesterday’s data. In this case, yesterday’s data arriving today would constitute “late arriving facts” (LAF).

Late-arriving facts can be problematic when we perform “incremental” transformations. Typically, we do not want to process the entire history of data every single day (e.g. all click history), but rather only the last few days’ history. This is called an “incremental run”.

If facts arrive late, however, how exactly do we identify which data has already been processed? If we only filter for today’s data using WHERE event_timestamp >= CURRENT_DATE, having assumed that yesterday’s data has already been processed, then we will fail to process yesterday’s data which arrives today.

The solution to late-arriving facts is “bitemporal modeling.” Here, we maintain two timestamps within the data: (1) the original timestamp as recorded by the source system (event_timestamp), and (2) the timestamp at which the data was ingested into the database (ingested_timestamp).

Now, instead of filtering for today’s data as recorded by the original timestamp, we filter for today’s data based on when it was ingested into the database: WHERE ingested_timestamp >= CURRENT_DATE. Late-arriving facts ingested today will be duly processed along with all other new data[1].
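
A small sqlite3 sketch makes the difference between the two filters visible. The events table and its rows are hypothetical, and the “current date” is pinned to a fixed value so the result does not depend on when the code is run.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE events (
        event_id INTEGER,
        event_timestamp TEXT,    -- when the event happened in the source system
        ingested_timestamp TEXT  -- when the row landed in the warehouse
    );
    INSERT INTO events VALUES
        (1, '2023-04-15 09:00:00', '2023-04-15 23:00:00'),  -- handled by an earlier run
        (2, '2023-04-16 12:00:00', '2023-04-17 08:00:00'),  -- late: yesterday's event, ingested today
        (3, '2023-04-17 07:30:00', '2023-04-17 08:00:00');  -- today's event
""")

today = "2023-04-17"  # fixed stand-in for the current date

by_event_time = [row[0] for row in con.execute(
    "SELECT event_id FROM events WHERE event_timestamp >= ? ORDER BY event_id",
    (today,),
)]
by_ingest_time = [row[0] for row in con.execute(
    "SELECT event_id FROM events WHERE ingested_timestamp >= ? ORDER BY event_id",
    (today,),
)]

print(by_event_time)   # [3]     the late event 2 is silently skipped
print(by_ingest_time)  # [2, 3]  the late event 2 is picked up
```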

Bitemporal modeling is also used for dimension tables, although for slightly different reasons. Dimensions commonly do not have any temporal component at all: you have a list of all users or all products or all customers, but unlike events, you do not necessarily have them over time. They are ingested at a point in time, after which they change “slowly.”

For example, imagine you are calculating the monthly sales per product for various products listed on Amazon, and specifically you are comparing the sales between products which had free_shipping against those which did not. Your SQL query would look something like:

SELECT
	p.free_shipping,
	p.product_name,
	DATE_TRUNC('month', o.order_date) AS order_month,
	SUM(o.amount) AS total_amount
FROM fct_orders AS o
-- joins every order to the current snapshot of its product
LEFT JOIN dim_products AS p ON o.product_id = p.product_id
GROUP BY 1, 2, 3
ORDER BY 1, 2, 3

Although this query is superficially correct, what happens if products have changed their free_shipping status over time? If a product had free_shipping last month but not this month, then the query above would erroneously classify last month’s sales as free_shipping: false (using the current snapshot of product data) when in reality they were free_shipping: true (as a historical snapshot of product data would have shown). This is the problem of “slowly changing dimensions” (SCD).

As with late arriving facts, we must inject a temporal component into our dimensions to specify when exactly a given product had free_shipping. In doing so, we transform these tables into what are often called “history tables”, “snapshot tables” or “audit tables”, as they record the history of all changes made to any given dimension.

Our dim_products dimension table would now look something like this:

dim_products_history.sql

products_history_id  product_id  product_name  free_shipping  valid_from  valid_to
1                    1           stapler       TRUE           2023-01-01  2023-01-31
2                    1           stapler       TRUE           2023-02-01  2023-02-28
3                    1           stapler       FALSE          2023-03-01  NULL

Our updated SQL query would look as follows:

SELECT
	p.free_shipping,
	p.product_name,
	DATE_TRUNC('month', o.order_date) AS order_month,
	SUM(o.amount) AS total_amount
FROM fct_orders AS o
LEFT JOIN dim_products_history AS p
	ON o.product_id = p.product_id
	-- pick the product version that was valid on the order date
	AND o.order_date BETWEEN p.valid_from AND COALESCE(p.valid_to, CURRENT_DATE)
GROUP BY 1, 2, 3
ORDER BY 1, 2, 3
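
To see the two joins disagree, the sketch below runs both against a tiny sqlite3 database. The history rows follow the dim_products_history example; the two orders are hypothetical. SQLite has no DATE_TRUNC, so strftime is used, and a far-future sentinel date replaces the current date so the result is reproducible.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    -- Current snapshot only: the stapler no longer ships for free.
    CREATE TABLE dim_products (product_id INTEGER, product_name TEXT, free_shipping INTEGER);
    INSERT INTO dim_products VALUES (1, 'stapler', 0);

    CREATE TABLE dim_products_history (
        product_id INTEGER, product_name TEXT, free_shipping INTEGER,
        valid_from TEXT, valid_to TEXT
    );
    INSERT INTO dim_products_history VALUES
        (1, 'stapler', 1, '2023-01-01', '2023-01-31'),
        (1, 'stapler', 1, '2023-02-01', '2023-02-28'),
        (1, 'stapler', 0, '2023-03-01', NULL);

    -- Hypothetical orders: one before and one after the change.
    CREATE TABLE fct_orders (order_id INTEGER, product_id INTEGER, order_date TEXT, amount REAL);
    INSERT INTO fct_orders VALUES
        (1, 1, '2023-02-10', 20.0),
        (2, 1, '2023-03-05', 15.0);
""")

current_join = con.execute("""
    SELECT strftime('%Y-%m', o.order_date) AS order_month,
           p.free_shipping,
           SUM(o.amount) AS total_amount
    FROM fct_orders AS o
    LEFT JOIN dim_products AS p ON o.product_id = p.product_id
    GROUP BY order_month, p.free_shipping
    ORDER BY order_month
""").fetchall()

history_join = con.execute("""
    SELECT strftime('%Y-%m', o.order_date) AS order_month,
           p.free_shipping,
           SUM(o.amount) AS total_amount
    FROM fct_orders AS o
    LEFT JOIN dim_products_history AS p
        ON o.product_id = p.product_id
        AND o.order_date BETWEEN p.valid_from AND COALESCE(p.valid_to, '9999-12-31')
    GROUP BY order_month, p.free_shipping
    ORDER BY order_month
""").fetchall()

print(current_join)  # February is misclassified as free_shipping = 0
print(history_join)  # February is correctly classified as free_shipping = 1
```

Because the validity ranges do not overlap, each order matches exactly one history row and no sales are double counted.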

Structuring the core and intermediate schemas #

In the database, all fact and dimension tables should live in the CORE schema of DW_PROD. Any intermediate tables, to the extent they are necessary, should live as views in the INTERMEDIATE schema.

In the codebase, we can similarly use intermediate and core folders to delineate the separate schemas:

intermediate/
	i_platform_logins.sql
core/
	fct_platform_logins_daily.sql
	fct_platform_logins_monthly.sql
	fct_email_to_login_conversions.sql
	dim_marketing_campaigns.sql
	dim_users.sql
	dim_firms.sql

Whether they are intermediate tables or core fact and dimension tables, every component should aim to be as synoptic as possible. For example, a dimension table dim_hubspot_marketing_campaigns containing only data from Hubspot would be less synoptic than a dim_marketing_campaigns which creates a UNION ALL of all marketing data sources.

As a general rule, fact and dimension tables should UNION ALL as much data as possible and filter out (WHERE) as little data as possible. Flags can be used in the dimension tables to specify their source (e.g. is_hubspot or hubspot_campaign_id).
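
As a sketch of this rule, the snippet below builds a synoptic dim_marketing_campaigns in sqlite3 from two staging tables. The stg_salesforce_campaign table, all column names and all rows are assumptions for illustration.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    -- Hypothetical staging tables; column names are assumptions.
    CREATE TABLE stg_hubspot_campaign    (campaign_id TEXT, campaign_name TEXT);
    CREATE TABLE stg_salesforce_campaign (campaign_id TEXT, campaign_name TEXT);
    INSERT INTO stg_hubspot_campaign    VALUES ('h1', 'spring webinar'), ('h2', 'newsletter');
    INSERT INTO stg_salesforce_campaign VALUES ('s1', 'trade show');

    -- Union every source and flag where each row came from instead of filtering.
    CREATE VIEW dim_marketing_campaigns AS
    SELECT campaign_name, 1 AS is_hubspot, campaign_id AS hubspot_campaign_id
    FROM stg_hubspot_campaign
    UNION ALL
    SELECT campaign_name, 0 AS is_hubspot, NULL AS hubspot_campaign_id
    FROM stg_salesforce_campaign;
""")

all_campaigns = con.execute(
    "SELECT campaign_name, is_hubspot FROM dim_marketing_campaigns ORDER BY campaign_name"
).fetchall()

# A consumer who only wants Hubspot can still filter downstream.
hubspot_only = con.execute(
    "SELECT campaign_name FROM dim_marketing_campaigns "
    "WHERE is_hubspot = 1 ORDER BY campaign_name"
).fetchall()

print(all_campaigns)
print(hubspot_only)
```

The filter moves to the consumer, so the same dimension serves both the Hubspot-only question and the all-sources question.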

Data marts #

At last, we have reached the final stage of data processing: clean, joined and enriched data ready for direct consumption by the business teams. When using data marts, business users should do little more than filter, group and aggregate their data.

Data marts represent the most denormalized version of our data. Unlike fact and dimension tables, they are not reusable components and they are not building blocks. Instead, they should be used exclusively and narrowly by the team which requested them.

Data marts are created by joining together fact and dimension tables. For example, if our Finance team needs to analyze “sales by territory over time, excluding holidays and only within the Americas geographic region”, we simply need to join our sales facts to our calendar and region dimensions, then perform some filtering:

mart_finance_sales_by_territory_americas.sql

SELECT
	DATE_TRUNC('month', o.order_date) AS order_month,
	t.territory_name,
	SUM(o.amount) AS total_amount
FROM fct_orders_daily AS o
LEFT JOIN dim_calendar_table AS c ON o.order_date = c.date
LEFT JOIN dim_territories AS t
	ON o.territory_id = t.territory_id
	AND o.order_date BETWEEN t.valid_from AND COALESCE(t.valid_to, CURRENT_DATE)
WHERE TRUE
	AND NOT c.is_holiday
	AND t.region = 'Americas'
GROUP BY 1, 2

Notice how the moment we apply filtering in the WHERE clause, our data becomes less reusable for other analytical questions. This is why we seldom use WHERE clauses in our fact and dimension tables but often use them in our data marts.
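
The mart query can be exercised end to end on a handful of rows. In this sqlite3 sketch all table contents are hypothetical, strftime replaces the warehouse's date truncation, and a far-future sentinel replaces the current date so the output is stable.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE fct_orders_daily (order_date TEXT, territory_id INTEGER, amount REAL);
    CREATE TABLE dim_calendar_table (date TEXT, is_holiday INTEGER);
    CREATE TABLE dim_territories (
        territory_id INTEGER, territory_name TEXT, region TEXT,
        valid_from TEXT, valid_to TEXT
    );
    INSERT INTO fct_orders_daily VALUES
        ('2023-01-01', 1, 50.0),   -- holiday, filtered out
        ('2023-01-02', 1, 100.0),
        ('2023-01-03', 2, 70.0),   -- EMEA, filtered out
        ('2023-02-10', 1, 30.0);
    INSERT INTO dim_calendar_table VALUES
        ('2023-01-01', 1), ('2023-01-02', 0), ('2023-01-03', 0), ('2023-02-10', 0);
    INSERT INTO dim_territories VALUES
        (1, 'US East', 'Americas', '2020-01-01', NULL),
        (2, 'DACH',    'EMEA',     '2020-01-01', NULL);
""")

mart = con.execute("""
    SELECT
        strftime('%Y-%m', o.order_date) AS order_month,
        t.territory_name,
        SUM(o.amount) AS total_amount
    FROM fct_orders_daily AS o
    LEFT JOIN dim_calendar_table AS c ON o.order_date = c.date
    LEFT JOIN dim_territories AS t
        ON o.territory_id = t.territory_id
        AND o.order_date BETWEEN t.valid_from AND COALESCE(t.valid_to, '9999-12-31')
    WHERE NOT c.is_holiday
        AND t.region = 'Americas'
    GROUP BY order_month, t.territory_name
    ORDER BY order_month
""").fetchall()

print(mart)  # [('2023-01', 'US East', 100.0), ('2023-02', 'US East', 30.0)]
```

Two of the four orders never reach the result, which is exactly why this table suits the Finance question and little else.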

In the codebase, each business team receives its own folder where all relevant data marts are stored:

marts/
	finance/
		mart_finance_revenue_by_territory_americas.sql
		mart_finance_revenue_by_territory_emea.sql
		mart_finance_revenue_total.sql
		mart_finance_pnl_by_product.sql
	marketing/
		mart_marketing_campaign_conversions.sql
		mart_marketing_webinar_prospects.sql
		mart_marketing_user_journeys.sql
		mart_marketing_email_segments.sql
	product/
		mart_product_feature_usage_by_cohort.sql
		mart_product_usage_stats_by_feature.sql
		mart_product_churned_users_by_feature.sql
		mart_product_journey_completion_funnel.sql

In the database, tables follow the nomenclature of mart_<team>_<description>, such as mart_finance_sales_by_territory_americas, and live within the MARTS schema.

Data marts are frequently created in the business intelligence tool, such as Tableau or Looker, rather than in the data warehouse. Tableau, for example, uses the concept of “Data Sources”, wherein fact and dimension tables are joined together using a drag-and-drop interface.

These joins, however, often occur at query time and can expose the end user to substantial latency. To improve query performance, they can be “pushed down” to the data warehouse, where the tables are pre-computed. This means that while most data marts will not exist in SQL under marts, those which need to be materialized for convenience or performance reasons will.

--

[0] Of course, software is also a form of data, and can be transitioned through a series of “code states” by way of a version control system, such as `git`.

[1] In practice, we often want to use `WHERE ingested_timestamp >= DATEADD('day', -2, CURRENT_DATE)` to include a “lookback period” in our incremental runs. This allows us not only to write today’s data but also to overwrite the last two days of data in case any joined dimensions arrived late.
