diff --git a/src/dvsim/instrumentation/__init__.py b/src/dvsim/instrumentation/__init__.py index e1cceb38..1f13f8d2 100644 --- a/src/dvsim/instrumentation/__init__.py +++ b/src/dvsim/instrumentation/__init__.py @@ -4,44 +4,24 @@ """DVSim Scheduler Instrumentation.""" -from dvsim.instrumentation.base import ( - InstrumentationAggregator, - InstrumentationFragment, - InstrumentationFragments, - JobFragment, - SchedulerFragment, - SchedulerInstrumentation, -) +from dvsim.instrumentation.base import InstrumentationAggregator, SchedulerInstrumentation from dvsim.instrumentation.factory import InstrumentationFactory -from dvsim.instrumentation.metadata import MetadataInstrumentation, MetadataJobFragment -from dvsim.instrumentation.resources import ( - ResourceInstrumentation, - ResourceJobFragment, - ResourceSchedulerFragment, +from dvsim.instrumentation.records import ( + InstrumentationMetrics, + InstrumentationResults, + JobMetrics, + SchedulerMetrics, ) from dvsim.instrumentation.runtime import flush, get, set_instrumentation, set_report_path -from dvsim.instrumentation.timing import ( - TimingInstrumentation, - TimingJobFragment, - TimingSchedulerFragment, -) __all__ = ( "InstrumentationAggregator", "InstrumentationFactory", - "InstrumentationFragment", - "InstrumentationFragments", - "JobFragment", - "MetadataInstrumentation", - "MetadataJobFragment", - "ResourceInstrumentation", - "ResourceJobFragment", - "ResourceSchedulerFragment", - "SchedulerFragment", + "InstrumentationMetrics", + "InstrumentationResults", + "JobMetrics", "SchedulerInstrumentation", - "TimingInstrumentation", - "TimingJobFragment", - "TimingSchedulerFragment", + "SchedulerMetrics", "flush", "get", "set_instrumentation", diff --git a/src/dvsim/instrumentation/base.py b/src/dvsim/instrumentation/base.py index c9ccd933..255e7381 100644 --- a/src/dvsim/instrumentation/base.py +++ b/src/dvsim/instrumentation/base.py @@ -4,10 +4,16 @@ """DVSim scheduler instrumentation base classes.""" -from 
collections.abc import Iterable, Sequence -from dataclasses import asdict, dataclass -from typing import Any, TypeAlias - +from collections import defaultdict +from collections.abc import Iterable, Mapping + +from dvsim.instrumentation.records import ( + InstrumentationResults, + JobInstrumentationResults, + JobMetrics, + SchedulerInstrumentationResults, + SchedulerMetrics, +) from dvsim.job.data import JobSpec from dvsim.job.status import JobStatus from dvsim.logging import log @@ -15,40 +21,10 @@ __all__ = ( "InstrumentationAggregator", - "InstrumentationFragment", - "InstrumentationFragments", - "JobFragment", - "SchedulerFragment", "SchedulerInstrumentation", ) -@dataclass -class InstrumentationFragment: - """Base class for instrumentation reports / report fragments.""" - - def to_dict(self) -> dict[str, Any]: - """Convert the report fragment to a dictionary.""" - return asdict(self) - - -@dataclass -class SchedulerFragment(InstrumentationFragment): - """Base class for instrumentation report fragments related to the scheduler.""" - - -@dataclass -class JobFragment(InstrumentationFragment): - """Base class for instrumentation report fragments related to individual jobs.""" - - job: JobSpec - - -# Each instrumentation object can report any number of information fragments about the -# scheduler and about its jobs. -InstrumentationFragments: TypeAlias = tuple[Sequence[SchedulerFragment], Sequence[JobFragment]] - - class SchedulerInstrumentation: """Instrumentation for the scheduler. @@ -56,10 +32,7 @@ class SchedulerInstrumentation: behavioural metrics for analysis. 
""" - @property - def name(self) -> str: - """The name to use to refer to this instrumentation mechanism.""" - return self.__class__.__name__ + name: str = "" def start(self) -> None: """Begin instrumentation, starting whatever is needed before the scheduler is run.""" @@ -89,10 +62,14 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: # noqa """Notify instrumentation of a change in status for some scheduled job.""" return - def build_report_fragments(self) -> InstrumentationFragments | None: - """Build report fragments from the collected instrumentation information.""" + def get_scheduler_data(self) -> SchedulerMetrics | None: + """Retrieve scheduler metrics measured by this instrumentation.""" return None + def get_job_data(self) -> Mapping[str, JobMetrics]: + """Retrieve per-job metrics measured by this instrumentation.""" + return {} + class InstrumentationAggregator: """Aggregator for scheduler instrumentation collection, composing multiple instrumentations.""" @@ -118,40 +95,28 @@ def stop(self) -> None: for inst in self._instrumentations: inst.stop() - def collect(self) -> dict[str, Any]: + def collect(self) -> InstrumentationResults: """Collect all gathered instrumentation data from the wrapped objects.""" - log.info("Collecting instrumentation data...") - - scheduler_fragments = [] - job_fragments = [] - - for inst in self._instrumentations: - fragments = inst.build_report_fragments() - if fragments is None: - continue - scheduler_fragments += fragments[0] - job_fragments += fragments[1] + log.info("Collecting instrumentation report data...") - log.info("Finished collecting instrumentation data. 
Merging instrumentation data...") + scheduler_metrics: dict[str, SchedulerMetrics] = {} + job_metrics: dict[str, dict[str, JobMetrics]] = defaultdict(dict) - scheduler: dict[str, Any] = {} - for i, scheduler_frag in enumerate(scheduler_fragments, start=1): + for i, inst in enumerate(self._instrumentations, start=1): log.debug( - "Merging instrumentation report scheduler data (%d/%d)", i, len(scheduler_fragments) + "Collecting instrumentation report data (%d/%d)", i, len(self._instrumentations) ) - scheduler.update(scheduler_frag.to_dict()) - - jobs: dict[tuple[str, str], dict[str, Any]] = {} - for i, job_frag in enumerate(job_fragments, start=1): - log.debug("Merging instrumentation report job data (%d/%d)", i, len(job_fragments)) - spec = job_frag.job - # We can uniquely identify jobs from the combination of their full name & target - job_id = (spec.full_name, spec.target) - job = jobs.get(job_id) - if job is None: - job = {} - jobs[job_id] = job - job.update({k: v for k, v in job_frag.to_dict().items() if k != "job"}) - - log.info("Finished merging instrumentation report data.") - return {"scheduler": scheduler, "jobs": list(jobs.values())} + scheduler_record = inst.get_scheduler_data() + if scheduler_record is not None: + scheduler_metrics[inst.name] = scheduler_record + for job_id, job_record in inst.get_job_data().items(): + job_metrics[job_id][inst.name] = job_record + + log.info("Finished collecting instrumentation report data.") + return InstrumentationResults( + scheduler=SchedulerInstrumentationResults(**scheduler_metrics), # type: ignore[reportArgumentType] + jobs={ + job_id: JobInstrumentationResults(**job_data) # type: ignore[reportArgumentType] + for job_id, job_data in job_metrics.items() + }, + ) diff --git a/src/dvsim/instrumentation/metadata.py b/src/dvsim/instrumentation/metadata.py index da754a86..1b1e549b 100644 --- a/src/dvsim/instrumentation/metadata.py +++ b/src/dvsim/instrumentation/metadata.py @@ -4,34 +4,14 @@ """DVSim scheduler 
instrumentation metadata (to be included in the generated report).""" -from dataclasses import dataclass +from collections.abc import Mapping -from dvsim.instrumentation.base import ( - InstrumentationFragments, - JobFragment, - SchedulerInstrumentation, -) +from dvsim.instrumentation.base import SchedulerInstrumentation +from dvsim.instrumentation.records import JobInstrumentationMetadata from dvsim.job.data import JobSpec from dvsim.job.status import JobStatus -__all__ = ( - "MetadataInstrumentation", - "MetadataJobFragment", -) - - -@dataclass -class MetadataJobFragment(JobFragment): - """Instrumentation metadata for scheduled jobs, reporting the final status of the job.""" - - name: str - full_name: str - job_type: str - target: str - tool: str - backend: str | None - dependencies: list[str] - status: str +__all__ = ("MetadataInstrumentation",) class MetadataInstrumentation(SchedulerInstrumentation): @@ -41,6 +21,8 @@ class MetadataInstrumentation(SchedulerInstrumentation): part of the instrumentation report for analysis, regardless of other instrumentations. 
""" + name = "meta" + def __init__(self) -> None: """Construct a `MetadataInstrumentation`.""" super().__init__() @@ -51,20 +33,17 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: status_str = status.name.capitalize() self._jobs[job.id] = (job, status_str) - def build_report_fragments(self) -> InstrumentationFragments | None: - """Build report fragments from the collected instrumentation information.""" - jobs = [ - MetadataJobFragment( - spec, - spec.name, - spec.full_name, - spec.job_type, - spec.target, - spec.tool.name, - spec.backend, - spec.dependencies, - status_str, + def get_job_data(self) -> Mapping[str, JobInstrumentationMetadata]: + """Retrieve per-job metrics measured by this instrumentation.""" + return { + spec.id: JobInstrumentationMetadata( + name=spec.name, + job_type=spec.job_type, + target=spec.target, + tool=spec.tool.name, + backend=spec.backend, + dependencies=list(spec.dependencies), + status=status_str, ) for spec, status_str in self._jobs.values() - ] - return ([], jobs) + } diff --git a/src/dvsim/instrumentation/records.py b/src/dvsim/instrumentation/records.py new file mode 100644 index 00000000..b29089f9 --- /dev/null +++ b/src/dvsim/instrumentation/records.py @@ -0,0 +1,180 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
+# SPDX-License-Identifier: Apache-2.0 + +"""DVSim scheduler instrumentation output (metric) record models.""" + +from typing import Any + +from pydantic import ( + BaseModel, + ConfigDict, + Field, + computed_field, + model_validator, +) + +__all__ = ( + "InstrumentationMetrics", + "InstrumentationResults", + "JobInstrumentationMetadata", + "JobInstrumentationResults", + "JobMetrics", + "JobResourceMetrics", + "JobTimingMetrics", + "SchedulerInstrumentationResults", + "SchedulerMetrics", + "SchedulerResourceMetrics", + "SchedulerTimingMetrics", +) + +# Base model classes + + +class InstrumentationMetrics(BaseModel): + """Base class for instrumentation metrics (report fragments).""" + + +class SchedulerMetrics(InstrumentationMetrics): + """Base class for instrumentation metrics related to the scheduler as a whole.""" + + +class JobMetrics(InstrumentationMetrics): + """Base class for instrumentation metrics related to a specific job.""" + + +class JobInstrumentationMetadata(JobMetrics): + """Instrumented metadata captured for a single scheduled job.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + name: str + job_type: str + target: str + tool: str + backend: str | None + dependencies: list[str] + status: str + + +# Timing metrics + + +class SchedulerTimingMetrics(SchedulerMetrics): + """Instrumented timing metrics measured for the scheduler as a whole.""" + + model_config = ConfigDict(frozen=True, extra="ignore") + + start_time: float | None = None + end_time: float | None = None + + @computed_field + @property + def duration(self) -> float | None: + """The duration of the entire scheduler run.""" + if self.start_time is None or self.end_time is None: + return None + return self.end_time - self.start_time + + @model_validator(mode="before") + @classmethod + def drop_computed_fields(cls, data: Any) -> Any: # noqa: ANN401 + """Drop any computed fields from input dicts before validating.""" + if isinstance(data, dict): + data = dict(data) + 
data.pop("duration", None) + return data + + +class JobTimingMetrics(JobMetrics): + """Instrumented timing metrics measured for a single scheduled job.""" + + model_config = ConfigDict(frozen=True, extra="ignore") + + start_time: float | None = None + end_time: float | None = None + + @computed_field + @property + def duration(self) -> float | None: + """The duration of the entire job run.""" + if self.start_time is None or self.end_time is None: + return None + return self.end_time - self.start_time + + @model_validator(mode="before") + @classmethod + def drop_computed_fields(cls, data: Any) -> Any: # noqa: ANN401 + """Drop any computed fields from input dicts before validating.""" + if isinstance(data, dict): + data = dict(data) + data.pop("duration", None) + return data + + +# Resource Metrics + + +class SchedulerResourceMetrics(SchedulerMetrics): + """Instrumented resource metrics measured for the scheduler as a whole.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + # Scheduler / DVSim process overhead + scheduler_max_rss_bytes: int | None = None + scheduler_avg_rss_bytes: int | None = None + scheduler_vms_bytes: int | None = None + scheduler_cpu_percent: float | None = None + scheduler_cpu_time: float | None = None + + # System-wide metrics + sys_max_rss_bytes: int | None = None + sys_avg_rss_bytes: int | None = None + sys_swap_used_bytes: int | None = None + sys_cpu_percent: float | None = None + sys_cpu_per_core: list[float] | None = None + + num_resource_samples: int = 0 + + +class JobResourceMetrics(JobMetrics): + """Instrumented resource metrics measured for a single scheduled job.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + max_rss_bytes: int | None = None + avg_rss_bytes: float | None = None + avg_cpu_percent: float | None = None + + num_resource_samples: int = 0 + + +# Combined output reports + + +class SchedulerInstrumentationResults(BaseModel): + """Aggregated instrumentation report data about the scheduler as 
a whole.""" + + model_config = ConfigDict(frozen=True, extra="allow") + + timing: SchedulerTimingMetrics | None = None + resources: SchedulerResourceMetrics | None = None + + +class JobInstrumentationResults(BaseModel): + """Aggregated instrumentation report data about a single scheduled job.""" + + model_config = ConfigDict(frozen=True, extra="allow") + + meta: JobInstrumentationMetadata | None = None + timing: JobTimingMetrics | None = None + resources: JobResourceMetrics | None = None + + +class InstrumentationResults(BaseModel): + """A complete aggregated instrumentation report with data about the scheduler and all jobs.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + scheduler: SchedulerInstrumentationResults + jobs: dict[str, JobInstrumentationResults] = Field(default_factory=dict) diff --git a/src/dvsim/instrumentation/resources.py b/src/dvsim/instrumentation/resources.py index 9c177b6e..6f7d2621 100644 --- a/src/dvsim/instrumentation/resources.py +++ b/src/dvsim/instrumentation/resources.py @@ -7,60 +7,16 @@ import os import threading import time -from dataclasses import dataclass +from collections.abc import Mapping import psutil -from dvsim.instrumentation.base import ( - InstrumentationFragments, - JobFragment, - SchedulerFragment, - SchedulerInstrumentation, -) +from dvsim.instrumentation.base import SchedulerInstrumentation +from dvsim.instrumentation.records import JobResourceMetrics, SchedulerResourceMetrics from dvsim.job.data import JobSpec from dvsim.job.status import JobStatus -__all__ = ( - "ResourceInstrumentation", - "ResourceJobFragment", - "ResourceSchedulerFragment", -) - - -@dataclass -class ResourceSchedulerFragment(SchedulerFragment): - """Instrumented metrics about the scheduler reported by the `ResourceInstrumentation`.""" - - # Scheduler / DVSim process overhead - scheduler_max_rss_bytes: int | None = None - scheduler_avg_rss_bytes: int | None = None - scheduler_vms_bytes: int | None = None - scheduler_cpu_percent: 
float | None = None - scheduler_cpu_time: float | None = None - - # System-wide metrics - sys_max_rss_bytes: int | None = None - sys_avg_rss_bytes: int | None = None - sys_swap_used_bytes: int | None = None - sys_cpu_percent: float | None = None - sys_cpu_per_core: list[float] | None = None - - num_resource_samples: int = 0 - - -@dataclass -class ResourceJobFragment(JobFragment): - """Instrumented metrics about jobs reported by the `ResourceInstrumentation`. - - Since we can't directly measure each deployed job, these are instead averages and system - information over the course of the job's runtime. - """ - - max_rss_bytes: int | None = None - avg_rss_bytes: float | None = None - avg_cpu_percent: float | None = None - - num_resource_samples: int = 0 +__all__ = ("ResourceInstrumentation",) class JobResourceAggregate: @@ -69,14 +25,13 @@ class JobResourceAggregate: Tracks aggregate information over a number of samples whilst minimizing memory usage. """ - def __init__(self, job: JobSpec) -> None: + def __init__(self) -> None: """Construct an aggregate for storing sampling info for a given job specification. Arguments: job: The specification of the job which is having its information aggregated. 
""" - self.job_spec = job self.sample_count = 0 self.sum_rss = 0.0 self.max_rss = 0 @@ -89,13 +44,12 @@ def add_sample(self, rss: int, cpu: float) -> None: self.max_rss = max(self.max_rss, rss) self.sum_cpu += cpu - def finalize(self) -> ResourceJobFragment: + def finalize(self) -> JobResourceMetrics: """Finalize the aggregated information for a job, generating a report fragment.""" if self.sample_count == 0: - return ResourceJobFragment(self.job_spec) + return JobResourceMetrics() - return ResourceJobFragment( - self.job_spec, + return JobResourceMetrics( max_rss_bytes=self.max_rss, avg_rss_bytes=self.sum_rss / self.sample_count, avg_cpu_percent=self.sum_cpu / self.sample_count, @@ -115,6 +69,8 @@ class ResourceInstrumentation(SchedulerInstrumentation): of the samples that fall within that job's execution window. """ + name = "resources" + def __init__(self, sample_interval: float = 0.5) -> None: """Construct a resource instrumentation. @@ -221,45 +177,49 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: running = job.id in self._running_jobs started = running or job.id in self._finished_jobs if not started and status not in (JobStatus.SCHEDULED, JobStatus.QUEUED): - self._running_jobs[job.id] = JobResourceAggregate(job) + self._running_jobs[job.id] = JobResourceAggregate() running = True if running and status.is_terminal: aggregates = self._running_jobs.pop(job.id) self._finished_jobs[job.id] = aggregates - def build_report_fragments(self) -> InstrumentationFragments | None: - """Build report fragments from the collected instrumentation information.""" + def get_scheduler_data(self) -> SchedulerResourceMetrics: + """Retrieve scheduler metrics measured by this instrumentation.""" if self._running: raise RuntimeError("Cannot build instrumentation report whilst still running!") if self._sample_count <= 0: - scheduler_frag = ResourceSchedulerFragment() + return SchedulerResourceMetrics() + + scheduler_cpu_time = self._scheduler_cpu_time_end 
- self._scheduler_cpu_time_start + if self._num_cores is not None: + sys_cpu_per_core = [s / self._sample_count for s in self._sys_sum_cpu_per_core] else: - scheduler_cpu_time = self._scheduler_cpu_time_end - self._scheduler_cpu_time_start - if self._num_cores is not None: - sys_cpu_per_core = [s / self._sample_count for s in self._sys_sum_cpu_per_core] - else: - sys_cpu_per_core = None - try: - vms_bytes = round(self._scheduler_sum_vms / self._sample_count) - except (ValueError, TypeError): - # Suppress unknown types in VMS measurements - vms_bytes = None - - scheduler_frag = ResourceSchedulerFragment( - scheduler_avg_rss_bytes=round(self._scheduler_sum_rss / self._sample_count), - scheduler_max_rss_bytes=self._scheduler_max_rss, - scheduler_vms_bytes=vms_bytes, - scheduler_cpu_percent=self._scheduler_sum_cpu / self._sample_count, - scheduler_cpu_time=scheduler_cpu_time, - sys_max_rss_bytes=self._sys_max_rss, - sys_avg_rss_bytes=round(self._sys_sum_rss / self._sample_count), - sys_cpu_percent=self._sys_sum_cpu / self._sample_count, - sys_cpu_per_core=sys_cpu_per_core, - sys_swap_used_bytes=self._sys_max_swap, - num_resource_samples=self._sample_count, - ) - - aggregates = list(self._finished_jobs.values()) + list(self._running_jobs.values()) - job_frags = [aggregate.finalize() for aggregate in aggregates] - return ([scheduler_frag], job_frags) + sys_cpu_per_core = None + try: + vms_bytes = round(self._scheduler_sum_vms / self._sample_count) + except (ValueError, TypeError): + # Suppress unknown types in VMS measurements + vms_bytes = None + + return SchedulerResourceMetrics( + scheduler_avg_rss_bytes=round(self._scheduler_sum_rss / self._sample_count), + scheduler_max_rss_bytes=self._scheduler_max_rss, + scheduler_vms_bytes=vms_bytes, + scheduler_cpu_percent=self._scheduler_sum_cpu / self._sample_count, + scheduler_cpu_time=scheduler_cpu_time, + sys_max_rss_bytes=self._sys_max_rss, + sys_avg_rss_bytes=round(self._sys_sum_rss / self._sample_count), + 
sys_cpu_percent=self._sys_sum_cpu / self._sample_count, + sys_cpu_per_core=sys_cpu_per_core, + sys_swap_used_bytes=self._sys_max_swap, + num_resource_samples=self._sample_count, + ) + + def get_job_data(self) -> Mapping[str, JobResourceMetrics]: + """Retrieve per-job metrics measured by this instrumentation.""" + if self._running: + raise RuntimeError("Cannot build instrumentation report whilst still running!") + + aggregates = self._finished_jobs | self._running_jobs + return {job_id: aggregate.finalize() for job_id, aggregate in aggregates.items()} diff --git a/src/dvsim/instrumentation/runtime.py b/src/dvsim/instrumentation/runtime.py index ed114881..d7447e6b 100644 --- a/src/dvsim/instrumentation/runtime.py +++ b/src/dvsim/instrumentation/runtime.py @@ -4,11 +4,9 @@ """DVSim scheduler instrumentation for timing-related information.""" -import json from pathlib import Path -from typing import Any -from dvsim.instrumentation.base import InstrumentationAggregator +from dvsim.instrumentation.base import InstrumentationAggregator, InstrumentationResults from dvsim.logging import log __all__ = ( @@ -24,7 +22,7 @@ class _Runtime: def __init__(self) -> None: self.instrumentation: InstrumentationAggregator | None = None self.report_path: Path | None = None - self.report: dict[str, Any] | None = None + self.report: InstrumentationResults | None = None _runtime = _Runtime() @@ -45,7 +43,7 @@ def get() -> InstrumentationAggregator | None: return _runtime.instrumentation -def flush() -> dict[str, Any] | None: +def flush() -> InstrumentationResults | None: """Dump the instrumentation report as JSON to the configured report path.""" if _runtime.instrumentation is None: return None @@ -58,7 +56,9 @@ def flush() -> dict[str, Any] | None: raise ValueError("Metric report path cannot be a directory.") try: _runtime.report_path.parent.mkdir(parents=True, exist_ok=True) - _runtime.report_path.write_text(json.dumps(_runtime.report, indent=2)) + _runtime.report_path.write_text( + 
_runtime.report.model_dump_json(indent=2, exclude_none=True) + ) log.info("JSON instrumentation report dumped to: %s", str(_runtime.report_path)) except (OSError, FileNotFoundError) as e: log.error( @@ -68,6 +68,6 @@ def flush() -> dict[str, Any] | None: return _runtime.report -def get_report() -> dict[str, Any] | None: +def get_report() -> InstrumentationResults | None: """Get the latest flushed instrumentation report contents, if any exist.""" return _runtime.report diff --git a/src/dvsim/instrumentation/timing.py b/src/dvsim/instrumentation/timing.py index 1542b0fe..01c1293d 100644 --- a/src/dvsim/instrumentation/timing.py +++ b/src/dvsim/instrumentation/timing.py @@ -5,69 +5,14 @@ """DVSim scheduler instrumentation for timing-related information.""" import time -from dataclasses import asdict, dataclass -from typing import Any +from collections.abc import Mapping -from dvsim.instrumentation.base import ( - InstrumentationFragments, - JobFragment, - SchedulerFragment, - SchedulerInstrumentation, -) +from dvsim.instrumentation.base import SchedulerInstrumentation +from dvsim.instrumentation.records import JobTimingMetrics, SchedulerTimingMetrics from dvsim.job.data import JobSpec from dvsim.job.status import JobStatus -__all__ = ( - "TimingInstrumentation", - "TimingJobFragment", - "TimingSchedulerFragment", -) - - -@dataclass -class TimingSchedulerFragment(SchedulerFragment): - """Instrumented metrics about the scheduler reported by the `TimingInstrumentation`.""" - - start_time: float | None = None - end_time: float | None = None - - @property - def duration(self) -> float | None: - """The duration of the entire scheduler run.""" - if self.start_time is None or self.end_time is None: - return None - return self.end_time - self.start_time - - def to_dict(self) -> dict[str, Any]: - """Convert the scheduler metrics to a dictionary, including the `duration` property.""" - data = asdict(self) - duration = self.duration - if duration: - data["duration"] = duration - 
return data - - -@dataclass -class TimingJobFragment(JobFragment): - """Instrumented metrics about the scheduler reported by the `TimingInstrumentation`.""" - - start_time: float | None = None - end_time: float | None = None - - @property - def duration(self) -> float | None: - """The duration of the job.""" - if self.start_time is None or self.end_time is None: - return None - return self.end_time - self.start_time - - def to_dict(self) -> dict[str, Any]: - """Convert the job metrics to a dictionary, including the `duration` property.""" - data = asdict(self) - duration = self.duration - if duration: - data["duration"] = duration - return data +__all__ = ("TimingInstrumentation",) class TimingInstrumentation(SchedulerInstrumentation): @@ -77,32 +22,43 @@ class TimingInstrumentation(SchedulerInstrumentation): all of the jobs that it dispatches. """ + name = "timing" + def __init__(self) -> None: """Construct a `TimingInstrumentation`.""" super().__init__() - self._scheduler = TimingSchedulerFragment() - self._jobs: dict[str, TimingJobFragment] = {} + self._start_time: float | None = None + self._end_time: float | None = None + self._jobs: dict[str, tuple[float | None, float | None]] = {} def on_scheduler_start(self) -> None: """Notify instrumentation that the scheduler has begun.""" - self._scheduler.start_time = time.perf_counter() + self._start_time = time.perf_counter() def on_scheduler_end(self) -> None: """Notify instrumentation that the scheduler has finished.""" - self._scheduler.end_time = time.perf_counter() + self._end_time = time.perf_counter() def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: """Notify instrumentation of a change in status for some scheduled job.""" job_info = self._jobs.get(job.id) if job_info is None: - job_info = TimingJobFragment(job) + job_info = (None, None) self._jobs[job.id] = job_info + start_time, end_time = job_info - if job_info.start_time is None and status not in (JobStatus.SCHEDULED, 
JobStatus.QUEUED): -            job_info.start_time = time.perf_counter() +        if start_time is None and status not in (JobStatus.SCHEDULED, JobStatus.QUEUED): +            self._jobs[job.id] = (start_time := time.perf_counter(), end_time) if status.is_terminal: -            job_info.end_time = time.perf_counter() - -    def build_report_fragments(self) -> InstrumentationFragments | None: -        """Build report fragments from the collected instrumentation information.""" -        return ([self._scheduler], list(self._jobs.values())) +            self._jobs[job.id] = (start_time, time.perf_counter()) + +    def get_scheduler_data(self) -> SchedulerTimingMetrics: +        """Retrieve scheduler metrics measured by this instrumentation.""" +        return SchedulerTimingMetrics(start_time=self._start_time, end_time=self._end_time) + +    def get_job_data(self) -> Mapping[str, JobTimingMetrics]: +        """Retrieve per-job metrics measured by this instrumentation.""" +        return { +            job_id: JobTimingMetrics(start_time=start, end_time=end) +            for job_id, (start, end) in self._jobs.items() +        }