From 11f75aa21556145c77c9ffca12ced3baaad1217c Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Wed, 13 May 2026 18:24:57 +0100 Subject: [PATCH 1/3] feat: switch to pydantic models for instrumentation metrics Migrate from Python dataclasses to Pydantic models for instrumentation metric reports, with a central `instrumentation/records.py` file containing each of the definitions. In a future commit, this will allow us to easily make a statically typed Pydantic model for a full instrumentation report of the combined results. To promote better typing, this also changes the `SchedulerInstrumentation` interface such that you can now separately query the collected scheduler and job data, to provide a slightly more intuitive interface. This change is then propagated across the various built-in instrumentation types. Signed-off-by: Alex Jones --- src/dvsim/instrumentation/__init__.py | 36 +----- src/dvsim/instrumentation/base.py | 62 +++-------- src/dvsim/instrumentation/metadata.py | 55 +++------- src/dvsim/instrumentation/records.py | 145 +++++++++++++++++++++++++ src/dvsim/instrumentation/resources.py | 130 ++++++++-------------- src/dvsim/instrumentation/timing.py | 96 +++++----------- 6 files changed, 251 insertions(+), 273 deletions(-) create mode 100644 src/dvsim/instrumentation/records.py diff --git a/src/dvsim/instrumentation/__init__.py b/src/dvsim/instrumentation/__init__.py index e1cceb38..962d11ed 100644 --- a/src/dvsim/instrumentation/__init__.py +++ b/src/dvsim/instrumentation/__init__.py @@ -4,44 +4,18 @@ """DVSim Scheduler Instrumentation.""" -from dvsim.instrumentation.base import ( - InstrumentationAggregator, - InstrumentationFragment, - InstrumentationFragments, - JobFragment, - SchedulerFragment, - SchedulerInstrumentation, -) +from dvsim.instrumentation.base import InstrumentationAggregator, SchedulerInstrumentation from dvsim.instrumentation.factory import InstrumentationFactory -from dvsim.instrumentation.metadata import MetadataInstrumentation, 
MetadataJobFragment -from dvsim.instrumentation.resources import ( - ResourceInstrumentation, - ResourceJobFragment, - ResourceSchedulerFragment, -) +from dvsim.instrumentation.records import InstrumentationMetrics, JobMetrics, SchedulerMetrics from dvsim.instrumentation.runtime import flush, get, set_instrumentation, set_report_path -from dvsim.instrumentation.timing import ( - TimingInstrumentation, - TimingJobFragment, - TimingSchedulerFragment, -) __all__ = ( "InstrumentationAggregator", "InstrumentationFactory", - "InstrumentationFragment", - "InstrumentationFragments", - "JobFragment", - "MetadataInstrumentation", - "MetadataJobFragment", - "ResourceInstrumentation", - "ResourceJobFragment", - "ResourceSchedulerFragment", - "SchedulerFragment", + "InstrumentationMetrics", + "JobMetrics", "SchedulerInstrumentation", - "TimingInstrumentation", - "TimingJobFragment", - "TimingSchedulerFragment", + "SchedulerMetrics", "flush", "get", "set_instrumentation", diff --git a/src/dvsim/instrumentation/base.py b/src/dvsim/instrumentation/base.py index c9ccd933..0f850f49 100644 --- a/src/dvsim/instrumentation/base.py +++ b/src/dvsim/instrumentation/base.py @@ -4,10 +4,10 @@ """DVSim scheduler instrumentation base classes.""" -from collections.abc import Iterable, Sequence -from dataclasses import asdict, dataclass -from typing import Any, TypeAlias +from collections.abc import Iterable, Mapping +from typing import Any +from dvsim.instrumentation.records import JobMetrics, SchedulerMetrics from dvsim.job.data import JobSpec from dvsim.job.status import JobStatus from dvsim.logging import log @@ -15,40 +15,10 @@ __all__ = ( "InstrumentationAggregator", - "InstrumentationFragment", - "InstrumentationFragments", - "JobFragment", - "SchedulerFragment", "SchedulerInstrumentation", ) -@dataclass -class InstrumentationFragment: - """Base class for instrumentation reports / report fragments.""" - - def to_dict(self) -> dict[str, Any]: - """Convert the report fragment to a 
dictionary.""" - return asdict(self) - - -@dataclass -class SchedulerFragment(InstrumentationFragment): - """Base class for instrumentation report fragments related to the scheduler.""" - - -@dataclass -class JobFragment(InstrumentationFragment): - """Base class for instrumentation report fragments related to individual jobs.""" - - job: JobSpec - - -# Each instrumentation object can report any number of information fragments about the -# scheduler and about its jobs. -InstrumentationFragments: TypeAlias = tuple[Sequence[SchedulerFragment], Sequence[JobFragment]] - - class SchedulerInstrumentation: """Instrumentation for the scheduler. @@ -89,10 +59,14 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: # noqa """Notify instrumentation of a change in status for some scheduled job.""" return - def build_report_fragments(self) -> InstrumentationFragments | None: - """Build report fragments from the collected instrumentation information.""" + def get_scheduler_data(self) -> SchedulerMetrics | None: + """Retrieve scheduler metrics measured by this instrumentation.""" return None + def get_job_data(self) -> Mapping[str, JobMetrics]: + """Retrieve per-job metrics measured by this instrumentation.""" + return {} + class InstrumentationAggregator: """Aggregator for scheduler instrumentation collection, composing multiple instrumentations.""" @@ -126,11 +100,10 @@ def collect(self) -> dict[str, Any]: job_fragments = [] for inst in self._instrumentations: - fragments = inst.build_report_fragments() - if fragments is None: - continue - scheduler_fragments += fragments[0] - job_fragments += fragments[1] + scheduler_data = inst.get_scheduler_data() + if scheduler_data is not None: + scheduler_fragments.append(scheduler_data) + job_fragments += inst.get_job_data().items() log.info("Finished collecting instrumentation data. 
Merging instrumentation data...") @@ -139,19 +112,16 @@ def collect(self) -> dict[str, Any]: log.debug( "Merging instrumentation report scheduler data (%d/%d)", i, len(scheduler_fragments) ) - scheduler.update(scheduler_frag.to_dict()) + scheduler.update(scheduler_frag.model_dump()) jobs: dict[tuple[str, str], dict[str, Any]] = {} - for i, job_frag in enumerate(job_fragments, start=1): + for i, (job_id, job_frag) in enumerate(job_fragments, start=1): log.debug("Merging instrumentation report job data (%d/%d)", i, len(job_fragments)) - spec = job_frag.job - # We can uniquely identify jobs from the combination of their full name & target - job_id = (spec.full_name, spec.target) job = jobs.get(job_id) if job is None: job = {} jobs[job_id] = job - job.update({k: v for k, v in job_frag.to_dict().items() if k != "job"}) + job.update({k: v for k, v in job_frag.model_dump().items() if k != "job"}) log.info("Finished merging instrumentation report data.") return {"scheduler": scheduler, "jobs": list(jobs.values())} diff --git a/src/dvsim/instrumentation/metadata.py b/src/dvsim/instrumentation/metadata.py index da754a86..13f0020b 100644 --- a/src/dvsim/instrumentation/metadata.py +++ b/src/dvsim/instrumentation/metadata.py @@ -4,34 +4,14 @@ """DVSim scheduler instrumentation metadata (to be included in the generated report).""" -from dataclasses import dataclass +from collections.abc import Mapping -from dvsim.instrumentation.base import ( - InstrumentationFragments, - JobFragment, - SchedulerInstrumentation, -) +from dvsim.instrumentation.base import SchedulerInstrumentation +from dvsim.instrumentation.records import JobInstrumentationMetadata from dvsim.job.data import JobSpec from dvsim.job.status import JobStatus -__all__ = ( - "MetadataInstrumentation", - "MetadataJobFragment", -) - - -@dataclass -class MetadataJobFragment(JobFragment): - """Instrumentation metadata for scheduled jobs, reporting the final status of the job.""" - - name: str - full_name: str - job_type: 
str - target: str - tool: str - backend: str | None - dependencies: list[str] - status: str +__all__ = ("MetadataInstrumentation",) class MetadataInstrumentation(SchedulerInstrumentation): @@ -51,20 +31,17 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: status_str = status.name.capitalize() self._jobs[job.id] = (job, status_str) - def build_report_fragments(self) -> InstrumentationFragments | None: - """Build report fragments from the collected instrumentation information.""" - jobs = [ - MetadataJobFragment( - spec, - spec.name, - spec.full_name, - spec.job_type, - spec.target, - spec.tool.name, - spec.backend, - spec.dependencies, - status_str, + def get_job_data(self) -> Mapping[str, JobInstrumentationMetadata]: + """Retrieve per-job metrics measured by this instrumentation.""" + return { + spec.id: JobInstrumentationMetadata( + name=spec.name, + job_type=spec.job_type, + target=spec.target, + tool=spec.tool.name, + backend=spec.backend, + dependencies=list(spec.dependencies), + status=status_str, ) for spec, status_str in self._jobs.values() - ] - return ([], jobs) + } diff --git a/src/dvsim/instrumentation/records.py b/src/dvsim/instrumentation/records.py new file mode 100644 index 00000000..cf109b25 --- /dev/null +++ b/src/dvsim/instrumentation/records.py @@ -0,0 +1,145 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
+# SPDX-License-Identifier: Apache-2.0 + +"""DVSim scheduler instrumentation output (metric) record models.""" + +from typing import Any + +from pydantic import ( + BaseModel, + ConfigDict, + computed_field, + model_validator, +) + +__all__ = ( + "InstrumentationMetrics", + "JobInstrumentationMetadata", + "JobMetrics", + "JobResourceMetrics", + "JobTimingMetrics", + "SchedulerMetrics", + "SchedulerResourceMetrics", + "SchedulerTimingMetrics", +) + +# Base model classes + + +class InstrumentationMetrics(BaseModel): + """Base class for instrumentation metrics (report fragments).""" + + +class SchedulerMetrics(InstrumentationMetrics): + """Base class for instrumentation metrics related to the scheduler as a whole.""" + + +class JobMetrics(InstrumentationMetrics): + """Base class for instrumentation metrics related to a specific job.""" + + +class JobInstrumentationMetadata(JobMetrics): + """Instrumented metadata captured for a single scheduled job.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + name: str + job_type: str + target: str + tool: str + backend: str | None + dependencies: list[str] + status: str + + +# Timing metrics + + +class SchedulerTimingMetrics(SchedulerMetrics): + """Instrumented timing metrics measured for the scheduler as a whole.""" + + model_config = ConfigDict(frozen=True, extra="ignore") + + start_time: float | None = None + end_time: float | None = None + + @computed_field + @property + def duration(self) -> float | None: + """The duration of the entire scheduler run.""" + if self.start_time is None or self.end_time is None: + return None + return self.end_time - self.start_time + + @model_validator(mode="before") + @classmethod + def drop_computed_fields(cls, data: Any) -> Any: # noqa: ANN401 + """Drop any computed fields from input dicts before validating.""" + if isinstance(data, dict): + data = dict(data) + data.pop("duration", None) + return data + + +class JobTimingMetrics(JobMetrics): + """Instrumented timing metrics 
measured for a single scheduled job.""" + + model_config = ConfigDict(frozen=True, extra="ignore") + + start_time: float | None = None + end_time: float | None = None + + @computed_field + @property + def duration(self) -> float | None: + """The duration of the entire job run.""" + if self.start_time is None or self.end_time is None: + return None + return self.end_time - self.start_time + + @model_validator(mode="before") + @classmethod + def drop_computed_fields(cls, data: Any) -> Any: # noqa: ANN401 + """Drop any computed fields from input dicts before validating.""" + if isinstance(data, dict): + data = dict(data) + data.pop("duration", None) + return data + + +# Resource Metrics + + +class SchedulerResourceMetrics(SchedulerMetrics): + """Instrumented resource metrics measured for the scheduler as a whole.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + # Scheduler / DVSim process overhead + scheduler_max_rss_bytes: int | None = None + scheduler_avg_rss_bytes: int | None = None + scheduler_vms_bytes: int | None = None + scheduler_cpu_percent: float | None = None + scheduler_cpu_time: float | None = None + + # System-wide metrics + sys_max_rss_bytes: int | None = None + sys_avg_rss_bytes: int | None = None + sys_swap_used_bytes: int | None = None + sys_cpu_percent: float | None = None + sys_cpu_per_core: list[float] | None = None + + num_resource_samples: int = 0 + + +class JobResourceMetrics(JobMetrics): + """Instrumented resource metrics measured for a single scheduled job.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + max_rss_bytes: int | None = None + avg_rss_bytes: float | None = None + avg_cpu_percent: float | None = None + + num_resource_samples: int = 0 diff --git a/src/dvsim/instrumentation/resources.py b/src/dvsim/instrumentation/resources.py index 9c177b6e..fc9b522c 100644 --- a/src/dvsim/instrumentation/resources.py +++ b/src/dvsim/instrumentation/resources.py @@ -7,60 +7,16 @@ import os import threading import 
time -from dataclasses import dataclass +from collections.abc import Mapping import psutil -from dvsim.instrumentation.base import ( - InstrumentationFragments, - JobFragment, - SchedulerFragment, - SchedulerInstrumentation, -) +from dvsim.instrumentation.base import SchedulerInstrumentation +from dvsim.instrumentation.records import JobResourceMetrics, SchedulerResourceMetrics from dvsim.job.data import JobSpec from dvsim.job.status import JobStatus -__all__ = ( - "ResourceInstrumentation", - "ResourceJobFragment", - "ResourceSchedulerFragment", -) - - -@dataclass -class ResourceSchedulerFragment(SchedulerFragment): - """Instrumented metrics about the scheduler reported by the `ResourceInstrumentation`.""" - - # Scheduler / DVSim process overhead - scheduler_max_rss_bytes: int | None = None - scheduler_avg_rss_bytes: int | None = None - scheduler_vms_bytes: int | None = None - scheduler_cpu_percent: float | None = None - scheduler_cpu_time: float | None = None - - # System-wide metrics - sys_max_rss_bytes: int | None = None - sys_avg_rss_bytes: int | None = None - sys_swap_used_bytes: int | None = None - sys_cpu_percent: float | None = None - sys_cpu_per_core: list[float] | None = None - - num_resource_samples: int = 0 - - -@dataclass -class ResourceJobFragment(JobFragment): - """Instrumented metrics about jobs reported by the `ResourceInstrumentation`. - - Since we can't directly measure each deployed job, these are instead averages and system - information over the course of the job's runtime. - """ - - max_rss_bytes: int | None = None - avg_rss_bytes: float | None = None - avg_cpu_percent: float | None = None - - num_resource_samples: int = 0 +__all__ = ("ResourceInstrumentation",) class JobResourceAggregate: @@ -69,14 +25,13 @@ class JobResourceAggregate: Tracks aggregate information over a number of samples whilst minimizing memory usage. 
""" - def __init__(self, job: JobSpec) -> None: + def __init__(self) -> None: """Construct an aggregate for storing sampling info for a given job specification. Arguments: job: The specification of the job which is having its information aggregated. """ - self.job_spec = job self.sample_count = 0 self.sum_rss = 0.0 self.max_rss = 0 @@ -89,13 +44,12 @@ def add_sample(self, rss: int, cpu: float) -> None: self.max_rss = max(self.max_rss, rss) self.sum_cpu += cpu - def finalize(self) -> ResourceJobFragment: + def finalize(self) -> JobResourceMetrics: """Finalize the aggregated information for a job, generating a report fragment.""" if self.sample_count == 0: - return ResourceJobFragment(self.job_spec) + return JobResourceMetrics() - return ResourceJobFragment( - self.job_spec, + return JobResourceMetrics( max_rss_bytes=self.max_rss, avg_rss_bytes=self.sum_rss / self.sample_count, avg_cpu_percent=self.sum_cpu / self.sample_count, @@ -221,45 +175,49 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: running = job.id in self._running_jobs started = running or job.id in self._finished_jobs if not started and status not in (JobStatus.SCHEDULED, JobStatus.QUEUED): - self._running_jobs[job.id] = JobResourceAggregate(job) + self._running_jobs[job.id] = JobResourceAggregate() running = True if running and status.is_terminal: aggregates = self._running_jobs.pop(job.id) self._finished_jobs[job.id] = aggregates - def build_report_fragments(self) -> InstrumentationFragments | None: - """Build report fragments from the collected instrumentation information.""" + def get_scheduler_data(self) -> SchedulerResourceMetrics: + """Retrieve scheduler metrics measured by this instrumentation.""" if self._running: raise RuntimeError("Cannot build instrumentation report whilst still running!") if self._sample_count <= 0: - scheduler_frag = ResourceSchedulerFragment() + return SchedulerResourceMetrics() + + scheduler_cpu_time = self._scheduler_cpu_time_end - 
self._scheduler_cpu_time_start + if self._num_cores is not None: + sys_cpu_per_core = [s / self._sample_count for s in self._sys_sum_cpu_per_core] else: - scheduler_cpu_time = self._scheduler_cpu_time_end - self._scheduler_cpu_time_start - if self._num_cores is not None: - sys_cpu_per_core = [s / self._sample_count for s in self._sys_sum_cpu_per_core] - else: - sys_cpu_per_core = None - try: - vms_bytes = round(self._scheduler_sum_vms / self._sample_count) - except (ValueError, TypeError): - # Suppress unknown types in VMS measurements - vms_bytes = None - - scheduler_frag = ResourceSchedulerFragment( - scheduler_avg_rss_bytes=round(self._scheduler_sum_rss / self._sample_count), - scheduler_max_rss_bytes=self._scheduler_max_rss, - scheduler_vms_bytes=vms_bytes, - scheduler_cpu_percent=self._scheduler_sum_cpu / self._sample_count, - scheduler_cpu_time=scheduler_cpu_time, - sys_max_rss_bytes=self._sys_max_rss, - sys_avg_rss_bytes=round(self._sys_sum_rss / self._sample_count), - sys_cpu_percent=self._sys_sum_cpu / self._sample_count, - sys_cpu_per_core=sys_cpu_per_core, - sys_swap_used_bytes=self._sys_max_swap, - num_resource_samples=self._sample_count, - ) - - aggregates = list(self._finished_jobs.values()) + list(self._running_jobs.values()) - job_frags = [aggregate.finalize() for aggregate in aggregates] - return ([scheduler_frag], job_frags) + sys_cpu_per_core = None + try: + vms_bytes = round(self._scheduler_sum_vms / self._sample_count) + except (ValueError, TypeError): + # Suppress unknown types in VMS measurements + vms_bytes = None + + return SchedulerResourceMetrics( + scheduler_avg_rss_bytes=round(self._scheduler_sum_rss / self._sample_count), + scheduler_max_rss_bytes=self._scheduler_max_rss, + scheduler_vms_bytes=vms_bytes, + scheduler_cpu_percent=self._scheduler_sum_cpu / self._sample_count, + scheduler_cpu_time=scheduler_cpu_time, + sys_max_rss_bytes=self._sys_max_rss, + sys_avg_rss_bytes=round(self._sys_sum_rss / self._sample_count), + 
sys_cpu_percent=self._sys_sum_cpu / self._sample_count, + sys_cpu_per_core=sys_cpu_per_core, + sys_swap_used_bytes=self._sys_max_swap, + num_resource_samples=self._sample_count, + ) + + def get_job_data(self) -> Mapping[str, JobResourceMetrics]: + """Retrieve per-job metrics measured by this instrumentation.""" + if self._running: + raise RuntimeError("Cannot build instrumentation report whilst still running!") + + aggregates = self._finished_jobs | self._running_jobs + return {job_id: aggregate.finalize() for job_id, aggregate in aggregates.items()} diff --git a/src/dvsim/instrumentation/timing.py b/src/dvsim/instrumentation/timing.py index 1542b0fe..2502ac20 100644 --- a/src/dvsim/instrumentation/timing.py +++ b/src/dvsim/instrumentation/timing.py @@ -5,69 +5,14 @@ """DVSim scheduler instrumentation for timing-related information.""" import time -from dataclasses import asdict, dataclass -from typing import Any +from collections.abc import Mapping -from dvsim.instrumentation.base import ( - InstrumentationFragments, - JobFragment, - SchedulerFragment, - SchedulerInstrumentation, -) +from dvsim.instrumentation.base import SchedulerInstrumentation +from dvsim.instrumentation.records import JobTimingMetrics, SchedulerTimingMetrics from dvsim.job.data import JobSpec from dvsim.job.status import JobStatus -__all__ = ( - "TimingInstrumentation", - "TimingJobFragment", - "TimingSchedulerFragment", -) - - -@dataclass -class TimingSchedulerFragment(SchedulerFragment): - """Instrumented metrics about the scheduler reported by the `TimingInstrumentation`.""" - - start_time: float | None = None - end_time: float | None = None - - @property - def duration(self) -> float | None: - """The duration of the entire scheduler run.""" - if self.start_time is None or self.end_time is None: - return None - return self.end_time - self.start_time - - def to_dict(self) -> dict[str, Any]: - """Convert the scheduler metrics to a dictionary, including the `duration` property.""" - data = 
asdict(self) - duration = self.duration - if duration: - data["duration"] = duration - return data - - -@dataclass -class TimingJobFragment(JobFragment): - """Instrumented metrics about the scheduler reported by the `TimingInstrumentation`.""" - - start_time: float | None = None - end_time: float | None = None - - @property - def duration(self) -> float | None: - """The duration of the job.""" - if self.start_time is None or self.end_time is None: - return None - return self.end_time - self.start_time - - def to_dict(self) -> dict[str, Any]: - """Convert the job metrics to a dictionary, including the `duration` property.""" - data = asdict(self) - duration = self.duration - if duration: - data["duration"] = duration - return data +__all__ = ("TimingInstrumentation",) class TimingInstrumentation(SchedulerInstrumentation): @@ -80,29 +25,38 @@ class TimingInstrumentation(SchedulerInstrumentation): def __init__(self) -> None: """Construct a `TimingInstrumentation`.""" super().__init__() - self._scheduler = TimingSchedulerFragment() - self._jobs: dict[str, TimingJobFragment] = {} + self._start_time: float | None = None + self._end_time: float | None = None + self._jobs: dict[str, tuple[float | None, float | None]] = {} def on_scheduler_start(self) -> None: """Notify instrumentation that the scheduler has begun.""" - self._scheduler.start_time = time.perf_counter() + self._start_time = time.perf_counter() def on_scheduler_end(self) -> None: """Notify instrumentation that the scheduler has finished.""" - self._scheduler.end_time = time.perf_counter() + self._end_time = time.perf_counter() def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: """Notify instrumentation of a change in status for some scheduled job.""" job_info = self._jobs.get(job.id) if job_info is None: - job_info = TimingJobFragment(job) + job_info = (None, None) self._jobs[job.id] = job_info + start_time, end_time = job_info - if job_info.start_time is None and status not in 
(JobStatus.SCHEDULED, JobStatus.QUEUED): - job_info.start_time = time.perf_counter() + if start_time is None and status not in (JobStatus.SCHEDULED, JobStatus.QUEUED): + self._jobs[job.id] = (time.perf_counter(), end_time) if status.is_terminal: - job_info.end_time = time.perf_counter() - - def build_report_fragments(self) -> InstrumentationFragments | None: - """Build report fragments from the collected instrumentation information.""" - return ([self._scheduler], list(self._jobs.values())) + self._jobs[job.id] = (start_time, time.perf_counter()) + + def get_scheduler_data(self) -> SchedulerTimingMetrics: + """Retrieve scheduler metrics measured by this instrumentation.""" + return SchedulerTimingMetrics(start_time=self._start_time, end_time=self._end_time) + + def get_job_data(self) -> Mapping[str, JobTimingMetrics]: + """Retrieve per-job metrics measured by this instrumentation.""" + return { + job_id: JobTimingMetrics(start_time=start, end_time=end) + for job_id, (start, end) in self._jobs.items() + } From 8b321f029ccd9a7cf2c7fd52271109767034a392 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Wed, 13 May 2026 18:25:52 +0100 Subject: [PATCH 2/3] refactor: introduce explicit instrumentation name field Rather than just relying on the class `__name__`, we will want a well- defined field that matches the various fields we are using within our Pydantic model. This will allow plugins to declare a 'name' that can then be queried to get their specific metrics. Note that there is now some overlap with the name that is used in the instrumentation registry/factory, but that will be addressed in a future commit. 
Signed-off-by: Alex Jones --- src/dvsim/instrumentation/base.py | 5 +---- src/dvsim/instrumentation/metadata.py | 2 ++ src/dvsim/instrumentation/resources.py | 2 ++ src/dvsim/instrumentation/timing.py | 2 ++ 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/dvsim/instrumentation/base.py b/src/dvsim/instrumentation/base.py index 0f850f49..9ed4aff0 100644 --- a/src/dvsim/instrumentation/base.py +++ b/src/dvsim/instrumentation/base.py @@ -26,10 +26,7 @@ class SchedulerInstrumentation: behavioural metrics for analysis. """ - @property - def name(self) -> str: - """The name to use to refer to this instrumentation mechanism.""" - return self.__class__.__name__ + name: str = "" def start(self) -> None: """Begin instrumentation, starting whatever is needed before the scheduler is run.""" diff --git a/src/dvsim/instrumentation/metadata.py b/src/dvsim/instrumentation/metadata.py index 13f0020b..1b1e549b 100644 --- a/src/dvsim/instrumentation/metadata.py +++ b/src/dvsim/instrumentation/metadata.py @@ -21,6 +21,8 @@ class MetadataInstrumentation(SchedulerInstrumentation): part of the instrumentation report for analysis, regardless of other instrumentations. """ + name = "meta" + def __init__(self) -> None: """Construct a `MetadataInstrumentation`.""" super().__init__() diff --git a/src/dvsim/instrumentation/resources.py b/src/dvsim/instrumentation/resources.py index fc9b522c..6f7d2621 100644 --- a/src/dvsim/instrumentation/resources.py +++ b/src/dvsim/instrumentation/resources.py @@ -69,6 +69,8 @@ class ResourceInstrumentation(SchedulerInstrumentation): of the samples that fall within that job's execution window. """ + name = "resources" + def __init__(self, sample_interval: float = 0.5) -> None: """Construct a resource instrumentation. 
diff --git a/src/dvsim/instrumentation/timing.py b/src/dvsim/instrumentation/timing.py index 2502ac20..01c1293d 100644 --- a/src/dvsim/instrumentation/timing.py +++ b/src/dvsim/instrumentation/timing.py @@ -22,6 +22,8 @@ class TimingInstrumentation(SchedulerInstrumentation): all of the jobs that it dispatches. """ + name = "timing" + def __init__(self) -> None: """Construct a `TimingInstrumentation`.""" super().__init__() From c9bbf0aabeb7a0f8e474a6510697107727451d68 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Wed, 13 May 2026 18:29:27 +0100 Subject: [PATCH 3/3] feat: add a pydantic model for the combined instrumentation results Get rid of the existing logic that collapsed everything to arbitrary dictionaries and ensure that the merged instrumentation report is a well-formed Pydantic model that can then be confidently queried, with extra attributes (instrumentation records) still allowed to support custom instrumentations registered via e.g. plugins. Note that there is slightly less flattening on the produced JSONs (this is a design change) - each independent job (and the scheduler itself) now contains many dictionaries, with one per instrumentation, to effectively scope each instrumentation's contribution to the final report. This doesn't impact readability too much and will importantly allow us to load the reports back in the future. The intention here is that built-in instrumentation types have a well-defined field in the report model that can be queried, whereas any additional plugin data is still made available through the Pydantic `extra="allow"` configuration option. 
Signed-off-by: Alex Jones --- src/dvsim/instrumentation/__init__.py | 8 +++- src/dvsim/instrumentation/base.py | 58 +++++++++++++-------------- src/dvsim/instrumentation/records.py | 35 ++++++++++++++++ src/dvsim/instrumentation/runtime.py | 14 +++---- 4 files changed, 77 insertions(+), 38 deletions(-) diff --git a/src/dvsim/instrumentation/__init__.py b/src/dvsim/instrumentation/__init__.py index 962d11ed..1f13f8d2 100644 --- a/src/dvsim/instrumentation/__init__.py +++ b/src/dvsim/instrumentation/__init__.py @@ -6,13 +6,19 @@ from dvsim.instrumentation.base import InstrumentationAggregator, SchedulerInstrumentation from dvsim.instrumentation.factory import InstrumentationFactory -from dvsim.instrumentation.records import InstrumentationMetrics, JobMetrics, SchedulerMetrics +from dvsim.instrumentation.records import ( + InstrumentationMetrics, + InstrumentationResults, + JobMetrics, + SchedulerMetrics, +) from dvsim.instrumentation.runtime import flush, get, set_instrumentation, set_report_path __all__ = ( "InstrumentationAggregator", "InstrumentationFactory", "InstrumentationMetrics", + "InstrumentationResults", "JobMetrics", "SchedulerInstrumentation", "SchedulerMetrics", diff --git a/src/dvsim/instrumentation/base.py b/src/dvsim/instrumentation/base.py index 9ed4aff0..255e7381 100644 --- a/src/dvsim/instrumentation/base.py +++ b/src/dvsim/instrumentation/base.py @@ -4,10 +4,16 @@ """DVSim scheduler instrumentation base classes.""" +from collections import defaultdict from collections.abc import Iterable, Mapping -from typing import Any -from dvsim.instrumentation.records import JobMetrics, SchedulerMetrics +from dvsim.instrumentation.records import ( + InstrumentationResults, + JobInstrumentationResults, + JobMetrics, + SchedulerInstrumentationResults, + SchedulerMetrics, +) from dvsim.job.data import JobSpec from dvsim.job.status import JobStatus from dvsim.logging import log @@ -89,36 +95,28 @@ def stop(self) -> None: for inst in self._instrumentations: 
inst.stop() - def collect(self) -> dict[str, Any]: + def collect(self) -> InstrumentationResults: """Collect all gathered instrumentation data from the wrapped objects.""" - log.info("Collecting instrumentation data...") - - scheduler_fragments = [] - job_fragments = [] - - for inst in self._instrumentations: - scheduler_data = inst.get_scheduler_data() - if scheduler_data is not None: - scheduler_fragments.append(scheduler_data) - job_fragments += inst.get_job_data().items() + log.info("Collecting instrumentation report data...") - log.info("Finished collecting instrumentation data. Merging instrumentation data...") + scheduler_metrics: dict[str, SchedulerMetrics] = {} + job_metrics: dict[str, dict[str, JobMetrics]] = defaultdict(dict) - scheduler: dict[str, Any] = {} - for i, scheduler_frag in enumerate(scheduler_fragments, start=1): + for i, inst in enumerate(self._instrumentations, start=1): log.debug( - "Merging instrumentation report scheduler data (%d/%d)", i, len(scheduler_fragments) + "Collecting instrumentation report data (%d/%d)", i, len(self._instrumentations) ) - scheduler.update(scheduler_frag.model_dump()) - - jobs: dict[tuple[str, str], dict[str, Any]] = {} - for i, (job_id, job_frag) in enumerate(job_fragments, start=1): - log.debug("Merging instrumentation report job data (%d/%d)", i, len(job_fragments)) - job = jobs.get(job_id) - if job is None: - job = {} - jobs[job_id] = job - job.update({k: v for k, v in job_frag.model_dump().items() if k != "job"}) - - log.info("Finished merging instrumentation report data.") - return {"scheduler": scheduler, "jobs": list(jobs.values())} + scheduler_record = inst.get_scheduler_data() + if scheduler_record is not None: + scheduler_metrics[inst.name] = scheduler_record + for job_id, job_record in inst.get_job_data().items(): + job_metrics[job_id][inst.name] = job_record + + log.info("Finished collecting instrumentation report data.") + return InstrumentationResults( + 
scheduler=SchedulerInstrumentationResults(**scheduler_metrics), # type: ignore[reportArgumentType] + jobs={ + job_id: JobInstrumentationResults(**job_data) # type: ignore[reportArgumentType] + for job_id, job_data in job_metrics.items() + }, + ) diff --git a/src/dvsim/instrumentation/records.py b/src/dvsim/instrumentation/records.py index cf109b25..b29089f9 100644 --- a/src/dvsim/instrumentation/records.py +++ b/src/dvsim/instrumentation/records.py @@ -9,16 +9,20 @@ from pydantic import ( BaseModel, ConfigDict, + Field, computed_field, model_validator, ) __all__ = ( "InstrumentationMetrics", + "InstrumentationResults", "JobInstrumentationMetadata", + "JobInstrumentationResults", "JobMetrics", "JobResourceMetrics", "JobTimingMetrics", + "SchedulerInstrumentationResults", "SchedulerMetrics", "SchedulerResourceMetrics", "SchedulerTimingMetrics", @@ -143,3 +147,34 @@ class JobResourceMetrics(JobMetrics): avg_cpu_percent: float | None = None num_resource_samples: int = 0 + + +# Combined output reports + + +class SchedulerInstrumentationResults(BaseModel): + """Aggregated instrumentation report data about the scheduler as a whole.""" + + model_config = ConfigDict(frozen=True, extra="allow") + + timing: SchedulerTimingMetrics | None = None + resources: SchedulerResourceMetrics | None = None + + +class JobInstrumentationResults(BaseModel): + """Aggregated instrumentation report data about a single scheduled job.""" + + model_config = ConfigDict(frozen=True, extra="allow") + + meta: JobInstrumentationMetadata | None = None + timing: JobTimingMetrics | None = None + resources: JobResourceMetrics | None = None + + +class InstrumentationResults(BaseModel): + """A complete aggregated instrumentation report with data about the scheduler and all jobs.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + scheduler: SchedulerInstrumentationResults + jobs: dict[str, JobInstrumentationResults] = Field(default_factory=dict) diff --git 
a/src/dvsim/instrumentation/runtime.py b/src/dvsim/instrumentation/runtime.py index ed114881..d7447e6b 100644 --- a/src/dvsim/instrumentation/runtime.py +++ b/src/dvsim/instrumentation/runtime.py @@ -4,11 +4,9 @@ """DVSim scheduler instrumentation for timing-related information.""" -import json from pathlib import Path -from typing import Any -from dvsim.instrumentation.base import InstrumentationAggregator +from dvsim.instrumentation.base import InstrumentationAggregator, InstrumentationResults from dvsim.logging import log __all__ = ( @@ -24,7 +22,7 @@ class _Runtime: def __init__(self) -> None: self.instrumentation: InstrumentationAggregator | None = None self.report_path: Path | None = None - self.report: dict[str, Any] | None = None + self.report: InstrumentationResults | None = None _runtime = _Runtime() @@ -45,7 +43,7 @@ def get() -> InstrumentationAggregator | None: return _runtime.instrumentation -def flush() -> dict[str, Any] | None: +def flush() -> InstrumentationResults | None: """Dump the instrumentation report as JSON to the configured report path.""" if _runtime.instrumentation is None: return None @@ -58,7 +56,9 @@ def flush() -> dict[str, Any] | None: raise ValueError("Metric report path cannot be a directory.") try: _runtime.report_path.parent.mkdir(parents=True, exist_ok=True) - _runtime.report_path.write_text(json.dumps(_runtime.report, indent=2)) + _runtime.report_path.write_text( + _runtime.report.model_dump_json(indent=2, exclude_none=True) + ) log.info("JSON instrumentation report dumped to: %s", str(_runtime.report_path)) except (OSError, FileNotFoundError) as e: log.error( @@ -68,6 +68,6 @@ def flush() -> dict[str, Any] | None: return _runtime.report -def get_report() -> dict[str, Any] | None: +def get_report() -> InstrumentationResults | None: """Get the latest flushed instrumentation report contents, if any exist.""" return _runtime.report