Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
# Change Log
All notable changes to this project will be documented in this file.


## 2.2.0 - 2026-03
### Runner
- allowing for custom table name (has priority before class name)
- added options to add filter on dependencies and target table based on column-value pairs
- target table can now selectively write based on secondary virtual partitions
### Common
- table reader can also filter based on given column-values pairs

## 2.1.4 - 2026-02
### Common
- table reader optimization
Expand Down
62 changes: 62 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,44 @@ This library currently contains:
Runner is the orchestrator and scheduler of Rialto. It can be used to execute any [job](#jobs) but is primarily designed to execute feature [maker](#maker) jobs.
The core of runner is execution of a Transformation class that can be extended for any purpose and the execution configuration that defines the handling of i/o, scheduling, dependencies and reporting.

Runner operates on the assumption that your Databricks tables contain a date column (partition column) that indicates the date of data arrival. This enables:

1. **Time-aware computation**: Run operations on dated tables while managing time-related dependencies automatically
2. **Retrospective simulation**: Run computations as they would have occurred on a specific date by setting `run_date` - ensuring data newer than that date is never used
3. **Dependency tracking**: Automatically verify that input data meets required freshness constraints relative to the run date
4. **Automatic completion detection**: Skip computations when output data already exists (configurable with `rerun` parameter)

#### Scheduling and Execution

Runner uses a schedule-based approach:
* Define a schedule (e.g., weekly on day 2, monthly on day 6) in the configuration
* Specify a watch period (how far back to look for missing runs)
* Runner finds all scheduled run dates within the watch period and executes missing ones
* For each date, dependencies are checked and target existence is verified before execution

#### Data Flow

For each pipeline execution:
1. **Dependency verification**: Check that all required input tables have data within specified time intervals
2. **Transformation execution**: Run your transformation to produce a Spark DataFrame
3. **Automatic enrichment**: Runner adds `INFORMATION_DATE` (the run date) and `VERSION` (package version) columns
4. **Partitioned write**: Data is written to Databricks with partitioning configuration
5. **Reporting**: Optional email notifications on failure and run information stored to tracking table

#### Dependency Tracking

Runner's dependency tracking ensures that all required input data is available before executing a pipeline. For each dependency:

* **Date-based checking**: Runner looks for data in the dependency table's date column
* **Interval calculation**: The required date is calculated by subtracting the dependency's interval from the run date
* **Existence verification**: Runner checks if data exists for the calculated date (and within any specified filters)
* **Missing data handling**: If required data is missing, Runner raises an error for that specific pipeline/date but continues executing other pipelines and dates in the queue

**Example:** If you're running a pipeline on 2024-01-15 with a dependency that has a 7-day interval:
* Runner checks if the dependency table has data for 2024-01-08 (15 days - 7 days)
* If the dependency has `filters: {VERSION: "v2"}`, it specifically checks for data where VERSION='v2'
* If data exists, the pipeline proceeds; otherwise, an error is raised for this specific execution, but other scheduled runs continue

### Transformation
For the details on the interface see the [implementation](rialto/runner/transformation.py)
Inside the transformation you have access to a [TableReader](#common), date of running, and if provided to Runner, a live spark session and [metadata manager](#metadata).
Expand Down Expand Up @@ -64,6 +102,13 @@ Transformations are not included in the runner itself, it imports them dynamical

### Configuration

Runner is supplied with a run configuration that defines the computations it will execute. In each pipeline configuration you define:
- **Module**: Python transformation class to execute
- **Schedule**: When to run (daily/weekly/monthly) and on which day
- **Dependencies**: Input tables to check, with required freshness intervals and optional filters
- **Target**: Output location, partitioning strategy, and optional completion filters


```yaml
runner:
watched_period_units: "months" # unit of default run period
Expand Down Expand Up @@ -129,9 +174,19 @@ pipelines: # a list of pipelines to run
interval:
units: "days"
value: 6
filters:
dep_column_name1: "value1"
dep_column_name2: "value2"
target:
target_schema: catalog.schema # schema where tables will be created, must exist
target_partition_column: INFORMATION_DATE # date to partition new tables on
secondary_partition_columns: # optional list of secondary partitions to ensure partial-overwrite of the target table based on generated data for these partitions
- column_name1
- column_name2
rerun_filters: # optional filters to avoid reruning already generated data for secondary partitionons, if secondary partition values are dynamically generated at runtime, leave this empty but the job will always rerun
column_name1: 42
column_name2: "some_value"
custom_name: "custom_table_name" # optional custom table name, if not provided, the table name will be the same as pipeline name
```

The configuration can be dynamically overridden by providing a dictionary of overrides to the runner. All overrides must adhere to configurations schema, with pipeline.extras section available for custom schema.
Expand Down Expand Up @@ -189,6 +244,13 @@ overrides={"runner.watched_period_value": 4,
}
```

### Multiple partitions
Rialto runner and TableReader can handle multiple "partitions", however we only use one primary physical partitions and treat selected columns as other partitions.
When wanting to write to a selected secondary partition/s, you can specify them in the configuration file as **secondary_partition_columns** and provide values for these columns in **rerun_filters**. This way, the runner will only rerun for the data that matches the filters, and leave the rest of the data intact.
You can use env variables to set these filters if the values are available before the job run. If not, the job can be setup without these filters, however by defining secondary target partitions, the job will always rerun because it can't determine whether its supposed to run.

You can also take advantage of these **filters** options in dependency configuration to ensure the right data is available.


## <a id="maker"></a> 2.2 - maker
The purpose of (feature) maker is to simplify feature creation, allow for consistent feature implementation that is standardized and easy to test.
Expand Down
23 changes: 20 additions & 3 deletions rialto/common/table_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import abc
import datetime
from typing import Optional
from typing import Dict, Optional

import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession
Expand All @@ -37,13 +37,15 @@ def get_latest(
date_column: str,
date_until: Optional[datetime.date] = None,
uppercase_columns: bool = False,
filters: Optional[Dict[str, str]] = None,
) -> DataFrame:
"""
Get latest available date partition of the table until specified date

:param table: input table path
:param date_until: Optional until date (inclusive)
:param uppercase_columns: Option to refactor all column names to uppercase
:param filters: Optional dict of column filters to apply before finding latest date
:return: Dataframe
"""
raise NotImplementedError
Expand All @@ -56,6 +58,7 @@ def get_table(
date_from: Optional[datetime.date] = None,
date_to: Optional[datetime.date] = None,
uppercase_columns: bool = False,
filters: Optional[Dict[str, str]] = None,
) -> DataFrame:
"""
Get a whole table or a slice by selected dates
Expand Down Expand Up @@ -96,12 +99,16 @@ def _get_latest_available_date(self, df: DataFrame, date_col: str, until: Option
df = df.select(F.max(date_col)).alias("latest")
return df.head()[0]

def _get_raw_data(self, table: str) -> DataFrame:
return self.spark.read.table(table)

def get_latest(
self,
table: str,
date_column: str,
date_until: Optional[datetime.date] = None,
uppercase_columns: bool = False,
filters: Optional[Dict[str, str]] = None,
) -> DataFrame:
"""
Get latest available date partition of the table until specified date
Expand All @@ -110,9 +117,14 @@ def get_latest(
:param date_until: Optional until date (inclusive)
:param date_column: column to filter dates on, takes highest priority
:param uppercase_columns: Option to refactor all column names to uppercase
:param filters: Optional dict of column filters to apply before finding latest date
:return: Dataframe
"""
df = self.spark.read.table(table)
df = self._get_raw_data(table)

if filters:
for col, val in filters.items():
df = df.filter(df[col] == val)

selected_date = self._get_latest_available_date(df, date_column, date_until)
df = df.filter(F.col(date_column) == selected_date)
Expand All @@ -128,6 +140,7 @@ def get_table(
date_from: Optional[datetime.date] = None,
date_to: Optional[datetime.date] = None,
uppercase_columns: bool = False,
filters: Optional[Dict] = None,
) -> DataFrame:
"""
Get a whole table or a slice by selected dates
Expand All @@ -139,7 +152,11 @@ def get_table(
:param uppercase_columns: Option to refactor all column names to uppercase
:return: Dataframe
"""
df = self.spark.read.table(table)
df = self._get_raw_data(table)

if filters:
for col, val in filters.items():
df = df.filter(df[col] == val)

if date_from:
df = df.filter(F.col(date_column) >= date_from)
Expand Down
4 changes: 4 additions & 0 deletions rialto/runner/config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class DependencyConfig(BaseConfig):
name: Optional[str] = None
date_col: str
interval: IntervalConfig
filters: Optional[Dict] = None


class ModuleConfig(BaseConfig):
Expand All @@ -69,6 +70,9 @@ class RunnerConfig(BaseConfig):
class TargetConfig(BaseConfig):
target_schema: str
target_partition_column: str
secondary_partition_columns: Optional[List[str]] = None
rerun_filters: Optional[Dict] = None
custom_name: Optional[str] = None


class MetadataManagerConfig(BaseConfig):
Expand Down
58 changes: 44 additions & 14 deletions rialto/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from datetime import date
from typing import Dict, List, Tuple

import pyspark.sql.functions as F
from loguru import logger
from pyspark.sql import DataFrame, SparkSession

Expand Down Expand Up @@ -97,19 +96,31 @@ def _execute(self, instance: Transformation, run_date: date, pipeline: PipelineC

return df

def _check_written(self, info_date: date, table: Table) -> int:
def _check_written(self, info_date: date, table: Table, df: DataFrame, pipeline: PipelineConfig) -> int:
"""
Check if there are records written for given date

:param info_date: date to check
:param table: target table object
:return: number of records
"""
df = self.spark.read.table(table.get_table_path())
df = df.filter(F.col(table.partition) == info_date)
filters = {}
if pipeline.target.rerun_filters is not None:
filters = pipeline.target.rerun_filters
else:
if table.secondary_partitions:
row = df.select(*table.secondary_partitions).distinct().collect()[0]
for c in table.secondary_partitions:
val = row[0][c]
filters[c] = val

df = self.reader.get_table(
table.get_table_path(), date_column=table.partition, date_from=info_date, date_to=info_date, filters=filters
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

date_from=info_date, date_to=info_date


return df.count()

def check_dates_have_partition(self, table: Table, dates: List[date]) -> List[bool]:
def check_dates_have_data(self, table: Table, dates: List[date], target_filters: Dict = None) -> List[bool]:
"""
For given list of dates, check if there is a matching partition for each

Expand All @@ -118,8 +129,21 @@ def check_dates_have_partition(self, table: Table, dates: List[date]) -> List[bo
:return: list of bool
"""
if utils.table_exists(self.spark, table.get_table_path()):
partitions = utils.get_partitions(self.reader, table)
return [(date in partitions) for date in dates]
checks = []
for check_date in dates:
df = self.reader.get_table(
table.get_table_path(),
date_column=table.partition,
date_from=check_date,
date_to=check_date,
filters=target_filters,
)
data_exists = df.count() > 0
if data_exists and target_filters is None and table.secondary_partitions is not None:
# ensure rerun if the write consideres secondary partitions but the filter doesn't
data_exists = False
checks.append(data_exists)
return checks
else:
logger.info(f"Table {table.get_table_path()} doesn't exist!")
return [False for _ in dates]
Expand All @@ -145,7 +169,7 @@ def check_dependencies(self, pipeline: PipelineConfig, run_date: date) -> bool:
logger.debug(f"Date column for {dependency.table} is {dependency.date_col}")

source = Table(table_path=dependency.table, partition=dependency.date_col)
if True in self.check_dates_have_partition(source, possible_dep_dates):
if True in self.check_dates_have_data(source, possible_dep_dates, dependency.filters):
logger.info(f"Dependency for {dependency.table} from {dep_from} until {run_date} is fulfilled")
else:
msg = f"Missing dependency for {dependency.table} from {dep_from} until {run_date}"
Expand All @@ -158,7 +182,7 @@ def check_dependencies(self, pipeline: PipelineConfig, run_date: date) -> bool:

return True

def _get_completion(self, target: Table, info_dates: List[date]) -> List[bool]:
def _get_completion(self, target: Table, info_dates: List[date], filters: Dict = None) -> List[bool]:
"""
Check if model has run for given dates

Expand All @@ -169,9 +193,9 @@ def _get_completion(self, target: Table, info_dates: List[date]) -> List[bool]:
if self.rerun:
return [False for _ in info_dates]
else:
return self.check_dates_have_partition(target, info_dates)
return self.check_dates_have_data(target, info_dates, filters)

def _select_run_dates(self, pipeline: PipelineConfig, table: Table) -> Tuple[List, List]:
def _select_run_dates(self, pipeline: PipelineConfig, table: Table, filters: Dict = None) -> Tuple[List, List]:
"""
Select run dates and info dates based on completion

Expand All @@ -181,7 +205,7 @@ def _select_run_dates(self, pipeline: PipelineConfig, table: Table) -> Tuple[Lis
"""
possible_run_dates = DateManager.run_dates(self.date_from, self.date_until, pipeline.schedule)
possible_info_dates = [DateManager.to_info_date(x, pipeline.schedule) for x in possible_run_dates]
current_state = self._get_completion(table, possible_info_dates)
current_state = self._get_completion(table, possible_info_dates, filters)

selection = [
(run, info) for run, info, state in zip(possible_run_dates, possible_info_dates, current_state) if not state
Expand Down Expand Up @@ -212,7 +236,7 @@ def _run_one_date(self, pipeline: PipelineConfig, run_date: date, info_date: dat
feature_group = utils.load_module(pipeline.module)
df = self._execute(feature_group, run_date, pipeline)
self.writer.write(df, info_date, target)
records = self._check_written(info_date, target)
records = self._check_written(info_date, target, df, pipeline)
logger.info(f"Generated {records} records")
if records == 0:
raise RuntimeError("No records generated")
Expand All @@ -231,10 +255,14 @@ def _run_pipeline(self, pipeline: PipelineConfig):
schema_path=pipeline.target.target_schema,
class_name=pipeline.module.python_class,
partition=pipeline.target.target_partition_column,
secondary_partitions=pipeline.target.secondary_partition_columns,
table=pipeline.target.custom_name,
)
logger.info(f"Loaded pipeline {pipeline.name}")

selected_run_dates, selected_info_dates = self._select_run_dates(pipeline, target)
selected_run_dates, selected_info_dates = self._select_run_dates(
pipeline, target, pipeline.target.rerun_filters
)

# ----------- Checking dependencies available ----------
for run_date, info_date in zip(selected_run_dates, selected_info_dates):
Expand Down Expand Up @@ -317,6 +345,8 @@ def debug(self) -> DataFrame:
schema_path=pipeline.target.target_schema,
class_name=pipeline.module.python_class,
partition=pipeline.target.target_partition_column,
secondary_partitions=pipeline.target.secondary_partition_columns,
table=pipeline.target.custom_name,
)
selected_run_dates, selected_info_dates = self._select_run_dates(pipeline, target)
if len(selected_run_dates) > 0:
Expand Down
Loading
Loading