Chinook Data Pipeline

Data Pipeline Tools

A complete data pipeline implementation for the Chinook database with dbt transformations and Dagster orchestration.

📂 Project Structure

chinook-pipeline/
├── dbt/                        # dbt project
│   ├── chinook_analytics/      # Main project
│   │   ├── models/             # Data models
│   │   │   ├── staging/        # Raw source models
│   │   │   └── core/           # Business transformations
│   │   ├── seeds/              # Reference data
│   │   ├── tests/              # Data tests
│   │   └── dbt_project.yml     # Project config
├── dagster/                    # Orchestration
│   ├── repository.py           # Pipeline definitions
│   ├── schedules.py            # Automation config
│   └── workspace.yaml          # Dev environment
├── docker-compose.yml          # Container setup
├── Dockerfile_dbt              # dbt environment
└── scripts/                    # Utility scripts

🚀 Quick Start

Prerequisites

  • Docker
  • Docker Compose

Installation

git clone https://github.com/fegvilela/chinook-pipeline.git
cd chinook-pipeline

Running the Pipeline

# Start all services
docker-compose up -d --build

πŸ› οΈ Usage

dbt Commands

# Run all models
docker-compose exec dbt_chinook dbt run

# Run specific model
docker-compose exec dbt_chinook dbt run --models <MODEL_NAME>

# Generate docs
docker-compose exec dbt_chinook dbt docs generate

Access Services

| Service | URL | Credentials |
|---------|-----|-------------|
| Dagster UI | http://localhost:3000 | None |
| dbt Docs | http://localhost:8080 | After docs generate |
| PostgreSQL | yugabyteDB | - |

🔄 Daily Pipeline

The Dagster pipeline is configured to:

  1. Load new data at 02:00 UTC daily
  2. Run dbt transformations
  3. Handle incremental updates

View scheduled runs in the Dagster UI.

πŸ›‘οΈ Testing

# Run dbt tests
docker-compose exec dbt_chinook dbt test

# Run unit tests
docker-compose exec dagster pytest

πŸ“ Design Highlights

  • Incremental models for fact tables
  • Dagster assets for data lineage
  • Containerized environment
  • Daily automation with error handling

Dimensional Star Schema Modeling - Chinook Database

Star schema for Chinook (simplified diagram)

Central Fact Table

fact_invoice

| Column | Type | Description | Relationship |
|--------|------|-------------|--------------|
| invoice_info_key | PK | Surrogate key for invoice + line | - |
| invoice_key | varchar | Surrogate key for invoice | - |
| invoice_line_key | varchar | Surrogate key for invoice line | - |
| customer_key | FK | Customer key | → dim_customer |
| employee_key | FK | Support rep key | → dim_employee |
| date_key | FK | Date key | → dim_date |
| track_key | FK | Music track key | → dim_track |
| invoice_line_id | INT | Original line ID (business key) | - |
| invoice_id | INT | Original invoice ID | - |
| quantity | INT | Quantity sold | - |
| unit_price | DECIMAL | Unit price | - |
| total_amount | DECIMAL | Total (quantity × unit_price) | - |
  1. PK for line-item granularity:
    • invoice_info_key as surrogate primary key (see the key-generation sketch after this list)
    • Enables analysis of individual sale items
  2. Dual key for traceability:
    • Maintains invoice_line_id (original ID) as business key
    • Adds invoice_id for full document reference
  3. Optional self-relationship:
    • invoice_key can be used to create an invoice header dimension if needed
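
As a rough sketch of how these keys might be generated in dbt (assuming the dbt_utils package and a hypothetical stg_invoice_lines staging model; the repo's actual model and column names may differ):

-- Hypothetical key-generation excerpt for the fact_invoice model
select
    {{ dbt_utils.generate_surrogate_key(['il.invoice_id', 'il.invoice_line_id']) }} as invoice_info_key,
    {{ dbt_utils.generate_surrogate_key(['il.invoice_id']) }} as invoice_key,
    {{ dbt_utils.generate_surrogate_key(['il.invoice_line_id']) }} as invoice_line_key,
    il.invoice_line_id,                       -- business key kept for traceability
    il.invoice_id,                            -- full document reference
    il.quantity,
    il.unit_price,
    il.quantity * il.unit_price as total_amount
from {{ ref('stg_invoice_lines') }} il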

Impact on Relationships:

  1. Fact table has invoice line granularity (not invoice-level)
  2. All metrics (quantity, unit_price) are naturally at line-item grain
  3. Aggregate queries should group by invoice_id for document-level analysis

Usage Example:

-- Line-item analysis
SELECT
    f.invoice_line_id,
    t.track_name,
    f.quantity,
    f.total_amount
FROM fact_invoice f
JOIN dim_track t ON f.track_key = t.track_key;

-- Complete invoice analysis
SELECT
    f.invoice_id,
    SUM(f.total_amount) AS invoice_total
FROM fact_invoice f
GROUP BY f.invoice_id;

Dimension Tables

dim_customer

| Column | Type | Description |
|--------|------|-------------|
| customer_key | PK | Surrogate key |
| customer_id | INT | Original system ID |
| first_name | VARCHAR | Customer first name |
| last_name | VARCHAR | Customer last name |
| company | VARCHAR | Company (if applicable) |
| city | VARCHAR | City |
| country | VARCHAR | Country |
| support_rep_name | VARCHAR | Support rep name (denormalized) |

dim_employee

| Column | Type | Description |
|--------|------|-------------|
| employee_key | PK | Surrogate key |
| employee_id | INT | Original ID |
| first_name | VARCHAR | First name |
| last_name | VARCHAR | Last name |
| title | VARCHAR | Job title |
| hire_date | DATE | Hire date |
| reports_to_name | VARCHAR | Supervisor name (denormalized) |

dim_track

| Column | Type | Description |
|--------|------|-------------|
| track_key | PK | Surrogate key |
| track_id | INT | Original ID |
| track_name | VARCHAR | Track name |
| album_name | VARCHAR | Album (denormalized) |
| artist_name | VARCHAR | Artist (denormalized) |
| genre_name | VARCHAR | Music genre |
| media_type | VARCHAR | Format (MP3, AAC, etc.) |
| duration_ms | INT | Duration in milliseconds |

dim_date

| Column | Type | Description |
|--------|------|-------------|
| date_key | PK | YYYYMMDD format |
| full_date | DATE | Complete date |
| day | INT | Day of month (1-31) |
| month | INT | Month (1-12) |
| year | INT | Year |
| quarter | INT | Quarter (1-4) |
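
One way to build such a calendar model (a sketch only, assuming the dbt_utils package and PostgreSQL/YugabyteDB syntax; the date range shown is an assumption, not taken from the repo):

-- Hypothetical dim_date sketch built on a dbt_utils date spine
with date_spine as (
    {{ dbt_utils.date_spine(
        datepart="day",
        start_date="cast('2009-01-01' as date)",
        end_date="cast('2026-01-01' as date)"
    ) }}
)
select
    to_char(date_day, 'YYYYMMDD')::int    as date_key,    -- YYYYMMDD surrogate
    date_day                              as full_date,
    extract(day from date_day)::int       as day,
    extract(month from date_day)::int     as month,
    extract(year from date_day)::int      as year,
    extract(quarter from date_day)::int   as quarter
from date_spine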

Relationships

  1. fact_invoice.customer_key → dim_customer.customer_key
  2. fact_invoice.employee_key → dim_employee.employee_key
  3. fact_invoice.date_key → dim_date.date_key
  4. fact_invoice.track_key → dim_track.track_key
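
Taken together, these relationships support standard star-join queries, for example (a sketch built from the column names above, not a query taken from the repo):

-- Revenue by country, genre, and year across the star schema
SELECT
    c.country,
    t.genre_name,
    d.year,
    SUM(f.total_amount) AS revenue
FROM fact_invoice f
JOIN dim_customer c ON f.customer_key = c.customer_key
JOIN dim_track    t ON f.track_key    = t.track_key
JOIN dim_date     d ON f.date_key     = d.date_key
GROUP BY c.country, t.genre_name, d.year
ORDER BY revenue DESC;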

Mapped Business Rules

  • Each row in fact_invoice represents a sale line item
  • Calculated metrics: total_amount = quantity × unit_price
  • Denormalized dimensions for query optimization

Note: All primary keys (PK) are dbt-generated surrogate keys.

Notes on fact implementation

The invoice fact is incremental, using invoice_info_key as its unique key; this is a surrogate key composed of invoice_id and invoice_line_id. The incremental strategy chosen was delete+insert, because the merge option is not available for the YugabyteDB database (link). Merge would have been the best fit here, since we want to insert new rows and apply updates, but delete+insert still ensures updated records are fully replaced.

invoice_info_key was chosen instead of invoice_id because it enables 1-to-n relationships with dimension tables such as dim_track.
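
A minimal sketch of what this incremental configuration could look like in the fact model (the staging model name and the watermark filter are assumptions, not taken from the repo):

-- Hypothetical incremental config for the invoice fact
{{
    config(
        materialized='incremental',
        unique_key='invoice_info_key',
        incremental_strategy='delete+insert'
    )
}}

select
    {{ dbt_utils.generate_surrogate_key(['invoice_id', 'invoice_line_id']) }} as invoice_info_key,
    invoice_id,
    invoice_line_id,
    quantity,
    unit_price
from {{ ref('stg_invoice_lines') }}   -- hypothetical staging model name
{% if is_incremental() %}
  -- simplistic watermark: only (re)process invoices at or beyond the current max id;
  -- a real model would more likely key off an updated_at timestamp
  where invoice_id >= (select max(invoice_id) from {{ this }})
{% endif %}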

Notes on data enrichment with Streaming Pipelines consuming from REST API

When implementing a streaming solution that consumes data from a REST API, alternatives like Apache Kafka, Apache Flink, or AWS Kinesis are often more suitable than Dagster or dbt. These tools are designed for real-time data processing, offering low-latency ingestion, event-driven processing, and scalability. For instance, Kafka can act as a message broker to buffer API data, while Flink or Spark Streaming can process it in real time. Lightweight frameworks like FastAPI with WebSockets or serverless solutions (AWS Lambda, Google Cloud Functions) can also be used for near-real-time polling of APIs, depending on the use case.

Dagster and dbt, however, are not ideal for streaming because they are primarily batch-oriented frameworks. Dagster focuses on orchestrating scheduled data pipelines, while dbt specializes in transforming batch-loaded data in warehouses. Neither supports true event-driven processing or low-latency streaming natively. While Dagster can trigger pipelines frequently (e.g., every minute), it still introduces delays and overhead compared to dedicated streaming tools. For real-time use cases, leveraging purpose-built streaming technologies ensures better performance and scalability.

Streaming architecture (simplified diagram)
