A complete data pipeline implementation for the Chinook database with dbt transformations and Dagster orchestration.
```
chinook-pipeline/
├── dbt/                        # dbt project
│   └── chinook_analytics/      # Main project
│       ├── models/             # Data models
│       │   ├── staging/        # Raw source models
│       │   └── core/           # Business transformations
│       ├── seeds/              # Reference data
│       ├── tests/              # Data tests
│       └── dbt_project.yml     # Project config
├── dagster/                    # Orchestration
│   ├── repository.py           # Pipeline definitions
│   ├── schedules.py            # Automation config
│   └── workspace.yaml          # Dev environment
├── docker-compose.yml          # Container setup
├── Dockerfile_dbt              # dbt environment
└── scripts/                    # Utility scripts
```
- Docker
- Docker Compose
```bash
git clone https://github.com/fegvilela/chinook-pipeline.git
cd chinook-pipeline

# Start all services
docker-compose up -d --build
```

```bash
# Run all models
docker-compose exec dbt_chinook dbt run

# Run specific model
docker-compose exec dbt_chinook dbt run --models <MODEL_NAME>

# Generate docs
docker-compose exec dbt_chinook dbt docs generate
```

| Service | URL | Credentials |
|---|---|---|
| Dagster UI | http://localhost:3000 | None |
| dbt Docs | http://localhost:8080 | None (available after `dbt docs generate`) |
| PostgreSQL | yugabyteDB | - |
The Dagster pipeline is configured to:
- Load new data at 02:00 UTC daily
- Run dbt transformations
- Handle incremental updates
View scheduled runs in the Dagster UI.
```bash
# Run dbt tests
docker-compose exec dbt_chinook dbt test

# Run unit tests
docker-compose exec dagster pytest
```

- Incremental models for fact tables
- Dagster assets for data lineage
- Containerized environment
- Daily automation with error handling
Star schema for the Chinook database (simplified diagram)
| Column | Type | Description | Relationship |
|---|---|---|---|
| invoice_info_key | PK | Surrogate key for invoice + line | - |
| invoice_key | VARCHAR | Surrogate key for invoice | - |
| invoice_line_key | VARCHAR | Surrogate key for invoice line | - |
| customer_key | FK | Customer key | → dim_customer |
| employee_key | FK | Support rep key | → dim_employee |
| date_key | FK | Date key | → dim_date |
| track_key | FK | Music track key | → dim_track |
| invoice_line_id | INT | Original line ID (business key) | - |
| invoice_id | INT | Original invoice ID | - |
| quantity | INT | Quantity sold | - |
| unit_price | DECIMAL | Unit price | - |
| total_amount | DECIMAL | Total (quantity × unit_price) | - |
- PK for line-item granularity: `invoice_line_key` as surrogate primary key, enabling analysis of individual sale items
- Dual-key for traceability: maintains `invoice_line_id` (original ID) as business key and adds `invoice_id` for full document reference
- Optional self-relationship: `invoice_key` can create an invoice header dimension if needed
- Fact table has invoice line granularity (not invoice level)
- All metrics (`quantity`, `unit_price`) are naturally line-item
- Aggregate queries should use `invoice_id` for document-level analysis
```sql
-- Line-item analysis
SELECT
    f.invoice_line_id,
    t.track_name,
    f.quantity,
    f.total_amount
FROM fact_invoice f
JOIN dim_track t ON f.track_key = t.track_key;

-- Complete invoice analysis
SELECT
    f.invoice_id,
    SUM(f.total_amount) AS invoice_total
FROM fact_invoice f
GROUP BY f.invoice_id;
```

| Column | Type | Description |
|---|---|---|
| customer_key | PK | Surrogate key |
| customer_id | INT | Original system ID |
| first_name | VARCHAR | Customer first name |
| last_name | VARCHAR | Customer last name |
| company | VARCHAR | Company (if applicable) |
| city | VARCHAR | City |
| country | VARCHAR | Country |
| support_rep_name | VARCHAR | Support rep name (denormalized) |
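
The denormalized `support_rep_name` column comes from joining customers to their support rep in the employee data. As a rough sketch (the staging model and column names `stg_customers`, `stg_employees`, and `support_rep_id` are assumptions, and the `dbt_utils` package is assumed to be installed), the dimension model might look like:

```sql
-- Sketch of dim_customer: denormalizes the support rep's name onto each customer row.
-- Staging model and column names are illustrative assumptions, not the project's actual code.
with customers as (
    select * from {{ ref('stg_customers') }}
),

employees as (
    select * from {{ ref('stg_employees') }}
)

select
    {{ dbt_utils.generate_surrogate_key(['c.customer_id']) }} as customer_key,
    c.customer_id,
    c.first_name,
    c.last_name,
    c.company,
    c.city,
    c.country,
    e.first_name || ' ' || e.last_name as support_rep_name
from customers c
left join employees e
    on c.support_rep_id = e.employee_id
```

The same denormalization pattern applies to the other dimensions below, e.g. `reports_to_name` in dim_employee and `album_name` / `artist_name` / `genre_name` in dim_track.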
| Column | Type | Description |
|---|---|---|
| employee_key | PK | Surrogate key |
| employee_id | INT | Original ID |
| first_name | VARCHAR | First name |
| last_name | VARCHAR | Last name |
| title | VARCHAR | Job title |
| hire_date | DATE | Hire date |
| reports_to_name | VARCHAR | Supervisor name (denormalized) |
| Column | Type | Description |
|---|---|---|
| track_key | PK | Surrogate key |
| track_id | INT | Original ID |
| track_name | VARCHAR | Track name |
| album_name | VARCHAR | Album (denormalized) |
| artist_name | VARCHAR | Artist (denormalized) |
| genre_name | VARCHAR | Music genre |
| media_type | VARCHAR | Format (MP3, AAC, etc.) |
| duration_ms | INT | Duration in milliseconds |
| Column | Type | Description |
|---|---|---|
| date_key | PK | YYYYMMDD format |
| full_date | DATE | Complete date |
| day | INT | Day of month (1-31) |
| month | INT | Month (1-12) |
| year | INT | Year |
| quarter | INT | Quarter (1-4) |
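
Because `date_key` uses the `YYYYMMDD` format, the date dimension can be generated directly in SQL. The sketch below assumes a PostgreSQL-compatible warehouse (which YugabyteDB is) and an arbitrary example date range; it is not the project's actual dim_date model.

```sql
-- Sketch of a dim_date build using generate_series (PostgreSQL/YugabyteDB syntax).
-- The date range is an arbitrary example.
select
    to_char(d, 'YYYYMMDD')::int   as date_key,
    d::date                       as full_date,
    extract(day from d)::int      as day,
    extract(month from d)::int    as month,
    extract(year from d)::int     as year,
    extract(quarter from d)::int  as quarter
from generate_series(
    '2009-01-01'::date,
    '2013-12-31'::date,
    interval '1 day'
) as d
```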
- `fact_invoice.customer_key` → `dim_customer.customer_key`
- `fact_invoice.employee_key` → `dim_employee.employee_key`
- `fact_invoice.date_key` → `dim_date.date_key`
- `fact_invoice.track_key` → `dim_track.track_key`
- Each row in `fact_invoice` represents a sale line item
- Calculated metric: `total_amount = quantity × unit_price`
- Denormalized dimensions for query optimization
Note: All primary keys (PK) are dbt-generated surrogate keys.
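
The calculated-metric convention can be verified with a simple query that should return zero rows (for example as a singular dbt test); this is an illustrative sketch, not a test that necessarily exists in the project.

```sql
-- Rows where the stored total disagrees with quantity × unit_price.
-- As a singular dbt test, this passes only when it returns no rows.
select
    invoice_line_id,
    quantity,
    unit_price,
    total_amount
from {{ ref('fact_invoice') }}
where total_amount <> quantity * unit_price
```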
The invoice fact table is incremental, using `invoice_info_key` as the unique key; this is a surrogate key composed of `invoice_id` and `invoice_line_id`. The incremental strategy chosen was `delete+insert`, because the `merge` strategy is not available for YugabyteDB (link). `merge` would have been the better fit here, since we want to insert new rows and apply updates to existing ones, but `delete+insert` still ensures that updated records are fully replaced.
`invoice_info_key` was chosen instead of `invoice_id` because it keeps the line-item grain, enabling 1-to-n relationships with dimension tables such as `dim_track`.
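
To make the strategy concrete, the fact model's configuration might look roughly like the sketch below. The staging model name (`stg_invoice_lines`), the `invoice_date` filter column, and the exact column list are assumptions for illustration, and `dbt_utils.generate_surrogate_key` assumes the dbt_utils package is installed.

```sql
-- Sketch of the incremental fact model: delete+insert keyed on the composite surrogate key.
-- Staging model and column names are illustrative assumptions.
{{
    config(
        materialized='incremental',
        unique_key='invoice_info_key',
        incremental_strategy='delete+insert'
    )
}}

select
    {{ dbt_utils.generate_surrogate_key(['invoice_id', 'invoice_line_id']) }} as invoice_info_key,
    invoice_id,
    invoice_line_id,
    -- dimension keys (customer_key, employee_key, date_key, track_key) would also be built here
    quantity,
    unit_price,
    quantity * unit_price as total_amount,
    invoice_date
from {{ ref('stg_invoice_lines') }}

{% if is_incremental() %}
  -- Only pick up invoice lines newer than what is already loaded (filter column assumed)
  where invoice_date > (select max(invoice_date) from {{ this }})
{% endif %}
```

With `delete+insert`, dbt deletes any existing rows whose `invoice_info_key` appears in the new batch and then inserts the new rows, which is how updated records end up fully replaced.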
When implementing a streaming solution that consumes data from a REST API, alternatives like Apache Kafka, Apache Flink, or AWS Kinesis are often more suitable than Dagster or dbt. These tools are designed for real-time data processing, offering low-latency ingestion, event-driven processing, and scalability. For instance, Kafka can act as a message broker to buffer API data, while Flink or Spark Streaming can process it in real time. Lightweight frameworks like FastAPI with WebSockets or serverless solutions (AWS Lambda, Google Cloud Functions) can also be used for near-real-time polling of APIs, depending on the use case.
Dagster and dbt, however, are not ideal for streaming because they are primarily batch-oriented frameworks. Dagster focuses on orchestrating scheduled data pipelines, while dbt specializes in transforming batch-loaded data in warehouses. Neither supports true event-driven processing or low-latency streaming natively. While Dagster can trigger pipelines frequently (e.g., every minute), it still introduces delays and overhead compared to dedicated streaming tools. For real-time use cases, leveraging purpose-built streaming technologies ensures better performance and scalability.
