- Architecture
- Final Table
- Quickstart
- Project Structure
- Data Issues Handled
- Pipeline – Daily Run (D-1)
- Possible Improvements
- Observations
This project implements a data pipeline based on CDC (Change Data Capture) to simulate an event ingestion scenario in a data lake and enable the calculation of daily GMV (Gross Merchandise Value) by subsidiary.
The pipeline covers:
- Synthetic data generation with CDC events
- Simulation of real-world inconsistencies in the bronze layer (duplication, corrections, late arrivals)
- Medallion architecture (Bronze, Silver, Gold)
- Analytical layer for GMV by day and subsidiary
Medallion Architecture:
Bronze (CSV) → Silver (Parquet) → Gold (Partitioned Parquet)
| Layer | Description |
|---|---|
| Bronze | Mock ingestion of 3 CDC event CSVs |
| Silver | Deduplication, joins and cleansing — purchase_clean.parquet |
| Gold | GMV aggregated by day and subsidiary, partitioned by release_date |
Grain: one row per transaction_date × subsidiary
Partitioning: data/gold/{transaction_date}/data.parquet
| Field | Type | Description |
|---|---|---|
| date | date | Payment release date (release_date) |
| subsidiary | varchar | Subsidiary (nacional / internacional) |
| gmv | double | Sum of total purchase value for paid orders on that day |
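The grain and the `gmv` definition above can be illustrated with a minimal sketch in plain Python. The column names `release_date`, `subsidiary` and `purchase_total_value` follow the descriptions in this README; treating a missing `release_date` as "not paid" and the sample rows themselves are assumptions made for illustration, and the project's actual aggregation runs as a DuckDB query rather than this loop.

```python
from collections import defaultdict
from datetime import date
from typing import Optional

# Hypothetical silver rows: (release_date, subsidiary, purchase_total_value).
# Assumption: a release_date of None means the order has not been paid yet.
SilverRow = tuple[Optional[date], str, float]


def daily_gmv(rows: list[SilverRow]) -> dict[tuple[date, str], float]:
    """Sum purchase_total_value per (release_date, subsidiary) for paid orders."""
    totals: dict[tuple[date, str], float] = defaultdict(float)
    for release_date, subsidiary, total_value in rows:
        if release_date is None:  # unpaid order: excluded from GMV
            continue
        totals[(release_date, subsidiary)] += total_value
    return dict(totals)


# Illustrative data only, not taken from the generated mock files.
sample_rows: list[SilverRow] = [
    (date(2023, 1, 20), "nacional", 100.0),
    (date(2023, 1, 20), "nacional", 50.0),
    (date(2023, 1, 20), "internacional", 30.0),
    (None, "nacional", 999.0),
]
gmv = daily_gmv(sample_rows)
```

Each key of the resulting dictionary corresponds to one row of the final table: one date combined with one subsidiary.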
- Install uv, the Python package manager used in this project.
- Clone the repository:

      git clone https://github.com/GabrielGRR/teste_teachable.git
      cd teste_teachable

- Run the following commands in the CLI (Terminal):

      pip install uv
      uv sync
      uv run main.py

This runs the following steps in sequence:
- `generate_mock_data.py`: generates CSVs in the bronze layer
- `bronze.py`: loads CSVs into DuckDB
- `silver.py`: deduplicates, joins and exports `purchase_clean.parquet`
- `gold.py`: aggregates daily GMV and exports partitions
- `data/`: the lake is created with the following layers:
  - `bronze/`: mock CSVs
  - `silver/`: deduplicated Parquet
  - `gold/`: Parquet partitioned by `release_date`
teachable/
├── data/ # Generated locally — not versioned
│ ├── bronze/ # Raw CSVs generated by mock
│ ├── silver/ # purchase_clean.parquet
│ ├── gold/ # GMV partitioned by date
│ └── teachable.duckdb # Local DuckDB database
├── docs/
│ ├── data_schema.md # Bronze table schema
│ ├── mock_data_rules.md # Mock data generation rules
│ └── Technical_case.pdf # Case statement
├── scripts/
│ ├── generate_mock_data.py # Bronze data generation
│ ├── silver.py # Deduplication and joins
│ └── gold.py # Daily GMV aggregation
├── utils/
│ └── logging_utils.py # Logging helper with BR timezone
├── main.py # Pipeline entrypoint
├── pyproject.toml
└── uv.lock
| Problem | Solution |
|---|---|
| Duplicate events | ROW_NUMBER() partitioned by purchase_id + purchase_partition, ordered by transaction_datetime DESC |
| Historical correction | Same dedup — the most recent event wins |
| Late arrivals cross-partition | Second dedup by purchase_id guarantees absolute uniqueness in silver |
| Idempotent pipeline | Existing partitions are skipped; reprocess by deleting the partition and rerunning |
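The two deduplication passes in the table above can be sketched as a single query. This is an illustration under assumptions, not the project's actual SQL: it uses Python's built-in `sqlite3` (SQLite 3.25 or newer, for window functions) as a stand-in for DuckDB, the table name `purchase` and the sample rows are hypothetical, and ordering the second pass by `transaction_datetime DESC` is an assumption, since the README only states that it deduplicates by `purchase_id`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE purchase ("
    "purchase_id INTEGER, purchase_partition TEXT, "
    "transaction_datetime TEXT, purchase_total_value REAL)"
)
conn.executemany(
    "INSERT INTO purchase VALUES (?, ?, ?, ?)",
    [
        (1, "2023-01-20", "2023-01-20 10:00:00", 100.0),  # original event
        (1, "2023-01-20", "2023-01-20 10:00:00", 100.0),  # exact duplicate
        (1, "2023-01-20", "2023-01-21 08:00:00", 120.0),  # later correction
        (2, "2023-01-20", "2023-01-20 11:00:00", 50.0),
        (2, "2023-01-22", "2023-01-22 09:00:00", 55.0),  # late arrival in another partition
    ],
)

DEDUP_SQL = """
WITH per_partition AS (
    -- First pass: latest event per purchase_id within each partition
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY purchase_id, purchase_partition
        ORDER BY transaction_datetime DESC
    ) AS rn
    FROM purchase
),
per_purchase AS (
    -- Second pass: latest surviving event per purchase_id across partitions
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY purchase_id
        ORDER BY transaction_datetime DESC
    ) AS rn2
    FROM per_partition
    WHERE rn = 1
)
SELECT purchase_id, purchase_total_value
FROM per_purchase
WHERE rn2 = 1
ORDER BY purchase_id
"""
deduped = conn.execute(DEDUP_SQL).fetchall()
```

In this sample, purchase 1 keeps its corrected value and purchase 2 keeps the late-arriving event, so each `purchase_id` appears exactly once in the result.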
The pipeline is designed to run daily, processing D-1 (the previous day's data), and would be orchestrated via Apache Airflow.
Bronze (S3) → Silver (S3) → Gold (S3) → Redshift
All three medallion layers would be stored in Amazon S3. DuckDB can read from and write to S3 through its httpfs extension, so only the paths in the queries would change; the transformation logic remains the same.
The pipeline is split into two DAGs to balance cost and freshness:
bronze_silver_pipeline — runs hourly throughout the day, absorbing late arrivals as they land in S3. Bronze ingestion and silver deduplication are lightweight operations that can run frequently without significant cost.
gold_pipeline — runs once daily after the ingestion window closes. Before aggregating, it checks whether the silver pipeline completed successfully for that day using an ExternalTaskSensor. The gold layer only runs if silver is confirmed healthy — avoiding partial or incorrect aggregations.

    from airflow.decorators import dag
    from airflow.sensors.external_task import ExternalTaskSensor

    # Processes events throughout the day
    @dag(schedule="0 * * * *")
    def bronze_silver_pipeline():
        # wait_for_bronze (an S3KeySensor) and run_silver are task definitions omitted here
        wait_for_bronze >> run_silver

    # Aggregates GMV once daily after silver is confirmed healthy
    @dag(schedule="0 9 * * *")  # 06h BRT = 09h UTC
    def gold_pipeline():
        wait_for_silver = ExternalTaskSensor(
            task_id="wait_for_silver",
            external_dag_id="bronze_silver_pipeline",
            external_task_id="run_silver",
        )
        # run_gold is a task definition omitted here
        wait_for_silver >> run_gold

    bronze_silver_pipeline()
    gold_pipeline()

- `S3KeySensor`: waits for the day's CSVs or Parquet files to arrive in the bucket before starting
- `catchup=True`: makes Airflow create runs for any schedule intervals that were missed, so no day is left unprocessed
- Idempotency: an S3 partition existence check via the boto3 S3 client's `head_object()` replaces the local `Path.exists()`; reprocessing a day means deleting the partition and clearing the task in Airflow
- Analytics: the gold layer would be exposed to analysts via Redshift or Athena, pointing to the partitions in S3
The silver layer currently handles duplicates and late arrivals via deduplication, but does not validate data integrity. Some possible improvements:
- Check for nulls in mandatory fields (`buyer_id`, `producer_id`, `purchase_total_value`)
- Validate expected values (`purchase_status` outside the domain, `item_quantity <= 0`)
- Validate calculated consistency (`purchase_total_value == item_quantity × purchase_value`)
- Export invalid rows to a separate quarantine layer for investigation
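The checks listed above could look roughly like the following sketch in plain Python. The field names come from this README; the set of allowed `purchase_status` values, the 0.01 rounding tolerance and the function names are hypothetical, chosen only to make the example concrete.

```python
from typing import Any

MANDATORY_FIELDS = ("buyer_id", "producer_id", "purchase_total_value")
# Hypothetical status domain: the real allowed values are not listed in this README.
VALID_STATUSES = {"paid", "pending", "cancelled"}
# Hypothetical tolerance for floating-point rounding in the consistency check.
TOLERANCE = 0.01


def validation_errors(row: dict[str, Any]) -> list[str]:
    """Return the list of integrity problems found in one silver row."""
    errors = []
    for field in MANDATORY_FIELDS:
        if row.get(field) is None:
            errors.append(f"null:{field}")
    if row.get("purchase_status") not in VALID_STATUSES:
        errors.append("invalid:purchase_status")
    quantity = row.get("item_quantity")
    if quantity is None or quantity <= 0:
        errors.append("invalid:item_quantity")
    unit_value = row.get("purchase_value")
    total = row.get("purchase_total_value")
    if quantity is not None and unit_value is not None and total is not None:
        if abs(total - quantity * unit_value) > TOLERANCE:
            errors.append("inconsistent:purchase_total_value")
    return errors


def split_valid_and_quarantine(
    rows: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Separate clean rows from rows that should go to a quarantine layer."""
    valid, quarantine = [], []
    for row in rows:
        errors = validation_errors(row)
        if errors:
            quarantine.append({**row, "errors": errors})
        else:
            valid.append(row)
    return valid, quarantine
```

Keeping the error labels next to each quarantined row makes it easier to investigate why a record was rejected.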
In production, the pipeline could be monitored on three fronts, complemented by a routine for late arrivals:
- Row counts: on each run, compare the number of processed rows against the historical average for the same day of the week. A sharp drop may indicate a silent failure in bronze ingestion or a problem in the silver join.
- GMV delta: compare the day's GMV against the 7-day and 30-day moving averages. Variations outside the expected range would trigger an alert.
- Freshness: verify that the day's partition was created within the expected window. If by noon the D-1 partition still does not exist, something failed. Airflow has native tooling for this kind of check, such as sensors with a timeout.
- Late arrival reprocessing: since the pipeline is append-only per partition, an event arriving late is only incorporated if the partition is reprocessed. A periodic routine (e.g. weekly) could identify partitions with new data in bronze and trigger an automatic backfill via Airflow.
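As an illustration of the GMV delta check, the sketch below compares one day's GMV against a moving average of the preceding days. The function name, the 7-day default window and the 30% tolerance are hypothetical; a real threshold would have to be tuned against historical data.

```python
from statistics import mean


def gmv_out_of_range(
    history: list[float],
    today: float,
    window: int = 7,
    tolerance: float = 0.3,
) -> bool:
    """Return True when today's GMV deviates from the moving average
    of the last `window` days by more than `tolerance` (as a fraction)."""
    if len(history) < window:
        raise ValueError(f"need at least {window} days of history")
    baseline = mean(history[-window:])
    if baseline == 0:
        # No meaningful ratio: any non-zero GMV counts as a deviation.
        return today != 0
    return abs(today - baseline) / baseline > tolerance
```

The same function can be called with `window=30` for the longer moving average, and a `True` result would be the signal that triggers an alert.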
The case statement includes example images where purchase_id appears as a field inside product_item. However, the reference schema defines prod_item_id as the join key between purchase and product_item. This implementation follows the official schema, disregarding the example images and treating the schema as the source of truth.
The challenge simultaneously requests a single analytical GMV table and partitioning by transaction_date. The adopted solution treats the set of partitions as that single table and physically partitions the data into data/gold/{transaction_date}/data.parquet.
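The partition layout, together with the "existing partitions are skipped" behaviour described earlier, can be sketched as follows. The helper names and the `write_file` callback are hypothetical; in the project itself the Parquet file is exported through DuckDB, which this sketch deliberately leaves out.

```python
from pathlib import Path
from typing import Callable


def partition_path(gold_root: Path, transaction_date: str) -> Path:
    """Build <gold_root>/<transaction_date>/data.parquet."""
    return gold_root / transaction_date / "data.parquet"


def write_partition_if_missing(
    gold_root: Path,
    transaction_date: str,
    write_file: Callable[[Path], None],
) -> bool:
    """Write one gold partition unless it already exists.

    Returns True if the partition was written and False if it was skipped,
    so rerunning the pipeline for the same day leaves the data unchanged.
    """
    target = partition_path(gold_root, transaction_date)
    if target.exists():
        return False
    target.parent.mkdir(parents=True, exist_ok=True)
    write_file(target)  # hypothetical callback that exports the Parquet file
    return True
```

Reprocessing a day then amounts to deleting its partition directory and calling the function again.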