From 5ca31c5069c75507ab459a5f7cd662914a504abb Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Wed, 20 May 2026 13:09:12 +0200 Subject: [PATCH 01/16] Add fix script for rewriting STA ScheduledStopPoint IDs --- fix/rewrite_sta_ssp_ids.py | 121 +++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 fix/rewrite_sta_ssp_ids.py diff --git a/fix/rewrite_sta_ssp_ids.py b/fix/rewrite_sta_ssp_ids.py new file mode 100644 index 0000000..1aa6893 --- /dev/null +++ b/fix/rewrite_sta_ssp_ids.py @@ -0,0 +1,121 @@ +""" +Fix ScheduledStopPoint IDs to match SIRI feed format. + +Transforms IDs from: IT:ITH1:ScheduledStopPoint:it-22021-7010-51-32073: +to: IT:ITH10:ScheduledStopPoint:7010:51:32073 + +This is needed so that NeTEx and SIRI feeds reference the same stops. + +It is a mystery to me why this cannot be fixed at the source. +""" + +import dataclasses +import logging +import re +from pathlib import Path +from typing import Any + +from domain.netex.model import ( + PassengerStopAssignment, + Route, + RoutePoint, + RoutePointRef, + ScheduledStopPoint, + ScheduledStopPointRef, + ServiceJourneyPattern, + ServiceLink, + TimingLink, +) +from storage.mdbx.core.implementation import MdbxStorage +from utils.aux_logging import log_all, prepare_logger + +_PATTERN = re.compile(r'^.*:ScheduledStopPoint:it-22021-(.+):$') + + +def _new_id(old_id: str) -> str | None: + m = _PATTERN.match(old_id) + if m: + return 'IT:ITH10:ScheduledStopPoint:' + m.group(1).replace('-', ':') + return None + + +def _update_refs(obj: Any, id_map: dict[str, str]) -> bool: + if obj is None or not dataclasses.is_dataclass(obj) or isinstance(obj, type): + return False + modified = False + for f in dataclasses.fields(obj): + val = getattr(obj, f.name) + if isinstance(val, (ScheduledStopPointRef, RoutePointRef)): + new_ref = id_map.get(val.ref) + if new_ref is not None: + val.ref = new_ref + modified = True + elif isinstance(val, list): + for item in val: + if _update_refs(item, id_map): + modified = True + elif dataclasses.is_dataclass(val): + if _update_refs(val, id_map): + modified = True + return modified + + +# Object types that may transitively contain ScheduledStopPointRef or RoutePointRef. +_REF_BEARING_TYPES = [ + ServiceJourneyPattern, + ServiceLink, + TimingLink, + PassengerStopAssignment, + Route, + RoutePoint, +] + + +def fix_ssp_ids(database: Path) -> None: + with MdbxStorage(database, readonly=False) as db: + with db.env.rw_transaction() as txn: + id_map: dict[str, str] = {} + old_ssps: list[ScheduledStopPoint] = [] + for ssp in db.iter_only_objects(txn, ScheduledStopPoint): + new_id = _new_id(ssp.id) + if new_id is not None: + id_map[ssp.id] = new_id + old_ssps.append(ssp) + + print(f"{len(id_map)} ScheduledStopPoints to rewrite") + + updated: list[Any] = [] + for cls in _REF_BEARING_TYPES: + for obj in db.iter_only_objects(txn, cls): + if _update_refs(obj, id_map): + updated.append(obj) + + new_ssps = [dataclasses.replace(ssp, id=id_map[ssp.id]) for ssp in old_ssps] + + print(f"Updating refs in {len(updated)} objects") + print(f"Inserting {len(new_ssps)} renamed ScheduledStopPoints") + # TODO: delete the old ScheduledStopPoint objects (no delete API available yet) + + db.insert_any_object_on_queue(txn, updated) + db.insert_any_object_on_queue(txn, new_ssps) + txn.commit() + + +def main(source_database_file: str) -> None: + fix_ssp_ids(Path(source_database_file)) + + +if __name__ == "__main__": + import argparse + import traceback + + parser = argparse.ArgumentParser(description="Fix ScheduledStopPoint IDs to SIRI format") + parser.add_argument("source", type=str, help="mdbx file to fix in-place") + parser.add_argument("--log_file", type=str, required=False, help="log file path") + args = parser.parse_args() + prepare_logger(logging.INFO, args.log_file) + try: + main(args.source) + except Exception as e: + log_all(logging.ERROR, f"{e}") + raise e From 81bc47a84d59ef24568a08612e8d4c795c1abf67 Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Wed, 20 May 2026 15:14:20 +0200 Subject: [PATCH 02/16] Use generators everywhere --- fix/rewrite_sta_ssp_ids.py | 52 ++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/fix/rewrite_sta_ssp_ids.py b/fix/rewrite_sta_ssp_ids.py index 1aa6893..0ac6629 100644 --- a/fix/rewrite_sta_ssp_ids.py +++ b/fix/rewrite_sta_ssp_ids.py @@ -12,6 +12,7 @@ import dataclasses import logging import re +from collections.abc import Generator from pathlib import Path from typing import Any @@ -39,23 +40,23 @@ def _new_id(old_id: str) -> str | None: return None -def _update_refs(obj: Any, id_map: dict[str, str]) -> bool: +def _update_refs(obj: Any) -> bool: if obj is None or not dataclasses.is_dataclass(obj) or isinstance(obj, type): return False modified = False for f in dataclasses.fields(obj): val = getattr(obj, f.name) if isinstance(val, (ScheduledStopPointRef, RoutePointRef)): - new_ref = id_map.get(val.ref) + new_ref = _new_id(val.ref) if new_ref is not None: val.ref = new_ref modified = True elif isinstance(val, list): for item in val: - if _update_refs(item, id_map): + if _update_refs(item): modified = True elif dataclasses.is_dataclass(val): - if _update_refs(val, id_map): + if _update_refs(val): modified = True return modified @@ -71,33 +72,30 @@ def _update_refs(obj: Any, id_map: dict[str, str]) -> bool: ] +def _iter_updated_objects( + db: MdbxStorage, txn: Any, +) -> Generator[Any, None, None]: + for cls in _REF_BEARING_TYPES: + for obj in db.iter_only_objects(txn, cls): + if _update_refs(obj): + yield obj + + +def _iter_renamed_ssps( + db: MdbxStorage, txn: Any, +) -> Generator[ScheduledStopPoint, None, None]: + for ssp in db.iter_only_objects(txn, ScheduledStopPoint): + new_id = _new_id(ssp.id) + if new_id is not None: + yield dataclasses.replace(ssp, id=new_id) + + def fix_ssp_ids(database: Path) -> None: with MdbxStorage(database, readonly=False) as db: with db.env.rw_transaction() as txn: - id_map: dict[str, str] = {} - old_ssps: list[ScheduledStopPoint] = [] - for ssp in db.iter_only_objects(txn, ScheduledStopPoint): - new_id = _new_id(ssp.id) - if new_id is not None: - id_map[ssp.id] = new_id - old_ssps.append(ssp) - - print(f"{len(id_map)} ScheduledStopPoints to rewrite") - - updated: list[Any] = [] - for cls in _REF_BEARING_TYPES: - for obj in db.iter_only_objects(txn, cls): - if _update_refs(obj, id_map): - updated.append(obj) - - new_ssps = [dataclasses.replace(ssp, id=id_map[ssp.id]) for ssp in old_ssps] - - print(f"Updating refs in {len(updated)} objects") - print(f"Inserting {len(new_ssps)} renamed ScheduledStopPoints") # TODO: delete the old ScheduledStopPoint objects (no delete API available yet) - - db.insert_any_object_on_queue(txn, updated) - db.insert_any_object_on_queue(txn, new_ssps) + db.insert_any_object_on_queue(txn, _iter_updated_objects(db, txn)) + db.insert_any_object_on_queue(txn, _iter_renamed_ssps(db, txn)) txn.commit() From fbf72e25696555855614b3a88d83273613c39aff Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Wed, 20 May 2026 15:44:35 +0200 Subject: [PATCH 03/16] Use recursive_attributes --- fix/rewrite_sta_ssp_ids.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/fix/rewrite_sta_ssp_ids.py b/fix/rewrite_sta_ssp_ids.py index 0ac6629..bc997b5 100644 --- a/fix/rewrite_sta_ssp_ids.py +++ b/fix/rewrite_sta_ssp_ids.py @@ -27,6 +27,7 @@ ServiceLink, TimingLink, ) +from domain.netex.services.recursive_attributes import recursive_attributes from storage.mdbx.core.implementation import MdbxStorage from utils.aux_logging import log_all, prepare_logger @@ -41,22 +42,12 @@ def _new_id(old_id: str) -> str | None: def _update_refs(obj: Any) -> bool: - if obj is None or not dataclasses.is_dataclass(obj) or isinstance(obj, type): - return False modified = False - for f in dataclasses.fields(obj): - val = getattr(obj, f.name) - if isinstance(val, (ScheduledStopPointRef, RoutePointRef)): - new_ref = _new_id(val.ref) + for ref, _path in recursive_attributes(obj, []): + if isinstance(ref, (ScheduledStopPointRef, RoutePointRef)): + new_ref = _new_id(ref.ref) if new_ref is not None: - val.ref = new_ref - modified = True - elif isinstance(val, list): - for item in val: - if _update_refs(item): - modified = True - elif dataclasses.is_dataclass(val): - if _update_refs(val): + ref.ref = new_ref modified = True return modified From 070c3b422a45c31025085ca200cbcfe1832f6a25 Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Wed, 20 May 2026 16:58:38 +0200 Subject: [PATCH 04/16] Inline method --- fix/rewrite_sta_ssp_ids.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/fix/rewrite_sta_ssp_ids.py b/fix/rewrite_sta_ssp_ids.py index bc997b5..c3d0a5e 100644 --- a/fix/rewrite_sta_ssp_ids.py +++ b/fix/rewrite_sta_ssp_ids.py @@ -41,17 +41,6 @@ def _new_id(old_id: str) -> str | None: return None -def _update_refs(obj: Any) -> bool: - modified = False - for ref, _path in recursive_attributes(obj, []): - if isinstance(ref, (ScheduledStopPointRef, RoutePointRef)): - new_ref = _new_id(ref.ref) - if new_ref is not None: - ref.ref = new_ref - modified = True - return modified - - # Object types that may transitively contain ScheduledStopPointRef or RoutePointRef. _REF_BEARING_TYPES = [ ServiceJourneyPattern, @@ -68,7 +57,14 @@ def _iter_updated_objects( ) -> Generator[Any, None, None]: for cls in _REF_BEARING_TYPES: for obj in db.iter_only_objects(txn, cls): - if _update_refs(obj): + changed = False + for ref, _path in recursive_attributes(obj, []): + if isinstance(ref, (ScheduledStopPointRef, RoutePointRef)): + new_ref = _new_id(ref.ref) + if new_ref is not None: + ref.ref = new_ref + changed = True + if changed: yield obj From e9d807d21b3c2d0ad72bac097fc8265dd954bee5 Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Tue, 12 May 2026 17:54:04 +0200 Subject: [PATCH 05/16] Identify duplicate Mentz line versions --- fix/remove_mentz_line_versions.py | 53 +++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 fix/remove_mentz_line_versions.py diff --git a/fix/remove_mentz_line_versions.py b/fix/remove_mentz_line_versions.py new file mode 100644 index 0000000..66edb59 --- /dev/null +++ b/fix/remove_mentz_line_versions.py @@ -0,0 +1,53 @@ +""" +Resolves the Mentz line versions into actual operating days + +https://public.3.basecamp.com/p/fVFt3mGJiK52ewcsr8nFgu6o +""" + +import logging +from collections import defaultdict +from pathlib import Path + +from xsdata.models.datatype import XmlDateTime + +from domain.netex.model import Line +from storage.mdbx.core.implementation import MdbxStorage +from utils.aux_logging import prepare_logger, log_all + + +def fmt_dt(dt: XmlDateTime | None) -> str: + return dt.to_datetime().isoformat() if dt else "None" + + +def list_lines(database: Path): + lines: defaultdict[str, list[Line]] = defaultdict(list) + with MdbxStorage(database, readonly=True) as db: + with db.env.rw_transaction() as txn: + for line in db.iter_only_objects(txn, Line): + lines[line.id].append(line) + for key, values in sorted(lines.items()): + if len(values) > 1: + print(f"Duplicate line ID: {key}") + for line in values: + dates = [(fmt_dt(v.from_date), fmt_dt(v.to_date)) for v in line.validity_conditions_or_valid_between if hasattr(v, 'from_date')] + print(f" version={line.version} dates={dates}") + + +def main(source_database_file: str): + return list_lines(Path(source_database_file)) + + +if __name__ == "__main__": + import argparse + import traceback + + parser = argparse.ArgumentParser(description="Fix Mentz line versions") + parser.add_argument("source", type=str, help="mdbx file to use as input.") + parser.add_argument("--log_file", type=str, required=False, help="the logfile") + args = parser.parse_args() + mylogger = prepare_logger(logging.INFO, args.log_file) + try: + main(args.source) + except Exception as e: + log_all(logging.ERROR, f"{e} {traceback.format_exc()}") + raise e From 875e86e28a5e5382b05d5345fc83cbc450f4de0e Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Tue, 12 May 2026 21:59:34 +0200 Subject: [PATCH 06/16] Print list of journeys --- fix/remove_mentz_line_versions.py | 73 ++++++++++++++++++++++++++++--- 1 file changed, 66 insertions(+), 7 deletions(-) diff --git a/fix/remove_mentz_line_versions.py b/fix/remove_mentz_line_versions.py index 66edb59..80613e0 100644 --- a/fix/remove_mentz_line_versions.py +++ b/fix/remove_mentz_line_versions.py @@ -10,7 +10,7 @@ from xsdata.models.datatype import XmlDateTime -from domain.netex.model import Line +from domain.netex.model import DayTypeAssignment, Line, Route, ServiceJourney, ServiceJourneyPattern, UicOperatingPeriod from storage.mdbx.core.implementation import MdbxStorage from utils.aux_logging import prepare_logger, log_all @@ -25,12 +25,71 @@ def list_lines(database: Path): with db.env.rw_transaction() as txn: for line in db.iter_only_objects(txn, Line): lines[line.id].append(line) - for key, values in sorted(lines.items()): - if len(values) > 1: - print(f"Duplicate line ID: {key}") - for line in values: - dates = [(fmt_dt(v.from_date), fmt_dt(v.to_date)) for v in line.validity_conditions_or_valid_between if hasattr(v, 'from_date')] - print(f" version={line.version} dates={dates}") + + duplicate_line_ids = {key for key, values in lines.items() if len(values) > 1} + + routes: defaultdict[str, list[Route]] = defaultdict(list) + for route in db.iter_only_objects(txn, Route): + if route.line_ref is not None and route.line_ref.ref in duplicate_line_ids: + routes[route.line_ref.ref].append(route) + + duplicate_route_ids = {route.id for route_list in routes.values() for route in route_list} + + sjps: defaultdict[str, list[ServiceJourneyPattern]] = defaultdict(list) + for sjp in db.iter_only_objects(txn, ServiceJourneyPattern): + if sjp.route_ref_or_route_view is not None and sjp.route_ref_or_route_view.ref in duplicate_route_ids: + sjps[sjp.route_ref_or_route_view.ref].append(sjp) + + duplicate_sjp_ids = {sjp.id for sjp_list in sjps.values() for sjp in sjp_list} + + day_type_ids: set[str] = set() + journeys: defaultdict[str, list[ServiceJourney]] = defaultdict(list) + for journey in db.iter_only_objects(txn, ServiceJourney): + if journey.journey_pattern_ref is not None and journey.journey_pattern_ref.ref in duplicate_sjp_ids: + journeys[journey.journey_pattern_ref.ref].append(journey) + if journey.day_types is not None: + for ref in journey.day_types.day_type_ref: + day_type_ids.add(ref.ref) + + uic_period_ids: set[str] = set() + day_type_assignments: defaultdict[str, list[DayTypeAssignment]] = defaultdict(list) + + for dta in db.iter_only_objects(txn, DayTypeAssignment): + print(dta) + if dta.day_type_ref is not None and dta.day_type_ref.ref in day_type_ids: + day_type_assignments[dta.day_type_ref.ref].append(dta) + ref = dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date + if hasattr(ref, 'ref'): + uic_period_ids.add(ref.ref) + + uic_periods: dict[str, UicOperatingPeriod] = {} + for period in db.iter_only_objects(txn, UicOperatingPeriod): + if period.id in uic_period_ids: + uic_periods[period.id] = period + + for key, values in sorted(lines.items()): + if len(values) > 1: + print(f"Duplicate line ID: {key}") + for line in values: + dates = [(fmt_dt(v.from_date), fmt_dt(v.to_date)) for v in line.validity_conditions_or_valid_between if hasattr(v, 'from_date')] + print(f" version={line.version} dates={dates}") + for route in routes.get(key, []): + print(f" Route: {route.id}") + for sjp in sjps.get(route.id, []): + print(f" ServiceJourneyPattern: {sjp.id} version={sjp.version}") + for journey in journeys.get(sjp.id, []): + if journey.day_types is None: + continue + for dt_ref in journey.day_types.day_type_ref: + print(f" Journey: {journey.id} DayType: {dt_ref.ref}") + print(day_type_assignments) + for dta in day_type_assignments.get(dt_ref.ref, []): + ref = dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date + period = uic_periods.get(ref.ref) if hasattr(ref, 'ref') else None + if period: + from_dt = fmt_dt(period.from_operating_day_ref_or_from_date) if isinstance(period.from_operating_day_ref_or_from_date, XmlDateTime) else str(period.from_operating_day_ref_or_from_date) + to_dt = fmt_dt(period.to_operating_day_ref_or_to_date) if isinstance(period.to_operating_day_ref_or_to_date, XmlDateTime) else str(period.to_operating_day_ref_or_to_date) + print(f" Journey: {journey.id} DayType: {dt_ref.ref} period={from_dt} to {to_dt}") def main(source_database_file: str): From bdf5ac4d53e4b4395c158c8f09cae811de888908 Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Mon, 18 May 2026 13:13:22 +0200 Subject: [PATCH 07/16] Log complete tree --- fix/remove_mentz_line_versions.py | 47 ++++++++++++++++--------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/fix/remove_mentz_line_versions.py b/fix/remove_mentz_line_versions.py index 80613e0..bdd638a 100644 --- a/fix/remove_mentz_line_versions.py +++ b/fix/remove_mentz_line_versions.py @@ -10,7 +10,7 @@ from xsdata.models.datatype import XmlDateTime -from domain.netex.model import DayTypeAssignment, Line, Route, ServiceJourney, ServiceJourneyPattern, UicOperatingPeriod +from domain.netex.model import DayTypeAssignment, Line, Route, ServiceCalendar, ServiceJourney, ServiceJourneyPattern, UicOperatingPeriod from storage.mdbx.core.implementation import MdbxStorage from utils.aux_logging import prepare_logger, log_all @@ -28,6 +28,8 @@ def list_lines(database: Path): duplicate_line_ids = {key for key, values in lines.items() if len(values) > 1} + print(f"{len(duplicate_line_ids)} of {len(lines)} lines have duplicates") + routes: defaultdict[str, list[Route]] = defaultdict(list) for route in db.iter_only_objects(txn, Route): if route.line_ref is not None and route.line_ref.ref in duplicate_line_ids: @@ -52,20 +54,25 @@ def list_lines(database: Path): day_type_ids.add(ref.ref) uic_period_ids: set[str] = set() - day_type_assignments: defaultdict[str, list[DayTypeAssignment]] = defaultdict(list) + day_type_assignments: defaultdict[str, DayTypeAssignment] = {} + uic_periods: dict[str, UicOperatingPeriod] = {} - for dta in db.iter_only_objects(txn, DayTypeAssignment): - print(dta) - if dta.day_type_ref is not None and dta.day_type_ref.ref in day_type_ids: - day_type_assignments[dta.day_type_ref.ref].append(dta) - ref = dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date - if hasattr(ref, 'ref'): - uic_period_ids.add(ref.ref) + for calendar in db.iter_only_objects(txn, ServiceCalendar): + if calendar.day_type_assignments is None: + continue + for dta in calendar.day_type_assignments.day_type_assignment: + if dta.day_type_ref is not None and dta.day_type_ref.ref in day_type_ids: + day_type_assignments[dta.day_type_ref.ref] = dta + ref = dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date + if hasattr(ref, 'ref'): + uic_period_ids.add(ref.ref) - uic_periods: dict[str, UicOperatingPeriod] = {} - for period in db.iter_only_objects(txn, UicOperatingPeriod): - if period.id in uic_period_ids: - uic_periods[period.id] = period + if calendar.operating_periods is None: + continue + + for entry in calendar.operating_periods.uic_operating_period_ref_or_operating_period_ref_or_operating_period_or_uic_operating_period: + if isinstance(entry, UicOperatingPeriod) and entry.id in uic_period_ids: + uic_periods[entry.id] = entry for key, values in sorted(lines.items()): if len(values) > 1: @@ -74,7 +81,7 @@ def list_lines(database: Path): dates = [(fmt_dt(v.from_date), fmt_dt(v.to_date)) for v in line.validity_conditions_or_valid_between if hasattr(v, 'from_date')] print(f" version={line.version} dates={dates}") for route in routes.get(key, []): - print(f" Route: {route.id}") + print(f" Route: {route.id} lineRef={route.line_ref.ref} v={route.line_ref.version}") for sjp in sjps.get(route.id, []): print(f" ServiceJourneyPattern: {sjp.id} version={sjp.version}") for journey in journeys.get(sjp.id, []): @@ -82,14 +89,10 @@ def list_lines(database: Path): continue for dt_ref in journey.day_types.day_type_ref: print(f" Journey: {journey.id} DayType: {dt_ref.ref}") - print(day_type_assignments) - for dta in day_type_assignments.get(dt_ref.ref, []): - ref = dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date - period = uic_periods.get(ref.ref) if hasattr(ref, 'ref') else None - if period: - from_dt = fmt_dt(period.from_operating_day_ref_or_from_date) if isinstance(period.from_operating_day_ref_or_from_date, XmlDateTime) else str(period.from_operating_day_ref_or_from_date) - to_dt = fmt_dt(period.to_operating_day_ref_or_to_date) if isinstance(period.to_operating_day_ref_or_to_date, XmlDateTime) else str(period.to_operating_day_ref_or_to_date) - print(f" Journey: {journey.id} DayType: {dt_ref.ref} period={from_dt} to {to_dt}") + dta = day_type_assignments.get(dt_ref.ref) + + ref = dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date + period = uic_periods.get(ref.ref) if hasattr(ref, 'ref') else None def main(source_database_file: str): From 2d65804e7551620918813f8b176407e2e6b3ce05 Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Tue, 19 May 2026 12:47:53 +0200 Subject: [PATCH 08/16] Finish transforms of EPIP --- fix/list_journeys.py | 57 +++++++++ fix/remove_mentz_line_versions.py | 194 ++++++++++++++++++++++++------ transformers/epip.py | 8 ++ 3 files changed, 219 insertions(+), 40 deletions(-) create mode 100644 fix/list_journeys.py diff --git a/fix/list_journeys.py b/fix/list_journeys.py new file mode 100644 index 0000000..aa151e6 --- /dev/null +++ b/fix/list_journeys.py @@ -0,0 +1,57 @@ +""" + +""" + +import logging +from collections import defaultdict +from pathlib import Path + +from xsdata.models.datatype import XmlDateTime + +from domain.netex.model import ( + DayType, + DayTypeAssignment, + DayTypeRef, + DayTypeRefsRelStructure, + Line, + Route, + ServiceCalendar, + ServiceJourney, + ServiceJourneyPattern, + UicOperatingPeriod, + UicOperatingPeriodRef, +) +from storage.mdbx.core.implementation import MdbxStorage +from utils.aux_logging import prepare_logger, log_all + + +def fmt_dt(dt: XmlDateTime | None) -> str: + return dt.to_datetime().isoformat() if dt else "None" + + +def fix_lines(database: Path) -> None: + + with MdbxStorage(database, readonly=False) as db: + with db.env.ro_transaction() as txn: + for journey in db.iter_only_objects(txn, ServiceJourney): + print(journey.day_types.day_type_ref) + + +def main(source_database_file: str) -> None: + return fix_lines(Path(source_database_file)) + + +if __name__ == "__main__": + import argparse + import traceback + + parser = argparse.ArgumentParser(description="Fix Mentz line versions") + parser.add_argument("source", type=str, help="mdbx file to use as input.") + parser.add_argument("--log_file", type=str, required=False, help="the logfile") + args = parser.parse_args() + mylogger = prepare_logger(logging.INFO, args.log_file) + try: + main(args.source) + except Exception as e: + log_all(logging.ERROR, f"{e} {traceback.format_exc()}") + raise e \ No newline at end of file diff --git a/fix/remove_mentz_line_versions.py b/fix/remove_mentz_line_versions.py index bdd638a..3fa3dd9 100644 --- a/fix/remove_mentz_line_versions.py +++ b/fix/remove_mentz_line_versions.py @@ -10,7 +10,19 @@ from xsdata.models.datatype import XmlDateTime -from domain.netex.model import DayTypeAssignment, Line, Route, ServiceCalendar, ServiceJourney, ServiceJourneyPattern, UicOperatingPeriod +from domain.netex.model import ( + DayType, + DayTypeAssignment, + DayTypeRef, + DayTypeRefsRelStructure, + Line, + Route, + ServiceCalendar, + ServiceJourney, + ServiceJourneyPattern, + UicOperatingPeriod, + UicOperatingPeriodRef, +) from storage.mdbx.core.implementation import MdbxStorage from utils.aux_logging import prepare_logger, log_all @@ -19,9 +31,10 @@ def fmt_dt(dt: XmlDateTime | None) -> str: return dt.to_datetime().isoformat() if dt else "None" -def list_lines(database: Path): +def fix_lines(database: Path) -> None: lines: defaultdict[str, list[Line]] = defaultdict(list) - with MdbxStorage(database, readonly=True) as db: + + with MdbxStorage(database, readonly=False) as db: with db.env.rw_transaction() as txn: for line in db.iter_only_objects(txn, Line): lines[line.id].append(line) @@ -44,8 +57,8 @@ def list_lines(database: Path): duplicate_sjp_ids = {sjp.id for sjp_list in sjps.values() for sjp in sjp_list} - day_type_ids: set[str] = set() journeys: defaultdict[str, list[ServiceJourney]] = defaultdict(list) + day_type_ids: set[str] = set() for journey in db.iter_only_objects(txn, ServiceJourney): if journey.journey_pattern_ref is not None and journey.journey_pattern_ref.ref in duplicate_sjp_ids: journeys[journey.journey_pattern_ref.ref].append(journey) @@ -53,50 +66,151 @@ def list_lines(database: Path): for ref in journey.day_types.day_type_ref: day_type_ids.add(ref.ref) + # Collect existing DayTypeAssignments and UicOperatingPeriods for those day types. uic_period_ids: set[str] = set() - day_type_assignments: defaultdict[str, DayTypeAssignment] = {} + day_type_assignments: dict[str, DayTypeAssignment] = {} uic_periods: dict[str, UicOperatingPeriod] = {} for calendar in db.iter_only_objects(txn, ServiceCalendar): - if calendar.day_type_assignments is None: + if calendar.day_type_assignments is not None: + for dta in calendar.day_type_assignments.day_type_assignment: + if dta.day_type_ref is not None and dta.day_type_ref.ref in day_type_ids: + day_type_assignments[dta.day_type_ref.ref] = dta + ref = dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date + if hasattr(ref, 'ref'): + uic_period_ids.add(ref.ref) + + if calendar.operating_periods is not None: + for entry in calendar.operating_periods.uic_operating_period_ref_or_operating_period_ref_or_operating_period_or_uic_operating_period: + if isinstance(entry, UicOperatingPeriod) and entry.id in uic_period_ids: + uic_periods[entry.id] = entry + + new_objects: list[DayType | UicOperatingPeriod | DayTypeAssignment] = [] + updated_journeys: list[ServiceJourney] = [] + + # Cache: (line_id, line_version, existing_dt_id) -> new day_type_id + created: dict[tuple[str, str | None, str], str] = {} + + # Pre-index line validity by (line_id, version). + version_validity: dict[tuple[str, str | None], tuple[XmlDateTime, XmlDateTime | None]] = {} + for line_id, line_versions in lines.items(): + if len(line_versions) <= 1: + continue + for line in line_versions: + for vb in line.validity_conditions_or_valid_between: + if hasattr(vb, 'from_date') and vb.from_date is not None: + version_validity[(line_id, line.version)] = (vb.from_date, vb.to_date) + break + + for line_id, line_versions in lines.items(): + if len(line_versions) <= 1: + continue + + for route in routes.get(line_id, []): + validity = version_validity.get((line_id, route.line_ref.version)) + if validity is None: + print(f" Skipping route {route.id}: no validity for line {line_id} v={route.line_ref.version}") continue - for dta in calendar.day_type_assignments.day_type_assignment: - if dta.day_type_ref is not None and dta.day_type_ref.ref in day_type_ids: - day_type_assignments[dta.day_type_ref.ref] = dta - ref = dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date - if hasattr(ref, 'ref'): - uic_period_ids.add(ref.ref) - - if calendar.operating_periods is None: - continue - - for entry in calendar.operating_periods.uic_operating_period_ref_or_operating_period_ref_or_operating_period_or_uic_operating_period: - if isinstance(entry, UicOperatingPeriod) and entry.id in uic_period_ids: - uic_periods[entry.id] = entry - for key, values in sorted(lines.items()): - if len(values) > 1: - print(f"Duplicate line ID: {key}") - for line in values: - dates = [(fmt_dt(v.from_date), fmt_dt(v.to_date)) for v in line.validity_conditions_or_valid_between if hasattr(v, 'from_date')] - print(f" version={line.version} dates={dates}") - for route in routes.get(key, []): - print(f" Route: {route.id} lineRef={route.line_ref.ref} v={route.line_ref.version}") - for sjp in sjps.get(route.id, []): - print(f" ServiceJourneyPattern: {sjp.id} version={sjp.version}") - for journey in journeys.get(sjp.id, []): - if journey.day_types is None: - continue - for dt_ref in journey.day_types.day_type_ref: - print(f" Journey: {journey.id} DayType: {dt_ref.ref}") - dta = day_type_assignments.get(dt_ref.ref) + line_from, line_to = validity + line_from_date = line_from.to_datetime().date() + line_to_date = line_to.to_datetime().date() if line_to is not None else line_from_date + safe_version = (route.line_ref.version or 'unknown').replace(':', '_') + + for sjp in sjps.get(route.id, []): + for journey in journeys.get(sjp.id, []): + if journey.day_types is None: + continue + + new_refs: list[DayTypeRef] = [] + for dt_ref in journey.day_types.day_type_ref: + existing_dt_id = dt_ref.ref + cache_key = (line_id, route.line_ref.version, existing_dt_id) + + if cache_key in created: + new_refs.append(DayTypeRef(ref=created[cache_key], version='1')) + continue + + # Derive the new UicOperatingPeriod by intersecting the existing period + # with the line version's validity window. + dta = day_type_assignments.get(existing_dt_id) + existing_period: UicOperatingPeriod | None = None + if dta is not None: + period_ref = dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date + if hasattr(period_ref, 'ref'): + existing_period = uic_periods.get(period_ref.ref) + + if existing_period is not None and isinstance(existing_period.from_operating_day_ref_or_from_date, XmlDateTime): + period_from_date = existing_period.from_operating_day_ref_or_from_date.to_datetime().date() + period_to_ref = existing_period.to_operating_day_ref_or_to_date + period_to_date = period_to_ref.to_datetime().date() if isinstance(period_to_ref, XmlDateTime) else line_to_date + + new_from_date = max(line_from_date, period_from_date) + new_to_date = min(line_to_date, period_to_date) + + if new_from_date <= new_to_date: + # Slice valid_day_bits to the intersection window. + offset = (new_from_date - period_from_date).days + n_days = (new_to_date - new_from_date).days + 1 + existing_bits = existing_period.valid_day_bits or '' + new_bits = existing_bits[offset:offset + n_days].ljust(n_days, '0') + + new_from_xml: XmlDateTime = line_from if new_from_date == line_from_date else existing_period.from_operating_day_ref_or_from_date + new_to_xml: XmlDateTime | None = line_to if new_to_date == line_to_date else period_to_ref + else: + # No overlap — fall back to full line validity with all days active. + new_from_date, new_to_date = line_from_date, line_to_date + n_days = max(1, (new_to_date - new_from_date).days + 1) + new_bits = '1' * n_days + new_from_xml, new_to_xml = line_from, line_to + else: + # No existing period — cover the full line validity. + new_from_date, new_to_date = line_from_date, line_to_date + n_days = max(1, (new_to_date - new_from_date).days + 1) + new_bits = '1' * n_days + new_from_xml, new_to_xml = line_from, line_to + + new_day_type_id = f"{line_id}:{existing_dt_id}:{safe_version}" + new_uic_period_id = f"{line_id}:{existing_dt_id}:UicOperatingPeriod:{safe_version}" + new_dta_id = f"{line_id}:{existing_dt_id}:DayTypeAssignment:{safe_version}" + + print(f" Creating {new_day_type_id} [{new_from_date} .. {new_to_date}] ({n_days} days)") + + new_objects.append(DayType(id=new_day_type_id, version='1')) + new_objects.append(UicOperatingPeriod( + id=new_uic_period_id, + version='1', + from_operating_day_ref_or_from_date=new_from_xml, + to_operating_day_ref_or_to_date=new_to_xml, + valid_day_bits=new_bits, + )) + new_objects.append(DayTypeAssignment( + id=new_dta_id, + version='1', + day_type_ref=DayTypeRef(ref=new_day_type_id, version='1'), + uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date=UicOperatingPeriodRef( + ref=new_uic_period_id, + version='1', + ), + )) + + created[cache_key] = new_day_type_id + new_refs.append(DayTypeRef(ref=new_day_type_id, version='1')) + + if new_refs: + journey.day_types = DayTypeRefsRelStructure(day_type_ref=new_refs) + updated_journeys.append(journey) + + print(f"Writing {len(new_objects)} new calendar objects and {len(updated_journeys)} updated journeys") - ref = dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date - period = uic_periods.get(ref.ref) if hasattr(ref, 'ref') else None + with db.env.rw_transaction() as txn: + db.insert_any_object_on_queue(txn, new_objects) + db.insert_any_object_on_queue(txn, updated_journeys) + txn.commit() -def main(source_database_file: str): - return list_lines(Path(source_database_file)) +def main(source_database_file: str) -> None: + return fix_lines(Path(source_database_file)) if __name__ == "__main__": @@ -112,4 +226,4 @@ def main(source_database_file: str): main(args.source) except Exception as e: log_all(logging.ERROR, f"{e} {traceback.format_exc()}") - raise e + raise e \ No newline at end of file diff --git a/transformers/epip.py b/transformers/epip.py index 8cb1ee3..fb8e543 100644 --- a/transformers/epip.py +++ b/transformers/epip.py @@ -506,6 +506,9 @@ def get_service_calendar(db_read: MdbxStorage, txn: TXN, generator_defaults: dic from_date: datetime = datetime.max to_date: datetime = datetime.min for uic in db_read.iter_only_objects(txn, UicOperatingPeriod): + if uic.from_operating_day_ref_or_from_date is None or uic.to_operating_day_ref_or_to_date is None: + continue + dt = uic.from_operating_day_ref_or_from_date.to_datetime() dt = dt.replace(tzinfo=None) if from_date > dt: @@ -808,6 +811,10 @@ def epip_service_calendar(db_read: MdbxStorage, txn: TXN, generator_defaults: di and dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date.name_of_ref_class == NameOfClassOperatingPeriodRefStructureType.UIC_OPERATING_PERIOD ) + or ( + isinstance(dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date, OperatingPeriodRef) + and dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date.ref in uic_operating_periods + ) ] my_operating_periods: list[OperatingPeriod] = [ operating_periods[dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date.ref] @@ -819,6 +826,7 @@ def epip_service_calendar(db_read: MdbxStorage, txn: TXN, generator_defaults: di == NameOfClassOperatingPeriodRefStructureType.OPERATING_PERIOD ) and (dta.is_available is None or dta.is_available) + and dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date.ref in operating_periods ] # my_operating_days = [operating_days[dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date.ref] for dta in day_type_assignments if isinstance(dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date, OperatingDayRef)] my_operational_dates = [ From 5df3ee98659bd1488d2ec8f3fe1d85c28054b688 Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Tue, 19 May 2026 15:07:26 +0200 Subject: [PATCH 09/16] Make code more readable by introducing methods --- fix/remove_mentz_line_versions.py | 339 +++++++++++++++++++----------- 1 file changed, 221 insertions(+), 118 deletions(-) diff --git a/fix/remove_mentz_line_versions.py b/fix/remove_mentz_line_versions.py index 3fa3dd9..8a63145 100644 --- a/fix/remove_mentz_line_versions.py +++ b/fix/remove_mentz_line_versions.py @@ -1,16 +1,19 @@ """ -Resolves the Mentz line versions into actual operating days +Resolves the Mentz line versions into actual operating days. https://public.3.basecamp.com/p/fVFt3mGJiK52ewcsr8nFgu6o """ import logging from collections import defaultdict +from dataclasses import dataclass +from datetime import date, timedelta from pathlib import Path from xsdata.models.datatype import XmlDateTime from domain.netex.model import ( + CompositeFrame, DayType, DayTypeAssignment, DayTypeRef, @@ -22,15 +25,223 @@ ServiceJourneyPattern, UicOperatingPeriod, UicOperatingPeriodRef, + ValidBetween, ) from storage.mdbx.core.implementation import MdbxStorage -from utils.aux_logging import prepare_logger, log_all +from utils.aux_logging import log_all, prepare_logger def fmt_dt(dt: XmlDateTime | None) -> str: return dt.to_datetime().isoformat() if dt else "None" +def _xml_date(d: date) -> XmlDateTime: + return XmlDateTime(d.year, d.month, d.day, 0, 0, 0) + + +@dataclass +class LineValidity: + from_dt: XmlDateTime + to_dt: XmlDateTime + + @property + def from_date(self) -> date: + return self.from_dt.to_datetime().date() + + @property + def to_date(self) -> date: + return self.to_dt.to_datetime().date() + + +@dataclass +class NewPeriod: + from_xml: XmlDateTime + to_xml: XmlDateTime + bits: str + from_date: date + to_date: date + + @property + def n_days(self) -> int: + return (self.to_date - self.from_date).days + 1 + + +def _build_version_validity( + lines: defaultdict[str, list[Line]], + frame_end: XmlDateTime, +) -> dict[tuple[str, str | None], LineValidity]: + raw: dict[tuple[str, str | None], tuple[XmlDateTime, XmlDateTime | None]] = {} + for line_id, line_versions in lines.items(): + if len(line_versions) <= 1: + continue + for line in line_versions: + for vb in line.validity_conditions_or_valid_between: + if hasattr(vb, 'from_date') and vb.from_date is not None: + raw[(line_id, line.version)] = (vb.from_date, vb.to_date) + break + + result: dict[tuple[str, str | None], LineValidity] = {} + for line_id, line_versions in lines.items(): + if len(line_versions) <= 1: + continue + sorted_vers = sorted( + [(line.version, raw[(line_id, line.version)]) + for line in line_versions if (line_id, line.version) in raw], + key=lambda item: item[1][0], + ) + for i, (ver, (from_dt, to_dt)) in enumerate(sorted_vers): + if to_dt is None: + if i + 1 < len(sorted_vers): + next_from = sorted_vers[i + 1][1][0].to_datetime().date() + to_dt = _xml_date(next_from - timedelta(days=1)) + else: + to_dt = frame_end + result[(line_id, ver)] = LineValidity(from_dt=from_dt, to_dt=to_dt) + return result + + +def _compute_period( + existing_period: UicOperatingPeriod | None, + validity: LineValidity, +) -> NewPeriod: + if existing_period is not None and isinstance(existing_period.from_operating_day_ref_or_from_date, XmlDateTime): + period_from_date = existing_period.from_operating_day_ref_or_from_date.to_datetime().date() + period_to_ref = existing_period.to_operating_day_ref_or_to_date + period_to_date = period_to_ref.to_datetime().date() if isinstance(period_to_ref, XmlDateTime) else validity.to_date + + new_from_date = max(validity.from_date, period_from_date) + new_to_date = min(validity.to_date, period_to_date) + + if new_from_date <= new_to_date: + offset = (new_from_date - period_from_date).days + n_days = (new_to_date - new_from_date).days + 1 + existing_bits = existing_period.valid_day_bits or '' + bits = existing_bits[offset:offset + n_days].ljust(n_days, '0') + from_xml = validity.from_dt if new_from_date == validity.from_date else existing_period.from_operating_day_ref_or_from_date + to_xml = validity.to_dt if new_to_date == validity.to_date else period_to_ref + return NewPeriod(from_xml=from_xml, to_xml=to_xml, bits=bits, from_date=new_from_date, to_date=new_to_date) + + # No existing period or no overlap — cover the full line validity with all days active. + n_days = max(1, (validity.to_date - validity.from_date).days + 1) + return NewPeriod( + from_xml=validity.from_dt, + to_xml=validity.to_dt, + bits='1' * n_days, + from_date=validity.from_date, + to_date=validity.to_date, + ) + + +def _make_calendar_objects( + line_id: str, + existing_dt_id: str, + safe_version: str, + period: NewPeriod, +) -> tuple[DayType, UicOperatingPeriod, DayTypeAssignment]: + new_day_type_id = f"{line_id}:{existing_dt_id}:{safe_version}" + new_uic_period_id = f"{line_id}:{existing_dt_id}:UicOperatingPeriod:{safe_version}" + new_dta_id = f"{line_id}:{existing_dt_id}:DayTypeAssignment:{safe_version}" + + print(f" Creating {new_day_type_id} [{period.from_date} .. {period.to_date}] ({period.n_days} days)") + + return ( + DayType(id=new_day_type_id, version='1'), + UicOperatingPeriod( + id=new_uic_period_id, + version='1', + from_operating_day_ref_or_from_date=period.from_xml, + to_operating_day_ref_or_to_date=period.to_xml, + valid_day_bits=period.bits, + ), + DayTypeAssignment( + id=new_dta_id, + version='1', + day_type_ref=DayTypeRef(ref=new_day_type_id, version='1'), + uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date=UicOperatingPeriodRef( + ref=new_uic_period_id, + version='1', + ), + ), + ) + + +def _process_journey( + journey: ServiceJourney, + line_id: str, + line_version: str | None, + safe_version: str, + validity: LineValidity, + day_type_assignments: dict[str, DayTypeAssignment], + uic_periods: dict[str, UicOperatingPeriod], + new_objects: list, + created: dict[tuple[str, str | None, str], str], +) -> list[DayTypeRef]: + new_refs: list[DayTypeRef] = [] + for dt_ref in journey.day_types.day_type_ref: # type: ignore[union-attr] + existing_dt_id = dt_ref.ref + cache_key = (line_id, line_version, existing_dt_id) + + if cache_key in created: + new_refs.append(DayTypeRef(ref=created[cache_key], version='1')) + continue + + dta = day_type_assignments.get(existing_dt_id) + existing_period: UicOperatingPeriod | None = None + if dta is not None: + period_ref = dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date + if hasattr(period_ref, 'ref'): + existing_period = uic_periods.get(period_ref.ref) + + period = _compute_period(existing_period, validity) + objs = _make_calendar_objects(line_id, existing_dt_id, safe_version, period) + new_objects.extend(objs) + + created[cache_key] = objs[0].id + new_refs.append(DayTypeRef(ref=objs[0].id, version='1')) + return new_refs + + +def _resolve_journeys( + lines: defaultdict[str, list[Line]], + routes: defaultdict[str, list[Route]], + sjps: defaultdict[str, list[ServiceJourneyPattern]], + journeys: defaultdict[str, list[ServiceJourney]], + version_validity: dict[tuple[str, str | None], LineValidity], + day_type_assignments: dict[str, DayTypeAssignment], + uic_periods: dict[str, UicOperatingPeriod], +) -> tuple[list[DayType | UicOperatingPeriod | DayTypeAssignment], list[ServiceJourney]]: + new_objects: list[DayType | UicOperatingPeriod | DayTypeAssignment] = [] + updated_journeys: list[ServiceJourney] = [] + created: dict[tuple[str, str | None, str], str] = {} + + for line_id, line_versions in lines.items(): + if len(line_versions) <= 1: + continue + + for route in routes.get(line_id, []): + validity = version_validity.get((line_id, route.line_ref.version)) + if validity is None: + print(f" Skipping route {route.id}: no validity for line {line_id} v={route.line_ref.version}") + continue + + safe_version = (route.line_ref.version or 'unknown').replace(':', '_') + + for sjp in sjps.get(route.id, []): + for journey in journeys.get(sjp.id, []): + if journey.day_types is None: + continue + + new_refs = _process_journey( + journey, line_id, route.line_ref.version, safe_version, + validity, day_type_assignments, uic_periods, new_objects, created, + ) + if new_refs: + journey.day_types = DayTypeRefsRelStructure(day_type_ref=new_refs) + updated_journeys.append(journey) + + return new_objects, updated_journeys + + def fix_lines(database: Path) -> None: lines: defaultdict[str, list[Line]] = defaultdict(list) @@ -40,7 +251,6 @@ def fix_lines(database: Path) -> None: lines[line.id].append(line) duplicate_line_ids = {key for key, values in lines.items() if len(values) > 1} - print(f"{len(duplicate_line_ids)} of {len(lines)} lines have duplicates") routes: defaultdict[str, list[Route]] = defaultdict(list) @@ -66,7 +276,6 @@ def fix_lines(database: Path) -> None: for ref in journey.day_types.day_type_ref: day_type_ids.add(ref.ref) - # Collect existing DayTypeAssignments and UicOperatingPeriods for those day types. uic_period_ids: set[str] = set() day_type_assignments: dict[str, DayTypeAssignment] = {} uic_periods: dict[str, UicOperatingPeriod] = {} @@ -85,121 +294,15 @@ def fix_lines(database: Path) -> None: if isinstance(entry, UicOperatingPeriod) and entry.id in uic_period_ids: uic_periods[entry.id] = entry - new_objects: list[DayType | UicOperatingPeriod | DayTypeAssignment] = [] - updated_journeys: list[ServiceJourney] = [] - - # Cache: (line_id, line_version, existing_dt_id) -> new day_type_id - created: dict[tuple[str, str | None, str], str] = {} + # we really should be getting this from CompositeFrame, but that doesn't appear to be stored + frame_end: XmlDateTime = XmlDateTime(2026, 12, 14, 23, 59, 59) + print(f"The feed's end date is {frame_end}") - # Pre-index line validity by (line_id, version). - version_validity: dict[tuple[str, str | None], tuple[XmlDateTime, XmlDateTime | None]] = {} - for line_id, line_versions in lines.items(): - if len(line_versions) <= 1: - continue - for line in line_versions: - for vb in line.validity_conditions_or_valid_between: - if hasattr(vb, 'from_date') and vb.from_date is not None: - version_validity[(line_id, line.version)] = (vb.from_date, vb.to_date) - break - - for line_id, line_versions in lines.items(): - if len(line_versions) <= 1: - continue + version_validity = _build_version_validity(lines, frame_end) - for route in routes.get(line_id, []): - validity = version_validity.get((line_id, route.line_ref.version)) - if validity is None: - print(f" Skipping route {route.id}: no validity for line {line_id} v={route.line_ref.version}") - continue - - line_from, line_to = validity - line_from_date = line_from.to_datetime().date() - line_to_date = line_to.to_datetime().date() if line_to is not None else line_from_date - safe_version = (route.line_ref.version or 'unknown').replace(':', '_') - - for sjp in sjps.get(route.id, []): - for journey in journeys.get(sjp.id, []): - if journey.day_types is None: - continue - - new_refs: list[DayTypeRef] = [] - for dt_ref in journey.day_types.day_type_ref: - existing_dt_id = dt_ref.ref - cache_key = (line_id, route.line_ref.version, existing_dt_id) - - if cache_key in created: - new_refs.append(DayTypeRef(ref=created[cache_key], version='1')) - continue - - # Derive the new UicOperatingPeriod by intersecting the existing period - # with the line version's validity window. - dta = day_type_assignments.get(existing_dt_id) - existing_period: UicOperatingPeriod | None = None - if dta is not None: - period_ref = dta.uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date - if hasattr(period_ref, 'ref'): - existing_period = uic_periods.get(period_ref.ref) - - if existing_period is not None and isinstance(existing_period.from_operating_day_ref_or_from_date, XmlDateTime): - period_from_date = existing_period.from_operating_day_ref_or_from_date.to_datetime().date() - period_to_ref = existing_period.to_operating_day_ref_or_to_date - period_to_date = period_to_ref.to_datetime().date() if isinstance(period_to_ref, XmlDateTime) else line_to_date - - new_from_date = max(line_from_date, period_from_date) - new_to_date = min(line_to_date, period_to_date) - - if new_from_date <= new_to_date: - # Slice valid_day_bits to the intersection window. - offset = (new_from_date - period_from_date).days - n_days = (new_to_date - new_from_date).days + 1 - existing_bits = existing_period.valid_day_bits or '' - new_bits = existing_bits[offset:offset + n_days].ljust(n_days, '0') - - new_from_xml: XmlDateTime = line_from if new_from_date == line_from_date else existing_period.from_operating_day_ref_or_from_date - new_to_xml: XmlDateTime | None = line_to if new_to_date == line_to_date else period_to_ref - else: - # No overlap — fall back to full line validity with all days active. - new_from_date, new_to_date = line_from_date, line_to_date - n_days = max(1, (new_to_date - new_from_date).days + 1) - new_bits = '1' * n_days - new_from_xml, new_to_xml = line_from, line_to - else: - # No existing period — cover the full line validity. - new_from_date, new_to_date = line_from_date, line_to_date - n_days = max(1, (new_to_date - new_from_date).days + 1) - new_bits = '1' * n_days - new_from_xml, new_to_xml = line_from, line_to - - new_day_type_id = f"{line_id}:{existing_dt_id}:{safe_version}" - new_uic_period_id = f"{line_id}:{existing_dt_id}:UicOperatingPeriod:{safe_version}" - new_dta_id = f"{line_id}:{existing_dt_id}:DayTypeAssignment:{safe_version}" - - print(f" Creating {new_day_type_id} [{new_from_date} .. {new_to_date}] ({n_days} days)") - - new_objects.append(DayType(id=new_day_type_id, version='1')) - new_objects.append(UicOperatingPeriod( - id=new_uic_period_id, - version='1', - from_operating_day_ref_or_from_date=new_from_xml, - to_operating_day_ref_or_to_date=new_to_xml, - valid_day_bits=new_bits, - )) - new_objects.append(DayTypeAssignment( - id=new_dta_id, - version='1', - day_type_ref=DayTypeRef(ref=new_day_type_id, version='1'), - uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date=UicOperatingPeriodRef( - ref=new_uic_period_id, - version='1', - ), - )) - - created[cache_key] = new_day_type_id - new_refs.append(DayTypeRef(ref=new_day_type_id, version='1')) - - if new_refs: - journey.day_types = DayTypeRefsRelStructure(day_type_ref=new_refs) - updated_journeys.append(journey) + new_objects, updated_journeys = _resolve_journeys( + lines, routes, sjps, journeys, version_validity, day_type_assignments, uic_periods, + ) print(f"Writing {len(new_objects)} new calendar objects and {len(updated_journeys)} updated journeys") @@ -226,4 +329,4 @@ def main(source_database_file: str) -> None: main(args.source) except Exception as e: log_all(logging.ERROR, f"{e} {traceback.format_exc()}") - raise e \ No newline at end of file + raise e From 5959e7001675ba961d3c54a989aaf7ffa019d49c Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Wed, 20 May 2026 11:34:49 +0200 Subject: [PATCH 10/16] Make versions run until the end of the feed period --- fix/remove_mentz_line_versions.py | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/fix/remove_mentz_line_versions.py b/fix/remove_mentz_line_versions.py index 8a63145..845024e 100644 --- a/fix/remove_mentz_line_versions.py +++ b/fix/remove_mentz_line_versions.py @@ -7,7 +7,7 @@ import logging from collections import defaultdict from dataclasses import dataclass -from datetime import date, timedelta +from datetime import date from pathlib import Path from xsdata.models.datatype import XmlDateTime @@ -70,33 +70,18 @@ def _build_version_validity( lines: defaultdict[str, list[Line]], frame_end: XmlDateTime, ) -> dict[tuple[str, str | None], LineValidity]: - raw: dict[tuple[str, str | None], tuple[XmlDateTime, XmlDateTime | None]] = {} + result: dict[tuple[str, str | None], LineValidity] = {} for line_id, line_versions in lines.items(): if len(line_versions) <= 1: continue for line in line_versions: for vb in line.validity_conditions_or_valid_between: if hasattr(vb, 'from_date') and vb.from_date is not None: - raw[(line_id, line.version)] = (vb.from_date, vb.to_date) + result[(line_id, line.version)] = LineValidity( + from_dt=vb.from_date, + to_dt=vb.to_date if vb.to_date is not None else frame_end, + ) break - - result: dict[tuple[str, str | None], LineValidity] = {} - for line_id, line_versions in lines.items(): - if len(line_versions) <= 1: - continue - sorted_vers = sorted( - [(line.version, raw[(line_id, line.version)]) - for line in line_versions if (line_id, line.version) in raw], - key=lambda item: item[1][0], - ) - for i, (ver, (from_dt, to_dt)) in enumerate(sorted_vers): - if to_dt is None: - if i + 1 < len(sorted_vers): - next_from = sorted_vers[i + 1][1][0].to_datetime().date() - to_dt = _xml_date(next_from - timedelta(days=1)) - else: - to_dt = frame_end - result[(line_id, ver)] = LineValidity(from_dt=from_dt, to_dt=to_dt) return result From 38937010beb9a477023c72a4b38af2c62295a84f Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Wed, 20 May 2026 11:37:34 +0200 Subject: [PATCH 11/16] Add script for rewriting South Tyrolian SSP ids --- fix/fix_ssp_ids.py | 119 +++++++++++++++++++++++++++++++++++++++++++ fix/list_journeys.py | 57 --------------------- 2 files changed, 119 insertions(+), 57 deletions(-) create mode 100644 fix/fix_ssp_ids.py delete mode 100644 fix/list_journeys.py diff --git a/fix/fix_ssp_ids.py b/fix/fix_ssp_ids.py new file mode 100644 index 0000000..ee16f3a --- /dev/null +++ b/fix/fix_ssp_ids.py @@ -0,0 +1,119 @@ +""" +Fix ScheduledStopPoint IDs to match SIRI feed format. + +Transforms IDs from: IT:ITH1:ScheduledStopPoint:it-22021-7010-51-32073: +to: IT:ITH10:ScheduledStopPoint:7010:51:32073 + +This is needed so that NeTEx and SIRI feeds reference the same stops. +""" + +import dataclasses +import logging +import re +from pathlib import Path +from typing import Any + +from domain.netex.model import ( + PassengerStopAssignment, + Route, + RoutePoint, + RoutePointRef, + ScheduledStopPoint, + ScheduledStopPointRef, + ServiceJourneyPattern, + ServiceLink, + TimingLink, +) +from storage.mdbx.core.implementation import MdbxStorage +from utils.aux_logging import log_all, prepare_logger + +_PATTERN = re.compile(r'^.*:ScheduledStopPoint:it-22021-(.+):$') + + +def _new_id(old_id: str) -> str | None: + m = _PATTERN.match(old_id) + if m: + return 'IT:ITH10:ScheduledStopPoint:' + m.group(1).replace('-', ':') + return None + + +def _update_refs(obj: Any, id_map: dict[str, str]) -> bool: + if obj is None or not dataclasses.is_dataclass(obj) or isinstance(obj, type): + return False + modified = False + for f in dataclasses.fields(obj): + val = getattr(obj, f.name) + if isinstance(val, (ScheduledStopPointRef, RoutePointRef)): + new_ref = id_map.get(val.ref) + if new_ref is not None: + val.ref = new_ref + modified = True + elif isinstance(val, list): + for item in val: + if _update_refs(item, id_map): + modified = True + elif dataclasses.is_dataclass(val): + if _update_refs(val, id_map): + modified = True + return modified + + +# Object types that may transitively contain ScheduledStopPointRef or RoutePointRef. +_REF_BEARING_TYPES = [ + ServiceJourneyPattern, + ServiceLink, + TimingLink, + PassengerStopAssignment, + Route, + RoutePoint, +] + + +def fix_ssp_ids(database: Path) -> None: + with MdbxStorage(database, readonly=False) as db: + with db.env.rw_transaction() as txn: + id_map: dict[str, str] = {} + old_ssps: list[ScheduledStopPoint] = [] + for ssp in db.iter_only_objects(txn, ScheduledStopPoint): + new_id = _new_id(ssp.id) + if new_id is not None: + id_map[ssp.id] = new_id + old_ssps.append(ssp) + + print(f"{len(id_map)} ScheduledStopPoints to rewrite") + + updated: list[Any] = [] + for cls in _REF_BEARING_TYPES: + for obj in db.iter_only_objects(txn, cls): + if _update_refs(obj, id_map): + updated.append(obj) + + new_ssps = [dataclasses.replace(ssp, id=id_map[ssp.id]) for ssp in old_ssps] + + print(f"Updating refs in {len(updated)} objects") + print(f"Inserting {len(new_ssps)} renamed ScheduledStopPoints") + # TODO: delete the old ScheduledStopPoint objects (no delete API available yet) + + db.insert_any_object_on_queue(txn, updated) + db.insert_any_object_on_queue(txn, new_ssps) + txn.commit() + + +def main(source_database_file: str) -> None: + fix_ssp_ids(Path(source_database_file)) + + +if __name__ == "__main__": + import argparse + import traceback + + parser = argparse.ArgumentParser(description="Fix ScheduledStopPoint IDs to SIRI format") + parser.add_argument("source", type=str, help="mdbx file to fix in-place") + parser.add_argument("--log_file", type=str, required=False, help="log file path") + args = parser.parse_args() + prepare_logger(logging.INFO, args.log_file) + try: + main(args.source) + except Exception as e: + log_all(logging.ERROR, f"{e}") + raise e diff --git a/fix/list_journeys.py b/fix/list_journeys.py deleted file mode 100644 index aa151e6..0000000 --- a/fix/list_journeys.py +++ /dev/null @@ -1,57 +0,0 @@ -""" - -""" - -import logging -from collections import defaultdict -from pathlib import Path - -from xsdata.models.datatype import XmlDateTime - -from domain.netex.model import ( - DayType, - DayTypeAssignment, - DayTypeRef, - DayTypeRefsRelStructure, - Line, - Route, - ServiceCalendar, - ServiceJourney, - ServiceJourneyPattern, - UicOperatingPeriod, - UicOperatingPeriodRef, -) -from storage.mdbx.core.implementation import MdbxStorage -from utils.aux_logging import prepare_logger, log_all - - -def fmt_dt(dt: XmlDateTime | None) -> str: - return dt.to_datetime().isoformat() if dt else "None" - - -def fix_lines(database: Path) -> None: - - with MdbxStorage(database, readonly=False) as db: - with db.env.ro_transaction() as txn: - for journey in db.iter_only_objects(txn, ServiceJourney): - print(journey.day_types.day_type_ref) - - -def main(source_database_file: str) -> None: - return fix_lines(Path(source_database_file)) - - -if __name__ == "__main__": - import argparse - import traceback - - parser = argparse.ArgumentParser(description="Fix Mentz line versions") - parser.add_argument("source", type=str, help="mdbx file to use as input.") - parser.add_argument("--log_file", type=str, required=False, help="the logfile") - args = parser.parse_args() - mylogger = prepare_logger(logging.INFO, args.log_file) - try: - main(args.source) - except Exception as e: - log_all(logging.ERROR, f"{e} {traceback.format_exc()}") - raise e \ No newline at end of file From cefac298ecaebb31ed4d26b48971b939de62a4aa Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Wed, 20 May 2026 11:52:52 +0200 Subject: [PATCH 12/16] Flesh out documentation # Conflicts: # fix/rewrite_sta_ssp_ids.py --- fix/fix_ssp_ids.py | 119 ------------------ ...ions.py => resolve_mentz_line_versions.py} | 28 ++++- 2 files changed, 27 insertions(+), 120 deletions(-) delete mode 100644 fix/fix_ssp_ids.py rename fix/{remove_mentz_line_versions.py => resolve_mentz_line_versions.py} (93%) diff --git a/fix/fix_ssp_ids.py b/fix/fix_ssp_ids.py deleted file mode 100644 index ee16f3a..0000000 --- a/fix/fix_ssp_ids.py +++ /dev/null @@ -1,119 +0,0 @@ -""" -Fix ScheduledStopPoint IDs to match SIRI feed format. - -Transforms IDs from: IT:ITH1:ScheduledStopPoint:it-22021-7010-51-32073: -to: IT:ITH10:ScheduledStopPoint:7010:51:32073 - -This is needed so that NeTEx and SIRI feeds reference the same stops. -""" - -import dataclasses -import logging -import re -from pathlib import Path -from typing import Any - -from domain.netex.model import ( - PassengerStopAssignment, - Route, - RoutePoint, - RoutePointRef, - ScheduledStopPoint, - ScheduledStopPointRef, - ServiceJourneyPattern, - ServiceLink, - TimingLink, -) -from storage.mdbx.core.implementation import MdbxStorage -from utils.aux_logging import log_all, prepare_logger - -_PATTERN = re.compile(r'^.*:ScheduledStopPoint:it-22021-(.+):$') - - -def _new_id(old_id: str) -> str | None: - m = _PATTERN.match(old_id) - if m: - return 'IT:ITH10:ScheduledStopPoint:' + m.group(1).replace('-', ':') - return None - - -def _update_refs(obj: Any, id_map: dict[str, str]) -> bool: - if obj is None or not dataclasses.is_dataclass(obj) or isinstance(obj, type): - return False - modified = False - for f in dataclasses.fields(obj): - val = getattr(obj, f.name) - if isinstance(val, (ScheduledStopPointRef, RoutePointRef)): - new_ref = id_map.get(val.ref) - if new_ref is not None: - val.ref = new_ref - modified = True - elif isinstance(val, list): - for item in val: - if _update_refs(item, id_map): - modified = True - elif dataclasses.is_dataclass(val): - if _update_refs(val, id_map): - modified = True - return modified - - -# Object types that may transitively contain ScheduledStopPointRef or RoutePointRef. -_REF_BEARING_TYPES = [ - ServiceJourneyPattern, - ServiceLink, - TimingLink, - PassengerStopAssignment, - Route, - RoutePoint, -] - - -def fix_ssp_ids(database: Path) -> None: - with MdbxStorage(database, readonly=False) as db: - with db.env.rw_transaction() as txn: - id_map: dict[str, str] = {} - old_ssps: list[ScheduledStopPoint] = [] - for ssp in db.iter_only_objects(txn, ScheduledStopPoint): - new_id = _new_id(ssp.id) - if new_id is not None: - id_map[ssp.id] = new_id - old_ssps.append(ssp) - - print(f"{len(id_map)} ScheduledStopPoints to rewrite") - - updated: list[Any] = [] - for cls in _REF_BEARING_TYPES: - for obj in db.iter_only_objects(txn, cls): - if _update_refs(obj, id_map): - updated.append(obj) - - new_ssps = [dataclasses.replace(ssp, id=id_map[ssp.id]) for ssp in old_ssps] - - print(f"Updating refs in {len(updated)} objects") - print(f"Inserting {len(new_ssps)} renamed ScheduledStopPoints") - # TODO: delete the old ScheduledStopPoint objects (no delete API available yet) - - db.insert_any_object_on_queue(txn, updated) - db.insert_any_object_on_queue(txn, new_ssps) - txn.commit() - - -def main(source_database_file: str) -> None: - fix_ssp_ids(Path(source_database_file)) - - -if __name__ == "__main__": - import argparse - import traceback - - parser = argparse.ArgumentParser(description="Fix ScheduledStopPoint IDs to SIRI format") - parser.add_argument("source", type=str, help="mdbx file to fix in-place") - parser.add_argument("--log_file", type=str, required=False, help="log file path") - args = parser.parse_args() - prepare_logger(logging.INFO, args.log_file) - try: - main(args.source) - except Exception as e: - log_all(logging.ERROR, f"{e}") - raise e diff --git a/fix/remove_mentz_line_versions.py b/fix/resolve_mentz_line_versions.py similarity index 93% rename from fix/remove_mentz_line_versions.py rename to fix/resolve_mentz_line_versions.py index 845024e..37dc31f 100644 --- a/fix/remove_mentz_line_versions.py +++ b/fix/resolve_mentz_line_versions.py @@ -1,7 +1,33 @@ """ Resolves the Mentz line versions into actual operating days. -https://public.3.basecamp.com/p/fVFt3mGJiK52ewcsr8nFgu6o +Mentz' EPIP exporter writes variations of a journey not as separate operating +periods but as a completely new "version" of a line, even though it's not the +line that changes. It looks like this: + +Line A: v1 + validity: 2026-01-01 -> 2026-01-31 + Route A1: + ServiceJourneyPattern A1: + Journey A1 + dayTypes: Period1 + +Line A: v2 + validity: 2026-02-01 -> 2026-02-31 + Route A2: + ServiceJourneyPattern A2: + Journey A2 + dayTypes: Period1 + +This means you must create new operating periods for each version of the line +and assign the correct day types to each version of the journey. + +This is very unfortunate and extremely convoluted to untangle. + +Further reading: + https://public.3.basecamp.com/p/fVFt3mGJiK52ewcsr8nFgu6o + https://github.com/noi-techpark/opendatahub-mentor-otp/issues/291 + """ import logging From 73dc589659bffd89a11eec0a3e0c33af81e2a8fb Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Wed, 20 May 2026 12:33:28 +0200 Subject: [PATCH 13/16] Take review feedback into account --- fix/resolve_mentz_line_versions.py | 142 +++++++++++++---------------- 1 file changed, 65 insertions(+), 77 deletions(-) diff --git a/fix/resolve_mentz_line_versions.py b/fix/resolve_mentz_line_versions.py index 37dc31f..47fbaf6 100644 --- a/fix/resolve_mentz_line_versions.py +++ b/fix/resolve_mentz_line_versions.py @@ -32,7 +32,7 @@ import logging from collections import defaultdict -from dataclasses import dataclass +from collections.abc import Generator from datetime import date from pathlib import Path @@ -45,6 +45,8 @@ DayTypeRef, DayTypeRefsRelStructure, Line, + OperatingPeriod, + OperatingPeriodRef, Route, ServiceCalendar, ServiceJourney, @@ -61,85 +63,66 @@ def fmt_dt(dt: XmlDateTime | None) -> str: return dt.to_datetime().isoformat() if dt else "None" -def _xml_date(d: date) -> XmlDateTime: - return XmlDateTime(d.year, d.month, d.day, 0, 0, 0) - - -@dataclass -class LineValidity: - from_dt: XmlDateTime - to_dt: XmlDateTime - - @property - def from_date(self) -> date: - return self.from_dt.to_datetime().date() - - @property - def to_date(self) -> date: - return self.to_dt.to_datetime().date() - - -@dataclass -class NewPeriod: - from_xml: XmlDateTime - to_xml: XmlDateTime - bits: str - from_date: date - to_date: date - - @property - def n_days(self) -> int: - return (self.to_date - self.from_date).days + 1 - - -def _build_version_validity( +def _iter_version_validity( lines: defaultdict[str, list[Line]], frame_end: XmlDateTime, -) -> dict[tuple[str, str | None], LineValidity]: - result: dict[tuple[str, str | None], LineValidity] = {} +) -> Generator[tuple[tuple[str, str | None], ValidBetween], None, None]: + raw: dict[tuple[str, str | None], tuple[XmlDateTime, XmlDateTime | None]] = {} for line_id, line_versions in lines.items(): if len(line_versions) <= 1: continue for line in line_versions: for vb in line.validity_conditions_or_valid_between: if hasattr(vb, 'from_date') and vb.from_date is not None: - result[(line_id, line.version)] = LineValidity( - from_dt=vb.from_date, - to_dt=vb.to_date if vb.to_date is not None else frame_end, - ) + raw[(line_id, line.version)] = (vb.from_date, vb.to_date) break - return result + + for line_id, line_versions in lines.items(): + if len(line_versions) <= 1: + continue + sorted_vers = sorted( + [(line.version, raw[(line_id, line.version)]) + for line in line_versions if (line_id, line.version) in raw], + key=lambda item: item[1][0], + ) + for i, (ver, (from_dt, to_dt)) in enumerate(sorted_vers): + if to_dt is None: + to_dt = sorted_vers[i + 1][1][0] if i + 1 < len(sorted_vers) else frame_end + yield (line_id, ver), ValidBetween(from_date=from_dt, to_date=to_dt) def _compute_period( existing_period: UicOperatingPeriod | None, - validity: LineValidity, -) -> NewPeriod: + validity: ValidBetween, +) -> UicOperatingPeriod | OperatingPeriod: + line_from_date = validity.from_date.to_datetime().date() + line_to_date = validity.to_date.to_datetime().date() + if existing_period is not None and isinstance(existing_period.from_operating_day_ref_or_from_date, XmlDateTime): period_from_date = existing_period.from_operating_day_ref_or_from_date.to_datetime().date() period_to_ref = existing_period.to_operating_day_ref_or_to_date - period_to_date = period_to_ref.to_datetime().date() if isinstance(period_to_ref, XmlDateTime) else validity.to_date + period_to_date = period_to_ref.to_datetime().date() if isinstance(period_to_ref, XmlDateTime) else line_to_date - new_from_date = max(validity.from_date, period_from_date) - new_to_date = min(validity.to_date, period_to_date) + new_from_date = max(line_from_date, period_from_date) + new_to_date = min(line_to_date, period_to_date) if new_from_date <= new_to_date: offset = (new_from_date - period_from_date).days n_days = (new_to_date - new_from_date).days + 1 existing_bits = existing_period.valid_day_bits or '' bits = existing_bits[offset:offset + n_days].ljust(n_days, '0') - from_xml = validity.from_dt if new_from_date == validity.from_date else existing_period.from_operating_day_ref_or_from_date - to_xml = validity.to_dt if new_to_date == validity.to_date else period_to_ref - return NewPeriod(from_xml=from_xml, to_xml=to_xml, bits=bits, from_date=new_from_date, to_date=new_to_date) - - # No existing period or no overlap — cover the full line validity with all days active. - n_days = max(1, (validity.to_date - validity.from_date).days + 1) - return NewPeriod( - from_xml=validity.from_dt, - to_xml=validity.to_dt, - bits='1' * n_days, - from_date=validity.from_date, - to_date=validity.to_date, + from_xml = validity.from_date if new_from_date == line_from_date else existing_period.from_operating_day_ref_or_from_date + to_xml = validity.to_date if new_to_date == line_to_date else period_to_ref + return UicOperatingPeriod( + from_operating_day_ref_or_from_date=from_xml, + to_operating_day_ref_or_to_date=to_xml, + valid_day_bits=bits, + ) + + # No existing period or no overlap — all days in the line's validity window are active. + return OperatingPeriod( + from_operating_day_ref_or_from_date=validity.from_date, + to_operating_day_ref_or_to_date=validity.to_date, ) @@ -147,31 +130,36 @@ def _make_calendar_objects( line_id: str, existing_dt_id: str, safe_version: str, - period: NewPeriod, -) -> tuple[DayType, UicOperatingPeriod, DayTypeAssignment]: + period: UicOperatingPeriod | OperatingPeriod, +) -> tuple[DayType, UicOperatingPeriod | OperatingPeriod, DayTypeAssignment]: new_day_type_id = f"{line_id}:{existing_dt_id}:{safe_version}" - new_uic_period_id = f"{line_id}:{existing_dt_id}:UicOperatingPeriod:{safe_version}" + new_period_id = f"{line_id}:{existing_dt_id}:{type(period).__name__}:{safe_version}" new_dta_id = f"{line_id}:{existing_dt_id}:DayTypeAssignment:{safe_version}" - print(f" Creating {new_day_type_id} [{period.from_date} .. {period.to_date}] ({period.n_days} days)") + period_from = period.from_operating_day_ref_or_from_date + period_to = period.to_operating_day_ref_or_to_date + from_date = period_from.to_datetime().date() if isinstance(period_from, XmlDateTime) else None + to_date = period_to.to_datetime().date() if isinstance(period_to, XmlDateTime) else None + n_days = (to_date - from_date).days + 1 if from_date and to_date else '?' + print(f" Creating {new_day_type_id} [{from_date} .. {to_date}] ({n_days} days)") + + period.id = new_period_id + period.version = '1' + + period_ref = ( + UicOperatingPeriodRef(ref=new_period_id, version='1') + if isinstance(period, UicOperatingPeriod) + else OperatingPeriodRef(ref=new_period_id, version='1') + ) return ( DayType(id=new_day_type_id, version='1'), - UicOperatingPeriod( - id=new_uic_period_id, - version='1', - from_operating_day_ref_or_from_date=period.from_xml, - to_operating_day_ref_or_to_date=period.to_xml, - valid_day_bits=period.bits, - ), + period, DayTypeAssignment( id=new_dta_id, version='1', day_type_ref=DayTypeRef(ref=new_day_type_id, version='1'), - uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date=UicOperatingPeriodRef( - ref=new_uic_period_id, - version='1', - ), + uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date=period_ref, ), ) @@ -181,7 +169,7 @@ def _process_journey( line_id: str, line_version: str | None, safe_version: str, - validity: LineValidity, + validity: ValidBetween, day_type_assignments: dict[str, DayTypeAssignment], uic_periods: dict[str, UicOperatingPeriod], new_objects: list, @@ -217,11 +205,11 @@ def _resolve_journeys( routes: defaultdict[str, list[Route]], sjps: defaultdict[str, list[ServiceJourneyPattern]], journeys: defaultdict[str, list[ServiceJourney]], - version_validity: dict[tuple[str, str | None], LineValidity], + version_validity: dict[tuple[str, str | None], ValidBetween], day_type_assignments: dict[str, DayTypeAssignment], uic_periods: dict[str, UicOperatingPeriod], -) -> tuple[list[DayType | UicOperatingPeriod | DayTypeAssignment], list[ServiceJourney]]: - new_objects: list[DayType | UicOperatingPeriod | DayTypeAssignment] = [] +) -> tuple[list[DayType | UicOperatingPeriod | OperatingPeriod | DayTypeAssignment], list[ServiceJourney]]: + new_objects: list[DayType | UicOperatingPeriod | OperatingPeriod | DayTypeAssignment] = [] updated_journeys: list[ServiceJourney] = [] created: dict[tuple[str, str | None, str], str] = {} @@ -309,7 +297,7 @@ def fix_lines(database: Path) -> None: frame_end: XmlDateTime = XmlDateTime(2026, 12, 14, 23, 59, 59) print(f"The feed's end date is {frame_end}") - version_validity = _build_version_validity(lines, frame_end) + version_validity = dict(_iter_version_validity(lines, frame_end)) new_objects, updated_journeys = _resolve_journeys( lines, routes, sjps, journeys, version_validity, day_type_assignments, uic_periods, From 8ac58958f810719ad9cb120701f016cd62bcbba6 Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Wed, 20 May 2026 12:44:53 +0200 Subject: [PATCH 14/16] Revert change in EPIP export --- transformers/epip.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/transformers/epip.py b/transformers/epip.py index fb8e543..d31d3df 100644 --- a/transformers/epip.py +++ b/transformers/epip.py @@ -506,9 +506,6 @@ def get_service_calendar(db_read: MdbxStorage, txn: TXN, generator_defaults: dic from_date: datetime = datetime.max to_date: datetime = datetime.min for uic in db_read.iter_only_objects(txn, UicOperatingPeriod): - if uic.from_operating_day_ref_or_from_date is None or uic.to_operating_day_ref_or_to_date is None: - continue - dt = uic.from_operating_day_ref_or_from_date.to_datetime() dt = dt.replace(tzinfo=None) if from_date > dt: From 8e27af043203381be6315da6e2240407c9ea5f4d Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Wed, 20 May 2026 12:45:03 +0200 Subject: [PATCH 15/16] Add mandatory ID --- fix/resolve_mentz_line_versions.py | 74 +++++++++++------------------- 1 file changed, 28 insertions(+), 46 deletions(-) diff --git a/fix/resolve_mentz_line_versions.py b/fix/resolve_mentz_line_versions.py index 47fbaf6..b5df937 100644 --- a/fix/resolve_mentz_line_versions.py +++ b/fix/resolve_mentz_line_versions.py @@ -33,7 +33,6 @@ import logging from collections import defaultdict from collections.abc import Generator -from datetime import date from pathlib import Path from xsdata.models.datatype import XmlDateTime @@ -91,13 +90,22 @@ def _iter_version_validity( yield (line_id, ver), ValidBetween(from_date=from_dt, to_date=to_dt) -def _compute_period( +def _make_calendar_objects( + line_id: str, + existing_dt_id: str, + safe_version: str, existing_period: UicOperatingPeriod | None, validity: ValidBetween, -) -> UicOperatingPeriod | OperatingPeriod: +) -> tuple[DayType, UicOperatingPeriod | OperatingPeriod, DayTypeAssignment]: + new_day_type_id = f"{line_id}:{existing_dt_id}:{safe_version}" + new_dta_id = f"{line_id}:{existing_dt_id}:DayTypeAssignment:{safe_version}" + line_from_date = validity.from_date.to_datetime().date() line_to_date = validity.to_date.to_datetime().date() + period: UicOperatingPeriod | OperatingPeriod + period_ref: UicOperatingPeriodRef | OperatingPeriodRef + if existing_period is not None and isinstance(existing_period.from_operating_day_ref_or_from_date, XmlDateTime): period_from_date = existing_period.from_operating_day_ref_or_from_date.to_datetime().date() period_to_ref = existing_period.to_operating_day_ref_or_to_date @@ -113,54 +121,29 @@ def _compute_period( bits = existing_bits[offset:offset + n_days].ljust(n_days, '0') from_xml = validity.from_date if new_from_date == line_from_date else existing_period.from_operating_day_ref_or_from_date to_xml = validity.to_date if new_to_date == line_to_date else period_to_ref - return UicOperatingPeriod( - from_operating_day_ref_or_from_date=from_xml, - to_operating_day_ref_or_to_date=to_xml, - valid_day_bits=bits, - ) - - # No existing period or no overlap — all days in the line's validity window are active. - return OperatingPeriod( - from_operating_day_ref_or_from_date=validity.from_date, - to_operating_day_ref_or_to_date=validity.to_date, - ) + new_period_id = f"{line_id}:{existing_dt_id}:UicOperatingPeriod:{safe_version}" + print(f" Creating {new_day_type_id} [{new_from_date} .. {new_to_date}] ({n_days} days)") + period = UicOperatingPeriod(id=new_period_id, version='1', from_operating_day_ref_or_from_date=from_xml, to_operating_day_ref_or_to_date=to_xml, valid_day_bits=bits) + period_ref = UicOperatingPeriodRef(ref=new_period_id, version='1') -def _make_calendar_objects( - line_id: str, - existing_dt_id: str, - safe_version: str, - period: UicOperatingPeriod | OperatingPeriod, -) -> tuple[DayType, UicOperatingPeriod | OperatingPeriod, DayTypeAssignment]: - new_day_type_id = f"{line_id}:{existing_dt_id}:{safe_version}" - new_period_id = f"{line_id}:{existing_dt_id}:{type(period).__name__}:{safe_version}" - new_dta_id = f"{line_id}:{existing_dt_id}:DayTypeAssignment:{safe_version}" - - period_from = period.from_operating_day_ref_or_from_date - period_to = period.to_operating_day_ref_or_to_date - from_date = period_from.to_datetime().date() if isinstance(period_from, XmlDateTime) else None - to_date = period_to.to_datetime().date() if isinstance(period_to, XmlDateTime) else None - n_days = (to_date - from_date).days + 1 if from_date and to_date else '?' - print(f" Creating {new_day_type_id} [{from_date} .. {to_date}] ({n_days} days)") - - period.id = new_period_id - period.version = '1' + return ( + DayType(id=new_day_type_id, version='1'), + period, + DayTypeAssignment(id=new_dta_id, version='1', day_type_ref=DayTypeRef(ref=new_day_type_id, version='1'), uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date=period_ref), + ) - period_ref = ( - UicOperatingPeriodRef(ref=new_period_id, version='1') - if isinstance(period, UicOperatingPeriod) - else OperatingPeriodRef(ref=new_period_id, version='1') - ) + # No existing period or no overlap — all days in the line's validity window are active. + new_period_id = f"{line_id}:{existing_dt_id}:OperatingPeriod:{safe_version}" + n_days = max(1, (line_to_date - line_from_date).days + 1) + print(f" Creating {new_day_type_id} [{line_from_date} .. {line_to_date}] ({n_days} days)") + period = OperatingPeriod(id=new_period_id, version='1', from_operating_day_ref_or_from_date=validity.from_date, to_operating_day_ref_or_to_date=validity.to_date) + period_ref = OperatingPeriodRef(ref=new_period_id, version='1') return ( DayType(id=new_day_type_id, version='1'), period, - DayTypeAssignment( - id=new_dta_id, - version='1', - day_type_ref=DayTypeRef(ref=new_day_type_id, version='1'), - uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date=period_ref, - ), + DayTypeAssignment(id=new_dta_id, version='1', day_type_ref=DayTypeRef(ref=new_day_type_id, version='1'), uic_operating_period_ref_or_operating_period_ref_or_operating_day_ref_or_date=period_ref), ) @@ -191,8 +174,7 @@ def _process_journey( if hasattr(period_ref, 'ref'): existing_period = uic_periods.get(period_ref.ref) - period = _compute_period(existing_period, validity) - objs = _make_calendar_objects(line_id, existing_dt_id, safe_version, period) + objs = _make_calendar_objects(line_id, existing_dt_id, safe_version, existing_period, validity) new_objects.extend(objs) created[cache_key] = objs[0].id From 7c8bc5909f851b695ab4f9285d1fd6f7edd9b297 Mon Sep 17 00:00:00 2001 From: Leonard Ehrenfried Date: Fri, 22 May 2026 13:56:42 +0200 Subject: [PATCH 16/16] Make versions without a ToDate default ones --- fix/resolve_mentz_line_versions.py | 110 ++++++++++++++++++++++------- 1 file changed, 83 insertions(+), 27 deletions(-) diff --git a/fix/resolve_mentz_line_versions.py b/fix/resolve_mentz_line_versions.py index b5df937..bae6686 100644 --- a/fix/resolve_mentz_line_versions.py +++ b/fix/resolve_mentz_line_versions.py @@ -33,6 +33,7 @@ import logging from collections import defaultdict from collections.abc import Generator +from datetime import timedelta from pathlib import Path from xsdata.models.datatype import XmlDateTime @@ -62,32 +63,82 @@ def fmt_dt(dt: XmlDateTime | None) -> str: return dt.to_datetime().isoformat() if dt else "None" +def _xml_dt(dt) -> XmlDateTime: + return XmlDateTime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second) + + +def _compute_default_segments( + from_dt: XmlDateTime, + default_end: XmlDateTime, + overrides: list[tuple[XmlDateTime, XmlDateTime]], +) -> list[ValidBetween]: + """Return non-overlapping ValidBetween segments for a default version, excluding override windows.""" + segments: list[ValidBetween] = [] + current = from_dt.to_datetime() + end = default_end.to_datetime() + + for od_from, od_to in sorted(overrides, key=lambda x: x[0].to_datetime()): + od_start = od_from.to_datetime() + od_end = od_to.to_datetime() + if od_start >= end: + break + if od_end <= current: + continue + if current < od_start: + segments.append(ValidBetween( + from_date=_xml_dt(current), + to_date=_xml_dt(od_start - timedelta(seconds=1)), + )) + current = od_end + timedelta(seconds=1) + + if current < end: + segments.append(ValidBetween(from_date=_xml_dt(current), to_date=_xml_dt(end))) + + return segments + + def _iter_version_validity( lines: defaultdict[str, list[Line]], frame_end: XmlDateTime, -) -> Generator[tuple[tuple[str, str | None], ValidBetween], None, None]: - raw: dict[tuple[str, str | None], tuple[XmlDateTime, XmlDateTime | None]] = {} +) -> Generator[tuple[tuple[str, str | None], list[ValidBetween]], None, None]: + """Yield (line_id, version) → list of ValidBetween segments. + + Versions without a ToDate are "defaults": they run continuously from their + FromDate (until the next default's FromDate, or frame_end), interrupted only + by "override" versions that carry an explicit ToDate. + + Versions with a ToDate are "overrides": they run exactly during their window. + """ for line_id, line_versions in lines.items(): if len(line_versions) <= 1: continue + + raw: list[tuple[str | None, XmlDateTime, XmlDateTime | None]] = [] for line in line_versions: for vb in line.validity_conditions_or_valid_between: if hasattr(vb, 'from_date') and vb.from_date is not None: - raw[(line_id, line.version)] = (vb.from_date, vb.to_date) + raw.append((line.version, vb.from_date, vb.to_date)) break - for line_id, line_versions in lines.items(): - if len(line_versions) <= 1: - continue - sorted_vers = sorted( - [(line.version, raw[(line_id, line.version)]) - for line in line_versions if (line_id, line.version) in raw], - key=lambda item: item[1][0], + defaults = sorted( + [(ver, from_dt) for ver, from_dt, to_dt in raw if to_dt is None], + key=lambda x: x[1].to_datetime(), ) - for i, (ver, (from_dt, to_dt)) in enumerate(sorted_vers): - if to_dt is None: - to_dt = sorted_vers[i + 1][1][0] if i + 1 < len(sorted_vers) else frame_end - yield (line_id, ver), ValidBetween(from_date=from_dt, to_date=to_dt) + overrides = [(ver, from_dt, to_dt) for ver, from_dt, to_dt in raw if to_dt is not None] + + for ver, from_dt, to_dt in raw: + if to_dt is not None: + yield (line_id, ver), [ValidBetween(from_date=from_dt, to_date=to_dt)] + else: + idx = next(i for i, (v, _) in enumerate(defaults) if v == ver) + if idx + 1 < len(defaults): + default_end = _xml_dt(defaults[idx + 1][1].to_datetime() - timedelta(seconds=1)) + else: + default_end = frame_end + override_windows = [(od_from, od_to) for _, od_from, od_to in overrides] + segments = _compute_default_segments(from_dt, default_end, override_windows) + if segments: + yield (line_id, ver), segments def _make_calendar_objects( @@ -156,12 +207,12 @@ def _process_journey( day_type_assignments: dict[str, DayTypeAssignment], uic_periods: dict[str, UicOperatingPeriod], new_objects: list, - created: dict[tuple[str, str | None, str], str], + created: dict[tuple[str, str, str], str], ) -> list[DayTypeRef]: new_refs: list[DayTypeRef] = [] for dt_ref in journey.day_types.day_type_ref: # type: ignore[union-attr] existing_dt_id = dt_ref.ref - cache_key = (line_id, line_version, existing_dt_id) + cache_key = (line_id, safe_version, existing_dt_id) if cache_key in created: new_refs.append(DayTypeRef(ref=created[cache_key], version='1')) @@ -187,37 +238,42 @@ def _resolve_journeys( routes: defaultdict[str, list[Route]], sjps: defaultdict[str, list[ServiceJourneyPattern]], journeys: defaultdict[str, list[ServiceJourney]], - version_validity: dict[tuple[str, str | None], ValidBetween], + version_validity: dict[tuple[str, str | None], list[ValidBetween]], day_type_assignments: dict[str, DayTypeAssignment], uic_periods: dict[str, UicOperatingPeriod], ) -> tuple[list[DayType | UicOperatingPeriod | OperatingPeriod | DayTypeAssignment], list[ServiceJourney]]: new_objects: list[DayType | UicOperatingPeriod | OperatingPeriod | DayTypeAssignment] = [] updated_journeys: list[ServiceJourney] = [] - created: dict[tuple[str, str | None, str], str] = {} + created: dict[tuple[str, str, str], str] = {} for line_id, line_versions in lines.items(): if len(line_versions) <= 1: continue for route in routes.get(line_id, []): - validity = version_validity.get((line_id, route.line_ref.version)) - if validity is None: + validities = version_validity.get((line_id, route.line_ref.version)) + if not validities: print(f" Skipping route {route.id}: no validity for line {line_id} v={route.line_ref.version}") continue - safe_version = (route.line_ref.version or 'unknown').replace(':', '_') + base_safe = (route.line_ref.version or 'unknown').replace(':', '_') for sjp in sjps.get(route.id, []): for journey in journeys.get(sjp.id, []): if journey.day_types is None: continue - new_refs = _process_journey( - journey, line_id, route.line_ref.version, safe_version, - validity, day_type_assignments, uic_periods, new_objects, created, - ) - if new_refs: - journey.day_types = DayTypeRefsRelStructure(day_type_ref=new_refs) + all_new_refs: list[DayTypeRef] = [] + for seg_idx, validity in enumerate(validities): + safe_version = f"{base_safe}_{seg_idx}" + new_refs = _process_journey( + journey, line_id, route.line_ref.version, safe_version, + validity, day_type_assignments, uic_periods, new_objects, created, + ) + all_new_refs.extend(new_refs) + + if all_new_refs: + journey.day_types = DayTypeRefsRelStructure(day_type_ref=all_new_refs) updated_journeys.append(journey) return new_objects, updated_journeys